Resolving pipe deadlock

This Parallel-less version is more or less following the example from
the IO.pipe docs. However, even if I can connect the commands in the
pipeline, I fail to see how I can then run them:

def run
  @commands.each_cons(2) do |parent, child|
    reader, writer = IO.pipe

    if fork
      writer.close
      child.input = MessagePack::Unpacker.new(reader)
      reader.close
      Process.wait
    else
      reader.close
      parent.output = MessagePack::Packer.new(writer)
      writer.close
    end
  end

  self
end

I better sleep on it :o/

Martin

On Mon, Jan 20, 2014 at 1:54 PM, Robert K.
[email protected] wrote:

On Sun, Jan 19, 2014 at 6:47 PM, Martin H. [email protected] wrote:

and it hangs after dumping big.tab. Again, I suspect bad pipe handling,
and I uploaded the traces from that one as well as trace_3step.tar. In
fact I uploaded code, test files and traces.

One more potential source of issues: you open all the pipes from the
main process. However, child processes will inherit those pipe
objects and if they are not closed there they will keep the pipeline
alive. That might be the issue you face.

Here’s a demo of what I mean:

#!/usr/bin/ruby

r, w = IO.pipe

p2 = fork {
  w.close
  r.each do |line|
    line.chomp!
    puts "Got: '#{line}'"
  end
}

p1 = fork {
  r.close
  10.times {|i| w.puts i}
  w.close
}

r.close

# w.close # uncomment to make hang go away

puts "waiting for #{p2}..."
Process.waitpid p2
puts "finished"

The situation becomes worse with multiple processes and pipes being
generated because they are all inherited by all the processes.

I created a solution that seems to work:

Main difference to your API: I use lambdas as an abstraction of the work
to do, because this is more flexible than your approach with methods in
the Pipe class. Lambdas receive an IO to read from and one to write
to.

Kind regards

robert

Robert, I appreciate your effort. I still have a hard time wrapping my
head around fork and closing dangling file handles - just as you pointed
out. Also, lambdas, smart as they may be, are less readable to me, and I
would think also my target audience (biologists). I have decided to
stick to my proposed syntax for setting up and executing pipes, which is
most flexible and easy to grasp (and will work excellently in irb):

require 'pipe'

p1 = Pipe.new.add(:cat, input: "test_in").add(:grep, pattern: "foo")
p2 = Pipe.new.add(:save, output: "test_out")

(p1 + p2).add(:dump).run

Also, I am aiming for some 100+ commands here, some of which will be quite
advanced - and I am afraid of lambdas for this. In fact, I am
experimenting to come up with the next generation of Biopieces
(www.biopieces.org).

Here is a version with named pipes that works (though still with
dangling file handles):

Named pipes don't have the parent/child binding of IO.pipe, so they work
with the parallel gem. However, the stream terminator "\0" I use is
quite hacky. I could possibly keep track of the number of records
passed around instead. Or get those dangling file handles sorted?
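To illustrate, here is a minimal sketch of the idea (simplified, using
fork instead of the parallel gem; the sentinel works because "\0"
happens to decode as the msgpack integer 0):

#!/usr/bin/env ruby

require 'msgpack'

path = "demo_pipe"
system("mkfifo #{path}") unless File.exist? path

child = fork do
  File.open(path, "w") do |io|
    io.write({seq: 1, name: "foo"}.to_msgpack)
    io.write({seq: 2, name: "bar"}.to_msgpack)
    io.write("\0")   # the hacky stream terminator
  end
end

File.open(path, "r") do |io|
  MessagePack::Unpacker.new(io).each do |record|
    break if record == 0   # sentinel reached
    p record
  end
end

Process.wait(child)
File.delete(path)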

Finally, I wonder about the performance of IO.pipe versus named pipes - I
want to do a benchmark. Actually, I am concerned about overall
performance; this is of course not C or even well written Ruby code
optimized to a specific task, but rather a general purpose way of
setting up pipes and executing them as simply as possible. I figure that
spending 30 minutes writing a script that runs in 1 minute is usually less
appealing than spending 1 minute on a script that runs for 30 minutes.

Cheers,

Martin

On Tue, Jan 21, 2014 at 9:08 PM, Martin H. [email protected]
wrote:

Robert, I appreciate your effort. I still have a hard time wrapping my
head around fork and closing dangling file handles - just as you pointed
out.

Did you look at the code I provided (the gist)? Basically you need to
close, in each process, all the file handles it does not need, which
includes all previously opened pipes and the write end of the current
pipe. I also made the optimization that the head of the pipeline is
executed in the current process.
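In sketch form (simplified, not the gist itself) the scheme looks like
this:

stages = [
  lambda {|_, out|   3.times {|i| out.puts i } },
  lambda {|inp, out| inp.each_line {|line| out.puts "seen: #{line.chomp}" } },
  lambda {|inp, _|   inp.each_line {|line| puts line } },
]

prev_read = nil
pids = []

stages.each_with_index do |stage, i|
  r, w = (i < stages.size - 1) ? IO.pipe : [nil, nil]

  pids << fork do
    r.close if r   # the child never reads the pipe it writes to
    stage.call(prev_read || $stdin, w || $stdout)
    w.close if w   # signal EOF downstream
  end

  # The parent closes the ends it handed to the child, so only the
  # child keeps them alive.
  prev_read.close if prev_read
  w.close if w
  prev_read = r
end

pids.each {|pid| Process.waitpid(pid) }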

Also, lambdas, smart as they may be, are less readable to me, and I
would think also my target audience (biologists). I have decided to
stick to my proposed syntax for setting up and executing pipes, which is
most flexible and easy to grasp (and will work excellently in irb):

I'll try to convince you to reconsider. First, with the approach you
have taken, you need to define a method inside the Pipe class for
every piece of functionality that you want to have. By that you make the
Pipe class multi-purpose (executing pipelines in multiple processes
AND a lot of individual functionality). As a consequence, the provider of
the Pipe class (you) ought to provide all the algorithms in the form of
methods. Of course users of the class can open it again and add
methods (e.g. their own version of "cat" or whatnot). But that opens
the door for name conflicts, issues with instance variables etc.

In software engineering it is usually considered good practice to
condense only one purpose into a class. For class Pipe (which should
probably rather be called "Pipeline") that purpose is executing multiple
operations in various processes (or even threads, as my class offers).
By using a lambda as an abstraction for an anonymous method you have
maximum flexibility to provide any functionality. The only contract
is that the lambda will be invoked with two IOs, one for reading and one
for writing.

Even if you dislike lambdas, it is not too hard to come up with a
syntax that resembles yours! And that is probably the best news for
you. Here are some examples (for "cat" only) and how they look
when creating the pipeline:

CAT = lambda do |*args|
  lambda do |io_in, io_out|
    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

pipe.add(CAT["test_in"]).add(GREP[/foo.*bar/])
pipe << CAT["test_in"] << GREP[/foo.*bar/]
pipe | CAT["test_in"] | GREP[/foo.*bar/]

(Note, I added operator “|”.)

Other forms to create those lambdas - basically always following the
same approach to use a closure to capture arguments for later
execution:

def cat(*args)
  lambda do |io_in, io_out|
    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

pipe.add(cat("test_in")).add(grep(/foo.*bar/))
pipe << cat("test_in") << grep(/foo.*bar/)
pipe | cat("test_in") | grep(/foo.*bar/)

def Cat(*args)
  lambda do |io_in, io_out|
    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

pipe.add(Cat("test_in")).add(Grep(/foo.*bar/))
pipe << Cat("test_in") << Grep(/foo.*bar/)
pipe | Cat("test_in") | Grep(/foo.*bar/)

require 'pipe'

p1 = Pipe.new.add(:cat, input: "test_in").add(:grep, pattern: "foo")
p2 = Pipe.new.add(:save, output: "test_out")

(p1 + p2).add(:dump).run

Easily done.

Also, I am aiming for some 100+ commands here, some of which will be quite
advanced - and I am afraid of lambdas for this.

Why? Your lambda can even be a simple adapter if you want to make use
of multiple other classes.
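For example, in sketch form (FastaParser is a made-up stand-in for
whatever class does the real work):

READ_FASTA = lambda do |*files|
  lambda do |io_in, io_out|
    files.each do |file|
      # Delegate parsing to an ordinary class; the lambda is only glue.
      FastaParser.new(file).each_record do |record|
        io_out.puts record.to_s
      end
    end
  end
end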

In fact, I am
experimenting to come up with the next generation of Biopieces
(www.biopieces.org).

Here is a version with named pipes that works (though still with
dangling file handles):

named_pipes2.rb (gist)

I have updated my version but there is the opposite problem: one of
the IOs is closed too early. :-) You can see the current state of affairs
in the gist. I have to go to bed now.

Named pipes don't have the parent/child binding of IO.pipe, so they work
with the parallel gem. However, the stream terminator "\0" I use is
quite hacky. I could possibly keep track of the number of records
passed around instead. Or get those dangling file handles sorted?

Better that.

Finally, I wonder about the performance of IO.pipe versus named pipes - I
want to do a benchmark. Actually, I am concerned about overall
performance; this is of course not C or even well written Ruby code
optimized to a specific task, but rather a general purpose way of
setting up pipes and executing them as simply as possible. I figure that
spending 30 minutes writing a script that runs in 1 minute is usually less
appealing than spending 1 minute on a script that runs for 30 minutes.

:-)

There is another disadvantage of named pipes: you need to make sure
that names do not collide. Also, there might be security
implications. The approach with nameless pipes is certainly more robust.

Kind regards

robert

On Tue, Jan 21, 2014 at 11:50 PM, Robert K.
[email protected] wrote:

I have updated my version but there is the opposite problem: one of
the IOs is closed too early. :-) You can see the current state of affairs
in the gist. I have to go to bed now.

I fixed it:

My FD closing scheme was broken because the parent process had the FDs
open far too long. I changed it so that FDs are closed as early as
possible. I also moved the execution code into separate classes for
thread and process execution, to better separate the code and reduce
the number of arguments of methods.

Kind regards

robert

Benchmark of IO.pipe vs named pipe:

maasha@mao:~$ cat benchmark.rb
#!/usr/bin/env ruby

require 'benchmark'

system("mkfifo my_pipe") unless File.exists? "my_pipe"

rd, wt = IO.pipe
nrd = File.open("my_pipe", File::RDONLY | File::NONBLOCK)
nwt = File.open("my_pipe", File::WRONLY | File::NONBLOCK)

n = 1_000_000

Benchmark.bm() do |x|
  x.report("IO.pipe")    { n.times { wt.puts "foo"; wt.flush; rd.gets } }
  x.report("named pipe") { n.times { nwt.puts "foo"; nwt.flush; nrd.gets } }
end

maasha@mao:~$ ./benchmark.rb
                 user     system      total        real
IO.pipe      1.360000   0.750000   2.110000 (  2.118993)
named pipe   1.280000   0.610000   1.890000 (  1.894353)

Robert, this is indeed marvelous. I guess my problem with lambda, proc
and closures is lack of experience - there is a bit of a hurdle to get
over since abstraction is, well, abstract… Anyways, you make a strong
case, so I will stick to your suggestions!

Now, I totally believe in “separation of concerns” and this is exactly
what I am trying to achieve with these pipelines. The idea is to pass
data records (hashes marshaled with msgpack) through all commands in
the pipeline. Certain commands will respond to records with appropriate
data and act upon this and sometimes emit incoming records as well as
any newly produced to the next command in the pipeline. Therefore I will
need a command like the below CAT that emits all incoming records and
any new records from the specified file. So I tried the below with your
code, but it hangs :o( ?

CAT = lambda do |*args|
  lambda do |io_in, io_out|
    io_in.each_line {|line| line.chomp!; io_out.puts line}

    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

p = Pipe.new
p << CAT["table.txt"]
p << CAT["foo.tab"]
p << lambda {|r,w| r.each {|x| puts x}}

p.execute_processes

Serializing and deserializing is a bottleneck and msgpack is fast:

maasha@mel:~$ cat benchmark.rb
#!/usr/bin/env ruby

require 'benchmark'
require 'msgpack'

n = 100_000

h = {
  zero:  0,
  one:   1,
  two:   2,
  three: 3,
  four:  4,
  five:  5,
  six:   6
}

Benchmark.bm() do |x|
  x.report("Marshal") { n.times { m = Marshal.dump h; u = Marshal.load m } }
  x.report("MsgPack") { n.times { m = h.to_msgpack; u = MessagePack.unpack m } }
end

maasha@mel:~$ ./benchmark.rb
              user     system      total        real
Marshal   1.770000   0.000000   1.770000 (  1.774334)
MsgPack   0.900000   0.020000   0.920000 (  0.921232)

Now concerning CAT: This is a placeholder for commands that will read in
data in particular formats from specified files. I have found time and
time again that it is extremely useful (using Biopieces 1.0) to be able
to process data like this:

p = Pipe.new
p << CAT["file1"]
p << GREP["/foobar/"]
p << CAT["file2"]
p.execute_processes

I know that this is not the way UNIX cat works, and that BASH can do
stuff like this neatly. But this is neither UNIX nor UNIX cat. I really
would like to let ALL commands read any incoming records and emit them
again (depending on the given options) along with any newly created records.

Cheers,

Martin

On Wed, Jan 22, 2014 at 12:43 PM, Martin H. [email protected]
wrote:

Robert, this is indeed marvelous. I guess my problem with lambda, proc
and closures is lack of experience - there is a bit of a hurdle to get
over since abstraction is, well, abstract… Anyways, you make a strong
case, so I will stick to your suggestions!

:-)

Now, I totally believe in “separation of concerns” and this is exactly
what I am trying to achieve with these pipelines. The idea is to pass
data records (hashes marshaled with msgpack)

Btw. why do you use msgpack? Is there any advantage over Marshal? If
you want to use individual messages for exchange instead of the raw
binary stream you could maybe add another layer which will handle
serialization and deserialization so individual lambdas receive only
one argument (an object) and the returned value will be sent off to
the next stage.
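As a sketch, the raw exchange could look like this (note that symbol
keys come back as strings with msgpack):

require 'msgpack'

r, w = IO.pipe

writer = fork do
  r.close
  packer = MessagePack::Packer.new(w)
  [{a: 1}, {b: 2}].each {|h| packer.write(h) }
  packer.flush
  w.close
end

w.close
MessagePack::Unpacker.new(r).each {|obj| p obj }   # {"a"=>1} then {"b"=>2}
r.close
Process.waitpid(writer)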

through all commands in
the pipeline. Certain commands will respond to records with appropriate
data and act upon this and sometimes emit incoming records as well as
any newly produced to the next command in the pipeline. Therefore I will
need a command like the below CAT that emits all incoming records and
any new records from the specified file. So I tried the below with your
code, but it hangs :o( ?

That's because your first CAT tries to read from $stdin. If you want
to append multiple files you can remove the first line from the
definition and use

p << CAT["table.txt", "foo.tab"]

CAT = lambda do |*args|
  lambda do |io_in, io_out|
    io_in.each_line {|line| line.chomp!; io_out.puts line}

That reading from io_in is what blocks in these lines:

p << CAT["foo.tab"]
p << lambda {|r,w| r.each {|x| puts x}}

Kind regards

robert

On Wed, Jan 22, 2014 at 2:27 PM, Martin H. [email protected]
wrote:

h = {
  x.report("Marshal") { n.times { m = Marshal.dump h; u = Marshal.load m } }
  x.report("MsgPack") { n.times { m = h.to_msgpack; u = MessagePack.unpack m } }
end

maasha@mel:~$ ./benchmark.rb
              user     system      total        real
Marshal   1.770000   0.000000   1.770000 (  1.774334)
MsgPack   0.900000   0.020000   0.920000 (  0.921232)

Impressive. Although it does not look as if Marshal was really slow.

I know that this is not the way UNIX cat works, and that BASH can do
stuff like this neatly. But this is not UNIX and UNIX cat. I really
would like to let ALL commands read any incoming records and emit them
again (pending given options) along with any newly created records.

Well, but then you need a way to decide whether you want to read from
the pipe or not, because the first CAT will always block on $stdin. I
suggest doing it like UNIX cat and using "-" as the identifier for
stdin:

CAT = lambda do |*args|
  args << '-' if args.empty?

  lambda do |io_in, io_out|
    args.each do |file|
      case file
      when '-'
        io_in.each_line {|line| line.chomp!; io_out.puts line}
      else
        File.foreach(file) {|line| io_out.puts line}
      end
    end
  end
end

Now your example from above looks like this:

p = Pipe.new
p << CAT["file1"]
p << GREP["/foobar/"]
p << CAT["-", "file2"]
p.execute_processes

:-)

For using messages passed through the pipes I suggest writing a wrapper
class around Pipe which uses lambdas with a single argument and passes
the return value through the chain.
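A rough sketch of what I mean (RecordPipe is just a placeholder name,
it assumes your Pipe class accepts IO-level lambdas via <<, and the
head of the pipeline would still need a stage that emits records on
its own):

require 'msgpack'

class RecordPipe
  def initialize(pipe)
    @pipe = pipe
  end

  # stage is a one-argument lambda: it receives a record and returns
  # the record to send downstream (or nil to swallow it).
  def <<(stage)
    @pipe << lambda do |io_in, io_out|
      packer = MessagePack::Packer.new(io_out)
      MessagePack::Unpacker.new(io_in).each do |record|
        result = stage.call(record)
        packer.write(result) if result
      end
      packer.flush
    end
    self
  end
end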

Kind regards

robert

OK, the use of “-” inside the CAT lambda I could perhaps tolerate. But
in the invocation I think it looks silly. Now, I tried something crufty
instead that happens to work!?!:

CAT = lambda do |*args|
  lambda do |io_in, io_out|
    io_in.each_line {|line| line.chomp!; io_out.puts line} unless io_in.inspect =~ /STDIN/

    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

p = Pipe.new
p << CAT["table.txt"]
p << CAT["foo.tab"]
p << lambda {|r,w| r.each {|x| puts x}}
p.execute_processes

One could perhaps patch class IO and add a stdin? method?

Cheers,

Martin

On Wed, Jan 22, 2014 at 8:15 PM, Martin H. [email protected]
wrote:

OK, the use of “-” inside the CAT lambda I could perhaps tolerate. But
in the invocation I think it looks silly.

Why is that? You could also replace it with something else, e.g. the
symbol :input.

Now, I tried something crufty
instead that happens to work!?!:

CAT = lambda do |*args|
  lambda do |io_in, io_out|
    io_in.each_line {|line| line.chomp!; io_out.puts line} unless io_in.inspect =~ /STDIN/

IMHO it does not make sense to hard code this decision inside the
lambda. This is too inflexible, because the head of the pipeline could
be some other IO object OR you might really want to read from stdin.
Plus, if you make the decision in the argument list of the call, you can
determine the position where you feed the input in.

p.execute_processes

I don't understand why you insist on wasting another process if you
read input files sequentially anyway.

One could perhaps patch class IO and add a stdin? method?

That is not necessary - see above.

Btw. in terms of efficiency it may be reasonable to implement CAT with
a fixed buffer instead of line-based:

CAT = lambda do |*args|
  args << :input if args.empty?

  lambda do |io_in, io_out|
    buffer = ''

    args.each do |file|
      case file
      when :input
        while (buffer = io_in.read(4_096, buffer))
          io_out.write(buffer)
        end
      else
        File.open(file) do |io|
          while (buffer = io.read(4_096, buffer))
            io_out.write(buffer)
          end
        end
      end
    end
  end
end

Kind regards

robert

Why is that? You could also replace it with something else, e.g. the
symbol :input.

I favor a minimalistic invocation, and the "-" or a symbol appears
superfluous.

I don’t understand why you insist on wasting another process if you
read input files sequentially anyway.

It is a common situation in bioinformatics that you need pieces of
information from several files in different formats. One could be a table
and one could be something à la XML. I would create READ_TABLE and
READ_XML commands that read in AND parse these formats:

p = Pipe.new
p << READ_TABLE
p << READ_XML
p << DO_SCIENTIFIC_CALCULATION
p << WRITE_TABLE
p.execute_processes

I hope this explains the need for records to be passed along.
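In sketch form, such a command follows the same pass-through convention
as my CAT above (record framing via msgpack omitted, plain lines for
brevity - and the head-of-pipeline blocking issue we discussed still
applies):

READ_TABLE = lambda do |file|
  lambda do |io_in, io_out|
    # Pass all incoming records along.
    io_in.each_line {|line| io_out.puts line}

    # Emit one new record per line of the table file.
    File.foreach(file) do |line|
      fields = line.chomp.split("\t")
      io_out.puts fields.inspect
    end
  end
end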

Btw. in terms of efficiency it may be reasonable to implement CAT with
a fixed buffer instead of line-based:

Eventually, there will not be a CAT command :o) - I have just used that
as an example. I will require commands for parsing nasty bioinformatic
formats like GenBank, GFF, SAM etc - and optimizing that is a whole
different story!

And Robert, I really truly value your help. It is most inspiring!

Cheers,

Martin

On Jan 22, 2014, at 12:21, Martin H. [email protected] wrote:

p = Pipe.new
p << READ_TABLE
p << READ_XML
p << DO_SCIENTIFIC_CALCULATION
p << WRITE_TABLE
p.execute_processes

I’m just gonna throw two ideas out there:

class Pipe
def << work
# whatever you need to do w/ work
self
end
end

such that:

p = Pipe.new
p << READ_TABLE << READ_XML << DO_SCIENTIFIC_CALCULATION << WRITE_TABLE
p.execute_processes

or:

class Pipe
def | work
# whatever you need to do w/ work
self
end
end

such that:

p = Pipe.new | READ_TABLE | READ_XML | DO_SCIENTIFIC_CALCULATION | WRITE_TABLE
p.execute_processes

On 01/22/2014 04:43 AM, Robert K. wrote:

On Wed, Jan 22, 2014 at 12:43 PM, Martin H. [email protected] wrote:

Now, I totally believe in “separation of concerns” and this is exactly
what I am trying to achieve with these pipelines. The idea is to pass
data records (hashes marshaled with msgpack)

Btw. why do you use msgpack? Is there any advantage over Marshal? If
you want to use individual messages for exchange instead of the raw
binary stream you could maybe add another layer which will handle
serialization and deserialization so individual lambdas receive only
one argument (an object) and the returned value will be sent off to
the next stage.

Some advantages to be aware of (not necessarily relevant to Martin’s
system):

1. Msgpack can use non-blocking IO (IO#readpartial). Using Marshal, if a
   writer pauses while writing output, the reader must wait as well; the
   reader's state cannot be saved while the thread does other work.

   But msgpack's Unpacker#feed and #each methods can be used to buffer
   incoming data and yield the fully parsed objects as they become
   available. If the writer pauses in the middle of an object, the reader
   can make progress on some other task, and then continue with #feed and
   #each when more data is available (checking with IO#select; see the
   sketch below). (This is probably more useful in a message broker design
   than in Martin's pipelines.) You can also do this with yajl / json,
   BTW.[1]

2. You can write processing code in languages other than Ruby.

And of course there is the speed and compactness, as Martin mentioned.
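In sketch form (io is some open pipe or socket, and handle is your
callback; both are assumed here):

require 'msgpack'

unpacker = MessagePack::Unpacker.new

loop do
  ready, = IO.select([io])           # wait until bytes are available
  begin
    unpacker.feed(ready.first.readpartial(4_096))
  rescue EOFError
    break                            # writer closed its end
  end
  unpacker.each {|record| handle(record) }  # complete records so far
end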


[1] I wrote a gem called object-stream that manages streams of objects
with different serializers (msgpack, marshal, yaml, yajl) and also with
different transports (File, Pipe, Socket, StringIO). Here’s an example
that shows the non-blocking advantage of msgpack (or yajl), compared to
marshal and yaml:

On Wed, Jan 22, 2014 at 9:07 PM, Joel VanderWerf
[email protected] wrote:

Some advantages to be aware of (not necessarily relevant to Martin’s
system):

1. Msgpack can use non-blocking IO (IO#readpartial). Using Marshal, if a
   writer pauses while writing output, the reader must wait as well; the
   reader's state cannot be saved while the thread does other work.

2. You can write processing code in languages other than Ruby.

That’s good to know! Thank you, Joel.

Kind regards

robert

On Thu, Jan 23, 2014 at 4:13 AM, Ryan D. [email protected]
wrote:

I’m just gonna throw two ideas out there:

Thank you for the consideration!

p = Pipe.new
p << READ_TABLE << READ_XML << DO_SCIENTIFIC_CALCULATION << WRITE_TABLE
p.execute_processes

That was there all from the beginning: Martin had the chaining idea in
his original posting. And I added << in my first version.

p = Pipe.new | READ_TABLE | READ_XML | DO_SCIENTIFIC_CALCULATION | WRITE_TABLE
p.execute_processes

Yes, I had that in the version from yesterday. :-[]

Sorry. :-)

Btw. I am not sure whether the idea with the pipe symbol was actually
a good one because items are not identical (as with integer bit
arithmetic for example) and it’s also not executed immediately (as the
shell does with a pipeline).

Cheers

robert

Got it. Gist updated.

Martin

On Thu, Jan 30, 2014 at 9:31 PM, Martin H. [email protected]
wrote:

Got it. Gist updated.

I don't think line 143 will work, as IO#write usually returns the
number of bytes written. Also, throughput-wise it's probably not a
good idea to flush every message.

Also, as I’ve said before I’d rather stack the msg handling pipeline
class on top of Pipe. That way you keep concerns nicely separated and
can use class Pipe for other purposes as well. The way you did it you
created a class that can only handle MessagePack messages.

Kind regards

robert

Also, as I’ve said before I’d rather stack the msg handling pipeline
class on top of Pipe. That way you keep concerns nicely separated and
can use class Pipe for other purposes as well. The way you did it you
created a class that can only handle MessagePack messages.

Again, I get it working one way and then there is some other superior
way :o) - I’ll give it a shot.

Now, I want two additional features.

First, I want a to_s method in the Pipe class that returns a string with
the commands and options run, like: Pipe.new.add(:cat, input:
"foo").add(:save, output: "bar").add(:dump).run. Strings like this I
would like to log in a history file, for documentation of commands run
and easy rerunning (in irb). I think there might be a problem with this,
since the lambdas separate the Pipe from the commands (from
the Pipe class there is no way to see what is inside the lambdas)?
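One idea: store the command name and options next to the lambda when it
is added, roughly like this (lookup_command is a made-up placeholder
for whatever maps the symbol to a lambda):

class Pipe
  Command = Struct.new(:name, :options, :body)

  def add(name, options = {})
    (@commands ||= []) << Command.new(name, options, lookup_command(name, options))
    self
  end

  def to_s
    "Pipe.new" + @commands.map { |c|
      opts = c.options.map {|k, v| "#{k}: #{v.inspect}" }.join(", ")
      opts.empty? ? ".add(:#{c.name})" : ".add(:#{c.name}, #{opts})"
    }.join + ".run"
  end
end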

Second, I would like to keep track of some basic statistics for each
command: number of records in and out, runtime, and keys seen (I will be
using records consisting of simple hashes). Each command could write a
temporary file, and then the stats could be compiled after execution.
Alternatively, the stats could be passed along the IO stream as the last
record, or each command could have a separate pipe for interprocess
communication of stats.
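As a sketch of the temporary file idea, a pass-through stage that could
be spliced between any two commands (it assumes records are hashes, as
above, and writes its tally to a JSON file at EOF):

require 'msgpack'
require 'json'

STATS = lambda do |name|
  lambda do |io_in, io_out|
    tally   = { "name" => name, "records" => 0, "keys" => Hash.new(0) }
    started = Time.now

    packer = MessagePack::Packer.new(io_out)
    MessagePack::Unpacker.new(io_in).each do |record|
      tally["records"] += 1
      record.each_key {|key| tally["keys"][key] += 1 }
      packer.write(record)   # forward every record unchanged
    end
    packer.flush

    tally["runtime"] = Time.now - started
    File.write("stats_#{name}.json", JSON.generate(tally))
  end
end

# p << READ_TABLE << STATS["after_read"] << WRITE_TABLE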

Cheers,

Martin