Thread-safe priority queue?

Hi,

Does anyone know of a solid, thread-safe priority queue implementation
in Ruby?

The only one I can find is Joel Vanderwerf’s
(http://groups.google.com/group/comp.lang.ruby/browse_thread/thread/9d9db98931e4a74f)
which doesn’t work with more recent versions of ruby (because Queue
implementation changed from Ruby to C).

Cheers,
Sean

Sean I seem to fail to understand why that change should have any
impact on Joël’s work, can you elaborate please?

Cheers
Robert


http://ruby-smalltalk.blogspot.com/


AALST (n.) One who changes his name to be further to the front
D.Adams; The Meaning of LIFF

On Jul 9, 9:08 am, “Sean O’Halpin” [email protected] wrote:

Hi,

Does anyone know of a solid, thread-safe priority queue implementation in Ruby?

The only one I can find is Joel Vanderwerf’s
(http://groups.google.com/group/comp.lang.ruby/browse_thread/thread/9d…)
which doesn’t work with more recent versions of ruby (because Queue
implementation changed from Ruby to C).

Hmm… I’m not sure if Facets implementation is thread safe. It’s may
be worth a look. If I recall correctly, Olivier R. was the last to
work on it, so he may know more. If it isn’t thread safe, btw, it
would make a nice patch.

T.

On Wed, Jul 9, 2008 at 6:34 PM, Robert D. [email protected]
wrote:

Sean I seem to fail to understand why that change should have any
impact on Joël’s work, can you elaborate please?

Cheers
Robert

I didn’t explain that very well, did I? Joel’s version inherits from
Queue and directly references an instance variable (@waiting) which
isn’t there in the C version.

On Wed, Jul 9, 2008 at 6:50 PM, Trans [email protected] wrote:

Hmm… I’m not sure if Facets implementation is thread safe. It’s may
be worth a look. If I recall correctly, Olivier R. was the last to
work on it, so he may know more. If it isn’t thread safe, btw, it
would make a nice patch.

Thanks for the pointer but the Facets version isn’t thread-safe.
Still searching… :slight_smile:

Sean O’halpin wrote:

On Wed, Jul 9, 2008 at 6:50 PM, Trans [email protected] wrote:

Hmm… I’m not sure if Facets implementation is thread safe. It’s may
be worth a look. If I recall correctly, Olivier R. was the last to
work on it, so he may know more. If it isn’t thread safe, btw, it
would make a nice patch.

Thanks for the pointer but the Facets version isn’t thread-safe.
Still searching… :slight_smile:

Could throw a mutex around the facets version.

Sean O’Halpin wrote:

Hi,

Does anyone know of a solid, thread-safe priority queue implementation in Ruby?

The only one I can find is Joel Vanderwerf’s
(http://groups.google.com/group/comp.lang.ruby/browse_thread/thread/9d9db98931e4a74f)
which doesn’t work with more recent versions of ruby (because Queue
implementation changed from Ruby to C).

It’s pretty easy to work around, I think. Try the following code. It’s
based on something I’m using in live code and it seems to pass the test
referenced in the above link.

Btw, it’s great that RBTree is a gem now. Thanks to whoever did that.

require ‘thread’
require ‘rbtree’

class PriorityQueue
def size
@tree.size
end

def initialize(*)
super
@tree = MultiRBTree.new
@que = Queue.new
@mutex = Mutex.new
end

Push +obj+ with priority equal to +pri+ if given or, otherwise,

the result of sending #queue_priority to +obj+. Objects are

dequeued in priority order, and first-in-first-out among objects

with equal priorities.

def push(obj, pri = obj.queue_priority)
@mutex.synchronize do
if @que.num_waiting > 0
@que << obj
else
@tree.store(pri, obj)
end
end
end

def pop(non_block=false)
@mutex.synchronize do
if ([email protected])
return @tree.delete(last[0]) # highest key, oldest first
end

   if non_block
     raise ThreadError, "priority queue empty"
   end
 end
 @que.pop # wait

end
end

On Wed, Jul 9, 2008 at 11:17 PM, Joel VanderWerf
[email protected] wrote:

Btw, it’s great that RBTree is a gem now. Thanks to whoever did that.

Ooh, seconded! Stand up and be thanked :slight_smile:

martin

Looks like a race condition in that…

Joel VanderWerf wrote:

@tree = MultiRBTree.new
  if @que.num_waiting > 0
    return @tree.delete(last[0]) # highest key, oldest first
  end

  if non_block
    raise ThreadError, "priority queue empty"
  end
end
   ### Race happens here: if someone else calls #push, then
   ### this thread will wait even though data is available.
@que.pop # wait

end
end

Will try to fix…

Joel VanderWerf wrote:

Looks like a race condition in that…

Proposed fix, using a condition var… still needs some eyeballing and
some tests:

require ‘thread’
require ‘rbtree’

class PriorityQueue
def size
@tree.size
end

def initialize(*)
super
@tree = MultiRBTree.new
@que = [] # should never have more than one entry
@num_waiting = 0
@mutex = Mutex.new
@cond = ConditionVariable.new
end

Push +obj+ with priority equal to +pri+ if given or, otherwise,

the result of sending #queue_priority to +obj+. Objects are

dequeued in priority order, and first-in-first-out among objects

with equal priorities.

def push(obj, pri = obj.queue_priority)
@mutex.synchronize do
if @num_waiting > 0
@que << obj
@cond.signal
else
@tree.store(pri, obj)
end
end
end

def pop(non_block=false)
@mutex.synchronize do
if ([email protected])
return @tree.delete(last[0]) # highest key, oldest first
end

   if non_block
     raise ThreadError, "priority queue empty"
   end

   @num_waiting += 1
   @cond.wait(@mutex)
   @num_waiting -= 1
   @que.pop
 end

end
end

Joel VanderWerf wrote:

Joel VanderWerf wrote:

Looks like a race condition in that…

Proposed fix, using a condition var… still needs some eyeballing and
some tests:

That was not quite right either (because cond.signal only wakes the
waiter, and doesn’t schedule it). The following seems to complete
without deadlocks or starvation.

require ‘thread’
require ‘rbtree’

class PriorityQueue
def size
@tree.size
end

def initialize(*)
super
@tree = MultiRBTree.new
@mutex = Mutex.new
@cond = ConditionVariable.new
end

Push +obj+ with priority equal to +pri+ if given or, otherwise,

the result of sending #queue_priority to +obj+. Objects are

dequeued in priority order, and first-in-first-out among objects

with equal priorities.

def push(obj, pri = obj.queue_priority)
@mutex.synchronize do
@tree.store(pri, obj)
@cond.signal
end
end

def pop(non_block=false)
@mutex.synchronize do
if ([email protected])
return @tree.delete(last[0]) # highest key, oldest first
end

   if non_block
     raise ThreadError, "priority queue empty"
   end

   loop do
     @cond.wait(@mutex)
     if ([email protected])
       return @tree.delete(last[0])
     end
   end
 end

end
end

if FILE == $0

Thread.abort_on_exception = true

pq = PriorityQueue.new

n_items_per_thread = 1000
n_writers = 10
n_readers = 10

writers = (0…n_writers).map do |i_thr|
Thread.new do
n_items_per_thread.times do |i|
pri = rand(10)
pq.push([pri, i, i_thr], pri)
Thread.pass if rand(5) == 0
end
end
end

sleep 0.1 until pq.size > 100 # a little head start populating the
tree

results = Array.new(n_readers, 0)

readers = (0…n_readers).map do |i|
Thread.new do
loop do
pq.pop
results[i] += 1
end
end
end

writers.each do |wr|
wr.join
end

p results
until pq.size == 0
sleep 0.1
p results
end

raise unless results.inject {|s,x|s+x} == n_items_per_thread *
n_writers

end

Sean O’Halpin wrote:

Thanks for taking the time to do this Joel. You can see why I was
hoping it had already been done… :wink:

And as you can see from my other thread, this code breaks on 1.8.6. :frowning:

On Thu, Jul 10, 2008 at 8:03 PM, Joel VanderWerf
[email protected] wrote:

and doesn’t schedule it). The following seems to complete without deadlocks
or starvation.

Thanks for taking the time to do this Joel. You can see why I was
hoping it had already been done… :wink:

I resurrected the Queue code from an old copy of 1.8.4 I had lying
around and went with that + your original version of the
PriorityQueue. Still putting it through its paces. I’ll give your new
version a whirl too.

@Roger - just putting a mutex around all the access methods isn’t
sufficient unfortunately (see Joel’s code for evidence). I want the
calling thread to block if there’s nothing in the queue (like the
standard lib Queue behaves). And once I start putting that machinery
in, I might as well write the whole thing myself. I was hoping to
avoid that (trying to be virtuous :slight_smile:

Regards,
Sean