[Checkins] SVN: zope.sendmail/trunk/src/zope/sendmail/delivery.py add code to allow several QueueProcessorThreads send messages from the same

Benji York benji at zope.com
Thu Oct 18 15:16:59 EDT 2007


Log message for revision 80928:
  add code to allow several QueueProcessorThreads send messages from the same
  maildir simultaneously; this is helpful if you run several instances of the
  same app on a box
  

Changed:
  U   zope.sendmail/trunk/src/zope/sendmail/delivery.py

-=-
Modified: zope.sendmail/trunk/src/zope/sendmail/delivery.py
===================================================================
--- zope.sendmail/trunk/src/zope/sendmail/delivery.py	2007-10-18 19:15:26 UTC (rev 80927)
+++ zope.sendmail/trunk/src/zope/sendmail/delivery.py	2007-10-18 19:16:58 UTC (rev 80928)
@@ -19,12 +19,14 @@
 """
 __docformat__ = 'restructuredtext'
 
+import atexit
+import logging
+import os
+import os.path
 import rfc822
+import stat
 import threading
-import logging
-import atexit
 import time
-from os import unlink, getpid
 from cStringIO import StringIO
 from random import randrange
 from time import strftime
@@ -37,6 +39,12 @@
 import transaction
 
 
+# The longest time sending a file is expected to take.  Longer than this and
+# the send attempt will be assumed to have failed.  This means that sending
+# very large files or using very slow mail servers could result in duplicate
+# messages sent.
+MAX_SEND_TIME = 60*60*3
+
 class MailDataManager(object):
     implements(IDataManager)
 
@@ -86,7 +94,7 @@
         """Generates a new message ID according to RFC 2822 rules"""
         randmax = 0x7fffffff
         left_part = '%s.%d.%d' % (strftime('%Y%m%d%H%M%S'),
-                                  getpid(),
+                                  os.getpid(),
                                   randrange(0, randmax))
         return "%s@%s" % (left_part, gethostname())
 
@@ -138,6 +146,59 @@
         return MailDataManager(msg.commit, onAbort=msg.abort)
 
 
+# The below diagram depicts the operations performed while sending a message in
+# the ``run`` method of ``QueueProcessorThread``.  This sequence of operations
+# will be performed for each file in the maildir each time the thread "wakes
+# up" to send messages.
+#
+# Any error conditions not depicted on the diagram will provoke the catch-all
+# exception logging of the ``run`` method.
+#
+# In the diagram the "message file" is the file in the maildir's "cur" directory
+# that contains the message and "tmp file" is a hard link to the message file
+# created in the maildir's "tmp" directory.
+#
+#           ( start trying to deliver a message )
+#                            |
+#                            |
+#                            V
+#            +-----( get tmp file mtime )
+#            |               |
+#            |               | file exists
+#            |               V
+#            |         ( check age )-----------------------------+
+#   tmp file |               |                       file is new |
+#   does not |               | file is old                       |
+#   exist    |               |                                   |
+#            |      ( unlink tmp file )-----------------------+  |
+#            |               |                      file does |  |
+#            |               | file unlinked        not exist |  |
+#            |               V                                |  |
+#            +---->( touch message file )------------------+  |  |
+#                            |                   file does |  |  |
+#                            |                   not exist |  |  |
+#                            V                             |  |  |
+#            ( link message file to tmp file )----------+  |  |  |
+#                            |                 tmp file |  |  |  |
+#                            |           already exists |  |  |  |
+#                            |                          |  |  |  |
+#                            V                          V  V  V  V
+#                     ( send message )             ( skip this message )
+#                            |
+#                            V
+#                 ( unlink message file )---------+
+#                            |                    |
+#                            | file unlinked      | file disappeared
+#                            |                    |
+#                            |  +-----------------+
+#                            |  |
+#                            |  V
+#                  ( unlink tmp file )-------------+
+#                            |                     |
+#                            | file unlinked       | file disappeared
+#                            V                     |
+#                  ( message delivered )<----------+
+
 class QueueProcessorThread(threading.Thread):
     """This thread is started at configuration time from the
     `mail:queuedDelivery` directive handler.
