sriram
April 17, 2009, 7:44am
1
Hello,
#*******************STARTCODE
start_time = Time.now
$count = 0
class Queue
def initialize *s # splat operator allows variable length argument
list
@mutex = Mutex.new
@queue = []
s.each { |e| @queue.push e }
end
def enq v
@queue.push v
end
def deq
@mutex.synchronize {item = @queue.shift }
end
def empty?
@mutex.synchronize {@queue.length == 0}
end
def count
@mutex.synchronize do
$count += 1
end
end
end
#*****Test
queue = Queue.new
500.times do |a|
queue.enq a
end
threads = []
Create 5 threads which fetch values from the Q.
5.times do
threads << Thread.new {
until queue.empty?
queue.count
puts “Thread ID: #{Thread.current}.Job started”
puts “#{queue.deq}”
#sleep 0.0001
puts “Thread ID: #{Thread.current}.Job complete”
end
}
end
threads.each {|t| t.join }
puts “Count”
puts $count
puts “timeTaken:”
puts Time.now - start_time
CODE ENDS *****
I have five threads which fetch values from a queue. The above code
works perfectly well in case of a single thread. But the issue arises
when there are more threads.
In case of 5 threads the number of times the block is executed is 503
where it should have been 500.
I know the reason why?
The “deq” and “empty?” methods are not synchronized.
So when the final item is removed from the thread, other threads access
the empty? method before the @queue.length becomes 0.
Hence the difference in count.
If the sleep is activated this problem is solved.
Any suggestion on how to get this working without the sleep?
Thanks.
sriram
April 17, 2009, 8:23am
2
Sriram V. wrote:
@queue = []
Create 5 threads which fetch values from the Q.
end
the empty? method before the @queue.length becomes 0.
Hence the difference in count.
If the sleep is activated this problem is solved.
Any suggestion on how to get this working without the sleep?
You should also synchronize the enque operation (Queue#enq). Btw, there
is
an existing Queue class that does this thread-safe:
require ‘thread’
q = Queue.new
q.push 1
x = q.pop
q.pop # => would block the thread until a new element is available
q2 = SizedQueue.new(10) # bounded queue, which blocks when size > 10
Regards,
Michael
sriram
April 17, 2009, 8:56am
3
2009/4/17 Sriram V. [email protected] :
@queue = []
Create 5 threads which fetch values from the Q.
end
the empty? method before the @queue.length becomes 0.
Hence the difference in count.
If the sleep is activated this problem is solved.
Any suggestion on how to get this working without the sleep?
There are several options. You could use MonitorMixin instead of Mutex
and include it in initialize
def initialize *s
@mutex == self so you do not need to change sync code
@mutex = extend MonitorMixin
@queue = s.dup
end
Then you can do external synchronization, e.g.
queue.synchronize do
if queue.empty?
finish
else
elm = deq
end
end
Much better though is this approach
require ‘thread’
use library class
queue = Queue.new
first start threads
does not really matter but if filling
the queue takes time work can
begin immediately
threads = (1…5).map do
label = Thread.current.to_s.freeze
Thread.new queue do |q|
until ( job = q.deq ) == :terminate
puts “Thread ID: #{label}.Job started”
puts job
puts “Thread ID: #{label}.Job complete”
end
end
end
fill queues
500.times do |a|
queue.enq a
end
“close” queues
threads.size.times { queue.enq :terminate }
wait for termination
threads.each do |th|
th.join
end
Cheers
robert
sriram
April 17, 2009, 9:23am
4
On Fri, Apr 17, 2009 at 7:55 AM, Robert K.
[email protected] wrote:
the queue takes time work can
end
wait for termination
http://blog.rubybestpractices.com/
Minor nitpick - these lines should be reversed:
label = Thread.current.to_s.freeze
Thread.new queue do |q|
i.e.
Thread.new queue do |q|
label = Thread.current.to_s.freeze
Regards,
Sean
sriram
April 17, 2009, 9:43am
5
Hey Robert,
That was an amazing solution!Thanks a million
Thank you Michael and Sean for your time
sriram
April 17, 2009, 10:02am
6
2009/4/17 Sean O’Halpin [email protected] :
Minor nitpick - these lines should be reversed:
label = Thread.current.to_s.freeze
Thread.new queue do |q|
i.e.
Thread.new queue do |q|
label = Thread.current.to_s.freeze
Oh yes, absolutely! Apparently I moved the line too high. Thanks for
catching that gotcha, Sean!
Kind regards
robert