[Checkins] SVN: ZODB/trunk/src/ - Storage servers now emit Serving and Closed events so subscribers

Jim Fulton jim at zope.com
Mon Nov 21 12:33:31 UTC 2011


Log message for revision 123454:
  - Storage servers now emit Serving and Closed events so subscribers
    can discover addresses when dynamic port assignment (bind to port 0)
    is used. This could, for example, be used to update address
    information in a ZooKeeper database.
  
  - Client storagers have a method, new_addr, that can be used to change
    the server address(es). This can be used, for example, to update a
    dynamically determined server address from information in a
    ZooKeeper database.
  
  - Moved some responsibility from runzeo to StorageServer to make it
    easier to use storage servers without runzeo.
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZEO/ClientStorage.py
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/runzeo.py
  A   ZODB/trunk/src/ZEO/tests/dynamic_server_ports.test
  A   ZODB/trunk/src/ZEO/tests/new_addr.test
  U   ZODB/trunk/src/ZEO/tests/registerDB.test
  U   ZODB/trunk/src/ZEO/tests/servertesting.py
  U   ZODB/trunk/src/ZEO/tests/testZEO.py
  U   ZODB/trunk/src/ZEO/tests/zeoserver.py
  U   ZODB/trunk/src/ZEO/zrpc/client.py
  U   ZODB/trunk/src/ZEO/zrpc/connection.py
  U   ZODB/trunk/src/ZEO/zrpc/server.py

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/CHANGES.txt	2011-11-21 12:33:31 UTC (rev 123454)
@@ -2,8 +2,8 @@
  Change History
 ================
 
-3.11.0 (2010-??-??)
-===================
+3.11.0a1 (2011-??-??)
+=====================
 
 New Features
 ------------
@@ -19,6 +19,16 @@
   comparison inherited from object. (This doesn't apply to old-style
   class instances.)
 
+- Storage servers now emit Serving and Closed events so subscribers
+  can discover addresses when dynamic port assignment (bind to port 0)
+  is used. This could, for example, be used to update address
+  information in a ZooKeeper database.
+
+- Client storagers have a method, new_addr, that can be used to change
+  the server address(es). This can be used, for example, to update a
+  dynamically determined server address from information in a
+  ZooKeeper database.
+
 3.10.5 (2011-11-19)
 ===================
 

Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py	2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2011-11-21 12:33:31 UTC (rev 123454)
@@ -424,8 +424,10 @@
             if not self._rpc_mgr.attempt_connect():
                 self._rpc_mgr.connect()
 
+    def new_addr(self, addr):
+        self._addr = addr
+        self._rpc_mgr.new_addrs(addr)
 
-
     def _wait(self, timeout=None):
         if timeout is not None:
             deadline = time.time() + timeout
@@ -503,10 +505,15 @@
         """
         self._db = db
 
-    def is_connected(self):
+    def is_connected(self, test=False):
         """Return whether the storage is currently connected to a server."""
         # This function is used by clients, so we only report that a
         # connection exists when the connection is ready to use.
+        if test:
+            try:
+                self._server.lastTransaction()
+            except Exception:
+                pass
         return self._ready.isSet()
 
     def sync(self):

Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2011-11-21 12:33:31 UTC (rev 123454)
@@ -46,6 +46,7 @@
 import warnings
 import ZEO.zrpc.error
 import ZODB.blob
+import ZODB.event
 import ZODB.serialize
 import ZODB.TimeStamp
 import zope.interface
@@ -782,18 +783,20 @@
 
     # Classes we instantiate.  A subclass might override.
 
-    DispatcherClass = Dispatcher
+    DispatcherClass = ZEO.zrpc.server.Dispatcher
     ZEOStorageClass = ZEOStorage
     ManagedServerConnectionClass = ManagedServerConnection
 
-    def __init__(self, addr, storages, read_only=0,
+    def __init__(self, addr, storages,
+                 read_only=0,
                  invalidation_queue_size=100,
                  invalidation_age=None,
                  transaction_timeout=None,
                  monitor_address=None,
                  auth_protocol=None,
                  auth_database=None,
-                 auth_realm=None):
+                 auth_realm=None,
+                 ):
         """StorageServer constructor.
 
         This is typically invoked from the start.py script.
@@ -891,8 +894,13 @@
             storage.registerDB(StorageServerDB(self, name))
         self.invalidation_age = invalidation_age
         self.connections = {}
