[Checkins] SVN: zc.zk/trunk/src/zc/zk/ Refactored to cleanup unused Children and Properties objects.
Jim Fulton
jim at zope.com
Mon Dec 5 20:00:21 UTC 2011
Log message for revision 123575:
Refactored to cleanup unused Children and Properties objects.
Added ln and get_properties methods.
Added docs/tests for readjustment of Children and Properties objects
that fell out of the refactoring.
Various other cleanups and changes along the way.
Added timeouts to avoid waiting forever when disconnected.
Changed:
U zc.zk/trunk/src/zc/zk/README.txt
U zc.zk/trunk/src/zc/zk/__init__.py
U zc.zk/trunk/src/zc/zk/tests.py
-=-
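The README changes add an optional *type* to node lines in the tree text format (for example ``/lb : ipvs``). The widened ``_text_is_node`` pattern in ``__init__.py`` parses these lines. The standalone sketch below copies that pattern, written as raw strings, so you can try the name/type split without a ZooKeeper server. ``parse_node_line`` is a hypothetical helper, not part of zc.zk.

```python
import re

# Same pattern as zc.zk's _text_is_node in this revision, as raw strings.
_text_is_node = re.compile(
    r'/(?P<name>\S+)'
    r'(\s*:\s*(?P<type>\S.*))?'
    r'$').match


def parse_node_line(line):
    """Hypothetical helper: return (name, type) for a node line, else None."""
    m = _text_is_node(line.strip())
    if m is None:
        return None  # not a node line (e.g. a property or a link)
    return m.group('name'), m.group('type')


print(parse_node_line('/lb : ipvs'))      # ('lb', 'ipvs')
print(parse_node_line('/cms : z4m cms'))  # ('cms', 'z4m cms')
print(parse_node_line('/providers'))      # ('providers', None)
```

During import, zc.zk stores a matched type in the node's ``type`` property. On export, it pops ``type`` back out and appends it to the node name.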
Modified: zc.zk/trunk/src/zc/zk/README.txt
===================================================================
--- zc.zk/trunk/src/zc/zk/README.txt 2011-12-05 19:59:35 UTC (rev 123574)
+++ zc.zk/trunk/src/zc/zk/README.txt 2011-12-05 20:00:20 UTC (rev 123575)
@@ -4,17 +4,18 @@
The zc.zk package provides some high-level interfaces to the low-level
zookeeper extension. It's not complete, in that it doesn't try, at
this time, to be a complete high-level interface. Rather, it provides
-facilities we need to use Zookeeeper to services together:
+facilities we need to use ZooKeeper to connect services:
- ZODB database clients and servers
- HTTP-based clients and services
-- Load balencers
+- Load balancers and HTTP application servers
The current (initial) use cases are:
- Register a server providing a service.
- Get the addresses of servers providing a service.
-- Get abd set service configuration data.
+- Get and set service configuration data.
+- Model system architecture as a tree.
This package makes no effort to support Windows. (Patches to support
Windows might be accepted if they don't add much complexity.)
@@ -30,7 +31,7 @@
different ways, it isn't listed as a distribution requirement.
An easy way to get the Python zookeeper binding is by installing
-``zc-zookeeper-static``, whch is a self-contained statically building
+``zc-zookeeper-static``, which is a self-contained statically built
distribution.
Instantiating a ZooKeeper helper
@@ -42,8 +43,8 @@
>>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
The ZooKeeper constructor takes a ZooKeeper connection string, which is a
-comma-separated list of addresses of the form HOST:PORT. It defaults
-to '127.0.0.1:2181', which is convenient during development.
+comma-separated list of addresses of the form *HOST:PORT*. It defaults
+to ``'127.0.0.1:2181'``, which is convenient during development.
Register a server providing a service.
--------------------------------------
@@ -63,23 +64,24 @@
``register_server`` creates a read-only ephemeral ZooKeeper node as a
-child of the given service path. The name of the new node is the
-given address. This allows clients to get the list of addresses by
-just getting the list of the names of children of the service path.
+child of the given service path. The name of the new node is (a
+string representation of) the given address. This allows clients to
+get the list of addresses by just getting the list of the names of
+children of the service path.
Ephemeral nodes have the useful property that they're automatically
removed when a ZooKeeper session is closed or when the process
-containing it dies. De-deregistration is automatic.
+containing it dies. De-registration is automatic.
When registering a server, you can optionally provide server (node)
data as additional keyword arguments to register_server. By default,
-the process id is set as the ``pid`` server key. This is useful to
+the process id is set as the ``pid`` property. This is useful for
tracking down the server process.
Get the addresses of service providers.
---------------------------------------
-Getting the adresses providing a service is accomplished by getting the
+Getting the addresses providing a service is accomplished by getting the
children of a service node::
>>> addresses = zk.children('/fooservice/providers')
@@ -95,7 +97,7 @@
['192.168.0.42:8080', '192.168.0.42:8081']
You can call the iterable with a callback function that is called
-whenenever the list of children changes::
+whenever the list of children changes::
>>> @zk.children('/fooservice/providers')
... def addresses_updated(addresses):
@@ -114,8 +116,8 @@
Get service configuration data.
-------------------------------
-You get service configuration data by getting data associated with a
-ZooKeeper node. The interface for getting data is similar to the
+You get service configuration data by getting properties associated with a
+ZooKeeper node. The interface for getting properties is similar to the
interface for getting children::
>>> data = zk.properties('/fooservice')
@@ -144,10 +146,10 @@
The callback is called immediately. It'll also be called when data are
updated.
-Updating node data
-------------------
+Updating node properties
+------------------------
-You can't set data properties, but you can update data by calling it's
+You can't set properties, but you can update properties by calling the
``update`` method::
>>> thread_info = {'threads': 2}
@@ -158,14 +160,14 @@
secret: u'123'
threads: 2
-or by calling it's ``set`` method, which removes keys not listed::
+or by calling the ``set`` method, which removes keys not listed::
>>> data.set(threads= 3, secret='1234')
data updated
secret: u'1234'
threads: 3
-Both update and set can take data from a positional data argument, or
+Both ``update`` and ``set`` can take data from a positional data argument, or
from keyword parameters. Keyword parameters take precedence over the
positional data argument.
@@ -191,7 +193,7 @@
representation. You can then populate the tree by importing the
representation. Here's an example::
- /lb
+ /lb : ipvs
/pools
/cms
# The address is fixed because it's
@@ -200,16 +202,16 @@
providers -> /cms/providers
/retail
address = '1.2.3.5:80'
- providers -> /cms/retail
+ providers -> /cms/providers
- /cms
+ /cms : z4m cms
threads = 3
/providers
/databases
/main
/providers
- /retail
+ /retail : z4m retail
threads = 1
/providers
/databases
@@ -220,22 +222,29 @@
.. -> tree_text
This example defines a tree with three top nodes, ``lb``, ``cms``, and
-``retail``. The ``retail`` node has two subnodes, ``providers`` and
-``databases`` and a property ``threads``. The ``/retail/databases``
-node has symbolic link, ``main`` and a ``ugc`` subnode. The symbolic
-link is implemented as a property named ``main ->``. We'll say more
-about symbolic links in a later section.
+``retail``. The ``retail`` node has two sub-nodes, ``providers`` and
+``databases``, and a property, ``threads``.
+The ``/retail/databases`` node has a symbolic link, ``main``, and a
+``ugc`` sub-node. The symbolic link is implemented as a property named
+``main ->``. We'll say more about symbolic links in a later section.
+
+The ``lb``, ``cms`` and ``retail`` nodes have *types*. A type is
+indicated by following a node name with a colon and a string value.
+The string value is used to populate a ``type`` property. Types are
+useful to document the kinds of services provided at a node and can be
+used by deployment tools to deploy service providers.
+
You can import a tree definition with the ``import_tree`` method:
>>> zk.import_tree(tree_text)
-This imports the tree at the top pf the ZooKeeper tree.
+This imports the tree at the top of the ZooKeeper tree.
We can also export a ZooKeeper tree:
>>> print zk.export_tree(),
- /cms
+ /cms : z4m cms
threads = 3
/databases
/main
@@ -245,15 +254,15 @@
secret = u'1234'
threads = 3
/providers
- /lb
+ /lb : ipvs
/pools
/cms
address = u'1.2.3.4:80'
providers -> /cms/providers
/retail
address = u'1.2.3.5:80'
- providers -> /cms/retail
- /retail
+ providers -> /cms/providers
+ /retail : z4m retail
threads = 1
/databases
main -> /cms/databases/main
@@ -263,10 +272,10 @@
Note that when we export a tree:
-- The special reserverd top-level zookeeper node is omitted.
-- Ephemeral nodes are ommitted.
+- The special reserved top-level zookeeper node is omitted.
+- Ephemeral nodes are omitted.
- Each node's information is sorted by type (properties, then links,
-- then subnodes) and then by name,
+- then sub-nodes) and then by name.
You can export just a portion of a tree:
@@ -293,7 +302,7 @@
We can import a tree over an existing tree and changes will be
applied. Let's update our textual description::
- /lb
+ /lb : ipvs
/pools
/cms
# The address is fixed because it's
@@ -301,7 +310,7 @@
address = '1.2.3.4:80'
providers -> /cms/providers
- /cms
+ /cms : z4m cms
threads = 4
/providers
/databases
@@ -310,7 +319,7 @@
.. -> tree_text
-and reimport::
+and re-import::
>>> zk.import_tree(tree_text)
extra path not trimmed: /lb/pools/retail
@@ -319,7 +328,7 @@
this if we export the tree:
>>> print zk.export_tree(),
- /cms
+ /cms : z4m cms
threads = 4
/databases
/main
@@ -329,15 +338,15 @@
secret = u'1234'
threads = 3
/providers
- /lb
+ /lb : ipvs
/pools
/cms
address = u'1.2.3.4:80'
providers -> /cms/providers
/retail
address = u'1.2.3.5:80'
- providers -> /cms/retail
- /retail
+ providers -> /cms/providers
+ /retail : z4m retail
threads = 1
/databases
main -> /cms/databases/main
@@ -356,7 +365,7 @@
>>> zk.import_tree(tree_text, trim=True)
>>> print zk.export_tree(),
- /cms
+ /cms : z4m cms
threads = 4
/databases
/main
@@ -366,12 +375,12 @@
secret = u'1234'
threads = 3
/providers
- /lb
+ /lb : ipvs
/pools
/cms
address = u'1.2.3.4:80'
providers -> /cms/providers
- /retail
+ /retail : z4m retail
threads = 1
/databases
main -> /cms/databases/main
@@ -384,7 +393,7 @@
automatically trimmed. So we weren't warned about the unreferenced
top-level nodes in the import.
-Recursice Deletion
+Recursive Deletion
------------------
ZooKeeper only allows deletion of nodes without children.
@@ -396,7 +405,7 @@
>>> zk.delete_recursive('/retail')
>>> print zk.export_tree(),
- /cms
+ /cms : z4m cms
threads = 4
/databases
/main
@@ -406,7 +415,7 @@
secret = u'1234'
threads = 3
/providers
- /lb
+ /lb : ipvs
/pools
/cms
address = u'1.2.3.4:80'
@@ -426,7 +435,7 @@
ZooKeeper doesn't have a concept of symbolic links, but ``zc.zk``
provides a convention for dealing with symbolic links. When trying to
-resolve a path, if a node lacks a child, but hase a property with a
+resolve a path, if a node lacks a child but has a property with a
name ending in ``' ->'``, the child will be found by following the
path in the property value.
@@ -443,11 +452,108 @@
u'/cms/providers/1.2.3.4:5'
Note a limitation of symbolic links is that they can be hidden by
-children. For example, if we added a real node, at ``/lb/pools``
+children. For example, if we added a real node, at
+``/lb/pools/cms/providers``, it would shadow the link.
``children``, ``properties``, and ``register_server`` will
automatically use ``resolve`` to resolve paths.
+When ``children`` and ``properties`` are used for a node, the
+paths they use are adjusted dynamically when paths are removed.
+To illustrate this, let's get the children of ``/cms/databases/main``::
+
+ >>> main_children = zk.children('/cms/databases/main')
+ >>> main_children.path
+ '/cms/databases/main'
+ >>> main_children.real_path
+ '/cms/databases/main'
+
+.. test
+
+ >>> main_properties = zk.properties('/cms/databases/main')
+ >>> main_properties.path
+ '/cms/databases/main'
+ >>> main_properties.real_path
+ '/cms/databases/main'
+
+``Children`` and ``Properties`` objects have a ``path`` attribute that
+has the value passed to the ``children`` or ``properties``
+methods. They have a ``real_path`` attribute that contains the path
+after resolving symbolic links. Let's suppose we want to move the
+database node to ``/databases/cms``. First we'll export it::
+
+ >>> export = zk.export_tree('/cms/databases/main', name='cms')
+ >>> print export,
+ /cms
+ /providers
+
+Note that we used the export ``name`` option to specify a new name for
+the exported tree.
+
+Now, we'll create a databases node::
+
+ >>> zk.create('/databases', '', zc.zk.OPEN_ACL_UNSAFE)
+ '/databases'
+
+And import the export::
+
+ >>> zk.import_tree(export, '/databases')
+ >>> print zk.export_tree('/databases'),
+ /databases
+ /cms
+ /providers
+
+Next, we'll create a symbolic link at the old location. We can use the
+``ln`` convenience method::
+
+ >>> zk.ln('/databases/cms', '/cms/databases/main')
+ >>> zk.get_properties('/cms/databases')
+ {u'main ->': u'/databases/cms'}
+
+Now, we can remove ``/cms/databases/main`` and ``main_children`` will
+be updated::
+
+ >>> zk.delete_recursive('/cms/databases/main')
+ >>> main_children.path
+ '/cms/databases/main'
+ >>> main_children.real_path
+ u'/databases/cms'
+
+.. test
+
+ >>> main_properties.path
+ '/cms/databases/main'
+ >>> main_properties.real_path
+ u'/databases/cms'
+
+If we update ``/databases/cms``, ``main_children`` will see the
+updates::
+
+ >>> sorted(main_children)
+ ['providers']
+ >>> zk.delete('/databases/cms/providers')
+ >>> sorted(main_children)
+ []
+
+.. test
+
+ >>> dict(main_properties)
+ {}
+ >>> zk.properties('/databases/cms').set(a=1)
+ >>> dict(main_properties)
+ {u'a': 1}
+
+Node deletion
+-------------
+
+If a node is deleted and ``Children`` or ``Properties`` instances have
+been created for it, and the paths they were created with can't be
+resolved using symbolic links, then the instances' data will be
+cleared. Attempts to update properties will fail. If callbacks have
+been registered, they will be called without arguments, if possible.
+It would be bad, in practice, to remove a node that processes are
+watching.
+
``zc.zk.ZooKeeper``
-------------------
@@ -457,9 +563,19 @@
``children(path)``
Return a `zc.zk.Children`_ for the path.
+ Note that there is a fair bit of machinery in `zc.zk.Children`_
+ objects to support keeping them up to date, callbacks, and cleaning
+ them up when they are no longer used. If you only want to get the
+ list of children once, use ``get_children``.
+
``properties(path)``
Return a `zc.zk.Properties`_ for the path.
+ Note that there is a fair bit of machinery in `zc.zk.Properties`_
+ objects to support keeping them up to date, callbacks, and cleaning
+ them up when they are no longer used. If you only want to get the
+ properties once, use ``get_properties``.
+
``handle``
The ZooKeeper session handle
@@ -481,31 +597,61 @@
acl
An access control-list to use for imported nodes. If not
- specifuied, then full access is allowed to everyone.
+ specified, then full access is allowed to everyone.
dry_run
Boolean, defaulting to false, indicating whether to do a dry
run of the import, without applying any changes.
-``export_tree(path[, include_ephemeral])``
+``export_tree(path[, ephemeral[, name]])``
Export a tree to a text representation.
path
The path to export.
- include_ephemeral
+ ephemeral
Boolean, defaulting to false, indicating whether to include
ephemeral nodes in the export. Including ephemeral nodes is
mainly useful for visualizing the tree state.
+ name
+ The name to use for the top-level node.
+
+ This is useful when using export and import to copy a tree to
+ a different location and name in the hierarchy.
+
+ Normally, when exporting the root node, ``/``, the root isn't
+ included, but it is included if a name is given.
+
``delete_recursive(path[, dry_run])``
- Delete a node and all of it's subnodes.
+ Delete a node and all of its sub-nodes.
Ephemeral nodes or nodes containing them are not deleted.
The dry_run option causes a summary of what would be deleted to be
printed without actually deleting anything.
+``get_children(path)``
+ Get a list of the names of the children of the node at the given path.
+
+ This is more efficient than ``children`` when all you need is to
+ read the list once, as it doesn't create a `zc.zk.Children`_
+ object.
+
+``get_properties(path)``
+ Get the properties for the node at the given path as a dictionary.
+
+ This is more efficient than ``properties`` when all you need is to
+ read the properties once, as it doesn't create a
+ `zc.zk.Properties`_ object.
+
+``ln(source, destination)``
+ Create a symbolic link at the destination path pointing to the
+ source path.
+
+ If the source path ends with ``'/'``, then the last component of the
+ destination path is appended to the source.
+
``resolve(path)``
Find the real path for the given path.
@@ -529,12 +675,11 @@
ZooKeeper functions as methods: ``acreate``, ``add_auth``,
``adelete``, ``aexists``, ``aget``, ``aget_acl``, ``aget_children``,
``aset``, ``aset_acl``, ``async``, ``client_id``, ``create``,
-``delete``, ``exists``, ``get``, ``get_acl``, ``get_children``,
-``is_unrecoverable``, ``recv_timeout``, ``set``, ``set2``,
-``set_acl``, ``set_debug_level``, ``set_log_stream``, ``set_watcher``,
-and ``zerror``. When calling these as methods on ``ZooKeeper``
-instances, it isn't necessary to pass a handle, as that is provided
-automatically.
+``delete``, ``exists``, ``get``, ``get_acl``, ``is_unrecoverable``,
+``recv_timeout``, ``set``, ``set2``, ``set_acl``, ``set_debug_level``,
+``set_log_stream``, ``set_watcher``, and ``zerror``. When calling
+these as methods on ``ZooKeeper`` instances, it isn't necessary to
+pass a handle, as that is provided automatically.
zc.zk.Children
--------------
@@ -549,6 +694,8 @@
The callback is passed the children instance when a child node is
added or removed.
+ The ``Children`` instance is returned.
+
zc.zk.Properties
----------------
@@ -566,7 +713,7 @@
Update the properties for the node.
The data argument, if given, must be a dictionary or something that
- can be passed to a dictiomnary's ``update`` method. Items supplied
+ can be passed to a dictionary's ``update`` method. Items supplied
as keywords take precedence over items supplied in the data
argument.
@@ -576,30 +723,27 @@
The callback is passed the properties instance when properties are
changed.
-Node deletion
--------------
+ The ``Properties`` instance is returned.
-If a node is deleted and ``Children`` or ``Properties`` instances have
-been created for it, the instances' data will be cleared. Attempts to
-update properties will fail. If callbacks have been registered, they
-will be called without arguments, if possible. It would be bad, in
-practice, to remove a node that processes are watching.
-
Changes
-------
-0.2.0 (2011-12-)
+0.2.0 (2011-12-05)
~~~~~~~~~~~~~~~~~~
- Added tree import and export.
-- Added recursive node-deletion API.
- Added symbolic-links.
- properties set and update methods now accept positional
mapping objects (or iterables of items) as well as keyword arguments.
+- Added recursive node-deletion API.
+- Added ``get_properties`` to get properties without creating watches.
- Added convenience access to low-level ZooKeeper APIs.
- Added ``OPEN_ACL_UNSAFE`` and ``READ_ACL_UNSAFE`` (in ``zc.zk``),
- which are mentioned by the ZooKeeper docs. but not included in the
+ which are mentioned by the ZooKeeper documentation, but not included in the
``zookeeper`` module.
+- ``Children`` and ``Properties`` objects are now cleaned up when
+ no longer used. Previously, they remained in memory for the life of
+ the session.
0.1.0 (2011-11-27)
~~~~~~~~~~~~~~~~~~
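The README describes a symbolic-link convention: a ``name ->`` property on a parent node is followed when the child itself is missing. That convention can be modelled without a ZooKeeper server. The sketch below is a simplified, hypothetical in-memory stand-in. ``FakeTree`` and ``NoNode`` are illustrative names, not zc.zk API. Its ``ln`` follows the argument order of the new ``ZooKeeper.ln`` in this revision: the link is created at the second path and points to the first.

```python
class LinkLoop(Exception):
    """Raised when following links revisits a path."""


class NoNode(Exception):
    """Hypothetical stand-in for zookeeper.NoNodeException."""


class FakeTree(object):
    """Hypothetical in-memory tree: maps each node path to its properties."""

    def __init__(self, *paths):
        self.props = {'/': {}}
        for path in paths:
            self.props[path] = {}

    def ln(self, target, source):
        # Same argument order as zc.zk's ln(): the link lives at `source`
        # and is stored as a "name ->" property on source's parent node.
        base, name = source.rsplit('/', 1)
        if target.endswith('/'):
            target += name
        self.props[base or '/'][name + ' ->'] = target

    def resolve(self, path, seen=()):
        if path in self.props:          # a real node always wins
            return path
        if path in seen:
            raise LinkLoop(seen + (path,))
        seen += (path,)
        base, name = path.rsplit('/', 1)
        base = self.resolve(base or '/', seen)
        real = base.rstrip('/') + '/' + name
        if real in self.props:
            return real
        target = self.props[base].get(name + ' ->')
        if target is None:
            raise NoNode(path)
        return self.resolve(target, seen)


tree = FakeTree('/cms', '/cms/databases', '/databases',
                '/databases/cms', '/databases/cms/providers')
tree.ln('/databases/cms', '/cms/databases/main')
print(tree.props['/cms/databases'])                   # {'main ->': '/databases/cms'}
print(tree.resolve('/cms/databases/main/providers'))  # /databases/cms/providers
```

Because a real node is checked before any link, this model also shows the shadowing limitation the README mentions.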
Modified: zc.zk/trunk/src/zc/zk/__init__.py
===================================================================
--- zc.zk/trunk/src/zc/zk/__init__.py 2011-12-05 19:59:35 UTC (rev 123574)
+++ zc.zk/trunk/src/zc/zk/__init__.py 2011-12-05 20:00:20 UTC (rev 123575)
@@ -18,6 +18,7 @@
import re
import sys
import threading
+import weakref
import zc.thread
import zookeeper
@@ -88,7 +89,10 @@
READ_ACL_UNSAFE = [world_permission()]
-_text_is_node = re.compile(r'/(?P<name>\S+)$').match
+_text_is_node = re.compile(
+ r'/(?P<name>\S+)'
+ r'(\s*:\s*(?P<type>\S.*))?'
+ '$').match
_text_is_property = re.compile(
r'(?P<name>\S+)'
'\s*=\s*'
@@ -108,16 +112,23 @@
class LinkLoop(Exception):
pass
+class FailedConnect(Exception):
+ pass
+
class ZooKeeper:
- def __init__(self, zkaddr=2181):
+ def __init__(self, zkaddr=2181, timeout=1):
if isinstance(zkaddr, int):
zkaddr = "127.0.0.1:%s" % zkaddr
+ self.timeout = timeout
self.zkaddr = zkaddr
- self.watches = set()
+ self.watches = WatchManager()
self.connected = threading.Event()
- zookeeper.init(zkaddr, self._watch_session)
- self.connected.wait()
+ handle = zookeeper.init(zkaddr, self._watch_session)
+ self.connected.wait(timeout)
+ if not self.connected.is_set():
+ zookeeper.close(handle)
+ raise FailedConnect(zkaddr)
handle = None
def _watch_session(self, handle, event_type, state, path):
@@ -126,12 +137,8 @@
if state == zookeeper.CONNECTED_STATE:
if self.handle is None:
self.handle = handle
- if self.watches:
- # reestablish after session reestablished
- watches = self.watches
- self.watches = set()
- for watch in watches:
- self._watch(watch, False)
+ for watch in self.watches.clear():
+ self._watch(watch, False)
else:
assert handle == self.handle
self.connected.set()
@@ -150,35 +157,96 @@
kw['pid'] = os.getpid()
if not isinstance(addr, str):
addr = '%s:%s' % addr
- self.connected.wait()
+ self.connected.wait(self.timeout)
path = self.resolve(path)
zookeeper.create(self.handle, path + '/' + addr, encode(kw),
[world_permission()], zookeeper.EPHEMERAL)
def _watch(self, watch, wait=True):
+ event_type = watch.event_type
if wait:
- self.connected.wait()
- self.watches.add(watch)
+ self.connected.wait(self.timeout)
+ watch.real_path = real_path = self.resolve(watch.path)
+ key = event_type, real_path
+ if self.watches.add(key, watch):
+ try:
+ self._watchkey(key)
+ except zookeeper.ConnectionLossException:
+ # We lost a race here. We got disconnected between
+ # when we resolved the watch path and the time we set
+ # the watch. This is very unlikely.
+ watches = set(self.watches.pop(key))
+ for w in watches:
+ w._deleted()
+ if watch in watches:
+ watches.remove(watch)
+ if watches:
+ # OMG, how unlucky can we be?
+ # someone added a watch between the time we added
+ # the key and failed to add the watch in zookeeper.
+ logger.critical('lost watches %r', watches)
+ raise
+ else:
+ # We already had a watch for the key, so pass this one
+ # its data directly.
+ zkfunc = getattr(zookeeper, self.__zkfuncs[event_type])
+ watch._notify(zkfunc(self.handle, real_path))
- def handler(h, t, state, p):
- if watch not in self.watches:
- return
- assert h == self.handle
- assert state == zookeeper.CONNECTED_STATE
- assert p == watch.path
- if t == zookeeper.DELETED_EVENT:
+ __zkfuncs = {
+ zookeeper.CHANGED_EVENT: 'get',
+ zookeeper.CHILD_EVENT: 'get_children',
+ }
+ def _watchkey(self, key):
+ event_type, real_path = key
+ zkfunc = getattr(zookeeper, self.__zkfuncs[event_type])
+
+ def handler(h, t, state, p, reraise=False):
+ try:
+ assert h == self.handle
+ assert state == zookeeper.CONNECTED_STATE
+ assert p == real_path
+ if key not in self.watches:
+ return
+
+ if t == zookeeper.DELETED_EVENT:
+ self._rewatch(key)
+ else:
+ assert t == event_type
+ try:
+ v = zkfunc(self.handle, real_path, handler)
+ except zookeeper.NoNodeException:
+ self._rewatch(key)
+ else:
+ for watch in self.watches.watches(key):
+ watch._notify(v)
+ except:
+ logger.exception("%s(%s) handler failed",
+ self.__zkfuncs[event_type], real_path)
+ if reraise:
+ raise
+
+ handler(self.handle, event_type, self.state, real_path, True)
+
+ def _rewatch(self, key):
+ event_type = key[0]
+ for watch in self.watches.pop(key):
+ try:
+ real_path = self.resolve(watch.path)
+ except (zookeeper.NoNodeException, LinkLoop):
+ logger.exception("%s path went away", watch)
watch._deleted()
- self.watches.remove(watch)
else:
- assert t == watch.event_type
- zkfunc = getattr(zookeeper, watch.zkfunc)
- watch._notify(zkfunc(self.handle, watch.path, handler))
+ self._watch(watch, False)
- handler(self.handle, watch.event_type, self.state, watch.path)
-
def children(self, path):
return Children(self, path)
+ def get_properties(self, path):
+ return decode(self.get(path)[0])
+
+ def properties(self, path):
+ return Properties(self, path)
+
def import_tree(self, text, path='/', trim=False, acl=OPEN_ACL_UNSAFE,
dry_run=False):
# Step 1, build up internal tree representation:
@@ -211,6 +279,8 @@
m = _text_is_node(data)
if m:
data = _Tree(m.group('name'))
+ if m.group('type'):
+ data.properties['type'] = m.group('type')
else:
if '->' in data:
raise ValueError(lineno, data, "Bad link format")
@@ -251,7 +321,7 @@
self._import_tree(path, root, acl, trim, dry_run, True)
def _import_tree(self, path, node, acl, trim, dry_run, top=False):
- self.connected.wait()
+ self.connected.wait(self.timeout)
if not top:
new_children = set(node.children)
for name in self.get_children(path):
@@ -269,6 +339,7 @@
if self.exists(cpath):
if dry_run:
new = child.properties
+ old = self.get_properties(cpath)
old = decode(self.get(cpath)[0])
for n, v in sorted(old.items()):
if n not in new:
@@ -325,24 +396,34 @@
logger.info('deleting %s', path)
self.delete(path)
- def export_tree(self, path='/', ephemeral=False):
+ def export_tree(self, path='/', ephemeral=False, name=None):
output = []
out = output.append
+ self.connected.wait(self.timeout)
- def export_tree(path, indent):
+ def export_tree(path, indent, name=None):
children = self.get_children(path)
if path == '/':
path = ''
if 'zookeeper' in children:
children.remove('zookeeper')
+ if name is not None:
+ out(indent + '/' + name)
+ indent += ' '
else:
data, meta = self.get(path)
if meta['ephemeralOwner'] and not ephemeral:
return
- out(indent+'/'+path.rsplit('/', 1)[1])
+ if name is None:
+ name = path.rsplit('/', 1)[1]
+ properties = decode(data)
+ type_ = properties.pop('type', None)
+ if type_:
+ name += ' : '+type_
+ out(indent + '/' + name)
indent += ' '
links = []
- for i in sorted(decode(data).iteritems()):
+ for i in sorted(properties.iteritems()):
if i[0].endswith(' ->'):
links.append(i)
else:
@@ -353,12 +434,9 @@
for name in children:
export_tree(path+'/'+name, indent)
- export_tree(path, '')
+ export_tree(path, '', name)
return '\n'.join(output)+'\n'
- def properties(self, path):
- return Properties(self, path)
-
def resolve(self, path, seen=()):
if self.exists(path):
return path
@@ -372,7 +450,7 @@
newpath = base + '/' + name
if self.exists(newpath):
return newpath
- props = decode(self.get(base)[0])
+ props = self.get_properties(base)
newpath = props.get(name+' ->')
if not newpath:
raise zookeeper.NoNodeException()
@@ -383,9 +461,18 @@
raise zookeeper.NoNodeException(path)
def _set(self, path, data):
- self.connected.wait()
+ self.connected.wait(self.timeout)
return zookeeper.set(self.handle, path, data)
+
+ def ln(self, target, source):
+ base, name = source.rsplit('/', 1)
+ if target[-1] == '/':
+ target += name
+ properties = self.get_properties(base)
+ properties[name+' ->'] = target
+ self._set(base, encode(properties))
+
def close(self):
zookeeper.close(self.handle)
del self.handle
@@ -413,13 +500,87 @@
del _make_method
+class WatchManager:
+ # Manage {key -> w{watches}} in a thread-safe manner.
+ # (And also provide a hard set to allow nodeinfos w callbacks
+ # to keep themselves around.)
+ def __init__(self):
+ self.data = {}
+ self.lock = threading.Lock()
+
+ def _remove(ref, selfref=weakref.ref(self)):
+ self = selfref()
+ if self is None:
+ return
+ key = ref.key
+ with self.lock:
+ refs = self.data.get(key, ())
+ if ref in refs:
+ refs.remove(ref)
+ if not refs:
+ del self.data[key]
+
+ self._remove = _remove
+
+ def __len__(self):
+ with self.lock:
+ return sum(
+ len([r for r in refs if r() is not None])
+ for refs in self.data.itervalues()
+ )
+
+ def __contains__(self, key):
+ with self.lock:
+ return key in self.data
+
+ def add(self, key, value):
+ ref = weakref.KeyedRef(value, self._remove, key)
+ newkey = False
+ with self.lock:
+ try:
+ refs = self.data[key]
+ except KeyError:
+ self.data[key] = refs = set()
+ newkey = True
+ refs.add(ref)
+ return newkey
+
+ def pop(self, key):
+ with self.lock:
+ watches = [ref() for ref in self.data.pop(key, ())]
+
+ for watch in watches:
+ if watch is not None:
+ yield watch
+
+ def watches(self, key):
+ with self.lock:
+ watches = [ref() for ref in self.data.get(key, ())]
+
+ for watch in watches:
+ if watch is not None:
+ yield watch
+
+ def clear(self):
+ # Clear data and return an iterator on the old values
+ with self.lock:
+ old = self.data
+ self.data = {}
+
+ for refs in old.itervalues():
+ for ref in refs:
+ v = ref()
+ if v is not None:
+ yield v
+
+
class NodeInfo:
def __init__(self, session, path):
self.session = session
- self.path = session.resolve(path)
- self.callbacks = set()
+ self.path = path
+ self.callbacks = []
session._watch(self)
def setData(self, data):
@@ -438,7 +599,8 @@
logger.exception('Error %r calling %r', self, callback)
def __repr__(self):
- return "%s.%s(%s, %s)" % (
+ return "%s%s.%s(%s, %s)" % (
+ self.deleted and 'DELETED: ' or '',
self.__class__.__module__, self.__class__.__name__,
self.session.handle, self.path)
@@ -456,8 +618,8 @@
def __call__(self, func):
func(self)
- self.callbacks.add(func)
- return func
+ self.callbacks.append(func)
+ return self
def __iter__(self):
return iter(self.data)
@@ -465,12 +627,10 @@
class Children(NodeInfo):
event_type = zookeeper.CHILD_EVENT
- zkfunc = 'get_children'
class Properties(NodeInfo, collections.Mapping):
event_type = zookeeper.CHANGED_EVENT
- zkfunc = 'get'
def setData(self, data):
sdata, self.meta_data = data
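The new ``WatchManager`` keys watches by ``(event_type, real_path)``. It holds each ``Children`` or ``Properties`` object only through a ``weakref.KeyedRef``, so an entry disappears once its object is garbage collected. This is how unused objects get cleaned up instead of living for the whole session. The sketch below reduces that idea to a few methods. ``WatchRegistry`` and ``Watcher`` are illustrative names, and the ZooKeeper-side watch registration is omitted.

```python
import gc
import threading
import weakref


class WatchRegistry(object):
    """Simplified sketch of the WatchManager idea: key -> set of weak refs."""

    def __init__(self):
        self.data = {}
        self.lock = threading.Lock()
        selfref = weakref.ref(self)  # the callback must not keep us alive

        def _remove(ref):
            # Invoked by the weakref machinery when a watcher dies.
            registry = selfref()
            if registry is None:
                return
            with registry.lock:
                refs = registry.data.get(ref.key)
                if refs is not None:
                    refs.discard(ref)
                    if not refs:
                        del registry.data[ref.key]

        self._remove = _remove

    def add(self, key, watcher):
        """Register watcher under key; return True if the key is new."""
        ref = weakref.KeyedRef(watcher, self._remove, key)
        with self.lock:
            is_new = key not in self.data
            self.data.setdefault(key, set()).add(ref)
        return is_new

    def watches(self, key):
        """Return the watchers for key that are still alive."""
        with self.lock:
            refs = list(self.data.get(key, ()))
        alive = [ref() for ref in refs]
        return [w for w in alive if w is not None]

    def __contains__(self, key):
        with self.lock:
            return key in self.data


class Watcher(object):
    pass


registry = WatchRegistry()
w = Watcher()
registry.add(('child', '/test'), w)    # True: first watcher for this key
del w                                  # drop the last strong reference
gc.collect()                           # immediate on CPython anyway
print(('child', '/test') in registry)  # False
```

The ``True``/``False`` return from ``add`` mirrors how ``_watch`` decides whether it must set a new ZooKeeper watch or can just hand the existing data to the new object.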
Modified: zc.zk/trunk/src/zc/zk/tests.py
===================================================================
--- zc.zk/trunk/src/zc/zk/tests.py 2011-12-05 19:59:35 UTC (rev 123574)
+++ zc.zk/trunk/src/zc/zk/tests.py 2011-12-05 20:00:20 UTC (rev 123575)
@@ -22,6 +22,7 @@
import pprint
import re
import StringIO
+import sys
import time
import zc.zk
import zc.thread
@@ -60,6 +61,21 @@
def side_effect(mock):
return lambda func: setattr(mock, 'side_effect', func)
+class zklogger(object):
+
+ def __init__(self):
+ logger = logging.getLogger('zc.zk')
+ h = logging.StreamHandler(sys.stdout)
+ h.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
+ logger.addHandler(h)
+ self.h = h
+ logger.setLevel(logging.DEBUG)
+
+ def uninstall(self):
+ logger = logging.getLogger('zc.zk')
+ logger.removeHandler(self.h)
+ logger.setLevel(logging.NOTSET)
+
class Tests(unittest.TestCase):
@mock.patch('zookeeper.init')
@@ -118,8 +134,9 @@
path = '/test'
@side_effect(get_children)
- def _(handle, path_, handler):
- self.__handler = handler
+ def _(handle, path_, handler=None):
+ if handler is not None:
+ self.__handler = handler
self.assertEqual((handle, path_), (0, path))
return data
@@ -135,7 +152,7 @@
self.assertEqual(list(children), data)
# callbacks are called too:
- cb = children(mock.Mock())
+ cb = mock.Mock(); children(cb)
cb.assert_called_with(children)
cb.reset_mock()
self.assertEqual(len(children.callbacks), 1)
@@ -161,7 +178,7 @@
# if a callback raises zc.zk.CancelWatch, the cancel is logged
# and callback is discarded
- cb = children(mock.Mock())
+ cb = mock.Mock(); children(cb)
self.assertEqual(len(children.callbacks), 1)
cb.side_effect = zc.zk.CancelWatch
data = []
@@ -176,7 +193,7 @@
h.uninstall()
# If a session expires, it will be reestablished with watches intact.
- cb = children(mock.Mock())
+ cb = mock.Mock(); children(cb)
self.__session_watcher(
0, zookeeper.SESSION_EVENT, zookeeper.EXPIRED_SESSION_STATE, "")
close.assert_called_with(0)
@@ -197,8 +214,9 @@
path = '/test'
@side_effect(get)
- def _(handle, path_, handler):
- self.__handler = handler
+ def _(handle, path_, handler=None):
+ if handler is not None:
+ self.__handler = handler
self.assertEqual((handle, path_), (0, path))
return json.dumps(data), {}
@@ -214,11 +232,10 @@
self.assertEqual(dict(properties), data)
# callbacks are called too:
- cb = properties(mock.Mock())
+ cb = mock.Mock(); properties(cb)
cb.assert_called_with(properties)
cb.reset_mock()
self.assertEqual(len(properties.callbacks), 1)
- data = dict(a=1, b=2)
self.__handler(0, zookeeper.CHANGED_EVENT, zookeeper.CONNECTED_STATE,
path)
self.assertEqual(dict(properties), data)
@@ -240,7 +257,7 @@
# if a callback raises zc.zk.CancelWatch, the cancel is logged
# and callback is discarded
- cb = properties(mock.Mock())
+ cb = mock.Mock(); properties(cb)
self.assertEqual(len(properties.callbacks), 1)
cb.side_effect = zc.zk.CancelWatch
data = {}
@@ -255,7 +272,7 @@
h.uninstall()
# If a session expires, it will be reestablished with watches intact.
- cb = properties(mock.Mock())
+ cb = mock.Mock(); properties(cb)
self.__session_watcher(
0, zookeeper.SESSION_EVENT, zookeeper.EXPIRED_SESSION_STATE, "")
close.assert_called_with(0)
@@ -274,8 +291,9 @@
path = '/test'
@side_effect(get)
- def _(handle, path_, handler):
- self.__handler = handler
+ def _(handle, path_, handler=None):
+ if handler is not None:
+ self.__handler = handler
self.assertEqual((handle, path_), (0, path))
return json.dumps(data), {}
@@ -304,8 +322,9 @@
path = '/test'
@side_effect(get)
- def _(handle, path_, handler):
- self.__handler = handler
+ def _(handle, path_, handler=None):
+ if handler is not None:
+ self.__handler = handler
self.assertEqual((handle, path_), (0, path))
return data, {}
@@ -337,45 +356,236 @@
properties.set(string_value='xxx')
self.assertEqual(self.__set_data, 'xxx')
- @mock.patch('zookeeper.state')
- @mock.patch('zookeeper.get')
- @mock.patch('zookeeper.get_children')
- def test_deleted_node_with_watchers(self, get_children, get, state):
- state.side_effect = self.state_side_effect
- path = '/test'
- @side_effect(get)
- def _(handle, path_, handler):
- self.__get_handler = handler
- return '{"a": 1}', {}
- @side_effect(get_children)
- def _(handle, path_, handler):
- self.__child_handler = handler
- return ['x']
+def test_children():
+ """
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+ >>> _ = zk.create('/test', '', zc.zk.OPEN_ACL_UNSAFE)
+ >>> children = zk.children('/test')
+ >>> sorted(children)
+ []
- children = self.__zk.children(path)
- self.assertEqual(list(children), ['x'])
- cb = children(mock.Mock())
- cb.side_effect = lambda x: None
- ccb = children(mock.Mock())
- ccb.assert_called_with(children)
+ >>> def create(path):
+ ... zk.create(path, '', zc.zk.OPEN_ACL_UNSAFE)
+ >>> create('/test/a')
+ >>> sorted(children)
+ ['a']
- properties = self.__zk.properties(path)
- self.assertEqual(dict(properties), dict(a=1))
- cb = properties(mock.Mock())
- cb.side_effect = lambda x: None
- pcb = properties(mock.Mock())
- pcb.assert_called_with(properties)
+We can register callbacks:
- self.__get_handler(
- 0, zookeeper.DELETED_EVENT, zookeeper.CONNECTED_STATE, path)
- self.assertEqual(dict(properties), {})
- pcb.assert_called_with()
+ >>> @children
+ ... def cb(c):
+ ... print 'good', sorted(c)
+ good ['a']
- self.__child_handler(
- 0, zookeeper.DELETED_EVENT, zookeeper.CONNECTED_STATE, path)
- self.assertEqual(list(children), [])
- ccb.assert_called_with()
+
+When we register a callback, it is called immediately with the children
+object. It is called again whenever the children change:
+
+ >>> create('/test/b')
+ good ['a', 'b']
+ >>> sorted(children)
+ ['a', 'b']
+
+If a callback raises an error when it is called at registration, the error
+propagates and the callback isn't saved:
+
+ >>> @children
+ ... def bad(c):
+ ... raise ValueError
+ Traceback (most recent call last):
+ ...
+ ValueError
+
+ >>> create('/test/c')
+ good ['a', 'b', 'c']
+
+If a callback raises an error later, it is logged and the callback is
+cancelled:
+
+ >>> logger = zklogger()
+
+ >>> badnow = False
+ >>> @children
+ ... def bad(c):
+ ... assert c is children
+ ... print 'bad later', sorted(c)
+ ... if badnow:
+ ... raise ValueError
+ bad later ['a', 'b', 'c']
+
+ >>> zk.delete('/test/c')
+ good ['a', 'b']
+ bad later ['a', 'b']
+
+ >>> badnow = True
+ >>> zk.delete('/test/b') # doctest: +ELLIPSIS
+ good ['a']
+ bad later ['a']
+ ERROR watch(zc.zk.Children(0, /test), <function bad at ...>)
+ Traceback (most recent call last):
+ ...
+ ValueError
+
+ >>> zk.delete('/test/a')
+ good []
+
+A callback can also cancel itself by raising CancelWatch:
+
+ >>> cancelnow = False
+ >>> @children
+ ... def cancel(c):
+ ... assert c is children
+ ... print 'cancel later', sorted(c)
+ ... if cancelnow:
+ ... raise zc.zk.CancelWatch
+ cancel later []
+
+ >>> create('/test/a')
+ good ['a']
+ cancel later ['a']
+
+ >>> cancelnow = True
+ >>> create('/test/b') # doctest: +ELLIPSIS
+ good ['a', 'b']
+ cancel later ['a', 'b']
+ DEBUG cancelled watch(zc.zk.Children(0, /test), <function cancel at ...>)
+
+ >>> logger.uninstall()
+ """
+
+def test_handler_cleanup():
+ """
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+ >>> _ = zk.create('/test', '', zc.zk.OPEN_ACL_UNSAFE)
+
+Children:
+
+ >>> children = zk.children('/test')
+ >>> len(zk.watches)
+ 1
+ >>> del children
+ >>> len(zk.watches)
+ 0
+
+ >>> children = zk.children('/test')
+ >>> @children
+ ... def kids(c):
+ ... print c
+ zc.zk.Children(0, /test)
+ >>> len(zk.watches)
+ 1
+ >>> del children
+ >>> len(zk.watches)
+ 1
+ >>> del kids
+ >>> len(zk.watches)
+ 0
+
+ >>> @zk.children('/test')
+ ... def kids(c):
+ ... print c
+ zc.zk.Children(0, /test)
+
+ >>> len(zk.watches)
+ 1
+ >>> del kids
+ >>> len(zk.watches)
+ 0
+
+Properties:
+
+ >>> properties = zk.properties('/test')
+ >>> len(zk.watches)
+ 1
+ >>> del properties
+ >>> len(zk.watches)
+ 0
+
+ >>> properties = zk.properties('/test')
+ >>> @properties
+ ... def props(c):
+ ... print c
+ zc.zk.Properties(0, /test)
+ >>> len(zk.watches)
+ 1
+ >>> del properties
+ >>> len(zk.watches)
+ 1
+ >>> del props
+ >>> len(zk.watches)
+ 0
+
+ >>> @zk.properties('/test')
+ ... def props(c):
+ ... print c
+ zc.zk.Properties(0, /test)
+
+ >>> len(zk.watches)
+ 1
+ >>> del props
+ >>> len(zk.watches)
+ 0
+
+ """
+
+def test_deleted_node_with_watchers():
+ """
+
+Set up some handlers.
+
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+ >>> _ = zk.create('/test', '{"a": 1}', zc.zk.OPEN_ACL_UNSAFE)
+
+ >>> children = zk.children('/test')
+ >>> @children
+ ... def _(arg):
+ ... print 1, list(arg)
+ 1 []
+
+ >>> @children
+ ... def _(arg=None):
+ ... print 2, arg
+ 2 zc.zk.Children(0, /test)
+
+ >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+ 1 ['a']
+ 2 zc.zk.Children(0, /test)
+
+ >>> zk.delete('/test/a')
+ 1 []
+ 2 zc.zk.Children(0, /test)
+
+ >>> properties = zk.properties('/test')
+ >>> @properties
+ ... def _(arg):
+ ... print 3, dict(arg)
+ 3 {u'a': 1}
+
+ >>> @properties
+ ... def _(arg=None):
+ ... print 4, arg
+ 4 zc.zk.Properties(0, /test)
+
+ >>> zk.set('/test', '{"b": 2}')
+ 3 {u'b': 2}
+ 4 zc.zk.Properties(0, /test)
+
+Hack data into the child watcher to verify it's cleared:
+
+ >>> children.data = 'data'
+
+Now delete the node. The handlers that can be called without arguments are
+called with none:
+
+ >>> zk.delete('/test')
+ 4 None
+ 2 None
+
+The handlers that require an argument (1 and 3) were not called.
+
+And the data are cleared:
+
+ >>> list(children), list(properties)
+ ([], [])
+ """
+
+
def resilient_import():
"""
We can use various spacing in properties and links:
@@ -537,12 +747,7 @@
>>> zk.resolve('/top/a/top/a/b/top/x')
Traceback (most recent call last):
- File "/usr/local/python/2.6/lib/python2.6/doctest.py", line 1253, in __run
- compileflags, 1) in test.globs
- File "<doctest zc.zk.tests.test_resolve[4]>", line 1, in <module>
- zk.resolve('/top/a/top/a/b/top/x')
- File "/Users/jim/p/zc/zk/trunk/src/zc/zk/__init__.py", line 382, in resolve
- raise zookeeper.NoNodeException(path)
+ ...
NoNodeException: /top/a/top/a/b/top/x
>>> zk.resolve('/top/a/b/c/d/loop')
@@ -556,6 +761,29 @@
LinkLoop: ('/top/a/loop', u'/top/a/b/loop', u'/top/a/loop')
"""
+def test_ln_target_w_trailing_slash():
+ """
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+ >>> zk.ln('/databases/main', '/fooservice/')
+ >>> pprint.pprint(zk.get_properties('/fooservice'))
+ {u' ->': u'/databases/main',
+ u'database': u'/databases/foomain',
+ u'favorite_color': u'red',
+ u'threads': 1}
+ """
+
+def test_export_top_w_name():
+ """
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+ >>> print zk.export_tree('/', name='top'),
+ /top
+ /fooservice
+ database = u'/databases/foomain'
+ favorite_color = u'red'
+ threads = 1
+ /providers
+ """
+
def assert_(cond, mess=''):
if not cond:
print 'assertion failed: ', mess
@@ -628,11 +856,12 @@
def delete(self, handle, path):
self.check_handle(handle)
- self.traverse(path) # see if it's there
+ node = self.traverse(path)
base, name = path.rsplit('/', 1)
- node = self.traverse(base)
- del node.children[name]
- node.children_changed(self.handle, zookeeper.CONNECTED_STATE, base)
+ bnode = self.traverse(base)
+ del bnode.children[name]
+ node.deleted(self.handle, zookeeper.CONNECTED_STATE, path)
+ bnode.children_changed(self.handle, zookeeper.CONNECTED_STATE, base)
def exists(self, handle, path):
self.check_handle(handle)
@@ -699,6 +928,16 @@
for w in watchers:
w(handle, zookeeper.CHANGED_EVENT, state, path)
+ def deleted(self, handle, state, path):
+ watchers = self.watchers
+ self.watchers = ()
+ for w in watchers:
+ w(handle, zookeeper.DELETED_EVENT, state, path)
+ watchers = self.child_watchers
+ self.child_watchers = ()
+ for w in watchers:
+ w(handle, zookeeper.DELETED_EVENT, state, path)
+
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(Tests),
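The mocked `get` and `get_children` side effects above now take `handler=None`, which suggests the refactored code sometimes calls them without a watcher. A minimal Python 3 sketch of that fake pattern using `unittest.mock`; `fake_get_children` and `captured` are hypothetical names, not part of tests.py.

```python
from unittest import mock

# Records the watcher passed by the code under test, if any.
captured = {}


def fake_get_children(handle, path, handler=None):
    # handler is optional: only remember it when the caller supplies one,
    # so calls made without a watcher don't clobber a previously saved one.
    if handler is not None:
        captured['handler'] = handler
    return ['x']


# The Mock records every call and delegates to the fake for its return value.
get_children = mock.Mock(side_effect=fake_get_children)
```

Because the Mock still records calls, a test can check both the returned data and how often the fake was hit.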
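test_children exercises three callback rules. An error raised at registration propagates and the callback is not kept. An error raised on a later change is logged and the callback is dropped. Raising CancelWatch drops the callback with only a debug message. A small Python 3 sketch of those rules follows. It is not the zc.zk implementation: `Watch`, `changed()` and the local `CancelWatch` class are hypothetical stand-ins.

```python
import logging

logger = logging.getLogger('watch_sketch')


class CancelWatch(Exception):
    """Hypothetical stand-in for zc.zk.CancelWatch."""


class Watch:
    """Hypothetical watch object holding cached data and callbacks."""

    def __init__(self, data):
        self.data = list(data)
        self.callbacks = []

    def __call__(self, func):
        # Call immediately; if this raises, the exception propagates
        # and func is never added to self.callbacks.
        func(self)
        self.callbacks.append(func)
        return func

    def changed(self, data):
        # Update the cached data, then notify a snapshot of the callbacks
        # so that removing one during the loop is safe.
        self.data = list(data)
        for cb in list(self.callbacks):
            try:
                cb(self)
            except CancelWatch:
                # A deliberate cancellation is only logged at DEBUG level.
                logger.debug('cancelled watch(%r, %r)', self, cb)
                self.callbacks.remove(cb)
            except Exception:
                # Any other error is logged with its traceback and the
                # callback is dropped.
                logger.exception('watch(%r, %r)', self, cb)
                self.callbacks.remove(cb)
```

Iterating over a copy of the callback list is the design choice that lets a failing callback be removed without disturbing the notification of the others.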
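test_handler_cleanup shows that a Children or Properties watch stays in `zk.watches` only while the object itself, or a callback registered on it, is still referenced. One way to get that behavior is sketched below in Python 3. It assumes the client tracks watches in a `weakref.WeakSet`, holds callbacks weakly, and has each registered function keep a strong reference back to its watch. `Registry`, `Watch` and the `func.watch` attribute are hypothetical names, not zc.zk's API.

```python
import weakref


class Watch:
    """Hypothetical watch; callbacks are held weakly."""

    def __init__(self, path):
        self.path = path
        self.callbacks = weakref.WeakSet()

    def __call__(self, func):
        func(self)  # called immediately on registration
        # The registered function keeps its watch alive through an
        # attribute; the watch refers back to it only weakly, so no
        # reference cycle is created.
        func.watch = self
        self.callbacks.add(func)
        return func


class Registry:
    """Hypothetical client that tracks live watches without owning them."""

    def __init__(self):
        self.watches = weakref.WeakSet()

    def children(self, path):
        watch = Watch(path)
        self.watches.add(watch)
        return watch
```

Avoiding a cycle means that in CPython a watch is released as soon as its last strong reference goes away, which matches the immediate `len(zk.watches)` changes in the doctest. The trade-off is that a callback nobody else references, such as a bare lambda, would be dropped.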
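test_deleted_node_with_watchers shows that when the watched node is deleted, only callbacks that can be called without arguments are invoked (with none), and the cached data is cleared. A Python 3 sketch of that dispatch rule follows. Using `inspect.signature(...).bind()` to decide whether a callback accepts zero arguments is an assumption for illustration. `Watch`, `deleted()` and `callable_without_args` are hypothetical names.

```python
import inspect


def callable_without_args(func):
    """Return True if func can be called with no arguments."""
    try:
        # bind() raises TypeError when a required parameter is missing.
        inspect.signature(func).bind()
    except TypeError:
        return False
    return True


class Watch:
    """Hypothetical watch caching a node's children or properties."""

    def __init__(self, data):
        self.data = list(data)
        self.callbacks = []

    def __call__(self, func):
        func(self)  # called immediately with the watch object
        self.callbacks.append(func)
        return func

    def deleted(self):
        # The node is gone: clear the cached data, then notify only the
        # callbacks that can be called without an argument.
        self.data = []
        for cb in list(self.callbacks):
            if callable_without_args(cb):
                cb()
```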
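The fake ZooKeeper's `delete` now notifies watchers on the deleted node as well as the parent's child watchers. ZooKeeper watches are one-shot, so each list is detached before it is dispatched. The sketch below is a self-contained Python 3 version of that delete sequence. The string constants stand in for the `zookeeper` module's event constants, and `Node`, `Tree` and `fire()` are hypothetical names.

```python
# Hypothetical stand-ins for the zookeeper module's event constants.
DELETED_EVENT = 'deleted'
CHILD_EVENT = 'child'


class Node:
    def __init__(self):
        self.children = {}
        self.watchers = []        # one-shot data watchers
        self.child_watchers = []  # one-shot child watchers

    def fire(self, attr, event, path):
        # Watches are one-shot: detach the list before calling anything,
        # so a watcher that re-registers goes into a fresh list.
        watchers = getattr(self, attr)
        setattr(self, attr, [])
        for w in watchers:
            w(event, path)


class Tree:
    def __init__(self):
        self.root = Node()

    def traverse(self, path):
        node = self.root
        for name in path.split('/'):
            if name:
                node = node.children[name]  # KeyError if missing
        return node

    def create(self, path):
        base, name = path.rsplit('/', 1)
        parent = self.traverse(base)
        parent.children[name] = Node()
        parent.fire('child_watchers', CHILD_EVENT, base or '/')

    def delete(self, path):
        node = self.traverse(path)
        base, name = path.rsplit('/', 1)
        parent = self.traverse(base)
        del parent.children[name]
        # Watchers on the deleted node see a deletion event...
        node.fire('watchers', DELETED_EVENT, path)
        node.fire('child_watchers', DELETED_EVENT, path)
        # ...and the parent's child watchers see a children change.
        parent.fire('child_watchers', CHILD_EVENT, base or '/')
```

Clearing both `watchers` and `child_watchers` on the deleted node is what keeps a second deletion or re-creation from re-firing stale watches.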