Thread.rb

I want to modify the Queue class in thread.rb, so I first searched for
the file. I found it in /usr/lib/ruby/1.8/thread.rb, both on a Mac
and a Linux machine. On the Mac, the file was as expected, but on the
Linux machine, it contained only this:

unless defined? Thread
  fail "Thread not available for this ruby interpreter"
end

require 'thread.so'

Does this mean the Ruby code for Queue is compiled into a shared
library? Is it possible to edit the code? (I know I can modify the
class from outside the file, and that is probably what I will end up
doing, but I’m rather surprised by the above file contents and would
like to know more about it).

Thanks,

Adam

P.S. On a related note, I want to add a method to the Queue class that
is similar to pop, but will take a timeout value and return nil if no
item is pushed before timeout seconds have elapsed. Can anyone
suggest something better than the following:

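class Queue
  # Ruby 1.8 only: relies on Thread.critical and on the internal @que and
  # @waiting instance variables of thread.rb's Queue.
  def pop_with_timeout(timeout)
    elapsed = 0.0
    while (Thread.critical = true; @que.empty?)
      # Register as a waiter, as Queue#pop does, so that push will wake us.
      @waiting.push Thread.current
      Thread.critical = false
      elapsed += sleep(timeout - elapsed)
      puts elapsed # debugging output
      return nil if elapsed >= timeout
    end
    @que.shift
  ensure
    Thread.critical = false
  end
end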

That was the first thing I came up with, and I’m (again) surprised
that it works - why does sleep ever return less than 1? What
interrupts it?
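If I read 1.8's thread.rb correctly, Queue#push shifts a thread off @waiting and calls wakeup on it, and Thread#wakeup makes a pending sleep return early, which would explain the short sleeps. A minimal sketch of that effect on a current Ruby (the sleeper and elapsed names are illustrative only):

```ruby
# A thread that would sleep for 5 seconds if nothing woke it.
sleeper = Thread.new do
  started = Time.now
  slept = sleep(5) # Kernel#sleep returns the seconds slept, rounded to an Integer
  [slept, Time.now - started]
end

# Spin until the thread is actually inside sleep, then wake it up,
# much as push is assumed to wake a waiting popper.
Thread.pass until sleeper.status == "sleep"
sleeper.wakeup

slept, elapsed = sleeper.value # joins the thread and returns its block's value
puts "sleep returned #{slept} after #{elapsed.round(3)} seconds"
```

Because sleep rounds to whole seconds, an early wakeup typically shows up as a return value of 0.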

2008/3/25, Adam B.:

> I want to modify the Queue class in thread.rb, so I first searched for

Why?

>   Thread.critical = false
>
> That was the first thing I came up with, and I’m (again) surprised
> that it works - why does sleep ever return less than 1? What
> interrupts it?

I opt against using Thread.critical because this is a global exclusive
lock on all threads and it is unlikely to continue to exist in the
light of JRuby and others. I’d rather use Mutex#try_lock or Monitor’s
#wait with timeout parameter.
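Both primitives Robert mentions can be tried on a current Ruby, where Mutex is built in and a Monitor condition variable's wait takes a timeout in seconds. A minimal sketch (variable names are illustrative; later in this thread Monitor timeouts are reported to be flaky on 1.8):

```ruby
require 'monitor'

# Mutex#try_lock never blocks: it returns false when another thread holds the lock.
mutex = Mutex.new
mutex.lock
got_lock = Thread.new { mutex.try_lock }.value
mutex.unlock

# A Monitor condition variable with a timeout: nobody signals, so wait
# gives up after roughly 0.2 seconds and the block carries on.
monitor = Monitor.new
cond = monitor.new_cond
started = Time.now
monitor.synchronize { cond.wait(0.2) }
waited = Time.now - started

puts "try_lock from another thread: #{got_lock}"
puts "waited #{waited.round(2)} seconds"
```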

Kind regards

robert

On Tue, Mar 25, 2008 at 7:57 AM, Robert K. wrote:

> 2008/3/25, Adam B.:
>
>> I want to modify the Queue class in thread.rb, so I first searched for
>
> Why?

What what? I want to modify the class as described below; I searched
for the file so I could see how pop was implemented and the instance
variables it used.

>>   Thread.critical = false
>>
>> That was the first thing I came up with, and I’m (again) surprised
>> that it works - why does sleep ever return less than 1? What
>> interrupts it?
>
> I opt against using Thread.critical because this is a global exclusive
> lock on all threads and it is unlikely to continue to exist in the
> light of JRuby and others. I’d rather use Mutex#try_lock or Monitor’s
> #wait with timeout parameter.

The Queue class uses Thread.critical. Though I guess writing a queue
using a Monitor would be easy enough to do.

Adam

On Tue, 25 Mar 2008 20:57:48 +0900, “Robert K.” wrote:

> I opt against using Thread.critical because this is a global exclusive
> lock on all threads and it is unlikely to continue to exist in the
> light of JRuby and others. I’d rather use Mutex#try_lock or Monitor’s
> #wait with timeout parameter.

Agreed, anything using Thread.critical is not very future-proof.

Note, however, that while Monitor is okay under JRuby, in MRI (1.9 at
least) it uses Timeout in an uncontrolled way and is not entirely reliable
as a result. To implement Monitor robustly in JRuby we ended up adding
timeout support to JRuby’s ConditionVariable#wait.

*** I would strongly recommend against using any concurrency primitive
in 1.9 except for the primitives from thread.rb/thread.so. ***

Given that, the most directly portable way to get a robust queue that
supports timeouts is to roll your own using Mutex, ConditionVariable,
and some safe source of timed events like Scheduler.

-mental

On Wed, 26 Mar 2008 03:11:55 +0900, MenTaLguY wrote:

> Note, however, that while Monitor is okay under JRuby, in MRI (1.9 at
> least) it uses Timeout in an uncontrolled way and is not entirely reliable
> as a result.

I was wrong. Timeouts are currently unimplemented in 1.9’s Monitor.

> *** I would strongly recommend against using any concurrency primitive
> in 1.9 except for the primitives from thread.rb/thread.so. ***

Evidently more things have changed since 1.8 than I had thought, so
this may or may not still hold.

-mental

On Tue, 25 Mar 2008 12:32:38 +0900, “Adam B.” wrote:

> require 'thread.so'
>
> Does this mean the Ruby code for Queue is compiled into a shared
> library? Is it possible to edit the code? (I know I can modify the
> class from outside the file, and that is probably what I will end up
> doing, but I’m rather surprised by the above file contents and would
> like to know more about it).

Yes, it’s implemented in C. In JRuby, it’s implemented in Java.

I do agree that the following thread.rb operations need to directly
support timeouts, though:

ConditionVariable#wait
Queue#pop
SizedQueue#pop
SizedQueue#push

Unfortunately, there is no reliable way to patch timeouts in after
the fact. The support needs to be added to Ruby core.

In the interim, I think the best you can do is to implement your own
queue data structure which supports timeouts using Mutex,
ConditionVariable and the ‘scheduler’ gem. For instance:

require 'thread'
require 'scheduler'   # the 'scheduler' gem (gem install scheduler)

class MyQueue
  def initialize
    @lock = Mutex.new
    @values = []    # the "pushes" queue: values nobody has taken yet
    @readers = []   # the "pops" queue: readers waiting for a value
  end

  class Timeout < Interrupt
  end

  class Reader
    def initialize
      @condition = ConditionVariable.new
      @state = :waiting
      @value = nil
    end

    def timeout
      @state = :timeout
      @condition.signal
      self
    end

    def value=(value)
      @value = value
      @state = :ready
      @condition.signal
      self
    end

    # Sleeps (releasing lock) until a value arrives or the timer fires.
    def wait_for_value(lock)
      @condition.wait(lock) while @state == :waiting
      raise Timeout, "timed out" unless @state == :ready
      @value
    end
  end

  def push(value)
    @lock.synchronize do
      if @readers.empty?
        @values.push value
      else
        # hand the value straight to the longest-waiting reader
        @readers.shift.value = value
      end
    end
    self
  end

  def pop(timeout=nil)
    @lock.synchronize do
      unless @values.empty?
        @values.shift   # oldest value first (FIFO)
      else
        reader = Reader.new
        @readers.push reader
        timer = nil
        begin
          if timeout
            timer = Scheduler.after_delay!(timeout) do
              @lock.synchronize do
                reader.timeout if @readers.delete reader
              end
            end
          end
          reader.wait_for_value(@lock)
        ensure
          @readers.delete reader
          timer.cancel if timer
        end
      end
    end
  end
end

(This is untested, but should basically do what you need and you’re
welcome to use it. Feel free to ask any questions you might have.)

-mental

On Tue, Mar 25, 2008 at 2:01 PM, MenTaLguY wrote:

> I do agree that the following thread.rb operations need to directly
> support timeouts, though:
>
> ConditionVariable#wait

ConditionVariable#wait appears to support timeouts
(http://www.ruby-doc.org/stdlib/libdoc/monitor/rdoc/classes/MonitorMixin/ConditionVariable.html#M001018).
Do they not work as expected?

I couldn’t find the Scheduler gem, so I’m trying to hack something
together based on your code example, but using Monitors. It depends on
ConditionVariable#wait, so it could be totally broken. I didn’t
understand the point of the Reader class in your code, since only one
thread at a time can acquire the lock in pop. This problem is closely
related to the example on page 146 of Pickaxe 2nd ed.

require 'monitor'

class MyQueue

  def initialize
    @values = []
    @values.extend(MonitorMixin)
    @cond = @values.new_cond
  end

  def push(value)
    @values.synchronize do
      @values.push value
      @cond.signal
    end
    self
  end

  def pop(timeout=nil)
    ret = nil
    @values.synchronize do
      t = Time.now
      @cond.wait(timeout)
      puts "waited #{Time.now - t}"
      ret = @values.shift unless @values.empty?
    end
    return ret
  end
end

q = MyQueue.new

consumers = (1..3).map do |i|
  Thread.new("consumer #{i}") do |name|
    begin
      obj = q.pop(5)
      puts "#{name} consumed #{obj.inspect}"
      sleep(rand * 0.05) # pause for up to 0.05 seconds
    end until obj == :END_OF_WORK
  end
end

producers = (1..3).map do |i|
  Thread.new("producer #{i}") do |name|
    3.times do |j|
      sleep(1)
      q.push("Item #{j} from #{name}")
    end
  end
end

producers.each { |th| th.join }
consumers.size.times { q.push(:END_OF_WORK) }
consumers.each { |th| th.join }

The problem is that the wait() waits for timeout seconds, even when
something is in the queue.

Thanks for your help,

Adam

On Tue, Mar 25, 2008 at 6:31 PM, Adam B. wrote:

> On Tue, Mar 25, 2008 at 2:01 PM, MenTaLguY wrote:
>
>> I do agree that the following thread.rb operations need to directly
>> support timeouts, though:
>>
>> ConditionVariable#wait
>
> ConditionVariable#wait appears to support timeouts
> (http://www.ruby-doc.org/stdlib/libdoc/monitor/rdoc/classes/MonitorMixin/ConditionVariable.html#M001018).
> Do they not work as expected?

My apologies, I didn’t read your previous replies thoroughly enough.
Anyways, I’m not worried about 1.9 portability just yet, I just need
something that works in 1.8.6.

Thanks again,

Adam

On Wed, 26 Mar 2008 07:32:27 +0900, “Adam B.” wrote:

> I couldn’t find the Scheduler gem

gem install scheduler

> I didn’t understand the point of the Reader class in your code, since
> only one thread at a time can acquire the lock in pop.

The lock is released (by ConditionVariable#wait) while a reader is
waiting (and re-acquired before ConditionVariable#wait returns).
Otherwise writers couldn’t get in to write.

Conceptually, a Queue is actually two queues: a queue of “pushes” which
holds pushed values, and a queue of “pops” which holds threads waiting
for new values to be pushed.

If there have been more pushes than pops, the “pushes” queue will have
entries in it, and if there have been more pops than pushes, the “pops”
queue will have entries in it. Each push tries to take an entry from
the “pops” queue and vice-versa, so that when there have been an equal
number of pushes and pops, both queues will be empty.

Readers are just used to represent entries in the “pops” queue.
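The two-queue model can be seen more directly with the timeout machinery stripped out. A minimal sketch on a current Ruby; HandoffQueue and its sizes helper are hypothetical names, and a Struct stands in for the Reader class above:

```ruby
# Hypothetical HandoffQueue illustrating the two-queue model:
#   @values  - the "pushes" queue: pushed values nobody has taken yet
#   @readers - the "pops" queue: one Reader per thread blocked in #pop
# Each push first tries to satisfy a waiting pop and vice versa, so under
# the lock at most one of the two arrays is non-empty.
class HandoffQueue
  Reader = Struct.new(:cond, :value, :ready)

  def initialize
    @lock = Mutex.new
    @values = []
    @readers = []
  end

  def push(value)
    @lock.synchronize do
      if (reader = @readers.shift)
        # A pop is already waiting: hand the value straight to it.
        reader.value = value
        reader.ready = true
        reader.cond.signal
      else
        # Nobody is waiting: remember the push.
        @values.push(value)
      end
    end
    self
  end

  def pop
    @lock.synchronize do
      # A pending push: take its value without waiting.
      return @values.shift unless @values.empty?

      # Otherwise record this pop and sleep; ConditionVariable#wait releases
      # @lock while sleeping, which is what lets #push get in.
      reader = Reader.new(ConditionVariable.new, nil, false)
      @readers.push(reader)
      reader.cond.wait(@lock) until reader.ready
      reader.value
    end
  end

  # [pending pushes, pending pops], for inspection.
  def sizes
    @lock.synchronize { [@values.size, @readers.size] }
  end
end

q = HandoffQueue.new
q.push(1).push(2)
p q.sizes                              # prints [2, 0]
first = q.pop                          # oldest value first

consumer = Thread.new { q.pop; q.pop } # takes 2, then has to wait
Thread.pass until q.sizes == [0, 1]    # consumer is now a pending pop
q.push(:x)                             # handed directly to the consumer
p consumer.value                       # prints :x
```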

-mental

On Wed, 26 Mar 2008 07:37:32 +0900, “Adam B.” wrote:

> My apologies, I didn’t read your previous replies thoroughly enough.
> Anyways, I’m not worried about 1.9 portability just yet, I just need
> something that works in 1.8.6.

Monitor should work then, although the implementation of timeouts is
a little flaky in 1.8. It should work reliably in JRuby though.

-mental

On Wed, 26 Mar 2008 07:32:27 +0900, “Adam B.” wrote:

>     @values.synchronize do
>       t = Time.now
>       @cond.wait(timeout)
>       puts "waited #{Time.now - t}"
>       ret = @values.shift unless @values.empty?
>     end
>     return ret
>   end
>
> The problem is that the wait() waits for timeout seconds, even when
> something is in the queue.

In this case, the problem is that you are always putting the thread to
sleep on the “pops” queue, even when there is already a pushed value
available on the “pushes” queue.

You need to check whether the “pushes” queue is empty and only sleep
when it is:

def pop
  @values.synchronize do
    while @values.empty?
      @cond.wait
    end
    @values.shift
  end
end

Note that I’ve used a while loop rather than a simple if test; this is
for several reasons, but mainly because it is possible for another
thread to “steal” a value from the “pushes” queue before we finish
waking up. Because of that, we have to check the predicate again after
@cond.wait returns, and potentially go back to sleep.
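MonitorMixin::ConditionVariable also provides wait_while, which wraps exactly this re-check loop. A minimal sketch of the same pop written with it (BlockingQueue is a hypothetical name; the logic is otherwise the pop above):

```ruby
require 'monitor'

# Hypothetical BlockingQueue: the pop above, using wait_while for the loop.
class BlockingQueue
  def initialize
    @values = []
    @values.extend(MonitorMixin)
    @cond = @values.new_cond
  end

  def push(value)
    @values.synchronize do
      @values.push value
      @cond.signal
    end
    self
  end

  def pop
    @values.synchronize do
      # wait_while re-tests the block after every wakeup, like the while loop,
      # so a value "stolen" by another thread just sends us back to sleep.
      @cond.wait_while { @values.empty? }
      @values.shift
    end
  end
end

q = BlockingQueue.new
consumer = Thread.new { [q.pop, q.pop] }
q.push(:a).push(:b)
result = consumer.value
```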

Implementing this in conjunction with timeout is harder, but possible:

def pop(timeout = nil)
  if timeout
    deadline = Time.now + timeout
  else
    deadline = nil
  end
  @values.synchronize do
    while @values.empty?
      if deadline
        timeout = deadline - Time.now
        return nil if timeout <= 0
      end
      @cond.wait(timeout)
    end
    @values.shift
  end
end

Since this uses the system clock rather than a monotonic timer, it does
still have a bug inasmuch as changing the system time can cause #pop to
wait for too long or too little, but Ruby 1.8 and 1.9 have that same bug
in their thread schedulers anyway.
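On Ruby 2.1 and later (an assumption about the Ruby in use; 1.8.6 has no such call), Process.clock_gettime(Process::CLOCK_MONOTONIC) gives a clock that system-time changes don't move. A sketch of the pop above assembled into a complete class with the deadline taken from that clock (TimedQueue is a hypothetical name):

```ruby
require 'monitor'

# Hypothetical TimedQueue: the pop(timeout) above, with the deadline measured
# on the monotonic clock so that changing the system time doesn't affect it.
class TimedQueue
  def initialize
    @values = []
    @values.extend(MonitorMixin)
    @cond = @values.new_cond
  end

  def push(value)
    @values.synchronize do
      @values.push value
      @cond.signal
    end
    self
  end

  # Returns the oldest value, or nil if none arrives within timeout seconds.
  def pop(timeout = nil)
    deadline = timeout && now + timeout
    @values.synchronize do
      while @values.empty?
        if deadline
          remaining = deadline - now
          return nil if remaining <= 0
          @cond.wait(remaining)
        else
          @cond.wait
        end
      end
      @values.shift
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

q = TimedQueue.new
q.push(:ready)
first = q.pop(0.5)   # a value is already there, so no waiting
t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
second = q.pop(0.2)  # empty queue: gives up after about 0.2 seconds
waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
```

The loop only returns nil once the monotonic deadline has passed, so a spurious early wakeup simply waits out the remaining time.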

-mental

On Tue, Mar 25, 2008 at 8:09 PM, MenTaLguY wrote:

> On Wed, 26 Mar 2008 07:32:27 +0900, “Adam B.” wrote:
>
>> I couldn’t find the Scheduler gem
>
> gem install scheduler

Hmm. That didn’t work the first time I tried it (the gem wasn’t
found), but it worked just now.

> In this case, the problem is that you are always putting the thread to
> sleep on the “pops” queue, even when there is already a pushed value
> available on the “pushes” queue.

I realized later on that that was probably the problem, and indeed it
was.

> Implementing this in conjunction with timeout is harder, but possible:

Awesome, this gives me exactly what I need. Thanks for your help.

Adam