Weird forking/rejoining issue


#1

I’m a Unix admin at a research lab at the Technion, and I use (and loooove) Ruby for all kinds of useful applications I wrote for the lab. One of them is the Matlab Application Server, a queue/batch manager for Matlab jobs. The old version was Perl, text files, and one file per job; the new version is Rails for the UI (which runs on another machine), vanilla Ruby for the daemon, and MySQL to communicate between them.

Usually it works spectacularly, and people no longer have to leave their Windows workstations locked overnight (or over many nights) for their jobs to finish. The queue is also reasonably fair: at most 2 simultaneous jobs, and at most 1 per user.

However, today something strange happened. The server was under heavy load and using a lot of swap, as usual when two Matlab jobs are running. Two jobs were indeed running: pids A and B (A < B). However, they both belonged to the same user (which shouldn’t happen). Furthermore, only B was a child of the daemon, whereas A was a child of init. The log file said that earlier, a child C had died which wasn’t in the SQL table of running jobs (dying children are removed from there, because dying means they finished running). And weirdest of all, the SQL table of running jobs contained job D, which wasn’t actually running (and belonged to a different user).

Very odd, and I can’t seem to find the bug. So here are the relevant bits of code; if anyone has an idea, I’d greatly appreciate it.

(snip)

module MatlabAS
class Daemon
  def initialize
    load_config
    @total_procs = 0
    @user_procs = {}

    mysql_connect

    setup_signals
    clear_running_jobs
    main_loop
  end

  # snip

  protected

  # Main program loop
  def main_loop
    @continue_running = true
    while @continue_running
      mysql_connect
      begin
        handle_command # For handling manual 'kill job' commands, not relevant
      rescue
        $stderr.print 'Error handling command: '
        $stderr.puts $!
      end
      if @total_procs < @config['limits']['total_procs']
        queue_entry = get_queue_entry
        if queue_entry
          user_id = queue_entry[:user][:id]
          @user_procs[user_id] = 0 if @user_procs[user_id].nil?
          @user_procs[user_id] += 1
          @total_procs += 1

          delete_queue_entry queue_entry[:id]

          child_pid = fork do
            run_worker_process queue_entry[:user][:username],
                               queue_entry[:directory]
          end
          add_running_job_entry queue_entry[:sent_at],
                                queue_entry[:user],
                                queue_entry[:directory],
                                child_pid
        end
      end

      mysql_disconnect

      really_sleep @config['limits']['sleep_cycle']
    end
  end

  # Make sure that the duration is slept, even if signals were caught
  def really_sleep(duration=0)
    # Special case for 0-duration sleep, even though we don't use it
    if duration == 0
      sleep 0
    else
      nap_remaining = duration
      while nap_remaining > 0
        nap_remaining -= sleep nap_remaining
      end
    end
  end

  # Do cleanup-work when children die
  def handle_dying_children
    mysql_connect
    foreach_dying_child do |child_pid|
      begin
        child = get_running_job_by_pid child_pid
        MatlabAS.mail child[:user][:username], "(snipped)"
        @total_procs -= 1
        @user_procs[child[:user][:id]] -= 1
        add_historic_job_entry child[:sent_at], child[:started_at],
                               child[:user], child[:directory]
      rescue
        $stderr.print 'Error rescuing a dying child: '
        $stderr.puts $!
      end
    end
  end

  # Loop across all currently dying children, yielding each
  # one's +pid+. Very useful for handling +SIGCHLD+.
  def foreach_dying_child
    begin
      while pid = Process.waitpid(-1,Process::WNOHANG)
        yield pid
      end
    rescue Errno::ECHILD
      # Out of dying children, no problem.
    end
  end

  # Run an actual matlab job
  def run_worker_process(user, directory)
    user_info = Etc.getpwnam user
    work_pathname = Base_path + user + directory
    setup_signals false
    Process::Sys.setgid user_info.gid
    Process::Sys.setuid user_info.uid

    # snip, handles errors for missing directory / startup file

    # config variables were simplified here, obviously they're in
    # a nice hash

    system "#@bash -c \"#@matlab < #@startup_file > #@output 2> #@error\""
  end

  # Set up signal handlers. Default is to set up the signal handlers
  # for a parent process, but if +parent+ is false, signals are set
  # up for a child.
  def setup_signals(parent = true)
    if parent
      Signal.trap('CHLD') { handle_dying_children }

      Signal.trap('INT') do
        puts "User interrupt. Dying gracefully..."
        self.close
      end

      Signal.trap('TERM') do
        puts "Termination requested. Dying gracefully..."
        self.close
      end

      Signal.trap('HUP') do
        puts "Configuration reload requested, reloading config."
        load_config
      end
    else
      Signal.trap('CHLD', 'DEFAULT')
      Signal.trap('INT', 'DEFAULT')
      Signal.trap('TERM', 'DEFAULT')
      Signal.trap('HUP', 'DEFAULT')
    end
  end

  def handle_command
    # Snip, kills a job if such an instruction is in the table
  end

  # Reads one queue entry from the +queue_entries+ table, in which the
  # user doesn't match any of the ones who have currently hit their
  # process quota. If none are available, returns +nil+.
  def get_queue_entry
    query_string = 'SELECT * FROM queue_entries '

    if users_at_quota.size > 0
      query_string += 'WHERE ' +
        users_at_quota.collect do |user_id|
          "user_id != #{user_id}"
        end.join(' AND ') + ' '
    end

    query_string += 'LIMIT 1'

    result = @mysql_connection.query query_string
    queue_entry_hash = result.fetch_hash
    result.free

    if queue_entry_hash
      return { :id => queue_entry_hash['id'],
               :sent_at => Time.parse(queue_entry_hash['sent_at']),
               :user => get_user_by_id(queue_entry_hash['user_id']),
               :directory => queue_entry_hash['directory']
             }
    else
      return nil
    end
  end

  # Returns an array of all users which are at their process quota.
  def users_at_quota
    @user_procs.reject do |user_id,procs|
      procs < @config['limits']['procs_per_user']
    end.keys
  end

  # Deletes the given queued entry from the table.
  def delete_queue_entry(id)
    @mysql_connection.query "DELETE FROM queue_entries WHERE id = #{id}"
  end

  # Adds information about the given historic job to the history table.
  def add_historic_job_entry(sent_at, started_at, user, directory)
    @mysql_connection.query <<-EOF
      INSERT INTO historic_jobs (sent_at,
                                 started_at,
                                 user_id,
                                 directory)
      VALUES ('#{sent_at.to_mysql_time}',
              '#{started_at.to_mysql_time}',
              '#{user[:id]}',
              '#{directory}')
    EOF
  end

  # Adds information about the given running-job to the running-job
  # table.
  def add_running_job_entry(sent_at, user, directory, pid)
    @mysql_connection.query <<-EOF
      INSERT INTO running_jobs (sent_at,
                                user_id,
                                directory,
                                pid)
      VALUES ('#{sent_at.to_mysql_time}',
              '#{user[:id]}',
              '#{directory}',
              '#{pid}')
    EOF
  end

  # Gets a command from the +commands+ table and deletes it
  def get_command
    # snip
  end

  # Gets the running-job information for a job matching the supplied
  # +pid+, and removes it from the running-jobs table unless
  # :remove is set to false (default is to delete). Example:
  #
  #   get_running_job_by_pid 1053, :remove => false
  #
  def get_running_job_by_pid(pid, options = { :remove => true })
    result = @mysql_connection.query <<-EOF
      SELECT * FROM running_jobs WHERE pid = #{pid} LIMIT 1
    EOF
    result_hash = result.fetch_hash
    result.free

    # THIS IS THE ERROR WHICH WAS RAISED
    raise "No job with pid #{pid} in running jobs table." if result_hash.nil?

    if options[:remove]
      @mysql_connection.query "DELETE FROM running_jobs WHERE pid = #{pid}"
    end

    return { :sent_at => Time.parse(result_hash['sent_at']),
             :started_at => Time.parse(result_hash['started_at']),
             :user => get_user_by_id(result_hash['user_id']),
             :directory => result_hash['directory']
           }
  end

  # Clears the running-jobs table, and any STOP commands
  def clear_running_jobs
    @mysql_connection.query "DELETE FROM running_jobs"
    @mysql_connection.query "DELETE FROM commands WHERE command = 'stop'"
  end

  # Gets user information by the given user_id. User information
  # is in the form <tt>{ :id, :username }</tt>.
  def get_user_by_id(user_id)
    result = @mysql_connection.query <<-EOF
      SELECT username FROM users WHERE id = #{user_id} LIMIT 1
    EOF
    result_hash = result.fetch_hash
    result.free

    { :id => user_id, :username => result_hash['username'] }
  end

  # Load configuration from YAML file
  def load_config
    @config = File.open('/etc/matlab_as.yml') { |yf| YAML::load yf }
  end

  def mysql_connect
    mysql_disconnect
    mysql_config = @config['mysql']
    connection = Mysql.new mysql_config['host'],
                           mysql_config['username'], mysql_config['password']
    connection.select_db mysql_config['database']

    @mysql_connection = connection
  end

  def mysql_disconnect
    @mysql_connection.close unless @mysql_connection.nil?
  end

  def MatlabAS.mail(address, message)
    # snip
  end
end

end

daemon = MatlabAS::Daemon.new
daemon.close


#2

On Wed, 17 May 2006, Ohad L. wrote:

> However, today something strange happened. The server was under heavy load
> and using a lot of swap, as usual when two Matlab jobs are running. Two jobs
> were indeed running: pids A and B (A < B). However, they were both by the
> same user (which shouldn’t happen). Furthermore, only B was a son of the
> daemon, whereas A was a son of init.

so your daemon apparently exited, and it doesn’t prevent itself from running twice. it should obtain a lockfile or lock itself using posixlock (the lockfile or posixlock gems on rubyforge) to prevent two instances from ever running at the same time. how is your daemon started/kept alive?
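For the single-instance check, here is a minimal sketch using only Ruby's built-in `File#flock`, swapped in for the lockfile/posixlock gems mentioned above (whose APIs are not reproduced here). The helper name and the lock path in the usage comment are hypothetical. Because the kernel drops a `flock` lock once every descriptor for it is closed, a crashed daemon does not leave a stale lock behind; note, though, that a forked Ruby worker inherits the open descriptor and keeps the lock held until it, too, exits or closes it.

```ruby
# Sketch: allow only one running copy of the daemon, using the stdlib
# File#flock instead of the lockfile/posixlock gems.
# acquire_single_instance_lock is a hypothetical helper name.

# Returns the open, locked File (keep a reference to it for the life of
# the process), or nil if another process already holds the lock.
def acquire_single_instance_lock(path)
  # RDWR|CREAT: create the file if needed, but do NOT truncate it before
  # we own the lock (the current holder's pid may be in it).
  file = File.open(path, File::RDWR | File::CREAT, 0o644)
  if file.flock(File::LOCK_EX | File::LOCK_NB)
    file.truncate(0)
    file.write("#{Process.pid}\n") # informational only
    file.flush
    file
  else
    # With LOCK_NB, flock returns false instead of blocking.
    file.close
    nil
  end
end

# Usage at daemon startup (the path is an assumption):
#   LOCK = acquire_single_instance_lock('/var/run/matlab_as.lock')
#   abort 'matlab_as: another daemon is already running' unless LOCK
```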

> The log file said that earlier, a son C died which wasn’t in the SQL table
> of running jobs (dying children are removed from there, because it means
> they finished running). And weirdest, the SQL table of running jobs
> contained job D, which wasn’t run (and belonged to a different user).

none of the db stuff is transactional, so it’s quite possible for strange things to happen. for instance

>          delete_queue_entry queue_entry[:id]

job removed. process dies. job lost.

>          child_pid = fork do
>            run_worker_process queue_entry[:user][:username],
>                               queue_entry[:directory]
>          end

job dies instantly, before it is added to the running_jobs table; sigchld is caught, the handler triggers, and the db does not yet contain child_pid.

>          add_running_job_entry queue_entry[:sent_at],
>                                queue_entry[:user],
>                                queue_entry[:directory],
>                                child_pid

etc.
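The SIGCHLD race described above (the child dies before `add_running_job_entry` has run, so the handler finds no row) can also be closed without transactions, using a different technique from the one suggested in this reply: the self-pipe trick. The signal handler only writes a byte to a pipe, and the main loop reaps children synchronously after the running-job entry has been written. A hedged sketch under those assumptions; the `Reaper` class is hypothetical, and a plain Hash stands in for the running_jobs table.

```ruby
# Hypothetical sketch of the self-pipe trick: the SIGCHLD handler only
# wakes the main loop; reaping and bookkeeping happen synchronously.
class Reaper
  def initialize
    @reader, @writer = IO.pipe
    @running = {} # pid => job; stands in for the running_jobs table
    Signal.trap('CHLD') do
      begin
        @writer.write_nonblock('.')
      rescue IO::WaitWritable, Errno::EINTR
        # Pipe full: a wakeup is already pending, so dropping this is fine.
      end
    end
  end

  # Safe even if the child has already exited: nothing is reaped
  # until reap_finished runs.
  def register(pid, job)
    @running[pid] = job
  end

  # Wait up to +timeout+ seconds for a wakeup, then reap every exited
  # child and return the jobs that finished.
  def reap_finished(timeout = 5)
    return [] unless IO.select([@reader], nil, nil, timeout)
    begin
      @reader.read_nonblock(4096) # drain the wakeup bytes
    rescue IO::WaitReadable
      # Nothing left to drain.
    end
    finished = []
    begin
      while (pid = Process.waitpid(-1, Process::WNOHANG))
        finished << @running.delete(pid)
      end
    rescue Errno::ECHILD
      # No children left at all.
    end
    finished
  end
end

reaper = Reaper.new
pid = fork { exit!(0) }  # the child exits immediately...
sleep 0.2                # ...so SIGCHLD arrives before the job is registered
reaper.register(pid, 'job D')
finished = reaper.reap_finished # the dead child is still matched to 'job D'
```

In the daemon, the equivalent of `reap_finished` would run once per main-loop pass, after `add_running_job_entry`, instead of doing database work inside the trap handler.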

it would be very hard to refactor this code to use transactions, since you are forking and forking is not supported with an open db handle (all writes, for instance, get flushed on child/parent exit). in theory, though, you need code like this all over the place:

db.transaction do
  child_pid = start_child job
  add_running_job_entry child_pid, job
end

and then the signal handler must be written in such a way that it cannot start another transaction; this way you’ll never get sigchld and find no pid entered for the child yet. this gets really tricky. the way i did it for ruby queue was to set up a drb daemon that does all the forking/waiting for me. this way i can operate in the normal synchronous fashion instead of in async mode using signal handlers. you can read about rq here:

http://www.linuxjournal.com/article/7922
http://raa.ruby-lang.org/project/rq/

in particular i talk about a db/forking issue about halfway down the first article; it’s relevant if you have time to read it.

i’m not sure what your setup is, but i suspect rq could serve as your queue manager quite well: if you aren’t distributing jobs to many nodes and run it on only one node, it’s a queue manager instead of a cluster manager.

kind regards.

-a


#3

> so your daemon apparently exited and it doesn’t prevent itself from running
> twice. it should obtain a lockfile or lock itself using posixlock (lockfile
> or posixlock gems on rubyforge) to prevent two instances from ever running
> at the same time. how is your daemon started/kept alive?

hehe… manually, at this point, so that’s not it. Excellent point though, and I will make sure restarting the daemon is lock-protected once automatic restart is in place.

> none of the db stuff is transactional, so it’s quite possible for strange
> things to happen. for instance
>
>          delete_queue_entry queue_entry[:id]
>
> job removed. process dies. job lost.

I see. So I’d essentially have to remove the job from the queue (it’s most definitely not queued anymore), check whether it’s running, and only if it is, add it to the running-jobs list? (And otherwise put it into some sort of ‘jobs-we-lost’ list kept in the daemon?) Where do transactions come into play here?

The main issue you’re talking about is what happens when a job dies before getting into the ‘running jobs’ list, so that a ghost entry later appears in the list. This would indeed stall the queue, but rather than add two mechanisms (transactions and DRb), I’d rather just have the daemon periodically check that the jobs it thinks are running actually are. Very useful info though, many thanks!
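The periodic check could be as small as this sketch, which sends signal 0 with `Process.kill` (that performs the existence and permission check without delivering anything). The helper names are hypothetical. Two caveats apply: an exited but not-yet-reaped child (a zombie) still counts as alive, and pids are eventually reused, so a long-dead job's pid could in principle belong to an unrelated process.

```ruby
# Hypothetical helpers for a periodic "are my running jobs really running?"
# sweep over the pids stored in the running_jobs table.

# True if a process with +pid+ currently exists. Signal 0 performs the
# existence/permission check without actually sending a signal.
def process_alive?(pid)
  Process.kill(0, pid)
  true
rescue Errno::ESRCH
  false # no such process
rescue Errno::EPERM
  true  # it exists, we just aren't allowed to signal it
end

# Returns the subset of +pids+ that no longer correspond to a process;
# their running_jobs rows are candidates for cleanup.
def stale_pids(pids)
  pids.reject { |pid| process_alive?(pid) }
end
```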

However, what I’m more seriously concerned about is the apparent ‘reparenting’ of the job. I’ll try to be a bit clearer about what happened. Two jobs are currently running on the server, and this is the output of pstree on the pid of the daemon:

$ pstree -p 24788
ruby(24788)─┬─ruby(4854)───bash(4855)───MATLAB(4856)───matlab_helper(4936)
            └─ruby(24859)───bash(24861)───MATLAB(24862)───matlab_helper(24929)

So 24788 is my daemon, 4854 and 24859 are the forks (those are the PIDs in the database), and 4856 and 24862 are eating up my CPU :)

Now, here’s an analog of how it was in the broken state:

bash(4855)───MATLAB(4856)───matlab_helper(4936)

ruby(24859)───bash(24861)───MATLAB(24862)───matlab_helper(24929)

Meaning: the parent is dead (but its orphans live on, now reparented to init for some reason), and one of the bashes got reparented to init. It kinda looks like Matlab was run by a user, but it wasn’t (verified).

Unless I’m missing something, this doesn’t have to do with my lack of transactions, or things going out of sync… any ideas?


#4

On Thu, 18 May 2006, Ohad L. wrote:

> I see. So I’d essentially have to remove the job from the queue (it’s most
> definitely not queued anymore), check if it’s running, and only if it is -
> add it to the running-jobs list? (And otherwise, into some sort of
> ‘jobs-we-lost’ list kept in the daemon)? Where do transactions come into
> play here?

deleting a job from the queue, forking the child, and adding the cid to the running_jobs table need to be transaction protected.

problems:

  • you cannot fork with an open connection

    http://lists.mysql.com/mysql/1127

    you need a way to do this:

    - start a child process
    - get its pid
    - start a transaction
    - remove job from queue
    - add pid to running_jobs
    - start job running
    - commit transaction

    ruby queue does exactly this to avoid the db/fork issue by setting up a drb process manager. the process manager starts a pipe to a bash login shell (so the user gets a nice environment) and returns the pid. during the transaction the command to be run is sent down the pipe. in this way we can get the pid of the child and yet not start it ‘running’. i think this is the only way to avoid forking in the transaction but, if you do not use transactions, you’ll always have border problems.

  • your reap model is async, so in addition to the above we have to be prepared for any transaction to be interrupted by a signal handler. this is, imho, impossible for all but the most gifted of programmers and not worth the hassle. i know i could not make this model work correctly.
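The "get the pid now, start the job later" trick from the first point can be sketched with the standard library's `IO.popen`: the child shell exists (and has a pid that could be recorded inside the transaction) but does nothing until a command is written to its stdin. This is a hypothetical illustration, not rq's code: it uses plain `sh -s` rather than the bash login shell rq uses, and the helper names are made up.

```ruby
# Hypothetical helpers illustrating "fork first, run later" via a pipe.

# Start a shell that reads its commands from stdin. It has a pid right
# away but sits idle until something is written to the pipe.
def start_idle_shell
  IO.popen(['sh', '-s'], 'w')
end

# Write the real command; the shell starts it once it reads the line.
def send_command(shell, command)
  shell.puts command
  shell.flush
end

# Close stdin (EOF makes the shell exit after its last command) and wait.
# For a stream opened with IO.popen, IO#close waits for the child and
# sets $? to its Process::Status.
def finish_shell(shell)
  shell.close
  $?
end

# Usage sketch (the db calls are placeholders):
#   shell = start_idle_shell
#   db.transaction do
#     # remove job from queue, add shell.pid to running_jobs
#     send_command(shell, 'matlab < startup.m > output 2> error')
#   end
#   status = finish_shell(shell) # later, when collecting the job
```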

fwiw my drb process manager in ruby queue is only about 100 lines of code and completely eliminates the above issue. it’s used like this (pseudo code):

job_manager = JobManager.new # drb server running on localhost in another process

loop do

  while number_of_running_jobs < max_number_of_running_jobs
    child = job_manager.new_child

    db.transaction do
      remove_from_queue child.pid
      child.run command
      add_to_running_jobs child.pid
    end
  end

  status = job_manager.wait -1 # wait for anyone to finish

  db.transaction do
    job_is_finished status.pid, status.exit_status
  end

end

note that all the forking and waiting occurs in another process! this hugely simplifies making the job queue service correct and robust.
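Without DRb, the same synchronous shape can be approximated in a single process: record each child's pid before anything waits on it, then block in `Process.wait2` instead of reacting to SIGCHLD. A hedged sketch, not rq's design (which moves the forking into a separate DRb process); `run_queue` is a hypothetical name, jobs are plain shell command strings, and a Hash stands in for the running_jobs table.

```ruby
# Hypothetical single-process version of the synchronous loop: no SIGCHLD
# handler, so a child can never be reaped before its pid is recorded.
# Returns { command => exit status }; commands are assumed to be unique.
def run_queue(commands, max_running)
  queue   = commands.dup
  running = {} # pid => command; stands in for the running_jobs table
  results = {}

  until queue.empty? && running.empty?
    # Start jobs while we are under the limit.
    while running.size < max_running && !queue.empty?
      command = queue.shift
      pid = Process.spawn('sh', '-c', command)
      running[pid] = command # recorded before anything waits on it
    end

    # Block until any child finishes, then do the bookkeeping.
    pid, status = Process.wait2(-1)
    command = running.delete(pid)
    results[command] = status.exitstatus if command
  end

  results
end
```

With `max_running` set to 2, this mirrors the daemon's two-simultaneous-jobs limit; the per-user quota is left out for brevity.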

> the database, and 4856 and 24862 are eating up my CPU :)
>
> Now, here’s an analog of how it was in the broken state:
>
> bash(4855)───MATLAB(4856)───matlab_helper(4936)
>
> ruby(24859)───bash(24861)───MATLAB(24862)───matlab_helper(24929)
>
> Meaning - parent dead (but its orphans live, now reparented to init for some
> reason), and one of the bashes got reparented to init. Kinda looks like
> matlab was run by a user, but it wasn’t (verified).

it looks like your daemon aborted and was restarted and, because it munged ruby’s nice child handling, children were orphaned. when children are orphaned they become children of init (i guess you knew that…). anyhow, matlab and idl tweak signals themselves, so it can become very ugly to manage children with signals. ruby queue handles this by making sure all children are collected in the normal way or, if that fails, killing them with -9 and then collecting them on exit; in other words it simply refuses to exit until all children are collected.
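The "refuse to exit until all children are collected" policy might look like this sketch (hypothetical helper names; the 5-second default grace period is an arbitrary assumption). It sends TERM first and falls back to KILL, which cannot be caught or ignored, since programs like Matlab may handle signals themselves.

```ruby
# Sketch: ask each child to stop with TERM, give them +grace+ seconds,
# then KILL and reap whatever is left. Returns the pids that needed KILL.
def shutdown_children(pids, grace = 5.0)
  pids.each { |pid| signal_quietly('TERM', pid) }

  remaining = pids.dup
  deadline = Time.now + grace
  loop do
    remaining.reject! { |pid| reaped?(pid) }
    break if remaining.empty? || Time.now >= deadline
    sleep 0.1
  end

  remaining.each do |pid|
    signal_quietly('KILL', pid) # cannot be caught or ignored
    begin
      Process.waitpid(pid)      # blocking reap, so no zombie is left
    rescue Errno::ECHILD
      # Already collected elsewhere.
    end
  end
  remaining
end

# Send +sig+ to +pid+, ignoring processes that have already gone away.
def signal_quietly(sig, pid)
  Process.kill(sig, pid)
rescue Errno::ESRCH
  nil
end

# True once +pid+ has exited and been reaped by us.
def reaped?(pid)
  !Process.waitpid(pid, Process::WNOHANG).nil?
rescue Errno::ECHILD
  true # not (or no longer) our child
end
```

One caveat: signalling only the direct child can still orphan its own children (here, the bash and MATLAB processes under each forked Ruby worker), which is the shape of the broken pstree above. One untested option is to make each worker a process-group leader (for example `Process.setpgid(0, 0)` in the child) and pass the negated pid to `Process.kill`, which, as with kill(2), signals the whole group.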

to me this really just looks like the daemon was killed, didn’t collect every child on exit, and therefore the orphans were picked up by init.

> Unless I’m missing something, this doesn’t have to do with my lack of
> transactions, or things going out of sync… any ideas?

yes, it’s a separate issue. regardless, making an external process manager will greatly simplify the whole program wrt both transactions and nice child-waiting behaviour.

regards.

-a