Forum: Ruby Resolving pipe deadlock

Martin Hansen (maasha)
on 2014-01-17 09:19
This is a cross post from SO:
http://stackoverflow.com/questions/21173136/how-to...

I am trying to emulate UNIX command line pipes in a Ruby-only solution
that uses multiple cores. Eventually, the records piped from command to
command will be Ruby objects marshaled using msgpack. Unfortunately, the
code hangs after the first dump command. I am really trying to figure
out what causes this deadlock and how to resolve it.

Any hints appreciated.


Cheers,


Martin
Joel VanderWerf (Guest)
on 2014-01-17 20:23
(Received via mailing list)
On 01/17/2014 12:19 AM, Martin Hansen wrote:
> This is a cross post from SO:
>
http://stackoverflow.com/questions/21173136/how-to...
>
> I am trying to emulate UNIX command line pipes in a Ruby-only solution
> that uses multiple cores. Eventually, the records piped from command to
> command will be Ruby objects marshaled using msgpack. Unfortunately, the
> code hangs after the first dump command. I am really trying to figure
> out what causes this deadlock and how to resolve it.

This seems to work, without the parallel gem (sorry, not familiar with
it):

#!/usr/bin/env ruby

require 'msgpack'
require 'pp'

class Pipe
   def initialize
     @commands = []
   end

   def add(command, options = {})
     @commands << Command.new(command, options)

     self
   end

   def run
     writers = {}
     @commands.each_cons(2) do |c_in, c_out|
       reader, writer = IO.pipe

       c_out.input = MessagePack::Unpacker.new(reader)
       c_in.output = MessagePack::Packer.new(writer)
       writers[c_in] = writer
     end

     @commands.map do |command|
       fork do
         command.run
       end
       writers[command].close if writers[command]
     end

     Process.waitall
   end

   class Command
     attr_accessor :input, :output

     def initialize(command, options)
       @command = command
       @options = options
       @input   = nil
       @output  = nil
     end

     def run
       send @command
     end

     def cat
       @input.each { |record| @output.write(record).flush } if @input

       File.open(@options[:input]) do |ios|
         ios.each { |record| @output.write(record).flush } if @output
       end
     end

     def dump
       @input.each do |record|
         puts record
         @output.write(record).flush if @output
       end
     end
   end
end

p = Pipe.new
p.add(:cat, input: "foo.tab").add(:dump).add(:cat, input:
"table.txt").add(:dump)
p.run
Martin Hansen (maasha)
on 2014-01-18 10:34
If you feed it more than a couple of hundred lines of data, the deadlock
happens.


Martin
Robert Klemme (robert_k78)
on 2014-01-18 17:57
(Received via mailing list)
On Sat, Jan 18, 2014 at 10:34 AM, Martin Hansen <lists@ruby-forum.com>
wrote:
> If you feed it more than a couple of hundred lines of data the deadlock
> happens.

No knowledge of the gem "parallel" here either, but this phenomenon
also happens if one does not properly parallelize IO. That may be an
explanation.  If it is what I am suggesting, then everything works as
long as the data size is less than some buffer size, because then an
IO operation can finish even if there is no live reader.  If you
exceed the limit, then one IO operation blocks, waiting for another to
read data from the buffer so it can write more data.  If that other
read operation is somehow blocked by the first one (i.e. via some
synchronization mechanism, or plainly because it is invoked from the
same thread afterwards), then the deadlock happens: the writer cannot
finish and the reader never gets a chance to read.
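
[Editorial note] The buffer-filling effect described here can be observed
directly in a few lines of Ruby. This is a sketch, not code from the
thread; it assumes a Unix-like system, and uses write_nonblock so the
script reports the point of blockage instead of hanging:

```ruby
# A pipe with no reader absorbs writes only until its kernel buffer is
# full (typically 64 KiB on modern Linux); after that, a blocking write
# deadlocks because nothing drains the read end.
reader, writer = IO.pipe

written = 0
begin
  # A plain writer.write would hang forever once the buffer fills;
  # write_nonblock raises IO::WaitWritable instead, so we can stop.
  loop { written += writer.write_nonblock("x" * 4096) }
rescue IO::WaitWritable
  puts "buffer full after #{written} bytes; a blocking write would now deadlock"
end
```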

Kind regards

robert
Martin Hansen (maasha)
on 2014-01-19 08:28
Thanks Robert,


Your explanation fits more or less exactly what I was fearing.

I think it would be really nice, from an understanding/debugging point
of view, if I could get this working in a single thread (parallel can be
disabled by setting in_processes: 0). However, there are several issues.

First: with a simple pipe, p.add(:cat, input: "foo.tab").add(:dump),
where "foo.tab" contains only a few lines, the script hangs after
:dump, and I suspect it is because of the lack of a "termination signal"
like EOF.

Second: if "foo.tab" contains more than a couple of thousand lines, then
it blocks in the :cat step - and I suspect that the reader buffer is
full and requires draining to resolve the block.

Third: I was hoping that with multiple processes the next process would
drain the buffer from the preceding step in a timely fashion. However,
it is clear that this isn't the case - some synchronization is required
between the processes. If only Ruby (MRI) threads supported multiple
processors, it could be handled with a mutex (darn GVL!). One could dig
into EventMachine to see if that would work, but I am scared by the
complexity of it.

Thus, I fear my design is flawed. One thing I can think of is to change
the design slightly so a single record at a time is passed from command
to command.
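
[Editorial note] The missing "termination signal" mentioned in the first
point is ordinarily just EOF, which a pipe reader sees once every copy of
the write end has been closed. A small stdlib-only sketch (not from the
thread) of a forked reader terminating cleanly on EOF:

```ruby
reader, writer = IO.pipe

child = fork do
  writer.close                     # child must drop its copy of the write end
  count = reader.each_line.count   # this loop ends only when EOF arrives
  exit(count)                      # report how many records were seen
end

reader.close                       # parent does not read
writer.puts "a", "b", "c"
writer.close                       # this close *is* the termination signal
_, status = Process.waitpid2(child)
puts status.exitstatus             # => 3
```

If either the parent or the child keeps a stray copy of `writer` open,
the child's read loop never sees EOF and the script hangs, which is the
failure mode discussed later in the thread.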


Cheers,


Martin
Eric Wong (Guest)
on 2014-01-19 10:18
(Received via mailing list)
Martin Hansen <lists@ruby-forum.com> wrote:
> I am trying to emulate UNIX command line pipes in a Ruby-only solution
> that uses multiple cores. Eventually, the records piped from command to
> command will be Ruby objects marshaled using msgpack. Unfortunately, the
> code hangs after the first dump command. I am really trying to figure
> out what causes this deadlock and how to resolve it.

Hey, I'm not familiar with either parallel or msgpack, but can you try
using strace (or similar syscall tracer for your OS) on the spawned
processes?  You'll probably find one (or more) processes is stuck in the
read/write/select syscalls and which FDs they're stuck on.

If you're on Linux, perhaps check out dtas-graph, a Perl script I wrote
for visualizing the pipelines and dataflows within dtas-player[1].  I
combined it with strace for debugging, because it was sometimes
confusing to associate the FDs shown in strace output with the actual
pipes/connected process on the other end(s).

dtas-graph should work on any Linux processes using pipes, not just
dtas-player.

  git clone git://80x24.org/dtas dtas

  # You'll need the Graph::Easy perl module,
  # "apt-get install libgraph-easy-perl" if you're on Debian,
  # I would've written it in Ruby but I couldn't find a graphing
  # library in Ruby with ASCII/UTF-8 text output.
  perl dtas/perl/dtas-graph $PID_OF_MAIN_PROCESS

Sample output: http://dtas.80x24.org/dtas-graph-sample.txt

Arrows denote direction of data flow, on the lines are the file
descriptor numbers associated with the pipe for each process.  Boxes
either represent processes (identified via PIDs) or pipe objects
(identified via PIPEID:PIPE_INO mapping).


[1] - duct tape audio suite - http://dtas.80x24.org/
Martin Hansen (maasha)
on 2014-01-19 10:43
I get this with strace, but I don't really know what it means:

maasha@mao:~/scratch$ strace -cp 18264
Process 18264 attached - interrupt to quit
Process 18264 detached
% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
  -nan    0.000000           0         6           read
  -nan    0.000000           0        23         1 write
  -nan    0.000000           0         1           open
  -nan    0.000000           0         4           close
  -nan    0.000000           0         1           fstat
  -nan    0.000000           0         1           mmap
  -nan    0.000000           0         2           munmap
  -nan    0.000000           0         3           rt_sigaction
  -nan    0.000000           0         2         1 rt_sigreturn
  -nan    0.000000           0         1           getrlimit
  -nan    0.000000           0         1           sched_getaffinity
  -nan    0.000000           0         1           tgkill
------ ----------- ----------- --------- --------- ----------------
100.00    0.000000                    46         2 total
Eric Wong (Guest)
on 2014-01-19 10:58
(Received via mailing list)
Martin Hansen <lists@ruby-forum.com> wrote:
> I get this with strace, but I don't really know what it means:
>
> maasha@mao:~/scratch$ strace -cp 18264

Just use strace -p, no need for -c unless you're doing performance
stuff.  Try -f to follow forks, too.
Martin Hansen (maasha)
on 2014-01-19 11:06
Right (cryptic output IMHO):


maasha@mao:~/scratch$ strace -p 18272
Process 18272 attached - interrupt to quit
write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n", 31) = ? ERESTARTSYS
(To be restarted)
--- SIGINT (Interrupt) @ 0 (0) ---
write(4, "!", 1)                        = 1
--- SIGVTALRM (Virtual timer expired) @ 0 (0) ---
rt_sigreturn(0x1a)                      = 1
rt_sigreturn(0x2)                       = -1 EINTR (Interrupted system
call)
close(9)                                = 0
open("/proc/self/maps", O_RDONLY)       = 9
getrlimit(RLIMIT_STACK, {rlim_cur=8192*1024, rlim_max=RLIM_INFINITY}) =
0
fstat(9, {st_mode=S_IFREG|0444, st_size=0, ...}) = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0)
= 0x7fd49ea35000
read(9, "7fd49d0d0000-7fd49d0df000 r-xp 0"..., 1024) = 1024
read(9, "           /usr/local/lib/ruby/2"..., 1024) = 1024
read(9, "0000 08:01 260629               "..., 1024) = 1024
read(9, "400000 rw-p 00003000 08:01 26062"..., 1024) = 1024
read(9, ":00 0 \n7fd49e824000-7fd49e844000"..., 1024) = 1024
read(9, "fff64280000 rw-p 00000000 00:00 "..., 1024) = 231
close(9)                                = 0
munmap(0x7fd49ea35000, 4096)            = 0
sched_getaffinity(18272, 32, {ffff, 0, 0, 0}) = 32
write(2, "./pipes.rb:50:in `write'", 24) = 24
write(2, ": ", 2)                       = 2
write(2, "Interrupt", 9)                = 9
write(2, "\n", 1)                       = 1
write(2, "\tfrom ./pipes.rb:50:in `flush'\n", 31) = 31
write(2, "\tfrom ./pipes.rb:50:in `block (2"..., 49) = 49
write(2, "\tfrom ./pipes.rb:50:in `each'\n", 30) = 30
write(2, "\tfrom ./pipes.rb:50:in `block in"..., 38) = 38
write(2, "\tfrom ./pipes.rb:49:in `open'\n", 30) = 30
write(2, "\tfrom ./pipes.rb:49:in `cat'\n", 29) = 29
write(2, "\tfrom ./pipes.rb:43:in `run'\n", 29) = 29
write(2, "\tfrom ./pipes.rb:27:in `block in"..., 38) = 38
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 103) = 103
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 87) = 87
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 98) = 98
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 94) = 94
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 86) = 86
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 86) = 86
write(2, "\tfrom ./pipes.rb:27:in `run'\n", 29) = 29
write(2, "\tfrom ./pipes.rb:67:in `<main>'\n", 32) = 32
rt_sigaction(SIGINT, {SIG_IGN, [], SA_RESTORER|SA_SIGINFO,
0x7fd49e617030}, {0x7fd49eb2d6f0, [], SA_RESTORER|SA_SIGINFO,
0x7fd49e617030}, 8) = 0
rt_sigaction(SIGINT, {SIG_DFL, [], SA_RESTORER|SA_SIGINFO,
0x7fd49e617030}, {SIG_IGN, [], SA_RESTORER|SA_SIGINFO, 0x7fd49e617030},
8) = 0
close(7)                                = 0
close(8)                                = 0
write(4, "!", 1)                        = 1
munmap(0x7fd49e926000, 1052672)         = 0
rt_sigaction(SIGINT, {SIG_DFL, [INT], SA_RESTORER|SA_RESTART,
0x7fd49d9eb4f0}, {SIG_DFL, [], SA_RESTORER|SA_SIGINFO, 0x7fd49e617030},
8) = 0
tgkill(18272, 18272, SIGINT)            = 0
--- SIGINT (Interrupt) @ 0 (0) ---
Process 18272 detached







maasha@mao:~/scratch$ strace -fp 18277
Process 18277 attached with 2 threads - interrupt to quit
[pid 18278] restart_syscall(<... resuming interrupted call ...>
<unfinished ...>
[pid 18277] write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n", 31) = ?
ERESTARTSYS (To be restarted)
[pid 18277] --- SIGINT (Interrupt) @ 0 (0) ---
[pid 18277] write(4, "!", 1 <unfinished ...>
[pid 18278] <... restart_syscall resumed> ) = 1
[pid 18277] <... write resumed> )       = 1
[pid 18278] read(3,  <unfinished ...>
[pid 18277] rt_sigreturn(0x2 <unfinished ...>
[pid 18278] <... read resumed> "!", 1024) = 1
[pid 18277] <... rt_sigreturn resumed> ) = -1 EINTR (Interrupted system
call)
[pid 18278] read(3,  <unfinished ...>
[pid 18277] write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n", 31
<unfinished ...>
[pid 18278] <... read resumed> 0x7f2fc6d28020, 1024) = -1 EAGAIN
(Resource temporarily unavailable)
[pid 18278] read(5, 0x7f2fc6d28020, 1024) = -1 EAGAIN (Resource
temporarily unavailable)
[pid 18278] tgkill(18277, 18277, SIGVTALRM) = 0
[pid 18278] poll([{fd=3, events=POLLIN}], 1, 100 <unfinished ...>
[pid 18277] <... write resumed> )       = ? ERESTARTSYS (To be
restarted)
[pid 18277] --- SIGVTALRM (Virtual timer expired) @ 0 (0) ---
[pid 18277] rt_sigreturn(0x1a)          = -1 EINTR (Interrupted system
call)
[pid 18277] close(9)                    = 0
[pid 18277] open("/proc/self/maps", O_RDONLY) = 9
[pid 18277] getrlimit(RLIMIT_STACK, {rlim_cur=8192*1024,
rlim_max=RLIM_INFINITY}) = 0
[pid 18277] fstat(9, {st_mode=S_IFREG|0444, st_size=0, ...}) = 0
[pid 18277] mmap(NULL, 4096, PROT_READ|PROT_WRITE,
MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f2fc687f000
[pid 18277] read(9, "7f2fc4f1a000-7f2fc4f29000 r-xp 0"..., 1024) = 1024
[pid 18277] read(9, "           /usr/local/lib/ruby/2"..., 1024) = 1024
[pid 18277] read(9, "0000 08:01 260629               "..., 1024) = 1024
[pid 18277] read(9, "24a000 rw-p 00003000 08:01 26062"..., 1024) = 1024
[pid 18277] read(9, ":00 0 \n7f2fc666e000-7f2fc668e000"..., 1024) = 1024
[pid 18277] read(9, "fff1d5fb000 rw-p 00000000 00:00 "..., 1024) = 231
[pid 18277] close(9)                    = 0
[pid 18277] munmap(0x7f2fc687f000, 4096) = 0
[pid 18277] sched_getaffinity(18277, 32, {ffff, 0, 0, 0}) = 32
[pid 18277] write(2, "./pipes.rb:50:in `write'", 24) = 24
[pid 18277] write(2, ": ", 2)           = 2
[pid 18277] write(2, "Interrupt", 9)    = 9
[pid 18277] write(2, "\n", 1)           = 1
[pid 18277] write(2, "\tfrom ./pipes.rb:50:in `flush'\n", 31) = 31
[pid 18277] write(2, "\tfrom ./pipes.rb:50:in `block (2"..., 49) = 49
[pid 18277] write(2, "\tfrom ./pipes.rb:50:in `each'\n", 30) = 30
[pid 18277] write(2, "\tfrom ./pipes.rb:50:in `block in"..., 38) = 38
[pid 18277] write(2, "\tfrom ./pipes.rb:49:in `open'\n", 30) = 30
[pid 18277] write(2, "\tfrom ./pipes.rb:49:in `cat'\n", 29) = 29
[pid 18277] write(2, "\tfrom ./pipes.rb:43:in `run'\n", 29) = 29
[pid 18277] write(2, "\tfrom ./pipes.rb:27:in `block in"..., 38) = 38
[pid 18277] write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 103) = 103
[pid 18277] write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 87) = 87
[pid 18277] write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 98) = 98
[pid 18277] write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 94) = 94
[pid 18277] write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 86) = 86
[pid 18277] write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 86) = 86
[pid 18277] write(2, "\tfrom ./pipes.rb:27:in `run'\n", 29) = 29
[pid 18277] write(2, "\tfrom ./pipes.rb:67:in `<main>'\n", 32) = 32
[pid 18277] rt_sigaction(SIGINT, {SIG_IGN, [], SA_RESTORER|SA_SIGINFO,
0x7f2fc6461030}, {0x7f2fc69776f0, [], SA_RESTORER|SA_SIGINFO,
0x7f2fc6461030}, 8) = 0
[pid 18277] rt_sigaction(SIGINT, {SIG_DFL, [], SA_RESTORER|SA_SIGINFO,
0x7f2fc6461030}, {SIG_IGN, [], SA_RESTORER|SA_SIGINFO, 0x7f2fc6461030},
8) = 0
[pid 18277] close(7)                    = 0
[pid 18277] close(8)                    = 0
[pid 18277] write(4, "!", 1 <unfinished ...>
[pid 18278] <... poll resumed> )        = 1 ([{fd=3, revents=POLLIN}])
[pid 18277] <... write resumed> )       = 1
[pid 18278] read(3,  <unfinished ...>
[pid 18277] futex(0x7f2fc688a9d0, FUTEX_WAIT, 18278, NULL <unfinished
...>
[pid 18278] <... read resumed> "!", 1024) = 1
[pid 18278] read(3, 0x7f2fc6d28020, 1024) = -1 EAGAIN (Resource
temporarily unavailable)
[pid 18278] read(5, 0x7f2fc6d28020, 1024) = -1 EAGAIN (Resource
temporarily unavailable)
[pid 18278] _exit(0)                    = ?
Process 18278 detached
<... futex resumed> )                   = 0
munmap(0x7f2fc6770000, 1052672)         = 0
rt_sigaction(SIGINT, {SIG_DFL, [INT], SA_RESTORER|SA_RESTART,
0x7f2fc58354f0}, {SIG_DFL, [], SA_RESTORER|SA_SIGINFO, 0x7f2fc6461030},
8) = 0
tgkill(18277, 18277, SIGINT)            = 0
--- SIGINT (Interrupt) @ 0 (0) ---
Process 18277 detached
Eric Wong (Guest)
on 2014-01-19 11:47
(Received via mailing list)
Martin Hansen <lists@ruby-forum.com> wrote:
> Right (cryptic output IMHO):

You'll have to figure that all out and what it means.

I'm just pointing you towards some of the tools available.  Leave the
processes running while you strace; run strace on each process in
multiple terminals, look at "ps axf" output, etc...
Robert Klemme (robert_k78)
on 2014-01-19 12:40
(Received via mailing list)
On Sun, Jan 19, 2014 at 11:06 AM, Martin Hansen <lists@ruby-forum.com>
wrote:
> Right (cryptic output IMHO):
>
>
> maasha@mao:~/scratch$ strace -p 18272
> Process 18272 attached - interrupt to quit
> write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n", 31) = ? ERESTARTSYS
> (To be restarted)

A lot is missing before that.  To find out what object a file
descriptor points to, it's usually better to start the process with
"strace -f" and not attach later, because if you attach later the
open() calls are usually not captured.  I would try something
like

$ strace -o trace -ff -ttt your/ruby/process with args

This will write one trace file per process, called "trace.<PID>",
follow forks (-o trace and -ff), and print timestamps with microsecond
resolution (-ttt).  I suggest the latter over -r because then you can
align events from different processes if need be.

It seems the trace mostly contains reaction to you killing the process
with SIGINT (similar for the other trace):

> fstat(9, {st_mode=S_IFREG|0444, st_size=0, ...}) = 0
> sched_getaffinity(18272, 32, {ffff, 0, 0, 0}) = 32
> write(2, "\tfrom ./pipes.rb:43:in `run'\n", 29) = 29
> 0x7fd49e617030}, {0x7fd49eb2d6f0, [], SA_RESTORER|SA_SIGINFO,
> 8) = 0
> tgkill(18272, 18272, SIGINT)            = 0
> --- SIGINT (Interrupt) @ 0 (0) ---
> Process 18272 detached

> [pid 18278] read(5, 0x7f2fc6d28020, 1024) = -1 EAGAIN (Resource
> temporarily unavailable)

Maybe lines above give an indication that the reader is actually
blocked because there is no data.  (Ruby MRI internally uses
nonblocking IO to multiplex threads.)


Kind regards

robert
Martin Hansen (maasha)
on 2014-01-19 13:55
OK, the output of strace -o trace -ff -ttt ./pipes.rb produces two
files:

ftp://ftp_20140119_17561:eLNy4r+fAN27@ftp.dna.ku.dk

I have yet to figure out what they mean.


Cheers,


Martin
Robert Klemme (robert_k78)
on 2014-01-19 14:57
(Received via mailing list)
On Sun, Jan 19, 2014 at 1:55 PM, Martin Hansen <lists@ruby-forum.com>
wrote:
> OK, the output of strace -o trace -ff -ttt ./pipes.rb produces two
> files:
>
> ftp://ftp_20140119_17561:eLNy4r+fAN27@ftp.dna.ku.dk

Thank you!

> I have yet to figure out what they mean.

I can share a few observations:

The parent creates three pipes:
$ fgrep pipe2 trace.1*
trace.18386:1390135219.046131 pipe2([3, 4], O_CLOEXEC) = 0
trace.18386:1390135219.046295 pipe2([5, 6], O_CLOEXEC) = 0
trace.18386:1390135219.112926 pipe2([7, 8], O_CLOEXEC) = 0

The client reads only from FD 3 and 5:

$ fgrep read trace.18387
1390135375.259220 read(3, "!", 1024)    = 1
1390135375.259410 read(3, 0x7fb63c823020, 1024) = -1 EAGAIN (Resource
temporarily unavailable)
1390135375.259663 read(5, 0x7fb63c823020, 1024) = -1 EAGAIN (Resource
temporarily unavailable)
1390135375.261880 read(3, "!", 1024)    = 1
1390135375.261910 read(3, 0x7fb63c823020, 1024) = -1 EAGAIN (Resource
temporarily unavailable)
1390135375.261932 read(5, 0x7fb63c823020, 1024) = -1 EAGAIN (Resource
temporarily unavailable)

The parent writes to 2 (stderr) and 4 and 8
$ egrep -o 'write\([0-9]+' trace.18386 | sort -u
write(2
write(4
write(8

But to 4 only at the end
$ fgrep 'write(4' trace.18386
1390135375.258855 write(4, "!", 1)      = 1
1390135375.261804 write(4, "!", 1)      = 1

Main output from file "big.tab" goes to 8:

$ fgrep -c 'write(8' trace.18386
1924

Now, as you can see 8 is connected to 7 but since the child never
reads from 7 no data is transferred to it.

If you look at the ordering of pipe2 and clone you see that the client
is created before the last pipe is opened:

$ egrep '^[0-9]+\.[0-9]+ (pipe|clone)' trace.18386
1390135219.046131 pipe2([3, 4], O_CLOEXEC) = 0
1390135219.046295 pipe2([5, 6], O_CLOEXEC) = 0
1390135219.046513 clone(child_stack=0x7fb63c384ff0,
flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID,
parent_tidptr=0x7fb63c3859d0, tls=0x7fb63c385700,
child_tidptr=0x7fb63c3859d0) = 18387
1390135219.112926 pipe2([7, 8], O_CLOEXEC) = 0

So maybe it cannot read from FD 7 which it would need to do to read
what the parent writes to FD 8. But see at the end.

The strange thing is that almost all write operations are successful
because as many bytes are written as indicated:

$ egrep -c 'write\(8,.* ([0-9]+)\) = \1' trace.18386
1922
$ egrep -c 'write\(8,.* ([0-9]+)\) =' trace.18386
1924

$ diff -U5 <(egrep 'write\(8,.* ([0-9]+)\) =' trace.18386) <(egrep
'write\(8,.* ([0-9]+)\) = \1' trace.18386)
--- /dev/fd/63 2014-01-19 14:33:25.426820557 +0100
+++ /dev/fd/62 2014-01-19 14:33:25.426820557 +0100
@@ -1918,7 +1918,5 @@
 1390135219.160905 write(8, "\332\0
4559\t4602\t3_cpVRyxwXsN1/2\t592"..., 35) = 35
 1390135219.160929 write(8, "\332\0
4563\t4597\t5_X4035ywXsN1/1\t833"..., 35) = 35
 1390135219.160954 write(8, "\332\0
4565\t4599\t1_bWkzlxwXsN1/1\t442"..., 35) = 35
 1390135219.160980 write(8, "\332\0
4568\t4611\t1_TQZWsxwXsN1/2\t540"..., 35) = 35
 1390135219.161004 write(8, "\332\0
4570\t4613\t3_gQoPtxwXsN1/2\t196"..., 35) = 35
-1390135219.161029 write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n",
31) = ? ERESTARTSYS (To be restarted)
-1390135375.259425 write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n",
31) = ? ERESTARTSYS (To be restarted)

The last two ones happen because of the signal I believe:

1390135219.161029 write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n",
31) = ? ERESTARTSYS (To be restarted)
1390135375.258745 --- SIGINT (Interrupt) @ 0 (0) ---
1390135375.258855 write(4, "!", 1)      = 1
1390135375.259234 rt_sigreturn(0x2)     = -1 EINTR (Interrupted system
call)
1390135375.259425 write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n",
31) = ? ERESTARTSYS (To be restarted)
1390135375.259784 --- SIGVTALRM (Virtual timer expired) @ 0 (0) ---
1390135375.259836 rt_sigreturn(0x1a)    = -1 EINTR (Interrupted system
call)

All in all 65266 bytes are written:

$ awk '/write\(8,.* = [0-9]+/ {sum+=$NF} END{print sum}' trace.18386
65266

This is 270 bytes short of 64k. I have no idea whether you happened to
kill the parent before it got into the stuck state. You could check
your pipe buffer size with "ulimit -p". On Linux this gives the buffer
size in 512-byte blocks. On my machine it returns 8, which is 4k, which
also happens to be the memory page size, but of course it may be
different on your system.
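
[Editorial note] As a cross-check on "ulimit -p", Linux (kernel 2.6.35+)
exposes a pipe's actual capacity via fcntl with F_GETPIPE_SZ. A sketch,
assuming Linux; the constant value 1032 is Linux-specific and hardcoded
here because Ruby's Fcntl module may not define it:

```ruby
reader, writer = IO.pipe

F_GETPIPE_SZ = 1032  # Linux-only fcntl command; not portable
capacity = reader.fcntl(F_GETPIPE_SZ)
puts "pipe capacity: #{capacity} bytes"  # typically 65536 on modern Linux
```

This would explain the ~64k of successful writes seen in the trace even
though "ulimit -p" suggests a much smaller figure.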

Maybe you should repeat the test and wait a bit longer to see whether
the parent gets into some kind of blocking state.

I looked a bit closer at flags passed to clone() and it seems the
child is a thread only:

1390135219.046513 clone(child_stack=0x7fb63c384ff0,
flags=CLONE_VM (same memory space)
|CLONE_FS (same filesystem information)
|CLONE_FILES (share the same file descriptor table.)
|CLONE_SIGHAND (share the same table of signal handlers)
|CLONE_THREAD (same thread group as the calling process)
|CLONE_SYSVSEM (share a single list of System V semaphore undo values
|CLONE_SETTLS (The newtls argument is the new TLS (Thread Local
Storage) descriptor.)
|CLONE_PARENT_SETTID (Store child thread ID at location ptid in parent
and child memory)
|CLONE_CHILD_CLEARTID (Erase  child  thread  ID  at location ctid in
child memory when the child exits,)

So, since both share the same FD table, the client _could_ read from FD
7, but apparently nobody does.  So apparently you have a pipe which is
written to but never read from.  At some point the writer must then
block, since buffers are limited - always.

Kind regards

robert
Martin Hansen (maasha)
on 2014-01-19 18:47
Thanks Robert,

As usual your assistance and insight is awesome.

I did let the strace run for a while before, and tested a couple of new
ones that were run for a minute or so, but since the traces contain the
same number of lines, I don't think run time was an issue.

I don't understand why 3 pipes (pipe pairs?) are reported when IO.pipe
is called only once and fork is disabled (also, disabling msgpack
doesn't change the number of pipes).

"ulimit -p" on my linux system yields 8 and that indicates a 64K buffer.
On my Mac it yields 1?

Anyways, I did some more tests/straces with Parallel enabled:

https://gist.github.com/maasha/5c6a914df0a79657714a

Since this version uses forks it means that a separate process is
reading from the process that is writing (that's my idea at least). This
works to the degree that all of the data is being dumped, but then the
script hangs - I suspect that the pipe is not closed and freed
correctly. I uploaded the traces from this test to trace_2step.tar to
ftp://ftp_20140119_17561:eLNy4r+fAN27@ftp.dna.ku.dk

Moreover, adding a further step in line 67 in the above gist:

p.add(:cat, input: "big.tab").add(:cat, input: "table.txt").add(:dump)

and it hangs after dumping big.tab. Again, I suspect bad pipe handling,
and I uploaded the traces from that one as well, as trace_3step.tar. In
fact I uploaded code, test files and traces.


Cheers,


Martin
Robert Klemme (robert_k78)
on 2014-01-20 08:49
(Received via mailing list)
On Sun, Jan 19, 2014 at 6:47 PM, Martin Hansen <lists@ruby-forum.com>
wrote:
> Thanks Robert,
>
> As usual your assistance and insight is awesome.

You're welcome!

> I did let the strace run for a while before, and tested a couple of new
> ones that was run for a minute or so, but since the traces contain the
> same number of lines, I don't think run time was an issue.
>
> I don't understand why 3 pipes (pipe pairs?) are reported, when IO.pipe
> is called only once and fork is disabled (also disabling msgpack doesn't
> change the number of pipes).
>
> "ulimit -p" on my linux system yields 8 and that indicates a 64K buffer.
> On my Mac it yields 1?

Does 8 really mean 64k?  I would have assumed that all flavors of
Linux use 512 byte blocks which would mean 4k - same as on my box.

>
> Moreover, adding a further step in line 67 in the above gist:
>
> p.add(:cat, input: "big.tab").add(:cat, input: "table.txt").add(:dump)
>
> and it hangs after dumping big.tab. Again, I suspect bad pipe hangling
> and I uploaded the traces from that one as well as trace_3step.tar. In
> fact I uploaded code, test files and traces.

I don't have time right now to dive into yet another trace, but I did
look at the documentation of parallel. As far as I can tell from brief
inspection, it implements a farmer/worker scenario, i.e. you
have a queue of work items, and processors (threads or processes) pick
them up one after the other and execute them. But if I understand what
you are doing properly, this is not what you want.  Basically you want
all tasks to run in parallel, with the work items taken from somewhere
else (i.e. the task at the head of the FIFO pipeline reads them from a
file) and piped through. In other words: it seems you are not using
the proper framework for your problem at hand. :-)

Kind regards

robert
Ryan Davis (Guest)
on 2014-01-20 10:14
(Received via mailing list)
On Jan 19, 2014, at 1:17, Eric Wong <normalperson@yhbt.net> wrote:

>   # I would've written it in Ruby but I couldn't find a graphing
>   # library in Ruby with ASCII/UTF-8 text output.

If I'm allowed to use aalib, I can have one for you in about 10 more
lines of code. :P
Eric Wong (Guest)
on 2014-01-20 11:25
(Received via mailing list)
Ryan Davis <ryand-ruby@zenspider.com> wrote:
> On Jan 19, 2014, at 1:17, Eric Wong <normalperson@yhbt.net> wrote:
> >   # I would've written it in Ruby but I couldn't find a graphing
> >   # library in Ruby with ASCII/UTF-8 text output.
>
> If I'm allowed to use aalib, I can have one for you in about 10 more
> lines of code. :P

Sure, I'd try it!
Robert Klemme (robert_k78)
on 2014-01-20 13:55
(Received via mailing list)
On Sun, Jan 19, 2014 at 6:47 PM, Martin Hansen <lists@ruby-forum.com>
wrote:

> and it hangs after dumping big.tab. Again, I suspect bad pipe hangling
> and I uploaded the traces from that one as well as trace_3step.tar. In
> fact I uploaded code, test files and traces.

One more potential source of issues: you open all the pipes from the
main process.  However, child processes will inherit those pipe
objects and if they are not closed there they will keep the pipeline
alive. That might be the issue you face.

Cheers

robert
Martin Hansen (maasha)
on 2014-01-20 20:37
@Robert, that is of course something I need to look into. I did notice
that IO.pipe has a child/parent relationship in the docs, but some
clever guy on IRC #ruby said it would probably be OK to use. I was
actually using named pipes in an earlier version - and those have no
child/parent bindings - but the script would still hang for reasons
unknown.

For now I will do some forking manually to avoid Parallel.


Cheers,


Martin
Martin Hansen (maasha)
on 2014-01-20 22:24
This Parallel-less version more or less follows the example from
the IO.pipe docs; however, even if I can connect the commands in the
pipeline, I fail to see how I can then run them:

  def run
    @commands.each_cons(2) do |parent, child|
      reader, writer = IO.pipe

      if fork
        writer.close
        child.input = MessagePack::Unpacker.new(reader)
        reader.close
        Process.wait
      else
        reader.close
        parent.output = MessagePack::Packer.new(writer)
        writer.close
      end
    end

    self
  end
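
[Editorial note] One shape that does work - sketched here as a
standalone example, not code from the thread - is to fork every stage up
front and have the parent, as well as each child, close every pipe end
it does not use, so that EOF can propagate from stage to stage. The
stages here are illustrative lambdas taking (input IO or nil, output IO
or nil):

```ruby
stages = [
  ->(_inp, out) { 10.times { |i| out.puts i } },                # source
  ->(inp, out)  { inp.each_line { |l| out.puts l.to_i * 2 } },  # filter
  ->(inp, _out) { inp.each_line { |l| print l } },              # sink
]

pipes = Array.new(stages.size - 1) { IO.pipe }  # [[reader, writer], ...]

pids = stages.each_with_index.map do |stage, i|
  fork do
    inp = i.zero? ? nil : pipes[i - 1][0]
    out = i == stages.size - 1 ? nil : pipes[i][1]
    # Close every inherited pipe end this stage does not use; otherwise
    # EOF never reaches the downstream readers.
    pipes.flatten.each { |io| io.close unless io.equal?(inp) || io.equal?(out) }
    stage.call(inp, out)
    out.close if out
  end
end

pipes.flatten.each(&:close)  # the parent must close its copies too
pids.each { |pid| Process.waitpid(pid) }
# the sink prints 0, 2, 4, ... 18, one per line
```

The key point is the two close loops: if either the parent or any
sibling process keeps a write end open, the corresponding reader blocks
forever, which matches the hangs reported in this thread.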

I better sleep on it :o/


Martin
Robert Klemme (robert_k78)
on 2014-01-21 09:27
(Received via mailing list)
On Mon, Jan 20, 2014 at 10:24 PM, Martin Hansen <lists@ruby-forum.com>
wrote:
> This Parallel-less version is more of less following the example from
> the IO.pipe docs, however, even if I can connect the commands in the
> pipeline I fail to see how I can then run the commands:

I have a solution which connects commands but it still suffers from
your original issue (i.e. not terminating properly).

> I better sleep on it :o/

Good plan!

Kind regards

robert
Robert Klemme (robert_k78)
on 2014-01-21 11:25
(Received via mailing list)
On Mon, Jan 20, 2014 at 1:54 PM, Robert Klemme
<shortcutter@googlemail.com> wrote:
> On Sun, Jan 19, 2014 at 6:47 PM, Martin Hansen <lists@ruby-forum.com> wrote:
>
>> and it hangs after dumping big.tab. Again, I suspect bad pipe hangling
>> and I uploaded the traces from that one as well as trace_3step.tar. In
>> fact I uploaded code, test files and traces.
>
> One more potential source of issues: you open all the pipes from the
> main process.  However, child processes will inherit those pipe
> objects and if they are not closed there they will keep the pipeline
> alive. That might be the issue you face.

Here's a demo of what I mean:

#!/usr/bin/ruby

r, w = IO.pipe

p2 = fork {
  w.close
  r.each do |line|
    line.chomp!
    puts "Got: '#{line}'"
  end
}

p1 = fork {
  r.close
  10.times {|i| w.puts i}
  w.close
}

r.close
# w.close # uncomment to make hang go away

puts "waiting for #{p2}..."
Process.waitpid p2
puts "finished"


The situation becomes worse with multiple processes and pipes being
generated because they are all inherited by all the processes.
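
To make the closing discipline concrete, here is a minimal, self-contained sketch (the three toy stages are invented for illustration; this is not the gist): every forked stage first closes all inherited pipe ends it does not use, so each reader sees EOF as soon as its real writer finishes.

```ruby
pipes = Array.new(2) { IO.pipe }   # pipes[i] connects stage i to stage i+1

stages = [
  ->(_r, w) { 5.times { |i| w.puts i } },              # produce 0..4
  ->(r, w)  { r.each_line { |l| w.puts l.to_i * 2 } }, # double each value
  ->(r, _w) { r.each_line { |l| print l } }            # print the results
]

pids = stages.each_with_index.map do |stage, i|
  fork do
    r_in  = i.zero? ? nil : pipes[i - 1][0]
    w_out = (i == stages.size - 1) ? nil : pipes[i][1]

    # The crucial part: close every inherited descriptor this stage does
    # not use, so downstream readers are not kept alive artificially.
    pipes.flatten.each do |io|
      io.close unless io.equal?(r_in) || io.equal?(w_out)
    end

    stage.call(r_in, w_out)
    w_out.close if w_out
  end
end

pipes.flatten.each(&:close)   # the parent holds on to nothing
statuses = Process.waitall
```

Commenting out the inner `pipes.flatten.each` loop makes the middle stage hang on each_line, because other processes still hold write ends of its input pipe open, which is exactly the symptom of the original deadlock.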

I created a solution that seems to work:
https://gist.github.com/rklemme/8537604

Main difference to your API: I use lambdas as the abstraction of work
to do because this is more flexible than your approach with methods in
the Pipe class.  Lambdas receive an IO to read from and one to write
to.

Kind regards

robert
386571a6fd12fc72ff39593da1af15c3?d=identicon&s=25 Martin Hansen (maasha)
on 2014-01-21 21:08
Robert, I appreciate your effort. I still have a hard time wrapping my
head around fork and closing dangling file handles - just as you pointed
out. Also, lambdas, smart as they may be, are less readable to me, and I
would think also my target audience (biologists). I have decided to
stick to my proposed syntax for setting up and executing pipes, which is
most flexible and easy to grasp (and will work excellently in irb):

require 'pipe'

p1 = Pipe.new.add(:cat, input: "test_in").add(:grep, pattern: "foo")
p2 = Pipe.new.add(:save, output: "test_out" )

(p1 + p2).add(:dump).run

Also, I am aiming for some 100+ commands here, where some will be quite
advanced - and I am afraid of lambdas for this. In fact, I am
experimenting to come up with the next generation of Biopieces
(www.biopieces.org).

Here is a version with named pipes that works (though still with
dangling file handles):

https://gist.github.com/maasha/3d0d9299e5826900af63

Named pipes don't have the parent/child binding of IO.pipe, so they work
with the parallel gem. However, the stream terminator "\0" I use is
quite a hack. I could possibly keep track of the number of records
passed around instead. Or get those dangling file handles sorted?

Finally, I wonder about the performance of IO.pipe versus named pipes -
I want to do a benchmark. Actually, I am concerned about overall
performance; this is of course not C or even well-written Ruby code
optimized to a specific task, but rather a general-purpose way of
setting up pipes and executing them as simply as possible. I figure that
spending 30 minutes writing a script that runs for 1 minute is often
less appealing than a 1-minute script that runs for 30 minutes.


Cheers,


Martin
386571a6fd12fc72ff39593da1af15c3?d=identicon&s=25 Martin Hansen (maasha)
on 2014-01-21 21:58
Benchmark of IO.pipe vs named pipe:

maasha@mao:~$ cat benchmark.rb
#!/usr/bin/env ruby

require 'benchmark'

system("mkfifo my_pipe") unless File.exist? "my_pipe"

rd, wt   = IO.pipe
nrd      = File.open("my_pipe", File::RDONLY | File::NONBLOCK)
nwt      = File.open("my_pipe", File::WRONLY | File::NONBLOCK)

n = 1_000_000

Benchmark.bm() do |x|
  x.report("IO.pipe")    { n.times { wt.puts "foo"; wt.flush; rd.gets } }
  x.report("named pipe") { n.times { nwt.puts "foo"; nwt.flush; nrd.gets } }
end

maasha@mao:~$ ./benchmark.rb
       user     system      total        real
IO.pipe  1.360000   0.750000   2.110000 (  2.118993)
named pipe  1.280000   0.610000   1.890000 (  1.894353)
E0d864d9677f3c1482a20152b7cac0e2?d=identicon&s=25 Robert Klemme (robert_k78)
on 2014-01-21 23:50
(Received via mailing list)
On Tue, Jan 21, 2014 at 9:08 PM, Martin Hansen <lists@ruby-forum.com>
wrote:
> Robert, I appreciate your effort. I still have a hard time wrapping my
> head around fork and closing dangling file handles - just as you pointed
> out.

Did you look at the code I provided (the gist)? Basically you need to
close all file handles that you do not need in a process which
includes all previously opened pipes and the write end of the current
pipe.  I also did the optimization that the head of the pipeline is
executed in the current process.

> Also, lambdas, smart as they may be, are less readable to me, and I
> would think also my target audience (biologists). I have decided to
> stick to my proposed syntax for setting up and executing pipes, which is
> most flexible and easy to grasp (and will work excellently in irb):

I'll try to convince you to reconsider. First, with the approach you
have taken, you need to define a method inside the Pipe class for
*every* piece of functionality that you want to have.  By that you make
the Pipe class multi-purpose (executing pipelines in multiple processes
AND a lot of individual functionality).  As a consequence, the provider
of the Pipe class (you) ought to provide all the algorithms in the form
of methods. Of course users of the class can open it again and add
methods (e.g. their own version of "cat" or whatnot).  But that opens
the door for name conflicts, issues with instance variables etc.

In software engineering it is usually considered good practice to give
a class only one purpose.  For class Pipe (which should probably rather
be called "Pipeline") that purpose is executing multiple operations in
various processes (or even threads, as my class offers).  By using
lambdas as an abstraction for anonymous methods you have maximum
flexibility to provide any functionality.  The only contract is that
the lambda will be invoked with two IOs, one for reading and one for
writing.

Even if you dislike lambdas, it is not too hard to come up with a
syntax that resembles yours!  And that is probably the best news for
you.  Here are some examples (for "cat" only) and how they look like
when creating the pipeline:

CAT = lambda do |*args|
  lambda do |io_in, io_out|
    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

pipe.add(CAT["test_in"]).add(GREP[/foo.*bar/])
pipe << CAT["test_in"] << GREP[/foo.*bar/]
pipe | CAT["test_in"] | GREP[/foo.*bar/]

(Note, I added operator "|".)

Other forms to create those lambdas - basically always following the
same approach to use a closure to capture arguments for later
execution:

def cat(*args)
  lambda do |io_in, io_out|
    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

pipe.add(cat("test_in")).add(grep(/foo.*bar/))
pipe << cat("test_in") << grep(/foo.*bar/)
pipe | cat("test_in") | grep(/foo.*bar/)

def Cat(*args)
  lambda do |io_in, io_out|
    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

pipe.add(Cat("test_in")).add(Grep(/foo.*bar/))
pipe << Cat("test_in") << Grep(/foo.*bar/)
pipe | Cat("test_in") | Grep(/foo.*bar/)


> require 'pipe'
>
> p1 = Pipe.new.add(:cat, input: "test_in").add(:grep, pattern: "foo")
> p2 = Pipe.new.add(:save, output: "test_out" )
>
> (p1 + p2).add(:dump).run

Easily done.

> Also, I am aiming for some +100 commands here, where some will be quite
> advanced - and I am afraid of lambdas for this.

Why?  Your lambda can even be a simple adapter if you want to make use
of multiple other classes.

> In fact, I am
> experimenting to come up with the next generation of Biopieces
> (www.biopieces.org).
>
> Here is a version with named pipes that works (though still with
> dangling file handles):
>
> https://gist.github.com/maasha/3d0d9299e5826900af63

I have updated my version but there is the opposite problem: one of
the IOs is closed too early. :-)  You can see the current state of
affairs in the gist.  I have to go to bed now.

> Named pipes don't have the parent/child binding of IO.pipe, so they work
> with the parallel gem. However, the stream terminator "\0" I use is
> quite hackery. I could possible keep track of the number of records
> passed around instead. Or get those dangling file handles sorted?

Better that.

> Finally, I wonder about performance of IO.pipe versus named pipes - I
> want to do a benchmark. Actually, I am concerned about overall
> performance; this is of cause not C or even well written Ruby code
> optimized to a specific task, but rather a general purpose way of
> setting up pipes and executing them as simply as possible. I figure that
> 30 minutes writing a script that runs for 1 minute is more often less
> appealing than a 1 minute script that runs for 30 minutes.

:-)

There is another disadvantage of named pipes: you need to make sure
that names do not collide.  Also, there might be security
implications.  The approach with anonymous pipes is certainly more
robust.

Kind regards

robert
E0d864d9677f3c1482a20152b7cac0e2?d=identicon&s=25 Robert Klemme (robert_k78)
on 2014-01-22 11:35
(Received via mailing list)
On Tue, Jan 21, 2014 at 11:50 PM, Robert Klemme
<shortcutter@googlemail.com> wrote:

> I have updated my version but there is the opposite problem: one of
> the IOs is closed too early. :-)  You can see current state of affairs
> in the gist.  I have to go to bed now.

I fixed it:
https://gist.github.com/rklemme/8537604

My FD closing scheme was broken because the parent process had the FDs
open far too long. I now changed it that FDs are closed early as
possible.  I also move the execution code into separate classes for
thread and process execution to better separate the code and reduce
number of arguments of methods.

Kind regards

robert
386571a6fd12fc72ff39593da1af15c3?d=identicon&s=25 Martin Hansen (maasha)
on 2014-01-22 12:43
Robert, this is indeed marvelous. I guess my problem with lambda, proc
and closures is lack of experience - there is a bit of a hurdle to get
over since abstraction is, well, abstract... Anyways, you make a strong
case, so I will stick to your suggestions!

Now, I totally believe in "separation of concerns" and this is exactly
what I am trying to achieve with these pipelines. The idea is to pass
data records (hashes marshaled with msgpack) through all commands in
the pipeline. Certain commands will respond to records with appropriate
data and act upon them, and sometimes emit incoming records as well as
any newly produced ones to the next command in the pipeline. Therefore
I will need a command like the below CAT that emits all incoming
records and any new records from the specified file. So I tried the
below with your code, but it hangs :o( ?



CAT = lambda do |*args|
  lambda do |io_in, io_out|
    io_in.each_line {|line| line.chomp!; io_out.puts line}

    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

p = Pipe.new
p << CAT["table.txt"]
p << CAT["foo.tab"]
p << lambda {|r,w| r.each {|x| puts x}}

p.execute_processes
E0d864d9677f3c1482a20152b7cac0e2?d=identicon&s=25 Robert Klemme (robert_k78)
on 2014-01-22 13:44
(Received via mailing list)
On Wed, Jan 22, 2014 at 12:43 PM, Martin Hansen <lists@ruby-forum.com>
wrote:
> Robert, this is indeed marvelous. I guess my problem with lambda, proc
> and closures is lack of experience - there is a bit of a hurdle to get
> over since abstraction is, well, abstract... Anyways, you make a strong
> case, so I will stick to your suggestions!

:-)

> Now, I totally believe in "separation of concerns" and this is exactly
> what I am trying to achieve with these pipelines. The idea is to pass
> data records (hashes marshaled with msgpack)

Btw. why do you use msgpack?  Is there any advantage over Marshal?  If
you want to use individual messages for exchange instead of the raw
binary stream you could maybe add another layer which will handle
serialization and deserialization so individual lambdas receive only
one argument (an object) and the returned value will be sent off to
the next stage.

>  through all commands in
> the pipeline. Certain commands will respond to records with appropriate
> data and act upon this and sometimes emit incoming records as well as
> any newly produced to the next command in the pipeline. Therefore I will
> need a command like the below CAT that emits all incoming records and
> any new records from the specified file. So I tried the below with your
> code, but it hangs :o( ?

That's because your first CAT tries to read from $stdin.  If you want
to append multiple files you can remove the first line from the
definition and use

p << CAT["table.txt", "foo.tab"]

> CAT = lambda do |*args|
>   lambda do |io_in, io_out|

blocks in this line:
> p << CAT["foo.tab"]
> p << lambda {|r,w| r.each {|x| puts x}}

Kind regards

robert
386571a6fd12fc72ff39593da1af15c3?d=identicon&s=25 Martin Hansen (maasha)
on 2014-01-22 14:27
Serializing and deserializing is a bottleneck and msgpack is fast:

maasha@mel:~$ cat benchmark.rb
#!/usr/bin/env ruby

require 'benchmark'
require 'msgpack'

n = 100_000

h = {
  zero:  0,
  one:   1,
  two:   2,
  three: 3,
  four:  4,
  five:  5,
  six:   6
}

Benchmark.bm() do |x|
  x.report("Marshal") { n.times { m = Marshal.dump h; u = Marshal.load m } }
  x.report("MsgPack") { n.times { m = h.to_msgpack;   u = MessagePack.unpack m } }
end

maasha@mel:~$ ./benchmark.rb
       user     system      total        real
Marshal  1.770000   0.000000   1.770000 (  1.774334)
MsgPack  0.900000   0.020000   0.920000 (  0.921232)


Now concerning CAT: This is a placeholder for commands that will read in
data in particular formats from specified files. I have found time and
time again that it is extremely useful (using Biopieces 1.0) to be able
to process data like this:

p = Pipe.new
p << CAT["file1"]
p << GREP["/foobar/"]
p << CAT["file2"]
p.execute_processes

I know that this is not the way UNIX cat works, and that BASH can do
stuff like this neatly. But this is not UNIX, and not UNIX cat. I
really would like to let ALL commands read any incoming records and
emit them again (depending on the given options) along with any newly
created records.


Cheers,


Martin
E0d864d9677f3c1482a20152b7cac0e2?d=identicon&s=25 Robert Klemme (robert_k78)
on 2014-01-22 14:44
(Received via mailing list)
On Wed, Jan 22, 2014 at 2:27 PM, Martin Hansen <lists@ruby-forum.com>
wrote:
> h = {
>   x.report("Marshal") { n.times { m = Marshal.dump h; u = Marshal.load m
> } }
>   x.report("MsgPack") { n.times { m = h.to_msgpack;   u =
> MessagePack.unpack m } }
> end
>
> maasha@mel:~$ ./benchmark.rb
>        user     system      total        real
> Marshal  1.770000   0.000000   1.770000 (  1.774334)
> MsgPack  0.900000   0.020000   0.920000 (  0.921232)

Impressive. Although it does not look as if Marshal was really slow.

>
> I know that this is not the way UNIX cat works, and that BASH can do
> stuff like this neatly. But this is not UNIX and UNIX cat. I really
> would like to let ALL commands read any incoming records and emit them
> again (pending given options) along with any newly created records.

Well, but then you need a way to decide whether you want to read from
the pipe or not, because the first CAT will always block on $stdin.  I
suggest to do it like the UNIX cat and use "-" as the identifier for
stdin:

CAT = lambda do |*args|
  args << '-' if args.empty?

  lambda do |io_in, io_out|
    args.each do |file|
      case file
      when "-"
        io_in.each_line {|line| line.chomp!; io_out.puts line}
      else
        File.foreach(file) {|line| io_out.puts line}
      end
    end
  end
end

Now your example above looks like this:

p = Pipe.new
p << CAT["file1"]
p << GREP["/foobar/"]
p << CAT["-", "file2"]
p.execute_processes

:-)

For using messages to pass through pipes I suggest to write a wrapper
class around Pipe which uses lambdas with a single argument and pass
the return value through the chain.
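
As a rough sketch of what I mean (hypothetical code; Marshal from the stdlib stands in for msgpack here, and the `wrap`/`upcase_keys` names are invented - a MessagePack::Packer/Unpacker pair would slot in the same way):

```ruby
# Lift a record-level lambda (one hash in, one hash out) to the
# (io_in, io_out) contract that the Pipe stages already speak.
def wrap(record_lambda)
  lambda do |io_in, io_out|
    begin
      loop do
        record = Marshal.load(io_in)                    # read one record
        Marshal.dump(record_lambda.call(record), io_out) # write the result
      end
    rescue EOFError
      # upstream closed its write end: no more records in the stream
    end
  end
end

# A record-level stage: nothing in it knows about IO or serialization.
upcase_keys = ->(rec) { rec.map { |k, v| [k.to_s.upcase, v] }.to_h }

r, w = IO.pipe
Marshal.dump({ gene: "lacZ" }, w)   # feed one record into the stream
w.close

out_r, out_w = IO.pipe
wrap(upcase_keys).call(r, out_w)    # run the wrapped stage
out_w.close

result = Marshal.load(out_r)
p result   # keys are now upcased strings
```

The user-facing lambdas then deal only in records, and the serialization format is decided in one place.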

Kind regards

robert
386571a6fd12fc72ff39593da1af15c3?d=identicon&s=25 Martin Hansen (maasha)
on 2014-01-22 20:15
OK, the use of "-" inside the CAT lambda I could perhaps tolerate. But
in the invocation I think it looks silly. Now, I tried something crufty
instead that happens to work!?!:

CAT = lambda do |*args|
  lambda do |io_in, io_out|
    io_in.each_line {|line| line.chomp!; io_out.puts line} unless io_in.inspect =~ /STDIN/

    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

p = Pipe.new
p << CAT["table.txt"]
p << CAT["foo.tab"]
p << lambda {|r,w| r.each {|x| puts x}}
p.execute_processes

One could perhaps patch class IO and add a stdin? method?


Cheers,


Martin
E0d864d9677f3c1482a20152b7cac0e2?d=identicon&s=25 Robert Klemme (robert_k78)
on 2014-01-22 20:43
(Received via mailing list)
On Wed, Jan 22, 2014 at 8:15 PM, Martin Hansen <lists@ruby-forum.com>
wrote:
> OK, the use of "-" inside the CAT lambda I could perhaps tolerate. But
> in the invocation I think it looks silly.

Why that?  You could also replace this with something else, e.g. symbol
:input.

> Now, I tried something crufty
> instead that happens to work!?!:
>
> CAT = lambda do |*args|
>   lambda do |io_in, io_out|
>     io_in.each_line {|line| line.chomp!; io_out.puts line} unless
> io_in.inspect =~ /STDIN/

IMHO it does not make sense to hard-code this decision inside the
lambda.  This is too inflexible because the head of the pipeline could
be some other IO object OR you really want to read from stdin.  Plus,
if you make the decision in the argument list of the call you can
determine the position at which the input is spliced in.

> p.execute_processes

I don't understand why you insist on wasting another process if you
read the input files sequentially anyway.

> One could perhaps patch class IO and add a stdin? method?

That is not necessary - see above.

Btw. in terms of efficiency it may be reasonable to implement CAT with
fixed buffer instead of line based:

CAT = lambda do |*args|
  args << :input if args.empty?

  lambda do |io_in, io_out|
    buffer = ''

    args.each do |file|
      case file
      when :input
        while (buffer = io_in.read(4_096, buffer))
          io_out.write(buffer)
        end
      else
        File.open(file) do |io|
          while (buffer = io.read(4_096, buffer))
            io_out.write(buffer)
          end
        end
      end
    end
  end
end

Kind regards

robert
0e6ac58dab6125c1cd2e7ac645076b6f?d=identicon&s=25 Joel VanderWerf (Guest)
on 2014-01-22 21:08
(Received via mailing list)
On 01/22/2014 04:43 AM, Robert Klemme wrote:
> On Wed, Jan 22, 2014 at 12:43 PM, Martin Hansen <lists@ruby-forum.com> wrote:
...
>> Now, I totally believe in "separation of concerns" and this is exactly
>> what I am trying to achieve with these pipelines. The idea is to pass
>> data records (hashes marshaled with msgpack)
>
> Btw. why do you use msgpack?  Is there any advantage over Marshal?  If
> you want to use individual messages for exchange instead of the raw
> binary stream you could maybe add another layer which will handle
> serialization and deserialization so individual lambdas receive only
> one argument (an object) and the returned value will be sent off to
> the next stage.

Some advantages to be aware of (not necessarily relevant to Martin's
system):

1. Msgpack can use non-blocking io (IO#readpartial). Using Marshal, if a
writer pauses while writing output, the reader must wait as well; the
reader's state cannot be saved while the thread does other work.

But msgpack's Unpacker#feed and #each methods can be used to buffer
incoming data and yield the fully parsed objects as they are available.
If the writer pauses in the middle of an object, the reader can make
progress on some other task, and then continue with the #feed and #each
when more data is available (checking with IO#select). (This is probably
more useful in a message broker design than in Martin's pipelines.) You
can also do this with yajl / json, BTW.[1]

2. You can write processing code in languages other than ruby.

And of course there is the speed and compactness, as Martin mentioned.

---

[1] I wrote a gem called object-stream that manages streams of objects
with different serializers (msgpack, marshal, yaml, yajl) and also with
different transports (File, Pipe, Socket, StringIO). Here's an example
that shows the non-blocking advantage of msgpack (or yajl), compared to
marshal and yaml:

https://github.com/vjoel/object-stream/blob/master...
386571a6fd12fc72ff39593da1af15c3?d=identicon&s=25 Martin Hansen (maasha)
on 2014-01-22 21:21
> Why that?  You could also replace this with something else, e.g. symbol
> :input.

I favor a minimalistic invocation, and the "-" or a symbol seems
superfluous.

> I don't understand why you insist on wasting another process if you
> read input files sequentially anyway.

It is a common situation in bioinformatics that you need pieces of
information from several files in different formats. One could be a
table and one could be something à la XML. I would create a READ_TABLE
and a READ_XML command that read in AND parse these formats:

p = Pipe.new
p << READ_TABLE
p << READ_XML
p << DO_SCIENTIFIC_CALCULATION
p << WRITE_TABLE
p.execute_processes

I hope this explains the need for records to be passed along.

> Btw. in terms of efficiency it may be reasonable to implement CAT with
> fixed buffer instead of line based:

Eventually, there will not be a CAT command :o) - I have just used that
as an example. I will require commands for parsing nasty bioinformatic
formats like GenBank, GFF, SAM etc - and optimizing that is a whole
different story!

And Robert, I really truly value your help. It is most inspiring!


Cheers,


Martin
5a837592409354297424994e8d62f722?d=identicon&s=25 Ryan Davis (Guest)
on 2014-01-23 04:14
(Received via mailing list)
On Jan 22, 2014, at 12:21, Martin Hansen <lists@ruby-forum.com> wrote:

> p = Pipe.new
> p << READ_TABLE
> p << READ_XML
> p << DO_SCIENTIFIC_CALCULATION
> p << WRITE_TABLE
> p.execute_processes

I'm just gonna throw two ideas out there:

class Pipe
  def << work
    # whatever you need to do w/ work
    self
  end
end

such that:

p = Pipe.new
p << READ_TABLE
  << READ_XML
  << DO_SCIENTIFIC_CALCULATION
  << WRITE_TABLE
p.execute_processes

or:

class Pipe
  def | work
    # whatever you need to do w/ work
    self
  end
end

such that:

p = Pipe.new | READ_TABLE | READ_XML | DO_SCIENTIFIC_CALCULATION | WRITE_TABLE
p.execute_processes
E0d864d9677f3c1482a20152b7cac0e2?d=identicon&s=25 Robert Klemme (robert_k78)
on 2014-01-23 10:24
(Received via mailing list)
On Wed, Jan 22, 2014 at 9:07 PM, Joel VanderWerf
<joelvanderwerf@gmail.com> wrote:

> Some advantages to be aware of (not necessarily relevant to Martin's
> system):
>
> 1. Msgpack can use non-blocking io (IO#readpartial). Using Marshal, if a
> writer pauses while writing output, the reader must wait as well; the
> reader's state cannot be saved while the thread does other work.

> 2. You can write processing code in languages other than ruby.

That's good to know! Thank you, Joel.

Kind regards

robert
E0d864d9677f3c1482a20152b7cac0e2?d=identicon&s=25 Robert Klemme (robert_k78)
on 2014-01-23 10:33
(Received via mailing list)
On Thu, Jan 23, 2014 at 4:13 AM, Ryan Davis <ryand-ruby@zenspider.com>
wrote:

> I'm just gonna throw two ideas out there:

Thank you for the consideration!

> p = Pipe.new
> p << READ_TABLE
>   << READ_XML
>   << DO_SCIENTIFIC_CALCULATION
>   << WRITE_TABLE
> p.execute_processes

That was there from the beginning: Martin had the chaining idea in
his original posting:
http://stackoverflow.com/questions/21173136/how-to...
And I added << in my first version.
https://gist.github.com/rklemme/8537604/revisions

> p = Pipe.new | READ_TABLE | READ_XML | DO_SCIENTIFIC_CALCULATION | WRITE_TABLE
> p.execute_processes

Yes, I had that in the version from yesterday. :-[]
https://gist.github.com/rklemme/8537604/revisions

Sorry. :-)

Btw. I am not sure whether the idea with the pipe symbol was actually
a good one, because the items are not identical (as with integer bit
arithmetic, for example) and it's also not executed immediately (as
the shell does with a pipeline).

Cheers

robert
386571a6fd12fc72ff39593da1af15c3?d=identicon&s=25 Martin Hansen (maasha)
on 2014-01-30 16:47
So after a busy week, I have at last some time to look more into this.
So far it looks very promising, but then I wanted to use the
(de-)serializer method of msgpack and it hangs once again?

https://gist.github.com/maasha/c1db99b7c0982ea0e7b1

lines 49-50

The File.pipe? test of course needs to be changed, but there is more to
it ...


Cheers,


Martin
386571a6fd12fc72ff39593da1af15c3?d=identicon&s=25 Martin Hansen (maasha)
on 2014-01-30 21:31
Got it. Gist updated.

Martin
E0d864d9677f3c1482a20152b7cac0e2?d=identicon&s=25 Robert Klemme (robert_k78)
on 2014-01-31 12:58
(Received via mailing list)
On Thu, Jan 30, 2014 at 9:31 PM, Martin Hansen <lists@ruby-forum.com>
wrote:
> Got it. Gist updated.

I don't think line 143 will work, as IO#write usually returns the
number of bytes written.  Also, throughput-wise it's probably not a
good idea to flush every message.

Also, as I've said before, I'd rather stack the message-handling
pipeline class on top of Pipe.  That way you keep concerns nicely
separated and can use class Pipe for other purposes as well.  The way
you did it, you created a class that can only handle MessagePack
messages.

Kind regards

robert
386571a6fd12fc72ff39593da1af15c3?d=identicon&s=25 Martin Hansen (maasha)
on 2014-02-01 15:18
> Also, as I've said before I'd rather stack the msg handling pipeline
> class on top of Pipe.  That way you keep concerns nicely separated and
> can use class Pipe for other purposes as well.  The way you did it you
> created a  class that can only handle MessagePack messages.

Again, I get it working one way and then there is some other superior
way :o) - I'll give it a shot.

Now, I want two additional features.

First, I want a to_s method in the Pipe class that returns a string with
the commands and options run, like: "Pipe.new.add(:cat, input:
"foo").add(:save, output: "bar").add(:dump).run". Strings like this I
would like to log in a history file, for documentation of commands run
and easy rerunning (in irb). I think there might be a problem with this,
since the lambdas give the separation of the Pipe and the commands (from
the Pipe class there is no way to see what is in the lambdas)?

Second, I would like to keep track of some basic statistics from each
command: number of records in and out, runtime, and keys seen (I will be
using records consisting of simple hashes). Each command could write a
temporary file and then the stats could be compiled after execution.
Alternatively, the stats could be passed along the IO stream as the last
record, or each command could have separate pipes for inter-process
communication of stats.


Cheers,


Martin
E0d864d9677f3c1482a20152b7cac0e2?d=identicon&s=25 Robert Klemme (robert_k78)
on 2014-02-01 17:14
(Received via mailing list)
On Sat, Feb 1, 2014 at 3:18 PM, Martin Hansen <lists@ruby-forum.com>
wrote:
>> Also, as I've said before I'd rather stack the msg handling pipeline
>> class on top of Pipe.  That way you keep concerns nicely separated and
>> can use class Pipe for other purposes as well.  The way you did it you
>> created a  class that can only handle MessagePack messages.
>
> Again, I get it working one way and then there is some other superior
> way :o) - I'll give it a shot.

Actually I mentioned that already - maybe it was not explicit enough.
:-)

> Now, I want two additional features.
>
> First, I want a to_s method in the Pipe class that return a string with
> the commands and options run like: "Pipe.new.add(:cat. input:
> "foo").add(:save, output: "bar").add(:dump).run". Strings like this I
> would like to log in a history file for documentation of commands run
> and easy rerunning (in irb). I think there might be a problem with this,
> with the lambdas giving the separation of the Pipe and commands (from
> the Pipe class there is no way to see what is in the lambdas)?

There is generally no way to get at the contents of the lambdas.
However, if you generate the lambdas yourself (like in those CAT
methods we have discussed earlier) you can do something like:

def CAT(*args)
  l = lambda do ... end

  # Note: a plain "def l.to_s" would not see args, because def starts a
  # new scope. define_singleton_method takes a block, i.e. a closure:
  l.define_singleton_method(:to_s) do
    "CAT(#{args.inspect})" # or whatever
  end

  l
end

You could devise other schemes that would allow executing the code
again.  You could even write a file and create the lambda from there.
Lots of options.

> Second, I would like to keep track of some basic statistics from each
> command: number of records in and out, runtime, and keys seen (I will be
> using records consisting of simple hashes). Each command could write a
> temporary file and then the stats could be compiled after execution.
> Alternative, the stats could be passed along the IO stream as the last
> record, or each command should also have separate pipes inter process
> communication of stats.

Without giving it too much thought, I would prefer the approach with
the statistics sent down the pipe at the end, because it avoids
fiddling with temporary files.
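
A rough sketch of that variant (hypothetical code; the wrapper name, the :__stats__ marker key and Marshal-standing-in-for-msgpack are all invented for illustration): count the records passing through a stage and append one stats record when the input stream ends.

```ruby
def with_stats(name, record_lambda)
  lambda do |io_in, io_out|
    count_in = count_out = 0
    begin
      loop do
        rec = Marshal.load(io_in)
        if rec.is_a?(Hash) && rec[:__stats__]
          Marshal.dump(rec, io_out)    # pass earlier stages' stats along
        else
          count_in += 1
          Marshal.dump(record_lambda.call(rec), io_out)
          count_out += 1
        end
      end
    rescue EOFError
      # input exhausted: append this stage's stats as the last record
      Marshal.dump({ __stats__: true, name: name,
                     in: count_in, out: count_out }, io_out)
    end
  end
end

r, w = IO.pipe
Marshal.dump({ a: 1 }, w)
Marshal.dump({ a: 2 }, w)
w.close

out_r, out_w = IO.pipe
with_stats(:double, ->(rec) { { a: rec[:a] * 2 } }).call(r, out_w)
out_w.close

records = []
begin
  loop { records << Marshal.load(out_r) }
rescue EOFError
end
# records now holds the two transformed hashes plus one stats hash
```

Since every stage forwards foreign stats records untouched, the last command in the pipeline ends up with one stats record per stage, ready to be compiled into a report.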

Kind regards

robert