Recursive parallel readdir() with drb and slave libraries

Hi all,

I’m trying to write a Ruby script that does a recursive descent, adding up
the size of each directory and its contents. Obviously, you just
recurse down to the bottom and work your way back up.
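
For reference, the plain single-process version of that descent looks roughly like this. It is only a minimal sketch: `dir_size` is an illustrative name, not something from the script further down, and it skips symlinks the same way that script does.

```ruby
# Hypothetical helper: total size in bytes of all regular files under dir.
# Symlinks are skipped so a link loop cannot cause endless recursion.
def dir_size(dir)
  total = 0
  Dir.foreach(dir) do |name|
    next if name == "." || name == ".."
    path = File.join(dir, name)
    stat = File.lstat(path)
    next if stat.symlink?
    if stat.file?
      total += stat.size
    elsif stat.directory?
      total += dir_size(path)   # recurse, then add the subtree's total
    end
  end
  total
rescue SystemCallError => e
  # Unreadable directory or vanished file: report it and keep what we have.
  warn e.message
  total
end
```

Each call returns the total for its own subtree, so the sizes accumulate on the way back up.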

But I need to break this into parallel processes because of the sheer
size of my directories: on the order of 1-5 TB each, with millions of
files. I managed to write a version using the Ruby thread library, but
since Ruby 1.8.6 uses green threads rather than native ones, it didn’t
buy me anything.

So I’ve tried to use the ‘drb’ and ‘slave’ libraries to make my life
simpler. Basically, I want to have at most N slaves going at any one
time. When I’m processing a directory and find a sub-directory to
descend into, I call back over DRb to ask whether I can fork off another
slave to do the work, or whether I should just recurse into it myself.
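
The slot check I have in mind can be sketched like this. It is a simplified stand-in, not the script below: `SlotCounter`, `try_acquire`, `release` and `used` are made-up names, the `druby://127.0.0.1:0` address is an assumption for the demo, and both ends of the DRb connection live in one process here, so it only shows the bookkeeping.

```ruby
require 'drb'

# Hypothetical stand-in for the Master class: hands out at most `max` slots.
class SlotCounter
  def initialize(max)
    @max   = max
    @used  = 0
    @mutex = Mutex.new
  end

  # Claims a slot and returns true if one is free, otherwise returns false.
  def try_acquire
    @mutex.synchronize do
      return false if @used >= @max
      @used += 1
      true
    end
  end

  # Gives a slot back; the count never drops below zero.
  def release
    @mutex.synchronize { @used -= 1 if @used > 0 }
    nil
  end

  def used
    @mutex.synchronize { @used }
  end
end

# Port 0 lets the OS pick a free port; DRb.uri then reports the real address.
DRb.start_service("druby://127.0.0.1:0", SlotCounter.new(2))
counter = DRbObject.new_with_uri(DRb.uri)

grants = 3.times.map { counter.try_acquire }  # only two slots exist
counter.release                               # hand one slot back
after_release = counter.used
DRb.stop_service
```

A caller that gets `false` back would simply recurse into the sub-directory itself instead of starting a slave.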

Once I’m done in a directory, I wait on any slaves still working below
me, before I return my results to the next higher level.
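
The wait-then-return step, sketched with plain `fork` and `IO.pipe` instead of the slave library (that swap is an assumption made purely for illustration, and `spawn_sum` is a made-up helper that adds up numbers rather than file sizes):

```ruby
# Hypothetical helper: run the block in a forked child and return
# [pid, read_end]; the child writes the block's integer result to the pipe.
def spawn_sum
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    writer.write(yield.to_s)
    writer.close          # flushes the buffered result to the parent
    exit!(0)              # leave without running the parent's at_exit hooks
  end
  writer.close            # parent keeps only the read end
  [pid, reader]
end

# One child per "sub-directory"; here each just sums a list of numbers.
pool = [[1, 2, 3], [10, 20]].map do |nums|
  spawn_sum { nums.inject(0) { |sum, n| sum + n } }
end

total = 0
pool.each do |pid, reader|
  total += reader.read.to_i   # blocks until the child closes its end
  reader.close
  Process.wait(pid)           # reap the child so no zombie is left behind
end
```

The parent only adds a child's result to its own total after that child has finished, which is the ordering I need at every level of the recursion.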

I’m obviously doing something completely wrong here, but I can’t see
what. Can anyone look at this code and tell me what I’m doing wrong, or
what I’m missing when you run it and it just barfs (sorta) all over the
place?

#!/usr/bin/ruby

require 'getoptlong'
#require 'thread'
require 'slave'
require 'drb'

$VERSION = "v1.0";
$max_slaves = 1
$count = 50

URI = "drbunix:///tmp/readdir+" + Process.pid.to_s

slave_cnt_mutex = Mutex.new

opts = GetoptLong.new(
  [ "--help", "-h", GetoptLong::NO_ARGUMENT],
  [ "--kids", "-k", GetoptLong::REQUIRED_ARGUMENT]
)

#---------------------------------------------------------------------
class Master
  def initialize(max_slaves=1)
    @slaves = []
    @max_slaves = max_slaves
    @slave_count = 1
    @slave_mutex = Mutex.new
    # Probably not needed...
    @slave_cv = ConditionVariable.new
  end

  def count
    @slave_count
  end

  # Increment the count of slaves, returning 1 if incremented, nil if not.
  def addslave
    ok = nil
    @slave_mutex.synchronize do
      if (@slave_count < @max_slaves) then
        @slave_count += 1
        ok = 1
      end
    end
    ok
  end

  # Decrement the count of slaves
  def remslave
    @slave_mutex.synchronize do
      if (@slave_count > 1) then
        @slave_count -= 1
      end
    end
    @slave_count
  end

end

#---------------------------------------------------------------------

# Read a directory and add to the database; this function is recursive
# for sub-directories

def readdir(master,dir)

  #puts "readdir(#{dir})"

  size_file = {}
  size_dir = {}
  size_total = 0

  # Slave pool for this level of readdir() recursion.
  pool = []

  # Traverse the directory and collect the size of all files and
  # directories
  begin
    Dir.foreach(dir) do |f|
      #print " #{f},"
      if(f != "." && f != "..") then
        f_full = addpath(dir, f)
        stat = File.lstat(f_full)

        if(!stat.symlink?) then

          if(stat.file?) then
            #puts "  File: #{f}"
            size = File.size(f_full)
            size_file[f] = size
            size_total += size
          end

          if(stat.directory?) then
            #puts "  DIR: #{f}, thread_count = #{@thread_count}, max_slaves = #{@max_slaves}."
            if ($max_slaves <= 1) then
              #puts " no threads."
              size = readdir(master,f_full)
              if (size > 0) then
                size_dir[f] = size
                size_total += size
              end
            else
              ok = master.addslave
              if (ok)
                puts " Threaded!"
                pool << Slave.object(:async => true) {
                  size = readdir(master,f_full)
                  puts "size = #{size}"
                }
              else
                puts " no free threads, do anyway"
                size = readdir(master,f_full)
                if(size > 0) then
                  size_dir[f] = size
                  size_total += size
                end
              end
            end
          end
        end
      end
    end
  rescue SystemCallError => errmsg
    puts errmsg
  end

  pool.each { |slave|
    size_total += slave.value
    #master.remslave
    slave.shutdown
  }

  #puts "Dir: #{dir} = #{size_total}"
  return size_total
end

#---------------------------------------------------------------------
def usage
  puts
  puts "usage: readdir-drb [--kids NUM] "
  puts " defaults to #{$max_slaves} children"
  puts
  puts " version: #{$VERSION}"
  puts
end

#---------------------------------------------------------------------
def addpath(a, b)
  return a + b if(a =~ /\/$/)
  return a + "/" + b
end

#---------------------------------------------------------------------

# Main

#---------------------------------------------------------------------
$DEBUG = true

opts.each do |opt,arg|
  case opt
  when "--kids"
    $max_slaves = arg.to_i
  else
    usage
    exit
  end
end

if ARGV.length != 1
  puts "Missing dir argument (try --help)"
  exit 0
end

dir = ARGV.shift

# Start the server which does the counting and coordination between
# slaves.
DRb.start_service URI, Master.new($max_slaves)

# Fire up the first slave process which will do the work of readdir()

master = DRbObject.new_with_uri URI

# Now let's try to do a recursive readdir() algorithm with threads.

size = readdir(master,dir)

sizekb = size / 1024;
sizemb = sizekb / 1024;
sizegb = sizemb / 1024;

puts ""
puts "Total size: #{size} B"
puts "Total size: #{sizekb} KB"
puts "Total size: #{sizemb} MB"
puts "Total size: #{sizegb} GB"