Forum: Ruby
How to run background processes (more than 1 worker) in parallel

Announcement (2017-05-07): www.ruby-forum.com is now read-only since I unfortunately no longer have the time to support and maintain the forum. Please see rubyonrails.org/community and ruby-lang.org/en/community for other Rails- and Ruby-related community platforms.
Deepak G. (Guest)
on 2008-12-12 16:06
(Received via mailing list)
Hi

My requirement is as follows:

1) I have around 200 feeds in the database that I need to parse (fetch
contents) *in parallel* after some interval and store the feed items in
the database.

2) Currently I am using BackgrounDRb with 10 workers; each worker is
assigned a job to parse data from 20 feeds (e.g. the 1st worker will
fetch data from feeds 1..20, the 2nd from feeds 21..40, etc.).

3) But BackgrounDRb is not reliable and it fails after some time. So I
have tried Starling & Workling, but those workers don't run in parallel.

(I need to run them in parallel because the number of feeds will grow,
say to 1000 feeds, so I can't process them sequentially.)

Please, I need help with the above problem.


Thanks
Deepak
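The setup described above can be sketched in plain Ruby with a fixed pool of threads draining a shared queue. This is a minimal illustration with placeholder feed URLs and a stubbed fetch step, not the poster's actual worker code:

```ruby
require 'thread'

# Hypothetical feed list standing in for the ~200 feeds in the database.
feeds = (1..200).map { |i| "http://example.com/feed#{i}.xml" }

queue   = Queue.new
results = Queue.new
feeds.each { |url| queue << url }

NUM_WORKERS = 10

workers = NUM_WORKERS.times.map do
  Thread.new do
    until queue.empty?
      begin
        url = queue.pop(true)   # non-blocking pop; raises ThreadError when empty
      rescue ThreadError
        break                   # another worker emptied the queue first
      end
      # A real worker would fetch and parse the feed here (e.g. with
      # open-uri and an RSS parser); we just record the URL so the
      # sketch stays runnable without network access.
      results << url
    end
  end
end

workers.each(&:join)
puts "processed #{results.size} feeds"
```

Because the queue itself is thread-safe, adding feeds (200 today, 1000 later) requires no change to the worker code; with native threads the network waits of the individual fetches overlap.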
Chris L. (Guest)
on 2008-12-12 16:19
> 3) But backgroundrb is not reliable and it fails after some time. So I
> have tried Starling & Workling, but those workers don't run in parallel.

Nanite (http://github.com/ezmobius/nanite/tree/master) maybe? Perhaps
some of the Rails and/or Merb groups will be able to help you more.

Good luck,

Chris
saurabh purnaye (Guest)
on 2008-12-12 20:43
(Received via mailing list)
hi Deepak,
You may go for Starfish <http://rufy.com/starfish/doc/> or
Skynet <http://skynet.rubyforge.org/>,
which are based on Google's MapReduce algorithm.
You may also find more information in the Railscasts episodes 127 to 129.

On Fri, Dec 12, 2008 at 7:28 PM, Deepak G. 
<removed_email_address@domain.invalid>
wrote:

> feeds(1..20), 2nd from feeds(21..30) ..etc.....
> Thanks
> Deepak
>



--
--
Thanks and Regards
Saurabh P.
+91-9922071155
skype: sorab_pune
yahoo & gtalk: saurabh.purnaye
msn:  removed_email_address@domain.invalid
Saji N Hameed (Guest)
on 2008-12-13 14:47
(Received via mailing list)
Hi Deepak,

As others mentioned, an adaptation of Google's MapReduce technique
may be of use. To this end, you could use Ruby's Rinda. For my needs
I wrote a small script that puts work descriptions on a tuple space.
These are taken up by one or more workers in parallel.

If you write distinct messages that are recognized by workers, you
could probably achieve your parallelism in a few lines without extra
libraries, except perhaps for DRbFire.

I attach it here (I am a novice Ruby programmer, so the code may not
be optimal) - hope it helps.

saji

--queue code

require 'thread'
require 'sequel'
require 'rinda/tuplespace'
require 'drb'

ts = Rinda::TupleSpace.new
DRb.start_service("druby://:3130",ts)
puts "Drb server running at #{DRb.uri}"

dbname="sqlite://testQ.db"
db = Sequel.connect(dbname)
pause = 15

loop do
  th1 = Thread.new do
    job = db[:jobs].filter(:status => "queued").first
    if job                       # guard: there may be no queued job
      submit = job.merge(:status => "submitted")
      ts.write [:q1, submit]
      db[:jobs].filter(job).update(submit)
    end
  end
  th2 = Thread.new do
    result = ts.take [:rq1, nil, nil]
    unless result[1].nil?
      p "processing images"
      p "finished image processing"
      p "update job status in database"
      db[:jobs].filter(result[1]).update(:status => "finished")
    end
  end
  th1.join   # join inside the loop, where th1/th2 are still in scope
  th2.join
  sleep(pause)
end

# connect to database
# create tuplespace
# thread1
#     - collect from database
#     - put on tuple
#     - update db

# thread2
#     - check tuple
#     - download data
#     - update db

---worker code

require 'drb'
require 'rinda/rinda'

DRb.start_service
ro = DRbObject.new_with_uri('druby://localhost:3130')
ts = Rinda::TupleSpaceProxy.new(ro)

def make_mme(job)
  # "This will be passed to AFS Server: don't worry yet"
  p job
end

loop do
  job = ts.take([:q1, nil])   # block until a job is available
  msg = make_mme(job[1])
  ts.write [:rq1, job, 0]     # write return status to tuplespace
end

# worker takes job from tuple space (ts.take[:q1,..])
# executes job (make_mme)
# writes message on tuple space (ts.write[:rq1,..])
* Deepak G. <removed_email_address@domain.invalid> [2008-12-12 22:58:58 +0900]:

> feeds(1..20), 2nd from feeds(21..30) ..etc.....
> Thanks
> Deepak

--
Saji N. Hameed

APEC Climate Center                  +82 51 668 7470
National Pension Corporation Busan Building 12F
Yeonsan 2-dong, Yeonje-gu, BUSAN 611705 
removed_email_address@domain.invalid
KOREA
Robert K. (Guest)
on 2008-12-13 19:01
(Received via mailing list)
On 12.12.2008 14:58, Deepak G. wrote:

> 3) But backgroundrb is not reliable and it fails after some time.
Can you be more specific about what you really mean by this?  How does
it fail?

> So I have
> tried Starling & Workling but those worker doesn't run *parallely.

Maybe you are not using it the proper way.  From what I read on the web
site, doing work concurrently is all that S+W is about.

Cheers

  robert
Jeff M. (Guest)
on 2008-12-13 20:11
Deepak G. wrote:
> Hi
>
> Pls I need a help on above problem.*
>
>
> Thanks
> Deepak

Here's my approach to a similar problem. It's still not as polished as
I'd like, but it may be useful.

The core is the PoolQM class (the CircularBuffer class exists to catch
a limited number of exceptions).

=begin

NAME

  class CircularBuffer

DESCRIPTION

  A lightweight but (hopefully) thread-safe version of the circular buffer.

  Designed primarily for intentionally limited in-memory event/error logging.

URI



INSTALL



HISTORY

  0.1

SYNOPSIS

  cb = CircularBuffer.new(50)     # create a new CircularBuffer that holds 50 nil elements
  cb << 'fnord'                   # append an element to the buffer
  elements = cb.to_a              # return elements as an array, ordered from oldest to newest
  cb.clear                        # force all entries to nil

CAVEATS

  The CircularBuffer ignores attempts to append nil elements

2DOs



By Djief

=end

require 'thread'

class CircularBuffer

  def initialize(max_size)
    @max_size = max_size
    @ra = Array.new(@max_size, nil)
    @head = 0
    @mutex = Mutex.new
  end

  private

  def inc(index)
    (index + 1) % @max_size
  end

  public

  # report the buffer's capacity (needed by PoolQM#exceptions_buffer_size)
  #
  def size
    @max_size
  end

  # set all elements to nil
  #
  def clear
    @mutex.synchronize do
      @ra.fill(nil)
    end
  end

  # append a new element to the current 'end'
  #
  def <<(element)
    unless element.nil?
      @mutex.synchronize do
        @ra[@head]=element
        @head = inc(@head)
      end
    end
  end

  # return the entire buffer (except nil elements)
  # as an array
  #
  def to_a
    result = []
    @mutex.synchronize do
      index = @head
      @max_size.times do
        result << @ra[index] unless @ra[index].nil?
        index = inc(index)
      end
    end
    result
  end

end

=begin

NAME

  class PoolQM

DESCRIPTION

  PoolQM extends an Array with MonitorMixin to create a queue with
  an associated pool of worker threads that wait to process any requests
  that are added to the queue.

  A dispatcher thread continuously watches for enqueued requests and
  signals idle worker threads (if any) to dequeue and process the
  request(s). If no idle workers exist, the request remains in the
  queue until one is available.

  During the creation of a new instance of PoolQM, the number of worker
  threads is established and the request processing block is defined:

    results = Queue.new
    NUM_OF_WORKERS = 10
    pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
      results << "Current request: #{request}"    # process the request here
    end

  Note that any output you expect to collect from your worker threads
  should be returned via some thread-safe mechanism or container (Queue
  is a good default).

  Enqueuing a request is all that is necessary to initiate its
  processing:

    pqm.enq("This is a test, this is only a test")

  If a request causes an exception within the processing block, the
  Exception is appended to a circular buffer whose contents can be
  obtained as an array with the PoolQM#exceptions method.

  If you're interested in logging exceptions, you'll have a bit more
  work to do, but replacing the CircularBuffer with a Queue that has
  its own worker to handle disk IO is probably a good bet.

  Performance-wise this approach behaves more consistently than any I've
  produced so far, i.e. it's both fast and performs with repeatable
  uniformity.

  No doubt, there's still room for improvement.


URI



INSTALL



HISTORY

  0.1 - genesis
  0.2 - documentation and clean-up

SYNOPSIS

  require 'thread'

  results = Queue.new                           # thread-safe container for results! <<<<<<<<<< IMPORTANT

  NUM_OF_WORKERS = 10

  pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
    results << "Current request: #{request}"    # process the request here
  end

  100.times do |index|
    pqm.enq("Request number #{index}")          # enqueue requests here
  end

  pqm.wait_until_idle                           # wait for all requests to be processed

  until results.empty? do                       # dump results
    p results.pop
  end

  pqm.exceptions.each do |exception|            # obtain exceptions array and dump it
    p exception
  end

CAVEATS



2DOs



By Djief

=end


require 'monitor'

class PoolQM

  # default size for the exceptions CircularBuffer
  #
  DEFAULT_EXCEPTION_BUFFER_SIZE = 10

  # Create a new PoolQM with 'worker_count' worker threads to execute
  # the associated block
  #
  def initialize(worker_count = 1)
    raise 'block required: { |request| ... }' unless block_given?
    @worker_count = worker_count
    @request_q = []
    @request_q.extend(MonitorMixin)
    @request_ready = @request_q.new_cond
    @exceptions = CircularBuffer.new(DEFAULT_EXCEPTION_BUFFER_SIZE)
    @worker_count.times do
      Thread.new do
        loop do
          request = nil
          @request_q.synchronize do
            @request_ready.wait
            request = @request_q.delete_at(0)
          end
          next if request.nil?   # guard: a duplicate signal may deliver no work
          begin
            yield request
          rescue Exception => e
            @exceptions << e
          end
          Thread.pass
        end
      end
    end
    @dispatcher = Thread.new do
      loop do
        @request_q.synchronize do
          @request_ready.signal unless @request_q.empty? ||
                                       @request_ready.count_waiters == 0
        end
        Thread.pass
      end
    end

  end

  # enq the request data
  #
  def enq(request)
    @request_q.synchronize do
      @request_q << request
    end
  end

  # Wait until all the queued requests have been removed
  # from the request_q, then wait until all threads have
  # completed their processing and are idle
  #
  def wait_until_idle(wait_resolution=0.3)
    q_empty = false
    until q_empty
      @request_q.synchronize do
        q_empty = @request_q.empty?
      end
      sleep(wait_resolution) unless q_empty
    end
    all_threads_idle = false
    until all_threads_idle
      @request_q.synchronize do
        all_threads_idle = @request_ready.count_waiters == @worker_count
      end
      sleep(wait_resolution) unless all_threads_idle
    end
  end

  # create a new exceptions buffer of new_size
  #
  def exceptions_buffer_size=(new_size)
    @exceptions = CircularBuffer.new(new_size)
  end

  # report the size of the current exceptions buffer
  #
  def exceptions_buffer_size
    @exceptions.size
  end

  # return the current exceptions buffer as an ordered Array
  #
  def exceptions
    @exceptions.to_a
  end

end

if __FILE__ == $0

  # the usual trivial example

  require 'thread'

  # >>>> thread-safe container for result <<<<
  #
  results = Queue.new

  NUM_OF_WORKERS = 10

  pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
    raise "Dummy Exception during #{request}" if rand(10) == 0  # simulate random exceptions
    results << "Current request: #{request}"    # process the request here
  end

  100.times do |index|
    pqm.enq("Request number #{index}")          # enqueue requests here
  end

  # wait for all requests to be processed
  pqm.wait_until_idle

  # dump results
  until results.empty? do
    p results.pop
  end

  # obtain exceptions array and dump it
  pqm.exceptions.each do |exception|
    p exception
  end

end


Regards,

djief
Deepak G. (Guest)
on 2008-12-15 15:42
(Received via mailing list)
Hello Robert



On Sat, Dec 13, 2008 at 10:22 PM, Robert K.
<removed_email_address@domain.invalid>wrote:

>> feeds(1..20), 2nd from feeds(21..30) ..etc.....
>>
>> 3) But backgroundrb is not reliable and it fails after some time.
>>
>
> Can you be more specific what you really mean by this?  How does it fail?
>

>>> Well, when I start the BackgrounDRb processes, everything works well
for the next 1-2 days, but after some time a worker just hangs. I can see
its process ID is still active, but there is no output. I examined the
logs as well but didn't find anything in the log files. I don't have a
single clue about what went wrong.

>
>
>  So I have
>> tried Starling & Workling but those worker doesn't run *parallely.
>>
>
> Maybe you used it not in the proper way.  From what I read on the web site
> doing work concurrently is all that S+W is about.
>
>>> I have created 2 Workling workers; each worker has one method which
just logs some output, and I ran them from the console. When I examined
the log files, I got the output sequentially:

class MyWorker < Workling::Base
  def sample_one(options)
    5.times do |i|
      logger.info "====Hi from 1st worker==============="
    end
  end
end

class MySecondWorker < Workling::Base
  def sample_two(options)
    5.times do |i|
      logger.info "====Hi from 2nd worker==============="
    end
  end
end

I got the following output:

====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============


I was expecting something like this

====Hi from 1st worker===============
====Hi from 2nd worker===============
====Hi from 1st worker===============
====Hi from 2nd worker===============

> ........


Thanks for your help
Deepak
Robert K. (Guest)
on 2008-12-15 23:30
(Received via mailing list)
On 15.12.2008 14:28, Deepak G. wrote:

> On Sat, Dec 13, 2008 at 10:22 PM, Robert K.
> <removed_email_address@domain.invalid>wrote:

>> Can you be more specific what you really mean by this?  How does it fail?
>
>>>>  Well when I start the backgroundrb processes then for next few or next
> 1,2 days everything works well, But after some time worker just gets hangs I
> can see there process ID till active.
> But No output. I examine logs also but didn't get anything in log files. I
> am not getting a single clue of what went wrong?

Apparently.  Since I don't know the code, I cannot really make sense of
what you report.  It does seem weird, though, that you apparently keep
your workers active for several days.  Do you actually keep them busy or
do you just keep them around?

> class MyWorker < Workling::Base
>   def sample_two(options)
> ====Hi from 1st worker===============
> I was expecting something like this
>
> ====Hi from 1st worker===============
> ====Hi from 2nd worker===============
> ====Hi from 1st worker===============
> ====Hi from 2nd worker===============

Well, there is no guarantee that messages are actually intermixed as you
expect - especially not with Ruby's green threads - if that's what
Workling is using.

Cheers

  robert
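Robert's point can be seen with plain Ruby threads, independent of Workling: even two threads that genuinely run concurrently may emit all of one worker's log lines before the other's, because the scheduler makes no interleaving guarantee. A small illustrative sketch (not Workling code):

```ruby
require 'thread'

log = Queue.new   # thread-safe stand-in for the log file

threads = [1, 2].map do |n|
  Thread.new do
    5.times do
      log << "====Hi from worker #{n}==============="
      Thread.pass   # invite a context switch; interleaving is still not guaranteed
    end
  end
end
threads.each(&:join)

# All 10 lines are always present, but their order depends entirely
# on how the scheduler happened to switch between the threads.
lines = []
lines << log.pop until log.empty?
puts lines
```

Sequential-looking log output therefore does not prove the workers ran sequentially; timing each worker's actual wall-clock runtime is a better test of parallelism than reading log order.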
Ara H. (Guest)
on 2008-12-16 18:53
(Received via mailing list)
On Dec 12, 2008, at 6:58 AM, Deepak G. wrote:

> assigned a
> Pls I need a help on above problem.*
>
>
> Thanks
> Deepak


use bj


http://codeforpeople.rubyforge.org/svn/bj/trunk/README


it was written for engine yard and is under heavy use there.  the
focus is on simplicity and robustness.

a @ http://codeforpeople.com/
Hemant K. (Guest)
on 2008-12-17 19:29
(Received via mailing list)
On Tue, Dec 16, 2008 at 10:15 PM, ara.t.howard 
<removed_email_address@domain.invalid>
wrote:
>>
>> 2) Now I am using backgroundrb with 10 workers each worker has assigned a
>> job to parse data from 20 feeds (e.g 1st worker will fecth data from
>> feeds(1..20), 2nd from feeds(21..30) ..etc.....
>>
>> 3) But backgroundrb is not reliable and it fails after some time. So I
>> have
>> tried Starling & Workling but those worker doesn't run *parallely.
>>
>> ( I need to run **parallely because those feeds will increase say 1000
>> feeds. So I can't run them sequentially. ) *

Do you have a backtrace of any kind? Can you post your worker code?
Which version of BackgrounDRb are you running?
This topic is locked and cannot be replied to.