[Zope3-checkins] SVN: Zope3/trunk/src/zope/server/ Call close_when_done() rather than close() in FTP data connections.

Shane Hathaway shane at zope.com
Mon Sep 6 21:52:17 EDT 2004


Log message for revision 27460:
  Call close_when_done() rather than close() in FTP data connections.
  
  ftp/server.py: a lot of rearranging was necessary to fix code that 
  called close() with arguments.  It's important to send a report at the 
  end of FTP data connections, but the old way relied on close() with 
  arguments and we sometimes need to call close_when_done() instead of 
  close().
  
  dualmodechannel.py: added an assertion that verifies close() is always 
  called in asynchronous mode.  See the comment.
  
  
  


Changed:
  U   Zope3/trunk/src/zope/server/dualmodechannel.py
  U   Zope3/trunk/src/zope/server/ftp/server.py


-=-
Modified: Zope3/trunk/src/zope/server/dualmodechannel.py
===================================================================
--- Zope3/trunk/src/zope/server/dualmodechannel.py	2004-09-07 01:45:52 UTC (rev 27459)
+++ Zope3/trunk/src/zope/server/dualmodechannel.py	2004-09-07 01:52:16 UTC (rev 27460)
@@ -82,7 +82,7 @@
         return not self.will_close
 
     def handle_read(self):
-        if not self.async_mode:
+        if not self.async_mode or self.will_close:
             return
         try:
             data = self.recv(self.adj.recv_bytes)
@@ -189,3 +189,10 @@
             # main thread calls handle_write().
             self.async_mode = 1
             self.pull_trigger()
+
+    def close(self):
+        # Always close in asynchronous mode.  If the connection is
+        # closed in a thread, the main loop can end up with a bad file
+        # descriptor.
+        assert self.async_mode
+        asyncore.dispatcher.close(self)

Modified: Zope3/trunk/src/zope/server/ftp/server.py
===================================================================
--- Zope3/trunk/src/zope/server/ftp/server.py	2004-09-07 01:45:52 UTC (rev 27459)
+++ Zope3/trunk/src/zope/server/ftp/server.py	2004-09-07 01:52:16 UTC (rev 27460)
@@ -147,10 +147,11 @@
 
     def cmd_abor(self, args):
         'See IFTPCommandHandler'
+        assert self.async_mode
+        self.reply('TRANSFER_ABORTED')
         if self.client_dc is not None:
-            self.client_dc.close('TRANSFER_ABORTED')
-        else:
-            self.reply('TRANSFER_ABORTED')
+            self.client_dc.reported = True
+            self.client_dc.close()
 
     def cmd_appe (self, args):
         'See IFTPCommandHandler'
@@ -208,14 +209,11 @@
             except GetoptError:
                 self.reply('ERR_ARGS')
                 return
-
             if len(args) > 1:
                 self.reply('ERR_ARGS')
                 return
-
             args = args and args[0] or ''
 
-
         fs = self._getFileSystem()
         path = self._generatePath(args)
         if not fs.type(path):
@@ -237,7 +235,9 @@
             cdc.write(s)
             cdc.close_when_done()
         except OSError, err:
-            cdc.close('ERR_NO_LIST', str(err))
+            self.reply('ERR_NO_LIST', str(err))
+            cdc.reported = True
+            cdc.close_when_done()
 
     def getList(self, args, long=0, directory=0):
         # we need to scan the command line for arguments to '/bin/ls'...
@@ -271,7 +271,6 @@
         return '\r\n'.join(file_list) + '\r\n'
 
 
-
     def cmd_mdtm(self, args):
         'See IFTPCommandHandler'
         fs = self._getFileSystem()
@@ -401,9 +400,13 @@
             fs.readfile(path, outstream, start)
             cdc.close_when_done()
         except OSError, err:
-            cdc.close('ERR_OPEN_READ', str(err))
+            self.reply('ERR_OPEN_READ', str(err))
+            cdc.reported = True
+            cdc.close_when_done()
         except IOError, err:
-            cdc.close('ERR_IO', str(err))
+            self.reply('ERR_IO', str(err))
+            cdc.reported = True
+            cdc.close_when_done()
 
 
     def cmd_rest(self, args):
@@ -489,7 +492,7 @@
 
     def finishedRecv(self, buffer, (path, mode, start)):
         """Called by RecvChannel when the transfer is finished."""
-        # Always called in a task.
+        assert not self.async_mode
         try:
             infile = buffer.getfile()
             infile.seek(0)
@@ -555,18 +558,17 @@
         path = posixpath.join(self.cwd, args)
         return posixpath.normpath(path)
 
-
     def newPassiveAcceptor(self):
         # ensure that only one of these exists at a time.
+        assert self.async_mode
         if self.passive_acceptor is not None:
             self.passive_acceptor.close()
             self.passive_acceptor = None
         self.passive_acceptor = PassiveAcceptor(self)
         return self.passive_acceptor
 
-
-
     def connectDataChannel(self, cdc):
+        """Attempt to connect the data channel."""
         pa = self.passive_acceptor
         if pa:
             # PASV mode.
@@ -575,7 +577,6 @@
                 conn, addr = pa.ready
                 cdc.set_socket (conn)
                 cdc.connected = 1
-                self.passive_acceptor.close()
                 self.passive_acceptor = None
             # else we're still waiting for a connect to the PASV port.
             # FTP Explorer is known to do this.
@@ -588,16 +589,13 @@
             try:
                 cdc.connect((ip, port))
             except socket.error:
