Process Rings (#135)

I can’t speak for the other solvers, of course, but this kind of systems
programming is always complex for me. I had to fiddle with my own
solutions for quite some time to get everything working as expected. In
my fork()ed ring, I particularly struggled with the pipe creation dance.
I also find it hard to come up with good unit testing strategies for
this kind of task, so I struggled through it without my usual safety
net.

To break down a solution, let’s go with James K.'s code. It’s a Threaded
version and a little easier to follow than my own attempt. Here’s the
setup code:

$processes = []

def kill_processes
  $processes.each do |thr|
    # Thread#exit! is a Ruby 1.8 method; later versions offer Thread#kill.
    thr.exit!
  end
end

James begins by carving out some space to hold his “processes,” really
Threads in this case, in a global variable. A helper is also defined
here to kill all of those Threads off at the end of the program
execution.

The next bit of code defines what each process does. It defines a Proc
object that calls itself recursively both to build the ring and pass
messages. Let’s take on just the ring building half first:

thread_proc = proc do
  Thread.stop
  if Thread.current[:count] == 0
    Thread.current[:next] = $processes.first
  else
    Thread.current[:next] = Thread.new(&thread_proc)
    Thread.current[:next][:count] = Thread.current[:count] - 1
    $processes.push(Thread.current[:next])
    true until Thread.current[:next].stop?
    Thread.current[:next].run
  end

  # ...

Don’t let all those calls to Thread.current() scare you; this code is
very simple. This Proc is meant to be passed as the body of a Thread, so
the very first thing it does is stop() the newly created Thread. This
allows the creator to do some setup, then trigger the Thread to run().
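
The stop()/run() handshake described above can be tried in isolation.
What follows is only a minimal sketch, not part of James's solution: the
worker variable and the :greeting key are made up for illustration, and
Thread.pass is used in the wait loop where James uses a bare "true".

```ruby
# A worker that parks itself immediately, as the Proc above does.
worker = Thread.new do
  Thread.stop                       # sleep until someone calls run()
  Thread.current[:greeting].upcase  # the creator's setup is visible here
end

# Busy-wait until the worker has really gone to sleep.
Thread.pass until worker.stop?

worker[:greeting] = "hello"  # safe to set up: the worker is parked
worker.run                   # wake the worker

result = worker.value        # join, then fetch the block's return value
puts result                  # prints HELLO
```

The wait on stop?() matters: calling run() before the worker has reached
Thread.stop would leave it parked with nobody left to wake it.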

The if/else branch does the process creation, just by trimming down a
counter. At each step, the code points the :next Thread local variable
at a new Thread that recursively calls this Proc. The new Thread is
given a lower count and the cycle repeats. When we reach zero, the final
Thread points itself at the first one created and we have ourselves a
ring.

If you haven’t seen it before, this code makes heavy use of Ruby’s
Thread local variable accessors. The short story is that you can treat a
Thread as a Hash to access some Thread local storage. So everywhere you
see Thread.current[...] above, James is reading or writing a Thread
local variable of the currently running Thread. For example,
Thread.current[:next] gives us the next Thread in the ring.
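
Here is a small sketch of those accessors on their own. The variable
names are invented for the example. One caveat that postdates the
original code: in current Ruby versions this Hash-style storage is
scoped to the Thread's current fiber, which makes no difference in
fiber-free code like this.

```ruby
# A parked thread to hang some data on.
t = Thread.new { Thread.stop }
Thread.pass until t.stop?

t[:count] = 3                  # write a Thread local from the outside
count_seen = t[:count]         # read it back: 3
missing    = t[:no_such_key]   # unset keys read as nil
has_count  = t.key?(:count)    # true

# Storage is per Thread: the main Thread never had :count set.
main_count = Thread.current[:count]   # nil

t.run    # let the parked thread finish
t.join
```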

Here’s the second half of the Proc, the message passing code:

  # ...

  while true
    Thread.stop
    msg = Thread.current[:message]
    cnt = Thread.current[:message_count]

    Thread.current[:message] = nil
    Thread.current[:message_count] = nil

    if cnt == 0
      kill_processes
    else
      Thread.current[:next][:message_count] = cnt - 1
      Thread.current[:next][:message] = msg
      # On small rings, the message can circle around before the
      # first thread has stopped
      true until Thread.current[:next].stop?
      Thread.current[:next].run
    end
  end

end

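This works almost exactly like the code we just examined. Each Thread
stop()s itself and, once woken, reads and clears its own message and
count, sets both on the next Thread (with the count reduced by one), and
taps that Thread to run(). When the count is exhausted,
kill_processes() is called and all Threads simply exit.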

The application code that kicks this process off is all we have left:

processes, cycles = ARGV.map{|n| n.chomp.to_i}

$processes.push(Thread.new(&thread_proc))
true until $processes.first.stop?
$processes.first[:count] = processes - 1
$processes.first.run

puts "Creating #{processes} processes..."
sleep(0.1) until $processes.length == processes

puts "Timer started."
start_time = Time.new

puts "Sending a message around the ring #{cycles} times..."
$processes.first[:message_count] = processes * cycles
$processes.first[:message] = "Good day!"
$processes.first.run

sleep(0.1) while $processes.first.alive?

puts "Done."
puts "Time in seconds: " + (Time.new.to_i - start_time.to_i).to_s

Again, this code is much like what we have already examined. The code
here is simplified though, since it only needs to interact with the
first Thread created.

I want to add a few observations about this code.

First, aside from the main Thread used to monitor the process, this code
does not do any concurrent processing. Each Thread is stopped between
message passes and restarted when it’s time to do more work. The code
could be adapted to work concurrently, say if some processing was needed
at each step, but we would need to introduce some form of
synchronization to do that. You can have a look at my own Threaded
solution for one way to go about that.
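
To give a feel for what such synchronization might look like, here is a
hypothetical sketch that hands each ring member its own Queue as an
inbox. It is not the Threaded solution mentioned above, which is not
shown in this summary; run_ring() and all of its variable names are
invented for illustration.

```ruby
require "thread"  # Queue lives here on older Rubies; harmless on new ones

# Pass a message around a ring of `size` threads for `laps` full laps.
def run_ring(size, laps)
  inboxes = Array.new(size) { Queue.new }  # one inbox per ring member
  done    = Queue.new                      # signals the main thread

  threads = Array.new(size) do |i|
    Thread.new do
      next_inbox = inboxes[(i + 1) % size]  # the last member wraps around
      loop do
        msg, hops_left = inboxes[i].pop     # blocks until a message arrives
        if hops_left.zero?
          done << msg                       # finished: report and leave
          break
        end
        next_inbox << [msg, hops_left - 1]
      end
    end
  end

  inboxes.first << ["Good day!", size * laps]
  result = done.pop        # wait for the final hop
  threads.each(&:kill)     # the rest are still blocked in pop()
  threads.each(&:join)
  result
end

puts run_ring(5, 3)
```

Because Queue#pop blocks until something arrives, no Thread has to
busy-wait for its neighbor to park, and a member could do real work
between receiving a message and passing it on.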

The other point that surprised me about this code is that it’s quite a
bit slower than my own Threaded code. I expected it to be much faster
without the synchronization. I’m guessing that all of that Thread
stop()ping and run()ning slows things down a bit, but that’s just a
guess.

My thanks to the other solvers who sent in mighty clever code. Be sure
to have a look at those solutions as well. Definitely don’t miss the
virtual machine in pure Ruby from Adam S…

Tomorrow we will try some binary parsing…