Forum: Ruby FasterGenerator (#66)

James Gray (bbazzarrakk)
on 2006-02-16 17:07
(Received via mailing list)
There were two kinds of solutions to this problem.  The first were
Array-based solutions.  The logic here is that we can do the iteration,
shove all the results in an Array, and then hand them out one-at-a-time.
Here is Christoffer Lerno's solution, doing exactly that:

	class MyGenerator
	    attr_reader :index
	    def initialize(enum = nil)
	        if enum then
	            @array = enum.to_a
	        else
	            @array = Array.new
	            yield self
	        end
	        @index = 0
	    end
	    def current
	        raise EOFError unless next?
	        @array[@index]
	    end
	    def next
	        value = current
	        @index += 1
	        return value
	    end
	    def next?
	        return @index < @array.length
	    end
	    def rewind
	        @index = 0
	        self
	    end
	    def each(&block)
	        @array.each(&block)
	    end
	    def yield(value)
	        @array << value
	    end
	    def pos
	        return @index
	    end
	    def end?
	        return !next?
	    end
	end

Even though Generator supposedly works off of the each() method, it
claims to accept an Enumerable argument.  If we go on that assumption,
there should be a to_a() method which will return all the objects each()
iteration would.  (If you are uncomfortable with this logic, you could
iterate and add them to an Array by hand.)  You can see Christoffer
putting this shortcut to work in initialize(), filling @array with a
simple call to to_a().
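
To make that parenthetical concrete, here is a small sketch of both routes
side by side.  The Countdown class is a made-up example, not part of any
submitted solution; it only defines each() and mixes in Enumerable, which is
where to_a() comes from.

```ruby
# Hypothetical example class: defines each() and mixes in Enumerable.
class Countdown
  include Enumerable

  def initialize(from)
    @from = from
  end

  def each
    @from.downto(1) { |n| yield n }
  end
end

source = Countdown.new(3)

# The shortcut: Enumerable#to_a drives each() for us.
shortcut = source.to_a

# The by-hand version: iterate and append each object ourselves.
by_hand = []
source.each { |item| by_hand << item }
```

Both Arrays should end up with the same objects in the same order, so the
choice between them is a matter of taste.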

The rest of the methods are trivial.  An @index is saved pointing into
the @array, which can be incremented or reset to walk the objects.
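
One detail that is easy to miss is the block form: in this design the block
runs to completion inside initialize(), before any value is handed out.
Below is a trimmed-down sketch of the same idea that records the order of
events.  ArrayCursor is a hypothetical name and only a subset of the methods
is reproduced.

```ruby
# Trimmed-down sketch of the Array-and-index idea (hypothetical name).
class ArrayCursor
  def initialize(enum = nil)
    if enum
      @array = enum.to_a
    else
      @array = []
      yield self          # the block runs to completion right here
    end
    @index = 0
  end

  # Called from the block as g.yield(value); it simply appends.
  def yield(value)
    @array << value
  end

  def next?
    @index < @array.length
  end

  def next
    raise EOFError unless next?
    value = @array[@index]
    @index += 1
    value
  end

  def rewind
    @index = 0
    self
  end
end

log = []
cursor = ArrayCursor.new do |g|
  log << :block_started
  g.yield :a
  g.yield :b
  log << :block_finished
end
log << :constructed

first  = cursor.next
second = cursor.next
again  = cursor.rewind.next   # back to the first stored value
```

The log shows the block finishing before the constructor returns, which is
exactly the eager behavior the Array approach depends on.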

The good news:  this is wicked fast in comparison to the old or even the
new Generator library.  The bad news:  it doesn't work for all cases.
First, an iterator does not have to be finite, and trying to shove
something like a lazily evaluated infinite stream in an Array is going to
go poorly.  Furthermore, iteration may be sensitive to external factors
like when it is executed.  In those cases, running the full iteration up
front may produce different results.
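
Here is a rough sketch of both failure modes.  Naturals and Inbox are
invented for this illustration and don't come from any of the solutions;
they just stand in for "an each() that never ends" and "an each() whose
results depend on when you call it".

```ruby
# Hypothetical source whose each() never finishes.
class Naturals
  include Enumerable

  def each
    n = 0
    loop do
      n += 1
      yield n
    end
  end
end

# Naturals.new.to_a would never return, so an Array-based generator
# could not even finish being constructed.  A bounded prefix still works,
# because Enumerable#first stops the iteration early:
prefix = Naturals.new.first(3)

# Hypothetical source whose results depend on when each() is called.
class Inbox
  include Enumerable

  def initialize
    @messages = []
  end

  def deliver(message)
    @messages << message
  end

  def each(&block)
    @messages.each(&block)
  end
end

inbox = Inbox.new
inbox.deliver("first")
snapshot = inbox.to_a      # what an Array-based generator would store
inbox.deliver("second")    # arrives after the snapshot was taken
later = inbox.to_a         # what iterating on demand would see now
```

The snapshot misses the second message, which is the "different results"
problem in miniature.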

For these reasons the standard Generator library requires a more general
solution.  However, I would still tuck the Array-and-index trick away in
the back of your mind, as it may just come in handy some day.  If you can
be sure you aren't dealing with any of the edge cases mentioned above and
sure you can spare the memory, using an Array and an index is simple and
fast.

For all other times, you have the standard Generator library.

We know that the library used to work with continuations, but now it has
a new implementation.  The new technique was the other one used by the
solutions.  It goes like this:  launch a Thread to iterate over the
elements, stop the Thread just before each iteration, and wake it up each
time you need a new item.
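
Stripped of any class, the handshake can be sketched roughly as follows.
This is only an illustration of the stop-and-wake idea under my own naming
(worker, step, lock are all made up), not code from a submitted solution.

```ruby
values = []
done   = false
lock   = Mutex.new

# The worker iterates over the elements, going to sleep before the
# first item and again after handing over each one.
worker = Thread.new do
  Thread.stop
  %w[a b c].each do |item|
    lock.synchronize { values << item }
    Thread.stop
  end
  lock.synchronize { done = true }
end

# Wake the worker once, then wait for a new value or the end of iteration.
step = lambda do
  Thread.pass until worker.stop?            # worker is asleep or finished
  seen = lock.synchronize { values.size }
  worker.run if worker.alive?
  Thread.pass until lock.synchronize { values.size > seen || done }
end

step.call
after_one = lock.synchronize { values.dup }
step.call
after_two = lock.synchronize { values.dup }
step.call                                   # third item
step.call                                   # each() returns, done is set
finished = lock.synchronize { done }
```

Each call to step should move the iteration forward by exactly one item,
and the fourth call only discovers that there is nothing left.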

Let's see how Jacob Fugal implemented that (with a small patch from Ross
Bamford).  Here's the setup:

	require 'thread'

	class FasterGenerator
	  def initialize( enum=nil, &block )
	    @position = 0
	    @values = []
	    @done = false
	    @mutex = Mutex.new
	    @block = block

	    if enum
	      @block = proc{ |g|
	        enum.each{ |x| g.yield x }
	      }
	    end
	  end

	  # ...

This almost looks like the Array-and-index trick again, when you see
@position and @values, but this version isn't going to slurp the values
all at once.  @done is just a flag to tell us when we have exhausted the
values and @mutex is a locking tool from the thread library.

Now @block is a little more interesting.  Generator can be called in two
ways, either with an Enumerable object, or with a block that yield()s
values to the Generator.  The code above merges the two cases, by
providing a block that yield()s through each() when the Enumerable object
is given.
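
The merge can be tried out in isolation.  In this sketch normalize_source
and Recorder are hypothetical helpers of mine: the first returns the one
block form for either calling style, the second stands in for the generator
and just records whatever is yield()ed to it.

```ruby
# Hypothetical helper: returns the single block form for either style.
def normalize_source(enum = nil, &block)
  return block unless enum
  proc { |g| enum.each { |x| g.yield x } }
end

# Stand-in for the generator: records whatever is yield()ed to it.
class Recorder
  attr_reader :seen

  def initialize
    @seen = []
  end

  def yield(x)
    @seen << x
  end
end

from_enum  = normalize_source([1, 2, 3])
from_block = normalize_source { |g| g.yield 1; g.yield 2; g.yield 3 }

a = Recorder.new
b = Recorder.new
from_enum.call(a)
from_block.call(b)
```

After this, the rest of the class never needs to know which way it was
constructed; it only ever calls the block.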

That means the next piece of the puzzle is the yield() method:

	  # ...

	  def yield( x )
	    @mutex.synchronize{ @values << x }
	    Thread.stop
	  end

	  # ...

We're starting to see threading work here.  This method grabs the lock,
adds the object to @values, drops the lock, and halts the Thread that
fetched the object.  The lock is to ensure that multiple Threads don't
stomp on the internal @values queue at the same time and the halting is
the pause before each iteration I mentioned earlier.

So, let's see what current() and next() look like:

	  # ...

	  def current
	    collect_value
	    raise EOFError if eof?
	    @values[@position]
	  end

	  # ...

	  def next
	    x = current
	    @position += 1
	    x
	  end

	  # ...

Seems like all the magic here is hidden in the collect_value() method and
everything else is just indexing.  Here's the magic method:

	  # ...

	  private
	  def collect_value
	    @thread ||= Thread.new {
	      Thread.stop
	      @block[self]
	      @mutex.synchronize{ @done = true }
	    }
	    Thread.pass until @thread.stop?
	    @thread.run if @thread.status and need_value?
	    Thread.pass while need_value?
	  end

	  def need_value?
	    @mutex.synchronize{ not @position < @values.size and not @done }
	  end

	  # ...

collect_value() makes sure we have a Thread running, or creates it if
needed.  The Thread is trivial:  halt (to pause before the first
iteration), run the block, grab the lock, toggle the @done flag, and
release the lock.  Once we are sure there is a Thread, we can pass() the
calling Thread until the generator Thread gets a chance to run() and
reaches its stop()ing point.

need_value?() is just a check to make sure we don't have one queued, and
we're not finished with iteration.

The only challenge left is knowing when we are at the end?() of iteration:

	  # ...

	  def next?
	    return (not end?)
	  end

	  def end?
	    collect_value
	    return eof?
	  end

	  # ...

	  private

	  # ...

	  def eof?
	    @mutex.synchronize{ not @position < @values.size and @done }
	  end
	end

The secret to end?() is that we must make sure we have a value queued, if
possible.  That's because we pause just after each item is added.  We
might be in the very last pause before iteration would end.  Calling
collect_value() will find the next item or cause the @done flag to be set.
Either way, we then have enough information to properly answer the
question.
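
It may help to see need_value?() and eof?() as two outcomes of the same
comparison, differing only in @done.  The sketch below restates them as a
plain function of the three pieces of state; State and classify are
hypothetical names used only for this illustration.

```ruby
# Hypothetical restatement of the generator's state: the read position,
# how many values have been queued, and whether iteration has finished.
State = Struct.new(:position, :queued, :done)

def classify(state)
  # Same test as "not @position < @values.size" in the listing.
  caught_up = state.position >= state.queued

  if !caught_up
    :value_ready    # something is queued; no need to wake the thread
  elsif state.done
    :eof            # nothing queued and nothing more will come
  else
    :need_value     # nothing queued yet; the thread must be resumed
  end
end

queued_item = classify(State.new(0, 1, false))
last_pause  = classify(State.new(1, 1, false))
exhausted   = classify(State.new(1, 1, true))
```

The :need_value case is the "very last pause" situation: from the outside it
cannot be told apart from the end of iteration until the thread has been
given one more chance to run.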

Here are the rest of the methods for Jacob's solution:

	  # ...

	  def pos
	    @position
	  end

	  # ...

	  def rewind
	    @position = 0
	    self
	  end

	  def each
	    self.rewind
	    while self.next?
	      yield self.next
	    end
	  end

	  def index
	    pos
	  end

	  # ...

There shouldn't be any surprises left in there.

Some other solutions that used this same technique were faster, or even
slower.  This seems primarily related to which tools were used to control
Thread synchronization.  Mutex, for example, is fairly slow while
`Thread.critical = true` is quick and probably all you need in this
instance.

Let me send out a big thank you to all who tried this quiz and an even
bigger thank you to Ross Bamford for helping me out with the timings.

Tomorrow we have a deep meditation on metaprogramming...
Jacob Fugal (Guest)
on 2006-02-16 18:26
(Received via mailing list)
On 2/16/06, Ruby Quiz <james@grayproductions.net> wrote:
> Let's see how Jacob Fugal implemented that (with a small patch from Ross
> Bamford).  Here's the setup:

<snip>

> Here are the rest of the methods for Jacob's solution:

<snip>

>           def rewind
>             @position = 0
>             self
>           end

One important thing: I've since realized that this implementation of
rewind was flawed. It should be something like:

  def rewind
    if @thread
      @thread.kill if @thread.status
      @thread = nil
    end
    @values = []
    @position = 0
    @done = false
    self  # keep returning the generator, as the original rewind did
  end

My initial rewind just reset to the beginning of the list of generated
values, but for proper operation the list itself and the generating
thread should also be reset. @done needs to be reset as well, since
the previous run of @thread may have completed and marked @done as
true, and we definitely don't want that. I've got the if statement
around the operations on @thread since I don't want subsequent calls
to rewind to cause a NoMethodError on nil.
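
To see why resetting only the position isn't enough, here is a deliberately
simplified sketch without any threads.  CachingCursor and both of its rewind
variants are hypothetical; the class just caches what its source block
produced, which is the part of the generator that matters for this point.

```ruby
# Hypothetical stand-in for a generator that caches what it has produced.
class CachingCursor
  def initialize(&source)
    @source = source
    full_rewind
  end

  # Runs the source on first use, then serves the cached Array.
  def values
    @values ||= @source.call
  end

  # Like the original rewind: only the position is reset.
  def position_only_rewind
    @position = 0
    self
  end

  # Like the corrected rewind: the cached values are discarded as well.
  def full_rewind
    @values = nil
    @position = 0
    self
  end
end

data = [1, 2]
cursor = CachingCursor.new { data.dup }

first_pass = cursor.values
data << 3                                   # the source changes
stale = cursor.position_only_rewind.values  # replays the old cache
fresh = cursor.full_rewind.values           # runs the source again
```

With the position-only rewind the second pass replays stale values, while
the full reset picks up the change in the underlying source.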

Given this structure in #rewind, we can also refactor #initialize a bit:

  def initialize( enum=nil, &block )
    self.rewind
    @mutex = Mutex.new
    @block = block

    if enum
      @block = proc{ |g|
        enum.each{ |x| g.yield x }
      }
    end
  end

Jacob Fugal