Multithreading in Ruby

Hi, I’m relatively new to Ruby and to threading in general. I’m trying to
get the following code to work. Essentially, the program scrapes data
from a site whose first page holds a list of URLs (more list pages can be
reached by hitting next), and each URL in the list needs to be followed
as well.

So what I would like to do is create 5 concurrent threads: Thread 1
would download the list page as a Mechanize page object and queue it,
hit next and download the next page as a Mechanize page object and queue
it, etc. Thread 2 would take the queue from Thread 1 and start to
extract the required data, i.e. the urls and queue them into a new
queue. Thread 3 would take the queue from Thread 2 and download the page
each url points to and save it as a Mechanize page object and queue it
into another queue. Thread 4 would take the queue from Thread 3 and
extract the necessary data, format it and queue it into yet another
queue. And finally, Thread 5 will take the queue from Thread 4 and write
the data to a file.

At least, that is the theory. I wrote the following program, but only
the first thread seems to be queueing and the rest don’t do anything.
Please let me know if I’m missing something.

rank_pages_queue = Queue.new
items_queue = Queue.new
item_pages_queue = Queue.new
finished_items_queue = Queue.new

mech_page = get_page(url)
rank_page_download = Thread.new do
  while mech_page
    rank_pages_queue << mech_page
    mech_page = hit_next(mech_page)
  end
end

rank_page_extract = Thread.new do
  while rank_pages_download.alive?
    rank_page = rank_pages_queue.pop
    items_queue << get_rank_name_url(rank_page)
  end
end

item_page_download = Thread.new do
  while rank_page_extract.alive?
    items = items_queue.pop
    items.each do |item_arr|
      item_pages_queue << [item_arr, get_page(item_arr[2])]
    end
  end

Thanks in advance. And I’m running Ruby 1.8.7 on Snow Leopard.

On Mon, Aug 22, 2011 at 6:46 PM, Nabs K. wrote:

I’m not really sure what your problem is, but I see one issue and one
thing that I’d do differently. If the program ends without anything
being done, it’s because you need to wait for the threads to complete.
You can do that with the join method, or with the value method if you
need a return value from the thread. Also, I think there’s something
wrong with your logic of calling alive?. The producer thread can still
be alive after it has enqueued its last item, so the consumer sees
alive? return true, calls queue.pop again, and blocks there forever
because nothing else will ever arrive. A better alternative is to queue
a special object that signals the consumer that it has to finish.
Something like:

require 'thread'

def process
  sleep(rand)
end

queue = Queue.new

puts "creating first"
first = Thread.new do
  ["work", "more work", "yet some more", :finish].each do |work|
    puts "first thread is processing..."
    process
    puts "queuing #{work}"
    queue << work
  end
end
puts "creating second"
second = Thread.new do
  while (work = queue.pop) != :finish
    puts "got #{work} to do. processing it..."
    process
  end
end

first.join
second.join
puts "finished"

$ ruby queues.rb
creating first
first thread is processing...
creating second
queuing work
got work to do. processing it...
first thread is processing...
queuing more work
got more work to do. processing it...
first thread is processing...
queuing yet some more
first thread is processing...
queuing finish
got yet some more to do. processing it...
finished

In this little example, I used the symbol :finish to signal that the
other thread should finish and not wait for anything else in the
queue. You can use any other thing you want. For example, if the
consumer can receive any arbitrary object, I’ve seen people enqueue
the actual queue object to signal the end. In my example a symbol was
enough and a simple solution. Also note how I joined the two threads
at the end, to avoid the main thread exiting the program before the
threads are finished. I also added a little process method that sleeps
a random amount of time, to simulate the actual work.
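
For the case where a symbol could collide with real work items, here is a minimal sketch of the "enqueue the queue itself" variant mentioned above. It also uses value instead of join to get the consumer's result back. The helper names consume_until_done and run_demo are made up for this illustration.

```ruby
require 'thread'

# Hypothetical helper: collects items until it pops the queue object itself,
# which the producer pushes as the end-of-work marker.
def consume_until_done(queue)
  seen = []
  loop do
    work = queue.pop
    break if work.equal?(queue)  # identity check: only the queue object ends the loop
    seen << work
  end
  seen
end

# Hypothetical driver: one producer, one consumer, one shared queue.
def run_demo(items)
  queue = Queue.new
  producer = Thread.new do
    items.each { |work| queue << work }
    queue << queue               # sentinel that cannot be mistaken for real work
  end
  consumer = Thread.new { consume_until_done(queue) }
  producer.join
  consumer.value                 # waits for the thread and returns the block's result
end

# :finish and nil are ordinary work items here, not stop signals.
p run_demo([:finish, "work", nil, 42])  # => [:finish, "work", nil, 42]
```

Because the check uses equal? rather than ==, any value (including :finish, nil or false) can travel through the queue as data.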

Hope this helps,

Jesus.

Thanks Bartosz and Jesús, I implemented both of your suggestions and got
it working. For anyone else, this is what the end result looks like:

Thread.abort_on_exception = true
rank_pages_queue = Queue.new
items_queue = Queue.new
item_pages_queue = Queue.new
finished_items_queue = Queue.new

mech_page = get_page(url)
rank_page_download = Thread.new do
  while mech_page != :finish
    rank_pages_queue << mech_page
    mech_page = hit_next(mech_page)
  end
  rank_pages_queue << :finish
end

rank_page_extract = Thread.new do
  while (rank_page = rank_pages_queue.pop) != :finish
    items_queue << get_rank_data(rank_page)
  end
  items_queue << :finish
end

item_page_download = Thread.new do
  while (items = items_queue.pop) != :finish
    items.each do |item_arr|
      item_pages_queue << [item_arr, get_page(item_arr[3])]
    end
  end
  item_pages_queue << :finish
end

item_pages_extract = Thread.new do
  while true
    item_arr, item_page = item_pages_queue.pop
    break if item_arr == :finish
    finished_items_queue << item_arr.concat(get_item_data(item_page))
  end
  finished_items_queue << :finish
end

write_to_file = Thread.new do
  while (item_arr = finished_items_queue.pop) != :finish
    writeToTxt(item_arr, item_arr[0] + ' - ' + filename)
  end
end

rank_page_download.join
rank_page_extract.join
item_page_download.join
item_pages_extract.join
write_to_file.join
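
The middle stages above all follow the same pattern: pop until :finish, process, push, then pass :finish on. As a rough sketch only, that pattern could be factored into a helper. The names stage, DONE and run_pipeline are invented for this example, and toy arithmetic stands in for the Mechanize calls.

```ruby
require 'thread'

DONE = :finish  # same sentinel value as in the code above

# Hypothetical helper: starts a thread that pops from input, pushes the
# block's result for each item to output, and forwards the sentinel once
# the upstream stage has finished.
def stage(input, output, &block)
  Thread.new do
    while (item = input.pop) != DONE
      output << block.call(item)
    end
    output << DONE
  end
end

# Toy pipeline: doubling and labelling replace downloading and extracting.
def run_pipeline(numbers)
  raw = Queue.new
  doubled = Queue.new
  labelled = Queue.new
  threads = [
    stage(raw, doubled)      { |n| n * 2 },
    stage(doubled, labelled) { |n| "item #{n}" }
  ]
  numbers.each { |n| raw << n }
  raw << DONE
  results = []
  while (line = labelled.pop) != DONE
    results << line
  end
  threads.each { |t| t.join }
  results
end

p run_pipeline([1, 2, 3])  # => ["item 2", "item 4", "item 6"]
```

This helper is one-in, one-out, so a stage that fans a single item out into several (like item_page_download with its items.each) would still need its own loop.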

2011/8/22 Jesús Gabriel y Galán

2011/8/22 Nabs K.:

rank_page_extract = Thread.new do
  while rank_pages_download.alive?
    rank_page = rank_pages_queue.pop
    items_queue << get_rank_name_url(rank_page)
  end
end

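All threads will be started at practically the same time, so when this
runs for the first time, rank_pages_queue will probably still be empty.
Queue#pop simply blocks in that case, but the line before it will raise
an exception: the thread is stored in rank_page_download, so
rank_pages_download is undefined and calling alive? on it raises a
NameError. That exception will silently disappear, as unhandled
exceptions in threads do unless the thread is joined. When debugging a
threaded program, always use Thread.abort_on_exception = true - an
exception in any thread will then abort the program with a stack trace
instead of quietly killing that one thread.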
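
To make the silent failure concrete, here is a small sketch of a thread that dies from a NameError and only reveals it on join. The method names are hypothetical, and the report_on_exception line is only there so that newer Rubies stay as quiet as 1.8.7 does.

```ruby
require 'thread'

# Ruby 2.5+ prints a warning when a thread dies from an exception; switch
# that off so the demo stays as silent as it would be on Ruby 1.8.7.
Thread.report_on_exception = false if Thread.respond_to?(:report_on_exception=)

# Hypothetical stand-in for the broken consumer: the receiver of alive? is
# an undefined name, so the block raises NameError as soon as it runs.
def start_broken_worker
  Thread.new { no_such_thread_variable.alive? }
end

# Waits for the worker to stop and returns its status.
# nil means "terminated by an exception"; false would mean a normal exit.
def status_after_death(worker)
  sleep 0.01 while worker.alive?
  worker.status
end

# join re-raises the worker's exception in the calling thread.
def error_from_join(worker)
  worker.join
  nil
rescue NameError => e
  e
end

worker = start_broken_worker
p status_after_death(worker)     # => nil
p error_from_join(worker).class  # => NameError
```

With Thread.abort_on_exception = true set first, the same NameError would be raised in the main thread right away rather than waiting for a join.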