On Wed, 30 Apr 2008, Abdul-rahman Advany wrote:
end
Try my nifty MultiThread class. It creates a pool of N worker threads
(there is no point in creating many more threads than you have CPU cores
to do the work anyway).
# Queue, SizedQueue and Mutex are loaded from 'thread' on older Rubies;
# the require is harmless on interpreters where they are built in.
require 'thread'

# Let an exception that escapes any thread abort the whole process instead
# of silently killing only that thread.
Thread.abort_on_exception = true
# Raised by MultiThread#run and MultiThread#join when jobs have failed.
#
# The exceptions raised by the failed jobs are available through #queue.
# Derives from StandardError so that an ordinary `rescue` can catch it.
class MultiFail < StandardError
  # The queue of exceptions that was handed to the constructor.
  attr_reader :queue

  # _queue:: collection (normally a Queue) holding the exceptions raised by
  #          the failed jobs; if it responds to #size the count is included
  #          in the exception message.
  def initialize(_queue)
    @queue = _queue
    if _queue.respond_to?(:size)
      super("#{_queue.size} job(s) failed in worker threads")
    else
      super('job(s) failed in worker threads')
    end
  end
end
# A fixed-size pool of worker threads fed from a bounded job queue.
#
# Jobs are blocks submitted with #run; each one is called with the index of
# the worker thread that executes it. Exceptions raised by jobs are collected
# and reported to the caller as a MultiFail from #run or #join.
class MultiThread
  # Spawns a pool of _jobs worker threads.
  #
  # _jobs:: number of worker threads; must be positive.
  #
  # Raises RuntimeError if _jobs is zero or negative.
  def initialize(_jobs = 1)
    raise "Insufficient threads to do anything! '#{_jobs}'" if _jobs <= 0

    # Bounded queue: #run blocks once 2 * _jobs jobs are waiting.
    @jobs = SizedQueue.new(2 * _jobs)
    # Created before the workers start, so a failing job always has
    # somewhere to record its exception.
    @failed = Queue.new
    @threads = Array.new(_jobs) do |i|
      Thread.new do
        Thread.current[:index] = i
        do_stuff
      end
    end
  end

  # Run block in one of the threads.
  #
  # The block is called with the index of the worker executing it. Blocks
  # while the job queue is full.
  #
  # Raises MultiFail if any earlier job has failed, and ArgumentError if no
  # block is given (a nil job would otherwise act as a stop signal).
  def run(&block)
    raise ArgumentError, 'no block given' unless block
    raise MultiFail.new(@failed) unless @failed.empty?

    @jobs.enq(block)
  end

  # Wait until all threads are finished doing whatever they're doing.
  #
  # Raises MultiFail if any job failed.
  def join
    # One nil per worker: each worker exits its loop when it dequeues nil.
    @threads.each { @jobs.enq(nil) }
    @threads.each(&:join)
    raise MultiFail.new(@failed) unless @failed.empty?
  end

  private

  # Worker loop: executes jobs until a nil is dequeued.
  #
  # Failures are rescued per job so that a failing job does not terminate the
  # worker; a dead worker would stop draining the bounded queue and could
  # leave #run or #join blocked forever.
  def do_stuff
    while (job = @jobs.deq)
      begin
        job.call(Thread.current[:index])
      rescue Exception => failure
        # Exception (not only StandardError) is rescued, as in the original,
        # so that every kind of job failure is reported to the caller.
        @failed << failure
      end
    end
  end
end
# Self-test: runs only when this file is executed directly.
if $0 == __FILE__
  require 'test/unit'

  # Exercises MultiThread with various job/thread counts and checks that job
  # failures are reported through MultiFail.
  class TC_MultiThread < Test::Unit::TestCase
    def initialize(test)
      super(test)
      # Number of characters printed on the current output line.
      @c = 0
    end

    # Accounts for s printed characters and starts a new line after 70.
    # NOTE: called from worker threads without synchronisation; @c only
    # drives cosmetic line wrapping of the progress output.
    def wrap(s)
      @c += s
      if @c > 70
        puts
        @c = 0
      end
    end

    # Prints a "job started" marker carrying the worker index c (in hex).
    def dot(c)
      s = sprintf('%x< ', c)
      print s
      wrap s.size
    end

    # Prints a "job finished" marker carrying the worker index c (in hex).
    def undot(c)
      s = sprintf('>%x ', c)
      print s
      wrap s.size
    end

    # Runs `loops` one-second jobs on a pool of `threads` workers and checks
    # that never more than `threads` jobs are active at once, that every job
    # completed, and that jobs overlapped when that was possible.
    def try_for(loops, threads)
      puts "Trying [#{loops},#{threads}]"
      i = 0   # jobs currently running
      k = 0   # jobs completed
      max = 0 # highest number of simultaneously running jobs observed
      mutex = Mutex.new
      multi_thread = MultiThread.new(threads)
      loops.times do |j|
        multi_thread.run do |t|
          dot(t)
          mutex.synchronize do
            i += 1
          end
          sleep 1
          mutex.synchronize do
            assert(i <= threads)
            k += 1
            max = i if i > max
          end
          mutex.synchronize do
            i -= 1
          end
          undot(t)
        end
      end
      multi_thread.join
      assert_equal(0, i)
      assert(((threads <= 1) || (loops <= 1)) || max > 1)
      assert_equal(loops, k)
    end

    def test_multi
      assert_raises(RuntimeError) { try_for(0, 0) }
      try_for(0, 1)
      try_for(0, 2)
      try_for(1, 1)
      try_for(2, 1)
      try_for(2, 2)
      try_for(2, 100)
      try_for(3, 1)
      try_for(3, 2)
      try_for(3, 3)
      try_for(3, 100)
      try_for(100, 100)
    end

    def test_fail
      multi_thread = MultiThread.new(3)
      multi_thread.run do
        sleep 2
      end
      multi_thread.run do
        raise "This thread failed for test purposes"
      end
      # The failure is recorded asynchronously by a worker thread, so keep
      # submitting jobs (for a bounded time) until #run reports it rather
      # than assuming it is already visible.
      assert_raises(MultiFail) do
        deadline = Time.now + 5
        while Time.now < deadline
          multi_thread.run do
            sleep 0.01
          end
          sleep 0.01
        end
      end
      # join must report the failure too; a silent pass is not acceptable.
      multi_fail = assert_raises(MultiFail) do
        multi_thread.join
      end
      assert_equal(RuntimeError, multi_fail.queue.pop.class)
    end
  end
end
John C. Phone : (64)(3) 358 6639
Tait Electronics Fax : (64)(3) 359 4632
PO Box 1645 Christchurch Email : [email protected]
New Zealand