Using threads to show progress

I want to do this in a Rake task, but the concept is Ruby. I would like
to use one thread to do the work, and one thread to periodically tell me
“We’re working on element x now”. I’m trying to work from this page:
http://ruby-doc.org/docs/ProgrammingRuby/html/tut_threads.html
Here is basically what I have:

dictated_exams = DictatedExam.all
enumerate = Thread.new do
  dictated_exams.each do |exam|
    Thread.current[:id] = exam.id
  end
end
print = Thread.new do
  sleep 1
  puts enumerate[:id]
end

… And I’m pretty much stuck here. I -think- what I want to do is
create a thread which won’t end until after the main one ends, and loop
forever ?
Or maybe I want to repeatedly create a new thread which will print the
:id, then exit, until the ‘enumerate’ thread is done ?
I realize that I could do this in one process, without threads, but I
want to update the output, say, every second, instead of with every
element.

I’ve successfully implemented it this way:

desc "Go through dictated exams and show IDs"
task(:showSomeIDs => :environment) do
  dictated_exams = DictatedExam.all
  enumerate = Thread.new do
    dictated_exams.each do |exam|
      Thread.current[:id] = exam.id
    end
  end
  print = Thread.new do
    while true
      sleep 0.01
      puts enumerate[:id]
    end
  end
  enumerate.join
  print.kill
end

Is there a more elegant way to do it?

while enumerate.alive?
  puts enumerate[:id]
  sleep 1
end

Then you don’t need a print thread at all (just do it from the ‘main’
thread)

On Tuesday 15 December 2009 01:53:21 pm Aldric G. wrote:

I’ve successfully implemented it this way:
[snip]
Thread.current[:id] = exam.id

What’s the advantage of making this a thread-local variable?

Taking Brian C.'s advice into account, I’d probably do this:

desc "Go through dictated exams and show IDs"
task(:showSomeIDs => :environment) do
  dictated_exams = DictatedExam.all
  current_id = nil
  enumerate = Thread.new do
    dictated_exams.each do |exam|
      current_id = exam.id
    end
  end
  while enumerate.alive?
    sleep 0.01
    puts current_id
  end
  enumerate.join
end

In this case, it’s simple enough that doing the threading yourself isn’t
too bad. But since you seem new to threading, here are a couple of things
we didn’t cover:

  • Locking. In this case, you’re only printing the value of one variable
    which I assume is an integer, so updates to that should be atomic.
    You’ll probably be fine if you’re just printing some status, but you
    never know – even methods that don’t have ! on them can modify state.

  • Exception handling. Actually, I did that by joining the thread
    anyway, even though I know it’s ended when the loop is over. If I
    didn’t do that, any exceptions raised within the thread would be
    silently ignored.
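
To illustrate the exception-handling point, here is a minimal sketch (not code from the thread) of how joining a thread re-raises an exception that occurred inside it. The error message and the `outcome` variable are made up for the example. Newer Ruby versions also print a report when a thread dies with an exception, so the sketch switches that report off to keep only the explicit handling visible.

```ruby
# Newer Rubies report dying threads on stderr by default; turn that off
# (where the setting exists) so only our own handling produces output.
Thread.report_on_exception = false if Thread.respond_to?(:report_on_exception=)

worker = Thread.new do
  # Hypothetical failure inside the worker thread.
  raise ArgumentError, "exam 42 is malformed"
end

# Poll from the main thread, as in the loop above.
sleep 0.01 while worker.alive?

begin
  # The thread has already ended, but join still re-raises its exception here.
  worker.join
  outcome = "worker finished cleanly"
rescue ArgumentError => e
  outcome = "worker failed: #{e.message}"
end

puts outcome
```

Without the `join`, the main thread would never see the `ArgumentError`.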

Threads really aren’t elegant. They are the gotos of concurrency, and I
don’t know of anyone who actually claims that naked threads are the way
forward. The only way this would be elegant is if it was hidden away in a
library which exposed a higher-level interface to the user – but I’m not
sure something like that exists or would be useful here.

Piyush R. wrote:

In the enumerate thread keep pushing id to queue whenever there is a new
object being worked on.

dictated_exams.each do |exam|
  queue.push exam.id
end

That’s a really good suggestion: the consumer will block when it tries
to pop from the queue, so you don’t need to spin.

However in that case you can’t also poll Thread#alive?, so I suggest you
wrap the producer thread to make it send a termination message, even if
it terminates abnormally with an exception.

require 'thread'
dictated_exams = ["foo","bar"]
queue = Queue.new
enumerate = Thread.new do
  begin
    dictated_exams.each do |exam|
      queue.push exam
      raise "Oops" if exam == "baz"
    end
  ensure
    queue.push nil
  end
end
while current = queue.pop
  puts current
end
enumerate.join   # just to propagate exception

The idea of the queue is very good.

I now have this code going for me :

require 'thread'
queue = Queue.new

dictated_exams = (1...1_000_000)
backspaces = 8 # Max of dictated_exams string size
worker_thread = Thread.new do
  dictated_exams.each do |exam|
    queue << exam
  end
end
consumer_thread = Thread.new do
  while true
    id = queue.pop
    print "\b" * backspaces
    print id
  end
end
worker_thread.join
consumer_thread.kill

As someone noticed, I am new to threads (I didn’t think it was written
on my forehead, but I guess it must be next to the “Kick me” sign on my
back). I’m looking for a way to have an idea of what long rake tasks are
doing. I wrote a long and convoluted task to find and delete rows which
contain duplicate data. While I am working on it and polishing it up,
I’d like to be able to keep track of what it does. It has a couple of
‘each’ statements, so I will probably end up needing to call a
‘printing’ thread a few times, once for each step – that is why I can’t
just use the main thread to print…
And this also gave me the idea of trying my hand at writing a simple gem
or plugin to indicate progress (I know it’s been done before, but it may
be good practice for me).

David said that threads aren’t elegant. I somewhat agree, I don’t really
like working with them, they seem clunky right now, but it seems to be a
pretty good way to separate the “showing progress” code and the “getting
work done” code. Are there better ways to do this?
My original reason for wanting this, which may be misguided, was the
cost in time for doing a “print” - I remember my early days with Ruby,
working on Project Euler and trying to keep track of how fast my
brute-force solutions were going… And they went MUCH faster without a
“puts” in the loop.

How about using a queue ?
http://ruby-doc.org/stdlib/libdoc/thread/rdoc/classes/Queue.html

In the enumerate thread keep pushing id to queue whenever there is a new
object being worked on.

dictated_exams.each do |exam|
  queue.push exam.id
end

Consumer thread should remain the same as the example.

Aldric G. wrote:

I now have this code going for me :

Killing threads asynchronously is usually a Bad Idea[TM]. It leaves you
open to all sorts of race conditions. Terminating gracefully, by sending
a nil into the queue as I showed before, is much better.

It has a couple of
‘each’ statements, so I will probably end up needing to call a
‘printing’ thread a few times, once for each step – that is why I can’t
just use the main thread to print…

Can you not just do $stderr.puts … ?

The main downside of this is if you are processing 10,000,000 things and
you don’t want 10,000,000 screen updates. In that case, I suggest you
keep a “last output” timestamp and only update once per second.
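
As a rough sketch of the "last output" timestamp idea (the range of ids, the one-second interval and the variable names are placeholders, not code from the thread), the loop itself can decide whether enough time has passed to print again:

```ruby
items = 1..200_000      # stand-in for the real collection of ids
interval = 1.0          # minimum number of seconds between two status lines
last_output = nil       # time of the most recent status line, nil before the first
updates = 0             # how many status lines were actually written

items.each do |id|
  now = Time.now
  if last_output.nil? || now - last_output >= interval
    $stderr.puts "Working on #{id}"
    last_output = now
    updates += 1
  end
  # ... the real per-item work would go here ...
end

$stderr.puts "Wrote #{updates} status line(s) for #{items.size} items"
```

The first item always produces a line. After that, a line appears at most once per interval, however many items are processed.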

Another option might be to take advantage of duck-typing. Replace
$stderr with an object of your own which looks like an IO, but which
only writes the data if no update has been seen for the last second, or
something like that.

Brian C. wrote:

Killing threads asynchronously is usually a Bad Idea[TM]. It leaves you
open to all sorts of race conditions. Terminating gracefully, by sending
a nil into the queue as I showed before, is much better.

I read the rdocs for the Queue class and now I understand how it works -
you are right, I will fix my ugly code.

The main downside of this is if you are processing 10,000,000 things and
you don’t want 10,000,000 screen updates. In that case, I suggest you
keep a “last output” timestamp and only update once per second.

That is a good idea!

Another option might be to take advantage of duck-typing. Replace
$stderr with an object of your own which looks like an IO, but which
only writes the data if no update has been seen for the last second, or
something like that.

That sounds like work. *shudder*
On the other hand, that’s a great way to start playing with IO objects.

A suggestion

class Progress
  def initialize(out = STDERR, interval = 0.5)
    @out = out
    @interval = interval
    @timestamp = nil
    @category = nil
  end

  def bar(category, n, m)
    @out << sprintf("%-20s[%-56s]", category, "*" * (56.0 * n / m))
  end

  # Progress << ["Baking", 53, 100]
  #
  # Progress << "Finished"

  def <<(data)
    return if data.is_a?(Array) && data[0] == @category && @timestamp &&
              Time.now < @timestamp
    @timestamp = Time.now + @interval
    if data.is_a?(Array) && data[0] == @category
      @out << "\r"
      bar(*data)
      return
    end
    if @category
      @out << "\r"
      bar(@category, 100, 100)
      @out << "\r\n"
    end
    if data.is_a?(Array)
      @category = data[0]
      bar(*data)
    else
      @category = nil
      @out << data << "\r\n"
    end
  end

  def self.<<(data)
    @progress ||= new
    @progress << data
  end
end

if __FILE__ == $0
  Progress << "Feeding the cat"
  33.times do |x|
    Progress << ["munch", x, 33]
    sleep(0.1)
  end
  Progress << "Put the cat out"
  Progress << "Go to sleep"
  20.times do |x|
    Progress << ["zzzzzzz", x, 20]
    sleep(0.2)
  end
  Progress << "Wakey wakey!"
end

Brian C. wrote:

A suggestion

class Progress
  def initialize(out = STDERR, interval = 0.5)
    @out = out
    @interval = interval
    @timestamp = nil
    @category = nil
  end

  […]

  def self.<<(data)
    @progress ||= new
    @progress << data
  end
end

That’s pretty interesting. Are you… Defining an instance of the class
within the class itself ?
Why are you using STDERR?
Thanks for the suggestion; I’ll pore over that.

Brian C. wrote:

Aldric G. wrote:

That’s pretty interesting. Are you… Defining an instance of the class
within the class itself ?

Yes. It’s just a shortcut to allow you to use a default instance, whilst
still allowing you to have multiple instances if you want.

Why are you using STDERR?

Just convention really.

If the rake task generates useful output, the user may have redirected
STDOUT to a file (*). And STDERR is unbuffered by default, so you see
the messages immediately without having to flush.

(*) Of course, it is possible to redirect stderr too: 2>/dev/null will
silence your progress messages.

I see! Very nice. I have one last question.

if __FILE__ == $0
  Progress << "Feeding the cat"
  33.times do |x|
    Progress << ["munch", x, 33]
    sleep(0.1)
  end
  Progress << "Put the cat out"
  Progress << "Go to sleep"
  20.times do |x|
    Progress << ["zzzzzzz", x, 20]
    sleep(0.2)
  end
  Progress << "Wakey wakey!"
end

From inside a loop, can I dynamically know over how many elements the
loop will iterate? I noticed the repetition of ‘33’ and ‘20’ … :)

Aldric G. wrote:

From inside a loop, can I dynamically know over how many elements the
loop will iterate? I noticed the repetition of ‘33’ and ‘20’ … :)

It depends on what you’re iterating over. If it’s an Array, use
Array#size:

myarray.each_with_index do |elem, i|
  Progress << ["Processing", i, myarray.size]

end

Progress here assumes that you know the number of iterations in
advance (so it can show a progress bar). You could easily modify the
code so that

Progress << ["munch", x]

would just display the number x, rather than a bar:

def bar(category, n, m=nil)
  if m
    @out << sprintf("%-20s[%-56s]", category, "*" * (56.0 * n / m))
  else
    @out << sprintf("%-20s %-56s", category, n.to_s)
  end
end

On Tuesday 15 December 2009 07:46:26 pm Piyush R. wrote:

Consumer thread should remain the same as the example.

Except that this removes any advantage of threads, as I see it – and it
still doesn’t remove the essential problem of shared memory, once you
start using it for anything more complex than IDs.

And there is an advantage to doing this status update in a thread, I just
don’t think it’s worth the potential headaches.

The main advantage of the Queue is the same advantage of a Unix pipe. For
example, I now have some large files compressed with lzop initially,
which I now want to recompress with lzma. It’s not going to make a huge
amount of difference to do it this way (since lzma is so much slower than
lzop), but consider:

lzop -d < foo.lzo | lzma -v9 > foo.lzma

In this case, the lzop decompression and the lzma compression are
actually asynchronous – the lzop will decompress as fast as it can until
it fills up a buffer, which lzma will then read from. On a modern
dual-core Linux system, lzma will likely fill one core, while lzop uses
some small-ish amount of the second core.

Technically, this would be a fixed-length Queue – the standard Queue
class can actually grow indefinitely. But the idea is the same, and it is
concurrent for the same reason.
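
For comparison, Ruby's standard library also has a fixed-length queue, SizedQueue, which behaves more like the pipe buffer described above: a push blocks while the queue is full. A minimal sketch, with a capacity and item count chosen arbitrarily for illustration:

```ruby
require 'thread'   # harmless on newer Rubies, needed on older ones

buffer = SizedQueue.new(4)   # at most 4 items waiting at any time (arbitrary)

producer = Thread.new do
  begin
    # push blocks whenever 4 items are already waiting, so the producer
    # can never run far ahead of the consumer.
    20.times { |i| buffer.push(i) }
  ensure
    buffer.push(nil)         # termination marker, as in the earlier example
  end
end

received = []
while (item = buffer.pop)    # 0 is truthy in Ruby, so only nil ends the loop
  received << item
end
producer.join                # propagate any exception from the producer

puts "received #{received.size} items"
```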

There are generally two reasons you’d want to do something concurrently.
One is because it’s more efficient – I have a dual-core machine. But this
only makes sense if you’re using JRuby – MRI (1.8 and 1.9) is crippled by
a GIL, just like Python.

The other reason is because you’ve actually got some concurrent things
happening, and sometimes pre-emptive multitasking is just easier to wrap
your head around. For example, in this case, it’d be useful to have a
separate thread if you wanted to be sure it was updating at a certain
rate (once a second, or ten times a second), regardless of how fast (or
how slowly) the actual data was changing.

One reason you might want to do this here is to provide an ETA – if the
original process stalls on a single ID for several minutes, you could
watch the ETA get longer and longer.
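
A rough sketch of such a fixed-rate reporter follows. The intervals, the `done` flag and the `samples` array are invented for the example, and the only state shared between the threads is a single id and a boolean. The reporter wakes up on its own schedule, so a stalled worker shows up as the same id printed repeatedly.

```ruby
current_id = nil    # written by the worker (main thread), read by the reporter
done = false        # set by the main thread once the work is finished
samples = []        # what the reporter saw; only the reporter appends to it

reporter = Thread.new do
  until done
    samples << current_id
    $stderr.puts "Working on #{current_id.inspect}"
    sleep 0.05      # report interval, independent of the worker's pace
  end
end

(1..5).each do |id|
  current_id = id
  sleep 0.1         # stand-in for the real work on this id
end

done = true
reporter.join       # the reporter notices the flag and exits on its own
```

Unlike the `kill` approach, the reporter here ends by itself after at most one more interval.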

But if you’re blocking waiting for ids to come off the Queue, this is no
advantage at all – it’s more or less the same as if you called some
“update_status” method with a given id.

It also introduces all the messiness of threading as soon as you start
passing any more complex object than an id. For example, it might be nice
if you could determine how far done the object was based on some internal
state. Ignoring the fact that you’d only wake up to do this when you get
a new id coming down the Queue, you now have the issue of the same object
being accessed by the worker thread (doing whatever crunching it’s doing)
and the view thread (reading that object to figure out how done it is).

How about using a mutex synchronize block around queue.push ? Would that
solve the potential race conditions we are looking at here ?

Piyush R. wrote:

How about using a mutex synchronize block around queue.push ? Would that
solve the potential race conditions we are looking at here ?

Queue is a thread-safe class, and you do not need any additional
synchronisation.

On Thursday 17 December 2009 02:52:26 am Brian C. wrote:

Piyush R. wrote:

How about using a mutex synchronize block around queue.push ? Would that
solve the potential race conditions we are looking at here ?

Queue is a thread-safe class, and you do not need any additional
synchronisation.

Not around the queue itself, no, so you’re right – a mutex synchronize
around queue.push would do nothing.

No, the problem isn’t the queue itself, it’s if you were to pass
something non-thread-safe through the queue.

David M. wrote:

No, the problem isn’t the queue itself, it’s if you were to pass
something non-thread-safe through the queue.

Indeed. It’s unwise to push the object through the queue, saying “I’m
currently working on this object”, knowing that the other side could be
examining its state at the same time.

You could pass the object you’ve just finished working on. Or else a
fresh object which has copies of the information required by the
consumer.
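
One possible shape for the "fresh object with copies" approach is sketched below. The `ExamStatus` struct, the hash standing in for a work item and its fields are all hypothetical. The worker keeps mutating its own object and pushes only a small frozen snapshot through the queue.

```ruby
require 'thread'   # harmless on newer Rubies, needed on older ones

# Hypothetical snapshot type: just the values the consumer needs.
ExamStatus = Struct.new(:id, :percent_done)

queue = Queue.new

worker = Thread.new do
  begin
    # Hypothetical mutable work item that only the worker touches.
    exam = { id: 7, pages_done: 0, pages_total: 4 }
    exam[:pages_total].times do
      exam[:pages_done] += 1
      percent = 100 * exam[:pages_done] / exam[:pages_total]
      # Push a frozen copy of the numbers, never the work item itself.
      queue.push ExamStatus.new(exam[:id], percent).freeze
    end
  ensure
    queue.push nil   # termination marker
  end
end

seen = []
while (status = queue.pop)
  seen << status.percent_done
  $stderr.puts "exam #{status.id}: #{status.percent_done}%"
end
worker.join
```

The consumer only ever reads immutable snapshots, so it cannot observe the work item in a half-updated state.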

On 16.12.2009 16:09, Aldric G. wrote:

Another option might be to take advantage of duck-typing. Replace
$stderr with an object of your own which looks like an IO, but which
only writes the data if no update has been seen for the last second, or
something like that.

That sounds like work. *shudder*
On the other hand, that’s a great way to start playing with IO objects.

IMHO it is not necessary though. What you basically only need is to
execute an action only if a particular condition becomes true. That is
not specific to IO objects - you don’t even need to implement the full
protocol of IO which indeed sounds like work. A simpler solution looks
like this:

  1. high notification and change frequency => time based

class TimeThrottle

  def initialize(interval, &act)
    @interval = interval
    @last = Time.at 0
    @act = act
  end

  def update(*a, &act)
    n = Time.now

    if n - @last > @interval
      @last = n
      (act || @act)[*a]
    end
  end

end

  2. high notification and low change frequency => change based

class ChangeThrottle

  def initialize(&act)
    @last = nil
    @act = act
  end

  def update(*a, &act)
    if a != @last
      (act || @act)[*a]
      @last = a
    end
  end

end

Now we can combine that in an example with three different update
models:

def dump(label, msg)
  printf "%-30s %s\n", label[0, 30], msg
end

tt = TimeThrottle.new 1 do |x|
  dump "time", x
end

ct = ChangeThrottle.new do |x|
  dump "change", x * 100_000
end

ct2 = ChangeThrottle.new do |x|
  dump "combined", x * 100
end

co = TimeThrottle.new 1 do |x|
  ct2.update x
end

tt.update { dump "time", "started" }
ct.update { dump "change", "started" }

1_000_000.times do |i|
  tt.update i
  ct.update i / 100_000 # reduce change frequency
  co.update i / 100
end

tt.update { dump "time", "stopped" }
ct.update { dump "change", "stopped" }

Kind regards

robert