Shell pipeline in Ruby?


#1

Hello,

how do you write an equivalent of

$ cmdA | cmdB | cmdC

in Ruby?

Specifically, I would like to see the PID, return value and stderr of
each of these commands, but I would like cmdB to read the stdout of
cmdA directly, on its stdin, and similarly for cmdC and cmdB.

Here I am not interested in feeding cmdA some particular input (e.g. it
can read /dev/null for all I care), but in general gluing an arbitrary fd
to its stdin might be desirable in other cases.

Thanks

Michal


#2

Have you checked out Open3.pipeline?

Jos


Peace cannot be achieved through violence, it can only be attained through understanding.
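A minimal sketch of the cmdA | cmdB | cmdC case with Open3.pipeline_r (Ruby 1.9 and later, per the thread). Here printf, sort and tr stand in for the three commands; that choice is mine, not from the thread. The block receives the last command's stdout and one wait thread per command, and each thread exposes the child's PID and, via #value, its Process::Status. Per-command stderr is not captured in this sketch.

```ruby
require 'open3'

# Roughly `printf 'b\na\nc\n' | sort | tr a-z A-Z`, but without a shell.
output, pids, statuses = Open3.pipeline_r(
  ["printf", "b\\na\\nc\\n"],
  ["sort"],
  ["tr", "a-z", "A-Z"]
) { |last_stdout, wait_threads|
  # The block runs while the children are alive; read the last stdout here.
  out = last_stdout.read
  # Each wait thread knows its child's pid; #value waits and returns a Process::Status.
  [out, wait_threads.map(&:pid), wait_threads.map(&:value)]
}

puts output
pids.zip(statuses).each { |pid, st| puts "#{pid}: exit #{st.exitstatus}" }
```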


#3

Michal S. wrote in post #998275:

Hello,

how do you write an equivalent of

$ cmdA | cmdB | cmdC

How about:

puts `cmdA | cmdB | cmdC`

or

puts %x{cmdA | cmdB | cmdC}
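For comparison, a small sketch of what the backtick/%x{} approach gives back, assuming a POSIX /bin/sh without pipefail: only the last command's stdout is returned, $? is the status of the shell (so in effect of the last command), and each command's stderr goes straight to the parent's stderr.

```ruby
# The shell runs the whole pipeline; Ruby only sees the final stdout.
out = %x{echo hello | tr a-z A-Z}
last_ok = $?.success?        # status of the shell, i.e. of the last command

%x{false | true}
pipeline_ok = $?.success?    # true: the failure of `false` is invisible here
```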


#4

On 12 May 2011 19:22, Jos B. wrote:

Have you checked out Open3.pipeline?

I checked all the stuff I could find at one point but did not find a
solution.

That’s why I am asking; perhaps I overlooked something.

While pipeline gives the status of all the involved processes and
connects them properly, which is more than I ever got from Ruby, it is
not obvious how to capture the output: the stdout of the last
process can be captured with pipeline_r, but the stderrs get lost
somewhere in pipeline_run.

Also, the :in and :out options used in the example are not documented,
afaict.

Thanks

Michal
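One way to keep the stderr streams apart, sketched here under the assumption (from the "[env, cmdname, arg1, ..., opts]" form in the Open3 docs) that a hash at the end of a command array is passed to Process.spawn for that command only: give each command its own :err pipe. The sh -c commands are placeholders for real programs.

```ruby
require 'open3'

# One pipe per command for stderr (names are illustrative).
a_err_r, a_err_w = IO.pipe
b_err_r, b_err_w = IO.pipe

output, statuses = Open3.pipeline_r(
  # The trailing hash inside each command array is that command's spawn options.
  ["sh", "-c", "echo data; echo 'A complains' >&2", :err => a_err_w],
  ["sh", "-c", "tr a-z A-Z; echo 'B complains' >&2", :err => b_err_w]
) { |last_stdout, wait_threads|
  [last_stdout.read, wait_threads.map(&:value)]
}

# The children have exited; close the parent's copies of the write ends so
# read sees EOF. This is fine for small output; a lot of stderr output would
# have to be drained concurrently, or the children could block on a full pipe.
a_err_w.close
b_err_w.close
a_err = a_err_r.read
b_err = b_err_r.read
```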


#5

There is also popen4 which I believe also gives you the pid with
stdin/stdout/stderr.


#6

On 12 May 2011 21:54, Stu wrote:

There is also popen4 which I believe also gives you the pid with
stdin/stdout/stderr.

It provides stderr but not the pipeline, which cannot be constructed in
pure Ruby (unless a popen taking fd:s for stdio is implemented).

Thanks

Michal


#7

In the spirit of the recent call for better ruby documentation, I wrote a
writeup:

http://en.wikibooks.org/w/index.php?title=Ruby_Programming/Running_Multiple_Processes&stable=0#Chaining_processes

Enjoy.
-roger-


#8

On Thu, May 12, 2011 at 6:05 PM, Michal S. wrote:

Here I am not interested in feeding cmdA some particular input (e.g. it
can read /dev/null for all I care), but in general gluing an arbitrary fd
to its stdin might be desirable in other cases.

$ ri Open3.pipeline_r Open3.pipeline_start

Cheers

robert


#9

On 13 May 2011 05:13, Roger P. wrote:

In the spirit of the recent call for better ruby documentation, I wrote

Unfortunately, this does not work.

First, these pipelines don’t always work as one would expect, so
testing is required before posting a “solution”.

Second, afaict the reopen method only changes the IOs at Ruby level,
not the fd:s at C level so it’s useless in this context.

Thanks

Michal


#10

On 13 May 2011 10:44, Robert K. wrote:

Here I am not interested in feeding cmdA some particular input (e.g. it
can read /dev/null for all I care), but in general gluing an arbitrary fd
to its stdin might be desirable in other cases.

$ ri Open3.pipeline_r Open3.pipeline_start

Again, like in Open3.pipeline the :err argument used in the example is
not documented.

From the description of the example it seems it captures the stderr of
all the commands together.

Are there more arguments like that?

What do they mean?

Also this is available in ruby 1.9 only.

Thanks

Michal


#11

Unfortunately, this does not work.

Try it now; I fixed the bug.

Second, afaict the reopen method only changes the IOs at Ruby level,
not the fd:s at C level so it’s useless in this context.

Try it out. This is, AFAIK, the “standard” way to pipeline processes,
and it even works in 1.8.

GL!
-r


#12

+1 for Robert’s answer. From the Open3.pipeline_start doc, an example is given:

require 'open3'

# convert pdf to ps and send it to a printer.
# collect error message of pdftops and lpr.
pdf_file = "paper.pdf"
printer = "printer-name"
err_r, err_w = IO.pipe
Open3.pipeline_start(["pdftops", pdf_file, "-"],
                     ["lpr", "-P#{printer}"],
                     :err=>err_w) {|ts|
  err_w.close
  p err_r.read # error messages of pdftops and lpr.
}

Cheers
Lionel
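The same pattern also yields the PIDs and exit statuses Michal asked for: pipeline_start passes the block an array of wait threads, one per command. A sketch, with two placeholder sh -c commands instead of pdftops and lpr; as in the doc example, both commands share the single :err pipe given as a trailing option.

```ruby
require 'open3'

err_r, err_w = IO.pipe
errors, pids, statuses = Open3.pipeline_start(
  ["sh", "-c", "echo one >&2"],
  ["sh", "-c", "cat > /dev/null; echo two >&2; exit 3"],
  :err => err_w                 # shared by every command in the pipeline
) { |wait_threads|
  err_w.close                   # drop the parent's copy so read can reach EOF
  [err_r.read, wait_threads.map(&:pid), wait_threads.map(&:value)]
}

pids.zip(statuses).each { |pid, st| puts "#{pid}: exit #{st.exitstatus}" }
```

Because the second command waits for EOF on its stdin before writing, its message reliably arrives after the first one here; in general the interleaving of a shared stderr pipe is not ordered.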


#13

Roger P. wrote in post #998434:

In the spirit of the recent call for better ruby documentation, I wrote a
writeup:

http://en.wikibooks.org/w/index.php?title=Ruby_Programming/Running_Multiple_Processes&stable=0#Chaining_processes

Unfortunately it appears the wiki formatting is mangled in Firefox, but
works well in Chrome :)

Here’s the complete code snippet:

pipe_me_in, pipe_peer_out = IO.pipe
pipe_peer_in, pipe_me_out = IO.pipe

fork do
  STDIN.reopen(pipe_peer_in)
  STDOUT.reopen(pipe_peer_out)
  Kernel.exec("echo 33")
  # this line is never executed because exec replaces the process
end
pipe_peer_out.close # file handles have to all be closed in order for
# the "read" method, below, to be able to know
# that it's done reading data, so it can return. See also
# http://devver.wordpress.com/2009/10/22/beware-of-pipe-duplication-in-subprocesses/
pipe_me_in.read

There is a link in there to a blog describing ruby’s shell class,
perhaps you did not see it?

-r
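The key point in the snippet above is the parent closing its copy of the write end. A stripped-down sketch of just that point (POSIX only, since it uses fork; passing exec separate arguments instead of one shell string is my choice):

```ruby
r, w = IO.pipe

pid = fork do
  r.close                  # the child only writes
  STDOUT.reopen(w)         # the child's fd 1 now refers to the pipe
  exec("echo", "33")       # echo inherits fd 1
end

w.close                    # without this, r.read never sees EOF: the parent
                           # itself would still hold an open write end
data = r.read
Process.wait(pid)
status = $?
```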


#14

On 12 May 2011 18:05, Michal S. wrote:

cmdA directly, on its stdin, and similarly for cmdC and cmdB.

Here I am not interested in feeding cmdA some particular input (e.g. it
can read /dev/null for all I care), but in general gluing an arbitrary fd
to its stdin might be desirable in other cases.

Thanks

Michal

OK, thanks for all the replies.

It seems this is not doable with ruby 1.8, but spawn and similar
methods in 1.9 gained named options, which include redirecting file
descriptors.

Here is a working (although not polished) solution for ruby 1.9.

First a short testing script called cmd:

#!/usr/bin/ruby

prefix = ARGV[0] + ":"
append = ARGV[1]

while l = STDIN.gets do
  STDOUT << l
  STDERR << prefix << l
end

STDOUT.puts append

And the long script with pipes that calls the above:

#!/usr/bin/ruby1.9.1
# Debian calls ruby 1.9 ruby1.9.1

pipes = []
4.times { pipes << IO.pipe }  # stdin of a, a -> b, b -> c, stdout of c
errs = []
3.times { errs << IO.pipe }   # one stderr pipe per command

writer = pipes[0][1]
reader = pipes[3][0]
i = 1

# Careful about pipe direction, it only works one way.

pids = [
  spawn("cmd", "a", "aa", :in => pipes[0][0], :out => pipes[1][1], :err => errs[0][1]),
  spawn("cmd", "b", "bb", :in => pipes[1][0], :out => pipes[2][1], :err => errs[1][1]),
  spawn("cmd", "c", "cc", :in => pipes[2][0], :out => pipes[3][1], :err => errs[2][1]),
]

# We don't really want to see these pipes, perhaps some
# Open3.pipeline_* method would create them out of sight
pipes.flatten.select { |p| p != writer && p != reader }.each { |p| p.close }
# Also close the parent's copies of the stderr write ends,
# otherwise the stderr readers never reach EOF.
errs.each { |r, w| w.close }

maxlen = 128 # some arbitrary size for chunk of data read at once

outputs = errs.collect { |r, w| r }
outputs << reader
wsel = [writer]

while true do
  ios = IO.select(outputs, wsel, nil, 1)
  STDERR << '.'

  string = "%4i\n" % i
  if writer then
    begin
      writer.write_nonblock(string)
      i = i + 1
      if i > 1000 then
        writer.close
        writer = nil
        wsel = nil
      end
    rescue IO::WaitWritable, Errno::EINTR
    end
  end

  # Iterate over a copy: streams that hit EOF are removed from outputs.
  outputs.dup.each { |io|
    begin
      STDOUT << io.read_nonblock(maxlen)
    rescue IO::WaitReadable, Errno::EINTR
    rescue EOFError
      STDERR.puts "EOF"
      outputs.delete(io)
    end
  }

  (0...3).each { |n|
    pid = pids[n]
    if pid and Process.waitpid(pid, Process::WNOHANG) then
      STDOUT.puts $?.inspect
      pids[n] = nil
    end
  }
  break if !ios && pids.compact.empty?
end


#15

On 13 May 2011 16:23, Robert K. wrote:

each of these commands but I would like cmdB to read the stdout of

[env, cmdname, arg1, ..., opts]          command name and one or more arguments (no shell)
[env, [cmdname, argv0], arg1, ..., opts] command name and arguments including argv[0] (no shell)

Note that env and opts are optional, as Process.spawn.

Then

$ ri Process.spawn

It’s not quite clear that the Process.spawn description also applies to
options given outside the commands, or how it applies, but it probably
makes sense: they would be applied to the whole pipeline.

Thanks

Michal
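How trailing options are distributed is easiest to see in Open3's pipeline_run source. As far as I can tell from that source (worth re-checking for your Ruby version), a trailing :in goes to the first command, a trailing :out to the last, any other trailing option to every command, and a per-command hash overrides the trailing one. A small sketch exercising the [env, commandline] and [cmdname, arg1, ...] forms plus a trailing :out; the GREETING variable is purely illustrative.

```ruby
require 'open3'

out_r, out_w = IO.pipe
statuses = Open3.pipeline(
  [{"GREETING" => "hello"}, "echo $GREETING"],   # [env, commandline]: run via the shell
  ["tr", "a-z", "A-Z"],                          # [cmdname, arg1, ...]: no shell
  :out => out_w                                  # trailing :out applies to the last command
)
out_w.close          # pipeline has already waited for the children
output = out_r.read  # small output, so it fit in the pipe buffer meanwhile
```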


#16

On Fri, May 13, 2011 at 3:27 PM, Michal S. wrote:

cmdA directly, on its stdin, and similarly for cmdC and cmdB.

Here I am not interested in feeding cmdA some particular input (e.g. it
can read /dev/null for all I care), but in general gluing an arbitrary fd
to its stdin might be desirable in other cases.

$ ri Open3.pipeline_r Open3.pipeline_start

Again, like in Open3.pipeline the :err argument used in the example is
not documented.

Not true. From ri Open3.pipeline_r in 1.9.2p180:

Each cmd is a string or an array. If it is an array, the elements are
passed to Process.spawn.

cmd:
  commandline                              command line string which is passed to a shell
  [env, commandline, opts]                 command line string which is passed to a shell
  [env, cmdname, arg1, ..., opts]          command name and one or more arguments (no shell)
  [env, [cmdname, argv0], arg1, ..., opts] command name and arguments including argv[0] (no shell)

Note that env and opts are optional, as Process.spawn.

Then

$ ri Process.spawn

From the description of the example it seems it captures the stderr of
all the commands together.

Are there more arguments like that?

What do they mean?

See above.

Also this is available in ruby 1.9 only.

Well?

Cheers

robert


#17

On 13 May 2011 15:48, Roger P. wrote:

works well in chrome :)
  Kernel.exec("echo 33")
  # this line is never executed because exec moves the process
end
pipe_peer_out.close # file handles have to all be closed in order for
# the "read" method, below, to be able to know
# that it's done reading data, so it can return. See also
# http://devver.wordpress.com/2009/10/22/beware-of-pipe-duplication-in-subprocesses/
pipe_me_in.read

Changing the last line to

puts '"' + pipe_me_in.read.chomp + '"'

really prints 33 in quotes, so IO.reopen does work for setting up the
file descriptors.

It is not clear from the docs if it would work or not.

It does work, though:

out = File.open("testfile", "w")
STDOUT.reopen(out)
system("echo 33")
STDERR.puts '"' + File.read("testfile").chomp + '"'
# => "33"

I vaguely recall some issues with using STDIN and STDOUT for
redirection earlier but it might be due to using them incorrectly.

There is a link in there to a blog describing ruby’s shell class,
perhaps you did not see it?

Yes, I missed the link labeled just [1]

Thanks

Michal
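The distinction raised earlier in the thread, between changing the Ruby-level IO and changing the underlying file descriptor, shows up when reassigning $stdout is contrasted with STDOUT.reopen. A sketch; whether this is the source of the earlier redirection issues is only a guess.

```ruby
require 'stringio'
require 'tempfile'

# 1. Reassigning $stdout only changes where Ruby's own puts/print go.
buffer = StringIO.new
$stdout = buffer
puts "from ruby"              # ends up in buffer
system("echo from child")     # the child writes to the real fd 1, not to buffer
$stdout = STDOUT

# 2. STDOUT.reopen re-points fd 1 itself, which child processes inherit.
file = Tempfile.new("reopen-demo")
STDOUT.flush                  # don't let pending output leak into the file
saved = STDOUT.dup            # keep a handle on the original fd 1
STDOUT.reopen(file.path, "w")
system("echo 33")             # lands in the file
STDOUT.reopen(saved)          # restore fd 1
saved.close
reopened = File.read(file.path)
file.close!
```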


#18

Michal S. wrote in post #998548:

And the long script with pipes that calls the above:

Since you are doing this on a real operating system, there’s no need to
use ruby to copy the output of one process to the input of the next
process (unless you want to capture the communication between the
processes as well).

OTOH, making sure all the right FDs are closed in each child can be
tricky in 1.8, because there’s no IO#close_on_exec= (although 1.9 has
this).

Here is an example which works in 1.8.7. Obviously it’s possible to
refactor this to N children, although I expect you’ll end up with
something like the ‘pipeline’ method mentioned before if you do.


#Thread.abort_on_exception = true # for debugging

ruby_to_a_rd, ruby_to_a_wr = IO.pipe
a_to_b_rd, a_to_b_wr = IO.pipe
a_err_rd, a_err_wr = IO.pipe

# open: r2a_r, r2a_w, a2b_r, a2b_w, ae_r, ae_w

pid1 = fork do
  ruby_to_a_wr.close
  a_to_b_rd.close
  a_err_rd.close
  STDIN.reopen(ruby_to_a_rd)
  STDOUT.reopen(a_to_b_wr)
  STDERR.reopen(a_err_wr)
  exec("cat; echo done cat 1>&2")
  STDERR.puts "Whoops! #{$!}"
end
ruby_to_a_rd.close
a_to_b_wr.close
a_err_wr.close

# open: r2a_w, a2b_r, ae_r

b_to_c_rd, b_to_c_wr = IO.pipe
b_err_rd, b_err_wr = IO.pipe

# open: r2a_w, a2b_r, ae_r, b2c_r, b2c_w, be_r, be_w

pid2 = fork do
  ruby_to_a_wr.close
  b_to_c_rd.close
  a_err_rd.close
  b_err_rd.close
  STDIN.reopen(a_to_b_rd)
  STDOUT.reopen(b_to_c_wr)
  STDERR.reopen(b_err_wr)
  exec("tr 'a-z' 'A-Z'; echo done tr 1>&2")
  STDERR.puts "Whoops! #{$!}"
end
a_to_b_rd.close
b_to_c_wr.close
b_err_wr.close

# open: r2a_w, ae_r, b2c_r, be_r

c_to_ruby_rd, c_to_ruby_wr = IO.pipe
c_err_rd, c_err_wr = IO.pipe

# open: r2a_w, ae_r, b2c_r, be_r, c2r_r, c2r_w, ce_r, ce_w

pid3 = fork do
  ruby_to_a_wr.close
  c_to_ruby_rd.close
  a_err_rd.close
  b_err_rd.close
  c_err_rd.close
  STDIN.reopen(b_to_c_rd)
  STDOUT.reopen(c_to_ruby_wr)
  STDERR.reopen(c_err_wr)
  exec("sed 's/O/0/g'; echo done sed 1>&2")
  STDERR.puts "Whoops! #{$!}"
end
b_to_c_rd.close
c_to_ruby_wr.close
c_err_wr.close

# open: r2a_w, ae_r, be_r, c2r_r, ce_r

Thread.new do
  ruby_to_a_wr.puts "Here is some data"
  ruby_to_a_wr.puts "And some more"
  ruby_to_a_wr.close
end

Thread.new do
  while line = a_err_rd.gets
    puts "A err: #{line}"
  end
  a_err_rd.close
end

Thread.new do
  while line = b_err_rd.gets
    puts "B err: #{line}"
  end
  b_err_rd.close
end

Thread.new do
  while line = c_err_rd.gets
    puts "C err: #{line}"
  end
  c_err_rd.close
end

while line = c_to_ruby_rd.gets
  puts line
end
c_to_ruby_rd.close

This can be simplified a bit if you don’t want three separate error
streams; they can all share the same one. i.e. each child would have

err_rd.close
STDERR.reopen(err_wr)

Regards,

Brian.


#19

Here is a ruby 1.9.2 version with close_on_exec=. I leave it as an
exercise to refactor this to an arbitrary pipeline of N commands - it
would return one input pipe, one output pipe, N pids and N error pipes.

Regards,

Brian.

#Thread.abort_on_exception = true # for debugging

ruby_to_a_rd, ruby_to_a_wr = IO.pipe
ruby_to_a_wr.close_on_exec = true # no children should have this

a_to_b_rd, a_to_b_wr = IO.pipe

a_err_rd, a_err_wr = IO.pipe
a_err_rd.close_on_exec = true # no children should have this

pid1 = fork do
  a_to_b_rd.close
  STDIN.reopen(ruby_to_a_rd)
  STDOUT.reopen(a_to_b_wr)
  STDERR.reopen(a_err_wr)
  exec("cat; echo done cat 1>&2")
  STDERR.puts "Whoops! #{$!}"
end

# we don't want these fds, nor any of the further children
ruby_to_a_rd.close
a_to_b_wr.close
a_err_wr.close

b_to_c_rd, b_to_c_wr = IO.pipe
b_err_rd, b_err_wr = IO.pipe
b_err_rd.close_on_exec = true # no children should have this

pid2 = fork do
  b_to_c_rd.close
  STDIN.reopen(a_to_b_rd)
  STDOUT.reopen(b_to_c_wr)
  STDERR.reopen(b_err_wr)
  exec("tr 'a-z' 'A-Z'; echo done tr 1>&2")
  STDERR.puts "Whoops! #{$!}"
end

# we don't want these fds, nor any of the further children
a_to_b_rd.close
b_to_c_wr.close
b_err_wr.close

c_to_ruby_rd, c_to_ruby_wr = IO.pipe
c_err_rd, c_err_wr = IO.pipe
c_err_rd.close_on_exec = true # no children should have this
pid3 = fork do
  c_to_ruby_rd.close
  STDIN.reopen(b_to_c_rd)
  STDOUT.reopen(c_to_ruby_wr)
  STDERR.reopen(c_err_wr)
  exec("sed 's/O/0/g'; echo done sed 1>&2")
  STDERR.puts "Whoops! #{$!}"
end

# we don't want these fds, nor any of the further children
b_to_c_rd.close
c_to_ruby_wr.close
c_err_wr.close

Thread.new do
  ruby_to_a_wr.puts "Here is some data"
  ruby_to_a_wr.puts "And some more"
  ruby_to_a_wr.close
end

Thread.new do
  while line = a_err_rd.gets
    puts "A err: #{line}"
  end
  a_err_rd.close
end

Thread.new do
  while line = b_err_rd.gets
    puts "B err: #{line}"
  end
  b_err_rd.close
end

Thread.new do
  while line = c_err_rd.gets
    puts "C err: #{line}"
  end
  c_err_rd.close
end

while line = c_to_ruby_rd.gets
  puts line
end
c_to_ruby_rd.close
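One possible take on the exercise of generalising this to N commands, returning one input pipe, one output pipe, N pids and N error pipes. Note that it swaps the fork/reopen/close_on_exec technique for Process.spawn with :in/:out/:err redirections (Ruby 1.9+), and spawn_pipeline is a hypothetical helper name, not a library method. It relies on spawned children not inheriting the parent's other pipe fds (close_others in 1.9, close-on-exec pipes by default in later versions).

```ruby
# Hypothetical helper: spawn each command with its stdin connected to the
# previous command's stdout and with its own stderr pipe.
# Returns [stdin writer, stdout reader, pids, stderr readers].
def spawn_pipeline(*cmds)
  stdin_r, stdin_w = IO.pipe
  prev_r = stdin_r
  pids = []
  err_rs = []
  cmds.each do |cmd|
    out_r, out_w = IO.pipe
    err_r, err_w = IO.pipe
    pids << spawn(*cmd, :in => prev_r, :out => out_w, :err => err_w)
    # The child has its own copies now; the parent keeps only the read ends it needs.
    prev_r.close
    out_w.close
    err_w.close
    err_rs << err_r
    prev_r = out_r
  end
  [stdin_w, prev_r, pids, err_rs]
end

stdin_w, stdout_r, pids, err_rs = spawn_pipeline(
  ["sh", "-c", "cat; echo done cat >&2"],
  ["sh", "-c", "tr a-z A-Z; echo done tr >&2"],
  ["sh", "-c", "sed s/O/0/g; echo done sed >&2"]
)

writer = Thread.new do
  stdin_w.puts "Here is some data"
  stdin_w.puts "And some more"
  stdin_w.close
end
err_readers = err_rs.map { |r| Thread.new { r.read } }  # drain every stderr concurrently

output = stdout_r.read
writer.join
errors = err_readers.map(&:value)
statuses = pids.map { |pid| Process.wait2(pid).last }
```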


#20

On 14 May 2011 17:05, Brian C. wrote:

Michal S. wrote in post #998548:

And the long script with pipes that calls the above:

Since you are doing this on a real operating system, there’s no need to
use ruby to copy the output of one process to the input of the next

I don’t use Ruby for that.

And I used a select loop because I had issues with threads in the past.

However, your example works perfectly for me, even when updated to put
more data through each of the pipes to make sure it doesn’t all end up
in a buffer, so that the threads actually have to be switched for the
pipeline to work.

I think there might still be issues if the produced data had no line
endings, because none of the gets calls would return, but that can be
solved by using read_nonblock or something similar instead.
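IO#readpartial is one such alternative: it blocks only until some data is available and returns it, without waiting for a newline. A sketch with a placeholder child that prints a "prompt" with no newline and then keeps running for a second; gets would block until the newline or EOF.

```ruby
r, w = IO.pipe
# The child prints a prompt without a newline and then keeps running.
pid = spawn("sh", "-c", "printf prompt; sleep 1", :out => w)
w.close

first = r.readpartial(4096)   # returns as soon as "prompt" is in the pipe
rest = r.read                 # drains up to EOF once the child exits
Process.wait(pid)
```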

Thanks

Michal


#Thread.abort_on_exception = true # for debugging

ruby_to_a_rd, ruby_to_a_wr = IO.pipe
a_to_b_rd, a_to_b_wr = IO.pipe
a_err_rd, a_err_wr = IO.pipe

# open: r2a_r, r2a_w, a2b_r, a2b_w, ae_r, ae_w

pid1 = fork do
  ruby_to_a_wr.close
  a_to_b_rd.close
  a_err_rd.close
  STDIN.reopen(ruby_to_a_rd)
  STDOUT.reopen(a_to_b_wr)
  STDERR.reopen(a_err_wr)
  exec('while read x ; do echo "$x" ; echo "$x" >&2 ; done')
  STDERR.puts "Whoops! #{$!}"
end
ruby_to_a_rd.close
a_to_b_wr.close
a_err_wr.close

# open: r2a_w, a2b_r, ae_r

b_to_c_rd, b_to_c_wr = IO.pipe
b_err_rd, b_err_wr = IO.pipe

# open: r2a_w, a2b_r, ae_r, b2c_r, b2c_w, be_r, be_w

pid2 = fork do
  ruby_to_a_wr.close
  b_to_c_rd.close
  a_err_rd.close
  b_err_rd.close
  STDIN.reopen(a_to_b_rd)
  STDOUT.reopen(b_to_c_wr)
  STDERR.reopen(b_err_wr)
  exec('while read x ; do x=`echo "$x" | tr a-z A-Z` ; echo "$x" ; echo "$x" >&2 ; done')
  STDERR.puts "Whoops! #{$!}"
end
a_to_b_rd.close
b_to_c_wr.close
b_err_wr.close

# open: r2a_w, ae_r, b2c_r, be_r

c_to_ruby_rd, c_to_ruby_wr = IO.pipe
c_err_rd, c_err_wr = IO.pipe

# open: r2a_w, ae_r, b2c_r, be_r, c2r_r, c2r_w, ce_r, ce_w

pid3 = fork do
  ruby_to_a_wr.close
  c_to_ruby_rd.close
  a_err_rd.close
  b_err_rd.close
  c_err_rd.close
  STDIN.reopen(b_to_c_rd)
  STDOUT.reopen(c_to_ruby_wr)
  STDERR.reopen(c_err_wr)
  exec('while read x ; do x=`echo "$x" | sed -e s/O/0/g` ; echo "$x" ; echo "$x" >&2 ; done')
  STDERR.puts "Whoops! #{$!}"
end
b_to_c_rd.close
c_to_ruby_wr.close
c_err_wr.close

# open: r2a_w, ae_r, be_r, c2r_r, ce_r

Thread.new do
  ruby_to_a_wr.puts "Here is some data"
  (1...1000).each { |i|
    ruby_to_a_wr.puts "#{i} And some more"
  }
  ruby_to_a_wr.close
end

Thread.new do
  while line = a_err_rd.gets
    puts "A err: #{line}"
  end
  a_err_rd.close
end

Thread.new do
  while line = b_err_rd.gets
    puts "B err: #{line}"
  end
  b_err_rd.close
end

Thread.new do
  while line = c_err_rd.gets
    puts "C err: #{line}"
  end
  c_err_rd.close
end

while line = c_to_ruby_rd.gets
  puts line
end
c_to_ruby_rd.close