Control the concurrent thread numbers

Hi,

I’d like to run total 10 threads concurrently, but from the printed
result I can see there is only one thread is running, not 10. What is
the issue?

thread_limit = 10
@array = (1…100).to_a

while @array.size > 0
if Thread.list.size < thread_limit
Thread.new {
@array.shift
}.join
end
puts Thread.list.size
end

Thanks.

Because you’re joining each threat before moving to the next one. You
need to gather all the threads and then join them later.

On Sat, Mar 16, 2013 at 12:13 PM, Ken P. [email protected] wrote:

if Thread.list.size < thread_limit
Thread.new {
@array.shift
}.join
end
puts Thread.list.size
end

The thread that calls join suspends its execution until the joined
thread finishes. So when join returns in your code, the thread that
did @array.shift doesn’t exist anymore.

If you want several threads to shift from the array “simultaneously”,
you have to take care about synchronizing access to the shared array,
though.

What are you really trying to accomplish?

Jesus.

On 16 March 2013 14:24, Ken P. [email protected] wrote:

You are right, I’m trying to consume the array simultaneously. But after
I move the join out of loop, it still has problem. How to fix it?

Every thread is executing code wrapped in a mutex, so only one thread
will be able to #shift the array.

As Joel said, what’s the problem you’re really trying to solve?

“Jesús Gabriel y Galán” wrote in post #1101925:

The thread that calls join suspends its execution until the joined
thread finishes. So when join returns in your code, the thread that
did @array.shift doesn’t exist anymore.

If you want several threads to shift from the array “simultaneously”,
you have to take care about synchronizing access to the shared array,
though.

What are you really trying to accomplish?

You are right, I’m trying to consume the array simultaneously. But after
I move the join out of loop, it still has problem. How to fix it?

threads = []
mut = Mutex.new
while @all.size > 0
while Thread.list.size < thread_number
threads << Thread.new {
mut.sychronize do
@all.shift
end
}
end
end
threads.each {|t| t.join}

split the array before using threads.

Adam P. wrote in post #1101931:

As Joel said, what’s the problem you’re really trying to solve?

The real problem is that there are plenty of files on disks and a long
list of words stored in an array. I’d count the total appearing times
for every words among these files.

I need to run this task with a multiple threads program. Each thread
would take one word from the array by shifting a word to ensure other
thread will not do the same task at the same time, then do the counting.
However, the total numbers of thread should be controlled at a number
less than 10.

On Sat, Mar 16, 2013 at 4:01 PM, Ken P. [email protected] wrote:

thread will not do the same task at the same time, then do the counting.
However, the total numbers of thread should be controlled at a number
less than 10.

That’s a very inefficient way to partition the work because you will
be reading each file over and over again - and file IO is slow.
Rather you would read each file only once and count all words and
finally combine counts.

Cheers

robert

Ken P. wrote in post #1101934:

The real problem is that there are plenty of files on disks and a long
list of words stored in an array. I’d count the total appearing times
for every words among these files.

I need to run this task with a multiple threads program. Each thread
would take one word from the array by shifting a word to ensure other
thread will not do the same task at the same time, then do the counting.
However, the total numbers of thread should be controlled at a number
less than 10.

As Joel said, partition the array outside the threads. I made a tiny
proof-of-concept: https://gist.github.com/phluid61/5180668 However it’s
probably simpler to just do:

threads = []
matches = []
mtx = Mutex.new
big_word_list.each_slice(10) do |smaller_word_list|
threads << Thread.new(smaller_word_list) do |ary|
data = []
ary.each do |word|
data << scan_files_for(word)
end
mtx.synchronize{ matches += data }
end
end
threads.each{|t| t.join }

Or whatever. It might even make sense to decimate the files, and make
each thread handle the full word list.

threads = []
matches = []
mtx = Mutex.new
all_files.each_slice(10) do |some_files|
threads << Thread.new(some_files) do |ary|
data = []
ary.each do |file|
data << scan_file(file, big_word_list)
end
mtx.synchronize{ matches += data }
end
end
threads.each{|t| t.join }

Depends how the scan_* function works, and how easy it is to get the
list of all_files.