Hi,
I’m trying to build a message queue that can receive messages, blocks
until messages arrive, and can optionally block for only a limited
amount of time. Unfortunately, the built-in Queue doesn’t support
timeout behaviour, so I went about implementing a queue myself. I
missed Java’s Object.wait method, i.e. using a plain object to notify
the blocking thread to resume.
I ended up using thread.join(timeout) which blocks the current
thread until the thread that join was called on finishes (or until
timeout is reached). My queue doesn’t need to join any particular
thread, though, it just needs to be notified when a new message is
added to the queue. So I needed to create a dummy thread solely to
provide notification. What I came up with is this:
require 'thread'

class MessageQueue
  def initialize
    @lock = Mutex.new
    @t = Thread.new{Thread.stop} # this thread exists solely for notification
    @queue = []
  end

  def enqueue msg
    @lock.synchronize {
      @queue.push msg
      t=@t
      @t=Thread.new{Thread.stop}
      t.kill # notify waiting thread
    }
  end

  def dequeue timeout=nil
    msg = nil
    stop = Time.now+timeout if timeout
    while msg == nil
      @lock.synchronize {msg = @queue.shift}
      break if timeout && Time.new >= stop
      @t.join(timeout) unless msg # wait for new message
    end #while
    msg
  end
end
While this works fine, it seems like a bit of a kludge because I have
to create (and misuse) threads solely to provide notification to the
blocking thread.
Any thoughts? Am I missing an easier/built-in method to block a thread
only for a limited amount of time?
Thanks,
-tim
On 5/12/07, Tim B. wrote:
> I’m trying to build a message queue that can receive messages, blocks
> until messages arrive, and can optionally block for only a limited
> amount of time. Unfortunately, the built-in Queue doesn’t support
> timeout behaviour, so I went about implementing a queue myself. I
> missed Java’s Object.wait method, i.e. using a plain object to notify
> the blocking thread to resume.
Use ‘timeout’:
require 'timeout'
require 'thread'

q = Queue.new
Thread.new do
  q.push "thing"
  sleep 10
end

begin
  Timeout::timeout(1) do
    q.pop # returns "thing"
    q.pop # blocks until the timeout fires
  end
rescue Timeout::Error
  puts "timed out!"
end
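The same idea can be wrapped in a small helper so that callers get a default value back instead of an exception. This is only a sketch: pop_with_timeout is a hypothetical name, not something the standard library provides, and it relies on Timeout in exactly the same way as the snippet above.

```ruby
require 'timeout'
require 'thread'

# Hypothetical helper (not part of the standard library): pop one item
# from +queue+, giving up after +seconds+ and returning +default+ instead.
def pop_with_timeout(queue, seconds, default = nil)
  Timeout.timeout(seconds) { queue.pop }
rescue Timeout::Error
  default
end

q = Queue.new
q.push "thing"

first  = pop_with_timeout(q, 1)                 # item is already queued
second = pop_with_timeout(q, 0.2, :no_message)  # queue is empty, gives up
```

Here first ends up as "thing" and second as :no_message. A caller that needs to tell a timeout apart from a queued nil should pass a distinct default, as in the second call.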
On Sat, 2007-05-12 at 22:39 +0900, Tim B. wrote:
> Any thoughts? Am I missing an easier/built-in method to block a thread
> only for a limited amount of time?
Unless/until the built-in classes support timeouts, using a thread to
track the timeout is sadly your best option.
However, there are a number of problems with the specific approach
you’ve used here (e.g. not all accesses to @t are protected by the
mutex)… here’s an alternate implementation…
require 'thread'
begin
  require 'fastthread'
rescue LoadError
end

class MessageQueue
  def initialize
    @lock = Mutex.new
    @messages = []
    @readers = []
  end

  def enqueue msg
    @lock.synchronize do
      unless @readers.empty?
        @readers.pop << msg
      else
        @messages.push msg
      end
    end
  end

  def dequeue timeout=nil
    timeout_thread = nil
    begin
      reader = nil
      @lock.synchronize do
        unless @messages.empty?
          # fast path
          return @messages.shift
        else
          reader = Queue.new
          @readers.push reader
          if timeout
            timeout_thread = Thread.new do
              sleep timeout
              @lock.synchronize do
                @readers.delete reader
                reader << nil
              end
            end
          end
        end
      end
      # either timeout or writer will send to us
      reader.shift
    ensure
      # (try to) clean up timeout thread
      timeout_thread.run if timeout_thread
    end
  end
end
Queue may seem sort of heavyweight to use for “callbacks” this way, but
if you use fastthread they are pretty inexpensive, and queues take care
of a lot of the bookkeeping you would need to do to make this safe
otherwise.
One remaining issue is that timeout_thread.run may be called before the
timeout thread has actually started sleeping, in which case the wakeup
is lost and the thread still sleeps for the full timeout, so the cleanup
isn’t necessarily that effective. However, Thread#kill (or Thread#raise)
isn’t an option, since those can leave things in an inconsistent state.
A better solution would be to maintain a dedicated thread for tracking
timeouts which you don’t have to worry about cleaning up – I’ll be
releasing a library to do that soon, but for now the above solution is
the best I can do.
-mental
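For comparison, here is a sketch of the same queue without any helper thread. It assumes a Ruby version whose ConditionVariable#wait accepts an optional timeout argument, which is not something the posts above rely on; TimedQueue is a hypothetical name chosen for the illustration. The waiting reader re-checks the queue after every wakeup and tracks its own deadline, much like the original dequeue loop does.

```ruby
require 'thread'

# Hypothetical class name; assumes ConditionVariable#wait(mutex, timeout).
class TimedQueue
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @messages = []
  end

  def enqueue(msg)
    @lock.synchronize do
      @messages.push msg
      @cond.signal # wake one waiting reader, if any
    end
  end

  # Returns the next message, or nil if none arrives within +timeout+
  # seconds. With no timeout it blocks until a message is available.
  def dequeue(timeout = nil)
    deadline = timeout && Time.now + timeout
    @lock.synchronize do
      while @messages.empty?
        if deadline
          remaining = deadline - Time.now
          return nil if remaining <= 0
          @cond.wait(@lock, remaining) # releases @lock while waiting
        else
          @cond.wait(@lock)
        end
      end
      @messages.shift
    end
  end
end

q = TimedQueue.new
writer = Thread.new { sleep 0.1; q.enqueue "hello" }
got    = q.dequeue(2)   # the writer delivers well within the timeout
missed = q.dequeue(0.2) # nothing else is coming, so this gives up
writer.join
```

As with the implementation above, a nil return is ambiguous if nil itself is ever enqueued as a message.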
On Sun, 2007-05-13 at 04:17 +0900, Avdi G. wrote:
> Use ‘timeout’:
I’d be cautious of using timeout, since not all of stdlib is safe with
respect to asynchronous exceptions (it’s sometimes very difficult to do
properly). I think it’s okay with the old thread.rb implementation of
Queue specifically, but I’m not 100% sure about the current C
implementation or other Ruby implementations like JRuby.
-mental
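To illustrate the concern about asynchronous exceptions, here is a minimal sketch of how a timeout can cut a block off between two steps that were meant to happen together. The steps array and the sleep are stand-ins invented for the example; real code would be interrupted inside a library call instead.

```ruby
require 'timeout'

steps = []
begin
  Timeout.timeout(0.1) do
    steps << :began_update
    sleep 1                   # stands in for any slow or blocking call
    steps << :finished_update # never reached: the timeout fires first
  end
rescue Timeout::Error
  steps << :timed_out
end
```

Afterwards steps holds :began_update and :timed_out but not :finished_update. Code that is interrupted this way has to restore its own invariants (for example in an ensure clause), and library code that was not written with that in mind may not.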