-        self.dispatcher = self.DispatcherClass(addr,
-                                               factory=self.new_connection)
+        self.socket_map = {}
+        self.dispatcher = self.DispatcherClass(
+            addr, factory=self.new_connection, map=self.socket_map)
+        if len(self.addr) == 2 and self.addr[1] == 0 and self.addr[0]:
+            self.addr = self.dispatcher.socket.getsockname()
+        ZODB.event.notify(
+            Serving(self, address=self.dispatcher.socket.getsockname()))
         self.stats = {}
         self.timeouts = {}
         for name in self.storages.keys():
@@ -1137,26 +1145,53 @@
 
         return latest_tid, list(oids)
 
-    def close_server(self):
+    def loop(self):
+        try:
+            asyncore.loop(map=self.socket_map)
+        except Exception:
+            if not self.__closed:
+                raise # Unexpected exc
+
+    __thread = None
+    def start_thread(self, daemon=True):
+        self.__thread = thread = threading.Thread(target=self.loop)
+        thread.setDaemon(daemon)
+        thread.start()
+
+    __closed = False
+    def close(self, join_timeout=1):
         """Close the dispatcher so that there are no new connections.
 
         This is only called from the test suite, AFAICT.
         """
+        if self.__closed:
+            return
+        self.__closed = True
+
+        # Stop accepting connections
         self.dispatcher.close()
         if self.monitor is not None:
             self.monitor.close()
-        # Force the asyncore mainloop to exit by hackery, i.e. close
-        # every socket in the map.  loop() will return when the map is
-        # empty.
-        for s in asyncore.socket_map.values():
-            try:
-                s.close()
-            except:
-                pass
-        asyncore.socket_map.clear()
-        for storage in self.storages.values():
+
+        ZODB.event.notify(Closed(self))
+
+        # Close open client connections
+        for sid, connections in self.connections.items():
+            for conn in connections[:]:
+                try:
+                    conn.connection.close()
+                except:
+                    pass
+
+        for name, storage in self.storages.iteritems():
+            logger.info("closing storage %r", name)
             storage.close()
 
