Trouble with mkfifo and forks

In a nutshell:

  1. Create a pipe
  2. Fork twice
  3. Each for sends 10 strings over the pipe
  4. See what comes out the other side

So, why isn’t what I send coming out?

class WaitListTest < Test::Unit::TestCase
NUMBER_OF_ORDERS = 2
PIPE = “test/wl_pipe”

def setup
rm #{PIPE}
mkfifo #{PIPE}
end

def test_registering_last_race
term_count = 0
responses = []
pids = []

NUMBER_OF_ORDERS.times do |i|
  pids << Process.fork do
    sleep 1 + (0.1 * i)
    10.times do |a|
      File.open("#{RAILS_ROOT}/#{PIPE}", "w+") do |pipe|
        sleep 0.1
        pipe.puts "Fork #{i} - #{a}"
      end
    end
    File.open("#{RAILS_ROOT}/#{PIPE}", "a") do |pipe|
      sleep 0.1
      pipe.puts "DONE"
    end
  end
end

while term_count < NUMBER_OF_ORDERS
  File.open("#{RAILS_ROOT}/#{PIPE}", "r+") do |pipe|
    val = pipe.gets
    if val.match(/DONE/)
      term_count += 1
    else
      responses << val
    end
  end
end
puts responses.sort
puts responses.count

end
end

output

Started
Fork 0 - 0
Fork 0 - 1
Fork 0 - 2
Fork 0 - 3
Fork 0 - 4
Fork 0 - 5
Fork 0 - 7
Fork 0 - 9
Fork 1 - 0
Fork 1 - 1
Fork 1 - 2
Fork 1 - 3
Fork 1 - 4
Fork 1 - 5
Fork 1 - 7
Fork 1 - 8
Fork 1 - 9
17

Why isn’t this 20?

On 09/06/2010 01:10 PM, Josh wrote:

In a nutshell:

  1. Create a pipe
  2. Fork twice
  3. Each for sends 10 strings over the pipe
  4. See what comes out the other side

I’m probably naive about something, but fifos almost never work right
for what I want to do, and I usually end up using (unix domain) sockets
(which also makes it easier to move to tcp sockets if you need to
distribute across hosts).

Anyway, in your code, it seems that the repeated closing of the read
end of the pipe causes some data to be lost. If you can modify your read
loop to keep using the same IO object, it seems to work (see below).
Also, I found I needed to watch for #gets returning nil (though that
doesn’t happen reliably).

Perhaps someone who knows more about pipes will “pipe up” on this
thread.

class WaitListTest
NUMBER_OF_ORDERS = 2
PIPE = File.expand_path("~/tmp/wl_pipe")

def setup
rm -f #{PIPE}
mkfifo #{PIPE}
end

def test_registering_last_race
term_count = 0
responses = []
pids = []

 NUMBER_OF_ORDERS.times do |i|
   pids << Process.fork do
     sleep 0.1 + (0.1 * i)
     10.times do |a|
       File.open(PIPE, "w") do |pipe|
         sleep 0.1
         pipe.puts "Fork #{i} - #{a}"
       end
     end
     File.open(PIPE, "w") do |pipe|
       sleep 0.1
       pipe.puts "DONE"
     end
   end
 end

 pipe = File.open(PIPE, "r")
 while term_count < NUMBER_OF_ORDERS

File.open(PIPE, “r”) do |pipe|

     val = pipe.gets
     case val
     when /DONE/
       term_count += 1
     when nil
       puts "pipe closed" # need this too
     else
       responses << val
     end

end

 end

 pids.each do |pid|
   Process.waitpid(pid)
 end

 puts responses.sort
 puts responses.count

end
end

wlt = WaitListTest.new
wlt.setup
wlt.test_registering_last_race

In the receiver, you are constantly opening the pipe for read and then
closing it. I expect that if someone writes to the pipe while there’s no
reader, the data is lost.

The program works for me if I swap the open and while around:

File.open("#{RAILS_ROOT}/#{PIPE}", "r") do |pipe|
  while term_count < NUMBER_OF_ORDERS
    val = pipe.gets
    if val.match(/DONE/)
      .. etc

But it is much better form to create an anonymous pipe in the parent
(rd, wr = IO.pipe) then fork. Close the rd end in the child and the wr
end in the parent.

Use Socket.pair if you want a bidirectional channel.