-                cdc.close('NO_DATA_CONN')
+                self.reply('NO_DATA_CONN')
+                cdc.reported = True
+                cdc.close_when_done()
 
-                
-    def notifyClientDCClosing(self, *reply_args):
-        if self.client_dc is not None:
-            self.client_dc = None
-            if reply_args:
-                self.reply(*reply_args)
+    def closedData(self):
+        self.client_dc = None
 
-
     def close(self):
         LineServerChannel.close(self)
         # Make sure the client DC gets closed too.
@@ -703,11 +701,9 @@
         self.addr = self.getsockname()
         self.listen(1)
 
-
     def log (self, *ignore):
         pass
 
-
     def handle_accept (self):
         conn, addr = self.accept()
         conn.setblocking(0)
@@ -722,18 +718,48 @@
         self.close()
 
 
-class RecvChannel(DualModeChannel):
-    """ """
+class FTPDataChannel(DualModeChannel):
+    """Base class for FTP data connections"""
+    
+    def __init__ (self, control_channel):
+        self.control_channel = control_channel
+        self.reported = False
+        DualModeChannel.__init__(self, None, None, control_channel.adj)
 
+    def report(self, *reply_args):
+        """Reports the result of the data transfer."""
+        self.reported = True
+        if self.control_channel is not None:
+            self.control_channel.reply(*reply_args)
+
+    def reportDefault(self):
+        """Provide a default report on close."""
+        pass
+
+    def close(self):
+        """Notifies the control channel when the data connection closes."""
+        c = self.control_channel
+        try:
+            if c is not None and not self.reported:
+                self.reportDefault()
+        finally:
+            self.control_channel = None
+            DualModeChannel.close(self)
+            if c is not None:
+                c.closedData()
+    
+
+class RecvChannel(FTPDataChannel):
+    """FTP data receive channel"""
+
     complete_transfer = 0
     _fileno = None  # provide a default for asyncore.dispatcher._fileno
 
     def __init__ (self, control_channel, finish_args):
-        self.control_channel = control_channel
         self.finish_args = finish_args
         self.inbuf = OverflowableBuffer(control_channel.adj.inbuf_overflow)
-        DualModeChannel.__init__(self, None, None, control_channel.adj)
-        # Note that this channel starts out in async mode.
+        FTPDataChannel.__init__(self, control_channel)
+        # Note that this channel starts in async mode.
 
     def writable (self):
         return 0
@@ -753,22 +779,13 @@
         self.close()
         c.queue_task(task)
 
-    def close(self, *reply_args):
-        try:
-            c = self.control_channel
-            if c is not None:
-                self.control_channel = None
-                if not self.complete_transfer and not reply_args:
-                    # Not all data transferred
-                    reply_args = ('TRANSFER_ABORTED',)
-                c.notifyClientDCClosing(*reply_args)
-        finally:
-            if self.socket is not None:
-                # XXX asyncore.dispatcher.close() doesn't like socket == None
-                DualModeChannel.close(self)
+    def reportDefault(self):
+        if not self.complete_transfer:
+            self.report('TRANSFER_ABORTED')
+        # else the transfer completed and FinishedRecvTask will
+        # provide a complete reply through finishedRecv().
 
 
-
 class FinishedRecvTask(object):
 
     implements(ITask)
@@ -803,16 +820,16 @@
         pass
 
 
-class XmitChannel(DualModeChannel):
+class XmitChannel(FTPDataChannel):
+    """FTP data send channel"""
 
     opened = 0
     _fileno = None  # provide a default for asyncore.dispatcher._fileno
 
     def __init__ (self, control_channel, ok_reply_args):
-        self.control_channel = control_channel
         self.ok_reply_args = ok_reply_args
         self.set_sync()
-        DualModeChannel.__init__(self, None, None, control_channel.adj)
+        FTPDataChannel.__init__(self, control_channel)
 
     def _open(self):
         """Signal the client to open the connection."""
@@ -825,7 +842,7 @@
             raise IOError, 'Client FTP connection closed'
         if not self.opened:
             self._open()
-        DualModeChannel.write(self, data)
+        FTPDataChannel.write(self, data)
 
     def readable(self):
         return not self.connected
@@ -836,34 +853,26 @@
             self.recv(1)
         except:
             # The connection failed.
-            self.close('NO_DATA_CONN')
+            self.report('NO_DATA_CONN')
+            self.close()
 
     def handle_connect(self):
         pass
 
     def handle_comm_error(self):
-        self.close('TRANSFER_ABORTED')
+        self.report('TRANSFER_ABORTED')
+        self.close()
 
-    def close(self, *reply_args):
-        try:
-            c = self.control_channel
-            if c is not None:
-                self.control_channel = None
-                if not reply_args:
-                    if not len(self.outbuf):
-                        # All data transferred
-                        if not self.opened:
-                            # Zero-length file
-                            self._open()
-                        reply_args = ('TRANS_SUCCESS',)
-                    else:
-                        # Not all data transferred
-                        reply_args = ('TRANSFER_ABORTED',)
-                c.notifyClientDCClosing(*reply_args)
-        finally:
-            if self.socket is not None:
-                # XXX asyncore.dispatcher.close() doesn't like socket == None
-                DualModeChannel.close(self)
+    def reportDefault(self):
+        if not len(self.outbuf):
+            # All data transferred
+            if not self.opened:
+                # Zero-length file
+                self._open()
+            self.report('TRANS_SUCCESS')
+        else:
+            # Not all data transferred
+            self.report('TRANSFER_ABORTED')
 
 
 class ApplicationXmitStream(object):



More information about the Zope3-Checkins mailing list