Suggestions for a distributed job queue

I’m looking at replacing our homebrew job queue with something better, and I’m wondering what’s out there. If there’s something that meets our needs I’d like to use it; otherwise I may end up building our next-generation job queue by assembling components that already exist. I’ve seen a couple dozen solutions for this in the Ruby world, but I haven’t scrutinized many of them in depth and am unsure whether any fully meet our needs.

We have many different kinds of jobs, but the main ones I’m worried about are rather CPU intensive, take a long time to execute, and operate on large amounts of input and output data.

  • Distributed: we certainly have too many jobs to run on a single computer. We need to distribute them across a number of workers. Beyond that, we’d like to automatically provision more workers when the backlog of jobs gets too large, and shut down workers if too many of them are idle.

  • Fault Tolerant: as with any distributed system, fault tolerance is important. An evil monkey should be able to futz with any part of the system, crashing computers willy-nilly, and it should continue to Just Work. If a job breaks down or is somehow lost anywhere along the way, the system should eventually detect this and retry the job. From a design perspective, I would like as much of the system as possible to be stateless. The agents executing the jobs and the message queues should be stateless. Ideally the entirety of the system’s state is kept in the database, and that’s the only part of the system whose state we need to recover after a crash. If a worker or message queue goes down, we shouldn’t need to worry about recovering jobs-in-flight; they should simply be retried.

  • Idempotent: going along with a stateless approach to fault tolerance, if the system does misdetect a failed job and ends up executing it twice, the system should detect this and discard the redundant results. Having the same job accidentally complete multiple times should not screw up the state of the system.

  • Support for Temporary Failures: sometimes a job fails in such a way that it should be retried (e.g. external resources needed to complete the job are temporarily unavailable). The system should support retrying these jobs in a sensible way, and it’d be nice to be able to specify, on a job-by-job basis, how many attempts should be made before the system gives up and considers it a permanent failure.

  • Support for Live Upgrades: we’d like to be able to add new types of jobs to the system while it’s running. Along with this, agents should know what types of jobs they support, and should not request unsupported jobs from the message queue.

I suppose some of my comments presuppose the architecture: a list of jobs stored in the database; command-and-control processes which load jobs into, and read results from, the message queues; agents which pull jobs from the message queues and report completed jobs back to them; and the message queues themselves. I’d be open to other designs, so long as they have the above properties.
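
To make the retry and idempotency bookkeeping concrete, here is a rough, illustrative sketch of the state transitions I have in mind. Everything in it (JobStore, the :queued/:running/:done/:failed states, the field names) is invented for the example; in the real system this would be a table in the database rather than an in-memory hash:

require 'thread'

# Toy, in-memory stand-in for the jobs table described above.
class JobStore
  Job = Struct.new(:id, :type, :state, :attempts, :max_attempts, :result)

  def initialize
    @jobs = {}
    @lock = Mutex.new
  end

  def enqueue(id, type, max_attempts = 3)
    @lock.synchronize do
      @jobs[id] ||= Job.new(id, type, :queued, 0, max_attempts, nil)
    end
  end

  # An agent claims the next queued job of a type it knows how to run.
  def claim(supported_types)
    @lock.synchronize do
      job = @jobs.values.find do |j|
        j.state == :queued && supported_types.include?(j.type)
      end
      if job
        job.state = :running
        job.attempts += 1
      end
      job
    end
  end

  # Idempotent completion: a duplicate result for an already-done job is
  # detected and discarded instead of corrupting the system's state.
  def complete(id, result)
    @lock.synchronize do
      job = @jobs[id]
      return false if job.nil? || job.state == :done
      job.state  = :done
      job.result = result
      true
    end
  end

  # Temporary failure (or a lost worker): retry until attempts run out.
  def release(id)
    @lock.synchronize do
      job = @jobs[id]
      job.state = job.attempts < job.max_attempts ? :queued : :failed
    end
  end
end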

Given that, is there an existing job queue that meets my needs that I should be checking out?

On Dec 22, 2009, at 1:23 PM, Tony A. wrote:

Given that, is there an existing job queue that meets my needs that I should be checking out?

I don’t think this exactly meets everything you are after, but it gets
close in a lot of ways:

Introducing Resque | The GitHub Blog

James Edward G. II

On Tue, Dec 22, 2009 at 2:35 PM, James Edward G. II <[email protected]> wrote:

I’ll +1 resque. It’s a pretty fantastic system.

Jason

On Tue, Dec 22, 2009 at 12:35 PM, James Edward G. II <[email protected]> wrote:

I don’t think this exactly meets everything you are after, but it gets
close in a lot of ways:

Introducing Resque | The GitHub Blog

Resque looks very interesting. Thanks for the heads up.

Given that, is there an existing job queue that meets my needs that I should be checking out?

I don’t know off the top of my head if it does all of that, but
I’d look at Ruby Queue as a distributed work queue.
http://raa.ruby-lang.org/project/rq/

On Tue, Dec 22, 2009 at 12:40 PM, Walton H. <[email protected]> wrote:

I don’t know off the top of my head if it does all of that, but
I’d look at Ruby Queue as a distributed work queue.
http://raa.ruby-lang.org/project/rq/

I’ve looked at Ruby Queue in the past and it is rather interesting; however, it has a number of issues which would prevent us from using it.

For starters, it uses NFS as the distribution protocol, and using NFS isn’t really practical in our environment.

A couple of other options:

(1) AMQP, e.g. RabbitMQ. I believe it comes with Ruby bindings. Can be made as fault-tolerant as you like :-)

(2) Depending on your needs, you could consider rolling your own with DRb. This means at least you know the system inside-out and can easily customise it - although avoiding the queue server itself being a SPOF is awkward.

Here are a couple of working proofs-of-concept.

In-RAM queue

---- server ----
require 'drb'
require 'thread'
q = Queue.new # or SizedQueue.new(1000)
DRb.start_service("druby://127.0.0.1:9911", q)
DRb.thread.join

---- client ----
require 'drb'
DRb.start_service
q = DRbObject.new(nil, "druby://localhost:9911")
q.push "abc"
puts q.pop
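
A worker on top of this is just a loop around pop. For illustration only (handle_job is a dummy stand-in for the real work, and pushing a failed job back onto the queue is a deliberately crude retry):

require 'drb'

def handle_job(job)
  puts "processing #{job.inspect}"  # stand-in for the real work
end

DRb.start_service
q = DRbObject.new(nil, "druby://localhost:9911")

loop do
  job = q.pop                 # blocks until the server has something for us
  begin
    handle_job(job)
  rescue => e
    warn "job failed: #{e.message}"
    q.push(job)               # crude retry: put the job back on the queue
  end
end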

On-disk queue using Madeleine

---- server ----
require 'rubygems'
require 'madeleine'
require 'drb'

class MadQueue
  def initialize(madeleine)
    @madeleine = madeleine
  end

  # Read operations don't need to go via command objects
  # (if you don't care about synchronization)
  def length
    @madeleine.system.length
  end

  class Pusher
    def initialize(data)
      @data = data
    end
    def execute(system)
      system.push(@data)
    end
  end

  class Popper
    def execute(system)
      system.shift
    end
  end

  def push(data)
    @madeleine.execute_command(Pusher.new(data))
  end

  def pop
    @madeleine.execute_command(Popper.new)
  end
end

madeleine = SnapshotMadeleine.new("madqueue.dir") { [] }

Thread.new(madeleine) {
  puts "Taking snapshot every 30 seconds."
  while true
    sleep(30)
    madeleine.take_snapshot
  end
}

DRb.start_service("druby://127.0.0.1:9911", MadQueue.new(madeleine))
DRb.thread.join

---- client ----
Same as above
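
(The only difference worth showing: MadQueue also exposes the read-only length call, so a client can peek at the backlog. Same host and port assumptions as the in-RAM sketch.)

require 'drb'
DRb.start_service
q = DRbObject.new(nil, "druby://localhost:9911")
q.push "abc"
puts q.length  # read-only; served straight from madeleine.system
puts q.pop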

On Tuesday 22 December 2009 03:39:15 pm Tony A. wrote:

For starters, it uses NFS as the distribution protocol, and using NFS isn’t
really practical in our environment.

Not that I doubt it, but I’m curious what the limitation is. Is it
scalability?

On Wed, Dec 23, 2009 at 12:10 AM, David M. <[email protected]> wrote:

On Tuesday 22 December 2009 03:39:15 pm Tony A. wrote:

For starters, it uses NFS as the distribution protocol, and using NFS isn’t really practical in our environment.

Not that I doubt it, but I’m curious what the limitation is. Is it
scalability?

More like security. This is running partially in our datacenter and partially on EC2. While I’m sure it “can be done”, it really doesn’t seem like the ideal solution.

Hi,

I am using an AMQP-compliant queue for this. With its durable queuing and routing mechanisms it can be made to meet many, if not all, of your requirements, I believe.

As job items I am using Thrift RPC method calls, which is very convenient on both sides (server and client). The library that allows you to do this is here: http://github.com/kschiess/thrift_amqp_transport (currently being redesigned).
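
If you just want to see what raw AMQP from Ruby looks like (without the Thrift layer), a minimal sketch with the bunny gem might look roughly like the following; the queue name, payload and options are made up, and the exact publish/pop API varies between bunny versions:

require 'bunny'

conn = Bunny.new            # assumes a RabbitMQ broker on localhost
conn.start

ch = conn.create_channel
q  = ch.queue("jobs", :durable => true)   # survives broker restarts

# enqueue a job via the default exchange, routed by queue name
ch.default_exchange.publish("run job 42",
                            :routing_key => q.name,
                            :persistent  => true)

# a worker pops one message off (payload is nil if the queue is empty)
_delivery_info, _properties, payload = q.pop
puts payload.inspect

conn.close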

There are many solutions in this space. Most recently, people have been using Resque (from GitHub, I believe) and AMQP queues. Other solutions build on the database or the filesystem, as you already know.

my 2 cents
kaspar

On Wed, Dec 23, 2009 at 11:57 AM, David M. <[email protected]> wrote:

Worth mentioning: EC2’s internal IPs can be pretty much completely firewalled off, and VPNs are easy enough to set up. Probably easily doable.

Then what happens when the VPN goes down and the leaky abstraction that is NFS’s synchronous API grinds your message queue to a halt?

On 24.12.2009 03:08, Tony A. wrote:

Then what happens when the VPN goes down and the leaky abstraction that is
NFS’s synchronous API grinds your message queue to a halt?

Same thing as when you have an EC2 outage: You save state, and resume
processing ASAP.

On Wednesday 23 December 2009 11:04:18 am Tony A. wrote:

More like security. This is running partially in our datacenter and
partially on EC2. While I’m sure it “can be done”, it really doesn’t seem
like the ideal solution.

Worth mentioning: EC2’s internal IPs can be pretty much completely firewalled off, and VPNs are easy enough to set up. Probably easily doable.

But you’re right, probably not the ideal solution.

On 24.12.2009 03:36, Tony A. wrote:

On Wed, Dec 23, 2009 at 7:17 PM, Phillip G. <[email protected]> wrote:

Same thing as when you have an EC2 outage: You save state, and resume
processing ASAP.

Oof. Well for one, due to the nature of the synchronous filesystem API,
it’s hard for processes in userspace to detect when things are amiss in the
underlying NFS layers.

And implementing some sort of keep-alive/heartbeat system is too much
work, as well.

Also, if you read my OP, saving state (aside from the state of “what jobs have not been run yet”) and recovering jobs in flight is something I want to avoid.

Well, I didn’t mean that you should implement a whole synchronization framework (which is what it comes down to). Alas, that is what I implied.

If the system fails I’d rather it simply fail and be restored to a clean state. That way, you have only one stateful part of the system, and that’s the only part you need to worry about recovering state from after a failure.

And this would probably be best done on “your” end of the network, too.
That way you could ignore the EC2 nodes for the time being, in case of
some form of network outage (assuming I understood you correctly, in
that only part of your nodes are in Amazon’s cloud).

Not having had a look at RQueue’s implementation details, maybe you could, without too much effort, port it to the Devil From Redmond’s SMB system via Samba.

Or roll your own, as has been suggested, with DRb. :-)

On Wed, Dec 23, 2009 at 7:17 PM, Phillip G. <[email protected]> wrote:

Same thing as when you have an EC2 outage: You save state, and resume
processing ASAP.

Oof. Well for one, due to the nature of the synchronous filesystem API, it’s hard for processes in userspace to detect when things are amiss in the underlying NFS layers.

Also, if you read my OP, saving state (aside from the state of “what jobs have not been run yet”) and recovering jobs in flight is something I want to avoid. If the system fails I’d rather it simply fail and be restored to a clean state. That way, you have only one stateful part of the system, and that’s the only part you need to worry about recovering state from after a failure.