Process Rings (#135)

The three rules of Ruby Q.:

  1. Please do not post any solutions or spoiler discussion for this quiz
    until 48 hours have passed from the time on this message.

  2. Support Ruby Q. by submitting ideas as often as you can:

http://www.rubyquiz.com/

  3. Enjoy!

Suggestion: A [QUIZ] in the subject of emails about the problem helps everyone
on Ruby T. follow the discussion. Please reply to the original quiz message,
if you can.

-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

I recently wrote about a challenge in the Programming Erlang book on my blog:

Gray Soft / Not Found

Language comparison issues aside, just figuring out how to build a ring of
“processes” was quite the brain bender for me. That always makes for good Ruby
Quiz material, in my opinion.

The task is straightforward:

  1. Your program should take two command-line arguments: a number of
    processes and a number of cycles.
  2. Begin by creating the requested number of processes, in a ring.
    For example, when three processes are requested, process one
    creates and sends messages to process two, which creates and sends
    messages to process three. The third process then sends its
    messages back to process one.
  3. Pass a message around your ring of processes a number of times
    equal to the requested cycles. Print timing results for how
    long this takes.

The message you pass doesn’t much matter. A simple String is fine. You may
also wish to pass a counter with it, to verify the correct number of sends.

I’ll leave the definition of “processes” intentionally vague. Ruby doesn’t have
an equivalent to Erlang processes so we will just say that each process should
represent a node where we could run some instructions concurrently. Be
creative.
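
The task above can be sketched with plain threads and queues. This is only a minimal illustration, not one of the submitted solutions: the name `run_ring`, the default arguments, and the `[hops, text]` message shape are assumptions made for the example, and it assumes at least one process is requested.

```ruby
require "benchmark"

# Hypothetical helper: builds `size` threads joined by queues and passes one
# message around the ring `cycles` times. Returns the final [hops, text] pair.
def run_ring(size, cycles, message = "ping")
  queues     = Array.new(size) { Queue.new }   # one inbox per node
  done       = Queue.new                       # signals the main thread
  total_hops = size * cycles

  workers = queues.each_with_index.map do |inbox, i|
    outbox = queues[(i + 1) % size]            # the last node wraps to the first
    Thread.new do
      loop do
        hops, text = inbox.pop
        if hops == total_hops
          done.push([hops, text])              # all requested sends completed
        else
          outbox.push([hops + 1, text])        # count the send and forward
        end
      end
    end
  end

  queues.first.push([0, message])
  result = done.pop
  workers.each(&:kill)                         # nodes are idle now, stop them
  result
end

size   = (ARGV[0] || 3).to_i
cycles = (ARGV[1] || 2).to_i
hops = nil
elapsed = Benchmark.realtime { hops, _text = run_ring(size, cycles) }
puts format("%d sends in %.6f seconds", hops, elapsed)
```

The counter travels with the message, so the final value can be checked against processes times cycles, as the quiz suggests.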

Ok… this is my first attempt at using callcc in ruby. The code
is not what I would call beautiful, but should be readable…(??).
Suggestions are most welcome!

#!/usr/bin/env ruby

# vim:et:ts=4:sw=4

$n = 0 if $DEBUG

class RLWP
    def initalize
        @nxt = nil
    end
    attr_accessor :nxt
    def makecont
        passesleft, message = callcc { |cont|
            return cont
        }
        if passesleft <= 0
            puts $n if $DEBUG
            exit 0
        end
        $n += 1 if $DEBUG
        @nxt.call(passesleft - 1, message)
    end
end

def run(n, cycles, msg)
    process = Array.new(n) { RLWP.new }
    cont = process.collect { |p| p.makecont }
    process.each_with_index { |p,i| p.nxt = cont[(i+1) % n] }
    cont[0].call(n * cycles, msg)
end

run ARGV[0].to_i, ARGV[1].to_i, "xyzzy"

tom@molly:~/src/quiz-135% time ruby -d quiz-135-callcc.rb 1000 1000
1000000
ruby -d quiz-135-callcc.rb 1000 1000 10.781 user 0.037 system 99% cpu 10.868 total
tom@molly:~/src/quiz-135%

At somewhere around 12000 processes ruby started crashing on my system,
maybe a memory issue?

tom@molly:~/src/quiz-135% time ruby quiz-135-callcc.rb 12000 3
[2] 30579 illegal hardware instruction (core dumped) ruby quiz-135-callcc.rb 12000 3
ruby quiz-135-callcc.rb 12000 3 1.695 user 0.713 system 32% cpu 7.418 total
tom@molly:~/src/quiz-135%

regards,
-tom

On Aug 19, 2007, at 6:49 PM, Tom Danielsen wrote:

Ok… this is my first attempt at using callcc in ruby.

Neat idea.

Here’s my own version using fork():

#!/usr/bin/env ruby -wKU

unless ARGV.size == 2
  abort "Usage: #{File.basename($PROGRAM_NAME)} PROCESSES CYCLES"
end
processes, cycles = ARGV.map { |n| n.to_i }

parent, child = true, false
parent_reader, parent_writer = IO.pipe
reader, writer = IO.pipe
my_reader = parent_reader

puts "Creating #{processes} processes..."
processes.times do |process|
  if fork
    break
  else
    parent_reader.close unless parent_reader.closed?
    writer.close

    parent         = false
    my_reader      = reader
    reader, writer = IO.pipe
  end
  child = true if process == processes - 1
end
if child
  puts "Done."
  my_writer = parent_writer
else
  parent_writer.close
  my_writer = writer
end

if parent
  puts "Timer started."
  start_time = Time.now
  puts "Sending a message around the ring #{cycles} times..."
  cycles.times do
    my_writer.puts "0 Ring message"
    my_writer.flush
    raise "Failure" unless my_reader.gets =~ /\A#{processes} Ring message\Z/
  end
  puts "Done: success."
  puts "Time in seconds: #{(Time.now - start_time).to_i}"
else
  my_reader.each do |message|
    if message =~ /\A(\d+)\s+(.+)/
      my_writer.puts "#{$1.to_i + 1} #{$2}"
      my_writer.flush
    end
  end
end

__END__

And here’s a threaded attempt:

#!/usr/bin/env ruby -wKU

begin
  require "fastthread"
  puts "Using the fastthread library."
rescue LoadError
  require "thread"
  puts "Using the standard Ruby thread library."
end

module MRing
  class Forward
    def initialize(count, parent)
      @child = count.zero? ? parent : Forward.new(count - 1, parent)
      @queue = Queue.new

      run
    end

    def send_message(message)
      @queue.enq message
    end

    private

    def run
      Thread.new do
        loop do
          message = @queue.deq
          if message =~ /\A(\d+)\s+(.+)/
            @child.send_message "#{$1.to_i + 1} #{$2}"
          end
        end
      end
    end
  end

  class Parent < Forward
    def initialize(processes, cycles)
      @processes = processes
      @cycles = cycles

      puts "Creating #{processes} processes..."
      super(processes, self)
    end

    private

    def run
      puts "Timer started."
      start_time = Time.now
      puts "Sending a message around the ring #{@cycles} times..."
      @cycles.times do
        @child.send_message "0 Ring message"
        raise "Failure" unless @queue.deq =~ /\A#{@processes} Ring message\Z/
      end
      puts "Done: success."
      puts "Time in seconds: #{(Time.now - start_time).to_i}"
    end
  end
end

if __FILE__ == $PROGRAM_NAME
  unless ARGV.size == 2
    abort "Usage: #{File.basename($PROGRAM_NAME)} PROCESSES CYCLES"
  end
  processes, cycles = ARGV.map { |n| n.to_i }

  MRing::Parent.new(processes, cycles)
end

__END__

James Edward G. II

def initalize
    @nxt = nil
end

Oops, a slight copy/paste error there... run() will call
nxt= before calling the continuations so the code still runs,
but I like to set my instance vars in the constructor anyway :)
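
For readers wondering why the typo was harmless: Ruby only calls a method named exactly `initialize` from `new`, and an instance variable that was never assigned reads as `nil` anyway. A small illustration, with class names made up for the example:

```ruby
# Hypothetical classes, only to show the effect of the misspelling.
class Misspelled
  attr_accessor :nxt
  def initalize               # typo: never invoked by Misspelled.new
    @nxt = :set_by_constructor
  end
end

class Spelled
  attr_accessor :nxt
  def initialize              # invoked by Spelled.new
    @nxt = :set_by_constructor
  end
end

p Misspelled.new.nxt          # unassigned instance variable reads as nil
p Spelled.new.nxt
```

Since the original constructor only set `@nxt` to `nil`, skipping it changes nothing as long as `nxt=` is called before the continuations run.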

regards,
tom

I’ll resubmit my entry,

  • fixed typo “initialize”
  • cleaner exit from message passing code

regards,
-tom

#!/usr/bin/env ruby

# vim:et:ts=4:sw=4

$n = 0 if $DEBUG

class RLWP
    def initialize
        @nxt = nil
    end
    attr_accessor :nxt
    def makecont
        passesleft, message = callcc { |cont|
            return cont
        }
        if passesleft <= 0
            puts $n if $DEBUG
            throw :DONE
        end
        $n += 1 if $DEBUG
        @nxt.call(passesleft - 1, message)
    end
end

def run(n, cycles, msg)
    catch(:DONE) {
        process = Array.new(n) { RLWP.new }
        cont = process.collect { |p| p.makecont }
        process.each_with_index { |p,i| p.nxt = cont[(i+1) % n] }
        cont[0].call(n * cycles, msg)
    }
end

run ARGV[0].to_i, ARGV[1].to_i, "xyzzy"

On Aug 22, 2007, at 1:45 PM, Adam S. wrote:

I’ve been thinking about virtual machines recently, so I decided to
implement one in Ruby for this quiz.

That gets my vote for the craziest Ruby Q. solution ever. Wow!

You have no idea how much I regret that I wrote the quiz summary
earlier today. :(

James Edward G. II

It is too rare that I find time to solve the Quiz, but this time I
just had to find that time to make it back, and I enjoyed it as ever :)
Here goes my shot:

require 'labrador/enum/map'
require 'labrador/exp/open-proto'
require 'thread'

processes, cycles = ARGV.map.to_i

timer = new_proto{
  def init
    obj_variable :stopped, nil
    obj_variable :started, Time.now
    obj_variable :ellapsed, nil
  end
  def read
    self.ellapsed ||=
      stopped.tv_sec - started.tv_sec + (
        stopped.tv_usec - started.tv_usec
      ) / 1_000_000.0
  end
  def reset
    self.stopped = nil
    self.started = Time.now
  end
  def stop
    self.stopped = Time.now
  end
}
ring_element = new_proto(Prototype::OpenProto){
  define_method :init do |params|
    super
    obj_variable :thread, Thread.new{
      cycles.times do |i|
        m = lhs_queue.deq
        rhs_queue.enq "thread=#{count}::count=#{i}"
      end
    }
  end
}

startup_timer = timer.new
lqueue = Queue.new
all_processes = (2..processes).map{ |count|
  ring_element.new :count => count,
                   :lhs_queue => lqueue,
                   :rhs_queue => ( lqueue = Queue.new )
}
all_processes << ring_element.new(
  :count => 1,
  :lhs_queue => lqueue,
  :rhs_queue => all_processes.first.lhs_queue
)
startup_timer.stop
run_timer = timer.new
all_processes.last.lhs_queue.enq "Can you please start"
all_processes.map.thread.map.join
run_timer.stop
puts "Startup time for #{processes} processes: %3.6fs" %
  startup_timer.read
puts "Runtime for #{processes} processes and #{cycles} cycles: %3.6fs" %
  run_timer.read

On 8/22/07, James Edward G. II wrote:

On Aug 22, 2007, at 1:45 PM, Adam S. wrote:

I’ve been thinking about virtual machines recently, so I decided to
implement one in Ruby for this quiz.

That gets my vote for the craziest Ruby Q. solution ever. Wow!

You have no idea how much I regret that I wrote the quiz summary
earlier today. :(

You have no idea how much I regret that I had a bunch of real work to
do and didn’t have time to finish it up sooner. :)

I also wish I had time and a valid reason to keep working on extending
the VM. The first thing I’d do is find a good name for the
language…

-Adam

On 8/17/07, Ruby Q. wrote:


I’ll leave the definition of “processes” intentionally vague. Ruby doesn’t have
an equivalent to Erlang processes so we will just say that each process should
represent a node where we could run some instructions concurrently. Be
creative.

I’ve been thinking about virtual machines recently, so I decided to
implement one in Ruby for this quiz.

My solution has 4 parts:

  • A pair of programs in an as yet unnamed language. Each program
    sends a message to the next process. One program implements a counter
    to stop the loop at the end.

  • A Compiler for this language. To keep the compiler simple, the
    parser has no look-ahead. One result of this is that all operators
    have left associativity, so
    ‘n=n+1’, ‘n=1+n’, and ‘n=(n+1)’ all set n to different values, and
    only the last one is unsurprising. The compiler returns an array
    containing ‘assembly’, which is essentially bytecode, except the
    codes are Ruby symbols, not bytes.

  • The InstructionSet, which contains a method for each VM instruction.

  • The virtual CPU. Schedules and runs a set of Processes. A Process
    executes assembly by sending each symbol to the instruction set.
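
To see why the three assignments differ, here is a toy evaluator that consumes tokens strictly left to right and treats `=` as just another operator. It is a hypothetical sketch, not the compiler from this solution: `eval_left` is a made-up name, and a nested Array stands in for a parenthesized group.

```ruby
# Hypothetical toy evaluator: no precedence, no look-ahead.
# Tokens are Integers, variable names (Strings), "=", "+", "-",
# or a nested Array that plays the role of "( ... )".
def eval_left(tokens, vars)
  queue  = tokens.dup
  last   = nil    # value of the most recent sub-expression
  target = nil    # most recently mentioned variable name

  operand = lambda do
    tok = queue.shift
    case tok
    when Array   then eval_left(tok, vars)   # a group evaluates to its last value
    when Integer then tok
    else
      target = tok
      vars.fetch(tok, 0)                     # unknown variables start at 0
    end
  end

  until queue.empty?
    case queue.first
    when "="
      queue.shift
      name = target
      last = vars[name] = operand.call       # assigns only the very next operand
    when "+" then queue.shift; last += operand.call
    when "-" then queue.shift; last -= operand.call
    else last = operand.call
    end
  end
  last
end

[["n", "=", "n", "+", 1],       # (n = n) + 1
 ["n", "=", 1, "+", "n"],       # (n = 1) + n
 ["n", "=", ["n", "+", 1]]].each do |tokens|
  vars = { "n" => 5 }
  eval_left(tokens, vars)
  puts "#{tokens.inspect} leaves n = #{vars['n']}"
end
```

Starting from n = 5, the first form leaves n unchanged, the second sets it to 1, and only the parenthesized form increments it.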

This solution may be the slowest one submitted, but it was interesting
to write.
-Adam
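
The idea of executing assembly by sending each symbol to an instruction set can be shown in miniature. The sketch below is an assumption-laden illustration rather than the solution's design: `TinyInstructions` and `TinyMachine` are made-up names, and it uses a value stack for simplicity where the real code tracks a "last value" per process.

```ruby
# Hypothetical miniature VM: each symbol in the program names a method on
# the instruction set; operands follow their opcode inline.
class TinyInstructions
  def num(machine)            # push the literal that follows the opcode
    machine.stack.push(machine.fetch)
  end

  def add(machine)
    b, a = machine.stack.pop, machine.stack.pop
    machine.stack.push(a + b)
  end

  def sub(machine)
    b, a = machine.stack.pop, machine.stack.pop
    machine.stack.push(a - b)
  end
end

class TinyMachine
  attr_reader :stack

  def initialize(program)
    @program      = program
    @pc           = 0         # program counter
    @stack        = []
    @instructions = TinyInstructions.new
  end

  # Return the item at the program counter and advance past it.
  def fetch
    value = @program[@pc]
    @pc += 1
    value
  end

  # Dispatch every opcode symbol as a method call on the instruction set.
  def run
    @instructions.public_send(fetch, self) while @pc < @program.size
    @stack.last
  end
end

p TinyMachine.new([:num, 2, :num, 3, :add]).run
p TinyMachine.new([:num, 7, :num, 2, :sub]).run
```

Adding an instruction then only means adding a method, which is what keeps this style of VM small.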

---BEGIN SOLUTION---
#processring.rb
# for Ruby Q. #135
# Adam S.
#
#Implements a virtual machine and a compiler for a simple language
# language definition:
#  types: ints, strings
#  variables don't need to be declared in advance (works like ruby)
#  only 3 keywords: 'exit', 'if' and 'while',
#    the latter two take the form 'keyword (condition) { body }'.
#    the parens and brackets are required
#  only 2 operators: '+' and '-'.
#  4 builtin functions: '_peek' returns true if any messages waiting
#                          for this process
#                       '_get' returns first pending message.
#                       '_send(id, message)' sends message to
#                          process with given id
#                       '_puts(message)' writes message to stdout
#  '%' before a name indicates process variable.
#  process variables include: %id = current process id
#                             %last = value of last expression
#  strictly left associative, use parentheses to group.
#  be careful with assignments: 'n = 1+1' == '(n=1)+1'
#  you usually want to do 'n = (1+1)'

# Here are the two programs we will execute.

# this one just forwards any message to the next process

prog1 = <<PROG
while (0) { _get }
while (1) {
if (_peek) {
msg =_get
_send ((%id+1),msg)
}
}
PROG

# This one generates a message and sends it to process 0, n times.
# It will be the last process so we can close the ring.

prog2 = <<PROG
n = _get
_send (0,"chunky bacon")
while (n ) {
  if (_peek) {
    msg = _get
    _send (0,msg)
    n = ( n - 1)
    _puts ( n )
  }
}
_puts ( "done" )
exit
PROG

# The Compiler turns program text into "assembly"

class Compiler
@@symbols = {}
#register keywords
%w{while if end exit}.each{|kw| @@symbols[kw]=kw.to_sym}
#register builtins
%w{_peek _get _send _puts}.each{|bi| @@symbols[bi] = :builtin}

def self.compile code
asm = []
text = code
text = text.dup #don’t destroy original code
token,name = parse text
while (token)
p token if $DEBUG
case token
when :while,:if,:exit,:end,:add,:subtract,:assign,:comma
asm << token
when :localvar,:procvar,:builtin,:num,:string
asm << token
asm << name
when :startgroup,:startblock
asm << token
asm << 0 #placeholder for size of group/block
when :endgroup
startgroup = asm.rindex(:startgroup)
asm[startgroup] = :group
asm[startgroup+1] = asm.size-startgroup-2 #store groupsize
when :endblock
startblock = asm.rindex(:startblock)
asm[startblock] = :block
asm[startblock+1] = asm.size-startblock #store blocksize
asm << :endblock
asm << asm.size+1 #placeholder for looptarget (default is next inst.)
end
token,name = parse text
end
return asm
end

private
def self.parse text, vartype = :localvar
pt = 0;
p "parse: #{text}" if $DEBUG
while (true)
case (c = text[pt,1])
when '' #EOF
return nil

  when /\s/         #skip whitespace
    pt+=1
    next

  when /\d/         #integers
    v = text[pt..-1].to_i
    text.slice!(0..pt+v.to_s.length-1) #remove number
    return :num,v

  when /\w/        #identifiers
    name = /\w*/.match(text[pt..-1])[0]
    text.slice!(0..pt+name.length-1) #remove name
    sym = @@symbols[name]
    sym = register_var(name,vartype) if !sym #unknown identifier is variable
    return sym,name

  when '"'       #strings
    name = /".*?[^\\]"/m.match(text[pt..-1])[0]
    text.slice!(0..pt+name.length-1) #remove name
    return :string, name

  when '%'       #processes variables
    text.slice!(0..pt)
    token,name = parse text,:procvar
    raise "invalid process variable" if token!= :procvar
    return token,name

  when '='    #punctuation
    text.slice!(0..pt)
    return :assign, c
  when ','
    text.slice!(0..pt)
    return :comma,c
  when '+'
      text.slice!(0..pt)
      return :add,'+'
  when '-'
      text.slice!(0..pt)
      return :subtract,'-'
  when '('
      text.slice!(0..pt)
      return :startgroup, c
  when ')'
      text.slice!(0..pt)
      return :endgroup, c
  when '{'
      text.slice!(0..pt)
      return :startblock, c
  when '}'
      text.slice!(0..pt)
      return :endblock, c
  end #case
end #while

end

def self.register_var name,type
@@symbols[name] = type
end
end

#The cpu instruction set.
#each instruction is the equivalent of a VM bytecode.
class InstructionSet
def initialize cpu
@cpu = cpu
end

def exit proc #halt the cpu
@cpu.halt
end
def end proc #end the current process
@cpu.end_process proc.id
end

def while proc
loopp = proc.pc-1
test = proc.exec
blocksize = proc.exec
if test && test != 0
#if we are going to loop, store the loop start address at the end of the block
proc.pm[proc.pc+blocksize-1] = loopp
else
proc.pc += blocksize
end
end
def if proc
test = proc.exec
blocksize = proc.exec
if !test || test == 0
proc.pc += blocksize
end
end

def block proc
blocksize = proc.pop
end
def endblock proc
jumptarg = proc.pop #after block, maybe jump somewhere
proc.pc = jumptarg
end

def group proc
groupsize = proc.pop
endgroup = proc.pc+groupsize
while (proc.pc < endgroup)
val = proc.exec
end
return val
end

def num proc
proc.pop
end
def string proc
proc.pop
end
def builtin proc
inst = proc.pop
@cpu.send(inst,proc)
end
def localvar proc
varname = proc.pop
proc.getvar varname
end
def procvar proc
varname = proc.pop
proc.send varname
end
def assign proc
proc.setvar(proc.exec)
end
def comma proc
return :comma
end
def add proc
return proc.last + proc.exec
end
def subtract proc
return proc.last - proc.exec
end

#returns elements of group as array
#used to evaluate arguments for function call
def ungroup proc
args = []
proc.pop #ignore :group
groupsize = proc.pop
endgroup = proc.pc+groupsize
while (proc.pc < endgroup)
arg = proc.exec
args << arg unless arg == :comma
end
return args
end
end

#the CPU
# acts as process scheduler
# processes run for TIMESLICE instructions, or until they send or get a
# message.
# in the latter case, control switches to the process with a message
# pending for the longest time
class CPU
TIMESLICE = 10

# CProcess is a process on our virtual machine
# don't create directly, use CPU#add_process

class CProcess
attr_accessor :pm,:pc,:id,:last
def initialize id, code, vm
@id = id
@pm = code #program memory
@pc = 0 #program counter
@vars = {}
@curvar = nil
@vm = vm
end

#executes a VM instruction, advances program counter
def exec
  inst = @pm[@pc]
  p to_s if $DEBUG
  @pc+=1
  @last = @vm.send(inst,self)
end
def pop
  @pc+=1
  @pm[@pc-1]
end

def getvar name
  @curvar = name
  @vars[name]||=0
end
def setvar value
  @vars[@curvar] = value
end

def to_s
  "#{@id}@#{@pc}: #{@pm[@pc]} (#{@pm[@pc+1]})"
end

end #class Process

def initialize
@processes = []
@messages = []
@i = InstructionSet.new self
@queue=[[],[]] #scheduling queues
end

def add_process code
asm = code.dup
asm << :end
id = @processes.size
@processes << CProcess.new(id, asm,@i)
@messages[id] = []
@queue[0] << id
@cur_proc_id = id
end

#stop process by swapping it out if it is running, and removing it from queues.
def end_process id
taskswap 0 if @cur_proc_id == id
@processes -= [id]
@queue[0] -= [id]
@queue[1] -= [id]
end

def start
@running = true
run
end
def halt
@running = false
end

#inject a message into the system
def send_msg proc_id,msg
@messages[proc_id]<< msg
@queue[1]<<proc_id
end

private
#run the scheduler
def run
@timeslice = 0
while (@running)
@processes[@cur_proc_id].exec
@timeslice+=1
if (@timeslice > TIMESLICE)
taskswap 0
end
end
end

#switch to the next process waiting at this priority level
def taskswap priority
@cur_proc_id = @queue[priority].shift||@cur_proc_id
(@queue[priority] << @cur_proc_id) if priority == 0
@timeslice = 0
end

# built-in messaging functions

def _peek proc
@messages[proc.id][0]
end
def _get proc
retval = @messages[proc.id].shift
taskswap 1
return retval
end
def _send proc
#send puts the target process on the high priority queue
args = @i.ungroup proc
@messages[args[0]] << args[1]
@queue[1]<<args[0]
taskswap 1
args[1]
end

def _puts proc
args = @i.ungroup proc
puts args
end
end

if __FILE__ == $0
  puts "usage: #{$0} processes cycles" or exit if ARGV.size < 2
  processes, cycles = ARGV.map { |n| n.to_i }

  puts "Timer started."
  start_time = Time.now
  puts "Creating #{processes} processes"

  code1 = Compiler.compile prog1
  code2 = Compiler.compile prog2
  cpu = CPU.new
  (processes-1).times { cpu.add_process code1 }
  last_proc = cpu.add_process code2

  puts "Sending a message around the ring #{cycles} times..."
  cpu.send_msg last_proc,cycles
  cpu.start
  puts "Time in seconds: #{(Time.now - start_time)}"
end

Hey guys... here’s my attempt. I’m a bit of a n00b, so feedback is
welcome!

I attacked this with DRb (distributed Ruby), since it hadn’t been
touched yet. It meant taking a serious hit in performance, but it was
at least interesting. It’s threaded, but could be forked (I haven’t
tried that just yet).

Thanks!
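
For anyone who has not used DRb: each node exposes an object on a `druby://` URI, and callers reach it through a `DRbObject` proxy. The sketch below is a minimal illustration and not the solution that follows: `HopCounter` is a made-up class, the nodes form a simple chain rather than a closed ring, and port 0 is used so the operating system picks free ports.

```ruby
require "drb/drb"

# Hypothetical node: counts the hop, then forwards to the next node's URI.
class HopCounter
  def initialize(next_uri = nil)
    @next_uri = next_uri
  end

  def pass(hops)
    return hops + 1 if @next_uri.nil?                  # end of the chain
    DRbObject.new_with_uri(@next_uri).pass(hops + 1)   # remote call to the next node
  end
end

# Build the chain back to front so each node knows its successor's URI.
servers  = []
next_uri = nil
3.times do
  server = DRb::DRbServer.new("druby://127.0.0.1:0", HopCounter.new(next_uri))
  servers << server
  next_uri = server.uri      # actual URI, including the port the OS chose
end

hops = DRbObject.new_with_uri(next_uri).pass(0)
puts "message passed through #{hops} nodes"
servers.each(&:stop_service)
```

Every hop here is a synchronous remote call over TCP, which is a large part of the performance hit mentioned above.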

---BEGIN SOLUTION---

# Num. 135
# process_rings.rb

require 'drb'

BasePort = 7654

class RingParent
  def initialize(processes = 3, cycles = 5)
    @processes = processes
    @cycles = cycles
    @message = "Message from parent\n"
  end

  def start
    spawn_processes
    connect_ring
    send_messages
  end

  def spawn_processes
    t = []
    for i in 0...@processes-1
      t << Thread.new do
        RingMember.new(BasePort+i, BasePort+i+1, self)
      end
    end
    t << Thread.new do
      RingMember.new(BasePort+@processes-1, BasePort)
    end
  end

  def connect_ring
    DRb.start_service
    @ring = DRbObject.new(nil, "druby://127.0.0.1:#{BasePort}")
  end

  def send_messages
    @start = Time.now
    @cycles.times do
      @ring.parent_receive("Hi ring!")
    end
  end

  def return_message(message)
    puts "Parent: Got message back- circulation time: #{Time.now - @start}"
  end
end

class RingMember
  def initialize(port, next_port, parent = nil)
    @port = port
    @parent = parent
    @current_message = ""
    @next_member = connect_next(next_port)
    DRBService.new(self, @port)
  end

  def connect_next(port)
    DRb.start_service
    DRbObject.new(nil, "druby://127.0.0.1:#{port}")
  end

  def parent_receive(message)
    @current_message = message
    forward_message(@current_message)
  end

  def receive_message(message)
    begin
      message == @current_message ?
        (@parent.return_message(message);(@current_message = "")) :
        forward_message(message)
    rescue
      puts "#{@port}: Received duplicate message, couldn't talk to parent: #{$!}"
    end
  end

  def forward_message(message)
    @next_member.receive_message(message)
  end

  def test(message)
    return "#{@port}: Got message #{message}"
  end
end

class DRBService
  def initialize(process, port)
    DRb.start_service("druby://:#{port}", process)
    DRb.thread.join
  end
end

processes = ARGV[0].to_i
cycles = ARGV[1].to_i
parent = RingParent.new(processes, cycles)