Ruby queue : rq-2.3.1

URIS:

http://codeforpeople.com/lib/ruby/rq/
Linux Clustering with Ruby Queue: Small Is Beautiful | Linux Journal
http://raa.ruby-lang.org/project/rq/

HISTORY:


2.3.1:
- added 'stage' option to submit mode, which allows submission in a 'holding'
  state. thanks to [email protected] for this fix!

README:

NAME
rq v2.3.1

SYNOPSIS
rq (queue | export RQ_Q=q) mode [mode_args]* [options]*

DESCRIPTION
ruby queue (rq) is a tool used to create instant linux clusters by managing
sqlite databases as nfs mounted priority work queues. multiple instances of
rq running from multiple hosts can work from these queues to distribute
processing load to n nodes - bringing many dozens of otherwise powerful cpus
to their knees with a single blow. clearly this software should be kept out
of the hands of free radicals, seti enthusiasts, and mr. jeff safran.

 the central concept of rq is that n nodes work in isolation to pull jobs from
 a central nfs mounted priority work queue in a synchronized fashion. the
 nodes have absolutely no knowledge of each other and all communication is
 done via the queue, meaning that, so long as the queue is available via nfs
 and a single node is running jobs from it, the system will continue to
 process jobs. there is no centralized process whatsoever - all nodes work to
 take jobs from the queue and run them as fast as possible. this creates a
 system which load balances automatically and is robust in the face of node
 failures.

 the first argument to any rq command is the name of the queue. this name may
 be omitted if, and only if, the environment variable RQ_Q has been set to
 contain the absolute path of the target queue.
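
 the queue/RQ_Q rule above can be sketched in ruby. this is only an
 illustration of the rule as described, not rq's own argument parser; the
 helper name 'split_rq_args' is hypothetical, and it assumes that whenever
 RQ_Q is set the queue name has been left off the command line.

```ruby
# Hypothetical helper, not part of rq: split an argument list into
# [queue, mode, mode_args], taking the queue from RQ_Q when it is set.
def split_rq_args(argv, env = ENV)
  args  = argv.dup
  queue = env['RQ_Q']
  # assumption: when RQ_Q is set the queue name is omitted from argv
  queue = args.shift if queue.nil? || queue.empty?
  raise ArgumentError, 'no queue given and RQ_Q is not set' if queue.nil?
  mode = args.shift
  [queue, mode, args]
end

p split_rq_args(%w[/nfs/q submit ls], {})
p split_rq_args(%w[list pending], { 'RQ_Q' => '/nfs/q' })
```

 both calls yield the same queue; only the source of the queue name differs.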

 rq operates in one of the modes create, submit, resubmit, list, status,
 delete, update, query, execute, configure, snapshot, lock, backup, rotate,
 feed, or help. depending on the mode of operation and the options used the
 meaning of 'mode_args' may change.

MODES

 the following mode abbreviations exist

   c  => create
   s  => submit
   r  => resubmit
   l  => list
   ls => list
   t  => status
   d  => delete
   rm => delete
   u  => update
   q  => query
   e  => execute
   C  => configure
   S  => snapshot
   L  => lock
   b  => backup
   R  => rotate
   f  => feed
   h  => help

 not all modes have abbreviations

 create, c :

   create a queue.  the queue must be located on an nfs mounted file system
   visible from all nodes intended to run jobs from it.  nfs locking must be
   functional on this file system.

   examples :

     0) to create a queue
         ~ > rq /path/to/nfs/mounted/q create
       or simply
         ~ > rq /path/to/nfs/mounted/q c


 submit, s :

   submit jobs to a queue to be processed by a feeding node.  any 'mode_args'
   are taken as the command to run.  note that 'mode_args' are subject to shell
   expansion - if you don't understand what this means do not use this feature
   and pass jobs on stdin.

   when running in submit mode a file may be specified as a list of commands to
   run using the '--infile, -i' option.  this file is taken to be a newline
   separated list of commands to submit; blank lines and comments (#) are
   allowed.  if submitting a large number of jobs the input file method is
   MUCH more efficient.  if no commands are specified on the command line rq
   automatically reads them from STDIN.  yaml formatted files are also allowed
   as input (http://www.yaml.org/) - note that the output of nearly all rq
   commands is valid yaml and may, therefore, be piped as input into the submit
   command.  the leading '---' of a yaml file may not be omitted.
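
   as a rough illustration of the command file format described above, the
   ruby sketch below reduces such a file to the commands it contains.  it is
   not rq's parser: the helper name 'commands_from' is made up, and it
   assumes a comment is a line whose first non-blank character is '#'.

```ruby
# Sketch only: turn command-file text into the list of commands it holds.
# Assumption: a comment is a line whose first non-blank character is '#'.
def commands_from(text)
  text.each_line
      .map(&:strip)
      .reject { |line| line.empty? || line.start_with?('#') }
end

cmdfile = <<~EOS
  # nightly batch
  /full/path/to/job_a input_1

  /full/path/to/job_b input_2
EOS

puts commands_from(cmdfile)
```

   a file prepared this way can be checked before it is handed to submit mode
   on stdin.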

   when submitting, the '--priority, -p' option can be used to set the priority
   of jobs.  priorities may be any whole number - zero is the default.  note
   that submission of a high priority job will NOT supplant currently running
   low priority jobs, but higher priority jobs WILL always migrate above lower
   priority jobs in the queue in order that they be run as soon as possible.
   constant submission of high priority jobs may create a starvation situation
   whereby low priority jobs are never allowed to run.  avoiding this situation
   is the responsibility of the user.  the only guarantee rq makes regarding
   job execution is that jobs are executed in an 'oldest highest priority'
   order and that running jobs are never supplanted.  jobs submitted with the
   '--stage' option will not be run by any node and will remain in a 'holding'
   state until updated (see update mode) into the 'pending' state.  this option
   allows jobs to be entered, or staged, in the queue and made candidates for
   running at a later date.
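
   the 'oldest highest priority' rule can be pictured with a few lines of
   ruby.  this is a sketch of the ordering only, not of how rq selects jobs
   internally: the field names 'jid', 'priority', 'state' and 'submitted' are
   assumptions made for the example and may not match the real queue schema.

```ruby
# Illustrative only: pick the job a feeder would take next, assuming jobs
# are hashes with hypothetical 'priority', 'state' and 'submitted' fields.
def next_job(jobs)
  jobs.select { |job| job['state'] == 'pending' }               # holding/running jobs are skipped
      .min_by { |job| [-job['priority'], job['submitted']] }    # highest priority, then oldest
end

jobs = [
  { 'jid' => 1, 'priority' => 0, 'state' => 'pending', 'submitted' => '2004-06-29 22:00:00' },
  { 'jid' => 2, 'priority' => 9, 'state' => 'pending', 'submitted' => '2004-06-29 22:05:00' },
  { 'jid' => 3, 'priority' => 9, 'state' => 'pending', 'submitted' => '2004-06-29 22:01:00' },
  { 'jid' => 4, 'priority' => 9, 'state' => 'holding', 'submitted' => '2004-06-29 21:00:00' }
]

p next_job(jobs)['jid']
```

   job 4 is older and of equal priority but is staged, so it is never a
   candidate; of the two pending priority 9 jobs the older one wins.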

   examples :

     0) submit the job ls to run on some feeding host

       ~ > rq q s ls

     1) submit the job ls to run on some feeding host, at priority 9

       ~ > rq -p9 q s ls

     2) submit 42000 jobs (quietly) from a command file, marking them as
        restartable should the node they are running on reboot.

       ~ > wc -l cmdfile
       42000
       ~ > rq q s --quiet --restartable < cmdfile

     3) submit 42 priority 9 jobs from a command file.

       ~ > wc -l cmdfile
       42
       ~ > rq -p9 q s < cmdfile

     4) submit 42 priority 9 jobs from a command file, marking them as
        'important' using the '--tag, -t' option.

       ~ > wc -l cmdfile
       42
       ~ > rq -p9 -timportant q s < cmdfile

     5) re-submit all the 'important' jobs (see 'query' section below)

       ~ > rq q query tag=important | rq q s

     6) re-submit all jobs which are already finished (see 'list' section
        below)

       ~ > rq q l f | rq q s


     7) stage the job wont_run_yet to the queue in a 'holding' state.  no
        feeder will run this job until its state is upgraded to 'pending'

       ~ > rq q s --stage wont_run_yet


 resubmit, r :

   resubmit jobs back to a queue to be processed by a feeding node.  resubmit
   is essentially equivalent to submitting a job that is already in the queue
   as a new job and then deleting the original job, except that using resubmit
   is atomic and, therefore, safer and more efficient.  read docs for delete
   and submit for more info.

   examples :

     0) resubmit job 42 to the queue.  afterwards


 list, l, ls :

   list mode lists jobs of a certain state or job id.  state may be one of
   pending, holding, running, finished, dead, or all.  any 'mode_args' that are
   numbers are taken to be job ids to list.

   states may be abbreviated to uniqueness, therefore the following shortcuts
   apply :

     p => pending
     h => holding
     r => running
     f => finished
     d => dead
     a => all
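
   'abbreviated to uniqueness' means any prefix that matches exactly one
   state.  the ruby sketch below shows that idea; it is not taken from rq,
   and the names 'STATES' and 'expand_state' are hypothetical.

```ruby
STATES = %w[pending holding running finished dead all].freeze

# Returns the full state name for a unique prefix, or nil when the prefix
# matches no state or more than one.
def expand_state(abbrev)
  matches = STATES.select { |state| state.start_with?(abbrev) }
  matches.size == 1 ? matches.first : nil
end

%w[p h r f d a fin x].each do |abbrev|
  puts "#{abbrev} => #{expand_state(abbrev).inspect}"
end
```

   every state here starts with a different letter, which is why the single
   letter shortcuts above are unambiguous.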

   examples :

     0) show everything in q
         ~ > rq q list all
       or
         ~ > rq q l all
       or
         ~ > export RQ_Q=q
         ~ > rq l

     1) show q's pending jobs
         ~ > rq q list pending

     2) show q's running jobs
         ~ > rq q list running

     3) show q's finished jobs
         ~ > rq q list finished

     4) show job id 42
         ~ > rq q l 42

     5) show q's holding jobs
         ~ > rq q list holding


 status, t :

   status mode shows the global state of the queue.  there are no 'mode_args'.
   the meaning of each state is as follows:

     pending  => no feeder has yet taken this job
     holding  => a hold has been placed on this job, thus no feeder will start
                 it
     running  => a feeder has taken this job
     finished => a feeder has finished this job
     dead     => rq died while running a job, has restarted, and moved
                 this job to the dead state

   note that rq cannot move jobs into the dead state unless it has been
   restarted.  this is because no node has any knowledge of other nodes and
   cannot possibly know if a job was started on a node that died, or is simply
   taking a very long time.  only the node that dies, upon restart, can
   determine that it has jobs that 'were started before it started' and move
   these jobs into the dead state.  normally only a machine crash would cause a
   job to be placed into the dead state.  dead jobs are never automatically
   restarted; this is the responsibility of an operator.
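
   the 'were started before it started' test can be sketched as follows.
   this is a guess at the shape of the check based only on the description
   above, not rq's code: the helper 'mark_dead' and the job fields 'runner',
   'state' and 'started' are assumptions made for the example.

```ruby
require 'time'

# Sketch: on restart, a feeder marks as dead only its own running jobs
# that were started before the feeder itself started.
# Hypothetical job fields: 'runner', 'state', 'started'.
def mark_dead(jobs, hostname, feeder_started_at)
  jobs.map do |job|
    orphaned = job['state'] == 'running' &&
               job['runner'] == hostname &&
               Time.parse(job['started']) < feeder_started_at
    orphaned ? job.merge('state' => 'dead') : job
  end
end

jobs = [
  { 'jid' => 1, 'runner' => 'node_1', 'state' => 'running', 'started' => '2004-06-29 22:51:00' },
  { 'jid' => 2, 'runner' => 'node_2', 'state' => 'running', 'started' => '2004-06-29 22:51:00' }
]

restarted_at = Time.parse('2004-06-29 23:00:00')
mark_dead(jobs, 'node_1', restarted_at).each { |job| puts "#{job['jid']} #{job['state']}" }
```

   job 2 is untouched because node_1 has no way of knowing whether node_2 is
   down or merely busy.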

   examples :

     0) show q's status

       ~ > rq q t


 delete, d :

   delete combinations of pending, holding, finished, dead, or jobs specified
   by jid.  the delete mode is capable of parsing the output of list and query
   modes, making it possible to create custom filters to delete jobs meeting
   very specific conditions.

   'mode_args' are the same as for list.

   note that it is NOT possible to delete a running job.  rq has a
   decentralized architecture which means that compute nodes are completely
   independent of one another; a consequence is that there is no way to
   communicate the deletion of a running job from the queue to the node
   actually running that job.  it is not an error to force a job to die
   prematurely using a facility such as an ssh command spawned on the remote
   host to kill it.  once a job has been noted to have finished, whatever the
   exit status, it can be deleted from the queue.

   examples :

     0) delete all pending, finished, and dead jobs from a queue

       ~ > rq q d all

     1) delete all pending jobs from a queue

       ~ > rq q d p

     2) delete all finished jobs from a queue

       ~ > rq q d f

     3) delete jobs via hand crafted filter program

       ~ > rq q list | yaml_filter_prog | rq q d

       an example ruby filter program (you have to love this)

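       require 'yaml'
       joblist = YAML.load(STDIN)   # the job list rq wrote to stdout
       # keep only the matching jobs and dump them back out as yaml
       puts joblist.select{|job| job['command'] =~ /bombing_program/}.to_yaml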

       this program reads the list of jobs (yaml) from stdin and then dumps
       only those jobs whose command matches 'bombing_program', which is
       subsequently piped to the delete command.

 update, u :

   update assumes all leading arguments are jids to update with subsequent
   key=value pairs.  currently only the 'command', 'priority', and 'tag' fields
   of pending jobs can be generically updated and the 'state' field may be
   toggled between pending and holding.

   examples:

     0) update the priority of job 42

       ~ > rq q update 42 priority=7

     1) update the priority of all pending jobs

       ~ > rq q update pending priority=7

     2) query jobs with a command matching 'foobar' and update their command
        to be 'barfoo'

       ~ > rq q q "command like '%foobar%'" |\
           rq q u command=barfoo

     3) place a hold on jid 2

       ~ > rq q u 2 state=holding

     4) place a hold on all jobs with tag=disk_filler

       ~ > rq q q tag=disk_filler | rq q u state=holding

     5) remove the hold on jid 2

       ~ > rq q u 2 state=pending


 query, q :

   query exposes the database more directly to the user, evaluating the where
   clause specified on the command line (or read from STDIN).  this feature can
   be used to make a fine grained selection of jobs for reporting or as input
   into the delete command.  you must have a basic understanding of SQL syntax
   to use this feature, but it is fairly intuitive in this limited capacity.

   examples:

     0) show all jobs started within a specific 10 second range

       ~ > rq q query "started >= '2004-06-29 22:51:00' and started < '2004-06-29 22:51:10'"

     1) shell quoting can be tricky here so input on STDIN is also allowed to
        avoid shell expansion

       ~ > cat constraints.txt
       started >= '2004-06-29 22:51:00' and
       started < '2004-06-29 22:51:10'

       ~ > rq q query < constraints.txt
         or (same thing)

       ~ > cat constraints.txt | rq q query


     2) this query output might then be used to delete those jobs

       ~ > cat constraints.txt | rq q q | rq q d

     3) show all jobs which are either finished or dead

       ~ > rq q q "state='finished' or state='dead'"

     4) show all jobs which have non-zero exit status

       ~ > rq q query exit_status!=0

     5) if you plan to query groups of jobs with some common feature consider
        using the '--tag, -t' feature of the submit mode which allows a user to
        tag a job with a user defined string which can then be used to easily
        query that job group

       ~ > rq q submit --tag=my_jobs < joblist
       ~ > rq q query tag=my_jobs


     6) in general all but numbers will need to be surrounded by single quotes
        unless the query is a 'simple' one.  a simple query is a query with no
        boolean operators, no quotes, and where every part of it looks like

           key op value

        with ** NO SPACES ** between key, op, and value.  if, and only if, the
        query is 'simple' rq will construct the where clause appropriately.  the
        operators accepted, and their meanings, are

          =  : equivalence : sql =
          =~ : matches     : sql like
          !~ : not matches : sql not like

        match, in this context, is ** NOT ** a regular expression but a sql
        style string match.  about all you need to know about sql matches is
        that the '%' char matches anything.  multiple simple queries will be
        joined with boolean 'and'

        this sounds confusing - it isn't.  here are some examples of simple
        queries

        6.a)
          query :
            rq q query tag=important

          where_clause :
            "( tag = 'important' )"

        6.b)
          query :
            rq q q priority=6 restartable=true

          where_clause :
            "( priority = 6 ) and ( restartable = 'true' )"

        6.c)
          query :
            rq q q command=~%bombing_job% runner=~%node_1%

          where_clause :
            "( command like '%bombing_job%') and (runner like '%node_1%')"
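
        the translation from simple queries to a where clause can be sketched
        in ruby.  this is an approximation written from the rules and
        examples above, not rq's implementation: the names 'OPS' and
        'simple_where' are hypothetical, and it assumes only integers are
        left unquoted.

```ruby
# operators listed above mapped to their sql counterparts
OPS = { '=~' => 'like', '!~' => 'not like', '=' => '=' }.freeze

# Sketch: build a where clause from 'key op value' terms with no spaces
# and no quotes; anything else is rejected as not being a simple query.
def simple_where(terms)
  clauses = terms.map do |term|
    match = term.match(/\A(\w+)(=~|!~|=)([^'"\s]+)\z/)
    raise ArgumentError, "not a simple query: #{term}" unless match
    key, op, value = match.captures
    # assumption: only integers go unquoted, everything else is quoted
    value = "'#{value}'" unless value =~ /\A-?\d+\z/
    "( #{key} #{OPS[op]} #{value} )"
  end
  clauses.join(' and ')
end

puts simple_where(%w[tag=important])
puts simple_where(%w[priority=6 restartable=true])
puts simple_where(%w[command=~%bombing_job% runner=~%node_1%])
```

        the exact whitespace rq emits may differ; the point is the quoting of
        non-numeric values and the joining of terms with 'and'.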

 execute, e :

   execute mode is to be used by expert users with a knowledge of sql syntax
   only.  it follows the locking protocol used by rq and then allows the user
   to execute arbitrary sql on the queue.  unlike query mode a write lock on
   the queue is obtained allowing a user to definitively shoot themselves in
   the foot.  for details on a queue's schema the file 'db.schema' in the queue
   directory should be examined.

     examples :

       0) list all jobs

         ~ > rq q execute 'select * from jobs'


 configure, C :

   this mode is not supported yet.


 snapshot, S :

   snapshot provides a means of taking a snapshot of the q.  use this feature
   when many queries are going to be run; for example, when attempting to
   figure out a complex pipeline command, your test queries will then not
   compete with the feeders for the queue's lock.  you should use this option
   whenever possible to avoid lock competition.

   examples:

     0) take a snapshot using default snapshot naming, which is made via the
        basename of the q plus '.snapshot'

       ~ > rq /path/to/nfs/q snapshot

     1) use this snapshot to check status

       ~ > rq ./q.snapshot status

     2) use the snapshot to see what's running on which host

       ~ > rq ./q.snapshot list running | grep `hostname`

   note that there is also a snapshot option - this option is not the same as
   the snapshot command.  the option can be applied to ANY command.  if in
   effect then that command will be run on a snapshot of the database and the
   snapshot then immediately deleted.  this is really only useful if one were
   to need to run a command against a very heavily loaded queue and did not
   wish to wait to obtain the lock.  eg.

     0) get the status of a heavily loaded queue

       ~ > rq q t --snapshot

     1) same as above

       ~ > rq q t -s

   ** IMPORTANT **

     a really great way to hang all processing in your queue is to do this

       rq q list | less

     and then leave for the night.  you hold a read lock you won't release
     until less dies.  this is what snapshot is made for!  use it like

       rq q list -s | less

     now you've taken a snapshot of the queue to list so your locks affect no
     one.

 lock, L :

   lock the queue and then execute an arbitrary shell command.  lock mode uses
   the queue's locking protocol to safely obtain a lock of the specified type
   and execute a command on the user's behalf.  lock type must be one of

     (r)ead | (sh)ared | (w)rite | (ex)clusive

   examples :

     0) get a read lock on the queue and make a backup

       ~ > rq q L read -- cp -r q q.bak

       (the '--' is needed to tell rq to stop parsing command line
        options which allows the '-r' to be passed to the 'cp' command)

   ** IMPORTANT **

     this is another fantastic way to freeze your queue - use with care!

 backup, b :

   backup mode is exactly the same as getting a read lock on the queue and
   making a copy of it.  this mode is provided as a convenience.

     0) make a backup of the queue using default naming ( qname + timestamp + .bak )

       ~ > rq q b

     1) make a backup of the queue as 'q.bak'

       ~ > rq q b q.bak


 rotate, R :

   rotate mode is conceptually similar to log rolling.  normally the list of
   finished jobs will grow without bound in a queue unless they are manually
   deleted.  rotation is a method of trimming finished jobs from a queue
   without deleting them.  the method used is that the queue is copied to a
   'rotation'; all jobs that are dead or finished are deleted from the original
   queue and all pending and running jobs are deleted from the rotation.  in
   this way the rotation becomes a record of the queue's finished and dead jobs
   at the time the rotation was made.

     0) rotate a queue using default rotation name

       ~ > rq q rotate

     1) rotate a queue naming the rotation

       ~ > rq q rotate q.rotation

     2) a crontab entry like this could be used to rotate a queue daily

       59 23 * * * rq q rotate `date +q.%Y%m%d`


 feed, f :

   take jobs from the queue and run them on behalf of the submitter as quickly
   as possible.  jobs are taken from the queue in an 'oldest highest priority'
   first order.

   feeders can be run from any number of nodes allowing you to harness the CPU
   power of many nodes simultaneously in order to more effectively clobber
   your network, annoy your sysads, and set output raids on fire.

   the most useful method of feeding from a queue is to do so in daemon mode so
   that if the process loses its controlling terminal it will not exit when you
   exit your terminal session.  use the '--daemon, -d' option to accomplish
   this.  by default only one feeding process per host per queue is allowed to
   run at any given moment.  because of this it is acceptable to start a feeder
   at some regular interval from a cron entry since, if a feeder is already
   running, the process will simply exit and otherwise a new feeder will be
   started.  in this way you may keep feeder processes running even across
   machine reboots without requiring sysad intervention to add an entry to the
   machine's startup tasks.

   examples :

     0) feed from a queue verbosely for debugging purposes, using a minimum and
        maximum polling time of 2 and 4 respectively.  you would NEVER specify
        polling times this brief except for debugging purposes!!!

       ~ > rq q feed -v4 -m2 -M4

     1) same as above, but viewing the executed sql as it is sent to the
        database

       ~ > RQ_SQL_DEBUG=1 rq q f -v4 -m2 -M4

     2) feed from a queue in daemon mode - logging to /home/ahoward/rq.log

       ~ > rq q f -d -l/home/ahoward/rq.log

        log rolling in daemon mode is automatic so your logs should never need
        to be deleted to prevent disk overflow.

     3) use something like this sample crontab entry to keep a feeder running
        forever - it attempts to (re)start every fifteen minutes but exits if
        another process is already feeding.

       #
       # your crontab file - sample only
       #

       */15 * * * * /full/path/to/bin/rq /full/path/to/nfs/mounted/q f -d -l/home/username/cfq.log -q

       the '--quiet, -q' here tells rq to exit quietly (no STDERR)
       when another process is found to already be feeding so that no cron
       message would be sent under these conditions.

 start :

   the start mode is equivalent to running the feed mode except the --daemon is
   implied so the process instantly goes into the background.  also, if no log
   (--log) is specified in start mode a default one is used.  the default is

     ENV['HOME'] + '/' + File::basename(queue) + '.log'

   the crontab line above could just as well be

     */15 * * * * /full/path/to/bin/rq /full/path/to/nfs/mounted/q start -q

   with the resulting log ending up in ~/q.log

   examples :

     0) start a daemon process feeding from q

       ~ > rq q start


 shutdown :

   tell a running feeder to finish any pending jobs and then to exit.  this is
   equivalent to sending signal 'SIGTERM' to the process - this is what using
   'kill pid' does by default.

   examples :

     0) stop a feeding process, if any, that is feeding from q.  allow all jobs
        to be finished first.

       ~ > rq q shutdown

   ** VERY IMPORTANT **

     if you are keeping your feeder alive with a crontab entry you'll need to
     comment it out before doing this or else it will simply re-start!!!

 stop :

   tell any running feeder to stop NOW.  this sends signal 'SIGKILL' (-9) to
   the feeder process.  the same warning as for shutdown applies!!!

   examples :

     0) stop a feeding process, if any, that is feeding from q.  allow NO jobs
        to be finished first - exit instantly.

       ~ > rq q stop


 feeder :

   show the pid, if any, of the feeder

   ~ > rq q feeder

     feeder <15366>


 help, h :

   this message

   examples :

     0) get this message

       ~> rq q help
       or
       ~> rq help

NOTES
- realize that your job is going to be running on a remote host and this has
  implications. paths, for example, should be absolute, not relative.
  specifically the submitted job script must be visible from all hosts
  currently feeding from a queue as must be the input and output
  files/directories.

 - jobs are currently run under the bash shell using the --login option.
   therefore any settings in your .bashrc will apply - specifically your PATH
   setting. you should not, however, rely on jobs running with any given
   environment.

 - you need to consider __CAREFULLY__ what the ramifications of having multiple
   instances of your program all potentially running at the same time will be.
   for instance, it is beyond the scope of rq to ensure multiple instances of a
   given program will not overwrite each other's output files. coordination
   of programs is left entirely to the user.

 - the list of finished jobs will grow without bound unless you sometimes
   delete some (all) of them. the reason for this is that rq cannot know when
   the user has collected the exit_status of a given job, and so keeps this
   information in the queue forever until instructed to delete it. if you have
   collected the exit_status of your job(s) it is not an error to then delete
   that job from the finished list - the information is kept for your
   informational purposes only. in a production system it would be normal to
   periodically save, and then delete, all finished jobs.

 - know that it is a VERY bad idea to spawn several dozen processes all
   reading/writing huge output files to a single NFS server. use this paradigm
   instead

     copy data locally from input space
     work on data
     move data to output space
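
   a job script following this paradigm might look roughly like the ruby
   sketch below.  it is one possible shape, not something rq provides: the
   helper name 'run_locally' is made up, and the caller supplies the input
   path, the output directory, and a block that does the actual work in the
   local scratch directory and returns the path of its result file.

```ruby
require 'fileutils'
require 'tmpdir'

# Sketch: stage input on local disk, work there, then move the result to
# the shared output space. The scratch directory is removed afterwards.
def run_locally(input_path, output_dir)
  Dir.mktmpdir('job') do |scratch|
    local_in = File.join(scratch, File.basename(input_path))
    FileUtils.cp(input_path, local_in)      # copy data locally from input space
    local_out = yield(local_in, scratch)    # work on data; block returns result path
    FileUtils.mkdir_p(output_dir)
    FileUtils.mv(local_out, output_dir)     # move data to output space
    File.join(output_dir, File.basename(local_out))
  end
end
```

   the shared file system is touched only twice per job, once to read the
   input and once to deliver the finished output.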

   the vsftp daemon is an excellent utility to have running on hosts in your
   cluster so anonymous ftp can be used to get/put data.

 - know that nfs locking is very, very easy to break with firewalls put in
   place by overzealous system administrators. be positive not only that nfs
   locking works, but that lock recovery after server/client crash or reboot
   works as well. http://nfs.sourceforge.net/ is the place to learn about
   NFS. my experience thus far is that there are ZERO properly configured NFS
   installations in the world. please test yours. contact me for a simple
   script which can assist you. donations of beer may be required.

ENVIRONMENT
RQ_Q: set to the full path of nfs mounted queue

   the queue argument to all commands may be omitted if, and only if, the
   environment variable 'RQ_Q' contains the full path to the q.  eg.

     ~ > export RQ_Q=/full/path/to/my/q

   this feature can save a considerable amount of typing for those weak of
   wrist.

DIAGNOSTICS
success : $? == 0
failure : $? != 0
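
a script that drives other commands can apply the same zero/non-zero test
from ruby. the sketch below is only an illustration: the helper name
'exit_status_of' is made up, and the ruby interpreter itself stands in for
the command being run so that the example does not depend on a queue.

```ruby
require 'rbconfig'

# Runs a command and returns its exit status: 0 means success,
# anything else means failure.
def exit_status_of(*command)
  system(*command)
  $?.exitstatus
end

ruby = RbConfig.ruby   # stand-in command for the example
puts exit_status_of(ruby, '-e', 'exit 0')
puts exit_status_of(ruby, '-e', 'exit 3')
```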

CREDITS
- kim baugh : patient tester and design input
- jeff safran : the guy can break anything
- chris elvidge : made it possible
- trond myklebust : tons of help with nfs
- jamis buck : for writing the sqlite bindings for ruby
- _why : for writing yaml for ruby
- matz : for writing ruby

AUTHOR
[email protected]

BUGS
0 < bugno && bugno <= 42

reports to [email protected]

OPTIONS
--priority=priority, -p
    modes : set the job(s) priority - lowest(0) … highest(n) - (default 0)
--tag=tag, -t
    modes : set the job(s) user data tag
--runner=runner
    modes : set the job(s) required runner(s)
--restartable
    modes : set the job(s) to be restartable on node reboot
--stage
    modes : set the job(s) initial state to be holding (default pending)
--infile=infile
    modes : infile
--quiet, -q
    modes <submit, feed> : do not echo submitted jobs, fail silently if
    another process is already feeding
--daemon, -D
    modes : spawn a daemon
--max_feed=max_feed
    modes : the maximum number of concurrent jobs run
--retries=retries
    modes : specify transaction retries
--min_sleep=min_sleep
    modes : specify min sleep
--max_sleep=max_sleep
    modes : specify max sleep
--snapshot, -s
    operate on snapshot of queue
--verbosity=verbosity, -v
    0|fatal < 1|error < 2|warn < 3|info < 4|debug - (default info)
--log=path, -l
    set log file - (default stderr)
--log_age=log_age
    daily | weekly | monthly - what age will cause log rolling (default nil)
--log_size=log_size
    size in bytes - what size will cause log rolling (default nil)
--help, -h
    this message
--version
    show version number

enjoy.

-a