Forum: Ruby Multi Threading

Announcement (2017-05-07): www.ruby-forum.com is now read-only since I unfortunately do not have the time to support and maintain the forum any more. Please see rubyonrails.org/community and ruby-lang.org/en/community for other Rails- und Ruby-related community platforms.
Sriram V. (Guest)
on 2009-04-17 09:44
Hello,

#*******************STARTCODE

start_time = Time.now
$count = 0
class Queue
  def initialize *s # splat operator allows variable length argument
list
    @mutex = Mutex.new
    @queue = []
    s.each { |e| @queue.push e }
  end

  def enq v
      @queue.push v
  end

  def deq
    @mutex.synchronize {item = @queue.shift}
  end

  def empty?
    @mutex.synchronize{@queue.length == 0}
  end

  def count
   @mutex.synchronize do
     $count += 1
   end
  end
end


#*****Test

queue = Queue.new
500.times do |a|
  queue.enq a
end
threads = []


# Create 5 threads which fetch values from the Q.
  5.times do
    threads << Thread.new {
    until queue.empty?
      queue.count
      puts "Thread ID: #{Thread.current}.Job  started"
      puts "#{queue.deq}"
      #sleep 0.0001
      puts "Thread ID: #{Thread.current}.Job  complete"
    end
   }
  end


threads.each {|t| t.join }
puts "Count"
puts $count
puts "timeTaken:"
puts Time.now - start_time

# *************CODE ENDS******************


I have five threads which fetch values from a queue. The above code
works perfectly well in case of a single thread. But the issue arises
when there are more threads.

In case of 5 threads the number of times the block is executed is 503
where it should have been 500.

I know the reason why?
The "deq" and "empty?" methods are not synchronized.
So when the final item is removed from the thread, other threads  access
the empty? method before the @queue.length becomes 0.

Hence the difference in count.

If the sleep is activated this problem is solved.

Any suggestion on how to get this working without the sleep?

Thanks.
Michael N. (Guest)
on 2009-04-17 10:23
(Received via mailing list)
Sriram V. wrote:

>     @queue = []
>
>
> # Create 5 threads which fetch values from the Q.
>   end
>
> the empty? method before the @queue.length becomes 0.
>
> Hence the difference in count.
>
> If the sleep is activated this problem is solved.
>
> Any suggestion on how to get this working without the sleep?

You should also synchronize the enque operation (Queue#enq). Btw, there
is
an existing Queue class that does this thread-safe:

  require 'thread'
  q = Queue.new
  q.push 1
  x = q.pop
  q.pop # => would block the thread until a new element is available

  q2 = SizedQueue.new(10) # bounded queue, which blocks when size > 10

Regards,

  Michael
Robert K. (Guest)
on 2009-04-17 10:56
(Received via mailing list)
2009/4/17 Sriram V. <removed_email_address@domain.invalid>:
>    @queue = []
>
>
> # Create 5 threads which fetch values from the Q.
>  end
>
> the empty? method before the @queue.length becomes 0.
>
> Hence the difference in count.
>
> If the sleep is activated this problem is solved.
>
> Any suggestion on how to get this working without the sleep?

There are several options. You could use MonitorMixin instead of Mutex
and include it in initialize

def initialize *s
  # @mutex == self so you do not need to change sync code
  @mutex = extend MonitorMixin
  @queue = s.dup
end

Then you can do external synchronization, e.g.

queue.synchronize do
  if queue.empty?
   # finish
  else
    elm = deq
  end
end

Much better though is this approach

require 'thread'

# use library class
queue = Queue.new

# _first_ start threads
# does not really matter but if filling
# the queue takes time work can
# begin immediately
threads = (1..5).map do
  label = Thread.current.to_s.freeze

  Thread.new queue do |q|
    until ( job = q.deq ) == :terminate
      puts "Thread ID: #{label}.Job  started"
      puts job
      puts "Thread ID: #{label}.Job  complete"
    end
  end
end

# fill queues
500.times do |a|
 queue.enq a
end

# "close" queues
threads.size.times { queue.enq :terminate }

# wait for termination
threads.each do |th|
  th.join
end

Cheers

robert
Sean O. (Guest)
on 2009-04-17 11:23
(Received via mailing list)
On Fri, Apr 17, 2009 at 7:55 AM, Robert K.
<removed_email_address@domain.invalid> wrote:
> # the queue takes time work can
>  end
> # wait for termination
> http://blog.rubybestpractices.com/
>
>

Minor nitpick - these lines should be reversed:

>  label = Thread.current.to_s.freeze
>  Thread.new queue do |q|

i.e.

>  Thread.new queue do |q|
>    label = Thread.current.to_s.freeze

Regards,
Sean
Sriram V. (Guest)
on 2009-04-17 11:43
Hey Robert,

  That was an amazing solution!Thanks a million :)


Thank you Michael and Sean for your time :)
Robert K. (Guest)
on 2009-04-17 12:02
(Received via mailing list)
2009/4/17 Sean O'Halpin <removed_email_address@domain.invalid>:

> Minor nitpick - these lines should be reversed:
>
>>  label = Thread.current.to_s.freeze
>>  Thread.new queue do |q|
>
> i.e.
>
>>  Thread.new queue do |q|
>>    label = Thread.current.to_s.freeze

Oh yes, absolutely!  Apparently I moved the line too high. Thanks for
catching that gotcha, Sean!

Kind regards

robert
This topic is locked and can not be replied to.