Is there a standard pattern for threaded access to a file?

I’m pretty new to ruby and this is one of those areas where I can’t
quite seem to turn my head inside out as the language requires :)

I have a log file that I want to process in parts, with multiple threads
working from the same file, each one getting a line at a time and doing
something with it.

I’d like something like

  1. Open the file
  2. Create 5 threads

Each thread should read a line of the file and process it, but no 2
threads should get the same line.

What I have looks like

open(ARGV.flags.log) do |logfile|
  logfile.each do |line|
    # blah blah blah...
  end
end

but that’s inside out! How do I rubify this code?

Thanks,

Jon

On Oct 12, 2007, at 5:51 PM, Jon H. wrote:

  1. Open the file
  2. Create 5 threads

Each thread should read a line of the file and process it, but no 2
threads should get the same line.

cfp:~ > cat a.rb
require 'thread'

q = Queue.new

threads = [] and 5.times{ threads << Thread.new{ puts q.pop } }

open(__FILE__){|fd| fd.each{|line| q.push line} }

threads.map{|t| t.join}

cfp:~ > ruby a.rb
require 'thread'

q = Queue.new

threads = [] and 5.times{ threads << Thread.new{ puts q.pop } }

a @ http://codeforpeople.com/

ara.t.howard wrote:

threads.map{|t| t.join}

Skip that inappropriate suggestion and use the following instead:

threads.each {|t| t.join}

Actually, the example provided won’t even work in your case. You have
to do some extra things.

I’m pretty new to ruby

A Queue is a first-in, first-out container, which means the items you
push() into one end of the Queue are the first items that pop() out the
other end. A Queue is also thread safe, which means that only one
thread can access it at a time.

Therefore, you can push() the lines from your file into one end of the
Queue, and you can have each thread pop() a line off the other end of
the Queue.
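Here is a minimal sketch of that hand-off using the standard Queue class. The 100 integer items and the `received` array are made up for the demonstration. One producer pushes items, five consumers pop them, and each item reaches exactly one consumer.

```ruby
require 'thread'   # Queue is built in on modern Rubies; older ones needed this require

q        = Queue.new
received = []          # every item any consumer popped
lock     = Mutex.new   # a plain Array is not thread safe, so guard it

consumers = Array.new(5) do
  Thread.new do
    # 5 consumers x 20 pops = 100 items; Queue#pop blocks while the queue is empty
    20.times do
      item = q.pop
      lock.synchronize { received << item }
    end
  end
end

(1..100).each { |i| q.push(i) }   # the producer
consumers.each(&:join)

puts received.size        # => 100
puts received.uniq.size   # => 100, so no item was handed to two consumers
```

To keep the sketch short, each consumer pops a fixed number of items. A real line-processing loop does not know the count in advance, which is the termination problem described below.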

If there is nothing in the Queue, then a thread that tries to pop() a
line from the Queue will block until more data becomes available. As a
result, even after all the lines have been read from the Queue, each
thread will come back to the Queue and try to pop() another line, but
since there won’t be any more lines left, the threads will block and
wait for more data. That means the threads will never end. To make
your threads stop trying to read more lines from the Queue once it’s
exhausted, you will need to send each thread a string that acts as a
termination message.

You could first push() all the lines from your file into the Queue, and
then start the threads, but you might as well get the threads working on
the first lines while you are pushing the rest of the lines into the
Queue. So, start the threads and let them block, then start pushing
the lines from the file into the Queue.

require 'thread'

#Create some data:
File.open("data.txt", "w") do |file|
  (1..100).each do |num|
    file.puts("line #{num}")
  end
end

#Read data with 5 threads:
q = Queue.new

my_threads = (1..5).collect do |i|
  Thread.new do #returns a thread
    loop do
      line = q.pop

      if line == "END_OF_DATA"
        break
      end

      #process line:
      puts line.capitalize
    end
  end
end

#Threads are blocking while they
#await data. Give them some data:
IO.foreach("data.txt") do |line|
  q.push(line)
end

#Send each thread a signal that
#terminates the thread:
5.times {q.push("END_OF_DATA")}

#Wait for all the threads to finish
#executing:
my_threads.each {|t| t.join}

On 13.10.2007 05:59, 7stud – wrote:

While this is all true and well, I have some additional remarks.

Therefore, you can push() the lines from your file into one end of the
[…]
exhausted, you will need to send each thread a string that acts as a
termination message.

There is a better option: send something down the queue that is not a
String. Otherwise processing would stop prematurely if the file happened
to contain the terminating line.
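Here is a sketch of that suggestion. The input is made up and deliberately contains the old "END_OF_DATA" marker as ordinary lines. The sentinel is a fresh `Object.new` (the name `STOP` is hypothetical) compared by identity, so no String read from a file can ever match it.

```ruby
require 'thread'

STOP  = Object.new   # hypothetical sentinel: no line read from a file can be this object
lines = ["alpha\n", "END_OF_DATA\n", "END_OF_DATA", "omega\n"]  # made-up input

q         = Queue.new
processed = Queue.new   # thread-safe collection of results

workers = Array.new(3) do
  Thread.new do
    # identity comparison: an ordinary String never terminates the worker
    until (item = q.pop).equal?(STOP)
      processed << item.chomp.upcase
    end
  end
end

lines.each { |l| q.push(l) }
workers.size.times { q.push(STOP) }   # one sentinel per worker, after all the data
workers.each(&:join)

results = []
results << processed.pop until processed.empty?
p results.sort   # => ["ALPHA", "END_OF_DATA", "END_OF_DATA", "OMEGA"]
```

Both "END_OF_DATA" lines are processed like any other data. A string comparison against the unterminated last-line case would have stopped a worker early.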

You could first push() all the lines from your file into the Queue, and
then start the threads,

That’s a rather bad idea given that a file can be huge and you do not
need all lines in memory for line-wise processing.

That’s the same reason why it’s a good idea to use a bounded queue: if
processing is slower than reading, an unbounded queue will eventually
fill up with the complete file contents. If processing is faster than
reading, then threads will have to wait either way.
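Here is a sketch of the bounded queue. It uses a deliberately tiny bound of 2 and a `sleep` standing in for slow processing; both are assumptions for the demo, and Robert's version further down uses 1024. SizedQueue#push blocks once the bound is reached, so the backlog the producer sees never exceeds it.

```ruby
require 'thread'

MAX_IN_QUEUE = 2                 # tiny on purpose; Robert's version uses 1024
q    = SizedQueue.new(MAX_IN_QUEUE)
peak = 0                         # largest backlog the producer ever saw

consumer = Thread.new do
  count = 0
  until q.pop == :done           # Symbol terminator; the queued lines are Strings
    sleep 0.01                   # processing is slower than reading
    count += 1
  end
  count                          # becomes the thread's value
end

50.times do |i|
  q.push("line #{i}\n")          # blocks while MAX_IN_QUEUE lines are already waiting
  peak = [peak, q.size].max
end
q.push(:done)

puts consumer.value              # => 50
puts peak <= MAX_IN_QUEUE        # => true
```

With a plain Queue, the producer would finish almost immediately and hold all 50 lines in memory while the consumer catches up.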

(1..100).each do |num|
loop do
end

#Wait for all the threads to finish
#executing:
my_threads.each {|t| t.join}

Here’s my version with all the remarks incorporated.

require 'thread'

MAX_IN_QUEUE = 1024
NUM_THREADS = 5

queue = SizedQueue.new MAX_IN_QUEUE

threads = (1..NUM_THREADS).map do
  # we use the mechanism to pass the queue through
  # the constructor to avoid nasty effects of
  # variable "queue" changing
  Thread.new queue do |q|
    # we use the queue itself as terminator
    until q == (item = q.deq)
      begin
        # whatever processing
      rescue Exception => e
        # whatever error handling
      end
    end
  end
end

# read from files on the command line
ARGF.each do |line|
  queue.enq line
end

threads.each do |th|
  # send the terminator and wait
  queue.enq queue
  th.join
end

Have fun!

robert

Francis C. wrote:

On 10/12/07, Jon H. wrote:

  1. Open the file
  2. Create 5 threads

Each thread should read a line of the file and process it, but no 2
threads should get the same line.

Why are you doing this in the first place? Do you have a computer with
five processors and five memory buses?

According to pickaxe2, p. 135, your question is irrelevant:

“Finally, if your machine has more than one processor, Ruby threads
won’t take advantage of that fact–because they run in a single process,
and in a single native thread, they are constrained to run on one
processor at a time.”

Perhaps a better question for the op is: does your processing result in
any pauses in the code? For instance, do you use the information in the
log file to send requests to websites where you are waiting for a
response?

Threads do not actually allow any code to run at the same time. What
really happens is that execution rapidly shifts from one thread to
another, which gives the appearance that the threads are executing at
the same time.

If you have five methods that each take 2 seconds to execute, and you
run those five methods one after another, your program will take 10
seconds to execute. On the other hand, if you use five threads to
execute those methods, your program will still take 10 seconds to
execute. For example, suppose each thread gets 1 second to execute
before execution shifts to another thread, something like this will
occur:

thread1: 1 sec
|
V
thread2: 1 sec
|
V
thread3: 1 sec
|
V
thread4: 1 sec
|
V
thread5: 1 sec
|
V
thread1: 1 sec
|
V
thread2: 1 sec
|
V
thread3: 1 sec
|
V
thread4: 1 sec
|
V
thread5: 1 sec

If you total up the time, it still takes 10 seconds for your program to
execute when using five threads. The only way threads help speed up
execution is if there are pauses in your code where nothing is
happening. During those pauses, threads allow execution to shift to
other code that is ready to execute.
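Here is a sketch that makes the point measurable. `sleep 0.2` stands in for a pause such as waiting on a network reply; the job and its duration are assumptions. Sleeping threads overlap, so the threaded run takes roughly the time of one job rather than five. Pure-Ruby computation in place of the `sleep` would not speed up comparably on MRI, where a global lock lets only one thread run Ruby code at a time.

```ruby
def wait_job
  sleep 0.2   # stands in for I/O: a DNS lookup, an HTTP request, ...
end

# wall-clock seconds spent in the given block
def elapsed
  t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
end

sequential = elapsed { 5.times { wait_job } }
threaded   = elapsed { Array.new(5) { Thread.new { wait_job } }.each(&:join) }

printf("sequential: %.2fs  threaded: %.2fs\n", sequential, threaded)
```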

On 10/12/07, Jon H. wrote:

  1. Open the file
  2. Create 5 threads

Each thread should read a line of the file and process it, but no 2
threads should get the same line.

Why are you doing this in the first place? Do you have a computer with
five processors and five memory buses?

On Oct 13, 6:33 am, Robert K. wrote:

There is a better option: rather send something down the queue that is
not a String - otherwise processing would suddenly stop if the file
contained the terminating line.

That’s a very nice solution. It demonstrates a lot of accumulated
wisdom. I think I’d use a symbol in the queue, such as :end_of_data,
rather than the queue itself to mark the end of the data, if only to
avoid a “huh?” moment from those who read the code down the line.

Eric


On-site, hands-on Ruby training is available from http://LearnRuby.com !

ara.t.howard wrote:

[…]
threads = [] and 5.times{ threads << Thread.new{ puts q.pop } }
[…]

just a question of style:

while the above is quite clever, is there any (hidden) reason to use it
over such primitive constructs like

threads = Array.new(5){ Thread.new{ puts q.pop } }

or

threads = (1..5).map{ Thread.new{ puts q.pop } }

?

cheers

Simon

On Oct 13, 2007, at 9:50 AM, Simon Kröger wrote:

ara.t.howard wrote:

[…]
threads = [] and 5.times{ threads << Thread.new{ puts q.pop } }
[…]

just a question of style:

while the above is quite clever, is there any (hidden) reason to
use it over such primitive constructs like

not really. mostly i just like the ‘5’ to be more prominent in the code

threads = Array.new(5){ Thread.new{ puts q.pop } }

this works

or

threads = (1..5).map{ Thread.new{ puts q.pop } }

seems kinda heavyweight, but yeah

cheers.

a @ http://codeforpeople.com/

Eric I. schrieb:

That’s a very nice solution. It demonstrates a lot of accumulated
wisdom. I think I’d use a symbol in the queue, such as :end_of_data,
rather than the queue itself to mark the end of the data, if only to
avoid a “huh?” moment from those who read the code down the line.

I think there will be a big (and probably long) “huh?” moment when
running the code:

threads.each do |th|
  # send the terminator and wait
  queue.enq queue
  th.join
end

is more likely than not to never terminate. If another thread than th is
eating the terminator, th.join will wait for a long time.

Eric

cheers

Simon

never ever think wisdom will protect you against threads :) except if
wisdom tells you not to use them.

On Oct 13, 2007, at 07:29 , Francis C. wrote:

  1. Open the file
  2. Create 5 threads

Each thread should read a line of the file and process it, but no 2
threads should get the same line.

Why are you doing this in the first place? Do you have a computer
with five processors and five memory buses?

It depends upon the operation being parallelized. If your file is
filled with IP addresses to resolve you will use very little CPU.
I’ve used resolv.rb and hundreds of threads to rapidly resolve IP
addresses streamed from an HTTP access log.

On Oct 12, 2007, at 8:35 PM, 7stud – wrote:

Skip that inappropriate suggestion and use the following instead:

threads.each {|t| t.join}

unless you use

Thread.current.abort_on_exception = true

you should always use ‘map’ and check the return value. otherwise
you have no idea if your threads have completed successfully and you
simply exit whether threads did their job or not.

regards.

a @ http://codeforpeople.com/

ara.t.howard wrote:

On Oct 12, 2007, at 8:35 PM, 7stud – wrote:

Skip that inappropriate suggestion and use the following instead:

threads.each {|t| t.join}

unless you use

Thread.current.abort_on_exception = true

you should always use ‘map’ and check the return value.

  1. Where are you checking a return value:

threads.map{|t| t.join}

In fact, you discard map’s return value.

  2. How is map’s return value ever going to be different than your
    threads array?

On 13.10.2007 17:52, Simon Kröger wrote:

  # send the terminator and wait
  queue.enq queue
  th.join
end

is more likely than not to never terminate. If another thread than th is eating
the terminator th.join will wait for a long time.

Of course you are right! Normally I use two loops here - dunno why I
suddenly thought this was a great idea. Thanks for catching that stupid
error!

I also agree with Eric that using something else is probably better
because the code will be more readable. I just wanted to demonstrate
the point of not using a “regular” queue element.
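Putting the corrections from this exchange together gives a sketch like the following. It uses a Symbol terminator, as Eric suggested, and two loops, as Robert describes: first enqueue one terminator per thread, and only then join. An in-memory string replaces ARGF so the sketch runs standalone. The per-thread `handled` arrays are a hypothetical addition to show the work was done.

```ruby
require 'thread'

MAX_IN_QUEUE = 1024
NUM_THREADS  = 5
END_OF_DATA  = :end_of_data   # each_line only ever yields Strings, never this Symbol

queue = SizedQueue.new(MAX_IN_QUEUE)

threads = (1..NUM_THREADS).map do
  Thread.new(queue) do |q|    # pass the queue in, as in Robert's version
    handled = []              # hypothetical: what this worker processed
    until (item = q.deq) == END_OF_DATA
      begin
        handled << item.chomp # whatever processing
      rescue StandardError => e
        warn e.message        # whatever error handling
      end
    end
    handled                   # becomes the thread's value
  end
end

# stand-in for ARGF.each so the sketch runs on its own
input = (1..100).map { |n| "line #{n}\n" }.join
input.each_line { |line| queue.enq line }

# loop 1: one terminator per worker; any worker may take any of them
NUM_THREADS.times { queue.enq END_OF_DATA }
# loop 2: only now wait for the workers
threads.each(&:join)

results = threads.map(&:value).flatten
puts results.size   # => 100
```

Because every terminator is enqueued before the first join, it no longer matters which worker consumes which terminator.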

Kind regards

robert

On Oct 13, 11:52 am, Simon Kröger wrote:

the terminator th.join will wait for a long time.

Of course! Good catch!

never ever think wisdom will protect you against threads :) except if wisdom
tells you not to use them.

I don’t think I was claiming that it would protect you, but it sure
helps.

Eric



On Oct 13, 1:32 pm, Eric H. wrote:

On Oct 13, 2007, at 07:29 , Francis C. wrote:

Eric, are you reading/posting on comp.lang.ruby ? I don’t see Francis’
post, but both you and 7stud quoted him, so I’m wondering if it was
aggregated from somewhere else.

On Oct 13, 2007, at 11:43 AM, 7stud – wrote:

  1. Where are you checking a return value:

threads.map{|t| t.join}

i’m not, but in a real piece of code longer than 5 lines it would be

In fact, you discard map’s return value.

  2. How is map’s return value ever going to be different than your
    threads array?

ah - ‘join’ should indeed be ‘value’ there. sorry.

basically one should use Thread.current.abort_on_exception, check the
return values, or be prepared that threads may fail and you might not
know about it (which is obviously ok sometimes)
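Here is a small sketch of checking return values with Thread#value. It assumes Ruby 2.4 or later for `Thread.report_on_exception=`, and the divisions are placeholders for real work. `value` waits like `join` and then returns the block's result. Like `join`, it re-raises an exception that terminated the thread.

```ruby
Thread.report_on_exception = false   # keep Ruby 2.5+ from printing its own warning when a thread dies

threads = [1, 2, 0, 4].map do |d|
  Thread.new { 100 / d }   # the thread with d == 0 dies with ZeroDivisionError
end

outcomes = threads.map do |t|
  begin
    t.value                # the block's result...
  rescue ZeroDivisionError => e
    e                      # ...or the exception that killed the thread, re-raised here
  end
end

p outcomes.map { |o| o.is_a?(Exception) ? o.class : o }
# => [100, 50, ZeroDivisionError, 25]
```

By contrast, `threads.map { |t| t.join }` only hands back the Thread objects themselves, which is the point 7stud raised above.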

a @ http://codeforpeople.com/

On Oct 13, 2007, at 2:15 PM, Brian A. wrote:

Eric, are you reading/posting on comp.lang.ruby ? I don’t see Francis’
post, but both you and 7stud quoted him, so I’m wondering if it was
aggregated from somewhere else.

i’m using the ml and do see francis’ post.

a @ http://codeforpeople.com/

On Oct 13, 2007, at 13:15 , Brian A. wrote:

On Oct 13, 1:32 pm, Eric H. wrote:

On Oct 13, 2007, at 07:29 , Francis C. wrote:

Eric, are you reading/posting on comp.lang.ruby ? I don’t see Francis’
post, but both you and 7stud quoted him, so I’m wondering if it was
aggregated from somewhere else.

I use the one, true ruby-talk mailing list.