Forum: Ruby Weird forking/rejoining issue

Ohad Lutzky (ohad)
on 2006-05-17 16:11
I'm a Unix admin at a research lab at the Technion, and use (and loooove) Ruby
for all kinds of useful applications I wrote for use in the lab. One of them
is the Matlab Application Server, a queue/batch manager for Matlab jobs. The
old version was Perl, text files, and one file per job - the new version is
Rails for the UI (which is on another machine), vanilla Ruby for the daemon,
and MySQL to communicate between them.

Usually it works spectacularly, and people don't have to lock down the Windows
workstations overnight (or over many nights) so their jobs can finish. The
queue is also rather fair - two simultaneous jobs, one per user.

However, today something strange happened. The server was under heavy load and
using a lot of swap, as usual when two Matlab jobs are running. Two jobs were
indeed running: pids A and B (A < B). However, they were both by the same user
(which shouldn't happen). Furthermore, only B was a son of the daemon, whereas
A was a son of init. The log file said that earlier, a son C died which wasn't
in the SQL table of running jobs (dying children are removed from there,
because it means they finished running). And weirdest of all, the SQL table of
running jobs contained job D, which wasn't run (and belonged to a different
user).

Very odd, and I can't seem to find the bug. So here are the relevant bits of
code; if anyone has an idea I'd greatly appreciate it.

  (snip)

  module MatlabAS
    class Daemon
      def initialize
        load_config
        @total_procs = 0
        @user_procs = {}

        mysql_connect

        setup_signals
        clear_running_jobs
        main_loop
      end

      # snip

      protected

      # Main program loop
      def main_loop
        @continue_running = true
        while @continue_running
          mysql_connect
          begin
            handle_command # For handling manual 'kill job' commands, not relevant
          rescue
            $stderr.print 'Error handling command: '
            $stderr.puts $!
          end
          if @total_procs < @config['limits']['total_procs']
            queue_entry = get_queue_entry
            if queue_entry
              user_id = queue_entry[:user][:id]
              @user_procs[user_id] = 0 if @user_procs[user_id].nil?
              @user_procs[user_id] += 1
              @total_procs += 1

              delete_queue_entry queue_entry[:id]

              child_pid = fork do
                run_worker_process queue_entry[:user][:username],
                                   queue_entry[:directory]
              end
              add_running_job_entry queue_entry[:sent_at],
                                    queue_entry[:user],
                                    queue_entry[:directory],
                                    child_pid
            end
          end

          mysql_disconnect

          really_sleep @config['limits']['sleep_cycle']
        end
      end

      # Make sure that the duration is slept, even if signals were caught
      def really_sleep(duration=0)
        # Special case for 0-duration sleep, even though we don't use it
        if duration == 0
          sleep 0
        else
          nap_remaining = duration
          while nap_remaining > 0
            nap_remaining -= sleep nap_remaining
          end
        end
      end

      # Do cleanup-work when children die
      def handle_dying_children
        mysql_connect
        foreach_dying_child do |child_pid|
          begin
            child = get_running_job_by_pid child_pid
            MatlabAS.mail child[:user][:username], "(snipped)"
            @total_procs -= 1
            @user_procs[child[:user][:id]] -= 1
            add_historic_job_entry child[:sent_at], child[:started_at],
                                   child[:user], child[:directory]
          rescue
            $stderr.print 'Error rescuing a dying child: '
            $stderr.puts $!
          end
        end
      end

      # Loop across all currently dying children, yielding each
      # one's +pid+. Very useful for handling +SIGCHLD+.
      def foreach_dying_child
        begin
          while pid = Process.waitpid(-1,Process::WNOHANG)
            yield pid
          end
        rescue Errno::ECHILD
          # Out of dying children, no problem.
        end
      end

      # Run an actual matlab job
      def run_worker_process(user, directory)
        user_info = Etc.getpwnam user
        work_pathname = Base_path + user + directory
        setup_signals false
        Process::Sys.setgid user_info.gid
        Process::Sys.setuid user_info.uid

        # snip, handles errors for missing directory / startup file

        # config variables were simplified here, obviously they're in
        # a nice hash

        system "#@bash -c \"#@matlab < #@startup_file > #@output 2>
#@error\""
      end

      # Set up signal handlers. Default is to set up the signal handlers
      # for a parent process, but if +parent+ is false, signals are set
      # up for a child.
      def setup_signals(parent = true)
        if parent
          Signal.trap('CHLD') { handle_dying_children }

          Signal.trap('INT') do
            puts "User interrupt. Dying gracefully..."
            self.close
          end

          Signal.trap('TERM') do
            puts "Termination requested. Dying gracefully..."
            self.close
          end

          Signal.trap('HUP') do
            puts "Configuration reload requested, reloading config."
            load_config
          end
        else
          Signal.trap('CHLD', 'DEFAULT')
          Signal.trap('INT', 'DEFAULT')
          Signal.trap('TERM', 'DEFAULT')
          Signal.trap('HUP', 'DEFAULT')
        end
      end

      def handle_command
        # Snip, kills a job if such an instruction is in the table
      end

      # Reads one queue entry from the +queue_entries+ table, in which the
      # user doesn't match any of the ones who have currently hit their
      # process quota. If none are available, returns +nil+.
      def get_queue_entry
        query_string = 'SELECT * FROM queue_entries '

        if users_at_quota.size > 0
          query_string += 'WHERE ' +
            users_at_quota.collect do |user_id|
              "user_id != #{user_id}"
            end.join(' AND ') + ' '
        end

        query_string += 'LIMIT 1'

        result = @mysql_connection.query query_string
        queue_entry_hash = result.fetch_hash
        result.free

        if queue_entry_hash
          return { :id => queue_entry_hash['id'],
                   :sent_at => Time.parse(queue_entry_hash['sent_at']),
                   :user => get_user_by_id(queue_entry_hash['user_id']),
                   :directory => queue_entry_hash['directory']
                 }
        else
          return nil
        end
      end

      # Returns an array of all users which are at their process quota.
      def users_at_quota
        @user_procs.reject do |user_id,procs|
          procs < @config['limits']['procs_per_user']
        end.keys
      end

      # Deletes the given queued entry from the table.
      def delete_queue_entry(id)
        @mysql_connection.query "DELETE FROM queue_entries WHERE id =
#{id}"
      end

      # Adds information about the given historic job to the history table.
      def add_historic_job_entry(sent_at, started_at, user, directory)
        @mysql_connection.query <<-EOF
  INSERT INTO historic_jobs (sent_at,
                             started_at,
                             user_id,
                             directory)
         VALUES ('#{sent_at.to_mysql_time}',
                 '#{started_at.to_mysql_time}',
                 '#{user[:id]}',
                 '#{directory}')
        EOF
      end

      # Adds information about the given running-job to the running-job
      # table.
      def add_running_job_entry(sent_at, user, directory, pid)
        @mysql_connection.query <<-EOF
  INSERT INTO running_jobs (sent_at,
                            user_id,
                            directory,
                            pid)
         VALUES ('#{sent_at.to_mysql_time}',
                 '#{user[:id]}',
                 '#{directory}',
                 '#{pid}')
        EOF
      end

      # Gets a command from the +commands+ table and deletes it
      def get_command
        # snip
      end

      # Gets the running-job information for a job matching the supplied
      # +pid+, and removes it from the running-jobs table unless
      # :remove is set to false (default is to delete). Example:
      #
      #   get_running_job_by_pid 1053, :remove => false
      #
      def get_running_job_by_pid(pid, options = { :remove => true })
        result = @mysql_connection.query <<-EOF
  SELECT * FROM running_jobs WHERE pid = #{pid} LIMIT 1
        EOF
        result_hash = result.fetch_hash
        result.free

        # THIS IS THE ERROR WHICH WAS RAISED
        raise "No job with pid #{pid} in running jobs table." if
result_hash.nil?

        if options[:remove]
          @mysql_connection.query "DELETE FROM running_jobs WHERE pid =
#{pid}"
        end

        return { :sent_at => Time.parse(result_hash['sent_at']),
                 :started_at => Time.parse(result_hash['started_at']),
                 :user => get_user_by_id(result_hash['user_id']),
                 :directory => result_hash['directory']
               }
      end

      # Clears the running-jobs table, and any STOP commands
      def clear_running_jobs
        @mysql_connection.query "DELETE FROM running_jobs"
        @mysql_connection.query "DELETE FROM commands WHERE command =
'stop'"
      end

      # Gets user information by the given user_id. User information
      # is in the form <tt>{ :id, :username }</tt>.
      def get_user_by_id(user_id)
        result = @mysql_connection.query <<-EOF
  SELECT username FROM users WHERE id = #{user_id} LIMIT 1
        EOF
        result_hash = result.fetch_hash
        result.free

        { :id => user_id, :username => result_hash['username'] }
      end

      # Load configuration from YAML file
      def load_config
        @config = File.open('/etc/matlab_as.yml') { |yf| YAML::load yf }
      end

      def mysql_connect
        mysql_disconnect
        mysql_config = @config['mysql']
        connection = Mysql.new mysql_config['host'],
                               mysql_config['username'],
                               mysql_config['password']
        connection.select_db mysql_config['database']

        @mysql_connection = connection
      end

      def mysql_disconnect
        @mysql_connection.close unless @mysql_connection.nil?
      end

      def MatlabAS.mail(address, message)
        # snip
      end
    end
  end

  daemon = MatlabAS::Daemon.new
  daemon.close
unknown (Guest)
on 2006-05-17 17:00
(Received via mailing list)
On Wed, 17 May 2006, Ohad Lutzky wrote:

>
> However, today something strange happened. The server was under heavy load
> and using a lot of swap, as usual when two Matlab jobs are running. Two jobs
> were indeed running: pids A and B (A < B). However, they were both by the
> same user (which shouldn't happen). Furthermore, Only B was a son of the
> daemon, whereas A was a son of init.

so your daemon apparently exited and it doesn't prevent itself from running
twice.  it should obtain a lockfile or lock itself using posixlock (lockfile
or posixlock gems on rubyforge) to prevent two instances from ever running at
the same time.  how is your daemon started/kept alive?
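
a minimal sketch of that kind of lock, using plain File#flock from the
standard library instead of the gems above (the lock path here is invented):

   # refuse to start if another instance of the daemon holds the lock
   LOCK_PATH = '/var/run/matlab_as.lock'   # hypothetical location
   lock = File.open(LOCK_PATH, File::RDWR | File::CREAT, 0644)
   unless lock.flock(File::LOCK_EX | File::LOCK_NB)
     $stderr.puts "another instance already holds #{LOCK_PATH}, exiting"
     exit 1
   end
   lock.truncate 0
   lock.write "#{Process.pid}\n"
   lock.flush
   # the lock is released automatically when this process exits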

> The log file said that earlier, a son C died which wasn't in the SQL table
> of running jobs (dying children are removed from there, because it means
> they finished running).  And weirdest, the SQL table of running jobs
> contained job D, which wasn't run (and belonged to a different user).

none of the db stuff is transactional, so it's quite possible for strange
things to happen.  for instance

1)
>              delete_queue_entry queue_entry[:id]
   #
   # job removed.  process dies.  job lost.
   #


2)
>
>              child_pid = fork do
>                run_worker_process queue_entry[:user][:username],
>                                   queue_entry[:directory]
>              end
   #
   # job dies instantly - before adding it to queue - sigchld is caught,
   # handler triggers, db does not yet contain child_pid.
   #
>              add_running_job_entry queue_entry[:sent_at],
>                                    queue_entry[:user],
>                                    queue_entry[:directory],
>                                    child_pid


etc.

it would be very hard to refactor this code to use transactions since you are
forking, and forking is not supported with an open db handle (all writes, for
instance, get flushed on child/parent exit).  in theory, though, you need code
like this all over the place

   db.transaction do
     child_pid = start_child job
     add_running_job_entry child_pid, job
   end

and then the signal handler must be written in such a way that it cannot start
another transaction - this way you'll never get sigchld and find no pid for
the child yet entered.  this gets really tricky.  the way i did it for ruby
queue was to set up a drb daemon that does all the forking/waiting for me.
this way i can operate in the normal synchronous fashion instead of in async
mode using signal handlers.  you can read about rq here

   http://www.linuxjournal.com/article/7922
   http://raa.ruby-lang.org/project/rq/

in particular i talk about a db/forking issue about halfway down the first
article - it's relevant if you have time to read it.

i'm not sure what your setup is, but i suspect rq could serve as your queue
manager quite well - if you aren't distributing jobs to many nodes and run it
on only one node, it's a queue manager instead of a cluster manager.

kind regards.

-a
Ohad Lutzky (ohad)
on 2006-05-17 17:59
> so your daemon apparently exited and it doesn't prevent itself from running
> twice.  it should obtain a lockfile or lock itself using posixlock (lockfile
> or posixlock gems on rubyforge) to prevent two instances from ever running
> at the same time.  how is your daemon started/kept alive?

hehe... manually, at this point, so that's not it. Excellent point though, and
I will make sure restarting the daemon is lock-protected once automatic
restart is in place.

> none of the db stuff is transactional, so it's quite possible for strange
> things to happen.  for instance
>
> 1)
>>              delete_queue_entry queue_entry[:id]
>    #
>    # job removed.  process dies.  job lost.
>    #

I see. So I'd essentially have to remove the job from the queue (it's most
definitely not queued anymore), check if it's running, and only if it is, add
it to the running-jobs list? (And otherwise, into some sort of 'jobs-we-lost'
list kept in the daemon?) Where do transactions come into play here?

The main issue you're talking about is what happens when a job dies before
getting into the 'running jobs' list, and then a ghost comes into the list.
This would indeed stall the queue, but rather than add two mechanisms
(transactions and DRb), I'd rather just have the daemon periodically check
that jobs it thinks are running actually are (rough sketch below). Some very
useful info though, much thanks!
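
To be concrete, the periodic check I have in mind would be roughly this
(pids_in_running_jobs_table is just a placeholder for reading the
running_jobs table):

  # for every pid the table claims is running, signal 0 probes whether the
  # process still exists without actually signalling it
  def prune_stale_running_jobs
    pids_in_running_jobs_table.each do |pid|
      begin
        Process.kill 0, pid
      rescue Errno::ESRCH
        # the process is gone but the table still lists it - clean it up
        @mysql_connection.query "DELETE FROM running_jobs WHERE pid = #{pid}"
      end
    end
  end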

However, what I'm more seriously concerned about is the apparent 'reparenting'
of the job. I'll try to be a bit more clear about what happened: two jobs are
currently running on the server, and this is the output of pstree on the pid
of the daemon:

$ pstree -p 24788
ruby(24788)─┬─ruby(4854)───bash(4855)───MATLAB(4856)───matlab_helper(4936)
            └─ruby(24859)───bash(24861)───MATLAB(24862)───matlab_helper(24929)

So 24788 is my daemon, 4854 and 24859 are the forks (those are the PIDs in the
database), and 4856 and 24862 are eating up my CPU :)

Now, here's an analog of how it was in the broken state:

bash(4855)───MATLAB(4856)───matlab_helper(4936)

ruby(24859)───bash(24861)───MATLAB(24862)───matlab_helper(24929)

Meaning - parent dead (but its orphans live, now reparented to init for some
reason), and one of the bashes got reparented to init. Kinda looks like matlab
was run by a user, but it wasn't (verified).

Unless I'm missing something, this doesn't have to do with my lack of
transactions, or things going out of sync... any ideas?
unknown (Guest)
on 2006-05-17 19:18
(Received via mailing list)
On Thu, 18 May 2006, Ohad Lutzky wrote:

> I see. So I'd essentially have to remove the job from the queue (it's most
> definitely not queued anymore), check if it's running, and only if it is,
> add it to the running-jobs list? (And otherwise, into some sort of
> 'jobs-we-lost' list kept in the daemon?) Where do transactions come into
> play here?

deleting a job from the queue, forking the child, and adding the child's pid
to the running_jobs table need to be transaction-protected.

problems:

   - you cannot fork with an open connection

       http://lists.mysql.com/mysql/1127

       you need a way to do this:

         - start a child process
         - get its pid
         - start a transaction
         - remove job from queue
         - add pid to running_jobs
         - start job running
         - commit transaction

       ruby queue does exactly this to avoid the db/fork issue by setting up a
       drb process manager.  the process manager starts a pipe to a bash login
       shell (so the user gets a nice environment) and returns the pid.  during
       the transaction the command to be run is sent down the pipe.  in this
       way we can get the pid of the child and yet not start it 'running'
       (see the sketch after this list).  i think this is the only way to avoid
       forking in the transaction but, if you do not use transactions, you'll
       always have border problems.

   - your reap model is async so, in addition to the above, we have to be
     prepared for __any__ transaction to be interrupted by a signal handler.
     this is, imho, impossible for all but the most gifted of programmers and
     not worth the hassle.  i know i could not make this model work correctly.
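
here is a tiny sketch of the pid-before-run trick from the first point above
(the db/job helpers are invented; this is not rq's actual code):

   # the child exists - and has a pid - before any real work starts
   pipe = IO.popen('bash --login', 'w')
   child_pid = pipe.pid

   db.transaction do
     remove_job_from_queue job                              # invented helpers
     add_running_job job, child_pid
     pipe.puts "cd #{job[:directory]} && #{job[:command]}"  # job starts here
   end
   # closing the pipe and waiting on child_pid is the manager's business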

fwiw my drb process manager in ruby queue is only about 100 lines of code and
completely eliminates the above issue.  it's used like this (pseudo code)

   # drb server running on localhost in another process
   job_manager = JobManager.new

   loop do

     while number_of_running_jobs < max_number_of_running_jobs
       child = job_manager.new_child

       db.transaction do
         remove_from_queue child.pid
         child.run command
         add_to_running_jobs child.pid
       end
     end

     status = job_manager.wait -1 # wait for anyone to finish

     db.transaction do
       job_is_finished status.pid, status.exit_status
     end

   end

note that all the forking and waiting occurs in another process!  this
__hugely__ simplifies the process of making the job queue service correct and
robust.
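
a stripped-down sketch of what such a drb process manager could look like
(class name, uri and method names are invented; rq's real manager does more):

   require 'drb'

   # runs as its own process: every fork/exec/wait happens here, never in
   # the daemon that holds the mysql connection
   class JobManager
     def spawn(command)
       fork { exec command }              # returns the child's pid
     end

     def wait_any
       pid, status = Process.wait2(-1)
       [pid, status.exitstatus]           # plain values marshal over drb
     end
   end

   DRb.start_service 'druby://localhost:9999', JobManager.new
   DRb.thread.join

   # and from the daemon:
   #   manager = DRb::DRbObject.new_with_uri 'druby://localhost:9999'
   #   pid = manager.spawn 'matlab < startup.m > out 2> err'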


> the database, and 4856 and 24862 are eating up my CPU :)
>
> Now, here's an analog of how it was in the broken state:
>
> bash(4855)───MATLAB(4856)───matlab_helper(4936)
>
> ruby(24859)───bash(24861)───MATLAB(24862)───matlab_helper(24929)
>
> Meaning - parent dead (but its orphans live, now reparented to init for some
> reason), and one of the bashes got reparented to init. Kinda looks like
> matlab was run by a user, but it wasn't (verified).

it looks like your daemon aborted and was restarted and, because that munged
ruby's nice child handling, children were orphaned.  when children are
orphaned they become children of init (i guess you knew that...).  anyhow,
matlab and idl tweak signals themselves, so it can become very ugly to manage
children with signals.  ruby queue handles this by making sure all children
are collected in the normal way or, if that fails, killing them with -9 and
then collecting them on exit - in other words it simply refuses to exit until
all children are collected (rough sketch below).
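
something along these lines, assuming the daemon keeps the pids it forked in
@child_pids (this is the idea from rq, not its actual code):

   # on shutdown: reap whatever already finished, kill -9 the stragglers and
   # wait for each of them, so nothing is left for init to adopt
   def collect_all_children
     @child_pids.each do |pid|
       begin
         next if Process.waitpid(pid, Process::WNOHANG)  # already finished
         Process.kill 'KILL', pid
         Process.waitpid pid                             # block until collected
       rescue Errno::ESRCH, Errno::ECHILD
         # child already gone and reaped elsewhere
       end
     end
   end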

to me this really just looks like the daemon was killed, didn't collect every
child on exit, and therefore orphans were picked up by init.

> Unless I'm missing something, this doesn't have to do with my lack of
> transactions, or things going out of sync... any ideas?

yes - it's a separate issue.  regardless - making an external process manager
will greatly simplify the whole program wrt both transactions and nice
child-waiting behaviour.

regards.

-a