Forkoff - parallel processing for ruby enumerables

NAME

forkoff

SYNOPSIS

brain-dead simple parallel processing for ruby

URI

http://rubyforge.org/projects/codeforpeople

INSTALL

gem install forkoff

DESCRIPTION

forkoff works for any enumerable object, iterating a code block to run in a
child process and collecting the results. forkoff can limit the number of
child processes which is, by default, 8.

SAMPLES

<========< samples/a.rb >========>

~ > cat samples/a.rb

 #
 # forkoff makes it trivial to do parallel processing with ruby, the following
 # prints out each word in a separate process
 #

   require 'forkoff'

   %w( hey you ).forkoff!{|word| puts "#{ word } from #{ Process.pid }"}

~ > ruby samples/a.rb

 hey from 3239
 you from 3240

<========< samples/b.rb >========>

~ > cat samples/b.rb

 #
 # for example, this takes only 1 second or so to complete
 #

   require 'forkoff'

   a = Time.now.to_f

   results =
     (0..7).forkoff do |i|

       sleep 1

       i ** 2

     end

   b = Time.now.to_f

   elapsed = b - a

   puts "elapsed: #{ elapsed }"
   puts "results: #{ results.inspect }"

~ > ruby samples/b.rb

 elapsed: 1.07044386863708
 results: [0, 1, 4, 9, 16, 25, 36, 49]

<========< samples/c.rb >========>

~ > cat samples/c.rb

 #
 # forkoff does *NOT* spawn processes in batches, waiting for each batch to
 # complete. rather, it keeps a certain number of processes busy until all
 # results have been gathered. in other words the following will ensure that 2
 # processes are running at all times, until the list is complete. note that
 # the following will take about 2 seconds to run (2 sets of 2 @ 1 second).
 #

 require 'forkoff'

 pid = Process.pid

 a = Time.now.to_f

 pstrees =
   %w( a b c d ).forkoff! :processes => 2 do |letter|
     sleep 1
     { letter => ` pstree -l 2 #{ pid } ` }
   end


 b = Time.now.to_f

 puts
 puts "pid: #{ pid }"
 puts "elapsed: #{ b - a }"
 puts

 require 'yaml'

 pstrees.each do |pstree|
   y pstree
 end

~ > ruby samples/c.rb

 pid: 3254
 elapsed: 2.12998485565186

 ---
 a: |
   -+- 03254 ahoward ruby -Ilib samples/c.rb
    |-+- 03255 ahoward ruby -Ilib samples/c.rb
    \-+- 03256 ahoward ruby -Ilib samples/c.rb

 ---
 b: |
   -+- 03254 ahoward ruby -Ilib samples/c.rb
    |-+- 03255 ahoward ruby -Ilib samples/c.rb
    \-+- 03256 ahoward ruby -Ilib samples/c.rb

 ---
 c: |
   -+- 03254 ahoward ruby -Ilib samples/c.rb
    |-+- 03261 ahoward (ruby)
    \-+- 03262 ahoward ruby -Ilib samples/c.rb

 ---
 d: |
   -+- 03254 ahoward ruby -Ilib samples/c.rb
    |-+- 03261 ahoward ruby -Ilib samples/c.rb
    \-+- 03262 ahoward ruby -Ilib samples/c.rb

a @ http://codeforpeople.com/

Nice. Great idea.

I assume then that at most 2 processes are forked, and each keeps
working?

On Apr 17, 2008, at 7:51 PM, Roger P. wrote:

I assume then that at most 2 processes are forked, and each keeps
working?

right now it’s 8 - 2 is more reasonable. at this point this code is
fully proof of concept - i’ll take that as a suggestion (that i agree
with)

cheers.

a @ http://codeforpeople.com/

On Apr 17, 2008, at 8:48 PM, Phillip G. wrote:

So, the tool that captures run away processes and terminates them will
be called ‘sodoff’, I wager? :stuck_out_tongue:

oh yeah, that’s good - taken!

a @ http://codeforpeople.com/

ara howard wrote:
|
| NAME
|
| forkoff
|
| SYNOPSIS
|
| brain-dead simple parallel processing for ruby
|
| URI
|
| http://rubyforge.org/projects/codeforpeople
|
| INSTALL
|
| gem install forkoff
|
| DESCRIPTION
|
| forkoff works for any enumerable object, iterating a code block to run in a
| child process and collecting the results. forkoff can limit the number of
| child processes which is, by default, 8.

So, the tool that captures run away processes and terminates them will
be called ‘sodoff’, I wager? :stuck_out_tongue:

SCNR


Phillip G.
Twitter: twitter.com/cynicalryan

You thought I was taking your woman away from you. You’re jealous.
You tried to kill me with your bare hands. Would a Kelvan do that?
Would he have to? You’re reacting with the emotions of a human.
You are human.
~ – Kirk, “By Any Other Name,” stardate 4657.5

I think this is a great idea!
Kudos

On Fri, Apr 18, 2008 at 9:09 AM, Phillip G. <

On Thu, Apr 17, 2008 at 6:43 PM, ara howard [email protected]
wrote:

DESCRIPTION

forkoff works for any enumerable object, iterating a code block to run in a
child process and collecting the results. forkoff can limit the number of
child processes which is, by default, 8.

Very neat indeed!

martin

ara.t.howard wrote:
|
| On Apr 17, 2008, at 8:48 PM, Phillip G. wrote:
|>
|> So, the tool that captures run away processes and terminates them will
|> be called ‘sodoff’, I wager? :stuck_out_tongue:
|
| oh yeah, that’s good - taken!

I want credit. Dollars aren’t worth a dime. :stuck_out_tongue:


Phillip G.
Twitter: twitter.com/cynicalryan

“You speak truth,” said Themistocles; “I should never have been famous
if I had been of Seriphus”
~ – Plutarch (46-120 AD)
~ – Life of Themistocles

On Apr 18, 2008, at 6:23 AM, fedzor wrote:

Since it’s using Kernel#fork(), does this mean it is using OS threads?

yes. forkoff has a number of consumer green threads used to manage
an array of queues containing the elements destined to be passed to a
forked process/native thread for execution of the block. the code is
very short, give it a read.
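
A rough sketch of that pattern, for readers who want to see its shape: a fixed number of consumer threads pop work from a queue, and each one forks a child per element and reads the marshalled result back over a pipe. This is not forkoff's source; `parallel_map` and its `processes` argument are hypothetical names, and a single queue is used here for brevity.

```ruby
require 'thread' # Queue (loaded by default on modern rubies)

# Hypothetical helper, NOT forkoff's actual code: run the block for each
# item in a forked child, keeping at most `processes` children busy.
def parallel_map(items, processes = 2, &block)
  items   = items.to_a
  jobs    = Queue.new
  results = Array.new(items.size)
  stop    = Object.new # unique end-of-work marker

  items.each_with_index { |item, index| jobs << [item, index] }
  processes.times { jobs << stop } # one marker per consumer thread

  consumers = Array.new(processes) do
    Thread.new do
      until (job = jobs.pop).equal?(stop)
        item, index = job
        reader, writer = IO.pipe

        pid = fork do
          # child: compute, send the marshalled result to the parent, leave
          reader.close
          writer.write(Marshal.dump(block.call(item)))
          writer.close
          exit!(0)
        end

        # parent: read until the child closes its end, then reap the child
        writer.close
        results[index] = Marshal.load(reader.read)
        reader.close
        Process.wait(pid)
      end
    end
  end

  consumers.each(&:join)
  results
end

p parallel_map(0..3) { |i| i ** 2 } # prints [0, 1, 4, 9]
```

Because every thread takes its next element as soon as its current child finishes, the children are not run in batches, which matches the behaviour described for samples/c.rb.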

cheers.

a @ http://codeforpeople.com/

Since it’s using Kernel#fork(), does this mean it is using OS threads?

I’ve once implemented Enumerable#fork myself. It doesn’t use
queues, or a producer-consumer like pattern. It simply tells a
generic ThreadLimiter to spawn a new thread. Within this
thread, a new process is spawned. The number of concurrent
threads, and thus the number of concurrent processes, is
controlled by ThreadLimiter.

We might learn from both implementations.

gegroet,
Erik V. - http://www.erikveen.dds.nl/


Here’s my code:


module Enumerable
  def fork(max_number_of_threads=nil, &block)
    thread_limiter = EV::ThreadLimiter.new(max_number_of_threads)

    collect do |x|
      thread_limiter.fork do
        Thread.current.abort_on_exception = true

        r, w    = IO.pipe

        if pid = Process.fork
          w.close
          Process.wait(pid)
          data  = r.read
          r.close
          Marshal.load(data)
        else
          r.close
          Marshal.dump(block.call(x), w)
          w.close
          exit
        end
      end
    end.collect do |t|
      t.value
    end
  end
end


module EV
  class ThreadLimiter
    def initialize(max_number_of_threads)
      @number_of_threads = 0
      @max_number_of_threads = max_number_of_threads

      yield(self)       if block_given?
    end

    def fork(*args, &block)
      Thread.pass       while @max_number_of_threads and
                              @max_number_of_threads > 0 and
                              @number_of_threads > @max_number_of_threads

      # If this method is called from several threads, then
      # @number_of_threads might get bigger than @max_number_of_threads.
      # This usually a) isn't the case and b) doesn't really matter (to me...).
      # I'm willing to accept this "risk", because a) Thread.exclusive is
      # much, much faster than Mutex#synchronize and b) we can't run into
      # deadlocks.

      Thread.exclusive{@number_of_threads += 1}

      Thread.fork do
        begin
          res   = block.call(*args)
        ensure
          Thread.exclusive{@number_of_threads -= 1}
        end

        res
      end
    end
  end
end


Here’s a benchmark:

require "benchmark"

Benchmark.bm(15) do |bm|
  rc = nil
  r2 = nil
  r4 = nil
  rx = nil

  data = 1..10
  test = lambda{|x| 1_000_000.times{7+8}; [x, Process.pid]}

  bm.report("      collect"){rc = data.collect(&test)}
  bm.report("  2 processes"){r2 = data.fork(2, &test)}
  bm.report("  4 processes"){r4 = data.fork(4, &test)}
  bm.report("inf processes"){rx = data.fork(-1, &test)}

  p rc
  p r2
  p r4
  p rx
end

It produces these results on a dual core machine:

                     user     system      total        real
      collect      4.530000   0.000000   4.530000 (  4.527982)
  2 processes      0.030000   0.050000   3.170000 (  1.733209)
  4 processes      0.160000   0.370000   3.610000 (  1.927826)
inf processes      0.000000   0.000000   3.080000 (  1.691932)
[[1, 18732], [2, 18732], [3, 18732], [4, 18732], [5, 18732], [6, 18732], [7, 18732], [8, 18732], [9, 18732], [10, 18732]]
[[1, 18733], [2, 18734], [3, 18735], [4, 18736], [5, 18737], [6, 18738], [7, 18739], [8, 18740], [9, 18741], [10, 18742]]
[[1, 18743], [2, 18744], [3, 18745], [4, 18746], [5, 18747], [6, 18748], [7, 18749], [8, 18750], [9, 18751], [10, 18752]]
[[1, 18753], [2, 18754], [3, 18755], [4, 18756], [5, 18757], [6, 18758], [7, 18759], [8, 18760], [9, 18761], [10, 18762]]

Just a word of warning: The construction “Thread.new(i){|i|” is
useless, by definition. Just like “i=i” is useless too.

If i isn’t defined outside the loop, you don’t have to pass i to
the thread, so “Thread.new{” will do. However, if i is defined
outside the loop (which it isn’t, in your code…),
“Thread.new(i){|i|” won’t work (see below): It’s better to use
“Thread.new(i1){|i2|” instead.

gegroet,
Erik V. - http://www.erikveen.dds.nl/


a  = (1..10).to_a
a1 = a.map{|i| Thread.new { sleep 0.01 ; i }}.map{|t| t.value}
a2 = a.map{|i| Thread.new(i) {|i| sleep 0.01 ; i }}.map{|t| t.value} # Will do.
i  = nil
a3 = a.map{|i| Thread.new(i) {|i| sleep 0.01 ; i }}.map{|t| t.value} # Won't do!
a4 = a.map{|i1| Thread.new(i1){|i2|sleep 0.01 ; i2}}.map{|t| t.value}

p a1 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
p a2 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
p a3 # ==> [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
p a4 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

On Apr 18, 2008, at 12:19 PM, Erik V. wrote:

I’ve once implemented Enumerable#fork myself. It doesn’t use
queues, or a producer-consumer like pattern. It simply tells a
generic ThreadLimiter to spawn a new thread. Within this
thread, a new process is spawned. The number of concurrent
threads, and thus the number of concurrent processes, is
controlled by ThreadLimiter.

We might learn from both implementations.

this is precisely how forkoff manages the number of processes, only
it’s via Queues and a fixed number of threads consuming from those
queues

the latest impl from svn:

http://p.ramaze.net/1141

regards.

a @ http://codeforpeople.com/

ara howard:

forkoff works for any enumerable object, iterating a code
block to run in a child process and collecting the results.

Thanks a lot for this great gem – it’s a lifesaver for my
PhD, which does some heavy computing in a parallelizable way.

A quick question, if I may. Assuming (which is not far from the truth)
that I don’t have much knowledge of threads and processes, is there any
way for the block under iteration to communicate with the outside world
other than by the returned result?

My example code:

results = []
run = 0
runs = 2 ** fsm_inputs.size
(0...runs).forkoff do |vector|
  results[vector] = by_input_sets vector

  # snip some run-based stats

  run += 1
end

This, obviously, doesn’t work (i.e., results is an empty array at the
end and run is 0 in every iteration). I can get the results by making
the block return the by_input_sets call’s result, but I still lose the
run-based stats.
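
The reason nothing comes back is that each forked child works on its own copy of the parent's memory, so assignments made inside the block never reach the parent. A minimal sketch of this with plain Kernel#fork rather than forkoff itself (`run_in_child` is a hypothetical helper, not part of any library): the counter stays at 0 in the parent, while a return value that bundles the result with the stats does arrive.

```ruby
# Hypothetical helper: run the block in a forked child and marshal its
# return value back over a pipe. A sketch of the idea, not forkoff's code.
def run_in_child
  reader, writer = IO.pipe

  pid = fork do
    reader.close
    writer.write(Marshal.dump(yield))
    writer.close
    exit!(0)
  end

  writer.close
  value = Marshal.load(reader.read)
  reader.close
  Process.wait(pid)
  value
end

run = 0

# the increment happens in the child's copy of `run`; the parent never sees it
run_in_child { run += 1 }
p run                          # prints 0

# anything the parent needs has to travel in the return value, for example
# the result together with a hash of per-run stats
result, stats = run_in_child { [6 * 7, { :pid => Process.pid }] }
p result                       # prints 42
p stats[:pid] != Process.pid   # prints true
```

Assuming forkoff collects block return values the same way, returning an array such as `[result, stats]` from the block would keep both without any shared state.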

It seems a singleton-based approach would work (I’d create a singleton
object outside of the loop and have the results array and run counter
be its properties), but maybe there is an easier way?

– Shot

This is really nice! I would prefer to change this syntax

%w( a b c d ).forkoff! :processes => 2 do |letter|

for this syntax

%w( a b c d ).forkoff! 2 do |letter|

though. Then I wouldn’t have to remember that ‘processes’ keyword.
Anyhow, it is a great piece of code.

/Fredrik

On Apr 18, 2008, at 12:20 PM, Erik V. wrote:

If i isn’t defined outside the loop, you don’t have to pass i to
the thread, so “Thread.new{” will do. However, if i is defined
outside the loop (which it isn’t, in your code…),
“Thread.new(i){|i|” won’t work (see below): It’s better to use
“Thread.new(i1){|i2|” instead.

indeed - left over from a previous iteration.

a @ http://codeforpeople.com/

On Apr 20, 2008, at 7:20 PM, Fredrik wrote:

This is really nice! I would prefer to change this syntax

%w( a b c d ).forkoff! :processes => 2 do |letter|

for this syntax

%w( a b c d ).forkoff! 2 do |letter|

though. Then I wouldn’t have to remember that ‘processes’ keyword.
Anyhow, it is a great piece of code.

the next release supports either a hash (options) or a numeric argument
- so either will work - may release tonight…

cheers.

a @ http://codeforpeople.com/

On Apr 20, 2008, at 12:32 PM, Shot (Piotr S.) wrote:

This, obviously, doesn’t work (i.e., results is an empty array at the
end and run is 0 in every iteration). I can get the results by making
the block return the by_input_sets call’s result, but I still lose the
run-based stats.

It seems a singleton-based approach would work (I’d create a singleton
object outside of the loop and have the results array and run counter
be its properties), but maybe there is an easier way?

to do this you’ll want to combine forkoff with my slave lib: which
sets up an object which is fronted by drb, and which can indeed be a
singleton - note that this object is, itself, running in a child
process, but you can ignore this for the most part. a simple example:

cfp:~ > cat a.rb
require 'rubygems'
require 'slave'
require 'forkoff'

slave = Slave.new(:threadsafe => true){ Hash.new }
process_global = slave.object

( 0 .. 4 ).each do |i|
  process_global[i] = i ** 2
end

process_global.each do |k,v|
  p k => v
end

cfp:~ > ruby a.rb
{0=>0}
{1=>1}
{2=>4}
{3=>9}
{4=>16}

even with these abstractions you have to consider deeply what’s
happening with threads/processes etc - but yes, it’s definitely
possible with little code.

cheers.

a @ http://codeforpeople.com/

Checking out the source I only see pid = fork (this is a call to
Thread.new, isn’t it?). I don’t see that the real fork (kernel) is used… or
am I wrong?
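
For what it's worth, in plain Ruby a bare `fork` resolves to Kernel#fork, which creates a real child process, unless something in scope defines another method called `fork`. The sketch below checks this outside of forkoff; whether forkoff's source defines its own `fork` is not verified here, and `fork_and_report_pid` is a hypothetical helper.

```ruby
# Hypothetical helper: fork once and have the child report its own pid
# back through a pipe, so the parent can compare the two.
def fork_and_report_pid
  reader, writer = IO.pipe

  pid = fork do
    reader.close
    writer.puts Process.pid
    writer.close
    exit!(0)
  end

  writer.close
  reported = reader.read.to_i
  reader.close
  Process.wait(pid)
  [pid, reported]
end

forked_pid, reported_pid = fork_and_report_pid

p method(:fork).owner           # prints Kernel
p forked_pid == reported_pid    # prints true: the block ran in the child
p reported_pid != Process.pid   # prints true: a separate OS process
```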

Thanks a lot for your reply, Ara, and for your time!

ara.t.howard:

to do this you’ll want to combine forkoff with my slave lib: which
sets up an object which is fronted by drb, and which can indeed be
a singleton - note that this object is, itself, running in a child
process, but you can ignore this for the most part.

Ahhh, a DRb – another thing on my ‘to-read’ list. :slight_smile:

a simple example:

Ah, thanks. I’m a bit lost there, though:

slave = Slave.new(:threadsafe => true){ Hash.new }
process_global = slave.object

( 0 .. 4 ).each do |i|
  process_global[i] = i ** 2
end

This doesn’t seem to be run concurrently, and
if I switch each to forkoff here it dies with

/home/shot/opt/ruby-1.8.6-p114/lib/ruby/1.8/drb/drb.rb:736:in `open':
drbunix:///tmp/slave_hash_-605329008_13239_13240_0_0.32803043357426 -
#<Errno::ENOENT: No such file or directory -
///tmp/slave_hash_-605329008_13239_13240_0_0.32803043357426>
(DRb::DRbConnError)

even with these abstractions you have to consider deeply what’s happening
with threads/processes etc - but yes, it’s definitely possible with little
code.

Is there a good place (other than the relevant docs) to read a tutorial
on DRb? I feel really stupid to ask about stuff without knowing what I’m
writing about. :expressionless:

In the meantime, I tried the ‘simple’ forkoff approach
with just handling the returned values, but I end with

/home/shot/opt/ruby-1.8.6-p114/lib/ruby/gems/1.8/gems/forkoff-0.0.1/lib/forkoff.rb:53:in
`dump': singleton can't be dumped (TypeError)

Which, exactly, object is being marshalled by forkoff? The one returned,
or the one that contains the forkoff call? (I tried removing any
singleton references from both, but to no success so far.)
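
For reference, Marshal raises this error for any object that has singleton methods defined on it, and it looks at the whole object graph being dumped. In the fork-and-pipe code posted earlier in this thread the dumped object is the block's return value; that forkoff.rb line 53 does the same is an assumption. A small sketch of the error itself (`dumpable?` is a hypothetical helper):

```ruby
# Hypothetical helper: report whether Marshal can serialize an object.
def dumpable?(object)
  Marshal.dump(object)
  true
rescue TypeError => error
  puts "#{error.class}: #{error.message}"
  false
end

plain = Object.new

tagged = Object.new
def tagged.label # a singleton method, defined on this one object only
  'tagged'
end

p dumpable?(plain)           # prints true
p dumpable?(tagged)          # prints the TypeError, then false
p dumpable?([1, 2, tagged])  # a nested reference fails the same way
```

So the thing to look for is probably a `def obj.something` or `class << obj` on whatever the block returns, or on anything reachable from it.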

Also, a humble patch:

--- slave-1.2.1.rb.orig 2008-04-24 21:01:38.000000000 +0200
+++ slave-1.2.1.rb 2008-04-24 21:02:54.000000000 +0200
@@ -11,8 +11,8 @@
 # the Slave class encapsulates the work of setting up a drb server in another
 # process running on localhost via unix domain sockets. the slave process is
-# attached to it's parent via a LifeLine which is designed such that the slave
-# cannot out-live it's parent and become a zombie, even if the parent dies and
+# attached to its parent via a LifeLine which is designed such that the slave
+# cannot out-live its parent and become a zombie, even if the parent dies an
 # early death, such as by 'kill -9'. the concept and purpose of the Slave
 # class is to be able to setup any server object in another process so easily
 # that using a multi-process, drb/ipc, based design is as easy, or easier,

– Shot, loving the ‘slave cannot out-live its parent and become
a zombie, even if the parent dies an early death’ quote.