+        if self.__thread is not None:
+            self.__thread.join(join_timeout)
+
+    close_server = close
+
     def close_conn(self, conn):
         """Internal: remove the given connection from self.connections.
 
@@ -1570,3 +1605,16 @@
         if self.file:
             self.file.close()
             self.file = None
+
+class ServerEvent:
+
+    def __init__(self, server, **kw):
+        self.__dict__.update(kw)
+        self.server = server
+
+class Serving(ServerEvent):
+    pass
+
+class Closed(ServerEvent):
+    pass
+

Modified: ZODB/trunk/src/ZEO/runzeo.py
===================================================================
--- ZODB/trunk/src/ZEO/runzeo.py	2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/runzeo.py	2011-11-21 12:33:31 UTC (rev 123454)
@@ -160,7 +160,7 @@
             self.create_server()
             self.loop_forever()
         finally:
-            self.close_storages()
+            self.server.close_server()
             self.clear_socket()
             self.remove_pidfile()
 
@@ -178,6 +178,10 @@
         root.addHandler(handler)
 
     def check_socket(self):
+        if (isinstance(self.options.address, tuple) and
+            self.options.address[1] is None):
+            self.options.address = self.options.address[0], 0
+            return
         if self.can_connect(self.options.family, self.options.address):
             self.options.usage("address %s already in use" %
                                repr(self.options.address))
@@ -254,7 +258,7 @@
         if self.options.testing_exit_immediately:
             print "testing exit immediately"
         else:
-            asyncore.loop()
+            self.server.loop()
 
     def handle_sigterm(self):
         log("terminated by SIGTERM")
@@ -289,20 +293,6 @@
                         handler.rotate()
             log("Log files rotation complete", level=logging.INFO)
 
-
-
-
-
-
-    def close_storages(self):
-        for name, storage in self.storages.items():
-            log("closing storage %r" % name)
-            try:
-                storage.close()
-            except: # Keep going
-                log("failed to close storage %r" % name,
-                    level=logging.ERROR, exc_info=True)
-
     def _get_pidfile(self):
         pidfile = self.options.pid_file
         # 'pidfile' is marked as not required.

Added: ZODB/trunk/src/ZEO/tests/dynamic_server_ports.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/dynamic_server_ports.test	                        (rev 0)
+++ ZODB/trunk/src/ZEO/tests/dynamic_server_ports.test	2011-11-21 12:33:31 UTC (rev 123454)
@@ -0,0 +1,106 @@
+The storage server can be told to bind to port 0, allowing the OS to
+pick a port dynamically.  For this to be useful, there needs to be a
+way to tell someone. For this reason, the server posts events to
+ZODB.notify.
+
+    >>> import ZODB.event
+    >>> old_notify = ZODB.event.notify
+
+    >>> last_event = None
+    >>> def notify(event):
+    ...     global last_event
+    ...     last_event = event
+    >>> ZODB.event.notify = notify
+
+Now, let's start a server and verify that we get a serving event:
+
+    >>> import ZEO.StorageServer, ZODB.MappingStorage
+    >>> server = ZEO.StorageServer.StorageServer(
+    ...     ('127.0.0.1', 0), {'1': ZODB.MappingStorage.MappingStorage()})
+
+    >>> isinstance(last_event, ZEO.StorageServer.Serving)
+    True
+    >>> last_event.server is server
+    True
+
+    >>> last_event.address[0], last_event.address[1] > 0
+    ('127.0.0.1', True)
+
+If the host part pf the address passed to the constructor is not an
+empty string. then the server addr attribute is the same as the
+address attribute of the event:
+
+    >>> server.addr == last_event.address
+    True
+
+Let's run the server in a thread, to make sure we can connect.
+
+    >>> server.start_thread()
+
+    >>> client = ZEO.client(last_event.address)
+    >>> client.is_connected()
+    True
+
+If we close the server, we'll get a closed event:
+
+    >>> server.close()
+    >>> isinstance(last_event, ZEO.StorageServer.Closed)
+    True
+    >>> last_event.server is server
+    True
+
+    >>> wait_until(lambda : not client.is_connected(test=True))
+    >>> client.close()
+
+If we pass an empty string as the host part of the server address, we
+can't really assign a single address, so the server addr attribute is
+left alone:
+
+    >>> server = ZEO.StorageServer.StorageServer(
+    ...     ('', 0), {'1': ZODB.MappingStorage.MappingStorage()})
+
+    >>> isinstance(last_event, ZEO.StorageServer.Serving)
+    True
+    >>> last_event.server is server
+    True
+
+    >>> last_event.address[1] > 0
+    True
+
+If the host part pf the address passed to the constructor is not an
+empty string. then the server addr attribute is the same as the
+address attribute of the event:
+
+    >>> server.addr
+    ('', 0)
+
+    >>> server.close()
+
+The runzeo module provides some process support, including getting the
+server configuration via a ZConfig configuration file.  To spell a
+dynamic port using ZConfig, you'd use a hostname by itself. In this
+case, ZConfig passes None as the port.
+
+    >>> import ZEO.runzeo
+    >>> open('conf', 'w').write("""
+    ... <zeo>
+    ...     address 127.0.0.1
+    ... </zeo>
+    ... <mappingstorage>
+    ... </mappingstorage>
+    ... """)
+    >>> options = ZEO.runzeo.ZEOOptions()
+    >>> options.realize('-C conf'.split())
+    >>> options.address
+    ('127.0.0.1', None)
+
+    >>> rs = ZEO.runzeo.ZEOServer(options)
+    >>> rs.check_socket()
+    >>> options.address
+    ('127.0.0.1', 0)
+
+
+.. cleanup
+
+    >>> ZODB.event.notify = old_notify
+


Property changes on: ZODB/trunk/src/ZEO/tests/dynamic_server_ports.test
___________________________________________________________________
Added: svn:eol-style
   + native

Added: ZODB/trunk/src/ZEO/tests/new_addr.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/new_addr.test	                        (rev 0)
+++ ZODB/trunk/src/ZEO/tests/new_addr.test	2011-11-21 12:33:31 UTC (rev 123454)
@@ -0,0 +1,52 @@
+You can change the address(es) of a client storaage.
+
+We'll start by setting up a server and connecting to it:
+
+    >>> import ZEO, ZEO.StorageServer, ZODB.FileStorage, transaction
+    >>> server = ZEO.StorageServer.StorageServer(
+    ...     ('127.0.0.1', 0), {'1': ZODB.FileStorage.FileStorage('t.fs')})
+    >>> server.start_thread()
+
+    >>> conn = ZEO.connection(server.addr)
+    >>> client = conn.db().storage
+    >>> client.is_connected()
+    True
+    >>> conn.root()
+    {}
+    >>> conn.root.x = 1
+    >>> transaction.commit()
+
+Now we'll close the server:
+
+    >>> server.close()
+
+And wait for the connectin to notice it's disconnected:
+
+    >>> wait_until(lambda : not client.is_connected())
+
+Now, we'll restart the server and update the connection:
+
+    >>> server = ZEO.StorageServer.StorageServer(
+    ...     ('127.0.0.1', 0), {'1': ZODB.FileStorage.FileStorage('t.fs')})
+    >>> server.start_thread()
+    >>> client.new_addr(server.addr)
+
+
+Update with another client:
+
+    >>> conn2 = ZEO.connection(server.addr)
+    >>> conn2.root.x += 1
+    >>> transaction.commit()
+
+Wait for connect:
+
+    >>> wait_until(lambda : client.is_connected())
+    >>> _ = transaction.begin()
+    >>> conn.root()
+    {'x': 2}
+
+.. cleanup
+
+    >>> conn.close()
+    >>> conn2.close()
+    >>> server.close()


Property changes on: ZODB/trunk/src/ZEO/tests/new_addr.test
___________________________________________________________________
Added: svn:eol-style
   + native

Modified: ZODB/trunk/src/ZEO/tests/registerDB.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/registerDB.test	2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/tests/registerDB.test	2011-11-21 12:33:31 UTC (rev 123454)
@@ -9,7 +9,7 @@
 We'll create a Faux storage that has a registerDB method.
 
     >>> class FauxStorage:
-    ...     invalidations = [('trans0', ['ob0']), 
+    ...     invalidations = [('trans0', ['ob0']),
     ...                      ('trans1', ['ob0', 'ob1']),
     ...                      ]
     ...     def registerDB(self, db):
@@ -28,7 +28,10 @@
 
     >>> import ZEO.StorageServer
     >>> class StorageServer(ZEO.StorageServer.StorageServer):
-    ...     DispatcherClass = lambda *a, **k: None
+    ...     class DispatcherClass:
+    ...         __init__ = lambda *a, **kw: None
+    ...         class socket:
+    ...             getsockname = staticmethod(lambda : 'socket')
 
 We'll create a storage instance and a storage server using it:
 
@@ -80,7 +83,7 @@
 
     >>> _ = server.register_connection('t', ZEOStorage(server, 1))
     >>> _ = server.register_connection('t', ZEOStorage(server, 2))
-    
+
 Now, if we call invalidate, we'll see it propigate to the client:
 
     >>> storage.db.invalidate('trans2', ['ob1', 'ob2'])
@@ -112,7 +115,7 @@
     closed 2
 
 The connections will then reopen and revalidate their caches.
-    
+
 The servers's invalidation queue will get reset
 
     >>> for tid, invalidated in server.invq['t']:

Modified: ZODB/trunk/src/ZEO/tests/servertesting.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/servertesting.py	2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/tests/servertesting.py	2011-11-21 12:33:31 UTC (rev 123454)
@@ -40,9 +40,12 @@
             storages = {'1': ZODB.MappingStorage.MappingStorage()}
         ZEO.StorageServer.StorageServer.__init__(self, addr, storages, **kw)
 
-    def DispatcherClass(*args, **kw):
-        pass
 
+    class DispatcherClass:
+        __init__ = lambda *a, **kw: None
+        class socket:
+            getsockname = staticmethod(lambda : 'socket')
+
 class Connection:
 
     peer_protocol_version = ZEO.zrpc.connection.Connection.current_protocol

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2011-11-21 12:33:31 UTC (rev 123454)
@@ -898,7 +898,7 @@
     >>> sorted([int(u64(oid)) for oid in oids])
     [10, 11, 12, 13, 14]
 
-    >>> server.close_server()
+    >>> server.close()
     """
 
 def getInvalidationsAfterServerRestart():
