Threading and Deadlock

I’m making the back end for a feed reader. I plan to have a daemon
which periodically checks feeds for updates. The daemon will have one
refill thread responsible for maintaining a list of feeds to be updated.
It will also have n worker threads who pop feeds from the list and
process them. When the list is empty, the refill thread will sleep for
some amount of time (no need to update the feeds constantly), refill the
list of feeds, and signal to the workers to start up again.

Here’s a crawler class as I have it so far. I’ve removed some of the
nonrelevant parts.

class Crawler
  def initialize(minutes_sleep)
    @minutes_sleep = minutes_sleep
    @first_run = true

    @feeds = Array.new

    @feeds_lock = Mutex.new
    @empty = ConditionVariable.new
    @filled = ConditionVariable.new
  end

  def fill
    @feeds_lock.synchronize do
      @empty.wait(@feeds_lock)
      if @first_run
        @first_run = false
      else
        sleep(60 * @minutes_sleep)
      end
      @feeds = Feed.find(:all, :conditions => 'active = 1')
      @filled.broadcast(@feeds_lock)
    end
  end

  def crawl(worker_number)
    @feeds_lock.synchronize do
      if @feeds.size == 0
        @empty.broadcast(@feeds_lock)
        @filled.wait(@feeds_lock)
      end

      feed = @feeds.pop
      # Do some processing
    end
  end
end

Then I kick off with another script that looks like this:

crawler = Crawler.new(minutes_sleep)
filler = Thread.new { crawler.fill }
workers = (1...num_workers).map do |i|
  Thread.new { crawler.crawl(i) }
end

filler.join
workers.each { |thread| thread.join }

This code results in a deadlock. It seems like the refill thread is not
waking up, doing its work, and signaling to the workers. But I have no
idea - I am stumped. Any pointers?

I did try to use the Queue class, but it seemed a little magical and I
couldn’t quite figure out how to use it for my needs.

Hello

I did try to use the Queue class, but it seemed a little magical and I
couldn’t quite figure out how to use it for my needs.

Unfortunately, I can’t really see where your deadlock comes from. But the
Queue class is really simple:

Just push elements onto the Queue in the filler class, and pop them
from your workers… That’s exactly what you need:

class Filler
  def refill
    ary = [pull the feeds]
    for element in ary
      queue << element
    end
    sleep…
  end
end

class Worker
  def work
    while(a = queue.pop)
      get a…
    end
  end
end

And that’s all. To finish, just push false into the queue once for
every worker thread, to make sure they exit.
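
A minimal runnable sketch of the scheme described above, for illustration only. The names run_crawl and process_feed, and the list of feed names, are hypothetical stand-ins and not part of anyone's actual code in this thread. It assumes one false is pushed per worker as the stop marker.

```ruby
require 'thread'  # needed for Queue on older Rubies; harmless on newer ones

# Hypothetical stand-in for fetching and parsing one feed.
def process_feed(feed)
  "processed #{feed}"
end

# Hypothetical helper: processes all feeds with num_workers threads.
def run_crawl(feeds, num_workers)
  queue   = Queue.new
  results = Queue.new   # Queue is thread-safe, so workers can report here

  workers = Array.new(num_workers) do
    Thread.new do
      # pop blocks while the queue is empty; a false value ends the loop
      while (feed = queue.pop)
        results << process_feed(feed)
      end
    end
  end

  feeds.each { |feed| queue << feed }
  num_workers.times { queue << false }  # one stop marker per worker
  workers.each(&:join)

  Array.new(results.size) { results.pop }
end

p run_crawl(%w[a b c d e], 3).sort
```

Because the queue is first-in, first-out, the stop markers are only reached after every feed has been handed to a worker.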

Vince

On 9/27/06, Jordan McKible wrote:

I’ve removed some of the nonrelevant parts.

Have you thought through why this script needs to be threaded in the first
place? Is there an external latency you need to capture?

Francis C. wrote:

Have you thought through why this script needs to be threaded in the first
place? Is there an external latency you need to capture?

Processing a feed entails an http request to retrieve it, parsing, and
updating the database, so it seems natural to use threads. This daemon
needs to be able to handle thousands of feeds.

Vincent F. wrote:

class Filler
  def refill
    ary = [pull the feeds]
    for element in ary
      queue << element
    end
    sleep…
  end
end

I don’t think this is quite what I need. The worker threads should
never exit - they should just sleep until the queue is refilled. It
seems like this configuration could lead to the queue being filled up
before it’s completely depleted by workers.

On 9/27/06, Jordan McKible wrote:

I’ve removed some of the nonrelevant parts.

I’m seeing multiple problems in this code. #fill and #crawl don’t appear to
have loops. #fill contains a statement that can sleep for multiple minutes
holding a mutex lock, but fortunately appears not to be reachable. Try
running with Thread.abort_on_exception set to true to see if you are
missing a fault. Again, why does this program need to be threaded, since
you’ve coded it such that it will sleep holding a lock?
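
To illustrate the Thread.abort_on_exception suggestion, here is a small sketch. The broken_worker helper and its error message are hypothetical. It assumes the script runs in the main thread. By default an exception only ends the thread it occurred in and is re-raised only when that thread is joined, so a fault can go unnoticed.

```ruby
# Silence the per-thread stderr report of newer Rubies so the demo stays quiet.
Thread.report_on_exception = false if Thread.respond_to?(:report_on_exception=)

# Hypothetical worker that fails immediately.
def broken_worker
  Thread.new { raise "boom in worker" }
end

# Default behaviour: the thread dies and nothing else notices.
t = broken_worker
sleep 0.1
silent_death = !t.alive?

# With abort_on_exception the error is re-raised in the main thread.
Thread.abort_on_exception = true
surfaced = begin
  broken_worker
  sleep 0.1          # the exception interrupts this sleep
  nil
rescue RuntimeError => e
  e.message
ensure
  Thread.abort_on_exception = false
end

puts "thread died silently: #{silent_death}"
puts "surfaced error: #{surfaced.inspect}"
```

In a real daemon you would normally just set the flag once at startup and let the failure stop the program, rather than rescuing it as this demo does.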

Francis C. wrote:

I’m seeing multiple problems in this code. #fill and #crawl don’t appear to
have loops.

Ah, the loop, of course! The bodies of fill and crawl should each be
wrapped in loop do ... end, correct?

#fill contains a statement that can sleep for multiple minutes holding a
mutex lock, but fortunately appears not to be reachable.

This is by design - in order to not be constantly retrieving feeds (that
seems a little excessive for a feed reader) I want to have a break
between refills.

On 9/27/06, Jordan McKible wrote:

#fill contains a statement that can sleep for multiple minutes holding a
mutex lock, but fortunately appears not to be reachable.

This is by design - in order to not be constantly retrieving feeds (that
seems a little excessive for a feed reader) I want to have a break
between refills.

As coded, your design waits for multiple minutes, then (presumably) fires
off a few thousand HTTP requests, then goes right back to sleep, holding
the mutex that your worker threads need to run.
Don’t do that.

Now of course when you have a loop in crawl, I assume you’ll have it grab
the lock, then run until the queue is consumed, which will block the fill
thread at the Mutex#synchronize call (assuming you’re lucky enough for one
of the workers to get scheduled before the fill thread tries to go back to
sleep). If that’s what you want, you really don’t need threads in this
app.

If you really think you need to do this with threads, then you need to
re-analyze your synchronization sets, and possibly define a few more
mutexes. Also, you mentioned a capturable external latency in the fill
operation, but not in the crawl. Is there one?
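
One possible way to restructure the original Mutex/ConditionVariable design along the lines of this advice is sketched below. It is an illustration under stated assumptions, not the poster's final code: the seconds_sleep and rounds parameters, the @done flag, and the %w[a b c] placeholder for Feed.find are all hypothetical. The points it tries to show are loops in both methods, sleeping and processing outside the lock, and re-checking the condition after every wait.

```ruby
require 'thread'

class Crawler
  attr_reader :processed

  def initialize(seconds_sleep, rounds)
    @seconds_sleep = seconds_sleep
    @rounds    = rounds   # hypothetical: stop after this many refills
    @feeds     = []
    @done      = false
    @processed = []
    @lock      = Mutex.new
    @empty     = ConditionVariable.new
    @filled    = ConditionVariable.new
  end

  def fill
    @rounds.times do |round|
      sleep(@seconds_sleep) unless round.zero?  # sleep WITHOUT holding the lock
      batch = %w[a b c]                         # placeholder for Feed.find
      @lock.synchronize do
        @feeds.concat(batch)
        @filled.broadcast                       # broadcast takes no arguments
        # wait releases the lock while blocked; re-check after each wakeup
        @empty.wait(@lock) until @feeds.empty?
      end
    end
    @lock.synchronize do
      @done = true
      @filled.broadcast                         # let idle workers exit
    end
  end

  def crawl
    loop do
      feed = @lock.synchronize do
        while @feeds.empty? && !@done
          @empty.signal                         # tell the filler we ran dry
          @filled.wait(@lock)
        end
        @feeds.pop                              # nil only when done and drained
      end
      break if feed.nil?
      result = "processed #{feed}"              # real work goes here, lock released
      @lock.synchronize { @processed << result }
    end
  end
end

crawler = Crawler.new(0.05, 2)
filler  = Thread.new { crawler.fill }
workers = Array.new(3) { Thread.new { crawler.crawl } }
([filler] + workers).each(&:join)
p crawler.processed.sort
```

Note how much bookkeeping this needs compared with a Queue, which handles the blocking and wake-ups internally.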

On Thu, 28 Sep 2006, Jordan McKible wrote:

I’m making the back end for a feed reader. I plan to have a daemon which
periodically checks feeds for updates. The daemon will have one refill
thread responsible for maintaining a list of feeds to be updated. It will
also have n worker threads who pop feeds from the list and process them.
When the list is empty, the refill thread will sleep for some amount of time
(no need to update the feeds constantly), refill the list of feeds, and
signal to the workers to start up again.

Here’s a crawler class as I have it so far. I’ve removed some of the
nonrelevant parts.

harp:~ > cat a.rb
require 'thread'

class Crawler
  DEFAULT = {
    'n_consumers' => 4   # key must match the option read in initialize
  }

  def initialize opts = {}
    getopt = getopts opts
    @n_consumers = getopt['n_consumers']
    @q = Queue.new
    @producer = new_producer
    @consumers = Array.new(@n_consumers){ new_consumer }
  end

  def new_producer
    new_thread do
      loop do
        feeds = get_feeds
        feeds.each{|feed| @q.push feed}
      end
    end
  end

  def new_consumer
    new_thread do
      loop do
        feed = @q.pop   # blocks until the producer has pushed something
        process_feed feed
      end
    end
  end

  def get_feeds # placeholder
    %w( a b c )
  end

  def process_feed feed # placeholder
    trace "processing <#{ feed }>"
    sleep 1
  end

  def trace msg
    tid = Thread.current.object_id
    STDERR.puts "#{ msg } (#{ tid })"
  end

  # returns a lambda that looks an option up by string key, then symbol
  # key, then falls back to DEFAULT
  def getopts opts
    lambda{|o| opts[o.to_s] || opts[o.to_s.intern] || DEFAULT[o.to_s]}
  end

  def new_thread &b
    Thread.new do
      Thread.current.abort_on_exception = true
      b.call
    end
  end
end

Crawler.new :n_consumers => 4

STDIN.gets   # keep the main thread alive until Enter is pressed

harp:~ > ruby a.rb
processing (-609386786)
processing (-609491016)
processing (-609529486)
processing (-609324242)
processing (-609386786)
processing (-609491016)
processing (-609324242)
processing (-609529486)
processing (-609386786)
processing (-609491016)
processing (-609324242)
processing (-609529486)

hth.

-a

On Thu, 28 Sep 2006, Jordan McKible wrote:

#fill contains a statement that can sleep for multiple minutes holding a
mutex lock, but fortunately appears not to be reachable.

This is by design - in order to not be constantly retrieving feeds (that
seems a little excessive for a feed reader) I want to have a break
between refills.

even if true - consider that you are effectively re-writing Queue to
accomplish your task. using the built-in Queue class will greatly
simplify your goal.

fyi.

-a

Jordan McKible wrote:

I don’t think this is quite what I need. The worker threads should
never exit - they should just sleep until the queue is refilled.

This is exactly what this scheme is doing. I was just mentioning the
exit so you give all the worker threads a chance to exit if the program
ever stops. I bet it will stop, one day, won’t it? The queue.pop is
blocking, which means it will wait until the queue has some elements:

-------------------------------------------------------------- Queue#pop
pop(non_block=false)

 Retrieves data from the queue. If the queue is empty, the calling
 thread is suspended until data is pushed onto the queue. If
 non_block is true, the thread isn't suspended, and an exception is
 raised.
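
A short sketch of both behaviours described in that documentation excerpt. The variable names and the "feed-1" value are made up for the example. It assumes the exception raised by a non-blocking pop on an empty queue is ThreadError, which is what Ruby's Queue raises.

```ruby
require 'thread'

queue = Queue.new

# Non-blocking pop on an empty queue raises instead of waiting.
empty_result = begin
  queue.pop(true)
rescue ThreadError => e
  "raised #{e.class}"
end

# Blocking pop: the consumer is suspended until something is pushed.
consumer = Thread.new { queue.pop }
sleep 0.1                        # give the consumer time to block
waiting = queue.num_waiting      # number of threads currently blocked in pop
queue << "feed-1"
popped = consumer.value          # joins the thread and returns its result

puts empty_result
puts "threads waiting before push: #{waiting}"
puts "popped: #{popped}"
```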

It seems like this configuration could lead to the queue being filled up
before it’s completely depleted by workers.

You can’t fill up the queue (unless you mean run out of memory?). And
you can use SizedQueue if you’re worried about it: you set a limit on
the number of elements in the queue. Then, when that number is reached,
the

queue << elements

blocks. It wakes up again when workers have depleted it enough.
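
A small sketch of that SizedQueue behaviour, with an arbitrary limit of 2 and made-up variable names. The short sleep is only there to let the producer run into the limit before the consumer starts.

```ruby
require 'thread'

queue = SizedQueue.new(2)        # at most 2 elements at any time

producer = Thread.new do
  5.times { |i| queue << i }     # << blocks while the queue is full
end

sleep 0.1                        # let the producer hit the limit
stalled_at = queue.size          # never more than the limit

# Each pop frees a slot, which wakes the blocked producer.
consumed = Array.new(5) { queue.pop }
producer.join

puts "queue size while producer was blocked: #{stalled_at}"
puts "consumed: #{consumed.inspect}"
```

With a single producer and a single consumer, elements come out in the order they were pushed.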

I know this scheme works, as I use it for a massive parallel download
program: I build a queue of elements to download, and I use several
threads to pull them. Works perfectly. This really is exactly what you
need: no need to bother about synchronisation whatsoever…

Cheers !

Vince