Queue, with timeout. (Thread.notify...)

Hi,

I’m trying to build a message queue that can receive messages, blocks
until messages arrive and has the ability to only block for a limited
amount of time. Unfortunately, the build-in Queue doesn’t support
timeout behaviour, so I went about implementing a queue myself. I
missed java’s Object.wait method, i.e. using a plain object to notify
the blocking thread to resume .

I ended up using thread.join(timeout) which blocks the current
thread until the thread that join was called on finishes (or until
timeout is reached). My queue doesn’t need to join any particular
thread, though, it just needs to be notified when a new message is
added to the queue. So I needed to create a dummy thread solely to
provide notification. What I came up with is this:

require ‘thread’

class MessageQueue
def initialize
@lock = Mutex.new
@t = Thread.new{Thread.stop} # this thread exists solely for
notification
@queue = []
end

def enqueue msg
@lock.synchronize {
@queue.push msg
[email protected]
@t=Thread.new{Thread.stop}
t.kill# notifiy waiting thread
}
end

def dequeue timeout=nil
msg = nil
stop = Time.now+timeout if timeout
while msg == nil
@lock.synchronize {msg = @queue.shift}
break if timeout && Time.new >= stop
@t.join(timeout) unless msg # wait for new message
end #while
msg
end

end

While this works fine, it seems like a bit of a kludge because I have
to create/misuses threads solely to provided notification to the
blocking thread.

Any thought? Am I missing an easier/build-in method to block a thread
only for a limited about of time?

Thanks,
-tim

On 5/12/07, Tim B. [email protected] wrote:

I’m trying to build a message queue that can receive messages, blocks
until messages arrive and has the ability to only block for a limited
amount of time. Unfortunately, the build-in Queue doesn’t support
timeout behaviour, so I went about implementing a queue myself. I
missed java’s Object.wait method, i.e. using a plain object to notify
the blocking thread to resume .

Use ‘timeout’:

require ‘timeout’
require ‘thread’

q = Queue.new

Thread.new do
q.push “thing”
sleep 10
end

begin
Timeout::timeout(1) do
q.pop
q.pop
end
rescue Timeout::Error
puts “timed out!”
end

On Sat, 2007-05-12 at 22:39 +0900, Tim B. wrote:

Any thought? Am I missing an easier/build-in method to block a thread
only for a limited about of time?

Unless/until the built-in classes support timeouts, using a thread to
track the timeout is sadly your best option.

However, there are a number of problems with the specific approach
you’ve used here (e.g. not all accesses to @t are protected by the
mutex)… here’s an alternate implementation…

    require 'thread'
    begin
      require 'fastthread'
    rescue LoadError
    end

    class MessageQueue
      def initialize
        @lock = Mutex.new
        @messages = []
        @readers = []
      end

      def enqueue msg
        @lock.synchronize do
          unless @readers.empty?
            @readers.pop << msg
          else
            @messages.push msg
          end
        end
      end

      def dequeue timeout=nil
        timeout_thread = nil
        begin
          reader = nil
          @lock.synchronize do
         unless @messages.empty?
           # fast path
           return @messages.shift
         else
              reader = Queue.new
              @readers.push reader
              if timeout
                timeout_thread = Thread.new do
                  sleep timeout
                  @lock.synchronize do
                    @readers.delete reader
                    reader << nil
                  end
                end
              end
            end
          end
          # either timeout or writer will send to us
          reader.shift
        ensure
          # (try to) clean up timeout thread
          timeout_thread.run if timeout_thread
        end
      end
    end

Queue may seem sort of heavyweight to use for “callbacks” this way, but
if you use fastthread they are pretty inexpensive, and queues take care
of a lot of the bookkeeping you would need to do to make this safe
otherwise.

One remaining issue is that it’s possible for the timeout thread to
sleep after the call to timeout_thread.run, so the cleanup isn’t
necessarily that effective. However, Thread#kill (or Thread#raise)
isn’t an option, since those can leave things in an inconsistent state.

A better solution would be to maintain a dedicated thread for tracking
timeouts which you don’t have to worry about cleaning up – I’ll be
releasing a library to do that soon, but for now the above solution is
the best I can do.

-mental

On Sun, 2007-05-13 at 04:17 +0900, Avdi G. wrote:

Use ‘timeout’:

I’d be cautious of using timeout, since not all of stdlib is safe with
respect to asynchronous exceptions (it’s sometimes very difficult to do
properly). I think it’s okay with the old thread.rb implementation of
Queue specifically, but I’m not 100% sure about the current C
implementation or other Ruby implementations like JRuby.

-mental

This forum is not affiliated to the Ruby language, Ruby on Rails framework, nor any Ruby applications discussed here.

| Privacy Policy | Terms of Service | Remote Ruby Jobs