Threading an email queueing system

#1

I needed an email queueing system for a project I am writing. The script below runs every minute via a cron job on one of my servers. I am using a table in MySQL to keep track of the number of email bot threads to spawn and their current state. They need to be threaded (or at least run concurrently), since some email lists will be longer than others and some emails have a higher priority. The thought is that this way I can quickly add more email threads as needed, and have a simple web UI to check their state and turn them on or off as necessary.

It works perfectly most of the time. But sometimes, seemingly at random, one of the threads does not finish and an email bot's state is left as "processing". The log file actually says that the thread completes, but the entry in the database still says "processing". Any ideas?

======== Email Bots Table ===========
+----+--------+----------------------------------------------+-------+---------------------+
| id | status | query_constraints                            | state | updated_at          |
+----+--------+----------------------------------------------+-------+---------------------+
|  1 | on     | select * from email_queue where priority = 0 | idle  | 2010-03-25 17:33:14 |
|  2 | on     | select * from email_queue WHERE priority > 0 | idle  | 2010-03-25 17:33:14 |
+----+--------+----------------------------------------------+-------+---------------------+

======== Email bot Script that runs via cronjob ===========

require 'rubygems'
require 'net/smtp'
require 'timeout'
require 'mysql'
require 'activesupport'
require 'activerecord'

begin
  require 'openssl'
rescue LoadError
end

ActiveRecord::Base.establish_connection(
  :adapter  => 'mysql',
  :host     => 'localhost',
  :database => 'adevelopment',
  :username => 'username',
  :password => 'password'
)

def send_email(from, to, subject, message)
  msg = <<END_OF_MESSAGE
From: #{from}
To: #{to}
MIME-Version: 1.0
Content-type: text/html
Subject: #{subject}

#{message}
END_OF_MESSAGE

  Net::SMTP.start('server', 25, '<sending domain>', 'username', 'password', :login) do |smtp|
    smtp.send_message msg, from.gsub(/[^<]+</, '').gsub(/>/, ''), to.gsub(/[^<]+</, '').gsub(/>/, '')
  end
end

# gather email bots
mysql = ActiveRecord::Base.connection.execute("SELECT * FROM email_bots")
emailbots = {}
mysql.each_hash do |p|
  emailbots[p['id']] = p
end
mysql.free

threads = []
emailbots.each do |emailbot|
  if emailbot[1]['status'] == 'on' && emailbot[1]['state'] == 'idle'
    threads << Thread.new {
      ebot = emailbot[1]
      ActiveRecord::Base.connection.execute("UPDATE email_bots SET state='processing', updated_at=UTC_TIMESTAMP() WHERE id=#{ebot['id']}")
      sent = ""
      mysql = ActiveRecord::Base.connection.execute(ebot['query_constraints'])
      i = 1
      mysql.each_hash do |email|
        puts email['id']
        begin
          send_email(email['sender'], email['recipient'], email['subject'], email['message'])
          sent += email['id'] + ","
        rescue Exception => e
          ActiveRecord::Base.connection.execute(ActiveRecord::Base.send("sanitize_sql_array", ["UPDATE email_queue SET error='%s' WHERE id=#{email['id']}", "#{e.message} #{e.backtrace.inspect}"]))
        end
        i = i + 1
        if i % 100 == 0
          if sent != ""
            ActiveRecord::Base.connection.execute("INSERT INTO email_sent (SELECT priority, user_id, #{ebot['id']}, sender, recipient, subject, message, error, UTC_TIMESTAMP FROM email_queue WHERE id in (#{sent.chop!}))")
            ActiveRecord::Base.connection.execute("DELETE FROM email_queue WHERE id in (#{sent})")
            sent = ""
          end
        end
      end
      mysql.free
      if sent != ""
        ActiveRecord::Base.connection.execute("INSERT INTO email_sent (SELECT priority, user_id, #{ebot['id']}, sender, recipient, subject, message, error, UTC_TIMESTAMP FROM email_queue WHERE id in (#{sent.chop!}))")
        ActiveRecord::Base.connection.execute("DELETE FROM email_queue WHERE id in (#{sent})")
      end
      ActiveRecord::Base.connection.execute("UPDATE email_bots SET state='idle' WHERE id=#{ebot['id']}")
    }
  else
    # This is a potential workaround which I am not too excited about.
    # mysql = ActiveRecord::Base.connection.execute(ebot['query_constraints'])
    # if mysql.num_rows == 0
    #   ActiveRecord::Base.connection.execute("UPDATE email_bots SET current_action='idle' WHERE id=#{ebot['id']}")
    # end
  end
end
threads.each { |t|
  t.join
}
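
For reference, here is a minimal sketch of one way to make sure a bot's state always goes back to idle, on the assumption that the thread is raising somewhere before the final UPDATE runs. The BOT_STATES hash and the set_state and run_bot helpers are hypothetical stand-ins for the email_bots table and the UPDATE statements; they are not part of the script above.

```ruby
# Hypothetical stand-in for the email_bots table: bot id => state.
BOT_STATES = { 1 => 'idle', 2 => 'idle' }
STATE_LOCK = Mutex.new

# Hypothetical helper standing in for the UPDATE email_bots statements.
def set_state(id, state)
  STATE_LOCK.synchronize { BOT_STATES[id] = state }
end

# Marks the bot as processing, runs the given block (the send loop in the
# real script), and resets the state in an ensure clause.
def run_bot(id)
  set_state(id, 'processing')
  yield
ensure
  # Runs whether the block returned normally or raised.
  set_state(id, 'idle')
end

threads = [
  Thread.new { run_bot(1) { :ok } },
  Thread.new do
    begin
      run_bot(2) { raise 'simulated failure after the last send' }
    rescue => e
      warn "bot 2 failed: #{e.message}"
    end
  end
]
threads.each(&:join)

p BOT_STATES
```

This only guarantees the reset; it does not explain why the thread stops, so the rescued error should still be logged somewhere visible.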

#2

Actually, looking at it more closely, it is terminating when I call mysql.free after it finishes walking through all of the sends.
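
One thing that may be related: in the script, `mysql` is first assigned at the top level (when gathering the email bots) and then assigned again inside each Thread.new block. In Ruby, a block that assigns to a variable already defined in the enclosing scope reuses that variable rather than creating its own, so the bot threads would all share one `mysql`. Whether that is what actually breaks mysql.free here is a guess, but the sharing itself can be shown with a small sketch. The strings stand in for result sets, and the queues and the fetch_rows helper are hypothetical, there only to force the interleaving and to show an isolated alternative.

```ruby
result = nil                 # defined outside the blocks, like `mysql` in the script
first_assigned  = Queue.new  # used only to force a particular interleaving
second_assigned = Queue.new

t1 = Thread.new do
  result = 'rows for bot 1'  # assigns the OUTER variable, not a thread-local one
  first_assigned << true
  second_assigned.pop        # wait until the other thread has assigned too
  result                     # what bot 1 now sees
end

t2 = Thread.new do
  first_assigned.pop         # wait until bot 1 has assigned
  result = 'rows for bot 2'  # overwrites the value bot 1 stored
  second_assigned << true
  result
end

bot1_saw = t1.value
bot2_saw = t2.value
puts "bot 1 saw: #{bot1_saw}"
puts "bot 2 saw: #{bot2_saw}"

# One way to isolate the variable: do the work in a method, whose locals
# are private to each call.
def fetch_rows(label)
  rows = "rows for #{label}"
  rows
end

isolated = %w[bot1 bot2].map { |l| Thread.new(l) { |label| fetch_rows(label) } }.map(&:value)
puts isolated.inspect
```

In the shared case both threads end up holding the second thread's value, which in the real script would mean one thread iterating over or freeing the other thread's result set. Renaming the variable inside the thread block, or moving the thread body into a method as above, avoids the sharing.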