@@ -962,7 +962,7 @@
 dont' need to be invalidated, however, that's better than verifying
 caches.)
 
-    >>> sv.close_server()
+    >>> sv.close()
     >>> fs.close()
 
 If a storage doesn't implement lastInvalidations, a client can still
@@ -1249,7 +1249,7 @@
     ------
     --T INFO ZEO.zrpc () listening on ...
     ------
-    --T INFO ZEO.runzeo () closing storage '1'
+    --T INFO ZEO.StorageServer closing storage '1'
     testing exit immediately
     """
 
@@ -1761,6 +1761,7 @@
             'zeo-fan-out.test', 'zdoptions.test',
             'drop_cache_rather_than_verify.txt', 'client-config.test',
             'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt',
+            'dynamic_server_ports.test', 'new_addr.test',
             setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
             ),
         )

Modified: ZODB/trunk/src/ZEO/tests/zeoserver.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/zeoserver.py	2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/tests/zeoserver.py	2011-11-21 12:33:31 UTC (rev 123454)
@@ -94,7 +94,7 @@
         # the ack character until the storage is finished closing.
         if self._count <= 0:
             self.log('closing the storage')
-            self._server.close_server()
+            self._server.close()
             if not self._keep:
                 for storage in self._server.storages.values():
                     cleanup(storage)
@@ -206,6 +206,7 @@
         d.start()
     # Loop for socket events
     log(label, 'entering asyncore loop')
+    server.start_thread()
     asyncore.loop()
 
 

Modified: ZODB/trunk/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/client.py	2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/zrpc/client.py	2011-11-21 12:33:31 UTC (rev 123454)
@@ -146,6 +146,9 @@
         # attempting to connect.
         self.thread = None # Protected by self.cond
 
+    def new_addrs(self, addrs):
+        self.addrlist = self._parse_addrs(addrs)
+
     def _start_asyncore_loop(self):
         self.map = {}
         self.trigger = ZEO.zrpc.trigger.trigger(self.map)
@@ -269,9 +272,7 @@
             t = self.thread
             if t is None:
                 log("CM.connect(): starting ConnectThread")
-                self.thread = t = ConnectThread(self, self.client,
-                                                self.addrlist,
-                                                self.tmin, self.tmax)
+                self.thread = t = ConnectThread(self, self.client)
                 t.setDaemon(1)
                 t.start()
             if sync:
@@ -362,13 +363,10 @@
     # We don't expect clients to call any methods of this Thread other
     # than close() and those defined by the Thread API.
 
-    def __init__(self, mgr, client, addrlist, tmin, tmax):
-        self.__super_init(name="Connect(%s)" % addrlist)
+    def __init__(self, mgr, client):
+        self.__super_init(name="Connect(%s)" % mgr.addrlist)
         self.mgr = mgr
         self.client = client
-        self.addrlist = addrlist
-        self.tmin = tmin
-        self.tmax = tmax
         self.stopped = 0
         self.one_attempt = threading.Event()
         # A ConnectThread keeps track of whether it has finished a
@@ -380,7 +378,7 @@
         self.stopped = 1
 
     def run(self):
-        delay = self.tmin
+        delay = self.mgr.tmin
         success = 0
         # Don't wait too long the first time.
         # TODO: make timeout configurable?
@@ -396,11 +394,11 @@
             if self.mgr.is_connected():
                 log("CT: still trying to replace fallback connection",
                     level=logging.INFO)
-            delay = min(delay*2, self.tmax)
+            delay = min(delay*2, self.mgr.tmax)
         log("CT: exiting thread: %s" % self.getName())
 
     def try_connecting(self, timeout):
-        """Try connecting to all self.addrlist addresses.
+        """Try connecting to all self.mgr.addrlist addresses.
 
         Return 1 if a preferred connection was found; 0 if no
         connection was found; and -1 if a fallback connection was
@@ -408,7 +406,7 @@
 
         If no connection is found within timeout seconds, return 0.
         """
