[Checkins] SVN: relstorage/trunk/ - Added a --dry-run option to zodbpack. Set it to force a dry run
Shane Hathaway
shane at hathawaymix.org
Wed Feb 2 08:51:12 EST 2011
Log message for revision 120049:
- Added a --dry-run option to zodbpack. Use it to force a dry run
of packing (report what would be packed without packing).
- Improved the efficiency of packing databases that are both broad
and deep.
Changed:
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/relstorage/adapters/interfaces.py
U relstorage/trunk/relstorage/adapters/mysql.py
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/packundo.py
U relstorage/trunk/relstorage/adapters/postgresql.py
U relstorage/trunk/relstorage/adapters/schema.py
U relstorage/trunk/relstorage/options.py
U relstorage/trunk/relstorage/storage.py
U relstorage/trunk/relstorage/tests/hptestbase.py
U relstorage/trunk/relstorage/tests/reltestbase.py
U relstorage/trunk/relstorage/zodbpack.py
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2011-02-02 11:23:05 UTC (rev 120048)
+++ relstorage/trunk/CHANGES.txt 2011-02-02 13:51:11 UTC (rev 120049)
@@ -7,6 +7,12 @@
- Fixed a missing import in the blob cache cleanup code.
+- Added a --dry-run option to zodbpack. Set it to force a dry run
+ of packing.
+
+- Improved the efficiency of packing databases that are both broad
+ and deep.
+
1.5.0a1 (2010-10-21)
--------------------
Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py 2011-02-02 11:23:05 UTC (rev 120048)
+++ relstorage/trunk/relstorage/adapters/interfaces.py 2011-02-02 13:51:11 UTC (rev 120049)
@@ -317,20 +317,16 @@
Returns None if there is nothing to pack.
"""
- def pre_pack(pack_tid, get_references, options):
+ def pre_pack(pack_tid, get_references):
"""Decide what to pack.
pack_tid specifies the most recent transaction to pack.
get_references is a function that accepts a stored object state
and returns a set of OIDs that state refers to.
-
- options is an instance of relstorage.Options.
- In particular, the options.pack_gc flag indicates whether
- to run garbage collection.
"""
- def pack(pack_tid, options, sleep=None, packed_func=None):
+ def pack(pack_tid, sleep=None, packed_func=None):
"""Pack. Requires the information provided by pre_pack.
packed_func, if provided, will be called for every object state
Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2011-02-02 11:23:05 UTC (rev 120048)
+++ relstorage/trunk/relstorage/adapters/mysql.py 2011-02-02 13:51:11 UTC (rev 120049)
@@ -136,6 +136,7 @@
connmanager=self.connmanager,
runner=self.runner,
locker=self.locker,
+ options=options,
)
self.dbiter = HistoryPreservingDatabaseIterator(
runner=self.runner,
@@ -145,6 +146,7 @@
connmanager=self.connmanager,
runner=self.runner,
locker=self.locker,
+ options=options,
)
self.dbiter = HistoryFreeDatabaseIterator(
runner=self.runner,
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2011-02-02 11:23:05 UTC (rev 120048)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2011-02-02 13:51:11 UTC (rev 120049)
@@ -131,6 +131,7 @@
connmanager=self.connmanager,
runner=self.runner,
locker=self.locker,
+ options=options,
)
self.dbiter = HistoryPreservingDatabaseIterator(
runner=self.runner,
@@ -140,6 +141,7 @@
connmanager=self.connmanager,
runner=self.runner,
locker=self.locker,
+ options=options,
)
self.dbiter = HistoryFreeDatabaseIterator(
runner=self.runner,
Modified: relstorage/trunk/relstorage/adapters/packundo.py
===================================================================
--- relstorage/trunk/relstorage/adapters/packundo.py 2011-02-02 11:23:05 UTC (rev 120048)
+++ relstorage/trunk/relstorage/adapters/packundo.py 2011-02-02 13:51:11 UTC (rev 120049)
@@ -14,12 +14,21 @@
"""Pack/Undo implementations.
"""
+import BTrees
from relstorage.adapters.interfaces import IPackUndo
from ZODB.POSException import UndoError
from zope.interface import implements
import logging
import time
+try:
+ IISet = BTrees.family64.II.Set
+ difference = BTrees.family64.II.difference
+except AttributeError:
+ # Fall back to old BTrees with no special support for 64 bit integers
+ from BTrees.OOBTree import OOSet as IISet
+ from BTrees.OOBTree import difference
+
log = logging.getLogger(__name__)
@@ -28,10 +37,11 @@
verify_sane_database = False
- def __init__(self, connmanager, runner, locker):
+ def __init__(self, connmanager, runner, locker, options):
self.connmanager = connmanager
self.runner = runner
self.locker = locker
+ self.options = options
def choose_pack_transaction(self, pack_point):
"""Return the transaction before or at the specified pack time.
@@ -50,9 +60,95 @@
finally:
self.connmanager.close(conn, cursor)
- def _visit_all_references(self, cursor):
- """Visit all references in pack_object and set the keep flags.
+ def _traverse_graph(self, cursor):
+ """Visit the entire object graph and set the pack_object.keep flags.
+
+ Dispatches to the implementation specified in
+ self.options.pack_gc_traversal.
"""
+ m = getattr(self,
+ '_traverse_graph_%s' % self.options.pack_gc_traversal)
+ m(cursor)
+
+ def _traverse_graph_python(self, cursor):
+ """Visit the entire object graph and set the pack_object.keep flags.
+
+ Python implementation.
+ """
+ log.info("pre_pack: downloading pack_object and object_ref.")
+
+ # Download the list of root objects to keep from pack_object.
+ keep_set = IISet([0]) # set([oid])
+ stmt = """
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = %(TRUE)s
+ """
+ self.runner.run_script_stmt(cursor, stmt)
+ for from_oid, in cursor:
+ keep_set.insert(from_oid)
+
+ # Download the list of object references into all_refs.
+ # Use IISets to minimize RAM consumption.
+ all_refs = {} # {from_oid: set([to_oid])}
+ stmt = """
+ SELECT object_ref.zoid, object_ref.to_zoid
+ FROM object_ref
+ JOIN pack_object ON (object_ref.zoid = pack_object.zoid)
+ WHERE object_ref.tid >= pack_object.keep_tid
+ ORDER BY object_ref.zoid
+ """
+ self.runner.run_script_stmt(cursor, stmt)
+ current_oid = None
+ current_refs = IISet() # set([to_oid])
+ for from_oid, to_oid in cursor:
+ if current_oid is None:
+ current_oid = from_oid
+ elif current_oid != from_oid:
+ all_refs[current_oid] = current_refs
+ current_refs = IISet()
+ current_refs.insert(to_oid)
+ if current_oid is not None:
+ all_refs[current_oid] = current_refs
+
+ # Traverse the object graph. Add to keep_set all of the
+ # reachable OIDs.
+ added_oids = IISet()
+ added_oids.update(keep_set)
+ pass_num = 0
+ while added_oids:
+ pass_num += 1
+ to_visit = added_oids
+ added_oids = IISet()
+ for from_oid in to_visit:
+ to_oids = all_refs.get(from_oid)
+ if to_oids:
+ to_add = difference(to_oids, keep_set)
+ if to_add:
+ keep_set.update(to_add)
+ added_oids.update(to_add)
+ log.info("pre_pack: found %d more referenced object(s) in "
+ "pass %d", len(added_oids), pass_num)
+
+ # Set pack_object.keep for all OIDs in keep_set.
+ del all_refs # Free some RAM
+ log.info("pre_pack: uploading the list of reachable objects.")
+ keep_list = list(keep_set)
+ while keep_list:
+ batch = keep_list[:100]
+ keep_list = keep_list[100:]
+ oids_str = ','.join(str(oid) for oid in batch)
+ stmt = """
+ UPDATE pack_object SET keep = %%(TRUE)s, visited = %%(TRUE)s
+ WHERE zoid IN (%s)
+ """ % oids_str
+ self.runner.run_script_stmt(cursor, stmt)
+
+ def _traverse_graph_sql(self, cursor):
+ """Visit the entire object graph and set the pack_object.keep flags.
+
+ SQL implementation.
+ """
# Each of the objects to be kept might refer to other objects.
# Mark the referenced objects to be kept as well. Do this
# repeatedly until all references have been satisfied.
@@ -109,7 +205,7 @@
else:
pass_num += 1
- def _pause_pack(self, sleep, options, start):
+ def _pause_pack(self, sleep, start):
"""Pause packing to allow concurrent commits."""
if sleep is None:
sleep = time.sleep
@@ -118,9 +214,9 @@
# Compensate for low timer resolution by
# assuming that at least 10 ms elapsed.
elapsed = 0.01
- duty_cycle = options.pack_duty_cycle
+ duty_cycle = self.options.pack_duty_cycle
if duty_cycle > 0.0 and duty_cycle < 1.0:
- delay = min(options.pack_max_delay,
+ delay = min(self.options.pack_max_delay,
elapsed * (1.0 / duty_cycle - 1.0))
if delay > 0:
log.debug('pack: sleeping %.4g second(s)', delay)
@@ -145,9 +241,10 @@
_script_create_temp_pack_visit = """
CREATE TEMPORARY TABLE temp_pack_visit (
zoid BIGINT NOT NULL,
- keep_tid BIGINT
+ keep_tid BIGINT NOT NULL
);
- CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
+ CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid);
+ CREATE INDEX temp_pack_keep_tid ON temp_pack_visit (keep_tid)
"""
_script_pre_pack_follow_child_refs = """
@@ -424,7 +521,7 @@
"from %d object(s)", tid, to_count, from_count)
return to_count
- def pre_pack(self, pack_tid, get_references, options):
+ def pre_pack(self, pack_tid, get_references):
"""Decide what to pack.
pack_tid specifies the most recent transaction to pack.
@@ -432,8 +529,8 @@
get_references is a function that accepts a pickled state and
returns a set of OIDs that state refers to.
- options is an instance of relstorage.Options.
- The options.pack_gc flag indicates whether to run garbage collection.
+ The self.options.pack_gc flag indicates whether
+ to run garbage collection.
If pack_gc is false, at least one revision of every object is kept,
even if nothing refers to it. Packing with pack_gc disabled can be
much faster.
@@ -441,7 +538,7 @@
conn, cursor = self.connmanager.open_for_pre_pack()
try:
try:
- if options.pack_gc:
+ if self.options.pack_gc:
log.info("pre_pack: start with gc enabled")
self._pre_pack_with_gc(
conn, cursor, pack_tid, get_references)
@@ -456,7 +553,7 @@
self.runner.run_script_stmt(cursor, stmt)
to_remove = 0
- if options.pack_gc:
+ if self.options.pack_gc:
# Pack objects with the keep flag set to false.
stmt = """
INSERT INTO pack_state (tid, zoid)
@@ -559,8 +656,8 @@
-- Keep objects that have been revised since pack_tid.
-- Use temp_pack_visit for temporary state; otherwise MySQL 5 chokes.
- INSERT INTO temp_pack_visit (zoid)
- SELECT zoid
+ INSERT INTO temp_pack_visit (zoid, keep_tid)
+ SELECT zoid, 0
FROM current_object
WHERE tid > %(pack_tid)s;
@@ -575,8 +672,8 @@
-- Keep objects that are still referenced by object states in
-- transactions that will not be packed.
-- Use temp_pack_visit for temporary state; otherwise MySQL 5 chokes.
- INSERT INTO temp_pack_visit (zoid)
- SELECT DISTINCT to_zoid
+ INSERT INTO temp_pack_visit (zoid, keep_tid)
+ SELECT DISTINCT to_zoid, 0
FROM object_ref
WHERE tid > %(pack_tid)s;
@@ -590,11 +687,11 @@
"""
self.runner.run_script(cursor, stmt, {'pack_tid': pack_tid})
- # Set the 'keep' flags in pack_object
- self._visit_all_references(cursor)
+ # Traverse the graph, setting the 'keep' flags in pack_object
+ self._traverse_graph(cursor)
- def pack(self, pack_tid, options, sleep=None, packed_func=None):
+ def pack(self, pack_tid, sleep=None, packed_func=None):
"""Pack. Requires the information provided by pre_pack."""
# Read committed mode is sufficient.
@@ -633,14 +730,14 @@
self._pack_transaction(
cursor, pack_tid, tid, packed, has_removable,
packed_list)
- if time.time() >= start + options.pack_batch_timeout:
+ if time.time() >= start + self.options.pack_batch_timeout:
conn.commit()
if packed_func is not None:
for oid, tid in packed_list:
packed_func(oid, tid)
del packed_list[:]
self.locker.release_commit_lock(cursor)
- self._pause_pack(sleep, options, start)
+ self._pause_pack(sleep, start)
self.locker.hold_commit_lock(cursor)
start = time.time()
if packed_func is not None:
@@ -756,9 +853,10 @@
_script_create_temp_pack_visit = """
CREATE TEMPORARY TABLE temp_pack_visit (
zoid BIGINT UNSIGNED NOT NULL,
- keep_tid BIGINT UNSIGNED
+ keep_tid BIGINT UNSIGNED NOT NULL
);
CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid);
+ CREATE INDEX temp_pack_keep_tid ON temp_pack_visit (keep_tid);
CREATE TEMPORARY TABLE temp_pack_child (
zoid BIGINT UNSIGNED NOT NULL
);
@@ -856,9 +954,10 @@
_script_create_temp_pack_visit = """
CREATE TEMPORARY TABLE temp_pack_visit (
zoid BIGINT NOT NULL,
- keep_tid BIGINT
+ keep_tid BIGINT NOT NULL
);
- CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
+ CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid);
+ CREATE INDEX temp_pack_keep_tid ON temp_pack_visit (keep_tid)
"""
_script_pre_pack_follow_child_refs = """
@@ -995,7 +1094,7 @@
return len(add_refs)
- def pre_pack(self, pack_tid, get_references, options):
+ def pre_pack(self, pack_tid, get_references):
"""Decide what the garbage collector should delete.
Objects created or modified after pack_tid will not be
@@ -1004,11 +1103,10 @@
get_references is a function that accepts a pickled state and
returns a set of OIDs that state refers to.
- options is an instance of relstorage.Options.
- The options.pack_gc flag indicates whether to run garbage collection.
- If pack_gc is false, this method does nothing.
+ The self.options.pack_gc flag indicates whether to run garbage
+ collection. If pack_gc is false, this method does nothing.
"""
- if not options.pack_gc:
+ if not self.options.pack_gc:
log.warning("pre_pack: garbage collection is disabled on a "
"history-free storage, so doing nothing")
return
@@ -1056,11 +1154,11 @@
"""
self.runner.run_script(cursor, stmt, {'pack_tid': pack_tid})
- # Set the 'keep' flags in pack_object
- self._visit_all_references(cursor)
+ # Traverse the graph, setting the 'keep' flags in pack_object
+ self._traverse_graph(cursor)
- def pack(self, pack_tid, options, sleep=None, packed_func=None):
+ def pack(self, pack_tid, sleep=None, packed_func=None):
"""Run garbage collection.
Requires the information provided by pre_pack.
@@ -1096,14 +1194,14 @@
self.runner.run_many(cursor, stmt, items)
packed_list.extend(items)
- if time.time() >= start + options.pack_batch_timeout:
+ if time.time() >= start + self.options.pack_batch_timeout:
conn.commit()
if packed_func is not None:
for oid, tid in packed_list:
packed_func(oid, tid)
del packed_list[:]
self.locker.release_commit_lock(cursor)
- self._pause_pack(sleep, options, start)
+ self._pause_pack(sleep, start)
self.locker.hold_commit_lock(cursor)
start = time.time()
@@ -1159,7 +1257,7 @@
_script_create_temp_pack_visit = """
CREATE TEMPORARY TABLE temp_pack_visit (
zoid BIGINT UNSIGNED NOT NULL,
- keep_tid BIGINT UNSIGNED
+ keep_tid BIGINT UNSIGNED NOT NULL
);
CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid);
CREATE TEMPORARY TABLE temp_pack_child (
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2011-02-02 11:23:05 UTC (rev 120048)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2011-02-02 13:51:11 UTC (rev 120049)
@@ -101,6 +101,7 @@
connmanager=self.connmanager,
runner=self.runner,
locker=self.locker,
+ options=options,
)
self.dbiter = HistoryPreservingDatabaseIterator(
runner=self.runner,
@@ -110,6 +111,7 @@
connmanager=self.connmanager,
runner=self.runner,
locker=self.locker,
+ options=options,
)
self.dbiter = HistoryFreeDatabaseIterator(
runner=self.runner,
Modified: relstorage/trunk/relstorage/adapters/schema.py
===================================================================
--- relstorage/trunk/relstorage/adapters/schema.py 2011-02-02 11:23:05 UTC (rev 120048)
+++ relstorage/trunk/relstorage/adapters/schema.py 2011-02-02 13:51:11 UTC (rev 120049)
@@ -346,8 +346,10 @@
# whose references need to be examined.
CREATE GLOBAL TEMPORARY TABLE temp_pack_visit (
zoid NUMBER(20) NOT NULL PRIMARY KEY,
- keep_tid NUMBER(20)
+ keep_tid NUMBER(20) NOT NULL
);
+ CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid);
+ CREATE INDEX temp_pack_keep_tid ON temp_pack_visit (keep_tid)
# Temporary state during undo: a list of objects
# to be undone and the tid of the undone state.
@@ -355,6 +357,7 @@
zoid NUMBER(20) NOT NULL PRIMARY KEY,
prev_tid NUMBER(20) NOT NULL
);
+ CREATE UNIQUE INDEX temp_undo_zoid ON temp_undo (zoid)
"""
history_preserving_init = """
@@ -647,8 +650,10 @@
# whose references need to be examined.
CREATE GLOBAL TEMPORARY TABLE temp_pack_visit (
zoid NUMBER(20) NOT NULL PRIMARY KEY,
- keep_tid NUMBER(20)
+ keep_tid NUMBER(20) NOT NULL
);
+ CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid);
+ CREATE INDEX temp_pack_keep_tid ON temp_pack_visit (keep_tid)
"""
history_free_init = """
Modified: relstorage/trunk/relstorage/options.py
===================================================================
--- relstorage/trunk/relstorage/options.py 2011-02-02 11:23:05 UTC (rev 120048)
+++ relstorage/trunk/relstorage/options.py 2011-02-02 13:51:11 UTC (rev 120049)
@@ -37,6 +37,7 @@
self.replica_timeout = 600.0
self.poll_interval = 0
self.pack_gc = True
+ self.pack_gc_traversal = 'sql'
self.pack_dry_run = False
self.pack_batch_timeout = 5.0
self.pack_duty_cycle = 0.5
Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py 2011-02-02 11:23:05 UTC (rev 120048)
+++ relstorage/trunk/relstorage/storage.py 2011-02-02 13:51:11 UTC (rev 120049)
@@ -1060,10 +1060,12 @@
finally:
self._lock_release()
- def pack(self, t, referencesf, sleep=None):
+ def pack(self, t, referencesf, dry_run=False, sleep=None):
if self._is_read_only:
raise POSException.ReadOnlyError()
+ dry_run = dry_run or self._options.pack_dry_run
+
pack_point = repr(TimeStamp(*time.gmtime(t)[:5] + (t % 60,)))
pack_point_int = u64(pack_point)
@@ -1092,7 +1094,7 @@
"been packed", time.ctime(t))
return
- if self._options.pack_dry_run:
+ if dry_run:
log.info("pack: beginning dry run")
s = time.ctime(TimeStamp(p64(tid_int)).timeTime())
@@ -1102,10 +1104,9 @@
# In pre_pack, the adapter fills tables with
# information about what to pack. The adapter
# must not actually pack anything yet.
- adapter.packundo.pre_pack(
- tid_int, get_references, self._options)
+ adapter.packundo.pre_pack(tid_int, get_references)
- if self._options.pack_dry_run:
+ if dry_run:
log.info("pack: dry run complete")
else:
# Now pack.
@@ -1113,7 +1114,7 @@
packed_func = self.blobhelper.after_pack
else:
packed_func = None
- adapter.packundo.pack(tid_int, self._options, sleep=sleep,
+ adapter.packundo.pack(tid_int, sleep=sleep,
packed_func=packed_func)
finally:
adapter.locker.release_pack_lock(lock_cursor)
Modified: relstorage/trunk/relstorage/tests/hptestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/hptestbase.py 2011-02-02 11:23:05 UTC (rev 120048)
+++ relstorage/trunk/relstorage/tests/hptestbase.py 2011-02-02 13:51:11 UTC (rev 120049)
@@ -192,7 +192,7 @@
db.close()
def checkPackGCDisabled(self):
- self._storage._options.pack_gc = False
+ self._storage._adapter.packundo.options.pack_gc = False
self.checkPackGC(expect_object_deleted=False)
def checkPackGCDryRun(self):
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2011-02-02 11:23:05 UTC (rev 120048)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2011-02-02 13:51:11 UTC (rev 120049)
@@ -481,7 +481,8 @@
def checkPackDutyCycle(self):
# Exercise the code in the pack algorithm that releases the
# commit lock for a time to allow concurrent transactions to commit.
- self._storage._options.pack_batch_timeout = 0 # pause after every txn
+ # pause after every txn
+ self._storage._adapter.packundo.options.pack_batch_timeout = 0
slept = []
def sim_sleep(seconds):
Modified: relstorage/trunk/relstorage/zodbpack.py
===================================================================
--- relstorage/trunk/relstorage/zodbpack.py 2011-02-02 11:23:05 UTC (rev 120048)
+++ relstorage/trunk/relstorage/zodbpack.py 2011-02-02 13:51:11 UTC (rev 120049)
@@ -40,7 +40,13 @@
parser.add_option(
"-d", "--days", dest="days", default="0",
help="Days of history to keep (default 0)",
- )
+ )
+ parser.add_option(
+ "--dry-run", dest="dry_run", default=False,
+ action="store_true",
+ help="Perform a dry run of the pack. "
+ "(Only works with some storage types)",
+ )
options, args = parser.parse_args(argv[1:])
if len(args) != 1:
@@ -59,7 +65,10 @@
log.info("Opening %s...", name)
storage = s.open()
log.info("Packing %s.", name)
- storage.pack(t, ZODB.serialize.referencesf)
+ if options.dry_run:
+ storage.pack(t, ZODB.serialize.referencesf, dry_run=True)
+ else:
+ storage.pack(t, ZODB.serialize.referencesf)
storage.close()
log.info("Packed %s.", name)
More information about the checkins
mailing list