[Checkins] SVN: relstorage/trunk/relstorage/tests/ Retire speedtest in favor of zodbshootout

Shane Hathaway shane at hathawaymix.org
Tue Nov 17 15:38:26 EST 2009


Log message for revision 105772:
  Retire speedtest in favor of zodbshootout

Changed:
  D   relstorage/trunk/relstorage/tests/comparison.ods
  D   relstorage/trunk/relstorage/tests/speedtest.py

-=-
Deleted: relstorage/trunk/relstorage/tests/comparison.ods
===================================================================
(Binary files differ)

Deleted: relstorage/trunk/relstorage/tests/speedtest.py
===================================================================
--- relstorage/trunk/relstorage/tests/speedtest.py	2009-11-17 20:31:38 UTC (rev 105771)
+++ relstorage/trunk/relstorage/tests/speedtest.py	2009-11-17 20:38:25 UTC (rev 105772)
@@ -1,408 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Compare the speed of RelStorage with FileStorage + ZEO.
-
-Splits into many processes to avoid contention over the global
-interpreter lock.
-"""
-
-import cPickle
-import logging
-import os
-import shutil
-import signal
-import sys
-import tempfile
-import time
-import traceback
-
-import transaction
-from BTrees.IOBTree import IOBTree
-from ZODB.DB import DB
-from ZODB.Connection import Connection
-
-from relstorage.options import Options
-from relstorage.storage import RelStorage
-
-debug = False
-txn_count = 10
-object_counts = [10000]  # [1, 100, 10000]
-concurrency_levels = range(1, 16, 2)
-contenders = [
-    ('ZEO + FileStorage', 'zeofs_test'),
-    ('PostgreSQLAdapter', 'postgres_test'),
-    ('MySQLAdapter', 'mysql_test'),
-    ('OracleAdapter', 'oracle_test'),
-    ]
-repetitions = 3
-max_attempts = 20
-keep_history = True
-
-
-class ChildProcessError(Exception):
-    """A child process failed"""
-
-
-def run_in_child(wait, func, *args, **kw):
-    pid = os.fork()
-    if pid == 0:
-        # child
-        try:
-            try:
-                logging.basicConfig()
-                if debug:
-                    logging.getLogger().setLevel(logging.DEBUG)
-
-                func(*args, **kw)
-            except:
-                traceback.print_exc()
-                os._exit(1)
-        finally:
-            os._exit(0)
-    elif wait:
-        pid_again, code = os.waitpid(pid, 0)
-        if code:
-            raise ChildProcessError(
-                "process running %r failed with exit code %d" % (func, code))
-    return pid
-
-
-class ZEOServerRunner(object):
-
-    def __init__(self):
-        self.dir = tempfile.mkdtemp()
-        self.store_fn = os.path.join(self.dir, 'storage')
-        self.sock_fn = os.path.join(self.dir, 'sock')
-        self.pid = None
-
-    def run(self):
-        from ZODB.FileStorage import FileStorage
-        from ZEO.StorageServer import StorageServer
-
-        fs = FileStorage(self.store_fn, create=True)
-        ss = StorageServer(self.sock_fn, {'1': fs})
-
-        import ThreadedAsync.LoopCallback
-        ThreadedAsync.LoopCallback.loop()
-
-    def start(self):
-        self.pid = run_in_child(False, self.run)
-        # parent
-        sys.stderr.write('Waiting for ZEO server to start...')
-        while not os.path.exists(self.sock_fn):
-            sys.stderr.write('.')
-            sys.stderr.flush()
-            time.sleep(0.1)
-        sys.stderr.write(' started.\n')
-        sys.stderr.flush()
-
-    def stop(self):
-        os.kill(self.pid, signal.SIGTERM)
-        shutil.rmtree(self.dir)
-
-
-class SpeedTest:
-
-    def __init__(self, concurrency, objects_per_txn, zeo_runner):
-        self.concurrency = concurrency
-        self.data_to_store = dict((n, 1) for n in range(objects_per_txn))
-        self.zeo_runner = zeo_runner
-
-    def populate(self, make_storage):
-        # initialize the database
-        storage = make_storage()
-        db = DB(storage)
-        conn = db.open()
-        root = conn.root()
-
-        # clear the database
-        root['speedtest'] = None
-        transaction.commit()
-        db.pack()
-
-        # put a tree in the database
-        root['speedtest'] = t = IOBTree()
-        for i in range(self.concurrency):
-            t[i] = IOBTree()
-        transaction.commit()
-        conn.close()
-        db.close()
-        if debug:
-            print >> sys.stderr, 'Populated storage.'
-
-    def write_test(self, storage, n):
-        db = DB(storage)
-        start = time.time()
-        for i in range(txn_count):
-            conn = db.open()
-            root = conn.root()
-            myobj = root['speedtest'][n]
-            myobj[i] = IOBTree(self.data_to_store)
-            transaction.commit()
-            conn.close()
-        end = time.time()
-        db.close()
-        return end - start
-
-    def read_test(self, storage, n):
-        db = DB(storage)
-        start = time.time()
-        for i in range(txn_count):
-            conn = db.open()
-            root = conn.root()
-            got = len(list(root['speedtest'][n][i]))
-            expect = len(self.data_to_store)
-            if got != expect:
-                raise AssertionError
-            conn.close()
-        end = time.time()
-        db.close()
-        return end - start
-
-    def run_tests(self, make_storage):
-        """Run a write and read test.
-
-        Returns the mean time per write transaction and
-        the mean time per read transaction.
-        """
-        run_in_child(True, self.populate, make_storage)
-        r = range(self.concurrency)
-
-        def write(n):
-            return self.write_test(make_storage(), n)
-        def read(n):
-            return self.read_test(make_storage(), n)
-
-        write_times = distribute(write, r)
-        read_times = distribute(read, r)
-        count = float(self.concurrency * txn_count)
-        return (sum(write_times) / count, sum(read_times) / count)
-
-    def zeofs_test(self):
-        def make_storage():
-            from ZEO.ClientStorage import ClientStorage
-            return ClientStorage(self.zeo_runner.sock_fn)
-        return self.run_tests(make_storage)
-
-    def postgres_test(self):
-        from relstorage.adapters.postgresql import PostgreSQLAdapter
-        if keep_history:
-            db = 'relstoragetest'
-        else:
-            db = 'relstoragetest_hf'
-        dsn = 'dbname=%s user=relstoragetest password=relstoragetest' % db
-        options = Options(keep_history=keep_history)
-        adapter = PostgreSQLAdapter(dsn=dsn, options=options)
-        adapter.schema.prepare()
-        adapter.schema.zap_all()
-        def make_storage():
-            return RelStorage(adapter)
-        return self.run_tests(make_storage)
-
-    def oracle_test(self):
-        from relstorage.adapters.oracle import OracleAdapter
-        dsn = os.environ.get('ORACLE_TEST_DSN', 'XE')
-        if keep_history:
-            db = 'relstoragetest'
-        else:
-            db = 'relstoragetest_hf'
-        options = Options(keep_history=keep_history)
-        adapter = OracleAdapter(
-            user=db,
-            password='relstoragetest',
-            dsn=dsn,
-            options=options,
-            )
-        adapter.schema.prepare()
-        adapter.schema.zap_all()
-        def make_storage():
-            return RelStorage(adapter)
-        return self.run_tests(make_storage)
-
-    def mysql_test(self):
-        from relstorage.adapters.mysql import MySQLAdapter
-        if keep_history:
-            db = 'relstoragetest'
-        else:
-            db = 'relstoragetest_hf'
-        options = Options(keep_history=keep_history)
-        adapter = MySQLAdapter(
-            options=options,
-            db=db,
-            user='relstoragetest',
-            passwd='relstoragetest',
-            )
-        adapter.schema.prepare()
-        adapter.schema.zap_all()
-        def make_storage():
-            return RelStorage(adapter)
-        return self.run_tests(make_storage)
-
-
-def distribute(func, param_iter):
-    """Call a function in separate processes concurrently.
-
-    param_iter is an iterator that provides the parameter for each
-    function call.  The parameter is passed as the single argument.
-    The results of calling the function are appended to a list, which
-    is returned once all functions have returned.  If any function
-    raises an error, the error is re-raised in the caller.
-    """
-    dir = tempfile.mkdtemp()
-    try:
-        waiting = set()  # set of child process IDs
-        for param in param_iter:
-            pid = os.fork()
-            if pid == 0:
-                # child
-                try:
-                    logging.basicConfig()
-                    if debug:
-                        logging.getLogger().setLevel(logging.DEBUG)
-
-                    fn = os.path.join(dir, str(os.getpid()))
-                    try:
-                        res = 1, func(param)
-                    except:
-                        traceback.print_exc()
-                        res = 0, sys.exc_info()[:2]
-                    f = open(fn, 'wb')
-                    try:
-                        cPickle.dump(res, f)
-                    finally:
-                        f.close()
-                finally:
-                    os._exit(0)
-            else:
-                # parent
-                waiting.add(pid)
-        results = []
-        try:
-            while waiting:
-                for pid in list(waiting):
-                    pid_again, code = os.waitpid(pid, os.WNOHANG)
-                    if not pid_again:
-                        continue
-                    waiting.remove(pid)
-                    if code:
-                        raise ChildProcessError(
-                            "A process failed with exit code %d" % code)
-                    else:
-                        fn = os.path.join(dir, str(pid))
-                        f = open(fn, 'rb')
-                        try:
-                            ok, value = cPickle.load(f)
-                            if ok:
-                                results.append(value)
-                            else:
-                                raise ChildProcessError(
-                                    "a child process raised an error: "
-                                    "%s: %s" % tuple(value))
-                        finally:
-                            f.close()
-                time.sleep(0.1)
-            return results
-        finally:
-            # kill the remaining processes
-            for pid in waiting:
-                try:
-                    os.kill(pid, signal.SIGTERM)
-                except OSError:
-                    pass
-    finally:
-        shutil.rmtree(dir)
-
-
-def main():
-    zeo_runner = ZEOServerRunner()
-    zeo_runner.start()
-
-    # results: {(objects_per_txn, concurrency, contender, direction): [time]}}
-    results = {}
-    for objects_per_txn in object_counts:
-        for concurrency in concurrency_levels:
-            for contender_name, method_name in contenders:
-                for direction in (0, 1):
-                    key = (objects_per_txn, concurrency,
-                            contender_name, direction)
-                    results[key] = []
-
-    try:
-        for objects_per_txn in object_counts:
-            for concurrency in concurrency_levels:
-                test = SpeedTest(concurrency, objects_per_txn, zeo_runner)
-                for contender_name, method_name in contenders:
-                    print >> sys.stderr, (
-                        'Testing %s with objects_per_txn=%d and concurrency=%d'
-                        % (contender_name, objects_per_txn, concurrency))
-                    method = getattr(test, method_name)
-                    key = (objects_per_txn, concurrency, contender_name)
-
-                    for rep in range(repetitions):
-                        for attempt in range(max_attempts):
-                            msg = '  Running %d/%d...' % (rep + 1, repetitions)
-                            if attempt > 0:
-                                msg += ' (attempt %d)' % (attempt + 1)
-                            print >> sys.stderr, msg,
-                            try:
-                                w, r = method()
-                            except ChildProcessError:
-                                if attempt >= max_attempts - 1:
-                                    raise
-                            else:
-                                break
-                        msg = 'write %5.3fs, read %5.3fs' % (w, r)
-                        print >> sys.stderr, msg
-                        results[key + (0,)].append(w)
-                        results[key + (1,)].append(r)
-
-    # The finally clause causes test results to print even if the tests
-    # stop early.
-    finally:
-        zeo_runner.stop()
-
-        # show the results in CSV format
-        print >> sys.stderr, (
-            'Average time per transaction in seconds.  Best of 3.')
-
-        for objects_per_txn in object_counts:
-            print '** Results with objects_per_txn=%d **' % objects_per_txn
-
-            line = ['"Concurrency"']
-            for contender_name, func in contenders:
-                for direction in (0, 1):
-                    dir_text = ['write', 'read'][direction]
-                    line.append('%s - %s' % (contender_name, dir_text))
-            print ', '.join(line)
-
-            for concurrency in concurrency_levels:
-                line = [str(concurrency)]
-
-                for contender_name, method_name in contenders:
-                    for direction in (0, 1):
-                        key = (objects_per_txn, concurrency,
-                            contender_name, direction)
-                        lst = results[key]
-                        if lst:
-                            t = min(lst)
-                            line.append('%5.3f' % t)
-                        else:
-                            line.append('?')
-
-                print ', '.join(line)
-            print
-
-
-if __name__ == '__main__':
-    main()



More information about the checkins mailing list