Forum: Ruby Recursive parallel readdir() with drb and slave libraries

Announcement (2017-05-07): www.ruby-forum.com is now read-only since I unfortunately do not have the time to support and maintain the forum any more. Please see rubyonrails.org/community and ruby-lang.org/en/community for other Rails- und Ruby-related community platforms.
John S. (Guest)
on 2009-02-06 19:19
Hi all,

I'm trying to write a ruby script to do a recursive descent, counting up
the size of each directory and it's contents.  Obviously, you just
recurse down to the bottom, and work your way back up.

But I need to break this into parallel processes because of the sheer
size of my directories, on the order of 1-5Tbs in size, with millions of
files.  I managed to write a version using the Ruby thread library, but
since Ruby 1.8.6 is not truly threaded, it didn't buy me anything.

So I've tried to use the 'drb' and 'slave' libraries to make my life
simpler.  Basically, I want to have N slaves going at any one time.
When processing a directory, and I find a sub-directory to descend, I
make a call back using DRb to see if I can fork off another slave to do
the work, or if I just recursively do the work myself.

Once I'm done in a directory, I wait on any slaves still working below
me, before I return my results to the next higher level.

I'm obviously doing something completely wrong here, but I'm missing out
what.  Can anyone help me look at this code and tell me what I'm doing
wrong here?  Or what I'm missing when you try to run it and it just
barfs (sorta) all over the place?

#!/usr/bin/ruby

require 'getoptlong'
#require 'thread'
require 'slave'
require 'drb'

$VERSION = "v1.0";
$max_slaves = 1
$count = 50

URI = "drbunix:///tmp/readdir+" + Process.pid.to_s

slave_cnt_mutex = Mutex.new

opts = GetoptLong.new(
                      [ "--help", "-h", GetoptLong::NO_ARGUMENT],
                      [ "--kids", "-k", GetoptLong::REQUIRED_ARGUMENT]
                      )

#---------------------------------------------------------------------
class Master
  def initialize(max_slaves=1)
    @slaves = []
    @max_slaves = max_slaves
    @slave_count = 1
    @slave_mutex = Mutex.new
    # Probably not needed...
    @slave_cv = ConditionVariable.new
  end

  def count
    @slave_count
  end

  # Increment the count of slaves, returning 1 if incremented, 0 if not.
  def addslave
        ok = nil
    @slave_mutex.synchronize do
      if (@slave_count < @max_slaves) then
        @slave_count += 1
        ok = 1
      end
    end
    ok
  end

  # Decrement the count of slaves
  def remslave
    @slave_mutex.synchronize do
      if (@slave_count > 1) then
        @slave_count -= 1
      end
    end
    @slave_count
  end

end

#---------------------------------------------------------------------
# Read a directory and add to the database; this function is recursive
# for sub-directories

def readdir(master,dir)

  #puts "readdir(#{dir})"

  size_file = {}
  size_dir = {}
  size_total = 0

  # Slave pool for this level of readdir() recursion.
  pool = []

  # Traverse the directory and collect the size of all files and
  # directories

  begin
    Dir.foreach(dir) do |f|
      #print " #{f},"
      if(f != "." && f != "..") then
        f_full = addpath(dir, f)
        stat = File.lstat(f_full)

        if(!stat.symlink?) then

          if(stat.file?) then
            #puts "  File: #{f}"
            size = File.size(f_full)
            size_file[f] = size
            size_total += size
          end

          if(stat.directory?) then
            #puts "  DIR: #{f}, thread_count = #{@thread_count},
max_slaves = #{@max_slaves}."
            if ($max_slaves <= 1) then
              #puts " no threads."
              size = readdir(master,f_full)
              if (size > 0) then
                size_dir[f] = size
                size_total += size
              end
            else
              ok = master.addslave
              if (ok)
                puts " Threaded!"
                pool << Slave.object(:async => true) {
                  size = readdir(master,f_full)
                  puts "size = #{size}"
                }
              else
                puts " no free threads, do anyway"
                size = readdir(master,f_full)
                if(size > 0) then
                  size_dir[f] = size
                  size_total += size
                end
              end
            end
          end
        end
      end
    end
  rescue SystemCallError => errmsg
    puts errmsg
  end

    pool.each { |slave|
    size_total += slave.value
    #master.remslave
    slave.shutdown
  }

  #Puts "Dir: #{dir} = #{size_total}"
  return size_total
end

#---------------------------------------------------------------------
def usage
  puts
  puts "usage: readdir-drb [--kids NUM] <dir>"
  puts "  defaults to #{$max_kids} children"
  puts
  puts "  version: #{$version}"
  puts
end

#---------------------------------------------------------------------
def addpath(a, b)
  return a + b if(a =~ /\/$/)
  return a + "/" + b
end



#---------------------------------------------------------------------
# Main
#---------------------------------------------------------------------
$DEBUG = true

opts.each do |opt,arg|
  case opt
  when "--kids"
    $max_slaves = arg.to_i
  else
    usage
    exit
  end
end

if ARGV.length != 1
  puts "Missing dir argument (try --help)"
  exit 0
end

dir = ARGV.shift

# Start the server which does the counting and coordination between
slaves.
DRb.start_service URI, Master.new($max_slaves)

# Fire up the first slave process which will do the work of readdir()
master = DRbObject.new_with_uri URI

# Now let's try to do a recursive readdir() algorith with threads.

size = readdir(master,dir)

sizekb = size / 1024;
sizemb = sizekb / 1024;
sizegb = sizemb / 1024;

puts ""
puts "Total size: #{size} B"
puts "Total size: #{sizekb} KB"
puts "Total size: #{sizemb} MB"
puts "Total size: #{sizegb} GB"
This topic is locked and can not be replied to.