[Checkins] SVN: Sandbox/J1m/resumelb/src/zc/resumelb/lb. Added the lb set_worker_addrs method to change worker addresses. This

Jim Fulton jim at zope.com
Sun Jan 22 14:20:50 UTC 2012


Log message for revision 124128:
  Added the lb set_worker_addrs method to change worker addresses.  This
  will be handy when we get notified by zookeeper of worker changes.
  

Changed:
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.test

-=-
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-01-22 13:35:24 UTC (rev 124127)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py	2012-01-22 14:20:49 UTC (rev 124128)
@@ -30,14 +30,23 @@
         self.classifier = classifier
         self.disconnect_message = disconnect_message
         self.pool = Pool(settings)
+        self.workletts = {}
+        self.set_worker_addrs(worker_addrs)
 
-        self.workletts = dict(
-            (addr, gevent.spawn(self.connect, addr))
-            for addr in worker_addrs
-            )
+    def set_worker_addrs(self, addrs):
+        addrs = set(addrs)
+        workletts = self.workletts
+        old = list(workletts)
+        for addr in addrs:
+            if addr not in workletts:
+                workletts[addr] = gevent.spawn(self.connect, addr, workletts)
 
-    def connect(self, addr):
-        while 1:
+        for addr in old:
+            if addr not in addrs:
+                workletts.pop(addr)
+
+    def connect(self, addr, workletts):
+        while addr in workletts:
             socket = gevent.socket.create_connection(addr)
             Worker(self.pool, socket, addr)
 

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-01-22 13:35:24 UTC (rev 124127)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-01-22 14:20:49 UTC (rev 124128)
@@ -292,7 +292,57 @@
 
 But if we send a resume, it will be:
 
-    >>> write_message(workers[0].socket, 0, {})
+    >>> write_message(workers[0].socket, 0, {'h3.com': 10.0})
     >>> gevent.sleep(.01)
     >>> len(lb.pool.workers)
     2
+
+Adding and removing workers
+===========================
+
+We can add and remove workers by passing new address iterables to the
+lb set_worker_addrs method.
+
+Let's add a worker and wait for it to be connected:
+
+    >>> workers.append(Worker())
+    >>> lb.set_worker_addrs([w.addr for w in workers])
+    >>> wait_until(lambda : hasattr(workers[-1], 'socket'))
+    >>> write_message(workers[-1].socket, 0, {'h4.com': 10})
+    >>> gevent.sleep(.01)
+    >>> len(lb.pool.workers)
+    3
+
+If we submit an h4.com request, it will go to the new worker:
+
+    >>> g = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h4.com')])
+    >>> rno, env = read_message(workers[-1].socket)
+    >>> read_message(workers[-1].socket)
+    (1, '')
+
+Now, let's remove a worker:
+
+    >>> out = workers.pop(0)
+    >>> lb.set_worker_addrs([w.addr for w in workers])
+
+Removing a worker doesn't disconnect it, but it will prevent it from
+being reconnected if it disconnects on its own.
+
+    >>> gevent.sleep(.01)
+    >>> len(lb.pool.workers)
+    3
+
+    >>> outsocket = out.socket
+    >>> outsocket.close()
+    >>> gevent.sleep(.01)
+    >>> len(lb.pool.workers)
+    2
+
+Because the lb didn't reconnect, out's socket is still the one from
+before that we closed:
+
+    >>> out.socket is outsocket
+    True
+
+Typically, by the time we remove an address, the worker will already
+have gone away.
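
For readers skimming the diff, the add/remove logic in set_worker_addrs can be sketched in isolation, without gevent. This is only an illustration under stated assumptions, not the code in lb.py: the `reconcile` function and its `spawn` callback are hypothetical names, and the stored "tasks" are plain strings standing in for greenlets.

```python
def reconcile(current, wanted, spawn):
    """Bring the addr -> task mapping `current` in line with `wanted`.

    `spawn` is a hypothetical stand-in for starting a connect task
    (gevent.spawn(self.connect, ...) in the diff above).
    Returns (added, removed) as sorted lists, for inspection only.
    """
    wanted = set(wanted)
    old = list(current)  # snapshot the keys before mutating the mapping

    # Start a task for every address we do not know about yet.
    added = [addr for addr in wanted if addr not in current]
    for addr in added:
        current[addr] = spawn(addr)

    # Forget addresses that are no longer wanted. Nothing is disconnected
    # here; a connect loop guarded by `while addr in current` would simply
    # not reconnect once the existing connection ends.
    removed = [addr for addr in old if addr not in wanted]
    for addr in removed:
        current.pop(addr)

    return sorted(added), sorted(removed)


def fake_spawn(addr):
    # Placeholder "task"; a real caller would return a greenlet or thread.
    return "task for %s:%d" % addr


tasks = {}
reconcile(tasks, [("127.0.0.1", 8000), ("127.0.0.1", 8001)], fake_spawn)
added, removed = reconcile(
    tasks, [("127.0.0.1", 8001), ("127.0.0.1", 8002)], fake_spawn)
```

Because the mapping is mutated in place, any loop that holds a reference to it sees removals immediately, which is presumably why the diff passes workletts into connect rather than a copy.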


