Managing a fork pool to handle tasks

Hey guys,

I am doing some background processes in ruby, and I would like to use
forks. But I can’t figure out how I can manage them.

  • I want to create a pool of child processes (forks) running in the
    background, limited on a number I specify
  • Create them from a parent task but creation of new child processes
    should be blocked untill one of the child processes in the pool is done
  • I want to limit the time a child process can run (so it should quit
    after x seconds)

I tried to use Threads but somehow when managing a pool of threads
doesn’t work (they get stuck while doing stuff and don’t die). I tried
to use the code below but somehow the pool fill’s up and I keep waiting
for new threads to become available…

pool = ThreadPool.new(10) # up to 10 threads
pool.process do
timeout(4) do
fetch pages, parse stuff, enc…
end
end

google ruby forkoff [?]

On Tue, Apr 29, 2008 at 3:01 PM, Abdul-rahman Advany

Roger P. wrote:

google ruby forkoff [?]

On Tue, Apr 29, 2008 at 3:01 PM, Abdul-rahman Advany

I don’t have a fixed queue (I use memcache to fill it and use other
processed to get values from memcache). I don’t think forkoff will work
(and I have a hard time understanding how it works. I only see one call
fork… does that fork the thread?

Abdul-rahman Advany wrote:

Roger P. wrote:

google ruby forkoff [?]

On Tue, Apr 29, 2008 at 3:01 PM, Abdul-rahman Advany

I don’t have a fixed queue (I use memcache to fill it and use other
processed to get values from memcache). I don’t think forkoff will work
(and I have a hard time understanding how it works. I only see one call
fork… does that fork the thread?

Sorry, I didn’t know that calling fork makes the thread become a child
process

On Wed, 30 Apr 2008, Abdul-rahman Advany wrote:

end
Try my nifty MultiThread class. Creates a pool of N worker threads (no
point in creating much more than you have CPU cores to do the work
anyway)

require ‘thread’
Thread.abort_on_exception = true

class MultiFail < Exception
attr_reader :queue

def initialize( _queue)
@queue = _queue
end
end

class MultiThread
private

def do_stuff
job = @jobs.deq
while job
job.call(Thread.current[:index])
job = @jobs.deq
end
rescue Exception => failure
@failed << failure
end

public

Spawns a pool of _jobs worker threads

def initialize( _jobs = 1)
raise “Insufficient threads to do anything! ‘#{_jobs}’” if _jobs <=
0
@jobs = SizedQueue.new( 2 * _jobs)
@threads = Array.new(_jobs){|i|
Thread.new{Thread.current[:index]=i;do_stuff}}
@failed = Queue.new
end

Run block in one of the threads

def run(&block)
raise MultiFail.new(@failed) if @failed.size > 0
@jobs.enq( block)
end

Wait until all threads are finished doing whatever they’re doing.

def join
@threads.each{|t| @jobs.enq nil}
@threads.each{|t| t.join}
raise MultiFail.new(@failed) if @failed.size > 0
end
end

if $0 == FILE then
require ‘test/unit’

class TC_MultiThread < Test::Unit::TestCase
def initialize(test)
super(test)
@c = 0
end

 def wrap(s)
   @c += s
   if @c > 70
     puts
     @c = 0
   end
 end

 def dot(c)
   s = sprintf( '%x< ',c)
   print s
   wrap s.size
 end

 def undot(c)
   s = sprintf( '>%x ',c )
   print s
   wrap s.size
 end

 def try_for(loops,threads)
   puts "Trying [#{loops},#{threads}]"
   i = 0
   k = 0
   max = 0
   mutex = Mutex.new
   multi_thread = MultiThread.new(threads)

   loops.times do |j|
     multi_thread.run do |t|
       dot(t)
       mutex.synchronize do
         i += 1
       end
       sleep 1
       mutex.synchronize do
         assert( i <= threads)
         k +=1
         max = i if i > max
       end
       mutex.synchronize do
         i -= 1
       end
       undot(t)
     end
   end
   multi_thread.join
   assert_equal(0, i)
   assert( ((threads <= 1) || (loops <= 1)) || max > 1)
   assert_equal( loops, k)
 end

 def test_multi
   assert_raises(RuntimeError){ try_for(0,0)}
   try_for(0,1)
   try_for(0,2)
   try_for(1,1)
   try_for(2,1)
   try_for(2,2)
   try_for(2,100)
   try_for(3,1)
   try_for(3,2)
   try_for(3,3)
   try_for(3,100)
   try_for(100,100)
 end

 def test_fail
   multi_thread = MultiThread.new(3)

   multi_thread.run do
     sleep 2
   end

   multi_thread.run do
     raise "This thread failed for test purposes"
   end

   assert_raises( MultiFail) do
     multi_thread.run do
       sleep 2
     end
   end

   begin
     multi_thread.join
   rescue MultiFail => multi_fail
     assert_equal( RuntimeError, multi_fail.queue.pop.class)
   end
 end

end

end

John C. Phone : (64)(3) 358 6639
Tait Electronics Fax : (64)(3) 359 4632
PO Box 1645 Christchurch Email : [email protected]
New Zealand

On Wed, 30 Apr 2008, Abdul-rahman Advany wrote:

Your multithread class doesn’t catch failures…
http://ruby-rails.pl/true-ruby-thread-pool

Contrariwise.

It does.

Of course it’s a bit debatable what you want to do with a failure once
you have caught it.

Having a exception bubble up the call frames to the top level of a
generic pool worker thread is not very helpful.

Having all the tasks complete before you act on a failure is not what
I wanted either.

The gotcha is two or more failures can happen before you start
handling them in the parent thread.

So what I do is catch failues, and drop them in a list which I check
before every run / join.

If there have been any failures I throw them all in a bundle up the
parent thread.

That may not be what you want, but it makes sense to me.

John C. Phone : (64)(3) 358 6639
Tait Electronics Fax : (64)(3) 359 4632
PO Box 1645 Christchurch Email : [email protected]
New Zealand

Your multithread class doesn’t catch failures…
http://ruby-rails.pl/true-ruby-thread-pool