-        log("CT: attempting to connect on %d sockets" % len(self.addrlist))
+        log("CT: attempting to connect on %d sockets" % len(self.mgr.addrlist))
         deadline = time.time() + timeout
         wrappers = self._create_wrappers()
         for wrap in wrappers.keys():
@@ -434,7 +432,7 @@
         return 0
 
     def _expand_addrlist(self):
-        for domain, addr in self.addrlist:
+        for domain, addr in self.mgr.addrlist:
             # AF_INET really means either IPv4 or IPv6, possibly
             # indirected by DNS. By design, DNS lookup is deferred
             # until connections get established, so that DNS

Modified: ZODB/trunk/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/connection.py	2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/zrpc/connection.py	2011-11-21 12:33:31 UTC (rev 123454)
@@ -12,6 +12,7 @@
 #
 ##############################################################################
 import asyncore
+import errno
 import sys
 import threading
 import logging
@@ -658,7 +659,11 @@
 
 def server_loop(map):
     while len(map) > 1:
-        asyncore.poll(30.0, map)
+        try:
+            asyncore.poll(30.0, map)
+        except Exception, v:
+            if v.args[0] != errno.EBADF:
+                raise
 
     for o in map.values():
         o.close()

Modified: ZODB/trunk/src/ZEO/zrpc/server.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/server.py	2011-11-21 11:49:49 UTC (rev 123453)
+++ ZODB/trunk/src/ZEO/zrpc/server.py	2011-11-21 12:33:31 UTC (rev 123454)
@@ -43,8 +43,8 @@
     """A server that accepts incoming RPC connections"""
     __super_init = asyncore.dispatcher.__init__
 
-    def __init__(self, addr, factory=Connection):
-        self.__super_init()
+    def __init__(self, addr, factory=Connection, map=None):
+        self.__super_init(map=map)
         self.addr = addr
         self.factory = factory
         self._open_socket()



More information about the checkins mailing list