Forum: Ruby Multi Threading

Announcement (2017-05-07): www.ruby-forum.com is now read-only since I unfortunately do not have the time to support and maintain the forum any more. Please see rubyonrails.org/community and ruby-lang.org/en/community for other Rails- und Ruby-related community platforms.
F16e67cf8c0d59e1b98f5ba46d84b31d?d=identicon&s=25 Sriram Varahan (sriram)
on 2009-04-17 07:44
Hello,

#*******************STARTCODE

start_time = Time.now
$count = 0
class Queue
  def initialize *s # splat operator allows variable length argument
list
    @mutex = Mutex.new
    @queue = []
    s.each { |e| @queue.push e }
  end

  def enq v
      @queue.push v
  end

  def deq
    @mutex.synchronize {item = @queue.shift}
  end

  def empty?
    @mutex.synchronize{@queue.length == 0}
  end

  def count
   @mutex.synchronize do
     $count += 1
   end
  end
end


#*****Test

queue = Queue.new
500.times do |a|
  queue.enq a
end
threads = []


# Create 5 threads which fetch values from the Q.
  5.times do
    threads << Thread.new {
    until queue.empty?
      queue.count
      puts "Thread ID: #{Thread.current}.Job  started"
      puts "#{queue.deq}"
      #sleep 0.0001
      puts "Thread ID: #{Thread.current}.Job  complete"
    end
   }
  end


threads.each {|t| t.join }
puts "Count"
puts $count
puts "timeTaken:"
puts Time.now - start_time

# *************CODE ENDS******************


I have five threads which fetch values from a queue. The above code
works perfectly well in case of a single thread. But the issue arises
when there are more threads.

In case of 5 threads the number of times the block is executed is 503
where it should have been 500.

I know the reason why?
The "deq" and "empty?" methods are not synchronized.
So when the final item is removed from the thread, other threads  access
the empty? method before the @queue.length becomes 0.

Hence the difference in count.

If the sleep is activated this problem is solved.

Any suggestion on how to get this working without the sleep?

Thanks.
Baf83fa62a7481a08c40353795e11f44?d=identicon&s=25 Michael Neumann (mneumann)
on 2009-04-17 08:23
(Received via mailing list)
Sriram Varahan wrote:

>     @queue = []
>
>
> # Create 5 threads which fetch values from the Q.
>   end
>
> the empty? method before the @queue.length becomes 0.
>
> Hence the difference in count.
>
> If the sleep is activated this problem is solved.
>
> Any suggestion on how to get this working without the sleep?

You should also synchronize the enque operation (Queue#enq). Btw, there
is
an existing Queue class that does this thread-safe:

  require 'thread'
  q = Queue.new
  q.push 1
  x = q.pop
  q.pop # => would block the thread until a new element is available

  q2 = SizedQueue.new(10) # bounded queue, which blocks when size > 10

Regards,

  Michael
E0d864d9677f3c1482a20152b7cac0e2?d=identicon&s=25 Robert Klemme (Guest)
on 2009-04-17 08:56
(Received via mailing list)
2009/4/17 Sriram Varahan <sriram.varahan@gmail.com>:
>    @queue = []
>
>
> # Create 5 threads which fetch values from the Q.
>  end
>
> the empty? method before the @queue.length becomes 0.
>
> Hence the difference in count.
>
> If the sleep is activated this problem is solved.
>
> Any suggestion on how to get this working without the sleep?

There are several options. You could use MonitorMixin instead of Mutex
and include it in initialize

def initialize *s
  # @mutex == self so you do not need to change sync code
  @mutex = extend MonitorMixin
  @queue = s.dup
end

Then you can do external synchronization, e.g.

queue.synchronize do
  if queue.empty?
   # finish
  else
    elm = deq
  end
end

Much better though is this approach

require 'thread'

# use library class
queue = Queue.new

# _first_ start threads
# does not really matter but if filling
# the queue takes time work can
# begin immediately
threads = (1..5).map do
  label = Thread.current.to_s.freeze

  Thread.new queue do |q|
    until ( job = q.deq ) == :terminate
      puts "Thread ID: #{label}.Job  started"
      puts job
      puts "Thread ID: #{label}.Job  complete"
    end
  end
end

# fill queues
500.times do |a|
 queue.enq a
end

# "close" queues
threads.size.times { queue.enq :terminate }

# wait for termination
threads.each do |th|
  th.join
end

Cheers

robert
E1d641bfe4071a5413bac781f06d3fd1?d=identicon&s=25 Sean O'halpin (sean)
on 2009-04-17 09:23
(Received via mailing list)
On Fri, Apr 17, 2009 at 7:55 AM, Robert Klemme
<shortcutter@googlemail.com> wrote:
> # the queue takes time work can
>  end
> # wait for termination
> http://blog.rubybestpractices.com/
>
>

Minor nitpick - these lines should be reversed:

>  label = Thread.current.to_s.freeze
>  Thread.new queue do |q|

i.e.

>  Thread.new queue do |q|
>    label = Thread.current.to_s.freeze

Regards,
Sean
F16e67cf8c0d59e1b98f5ba46d84b31d?d=identicon&s=25 Sriram Varahan (sriram)
on 2009-04-17 09:43
Hey Robert,

  That was an amazing solution!Thanks a million :)


Thank you Michael and Sean for your time :)
E0d864d9677f3c1482a20152b7cac0e2?d=identicon&s=25 Robert Klemme (Guest)
on 2009-04-17 10:02
(Received via mailing list)
2009/4/17 Sean O'Halpin <sean.ohalpin@gmail.com>:

> Minor nitpick - these lines should be reversed:
>
>>  label = Thread.current.to_s.freeze
>>  Thread.new queue do |q|
>
> i.e.
>
>>  Thread.new queue do |q|
>>    label = Thread.current.to_s.freeze

Oh yes, absolutely!  Apparently I moved the line too high. Thanks for
catching that gotcha, Sean!

Kind regards

robert
This topic is locked and can not be replied to.