@@ -149,7 +210,7 @@
 
     def __init__(self, interval=3.0):
         threading.Thread.__init__(self)
-        self.interval = interval  
+        self.interval = interval
 
     def setMaildir(self, maildir):
         """Set the maildir.
@@ -196,13 +257,100 @@
             for filename in self.maildir:
                 fromaddr = ''
                 toaddrs = ()
+                head, tail = os.path.split(filename)
+                tmp_filename = os.path.join(head, 'sending-' + tail)
                 try:
+                    # perform a series of operations in an attempt to ensure
+                    # that no two threads/processes send this message
+                    # simultaneously as well as attempting to not generate
+                    # spurious failure messages in the log; a diagram that
+                    # represents these operations is included in
+                    # send-mail-states.txt
+                    try:
+                        # find the age of the tmp file (if it exists)
+                        age = None
+                        mtime = os.stat(tmp_filename)[stat.ST_MTIME]
+                        age = time.time() - mtime
+                    except OSError, e:
+                        if e.errno == 2: # file does not exist
+                            # the tmp file could not be stated because it
+                            # doesn't exist, that's fine, keep going
+                            pass
+                        else:
+                            # the tmp file could not be stated for some reason
+                            # other than not existing; we'll report the error
+                            raise
+
+                    # if the tmp file exists, check its age
+                    if age is not None:
+                        try:
+                            if age > MAX_SEND_TIME:
+                                # the tmp file is "too old"; remove it
+                                os.unlink(tmp_filename)
+                            else:
+                                # the tmp file is "new", so someone else may
+                                # be sending this message, try again later
+                                continue
+                            # if we get here, the file existed, but was too
+                            # old, so it was unlinked
+                        except OSError, e:
+                            if e.errno == 2: # file does not exist
+                                # it looks like someone else removed the tmp file,
+                                # that's fine, we'll try to deliver the message
+                                # again later
+                                continue
+
+                    # now we know that the tmp file doesn't exist, we need to
+                    # "touch" the message before we create the tmp file so the
+                    # mtime will reflect the fact that the file is being
+                    # processed (there is a race here, but it's OK for two or
+                    # more processes to touch the file "simultaneously")
+                    try:
+                        os.utime(filename, None)
+                    except OSError, e:
+                        if e.errno == 2: # file does not exist
+                            # someone removed the message before we could
+                            # touch it, no need to complain, we'll just keep
+                            # going
+                            continue
+
+                    # creating this hard link will fail if another process is
+                    # also sending this message
+                    try:
+                        os.link(filename, tmp_filename)
+                    except OSError, e:
+                        if e.errno == 17: # file exists
+                            # it looks like someone else is sending this
+                            # message too; we'll try again later
+                            continue
+
+                    # read message file and send contents
                     file = open(filename)
                     message = file.read()
                     file.close()
                     fromaddr, toaddrs, message = self._parseMessage(message)
                     self.mailer.send(fromaddr, toaddrs, message)
-                    unlink(filename)
+
+                    try:
+                        os.unlink(filename)
+                    except OSError, e:
+                        if e.errno == 2: # file does not exist
+                            # someone else unlinked the file; oh well
+                            pass
+                        else:
+                            # something bad happened, log it
+                            raise
+
+                    try:
+                        os.unlink(tmp_filename)
+                    except OSError, e:
+                        if e.errno == 2: # file does not exist
+                            # someone else unlinked the file; oh well
+                            pass
+                        else:
+                            # something bad happened, log it
+                            raise
+
                     # TODO: maybe log the Message-Id of the message sent
                     self.log.info("Mail from %s to %s sent.",
                                   fromaddr, ", ".join(toaddrs))



More information about the Checkins mailing list