How to run background processes (more than one worker) in parallel


#1

Hi,

My requirement is as follows:

  1. I have around 200 feeds in the database that I need to parse (fetch
     contents) in parallel at some interval, storing the feed items in
     the database.

  2. I am currently using backgroundrb with 10 workers; each worker is
     assigned the job of parsing data from 20 feeds (e.g. the 1st worker
     fetches data from feeds 1-20, the 2nd from feeds 21-40, etc.).

  3. But backgroundrb is not reliable and it fails after some time, so I
     tried Starling & Workling, but those workers don't run in parallel.

(I need to run them in parallel because the number of feeds will grow,
say to 1000 feeds, so I can't process them sequentially.)

Please, I need help with the above problem.

Thanks
Deepak
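For reference, the partitioning described in the question (200 feeds split
across 10 workers, 20 feeds each) can be sketched with plain Ruby threads.
This is only an illustrative sketch, not any of the libraries discussed
below; `fetch_feed` is a hypothetical stand-in for the real fetch-and-parse
step.

```ruby
require 'thread'

# Hypothetical sketch of the setup described above: partition 200 feed
# IDs into 10 slices of 20 and fetch each slice in its own thread.
# fetch_feed is a placeholder for the real HTTP fetch + parse step.
FEED_IDS     = (1..200).to_a
WORKER_COUNT = 10

def fetch_feed(id)
  "parsed feed #{id}"   # stand-in for the real work
end

results = Queue.new     # thread-safe container for results
slices  = FEED_IDS.each_slice(FEED_IDS.size / WORKER_COUNT).to_a

threads = slices.map do |slice|
  Thread.new do
    slice.each { |id| results << fetch_feed(id) }
  end
end
threads.each(&:join)
```

Note that with MRI's threads, I/O-bound work such as feed fetching can
overlap, but CPU-bound parsing will not truly run in parallel on green
threads.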


#2
  But backgroundrb is not reliable and it fails after some time, so I
  tried Starling & Workling, but those workers don't run in parallel.

Nanite (http://github.com/ezmobius/nanite/tree/master) maybe? Perhaps
some of the Rails and/or Merb groups will be able to help you more.

Good luck,

Chris


#3

Hi Deepak,
You may go for Starfish (http://rufy.com/starfish/doc/) or
Skynet (http://skynet.rubyforge.org/),
which are based on Google's MapReduce algorithm.
You may also find more information in Railscasts episodes 127 to 129.

On Fri, Dec 12, 2008 at 7:28 PM, Deepak G. removed_email_address@domain.invalid
wrote:


Thanks and Regards
Saurabh P.


#4

On 12.12.2008 14:58, Deepak G. wrote:

  But backgroundrb is not reliable and it fails after some time.

Can you be more specific about what you mean by this? How does it
fail?

  So I have tried Starling & Workling but those workers don't run in
  parallel.

Maybe you are not using it properly. From what I read on the web site,
doing work concurrently is all that S+W is about.

Cheers

robert


#5

Hi Deepak,

As others mentioned, an adaptation of Google's MapReduce technique
may be of use. To this end, you could use Ruby's Rinda. For my needs
I wrote a small script that puts work descriptions on a tuple space.
These are taken up by one or more workers in parallel.

If you write distinct messages that are recognized by workers, you
could probably achieve your parallelism in a few lines without extra
libraries, except perhaps for DRbFire.

I attach it here (I am a novice Ruby programmer, so the code may not
be optimal); hope it helps.

saji

-- queue code

require 'thread'
require 'sequel'
require 'rinda/tuplespace'
require 'drb'

# create tuple space and serve it over DRb
ts = Rinda::TupleSpace.new
DRb.start_service("druby://:3130", ts)
puts "DRb server running at #{DRb.uri}"

# connect to database
dbname = "sqlite://testQ.db"
db = Sequel.connect(dbname)
pause = 15

loop do
  # thread 1: collect a queued job from the database, put it on the
  # tuple space, update the database
  th1 = Thread.new do
    job = db[:jobs].filter(:status => "queued").first
    if job
      submit = job.merge(:status => "submitted")
      ts.write [:q1, submit]
      db[:jobs].filter(job).update(submit)
    end
  end
  # thread 2: check the tuple space for finished results, download
  # data, update the database
  th2 = Thread.new do
    result = ts.take [:rq1, nil, nil]
    unless result[1].nil?
      p "processing images"
      p "finished image processing"
      p "update job status in database"
      db[:jobs].filter(result[1]).update(:status => "finished")
    end
  end
  th1.join
  th2.join
  sleep(pause)
end

-- worker code

require 'drb'
require 'rinda/rinda'

DRb.start_service
ro = DRbObject.new_with_uri('druby://localhost:3130')
ts = Rinda::TupleSpaceProxy.new(ro)

def make_mme(job)
  # "This will be passed to AFS Server: don't worry yet"
  p job
end

# worker takes a job from the tuple space (ts.take), executes it
# (make_mme), and writes a return message back to the tuple space
job = ts.take([:q1, nil])
msg = make_mme(job[1])
ts.write [:rq1, job, 0]  # write return status to tuplespace

DRb.thread.join



Saji N. Hameed

APEC Climate Center, Busan, KOREA


#6

Hello Robert

On Sat, Dec 13, 2008 at 10:22 PM, Robert K.
removed_email_address@domain.invalidwrote:

  But backgroundrb is not reliable and it fails after some time.

  Can you be more specific about what you mean by this? How does it fail?

Well, when I start the backgroundrb processes, everything works well
for the next day or two, but after some time a worker just hangs. I
can see its process ID is still active, but there is no output. I
examined the logs as well but didn't find anything in the log files;
I don't have a single clue about what went wrong.

  So I have tried Starling & Workling but those workers don't run in
  parallel.

  Maybe you are not using it properly. From what I read on the web
  site, doing work concurrently is all that S+W is about.

I created 2 Workling workers; each worker has one method which just
logs some output. Then I ran them from the console. When I examined
the log files, I saw their output sequentially:

class MyWorker < Workling::Base
  def sample_one(options)
    5.times do |i|
      logger.info "====Hi from 1st worker==============="
    end
  end
end

class MySecondWorker < Workling::Base
  def sample_two(options)
    5.times do |i|
      logger.info "====Hi from 2nd worker==============="
    end
  end
end

I got the following output:

====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============

I was expecting something like this

====Hi from 1st worker===============
====Hi from 2nd worker===============
====Hi from 1st worker===============
====Hi from 2nd worker===============

Thanks for your help
Deepak


#7

Deepak G. wrote:

  Hi

  Please, I need help with the above problem.

  Thanks
  Deepak

Here's my approach to a similar problem. It's still not as polished as
I'd like, but it may be useful.

The core is the PoolQM class (the CircularBuffer class exists to catch
a limited number of exceptions).

=begin

NAME

class CircularBuffer

DESCRIPTION

A lightweight but (hopefully) thread-safe version of the circular
buffer.

Designed primarily for intentionally limited in-memory event/error
logging.

URI

INSTALL

HISTORY

0.1

SYNOPSIS

cb = CircularBuffer.new(50)  # create a new CircularBuffer that
                             # holds 50 nil elements
cb << 'fnord'                # append an element to the buffer
elements = cb.to_a           # return elements as an array, ordered
                             # from oldest to newest
cb.clear                     # force all entries to nil

CAVEATS

The CircularBuffer ignores nil elements and ignores attempts to append
them.

2DOs

By Djief

=end

require 'thread'

class CircularBuffer

  def initialize(max_size)
    @max_size = max_size
    @ra = Array.new(@max_size, nil)
    @head = 0
    @mutex = Mutex.new
  end

  # capacity of the buffer (used by PoolQM#exceptions_buffer_size)
  attr_reader :max_size
  alias size max_size

  private

  def inc(index)
    (index + 1) % @max_size
  end

  public

  # set all elements to nil
  def clear
    @mutex.synchronize do
      @ra.collect! { |element| nil }
    end
  end

  # append a new element at the current 'end'
  def <<(element)
    unless element.nil?
      @mutex.synchronize do
        @ra[@head] = element
        @head = inc(@head)
      end
    end
  end

  # return the entire buffer (except nil elements) as an array
  def to_a
    index = @head
    result = []
    @mutex.synchronize do
      @max_size.times do
        result << @ra[index] unless @ra[index].nil?
        index = inc(index)
      end
    end
    result
  end

end

=begin

NAME

class PoolQM

DESCRIPTION

PoolQM extends an Array with MonitorMixin to create a queue with
an associated pool of worker threads that wait to process any requests
that are added to the queue.

A dispatcher thread watches continuously for enqueued requests and
signals idle worker threads (if any) to dequeue and process the
request(s). If no idle workers exist, the request remains in the
queue until one is available.

During the creation of a new instance of PoolQM, the number of worker
threads is established and the request processing block is defined:

  results = Queue.new
  NUM_OF_WORKERS = 10
  pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
    results << "Current request: #{request}"   # process request here
  end

Note that any output you expect to collect from your worker threads
should be returned via some thread-safe mechanism or container (Queue
is a good default).

Enqueuing a request is all that is necessary to initiate its
processing:

  pqm.enq("This is a test, this is only a test")

If a request causes an exception within the processing block, the
Exception is appended to a circular buffer whose contents can be
obtained as an array with the PoolQM#exceptions method.

If you're interested in logging exceptions, you'll have a bit more
work to do, but replacing the CircularBuffer with a Queue that has its
own worker to handle disk IO is probably a good bet.

Performance-wise this approach behaves more consistently than any I've
produced so far, i.e. it's both fast and performs with repeatable
uniformity.

No doubt, there's still room for improvement.

URI

INSTALL

HISTORY

0.1 - genesis
0.2 - documentation and clean-up

SYNOPSIS

  require 'thread'

  results = Queue.new   # thread-safe container for results! <<<<<<<<<< IMPORTANT

  NUM_OF_WORKERS = 10

  pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
    results << "Current request: #{request}"   # process request here
  end

  100.times do |index|
    pqm.enq("Request number #{index}")   # enqueue requests here
  end

  pqm.wait_until_idle                    # wait for all requests to be processed

  until results.empty? do               # dump results
    p results.pop
  end

  pqm.exceptions.each do |exception|    # obtain exceptions array and dump it
    p exception
  end

CAVEATS

2DOs

By Djief

=end

require 'monitor'

class PoolQM

  # default size for the exceptions CircularBuffer
  DEFAULT_EXCEPTION_BUFFER_SIZE = 10

  # Create a new PoolQM with 'worker_count' worker threads to execute
  # the associated block
  def initialize(worker_count = 1)
    raise 'block required: { |request| ... }' unless block_given?
    @worker_count = worker_count
    @request_q = []
    @request_q.extend(MonitorMixin)
    @request_ready = @request_q.new_cond
    @exceptions = CircularBuffer.new(DEFAULT_EXCEPTION_BUFFER_SIZE)
    @worker_count.times do
      Thread.new do
        loop do
          request = nil
          @request_q.synchronize do
            @request_ready.wait
            request = @request_q.delete_at(0)
          end
          begin
            yield request
          rescue Exception => e
            @exceptions << e
          end
          Thread.pass
        end
      end
    end
    @dispatcher = Thread.new do
      loop do
        @request_q.synchronize do
          @request_ready.signal unless @request_q.empty? ||
            @request_ready.count_waiters == 0
        end
        Thread.pass
      end
    end
  end

  # enq the request data
  def enq(request)
    @request_q.synchronize do
      @request_q << request
    end
  end

  # Wait until all the queued requests have been removed from the
  # request_q, then wait until all threads have completed their
  # processing and are idle
  def wait_until_idle(wait_resolution = 0.3)
    q_empty = false
    until q_empty
      @request_q.synchronize do
        q_empty = @request_q.empty?
      end
      sleep(wait_resolution) unless q_empty
    end
    all_threads_idle = false
    until all_threads_idle
      @request_q.synchronize do
        all_threads_idle = @request_ready.count_waiters == @worker_count
      end
      sleep(wait_resolution) unless all_threads_idle
    end
  end

  # create a new exceptions buffer of new_size
  def exceptions_buffer_size=(new_size)
    @exceptions = CircularBuffer.new(new_size)
  end

  # report the size of the current exceptions buffer
  def exceptions_buffer_size
    @exceptions.size
  end

  # return the current exceptions buffer as an ordered Array
  def exceptions
    @exceptions.to_a
  end

end

if __FILE__ == $0

  # the usual trivial example

  require 'thread'

  # >>>> thread-safe container for results <<<<
  results = Queue.new

  NUM_OF_WORKERS = 10

  pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
    raise "Dummy Exception during #{request}" if rand(10) == 0  # simulate random exceptions
    results << "Current request: #{request}"                    # process request here
  end

  100.times do |index|
    pqm.enq("Request number #{index}")  # enqueue requests here
  end

  # wait for all requests to be processed
  pqm.wait_until_idle

  # dump results
  until results.empty? do
    p results.pop
  end

  # obtain exceptions array and dump it
  pqm.exceptions.each do |exception|
    p exception
  end

end

Regards,

djief


#8

On Dec 12, 2008, at 6:58 AM, Deepak G. wrote:

  Please, I need help with the above problem.

  Thanks
  Deepak

Use bj:

http://codeforpeople.rubyforge.org/svn/bj/trunk/README

It was written for Engine Yard and is under heavy use there. The
focus is on simplicity and robustness.

a @ http://codeforpeople.com/


#9

On 15.12.2008 14:28, Deepak G. wrote:

On Sat, Dec 13, 2008 at 10:22 PM, Robert K.
removed_email_address@domain.invalidwrote:

Can you be more specific what you really mean by this? How does it fail?

  Well, when I start the backgroundrb processes, everything works well
  for the next day or two, but after some time a worker just hangs. I
  can see its process ID is still active, but there is no output. I
  examined the logs but didn't find anything in the log files; I don't
  have a single clue about what went wrong.

Apparently. Since I don't know the code, I cannot really make sense of
what you report. It does seem weird, though, that you apparently keep
your workers active for several days. Do you actually keep them busy,
or do you just keep them around?

I was expecting something like this

====Hi from 1st worker===============
====Hi from 2nd worker===============
====Hi from 1st worker===============
====Hi from 2nd worker===============

Well, there is no guarantee that messages are actually intermixed as you
expect - especially not with Ruby’s green threads - if that’s what
Workling is using.
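A minimal plain-Ruby sketch of this point: two threads writing to a shared,
thread-safe Queue can still produce long runs of one thread's messages, so
grouped log lines alone do not prove the workers ran serially. The worker
names here are made up for the example.

```ruby
require 'thread'

# Two threads log to a shared thread-safe queue. The order in which
# their messages interleave is entirely up to the scheduler; seeing
# all of worker-1's lines before worker-2's does not mean the threads
# ran one after the other.
log = Queue.new

t1 = Thread.new { 5.times { log << "worker-1" } }
t2 = Thread.new { 5.times { log << "worker-2" } }
[t1, t2].each(&:join)

messages = []
messages << log.pop until log.empty?
```

Both workers contribute five messages each; only the total content is
guaranteed, never the interleaving.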

Cheers

robert


#10

On Tue, Dec 16, 2008 at 10:15 PM, ara.t.howard removed_email_address@domain.invalid
wrote:

  1. Now I am using backgroundrb with 10 workers; each worker is
     assigned the job of parsing data from 20 feeds (e.g. the 1st
     worker fetches data from feeds 1-20, the 2nd from feeds 21-40,
     etc.).

  2. But backgroundrb is not reliable and it fails after some time,
     so I tried Starling & Workling, but those workers don't run in
     parallel.

  (I need to run them in parallel because the number of feeds will
  grow, say to 1000 feeds, so I can't process them sequentially.)

Do you have a backtrace of any kind? Can you post your worker code?
Which version of BackgrounDRb are you running?