[ZODB-Dev] Problems with ReadConflictError

Andrew McLean andrew-zodb at andros.org.uk
Mon Aug 21 14:44:17 EDT 2006


I'm having a problem with my first multi-threaded application using ZODB 
(v3.6).

The program basically uses an OOBTree to record whether tasks have been 
completed. The key is the task identifier. There are multiple Downloader 
threads that need to read the OOBTree (to check if a task has been 
completed). There is a single ProcessResults thread that writes to the 
OOBTree (when a task is completed). Each thread uses the same DB 
instance, but uses it's own connection instance. I also have a Packer 
thread that periodically packs the database.

The problem is that the Downloader thread occasionally throws a 
ReadConfictError exception.

I had thought that I could just wait for a bit and then retry the read, 
but that didn't help. I have a workaround, which is to assume the task 
is not done when the exception is thrown. But this isn't very elegant.

I suspect that I may not understand how the transaction manager works. 
Currently I just use "transaction.get().commit()" after each write to 
the database.

Any advice gratefully received.

I have attached a skeleton of the code below, if that helps.

- Andrew McLean


# Import various modules to support the ZODB
from ZODB import FileStorage, DB
from ZODB.POSException import ReadConflictError
from BTrees.OOBTree import OOBTree
import transaction
import BTrees.check


class Packer(threading.Thread):
   """Packer class to pack the ZODB at periodic intervals"""
   def __init__(self, db, interval):
       self.__db = db
       self.__interval = interval
       threading.Thread.__init__(self)
   def run(self):
       while 1:
           print "Packing database..."
           self.__db.pack()
           time.sleep(self.__interval)

class Downloader(threading.Thread):
   """Downloader class to download urls"""
   def __init__(self, id, taskQueue, resultQueue, db):
       self.id = id
       self.__taskQueue = taskQueue
       self.__resultQueue = resultQueue
       self.__results = db.open().root()['done']
       threading.Thread.__init__(self)
   def run(self):
       while 1:
           # Get task from the queue
           task = self.__taskQueue.get()
           if (task[1] == 1) or (not self.done(task)):
               # ....
               # Add to results queue
               self.__resultQueue.put((task, result))
   def done(self, task):
       ## backoffDelay = 1
       while 1:
           try:
               return self.__results.has_key(task)
           except ReadConflictError:
               ## print "*** ReadConfictError raised: retrying in %d 
seconds ***" % backoffDelay
               ## time.sleep(backoffDelay)
               ## backoffDelay = backoffDelay * 2
               print "*** ReadConflictError raised: Assume task not 
completed for safety ***"
               return False

class ProcessResults(threading.Thread):
   """ProcessPage class to process downloaded URLs"""
   def __init__(self, taskQueue, resultQueue, resultWriter, db, problems):
       self.__taskQueue = taskQueue
       self.__resultQueue = resultQueue
       self.__writer = resultWriter
       self.__results = db.open().root()['done']
       self.__problems = problems
       threading.Thread.__init__(self)
   def run(self):
       while 1:
           fullResult = self.__resultQueue.get()

           # ...

           # Write results to database
           self.__results[task] = True
           transaction.get().commit()


def main():

   # Setting up the ZODB database
   # ...
   storage = FileStorage.FileStorage(databasePath)
   db = DB(storage)
     # Create the "done" object (if required)
   root = db.open().root()
   if not root.has_key('done'):
       root['done'] = OOBTree()
       transaction.get().commit()

   # ...

   # Set up the workers to process the task queue
   print "Starting threads..."
   ProcessResults(taskQueue, resultQueue, resultWriter, db, 
problems).start()
   Packer(db, 15*60).start()
   for id in range(nDownloaders):
       Downloader(id, taskQueue, resultQueue, db).start()

main()



More information about the ZODB-Dev mailing list