I’m a Unix admin at a research lab in the technion, and use (and
loooove) Ruby
for all kinds of useful applications I wrote for use in the lab. One of
them is
the Matlab Application Server, a queue/batch manager for Matlab jobs.
Old version
was perl, text files, and one file per job - new version is Rails for
the UI
(which is on another machine), vanilla Ruby for the daemon, and MySQL to
communicate between them.
Usually it works spectacularily, and people don’t have to lock down the
Windows
workstations overnight (or over many nights) so they finish their jobs.
The queue
is also rather fair - 2 simultaneous jobs, 1 per each user.
However, today something strange happened. The server was under heavy
load and
using a lot of swap, as usual when two Matlab jobs are running. Two jobs
were
indeed running: pids A and B (A < B). However, they were both by the
same user
(which shouldn’t happen). Furthermore, Only B was a son of the daemon,
whereas
A was a son of init. The log file said that earlier, a son C died which
wasn’t
in the SQL table of running jobs (dying children are removed from there,
because
it means they finished running). And weirdest, the SQL table of running
jobs
contained job D, which wasn’t run (and belonged to a different user).
Very odd, and I can’t seem to find the bug. So here are relevant bits of
code,
if anyone has an idea I’d greatly appreciate it.
(snip)
module MatlabAS
class Daemon
def initialize
load_config
@total_procs = 0
@user_procs = {}
mysql_connect
setup_signals
clear_running_jobs
main_loop
end
# snip
protected
# Main program loop
def main_loop
@continue_running = true
while @continue_running
mysql_connect
begin
handle_command # For handling manual 'kill job' commands,
not relevant
rescue
$stderr.print 'Error handling command: ’
$stderr.puts $!
end
if @total_procs < @config[‘limits’][‘total_procs’]
queue_entry = get_queue_entry
if queue_entry
user_id = queue_entry[:user][:id]
@user_procs[user_id] = 0 if @user_procs[user_id].nil?
@user_procs[user_id] += 1
@total_procs += 1
delete_queue_entry queue_entry[:id]
child_pid = fork do
run_worker_process queue_entry[:user][:username],
queue_entry[:directory]
end
add_running_job_entry queue_entry[:sent_at],
queue_entry[:user],
queue_entry[:directory],
child_pid
end
end
mysql_disconnect
really_sleep @config['limits']['sleep_cycle']
end
end
# Make sure that the duration is slept, even if signals were
caught
def really_sleep(duration=0)
# Special case for 0-duration sleep, even though we don’t use it
if duration == 0
sleep 0
else
nap_remaining = duration
while nap_remaining > 0
nap_remaining -= sleep nap_remaining
end
end
end
# Do cleanup-work when children die
def handle_dying_children
mysql_connect
foreach_dying_child do |child_pid|
begin
child = get_running_job_by_pid child_pid
MatlabAS.mail child[:user][:username], "(snipped)"
@total_procs -= 1
@user_procs[child[:user][:id]] -= 1
add_historic_job_entry child[:sent_at], child[:started_at],
child[:user], child[:directory]
rescue
$stderr.print 'Error rescuing a dying child: '
$stderr.puts $!
end
end
end
# Loop across all currently dying children, yielding each
# one's +pid+. Very useful for handling +SIGCHLD+.
def foreach_dying_child
begin
while pid = Process.waitpid(-1,Process::WNOHANG)
yield pid
end
rescue Errno::ECHILD
# Out of dying children, no problem.
end
end
# Run an actual matlab job
def run_worker_process(user, directory)
user_info = Etc.getpwnam user
work_pathname = Base_path + user + directory
setup_signals false
Process::Sys.setgid user_info.gid
Process::Sys.setuid user_info.uid
# snip, handles errors for missing directory / startup file
# config variables were simplified here, obviously they're in
# a nice hash
system "#@bash -c \"#@matlab < #@startup_file > #@output 2>
#@error""
end
# Set up signal handlers. Default is to set up the signal handlers
# for a parent process, but if +parent+ is false, signals are set
# up for a child.
def setup_signals(parent = true)
if parent
Signal.trap('CHLD') { handle_dying_children }
Signal.trap('INT') do
puts "User interrupt. Dying gracefully..."
self.close
end
Signal.trap('TERM') do
puts "Termination requested. Dying gracefully..."
self.close
end
Signal.trap('HUP') do
puts "Configuration reload requested, reloading config."
load_config
end
else
Signal.trap('CHLD', 'DEFAULT')
Signal.trap('INT', 'DEFAULT')
Signal.trap('TERM', 'DEFAULT')
Signal.trap('HUP', 'DEFAULT')
end
end
def handle_command
# Snip, kills a job if such an instruction is in the table
end
# Reads one queue entry from the +queue_entries+ table, in which
the
# user doesn’t match any of the ones who have currently hit their
# process quota. If none are available, returns +nil+.
def get_queue_entry
query_string = 'SELECT * FROM queue_entries ’
if users_at_quota.size > 0
query_string += 'WHERE ' +
users_at_quota.collect do |user_id|
"user_id != #{user_id}"
end.join(' AND ') + ' '
end
query_string += 'LIMIT 1'
result = @mysql_connection.query query_string
queue_entry_hash = result.fetch_hash
result.free
if queue_entry_hash
return { :id => queue_entry_hash['id'],
:sent_at => Time.parse(queue_entry_hash['sent_at']),
:user => get_user_by_id(queue_entry_hash['user_id']),
:directory => queue_entry_hash['directory']
}
else
return nil
end
end
# Returns an array of all users which are at their process quota.
def users_at_quota
@user_procs.reject do |user_id,procs|
procs < @config['limits']['procs_per_user']
end.keys
end
# Deletes the given queued entry from the table.
def delete_queue_entry(id)
@mysql_connection.query "DELETE FROM queue_entries WHERE id =
#{id}"
end
# Adds information about the given historic job to the history
table.
def add_historic_job_entry(sent_at, started_at, user, directory)
@mysql_connection.query <<-EOF
INSERT INTO historic_jobs (sent_at,
started_at,
user_id,
directory)
VALUES (’#{sent_at.to_mysql_time}’,
‘#{started_at.to_mysql_time}’,
‘#{user[:id]}’,
‘#{directory}’)
EOF
end
# Adds information about the given running-job to the running-job
# table.
def add_running_job_entry(sent_at, user, directory, pid)
@mysql_connection.query <<-EOF
INSERT INTO running_jobs (sent_at,
user_id,
directory,
pid)
VALUES (’#{sent_at.to_mysql_time}’,
‘#{user[:id]}’,
‘#{directory}’,
‘#{pid}’)
EOF
end
# Gets a command from the +commands+ table and deletes it
def get_command
# snip
end
# Gets the running-job information for a job matching the supplied
# +pid+, and removes it from the running-jobs table unless
# :remove is set to false (default is to delete. Example:
#
# get_running_job_by_pid 1053, :remove => false
#
def get_running_job_by_pid(pid, options = { :remove => true })
result = @mysql_connection.query <<-EOF
SELECT * FROM running_jobs WHERE pid = #{pid} LIMIT 1
EOF
result_hash = result.fetch_hash
result.free
# THIS IS THE ERROR WHICH WAS RAISED
raise "No job with pid #{pid} in running jobs table." if
result_hash.nil?
if options[:remove]
@mysql_connection.query "DELETE FROM running_jobs WHERE pid =
#{pid}"
end
return { :sent_at => Time.parse(result_hash['sent_at']),
:started_at => Time.parse(result_hash['started_at']),
:user => get_user_by_id(result_hash['user_id']),
:directory => result_hash['directory']
}
end
# Clears the running-jobs table, and any STOP commands
def clear_running_jobs
@mysql_connection.query "DELETE FROM running_jobs"
@mysql_connection.query "DELETE FROM commands WHERE command =
‘stop’"
end
# Gets user information by the given user_id. User information
# is in the form <tt>{ :id, :username }</tt>.
def get_user_by_id(user_id)
result = @mysql_connection.query <<-EOF
SELECT username FROM users WHERE id = #{user_id} LIMIT 1
EOF
result_hash = result.fetch_hash
result.free
{ :id => user_id, :username => result_hash['username'] }
end
# Load configuration from YAML file
def load_config
@config = File.open('/etc/matlab_as.yml') { |yf| YAML::load yf }
end
def mysql_connect
mysql_disconnect
mysql_config = @config['mysql']
connection = Mysql.new mysql_config['host'],
mysql_config['username'], mysql_config['password']
connection.select_db mysql_config['database']
@mysql_connection = connection
end
def mysql_disconnect
@mysql_connection.close unless @mysql_connection.nil?
end
def MatlabAS.mail(address, message)
# snip
end
end
end
daemon = MatlabAS::Daemon.new
daemon.close