[Zope3-checkins] CVS: Zope3/src/zope/index/text/tests - __init__.py:1.1 hs-tool.py:1.1 indexhtml.py:1.1 mailtest.py:1.1 mhindex.py:1.1 queryhtml.py:1.1 test_index.py:1.1 test_lexicon.py:1.1 test_nbest.py:1.1 test_pipelinefactory.py:1.1 test_queryengine.py:1.1 test_queryparser.py:1.1 test_setops.py:1.1 test_textindexwrapper.py:1.1 wordstats.py:1.1

Anthony Baxter anthony@interlink.com.au
Sun, 13 Jul 2003 23:53:59 -0400


Update of /cvs-repository/Zope3/src/zope/index/text/tests
In directory cvs.zope.org:/tmp/cvs-serv12584/src/zope/index/text/tests

Added Files:
	__init__.py hs-tool.py indexhtml.py mailtest.py mhindex.py 
	queryhtml.py test_index.py test_lexicon.py test_nbest.py 
	test_pipelinefactory.py test_queryengine.py 
	test_queryparser.py test_setops.py test_textindexwrapper.py 
	wordstats.py 
Log Message:
index-geddon, part the second. 

Moved zope.textindex and zope.fieldindex into zope.index.text and
zope.index.field, respectively. Shouldn't need any module aliases 
for the old ones, as people shouldn't be instantiating the classes
in zope.textindex or zope.fieldindex directly - instead, they should
be going via zope.app.index.field.index and zope.app.index.text.index.


=== Added File Zope3/src/zope/index/text/tests/__init__.py ===
#
# This file is necessary to make this directory a package.


=== Added File Zope3/src/zope/index/text/tests/hs-tool.py ===
#! /usr/bin/env python

##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

import cPickle
import os.path
import sys

from hotshot.log import LogReader

def load_line_info(log):
    """Fold a hotshot event log into {location: (total_time, hit_count)}.

    Each timed event charges its tdelta to the previously recorded
    location, which is how hotshot attributes elapsed time to lines.
    """
    byline = {}
    prevloc = None
    for what, place, tdelta in log:
        if tdelta > 0:
            stats = byline.get(prevloc, (0, 0))
            byline[prevloc] = (stats[0] + tdelta, stats[1] + 1)
            prevloc = place
    return byline

def basename(path, cache={}):
    """Return the final path component, memoized across calls.

    The shared mutable default is intentional: it acts as a persistent
    cache keyed by the full path.
    """
    if path in cache:
        return cache[path]
    tail = os.path.split(path)[1]
    cache[path] = tail
    return tail

def print_results(results):
    """Print a flat report: total time, hit count, file basename, line no.

    `results` is a sorted list of ((time, hits), (file, line, func)) pairs
    as produced by main() from load_line_info().
    """
    for info, place in results:
        if place is None:
            # This is the startup time for the profiler, and only
            # occurs at the very beginning.  Just ignore it, since it
            # corresponds to frame setup of the outermost call, not
            # anything that's actually interesting.
            continue
        filename, line, funcname = place
        # info is the (total_time, hit_count) pair for this source line.
        print '%8d %8d' % info, basename(filename), line

def annotate_results(results):
    """Group per-line stats by source file and print annotated listings.

    Only files that still exist on disk are annotated; stats for
    vanished files are silently skipped.
    """
    files = {}
    for stats, place in results:
        if not place:
            # Profiler-startup entry has no location; ignore it.
            continue
        time, hits = stats
        file, line, func = place
        l = files.get(file)
        if l is None:
            l = files[file] = []
        l.append((line, hits, time))
    order = files.keys()  # Python 2: keys() returns a list we can sort
    order.sort()
    for k in order:
        if os.path.exists(k):
            v = files[k]
            v.sort()  # order annotations by line number
            annotate(k, v)

def annotate(file, lines):
    """Print the source of `file`, prefixing profiled lines with stats.

    `lines` is a list of (lineno, hits, time) tuples sorted by line
    number; it is consumed destructively as lines are printed.
    """
    print "-" * 60
    print file
    print "-" * 60
    f = open(file)
    i = 1
    match = lines[0][0]  # next source line number awaiting annotation
    for line in f:
        if match == i:
            # lines[0][1:] is the (hits, time) pair for this line.
            print "%6d %8d " % lines[0][1:], line,
            del lines[0]
            if lines:
                match = lines[0][0]
            else:
                match = None  # no further annotated lines
        else:
            # Pad unannotated lines so the source columns line up.
            print " " * 16, line,
        i += 1
    print

def get_cache_name(filename):
    """Return (cache_dir, cache_file) for a profile data file.

    The cache lives in a ".hs-tool" subdirectory alongside the data
    file, using the data file's own name.
    """
    directory, base = os.path.split(filename)
    cache_dir = os.path.join(directory, '.hs-tool')
    return cache_dir, os.path.join(cache_dir, base)

def cache_results(filename, results):
    """Pickle `results` into the .hs-tool cache directory for `filename`.

    Creates the cache directory on first use.
    """
    cache_dir, cache_file = get_cache_name(filename)
    if not os.path.exists(cache_dir):
        os.mkdir(cache_dir)
    fp = open(cache_file, 'wb')
    try:
        # Protocol 1: binary pickle format, smaller than the default.
        cPickle.dump(results, fp, 1)
    finally:
        fp.close()

def main(filename, annotate):
    """Report on a hotshot profile log, caching the parsed stats.

    filename -- path to the hotshot log file
    annotate -- if true, print annotated source listings; otherwise a
                flat time/hits table
    """
    cache_dir, cache_file = get_cache_name(filename)

    if (  os.path.isfile(cache_file)
          and os.path.getmtime(cache_file) > os.path.getmtime(filename)):
        # cached data is up-to-date:
        fp = open(cache_file, 'rb')
        results = cPickle.load(fp)
        fp.close()
    else:
        # Parse the raw log and cache the result for next time.
        log = LogReader(filename)
        byline = load_line_info(log)
        # Sort
        results = [(v, k) for k, v in byline.items()]
        results.sort()
        cache_results(filename, results)

    if annotate:
        annotate_results(results)
    else:
        print_results(results)


if __name__ == "__main__":
    import getopt

    # -A switches from the flat report to annotated source listings.
    annotate_p = 0
    opts, args = getopt.getopt(sys.argv[1:], 'A')
    for o, v in opts:
        if o == '-A':
            annotate_p = 1
    if args:
        # Exactly one positional argument: the hotshot log file.
        filename, = args
    else:
        filename = "profile.dat"

    main(filename, annotate_p)


=== Added File Zope3/src/zope/index/text/tests/indexhtml.py ===
#! /usr/bin/env python
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Index a collection of HTML files on the filesystem.

usage: indexhtml.py [options] dir

Will create an index of all files in dir or its subdirectories.

options:
-f data.fs  -- the path to the filestorage datafile
"""
from __future__ import nested_scopes

import os
from time import clock

from zodb.storage.file import FileStorage
from zodb.btrees.IOBTree import IOBTree

from zope.index.text.textindexwrapper import TextIndexWrapper
from zope.index.text.htmlsplitter import HTMLWordSplitter
from zope.index.text.lexicon import Lexicon, StopWordRemover

def make_zc_index():
    """Build a ZCTextIndex over the "read" attribute of indexed objects.

    ZCTextIndex expects an "extra" record and a "caller" object owning
    the lexicon, hence the two throwaway Struct instances.
    """
    # Fix: ZCTextIndex was never imported in this module, so calling
    # this function raised NameError.  Import it locally (same path the
    # sibling mailtest.py uses).
    from zope.index.text.zctextindex import ZCTextIndex
    # there's an elaborate dance necessary to construct an index
    class Struct:
        pass
    extra = Struct()
    extra.doc_attr = "read"
    extra.lexicon_id = "lexicon"
    caller = Struct()
    caller.lexicon = Lexicon(HTMLWordSplitter(), StopWordRemover())
    return ZCTextIndex("read", extra, caller)

# XXX make a splitter more like the HTMLSplitter for TextIndex
# signature is
# Splitter(string, stop_words, encoding,
#          singlechar, indexnumbers, casefolding)

class MySplitter:
    """Adapter giving HTMLWordSplitter the old TextIndex splitter signature.

    Called as splitter(text, stop_words, ...); words mapped to a falsy
    value by the stop dictionary are dropped from the result.
    """

    def __init__(self):
        self._v_splitter = HTMLWordSplitter()

    def __call__(self, text, stopdict, *args, **kwargs):
        tokens = self._v_splitter._split(text)
        # stopdict.get(w, w): stop words map to a falsy marker, everything
        # else maps to itself; filter(None, ...) then drops the stop words.
        mapped = map(lambda w, _get=stopdict.get: _get(w, w), tokens)
        return filter(None, mapped)

def make_old_index():
    """Build a Zope-2 style TextIndex for -T comparison runs.

    Requires the Zope 2 Products.PluginIndexes packages on sys.path;
    imported lazily so the default (-T not given) path works without them.
    """
    from Products.PluginIndexes.TextIndex.TextIndex import TextIndex
    from Products.PluginIndexes.TextIndex.Lexicon  import Lexicon
    from zope.index.text.stopdict import get_stopdict

    l = Lexicon(get_stopdict())
    # Swap in the HTML-aware splitter adapter defined above.
    l.SplitterFunc = MySplitter()
    return TextIndex("read", lexicon=l)

def main(db, root, dir):
    rt["index"] = index = INDEX()
    rt["files"] = paths = IOBTree()
    get_transaction().commit()

    zodb_time = 0.0
    pack_time = 0.0

    files = [os.path.join(dir, file) for file in os.listdir(dir)]
    docid = 0
    t0 = clock()
    for file in files:
        if os.path.isdir(file):
            files += [os.path.join(file, sub) for sub in os.listdir(file)]
        else:
            if not file.endswith(".html"):
                continue
            docid += 1
            if LIMIT is not None and docid > LIMIT:
                break
            if VERBOSE:
                print "%5d" % docid, file
            f = open(file, "rb")
            paths[docid] = file
            index.index_object(docid, f)
            f.close()
            if docid % TXN_INTERVAL == 0:
                z0 = clock()
                get_transaction().commit()
                z1 = clock()
                zodb_time += z1 - z0
                if VERBOSE:
                    print "commit took", z1 - z0, zodb_time
            if docid % PACK_INTERVAL == 0:
                p0 = clock()
                db.pack()
                p1 = clock()
                zodb_time += p1 - p0
                pack_time += p1 - p0
                if VERBOSE:
                    print "pack took", p1 - p0, pack_time
    z0 = clock()
    get_transaction().commit()
    z1 = t1 = clock()
    total_time = t1 - t0
    zodb_time += z1 - z0
    if VERBOSE:
        print "Total index time", total_time
        print "Non-pack time", total_time - pack_time
        print "Non-ZODB time", total_time - zodb_time

if __name__ == "__main__":
    import sys
    import getopt

    # Tunables read as module globals by main()/make_zc_index().
    VERBOSE = 0
    FSPATH = "Data.fs"
    TXN_INTERVAL = 100
    PACK_INTERVAL = 500
    LIMIT = None
    INDEX = make_zc_index
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'vf:t:p:n:T')
    except getopt.error, msg:
        print msg
        print __doc__
        sys.exit(2)

    for o, v in opts:
        if o == '-v':
            VERBOSE += 1
        if o == '-f':
            FSPATH = v
        if o == '-t':
            TXN_INTERVAL = int(v)
        if o == '-p':
            PACK_INTERVAL = int(v)
        if o == '-n':
            LIMIT = int(v)
        if o == '-T':
            INDEX = make_old_index

    if len(args) != 1:
        # Fixed typo: was "Expected on argument".
        print "Expected one argument"
        print __doc__
        sys.exit(2)
    dir = args[0]

    fs = FileStorage(FSPATH)
    # NOTE(review): `ZODB` is never imported in this module (only
    # zodb.storage.file is); this looks like it should use zodb's DB
    # class as mhindex.py does -- confirm before running.
    db = ZODB.DB(fs)
    cn = db.open()
    rt = cn.root()
    dir = os.path.join(os.getcwd(), dir)
    print dir
    main(db, rt, dir)
    cn.close()
    fs.close()


=== Added File Zope3/src/zope/index/text/tests/mailtest.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test an index with a Unix mailbox file.

usage: python mailtest.py [options] <data.fs>

options:
    -v     -- verbose

    Index Generation
    -i mailbox
    -n NNN -- max number of messages to read from mailbox
    -t NNN -- commit a transaction every NNN messages (default: 1)
    -p NNN -- pack <data.fs> every NNN messages (default: 500), and at end
    -p 0   -- don't pack at all
    -x     -- exclude the message text from the data.fs

    Queries
    -q query
    -b NNN -- return the NNN best matches (default: 10)
    -c NNN -- context; if -v, show the first NNN lines of results (default: 5)

The script either indexes or queries depending on whether -q or -i is
passed as an option.

For -i mailbox, the script reads mail messages from the mailbox and
indexes them.  It indexes one message at a time, then commits the
transaction.

For -q query, it performs a query on an existing index.

If both are specified, the index is performed first.

You can also interact with the index after it is completed. Load the
index from the database:

    import zodb
    from zodb.storage.file import FileStorage
    fs = FileStorage(<data.fs>)
    db = ZODB.DB(fs)
    cn = db.open()
    index = cn.root()["index"]
    index.search("python AND unicode")
"""

from zope.index.text.lexicon import \
     Lexicon, CaseNormalizer, Splitter, StopWordRemover

# XXX This import is bad, and was so before the renaming
from zope.index.text.zctextindex import ZCTextIndex

from BTrees.IOBTree import IOBTree
from zope.index.text.queryparser import QueryParser

import sys
import mailbox
import time

def usage(msg):
    """Print an error message plus the module usage text, then exit(2)."""
    print msg
    print __doc__
    sys.exit(2)

class Message:
    """Wrap an rfc822 message as an indexable document.

    The indexed `text` is a one-line summary ("subject (author)") followed
    by the raw message body.  `total_bytes` accumulates, on the class, the
    size of every message wrapped so far (used for the indexing-rate report).
    """

    total_bytes = 0

    def __init__(self, msg):
        subj = msg.getheader('subject', '')
        sender = msg.getheader('from', '')
        if sender:
            summary_line = "%s (%s)\n" % (subj, sender)
        else:
            summary_line = "%s\n" % subj
        self.text = summary_line + msg.fp.read()
        Message.total_bytes += len(self.text)

class Extra:
    """Bare attribute bag standing in for ZCTextIndex's "extra"/"caller" records."""
    pass

def index(rt, mboxfile, db, profiler):
    """Build a ZCTextIndex over the messages in a Unix mailbox.

    rt       -- database root; gains "index" and (unless -x) "documents"
    mboxfile -- path to the mailbox file
    db       -- the database, used for periodic packing
    profiler -- optional profiler; when given, indexmbox runs under it

    Reads/writes the module globals NUM, VERBOSE, EXCLUDE_TEXT and
    PACK_INTERVAL set by the command-line driver.
    """
    global NUM
    idx_time = 0
    pack_time = 0
    start_time = time.time()

    lexicon = Lexicon(Splitter(), CaseNormalizer(), StopWordRemover())
    extra = Extra()
    extra.lexicon_id = 'lexicon'
    extra.doc_attr = 'text'
    extra.index_type = 'Okapi BM25 Rank'
    caller = Extra()
    caller.lexicon = lexicon
    rt["index"] = idx = ZCTextIndex("index", extra, caller)
    if not EXCLUDE_TEXT:
        rt["documents"] = docs = IOBTree()
    else:
        docs = None
    get_transaction().commit()

    mbox = mailbox.UnixMailbox(open(mboxfile, 'rb'))
    if VERBOSE:
        print "opened", mboxfile
    if not NUM:
        NUM = sys.maxint  # 0 means "no limit"

    if profiler:
        itime, ptime, i = profiler.runcall(indexmbox, mbox, idx, docs, db)
    else:
        itime, ptime, i = indexmbox(mbox, idx, docs, db)
    idx_time += itime
    pack_time += ptime

    get_transaction().commit()

    # Final pack, unless the last loop iteration just packed.
    if PACK_INTERVAL and i % PACK_INTERVAL != 0:
        if VERBOSE >= 2:
            print "packing one last time..."
        p0 = time.clock()
        db.pack(time.time())
        p1 = time.clock()
        if VERBOSE:
            print "pack took %s sec" % (p1 - p0)
        pack_time += p1 - p0

    if VERBOSE:
        finish_time = time.time()
        print
        print "Index time", round(idx_time / 60, 3), "minutes"
        print "Pack time", round(pack_time / 60, 3), "minutes"
        print "Index bytes", Message.total_bytes
        rate = (Message.total_bytes / idx_time) / 1024
        print "Index rate %.2f KB/sec" % rate
        print "Indexing began", time.ctime(start_time)
        print "Indexing ended", time.ctime(finish_time)
        print "Wall clock minutes", round((finish_time - start_time)/60, 3)

def indexmbox(mbox, idx, docs, db):
    """Index up to NUM messages from `mbox`.

    Returns (index_time, pack_time, messages_indexed).  Commits every
    TXN_SIZE messages and packs every PACK_INTERVAL messages (module
    globals set by the command-line driver).
    """
    idx_time = 0
    pack_time = 0
    i = 0
    while i < NUM:
        _msg = mbox.next()
        if _msg is None:
            # End of mailbox reached.
            break
        i += 1
        msg = Message(_msg)
        if VERBOSE >= 2:
            print "indexing msg", i
        i0 = time.clock()
        idx.index_object(i, msg)
        if not EXCLUDE_TEXT:
            docs[i] = msg
        if i % TXN_SIZE == 0:
            get_transaction().commit()
        i1 = time.clock()
        idx_time += i1 - i0
        if VERBOSE and i % 50 == 0:
            print i, "messages indexed"
            print "cache size", db.cacheSize()
        if PACK_INTERVAL and i % PACK_INTERVAL == 0:
            if VERBOSE >= 2:
                print "packing..."
            p0 = time.clock()
            db.pack(time.time())
            p1 = time.clock()
            if VERBOSE:
                print "pack took %s sec" % (p1 - p0)
            pack_time += p1 - p0
    return idx_time, pack_time, i


def query(rt, query_str, profiler):
    """Run `query_str` against the stored index and print scored results.

    With VERBOSE set, also prints the first CONTEXT lines of each
    matching message (requires the index built without -x).
    """
    idx = rt["index"]
    docs = rt["documents"]

    start = time.clock()
    if profiler is None:
        results, num_results = idx.query(query_str, BEST)
    else:
        if WARM_CACHE:
            print "Warming the cache..."
            idx.query(query_str, BEST)
        start = time.clock()
        results, num_results = profiler.runcall(idx.query, query_str, BEST)
    elapsed = time.clock() - start

    print "query:", query_str
    print "# results:", len(results), "of", num_results, \
          "in %.2f ms" % (elapsed * 1000)

    # Query weight lets raw scores be shown as a percentage of the
    # best theoretically possible score for this query.
    tree = QueryParser(idx.lexicon).parseQuery(query_str)
    qw = idx.index.query_weight(tree.terms())

    for docid, score in results:
        scaled = 100.0 * score / qw
        print "docid %7d score %6d scaled %5.2f%%" % (docid, score, scaled)
        if VERBOSE:
            msg = docs[docid]
            ctx = msg.text.split("\n", CONTEXT)
            del ctx[-1]  # drop the unsplit remainder of the message
            print "-" * 60
            print "message:"
            for l in ctx:
                print l
            print "-" * 60


def main(fs_path, mbox_path, query_str, profiler):
    """Open the database, then index and/or query as requested.

    Indexing (when mbox_path is given) runs before querying.
    """
    # NOTE(review): `ZODB` is never imported in this module (the imports
    # above use BTrees/zope.index); these two lines look stale relative
    # to the zodb.* packages used elsewhere in this checkin -- confirm.
    f = ZODB.FileStorage.FileStorage(fs_path)
    db = ZODB.DB(f, cache_size=CACHE_SIZE)
    cn = db.open()
    rt = cn.root()

    if mbox_path is not None:
        index(rt, mbox_path, db, profiler)
    if query_str is not None:
        query(rt, query_str, profiler)

    cn.close()
    db.close()
    f.close()

if __name__ == "__main__":
    import getopt

    # Defaults for the module-global knobs read by index()/indexmbox()/query().
    NUM = 0
    VERBOSE = 0
    PACK_INTERVAL = 500
    EXCLUDE_TEXT = 0
    CACHE_SIZE = 10000
    TXN_SIZE = 1
    BEST = 10
    CONTEXT = 5
    WARM_CACHE = 0
    query_str = None
    mbox_path = None
    profile = None
    old_profile = None
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'vn:p:i:q:b:c:xt:w',
                                   ['profile=', 'old-profile='])
    except getopt.error, msg:
        usage(msg)
    if len(args) != 1:
        usage("exactly 1 filename argument required")
    for o, v in opts:
        if o == '-n':
            NUM = int(v)
        elif o == '-v':
            VERBOSE += 1
        elif o == '-p':
            PACK_INTERVAL = int(v)
        elif o == '-q':
            query_str = v
        elif o == '-i':
            mbox_path = v
        elif o == '-b':
            BEST = int(v)
        elif o == '-x':
            EXCLUDE_TEXT = 1
        elif o == '-t':
            TXN_SIZE = int(v)
        elif o == '-c':
            CONTEXT = int(v)
        elif o == '-w':
            WARM_CACHE = 1
        elif o == '--profile':
            profile = v
        elif o == '--old-profile':
            old_profile = v
    fs_path, = args

    if profile:
        import hotshot
        profiler = hotshot.Profile(profile, lineevents=1, linetimings=1)
    elif old_profile:
        # Bug fix: `import profile` rebound the module-level name
        # `profile` (the --profile option value, None here) to the
        # profile module, so the `if profile:` checks below wrongly took
        # the hotshot branch and crashed on profiler.close().  Alias the
        # module instead of clobbering the flag.
        import profile as profile_module
        profiler = profile_module.Profile()
    else:
        profiler = None

    main(fs_path, mbox_path, query_str, profiler)

    if profile:
        profiler.close()
    elif old_profile:
        import pstats
        profiler.dump_stats(old_profile)
        stats = pstats.Stats(old_profile)
        stats.strip_dirs().sort_stats('time').print_stats(20)


=== Added File Zope3/src/zope/index/text/tests/mhindex.py ===
#! /usr/bin/env python2.2

##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""MH mail indexer.

To index messages from a single folder (messages defaults to 'all'):
  mhindex.py [options] -u +folder [messages ...]

To bulk index all messages from several folders:
  mhindex.py [options] -b folder ...; the folder name ALL means all folders.

To execute a single query:
  mhindex.py [options] query

To enter interactive query mode:
  mhindex.py [options]

Common options:
  -d FILE -- specify the Data.fs to use (default ~/.Data.fs)
  -w -- dump the word list in alphabetical order and exit
  -W -- dump the word list ordered by word id and exit

Indexing options:
  -O -- do a prescan on the data to compute optimal word id assignments;
        this is only useful the first time the Data.fs is used
  -t N -- commit a transaction after every N messages (default 20000)
  -p N -- pack after every N commits (by default no packing is done)

Querying options:
  -m N -- show at most N matching lines from the message (default 3)
  -n N -- show the N best matching messages (default 3)
"""

import os
import re
import sys
import time
import mhlib
import getopt
import traceback
from StringIO import StringIO
from stat import ST_MTIME

DATAFS = "~/.mhindex.fs"
ZOPECODE = "~/projects/Zope3/lib/python"

zopecode = os.path.expanduser(ZOPECODE)
sys.path.insert(0, zopecode)

from zodb.db import DB
from zodb.storage.file import FileStorage
from transaction import get_transaction
from zodb.btrees.IOBTree import IOBTree
from zodb.btrees.OIBTree import OIBTree
from zodb.btrees.IIBTree import IIBTree

from zope.index.text.okapiindex import OkapiIndex
from zope.index.text.lexicon import Splitter
from zope.index.text.lexicon import CaseNormalizer, StopWordRemover
from zope.index.text.stopdict import get_stopdict
from zope.index.text.textindexwrapper import TextIndexWrapper

NBEST = 3
MAXLINES = 3

def main():
    """Command-line driver: parse options, then dump, index, or query."""
    try:
        opts, args = getopt.getopt(sys.argv[1:], "bd:fhm:n:Op:t:uwW")
    except getopt.error, msg:
        print msg
        print "use -h for help"
        return 2
    update = 0
    bulk = 0
    optimize = 0
    nbest = NBEST
    maxlines = MAXLINES
    datafs = os.path.expanduser(DATAFS)
    pack = 0
    trans = 20000
    dumpwords = dumpwids = dumpfreqs = 0
    for o, a in opts:
        if o == "-b":
            bulk = 1
        if o == "-d":
            datafs = a
        if o == "-f":
            dumpfreqs = 1
        if o == "-h":
            print __doc__
            return
        if o == "-m":
            maxlines = int(a)
        if o == "-n":
            nbest = int(a)
        if o == "-O":
            optimize = 1
        if o == "-p":
            pack = int(a)
        if o == "-t":
            trans = int(a)
        if o == "-u":
            update = 1
        if o == "-w":
            dumpwords = 1
        if o == "-W":
            dumpwids = 1
    # Open read-only unless we are going to modify the index.
    ix = Indexer(datafs, writable=update or bulk, trans=trans, pack=pack)
    if dumpfreqs:
        ix.dumpfreqs()
    if dumpwords:
        ix.dumpwords()
    if dumpwids:
        ix.dumpwids()
    if dumpwords or dumpwids or dumpfreqs:
        # Dump modes are exclusive of indexing/querying.
        return
    if bulk:
        if optimize:
            ix.optimize(args)
        ix.bulkupdate(args)
    elif update:
        ix.update(args)
    elif args:
        # Quote arguments containing spaces so the query parser sees
        # them as phrases; a leading "-" keeps its NOT meaning outside
        # the quotes.
        for i in range(len(args)):
            a = args[i]
            if " " in a:
                if a[0] == "-":
                    args[i] = '-"' + a[1:] + '"'
                else:
                    args[i] = '"' + a + '"'
        ix.query(" ".join(args), nbest, maxlines)
    else:
        ix.interact(nbest)
    if pack:
        ix.pack()

class Indexer:

    filestorage = database = connection = root = None

    def __init__(self, datafs, writable=0, trans=0, pack=0):
        """Open (or create) the Data.fs and load/create the index structures.

        datafs   -- path to the FileStorage data file
        writable -- open read-write when true
        trans    -- commit after every `trans` messages
        pack     -- pack after every `pack` commits (0 = never)
        """
        self.trans_limit = trans
        self.pack_limit = pack
        self.trans_count = 0
        self.pack_count = 0
        self.stopdict = get_stopdict()
        self.mh = mhlib.MH()
        self.filestorage = FileStorage(datafs, read_only=(not writable))
        self.database = DB(self.filestorage)
        self.connection = self.database.open()
        self.root = self.connection.root()
        # Each persistent structure is created lazily on first run.
        try:
            self.index = self.root["index"]
        except KeyError:
            self.index = self.root["index"] = TextIndexWrapper()
        try:
            self.docpaths = self.root["docpaths"]
        except KeyError:
            self.docpaths = self.root["docpaths"] = IOBTree()
        try:
            self.doctimes = self.root["doctimes"]
        except KeyError:
            self.doctimes = self.root["doctimes"] = IIBTree()
        try:
            self.watchfolders = self.root["watchfolders"]
        except KeyError:
            self.watchfolders = self.root["watchfolders"] = {}
        # Non-persistent reverse map (path -> docid), rebuilt each run.
        self.path2docid = OIBTree()
        for docid in self.docpaths.keys():
            path = self.docpaths[docid]
            self.path2docid[path] = docid
        try:
            self.maxdocid = max(self.docpaths.keys())
        except ValueError:
            # Empty index: no document ids assigned yet.
            self.maxdocid = 0
        print len(self.docpaths), "Document ids"
        print len(self.path2docid), "Pathnames"
        print self.index.lexicon.length(), "Words"

    def dumpfreqs(self):
        lexicon = self.index.lexicon
        index = self.index.index
        assert isinstance(index, OkapiIndex)
        L = []
        for wid in lexicon.wids():
            freq = 0
            for f in index._wordinfo.get(wid, {}).values():
                freq += f
            L.append((freq, wid, lexicon.get_word(wid)))
        L.sort()
        L.reverse()
        for freq, wid, word in L:
            print "%10d %10d %s" % (wid, freq, word)

    def dumpwids(self):
        lexicon = self.index.lexicon
        index = self.index.index
        assert isinstance(index, OkapiIndex)
        for wid in lexicon.wids():
            freq = 0
            for f in index._wordinfo.get(wid, {}).values():
                freq += f
            print "%10d %10d %s" % (wid, freq, lexicon.get_word(wid))

    def dumpwords(self):
        lexicon = self.index.lexicon
        index = self.index.index
        assert isinstance(index, OkapiIndex)
        for word in lexicon.words():
            wid = lexicon.get_wid(word)
            freq = 0
            for f in index._wordinfo.get(wid, {}).values():
                freq += f
            print "%10d %10d %s" % (wid, freq, word)

    def close(self):
        self.root = None
        if self.connection is not None:
            self.connection.close()
            self.connection = None
        if self.database is not None:
            self.database.close()
            self.database = None
        if self.filestorage is not None:
            self.filestorage.close()
            self.filestorage = None

    def interact(self, nbest=NBEST, maxlines=MAXLINES):
        """Interactive query loop.

        An empty line re-runs the previous query, paging in the next
        `nbest` results; lines starting with "/" are dispatched to
        specialcommand().  EOF (Ctrl-D) exits.
        """
        try:
            # Optional: line editing/history if readline is available.
            import readline
        except ImportError:
            pass
        text = ""
        top = 0
        results = []
        while 1:
            try:
                line = raw_input("Query: ")
            except EOFError:
                print "\nBye."
                break
            line = line.strip()
            if line.startswith("/"):
                self.specialcommand(line, results, top - nbest)
                continue
            if line:
                # New query: start from the first result page.
                text = line
                top = 0
            else:
                if not text:
                    continue
            try:
                results, n = self.timequery(text, top + nbest)
            except KeyboardInterrupt:
                raise
            except:
                # Show the traceback but keep the prompt alive.
                reportexc()
                text = ""
                continue
            if len(results) <= top:
                if not n:
                    print "No hits for %r." % text
                else:
                    print "No more hits for %r." % text
                text = ""
                continue
            print "[Results %d-%d from %d" % (top+1, min(n, top+nbest), n),
            print "for query %s]" % repr(text)
            self.formatresults(text, results, maxlines, top, top+nbest)
            top += nbest

    def specialcommand(self, line, results, first):
        """Handle a "/" command from interact().

        "/" alone shows the first result of the current page; "/N" shows
        the N'th result (1-based).  The message is displayed via MH's
        `show` command, in an xterm when DISPLAY is set.
        """
        assert line.startswith("/")
        line = line[1:]
        if not line:
            n = first
        else:
            try:
                n = int(line) - 1
            except:
                print "Huh?"
                return
        if n < 0 or n >= len(results):
            print "Out of range"
            return
        docid, score = results[n]
        path = self.docpaths[docid]
        # Paths are stored as "folder/msgnum"; split on the last slash.
        i = path.rfind("/")
        assert i > 0
        folder = path[:i]
        n = path[i+1:]
        cmd = "show +%s %s" % (folder, n)
        if os.getenv("DISPLAY"):
            os.system("xterm -e  sh -c '%s | less' &" % cmd)
        else:
            os.system(cmd)

    def query(self, text, nbest=NBEST, maxlines=MAXLINES):
        """Run a single query and print the `nbest` best matches."""
        results, n = self.timequery(text, nbest)
        if not n:
            print "No hits for %r." % text
            return
        print "[Results 1-%d from %d]" % (len(results), n)
        self.formatresults(text, results, maxlines)

    def timequery(self, text, nbest):
        """Run the index query, printing wall and CPU times.

        Returns (results, total_hit_count) as given by the index.
        """
        t0 = time.time()
        c0 = time.clock()
        results, n = self.index.query(text, 0, nbest)
        t1 = time.time()
        c1 = time.clock()
        print "[Query time: %.3f real, %.3f user]" % (t1-t0, c1-c0)
        return results, n

    def formatresults(self, text, results, maxlines=MAXLINES,
                      lo=0, hi=sys.maxint):
        """Print results[lo:hi] with rank, score, headers and matching lines.

        Builds a regex from the non-stopword query terms (a trailing "*"
        glob becomes ".*") and shows up to `maxlines` message lines that
        match it.
        """
        stop = self.stopdict.has_key
        # NOTE(review): if every query term is a stopword, `words` is
        # empty and the pattern degenerates to r"\b()\b" -- confirm that
        # is acceptable for such queries.
        words = [w for w in re.findall(r"\w+\*?", text.lower()) if not stop(w)]
        pattern = r"\b(" + "|".join(words) + r")\b"
        pattern = pattern.replace("*", ".*") # glob -> re syntax
        prog = re.compile(pattern, re.IGNORECASE)
        print '='*70
        rank = lo
        for docid, score in results[lo:hi]:
            rank += 1
            path = self.docpaths[docid]
            score *= 100.0
            print "Rank:    %d   Score: %d%%   File: %s" % (rank, score, path)
            path = os.path.join(self.mh.getpath(), path)
            try:
                fp = open(path)
            except (IOError, OSError), msg:
                print "Can't open:", msg
                continue
            msg = mhlib.Message("<folder>", 0, fp)
            for header in "From", "To", "Cc", "Bcc", "Subject", "Date":
                h = msg.getheader(header)
                if h:
                    print "%-8s %s" % (header+":", h)
            text = self.getmessagetext(msg)
            if text:
                print
                # Show at most `maxlines` lines matching the query terms.
                nleft = maxlines
                for part in text:
                    for line in part.splitlines():
                        if prog.search(line):
                            print line
                            nleft -= 1
                            if nleft <= 0:
                                break
                    if nleft <= 0:
                        break
            print '-'*70

    def update(self, args):
        """Index messages from one folder.

        `args` mixes one "+folder" name with MH message sequences;
        defaults to the current MH context folder and the 'all' sequence.
        """
        folder = None
        seqs = []

        for arg in args:
            if arg.startswith("+"):
                if folder is None:
                    folder = arg[1:]
                else:
                    print "only one folder at a time"
                    return
            else:
                seqs.append(arg)

        if not folder:
            folder = self.mh.getcontext()
        if not seqs:
            seqs = ['all']

        try:
            f = self.mh.openfolder(folder)
        except mhlib.Error, msg:
            print msg
            return

        # Collect the union of all requested message numbers.
        dict = {}
        for seq in seqs:
            try:
                nums = f.parsesequence(seq)
            except mhlib.Error, msg:
                print msg or "unparsable message sequence: %s" % `seq`
                return
            for n in nums:
                dict[n] = n
        msgs = dict.keys()
        msgs.sort()

        self.updatefolder(f, msgs)
        self.commit()

    def optimize(self, args):
        uniqwords = {}
        for folder in args:
            if folder.startswith("+"):
                folder = folder[1:]
            print "\nOPTIMIZE FOLDER", folder
            try:
                f = self.mh.openfolder(folder)
            except mhlib.Error, msg:
                print msg
                continue
            self.prescan(f, f.listmessages(), uniqwords)
        L = [(uniqwords[word], word) for word in uniqwords.keys()]
        L.sort()
        L.reverse()
        for i in range(100):
            print "%3d. %6d %s" % ((i+1,) + L[i])
        self.index.lexicon.sourceToWordIds([word for (count, word) in L])

    def prescan(self, f, msgs, uniqwords):
        """Tally word frequencies for the given messages of folder *f*.

        Runs each message's text through the same splitter / case
        normalizer / stop-word pipeline used for indexing and
        accumulates per-word counts into *uniqwords* (word -> count).
        """
        pipeline = [Splitter(), CaseNormalizer(), StopWordRemover()]
        for n in msgs:
            print "prescanning", n
            m = f.openmessage(n)
            text = self.getmessagetext(m, f.name)
            for p in pipeline:
                text = p.process(text)
            for word in text:
                uniqwords[word] = uniqwords.get(word, 0) + 1

    def bulkupdate(self, args):
        """Index every message in the named folders (or ALL folders).

        Prints a running total per folder, commits once at the end and
        reports lexicon statistics (reads private lexicon counters).
        """
        if not args:
            print "No folders specified; use ALL to bulk-index all folders"
            return
        if "ALL" in args:
            # Splice the full folder list in place of the ALL token.
            i = args.index("ALL")
            args[i:i+1] = self.mh.listfolders()
        for folder in args:
            if folder.startswith("+"):
                folder = folder[1:]
            print "\nFOLDER", folder
            try:
                f = self.mh.openfolder(folder)
            except mhlib.Error, msg:
                print msg
                continue
            self.updatefolder(f, f.listmessages())
            print "Total", len(self.docpaths)
        self.commit()
        # Trailing commas keep the summary on one output line (Py2 print).
        print "Indexed", self.index.lexicon._nbytes, "bytes and",
        print self.index.lexicon._nwords, "words;",
        print len(self.index.lexicon._words), "unique words."

    def updatefolder(self, f, msgs):
        """Re-index the given messages of folder *f*, skipping unchanged
        ones, then drop index entries for messages that disappeared.
        """
        self.watchfolders[f.name] = self.getmtime(f.name)
        for n in msgs:
            path = "%s/%s" % (f.name, n)
            docid = self.path2docid.get(path, 0)
            # Unchanged if we already have a docid and the recorded
            # mtime still matches the file's current mtime.
            if docid and self.getmtime(path) == self.doctimes.get(docid, 0):
                print "unchanged", docid, path
                continue
            docid = self.newdocid(path)
            try:
                m = f.openmessage(n)
            except IOError:
                print "disappeared", docid, path
                self.unindexpath(path)
                continue
            text = self.getmessagetext(m, f.name)
            if not text:
                self.unindexpath(path)
                continue
            print "indexing", docid, path
            self.index.index_doc(docid, text)
            self.maycommit()
        # Remove messages from the folder that no longer exist
        # NOTE(review): keys(f.name) presumably starts a BTree range scan
        # at the first key >= f.name; the startswith check then stops at
        # the end of this folder's prefix -- confirm path2docid is a BTree.
        for path in list(self.path2docid.keys(f.name)):
            if not path.startswith(f.name + "/"):
                break
            # getmtime() == 0 doubles as "message file is gone".
            if self.getmtime(path) == 0:
                self.unindexpath(path)
        print "done."

    def unindexpath(self, path):
        """Remove *path* (and its docid) from the bookkeeping maps and
        the text index; a no-op if the path is not indexed."""
        if self.path2docid.has_key(path):
            docid = self.path2docid[path]
            print "unindexing", docid, path
            del self.docpaths[docid]
            del self.doctimes[docid]
            del self.path2docid[path]
            try:
                self.index.unindex_doc(docid)
            except KeyError, msg:
                # The index may never have seen this docid (e.g. the
                # message produced no indexable text); report and go on.
                print "KeyError", msg
            self.maycommit()

    def getmessagetext(self, m, name=None):
        """Return a list of indexable text parts for message *m*.

        When *name* (the folder name) is given, a synthetic "_folder"
        token and the interesting headers are prepended, so searches
        can be restricted to one folder.
        """
        L = []
        if name:
            L.append("_folder " + name) # To restrict search to a folder
            self.getheaders(m, L)
        try:
            self.getmsgparts(m, L, 0)
        except KeyboardInterrupt:
            raise
        except:
            # Deliberate best-effort: one malformed message must not
            # abort a bulk run; report and return what was collected.
            print "(getmsgparts failed:)"
            reportexc()
        return L

    def getmsgparts(self, m, L, level):
        """Recursively append the plain-text parts of *m* to list *L*.

        *level* is the MIME nesting depth (0 for the outermost message);
        it is used only to indent the trace output.
        """
        ctype = m.gettype()
        if level or ctype != "text/plain":
            print ". "*level + str(ctype)
        if ctype == "text/plain":
            L.append(m.getbodytext())
        elif ctype in ("multipart/alternative", "multipart/mixed"):
            for part in m.getbodyparts():
                self.getmsgparts(part, L, level+1)
        elif ctype == "message/rfc822":
            # Parse the embedded message and recurse into it.
            f = StringIO(m.getbodytext())
            m = mhlib.Message("<folder>", 0, f)
            self.getheaders(m, L)
            self.getmsgparts(m, L, level+1)

    def getheaders(self, m, L):
        """Append the interesting headers of message *m* to list *L*
        as a single newline-joined string (nothing if none are set)."""
        values = [m.get(field)
                  for field in ("from", "to", "cc", "bcc", "subject")]
        present = [v for v in values if v]
        if present:
            L.append("\n".join(present))

    def newdocid(self, path):
        """Return the docid for *path*, allocating a fresh one if the
        path is not yet known; always refreshes the recorded mtime."""
        existing = self.path2docid.get(path)
        if existing is not None:
            # Already known: just refresh the stored modification time.
            self.doctimes[existing] = self.getmtime(path)
            return existing
        self.maxdocid += 1
        allocated = self.maxdocid
        self.docpaths[allocated] = path
        self.doctimes[allocated] = self.getmtime(path)
        self.path2docid[path] = allocated
        return allocated

    def getmtime(self, path):
        """Return *path*'s mtime as an int, or 0 if it cannot be stat'ed.

        *path* is taken relative to the MH base directory; the 0 return
        doubles as the "doesn't exist" marker used by updatefolder().
        """
        path = os.path.join(self.mh.getpath(), path)
        try:
            st = os.stat(path)
        except os.error:
            # Fix: the exception detail was bound to an unused `msg`.
            return 0
        return int(st[ST_MTIME])

    def maycommit(self):
        """Count one more transaction; commit once the (positive) limit
        is reached.  A limit of 0 disables periodic commits."""
        self.trans_count += 1
        # Equivalent to the chained `count >= limit > 0` comparison.
        if self.trans_limit > 0 and self.trans_count >= self.trans_limit:
            self.commit()

    def commit(self):
        """Commit any pending transaction; pack periodically."""
        if self.trans_count > 0:
            print "committing..."
            # get_transaction() is the old ZODB global transaction API.
            get_transaction().commit()
            self.trans_count = 0
            self.pack_count += 1
            # Chained comparison: pack only if a positive limit is reached.
            if self.pack_count >= self.pack_limit > 0:
                self.pack()

    def pack(self):
        """Pack the underlying database to reclaim old object revisions."""
        if self.pack_count > 0:
            print "packing..."
            self.database.pack()
            self.pack_count = 0

def reportexc():
    """Print the traceback of the exception being handled (best effort)."""
    traceback.print_exc()

# Script entry point; main() is defined earlier in this module.
if __name__ == "__main__":
    sys.exit(main())


=== Added File Zope3/src/zope/index/text/tests/queryhtml.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from time import clock

from zodb.storage.file import FileStorage

# Canned queries used to benchmark the index; main() runs each one both
# as an OR-query and as an AND-query over its individual terms.
QUERIES = [
    "nested recursive functions",
    "explicit better than implicit",
    "build hpux",
    "cannot create 'method-wrapper' instances",
    "extension module C++",
    "class method",
    "instance variable",
    "articulate information",
    "import default files",
    "gopher ftp http",
    "documentation",
]

def path2url(p):
    """Map a local mirror path to the matching python.org URL.

    Hack: only works for the way Jeremy indexed his copy of python.org
    (paths containing the "www.python.org/." marker).  Paths without
    the marker are returned unchanged.
    """
    marker = "www.python.org/."
    pos = p.find(marker)
    if pos < 0:
        return p
    return "http://www.python.org" + p[pos + len(marker):]

from Products.PluginIndexes.TextIndex.TextIndex import And, Or
from zope.index.text.tests.indexhtml import MySplitter
from zope.index.text.nbest import NBest

def main(rt):
    """Benchmark the canned QUERIES against the index in ZODB root *rt*.

    Each query is executed 50 times (ITERS) as both an OR and an AND
    query, over 11 passes; pass 0 prints the top-10 results as HTML,
    later passes collect timings which are summarized at the end.

    NOTE(review): depends on the module-global TEXTINDEX flag set in the
    __main__ block; rt["index"] / rt["files"] are presumably created by
    the companion indexing script -- verify against indexhtml.py.
    """
    index = rt["index"]
    files = rt["files"]
    times = {}
    ITERS = range(50)
    for i in range(11):
        for q in QUERIES:
            terms = q.split()
            for c in " OR ", " AND ":
                query = c.join(terms)
                t0 = clock()
                if TEXTINDEX:
                    # Old Zope2 TextIndex API: query with an operator
                    # object, then keep the 10 best-scoring docids.
                    if c == " OR ":
                        op = Or
                    else:
                        op = And
                    _q = " ".join(terms)
                    for _ in ITERS:
                        b = index.query(_q, op).bucket()
                        num = len(b)
                        chooser = NBest(10)
                        chooser.addmany(b.items())
                        results = chooser.getbest()

                else:
                    try:
                        for _ in ITERS:
                            results, num = index.query(query)
                    except:
                        # Best-effort benchmark: skip queries this index
                        # cannot parse or execute.
                        continue
                t1 = clock()
                print "<p>Query: \"%s\"" % query
                print "<br>Num results: %d" % num
                print "<br>time.clock(): %s" % (t1 - t0)
                key = query
                if i == 0:
                    # First pass: emit the result list instead of timing.
                    print "<ol>"
                    for docid, score in results:
                        url = path2url(files[docid])
                        fmt = '<li><a href="%s">%s</A> score = %s'
                        print fmt % (url, url, score)
                    print "</ol>"
                    continue
                l = times.setdefault(key, [])
                l.append(t1 - t0)

    # Summary: per-query minimum and all raw timings.
    l = times.keys()
    l.sort()
    print "<hr>"
    for k in l:
        v = times[k]
        print "<p>Query: \"%s\"" % k
        print "<br>Min time: %s" % min(v)
        print "<br>All times: %s" % " ".join(map(str, v))

if __name__ == "__main__":
    import sys
    import getopt

    # Benchmark configuration, overridable from the command line:
    #   -v       increase verbosity
    #   -f PATH  FileStorage path (default Data.fs)
    #   -T       use the old Zope2 TextIndex query API
    VERBOSE = 0
    FSPATH = "Data.fs"
    TEXTINDEX = 0

    try:
        opts, args = getopt.getopt(sys.argv[1:], 'vf:T')
    except getopt.error, msg:
        print msg
        print __doc__
        sys.exit(2)

    for o, v in opts:
        if o == '-v':
            VERBOSE += 1
        if o == '-f':
            FSPATH = v
        if o == '-T':
            TEXTINDEX = 1

    fs = FileStorage(FSPATH, read_only=1)
    # NOTE(review): only FileStorage is imported above; the bare name
    # `ZODB` looks unresolved here (probable NameError -- should this
    # be zodb.db.DB in the Zope3 layout?).  Confirm before running.
    db = ZODB.DB(fs, cache_size=10000)
    cn = db.open()
    rt = cn.root()
    main(rt)


=== Added File Zope3/src/zope/index/text/tests/test_index.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

from unittest import TestCase, TestSuite, main, makeSuite

from zope.index.text.lexicon import Lexicon, Splitter
from zope.index.text.cosineindex import CosineIndex
from zope.index.text.okapiindex import OkapiIndex

# Subclasses must set a class variable IndexFactory to the appropriate
# index object constructor.

class IndexTest(TestCase):
    """Shared behavioral tests for the text index implementations.

    Subclasses must set the class attribute IndexFactory to the index
    constructor under test; it is called with a lexicon (see the
    CosineIndexTest and OkapiIndexTest subclasses below).
    """

    def setUp(self):
        self.lexicon = Lexicon(Splitter())
        self.index = self.IndexFactory(self.lexicon)

    def test_index_document(self, DOCID=1):
        """Index one five-word document and check all the counters.

        Also used as a fixture builder by later tests, hence the DOCID
        parameter.
        """
        doc = "simple document contains five words"
        self.assert_(not self.index.has_doc(DOCID))
        self.index.index_doc(DOCID, doc)
        self.assertEqual(self.index.documentCount(), 1)
        self.assertEqual(self.index.wordCount(), 5)
        self.assertEqual(self.lexicon.wordCount(), 5)
        self.assert_(self.index.has_doc(DOCID))
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._docweight), 1)
        self.assertEqual(len(self.index._wordinfo), 5)
        self.assertEqual(len(self.index._docwords), 1)
        self.assertEqual(len(self.index.get_words(DOCID)), 5)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())
        # Every word occurs in exactly this one document.  (Loop
        # variable renamed from `map`, which shadowed the builtin.)
        for entry in self.index._wordinfo.values():
            self.assertEqual(len(entry), 1)
            self.assert_(entry.has_key(DOCID))

    def test_unindex_document(self):
        """Unindexing the only document must empty every data structure."""
        DOCID = 1
        self.test_index_document(DOCID)
        self.index.unindex_doc(DOCID)
        self.assertEqual(len(self.index._docweight), 0)
        self.assertEqual(len(self.index._wordinfo), 0)
        self.assertEqual(len(self.index._docwords), 0)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())

    def test_index_two_documents(self):
        """Index two documents sharing only the word 'document'."""
        self.test_index_document()
        doc = "another document just four"
        DOCID = 2
        self.index.index_doc(DOCID, doc)
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._docweight), 2)
        self.assertEqual(len(self.index._wordinfo), 8)
        self.assertEqual(len(self.index._docwords), 2)
        self.assertEqual(len(self.index.get_words(DOCID)), 4)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())
        wids = self.lexicon.termToWordIds("document")
        self.assertEqual(len(wids), 1)
        document_wid = wids[0]
        # Only the shared word is posted under both docids.
        for wid, entry in self.index._wordinfo.items():
            if wid == document_wid:
                self.assertEqual(len(entry), 2)
                self.assert_(entry.has_key(1))
                self.assert_(entry.has_key(DOCID))
            else:
                self.assertEqual(len(entry), 1)

    def test_index_two_unindex_one(self):
        # index two documents, unindex one, and test the results
        self.test_index_two_documents()
        self.index.unindex_doc(1)
        DOCID = 2
        self.assertEqual(len(self.index._docweight), 1)
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._wordinfo), 4)
        self.assertEqual(len(self.index._docwords), 1)
        self.assertEqual(len(self.index.get_words(DOCID)), 4)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())
        for entry in self.index._wordinfo.values():
            self.assertEqual(len(entry), 1)
            self.assert_(entry.has_key(DOCID))

    def test_index_duplicated_words(self, DOCID=1):
        """A repeated word counts once in the lexicon, thrice in the doc."""
        doc = "very simple repeat repeat repeat document test"
        self.index.index_doc(DOCID, doc)
        self.assert_(self.index._docweight[DOCID])
        self.assertEqual(len(self.index._wordinfo), 5)
        self.assertEqual(len(self.index._docwords), 1)
        self.assertEqual(len(self.index.get_words(DOCID)), 7)
        self.assertEqual(len(self.index._wordinfo),
                         self.index.wordCount())
        wids = self.lexicon.termToWordIds("repeat")
        self.assertEqual(len(wids), 1)
        # (Removed the unused, misspelled `repititive_wid` binding and
        # iterate values() since the wid itself was never used.)
        for entry in self.index._wordinfo.values():
            self.assertEqual(len(entry), 1)
            self.assert_(entry.has_key(DOCID))

    def test_simple_query_oneresult(self):
        self.index.index_doc(1, 'not the same document')
        results = self.index.search("document")
        self.assertEqual(list(results.keys()), [1])

    def test_simple_query_noresults(self):
        self.index.index_doc(1, 'not the same document')
        results = self.index.search("frobnicate")
        self.assertEqual(list(results.keys()), [])

    def test_query_oneresult(self):
        self.index.index_doc(1, 'not the same document')
        self.index.index_doc(2, 'something about something else')
        results = self.index.search("document")
        self.assertEqual(list(results.keys()), [1])

    def test_search_phrase(self):
        # Only doc 1 contains the three words in this exact order.
        self.index.index_doc(1, "the quick brown fox jumps over the lazy dog")
        self.index.index_doc(2, "the quick fox jumps lazy over the brown dog")
        results = self.index.search_phrase("quick brown fox")
        self.assertEqual(list(results.keys()), [1])

    def test_search_glob(self):
        self.index.index_doc(1, "how now brown cow")
        self.index.index_doc(2, "hough nough browne cough")
        self.index.index_doc(3, "bar brawl")
        results = self.index.search_glob("bro*")
        self.assertEqual(list(results.keys()), [1, 2])
        results = self.index.search_glob("b*")
        self.assertEqual(list(results.keys()), [1, 2, 3])

class CosineIndexTest(IndexTest):
    # Run the shared IndexTest cases with the cosine-similarity index.
    IndexFactory = CosineIndex

class OkapiIndexTest(IndexTest):
    # Run the shared IndexTest cases with the Okapi BM25 index.
    IndexFactory = OkapiIndex

def test_suite():
    """Run the shared index tests against both index implementations."""
    return TestSuite((makeSuite(CosineIndexTest),
                      makeSuite(OkapiIndexTest),
                    ))

if __name__=='__main__':
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/index/text/tests/test_lexicon.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

import sys
from unittest import TestCase, main, makeSuite

from zope.index.text.lexicon import Lexicon
from zope.index.text.lexicon import Splitter, CaseNormalizer

class StupidPipelineElement:
    """Test pipeline element that rewrites one exact term into another."""

    def __init__(self, fromword, toword):
        self.__fromword = fromword
        self.__toword = toword

    def process(self, seq):
        """Return *seq* with every occurrence of fromword replaced."""
        result = []
        for term in seq:
            if term != self.__fromword:
                result.append(term)
            else:
                result.append(self.__toword)
        return result

class WackyReversePipelineElement:
    """Test pipeline element that reverses one specific term's characters."""

    def __init__(self, revword):
        self.__revword = revword

    def process(self, seq):
        """Return *seq* with each occurrence of revword reversed."""
        result = []
        for term in seq:
            if term == self.__revword:
                term = term[::-1]
            result.append(term)
        return result

class StopWordPipelineElement:
    """Test pipeline element that drops terms listed in a stop-word dict."""

    def __init__(self, stopdict=None):
        # Bug fix: the default was a mutable {} shared across every
        # instance created without an argument; use the None sentinel.
        if stopdict is None:
            stopdict = {}
        self.__stopdict = stopdict

    def process(self, seq):
        """Return *seq* without the terms whose stopdict value is true."""
        res = []
        for term in seq:
            if not self.__stopdict.get(term):
                res.append(term)
        return res


class Test(TestCase):
    # NOTE: the assertions below show that wids are handed out
    # sequentially starting at 1, and that wid 0 marks an unknown term.

    def testSourceToWordIds(self):
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('cats and dogs')
        self.assertEqual(wids, [1, 2, 3])

    def testTermToWordIds(self):
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('dogs')
        self.assertEqual(wids, [3])

    def testMissingTermToWordIds(self):
        # Unknown terms come back as wid 0.
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('boxes')
        self.assertEqual(wids, [0])

    def testOnePipelineElement(self):
        # 'dogs' is rewritten to 'fish' before being stored, so the
        # replacement word gets dogs' slot (wid 3).
        lexicon = Lexicon(Splitter(), StupidPipelineElement('dogs', 'fish'))
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('fish')
        self.assertEqual(wids, [3])

    def testSplitterAdaptorFold(self):
        # Case normalization folds 'CATS' and 'cats' together.
        lexicon = Lexicon(Splitter(), CaseNormalizer())
        wids = lexicon.sourceToWordIds('CATS and dogs')
        wids = lexicon.termToWordIds('cats and dogs')
        self.assertEqual(wids, [1, 2, 3])

    def testSplitterAdaptorNofold(self):
        # Without normalization only 'CATS' was indexed, so the
        # lowercase lookup yields the unknown marker 0.
        lexicon = Lexicon(Splitter())
        wids = lexicon.sourceToWordIds('CATS and dogs')
        wids = lexicon.termToWordIds('cats and dogs')
        self.assertEqual(wids, [0, 2, 3])

    def testTwoElementPipeline(self):
        # 'cats' -> 'fish' -> reversed to 'hsif', stored first (wid 1).
        lexicon = Lexicon(Splitter(),
                          StupidPipelineElement('cats', 'fish'),
                          WackyReversePipelineElement('fish'))
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('hsif')
        self.assertEqual(wids, [1])

    def testThreeElementPipeline(self):
        # 'and' is dropped first, so 'dogs' -> 'fish' -> 'hsif' is the
        # second surviving word (wid 2).
        lexicon = Lexicon(Splitter(),
                          StopWordPipelineElement({'and':1}),
                          StupidPipelineElement('dogs', 'fish'),
                          WackyReversePipelineElement('fish'))
        wids = lexicon.sourceToWordIds('cats and dogs')
        wids = lexicon.termToWordIds('hsif')
        self.assertEqual(wids, [2])

    def testSplitterLocaleAwareness(self):
        # Splitting of non-ASCII (Latin-1) words must honor the locale.
        from zope.index.text.htmlsplitter import HTMLWordSplitter
        import locale
        loc = locale.setlocale(locale.LC_ALL) # get current locale
        # set German locale
        try:
            if sys.platform != 'win32':
                locale.setlocale(locale.LC_ALL, 'de_DE.ISO8859-1')
            else:
                locale.setlocale(locale.LC_ALL, 'German_Germany.1252')
        except locale.Error:
            return # This test doesn't work here :-(
        expected = ['m\xfclltonne', 'waschb\xe4r',
                    'beh\xf6rde', '\xfcberflieger']
        words = [" ".join(expected)]
        words = Splitter().process(words)
        self.assertEqual(words, expected)
        words = HTMLWordSplitter().process(words)
        self.assertEqual(words, expected)
        locale.setlocale(locale.LC_ALL, loc) # restore saved locale

def test_suite():
    """Collect the lexicon tests."""
    return makeSuite(Test)

if __name__=='__main__':
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/index/text/tests/test_nbest.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

from unittest import TestCase, main, makeSuite

from zope.index.text.nbest import NBest

class NBestTest(TestCase):
    """Exercise the NBest n-largest accumulator."""

    def testConstructor(self):
        # Capacity must be at least 1.
        self.assertRaises(ValueError, NBest, 0)
        self.assertRaises(ValueError, NBest, -1)

        for n in range(1, 11):
            nb = NBest(n)
            self.assertEqual(len(nb), 0)
            self.assertEqual(nb.capacity(), n)

    def testOne(self):
        # With capacity 1 only the highest-scored item survives.
        nb = NBest(1)
        nb.add('a', 0)
        self.assertEqual(nb.getbest(), [('a', 0)])

        nb.add('b', 1)
        self.assertEqual(len(nb), 1)
        self.assertEqual(nb.capacity(), 1)
        self.assertEqual(nb.getbest(), [('b', 1)])

        nb.add('c', -1)
        self.assertEqual(len(nb), 1)
        self.assertEqual(nb.capacity(), 1)
        self.assertEqual(nb.getbest(), [('b', 1)])

        nb.addmany([('d', 3), ('e', -6), ('f', 5), ('g', 4)])
        self.assertEqual(len(nb), 1)
        self.assertEqual(nb.capacity(), 1)
        self.assertEqual(nb.getbest(), [('f', 5)])

    def testMany(self):
        import random
        # Scores increase with i; the item is the negated index.
        inputs = [(-i, i) for i in range(50)]

        reversed_inputs = inputs[:]
        reversed_inputs.reverse()

        # Test the N-best for a variety of n (1, 6, 11, ... 50).
        for n in range(1, len(inputs)+1, 5):
            expected = inputs[-n:]
            expected.reverse()

            random_inputs = inputs[:]
            random.shuffle(random_inputs)

            # Insertion order must not matter for the final contents.
            for source in inputs, reversed_inputs, random_inputs:
                # Try feeding them one at a time.
                nb = NBest(n)
                for item, score in source:
                    nb.add(item, score)
                self.assertEqual(len(nb), n)
                self.assertEqual(nb.capacity(), n)
                self.assertEqual(nb.getbest(), expected)

                # And again in one gulp.
                nb = NBest(n)
                nb.addmany(source)
                self.assertEqual(len(nb), n)
                self.assertEqual(nb.capacity(), n)
                self.assertEqual(nb.getbest(), expected)

                # pop_smallest() yields worst-to-best, then raises
                # IndexError once the accumulator is empty.
                for i in range(1, n+1):
                    self.assertEqual(nb.pop_smallest(), expected[-i])
                self.assertRaises(IndexError, nb.pop_smallest)

    def testAllSameScore(self):
        # With equal scores, getbest() keeps insertion order.
        inputs = [(i, 0) for i in range(10)]
        for n in range(1, 12):
            nb = NBest(n)
            nb.addmany(inputs)
            outputs = nb.getbest()
            self.assertEqual(outputs, inputs[:len(outputs)])

def test_suite():
    """Collect the NBest tests."""
    return makeSuite(NBestTest)

if __name__=='__main__':
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/index/text/tests/test_pipelinefactory.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

from unittest import TestCase, main, makeSuite
from zope.index.interfaces.pipelineelement import IPipelineElement
from zope.index.text.pipelinefactory import PipelineElementFactory
from zope.interface import implements

class NullPipelineElement:
    """Do-nothing pipeline element used purely as a registry value."""
    implements(IPipelineElement)

    def process(self, source):
        # Bug fix: the original signature was process(source) without
        # `self`, so it could never have been called as a method.
        pass

class PipelineFactoryTest(TestCase):

    def setUp(self):
        # Four distinct elements to register under two factory groups.
        self.huey = NullPipelineElement()
        self.dooey = NullPipelineElement()
        self.louie = NullPipelineElement()
        self.daffy = NullPipelineElement()

    def testPipeline(self):
        pf = PipelineElementFactory()
        pf.registerFactory('donald', 'huey', self.huey)
        pf.registerFactory('donald', 'dooey',  self.dooey)
        pf.registerFactory('donald', 'louie', self.louie)
        pf.registerFactory('looney', 'daffy', self.daffy)
        # Re-registering an existing (group, name) pair must be rejected.
        self.assertRaises(ValueError, pf.registerFactory,'donald',  'huey',
                          self.huey)
        # Group and name listings come back sorted.
        self.assertEqual(pf.getFactoryGroups(), ['donald', 'looney'])
        self.assertEqual(pf.getFactoryNames('donald'),
                         ['dooey', 'huey', 'louie'])

def test_suite():
    """Collect the pipeline-factory tests."""
    return makeSuite(PipelineFactoryTest)

if __name__=='__main__':
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/index/text/tests/test_queryengine.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

import unittest

from zodb.btrees.IIBTree import IIBucket

from zope.index.text.queryparser import QueryParser
from zope.index.text.parsetree import QueryError
from zope.index.text.lexicon import Lexicon, Splitter

class FauxIndex:
    """Minimal index stub with a fixed posting table for the tests."""

    # term -> docids that "contain" it; every posting scores 1.
    _POSTINGS = {
        "foo": (1, 3),
        "bar": (1, 2),
        "ham": (1, 2, 3, 4),
    }

    def search(self, term):
        """Return an IIBucket of {docid: 1} for the known *term*
        (empty for unknown terms)."""
        bucket = IIBucket()
        for docid in self._POSTINGS.get(term, ()):
            bucket[docid] = 1
        return bucket

class TestQueryEngine(unittest.TestCase):
    """Execute parsed query trees against the FauxIndex stub."""

    def setUp(self):
        self.lexicon = Lexicon(Splitter())
        self.parser = QueryParser(self.lexicon)
        self.index = FauxIndex()

    def compareSet(self, result_set, expected):
        """Assert the result mapping equals *expected* ({docid: score}).

        (Parameters renamed from `set`/`dict`, which shadowed builtins.)
        """
        d = {}
        for k, v in result_set.items():
            d[k] = v
        self.assertEqual(d, expected)

    def compareQuery(self, query, expected):
        """Parse, execute, and compare one query string."""
        tree = self.parser.parseQuery(query)
        result_set = tree.executeQuery(self.index)
        self.compareSet(result_set, expected)

    def testExecuteQuery(self):
        # Expected values are {docid: score}; scores sum across terms.
        self.compareQuery("foo AND bar", {1: 2})
        self.compareQuery("foo OR bar", {1: 2, 2: 1, 3:1})
        self.compareQuery("foo AND NOT bar", {3: 1})
        self.compareQuery("foo AND foo AND foo", {1: 3, 3: 3})
        self.compareQuery("foo OR foo OR foo", {1: 3, 3: 3})
        self.compareQuery("ham AND NOT foo AND NOT bar", {4: 1})
        self.compareQuery("ham OR foo OR bar", {1: 3, 2: 2, 3: 2, 4: 1})
        self.compareQuery("ham AND foo AND bar", {1: 3})

    def testInvalidQuery(self):
        # A bare NOT cannot be executed on its own.
        from zope.index.text.parsetree import NotNode, AtomNode
        tree = NotNode(AtomNode("foo"))
        self.assertRaises(QueryError, tree.executeQuery, self.index)

def test_suite():
    """Collect the query-engine tests."""
    return unittest.makeSuite(TestQueryEngine)

if __name__=='__main__':
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/src/zope/index/text/tests/test_queryparser.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

from unittest import TestCase, TestSuite, main, makeSuite

from zope.interface.verify import verifyClass

from zope.index.interfaces.queryparser import IQueryParser
from zope.index.interfaces.queryparsetree import IQueryParseTree

from zope.index.text.queryparser import QueryParser
from zope.index.text.parsetree import ParseError, ParseTreeNode
from zope.index.text.parsetree import OrNode, AndNode, NotNode
from zope.index.text.parsetree import AtomNode, PhraseNode, GlobNode
from zope.index.text.lexicon import Lexicon, Splitter


class TestInterfaces(TestCase):
    """Verify the parser and every parse-tree node class implement
    their declared interfaces."""

    def testInterfaces(self):
        verifyClass(IQueryParser, QueryParser)
        node_classes = (ParseTreeNode, OrNode, AndNode, NotNode,
                        AtomNode, PhraseNode, GlobNode)
        for node_class in node_classes:
            verifyClass(IQueryParseTree, node_class)


class TestQueryParserBase(TestCase):
    """Helpers shared by the query-parser test cases."""

    def setUp(self):
        self.lexicon = Lexicon(Splitter())
        self.parser = QueryParser(self.lexicon)

    def expect(self, input, output, expected_ignored=None):
        """Parse *input*; require tree *output* and the ignored-term list.

        (Bug fix: the mutable default [] is replaced by the None
        sentinel to avoid a shared default list.)
        """
        if expected_ignored is None:
            expected_ignored = []
        tree = self.parser.parseQuery(input)
        ignored = self.parser.getIgnored()
        self.compareParseTrees(tree, output)
        self.assertEqual(ignored, expected_ignored)
        # Check that parseQueryEx() == (parseQuery(), getIgnored())
        ex_tree, ex_ignored = self.parser.parseQueryEx(input)
        self.compareParseTrees(ex_tree, tree)
        self.assertEqual(ex_ignored, expected_ignored)

    def failure(self, input):
        """Require that *input* fails to parse via both entry points."""
        self.assertRaises(ParseError, self.parser.parseQuery, input)
        self.assertRaises(ParseError, self.parser.parseQueryEx, input)

    def compareParseTrees(self, got, expected, msg=None):
        """Recursively assert two parse trees are structurally equal."""
        if msg is None:
            msg = repr(got)
        self.assertEqual(isinstance(got, ParseTreeNode), 1)
        self.assertEqual(got.__class__, expected.__class__, msg)
        if isinstance(got, PhraseNode):
            self.assertEqual(got.nodeType(), "PHRASE", msg)
            self.assertEqual(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, GlobNode):
            self.assertEqual(got.nodeType(), "GLOB", msg)
            self.assertEqual(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, AtomNode):
            self.assertEqual(got.nodeType(), "ATOM", msg)
            self.assertEqual(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, NotNode):
            # Bug fix: msg was dropped from this assertion originally.
            self.assertEqual(got.nodeType(), "NOT", msg)
            self.compareParseTrees(got.getValue(), expected.getValue(), msg)
        elif isinstance(got, AndNode) or isinstance(got, OrNode):
            self.assertEqual(got.nodeType(),
                             isinstance(got, AndNode) and "AND" or "OR", msg)
            list1 = got.getValue()
            list2 = expected.getValue()
            self.assertEqual(len(list1), len(list2), msg)
            for i in range(len(list1)):
                self.compareParseTrees(list1[i], list2[i], msg)


class TestQueryParser(TestQueryParserBase):
    """QueryParser tests with the default lexicon (no stopwords).

    test0xx methods check successful parses against expected trees;
    test1xx methods check inputs that must raise ParseError.
    """

    # --- Atoms and boolean operators (case-insensitive) ---

    def test001(self):
        self.expect("foo", AtomNode("foo"))

    def test002(self):
        # "note" contains the substring "not" but is a plain atom.
        self.expect("note", AtomNode("note"))

    def test003(self):
        self.expect("aa and bb AND cc",
                    AndNode([AtomNode("aa"), AtomNode("bb"), AtomNode("cc")]))

    def test004(self):
        self.expect("aa OR bb or cc",
                    OrNode([AtomNode("aa"), AtomNode("bb"), AtomNode("cc")]))

    def test005(self):
        # AND binds tighter than OR.
        self.expect("aa AND bb OR cc AnD dd",
                    OrNode([AndNode([AtomNode("aa"), AtomNode("bb")]),
                            AndNode([AtomNode("cc"), AtomNode("dd")])]))

    def test006(self):
        self.expect("(aa OR bb) AND (cc OR dd)",
                    AndNode([OrNode([AtomNode("aa"), AtomNode("bb")]),
                             OrNode([AtomNode("cc"), AtomNode("dd")])]))

    def test007(self):
        self.expect("aa AND NOT bb",
                    AndNode([AtomNode("aa"), NotNode(AtomNode("bb"))]))

    # --- Phrases, quoting, '-' negation and globbing ---

    def test010(self):
        self.expect('"foo bar"', PhraseNode(["foo", "bar"]))

    def test011(self):
        # Juxtaposed terms imply AND.
        self.expect("foo bar", AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test012(self):
        # Note: the stray trailing '"' is expected to be tolerated.
        self.expect('(("foo bar"))"', PhraseNode(["foo", "bar"]))

    def test013(self):
        self.expect("((foo bar))", AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test014(self):
        # A hyphenated word parses as a phrase.
        self.expect("foo-bar", PhraseNode(["foo", "bar"]))

    def test015(self):
        # A leading '-' on a separate term means NOT.
        self.expect("foo -bar", AndNode([AtomNode("foo"),
                                         NotNode(AtomNode("bar"))]))

    def test016(self):
        self.expect("-foo bar", AndNode([AtomNode("bar"),
                                         NotNode(AtomNode("foo"))]))

    def test017(self):
        self.expect("booh -foo-bar",
                    AndNode([AtomNode("booh"),
                             NotNode(PhraseNode(["foo", "bar"]))]))

    def test018(self):
        self.expect('booh -"foo bar"',
                    AndNode([AtomNode("booh"),
                             NotNode(PhraseNode(["foo", "bar"]))]))

    def test019(self):
        # Quotes act as term separators even with no whitespace.
        self.expect('foo"bar"',
                    AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test020(self):
        self.expect('"foo"bar',
                    AndNode([AtomNode("foo"), AtomNode("bar")]))

    def test021(self):
        self.expect('foo"bar"blech',
                    AndNode([AtomNode("foo"), AtomNode("bar"),
                             AtomNode("blech")]))

    def test022(self):
        self.expect("foo*", GlobNode("foo*"))

    def test023(self):
        self.expect("foo* bar", AndNode([GlobNode("foo*"),
                                         AtomNode("bar")]))

    # --- Inputs that must raise ParseError ---

    def test101(self):
        self.failure("")

    def test102(self):
        self.failure("not")

    def test103(self):
        self.failure("or")

    def test104(self):
        self.failure("and")

    def test105(self):
        self.failure("NOT")

    def test106(self):
        self.failure("OR")

    def test107(self):
        self.failure("AND")

    def test108(self):
        # A purely negative query is not allowed.
        self.failure("NOT foo")

    def test109(self):
        self.failure(")")

    def test110(self):
        self.failure("(")

    def test111(self):
        self.failure("foo OR")

    def test112(self):
        self.failure("foo AND")

    def test113(self):
        self.failure("OR foo")

    def test114(self):
        self.failure("AND foo")

    def test115(self):
        self.failure("(foo) bar")

    def test116(self):
        self.failure("(foo OR)")

    def test117(self):
        self.failure("(foo AND)")

    def test118(self):
        self.failure("(NOT foo)")

    def test119(self):
        self.failure("-foo")

    def test120(self):
        self.failure("-foo -bar")

    def test121(self):
        self.failure("foo OR -bar")

    def test122(self):
        self.failure("foo AND -bar")


class StopWordTestQueryParser(TestQueryParserBase):
    """QueryParser tests with a lexicon that drops the stopword 'stop'.

    test2xx methods: queries that remain valid after stopword removal,
    with the dropped words reported via getIgnored().
    test3xx methods: queries that become empty once the stopword is
    removed and therefore must raise ParseError.
    """

    def setUp(self):
        # Only 'stop' is a stopword (but 'and' is still an operator)
        self.lexicon = Lexicon(Splitter(), FakeStopWordRemover())
        self.parser = QueryParser(self.lexicon)

    def test201(self):
        # 'and/' is split to the word 'and', which here is a plain
        # atom, not the operator.
        self.expect('and/', AtomNode("and"))

    def test202(self):
        self.expect('foo AND stop', AtomNode("foo"), ["stop"])

    def test203(self):
        self.expect('foo AND NOT stop', AtomNode("foo"), ["stop"])

    def test204(self):
        self.expect('stop AND foo', AtomNode("foo"), ["stop"])

    def test205(self):
        self.expect('foo OR stop', AtomNode("foo"), ["stop"])

    def test206(self):
        self.expect('stop OR foo', AtomNode("foo"), ["stop"])

    def test301(self):
        self.failure('stop')

    def test302(self):
        self.failure('stop stop')

    def test303(self):
        self.failure('stop AND stop')

    def test304(self):
        self.failure('stop OR stop')

    def test305(self):
        self.failure('stop -foo')

    def test306(self):
        self.failure('stop AND NOT foo')


class FakeStopWordRemover:
    """Minimal lexicon pipeline element that drops the word 'stop'."""

    def process(self, list):
        # Keep every word except the literal stopword "stop".
        kept = []
        for word in list:
            if word != "stop":
                kept.append(word)
        return kept


def test_suite():
    """Collect every query-parser test class into one suite."""
    suites = []
    for klass in (TestQueryParser, StopWordTestQueryParser, TestInterfaces):
        suites.append(makeSuite(klass))
    return TestSuite(suites)


if __name__=="__main__":
    # Allow running this test module directly from the command line.
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/index/text/tests/test_setops.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

from unittest import TestCase, main, makeSuite

from zodb.btrees.IIBTree import IIBTree, IIBucket

from zope.index.text.setops import mass_weightedIntersection
from zope.index.text.setops import mass_weightedUnion

class TestSetOps(TestCase):
    """Tests for mass_weightedUnion / mass_weightedIntersection.

    Both functions take a list of (mapping, weight) pairs -- each
    mapping an IIBTree or IIBucket of docid -> score -- and combine
    them into a single docid -> weighted-score mapping.
    """

    def testEmptyLists(self):
        # An empty input list yields an empty result mapping.
        self.assertEqual(len(mass_weightedIntersection([])), 0)
        self.assertEqual(len(mass_weightedUnion([])), 0)

    def testIdentity(self):
        # A single input with weight 1 comes back unchanged, whether
        # it is a BTree or a Bucket.
        t = IIBTree([(1, 2)])
        b = IIBucket([(1, 2)])
        for x in t, b:
            for func in mass_weightedUnion, mass_weightedIntersection:
                result = func([(x, 1)])
                self.assertEqual(len(result), 1)
                self.assertEqual(list(result.items()), list(x.items()))

    def testScalarMultiply(self):
        # A single input with weight `factor` scales every score by
        # that factor (including factor 0).
        t = IIBTree([(1, 2), (2, 3), (3, 4)])
        allkeys = [1, 2, 3]
        b = IIBucket(t)
        for x in t, b:
            self.assertEqual(list(x.keys()), allkeys)
            for func in mass_weightedUnion, mass_weightedIntersection:
                for factor in 0, 1, 5, 10:
                    result = func([(x, factor)])
                    self.assertEqual(allkeys, list(result.keys()))
                    for key in x.keys():
                        self.assertEqual(x[key] * factor, result[key])

    def testPairs(self):
        # Check two-input unions and intersections over all
        # BTree/Bucket combinations and several weight pairs, against
        # expected results computed here by brute force.
        t1 = IIBTree([(1, 10), (3, 30), (7, 70)])
        t2 = IIBTree([(3, 30), (5, 50), (7, 7), (9, 90)])
        allkeys = [1, 3, 5, 7, 9]
        b1 = IIBucket(t1)
        b2 = IIBucket(t2)
        for x in t1, t2, b1, b2:
            for key in x.keys():
                self.assertEqual(key in allkeys, 1)
            for y in t1, t2, b1, b2:
                for w1, w2 in (0, 0), (1, 10), (10, 1), (2, 3):
                    # Test the union.
                    expected = []
                    for key in allkeys:
                        if x.has_key(key) or y.has_key(key):
                            result = x.get(key, 0) * w1 + y.get(key, 0) * w2
                            expected.append((key, result))
                    expected.sort()
                    got = mass_weightedUnion([(x, w1), (y, w2)])
                    self.assertEqual(expected, list(got.items()))
                    # Argument order must not matter.
                    got = mass_weightedUnion([(y, w2), (x, w1)])
                    self.assertEqual(expected, list(got.items()))

                    # Test the intersection.
                    expected = []
                    for key in allkeys:
                        if x.has_key(key) and y.has_key(key):
                            result = x[key] * w1 + y[key] * w2
                            expected.append((key, result))
                    expected.sort()
                    got = mass_weightedIntersection([(x, w1), (y, w2)])
                    self.assertEqual(expected, list(got.items()))
                    # Argument order must not matter.
                    got = mass_weightedIntersection([(y, w2), (x, w1)])
                    self.assertEqual(expected, list(got.items()))

    def testMany(self):
        # Stress test with 15 overlapping BTrees fed in shuffled
        # order.  Every tree contains `commonkey`, so the intersection
        # is never empty.
        import random
        N = 15  # number of IIBTrees to feed in
        L = []
        commonkey = N * 1000
        allkeys = {commonkey: 1}
        for i in range(N):
            t = IIBTree()
            t[commonkey] = i
            for j in range(N-i):
                key = i + j
                allkeys[key] = 1
                t[key] = N*i + j
            L.append((t, i+1))
        random.shuffle(L)
        allkeys = allkeys.keys()
        allkeys.sort()

        # Test the union.
        expected = []
        for key in allkeys:
            sum = 0
            for t, w in L:
                if t.has_key(key):
                    sum += t[key] * w
            expected.append((key, sum))
        # print 'union', expected
        got = mass_weightedUnion(L)
        self.assertEqual(expected, list(got.items()))

        # Test the intersection.
        expected = []
        for key in allkeys:
            sum = 0
            for t, w in L:
                if t.has_key(key):
                    sum += t[key] * w
                else:
                    break
            else:
                # We didn't break out of the loop so it's in the intersection.
                expected.append((key, sum))
        # print 'intersection', expected
        got = mass_weightedIntersection(L)
        self.assertEqual(expected, list(got.items()))

def test_suite():
    """Return the suite of all TestSetOps test methods."""
    suite = makeSuite(TestSetOps)
    return suite

if __name__=="__main__":
    # Allow running this test module directly from the command line.
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/index/text/tests/test_textindexwrapper.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Unit tests for TextIndexWrapper.

$Id: test_textindexwrapper.py,v 1.1 2003/07/14 03:53:48 anthony Exp $
"""

import unittest

from zope.index.text.textindexwrapper import TextIndexWrapper
from zope.index.text import parsetree

class TextIndexWrapperTest(unittest.TestCase):
    """Tests for TextIndexWrapper.

    setUp indexes two fixed documents under docids 1000 and 1001;
    every test queries against that small corpus.
    """

    def setUp(self):
        w = TextIndexWrapper()
        doc = u"the quick brown fox jumps over the lazy dog"
        w.index_doc(1000, [doc])
        doc = u"the brown fox and the yellow fox don't need the retriever"
        w.index_doc(1001, [doc])
        self.wrapper = w

    def testCounts(self):
        # documentCount/wordCount must grow as documents are added.
        w = self.wrapper
        self.assertEqual(self.wrapper.documentCount(), 2)
        self.assertEqual(self.wrapper.wordCount(), 12)
        doc = u"foo bar"
        w.index_doc(1002, [doc])
        self.assertEqual(self.wrapper.documentCount(), 3)
        self.assertEqual(self.wrapper.wordCount(), 14)

    def testOne(self):
        # query() returns (matches, total); a single hit for a query
        # matching only doc 1000.
        matches, total = self.wrapper.query(u"quick fox", 0, 10)
        self.assertEqual(total, 1)
        [(docid, rank)] = matches # if this fails there's a problem
        self.assertEqual(docid, 1000)

    def testDefaultBatch(self):
        # Omitting the count (and the start) returns all matches.
        matches, total = self.wrapper.query(u"fox", 0)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 2)
        matches, total = self.wrapper.query(u"fox")
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 2)
        # A start of 1 skips the first of the two matches but the
        # reported total is unchanged.
        matches, total = self.wrapper.query(u" fox", 1)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 1)

    def testGlobbing(self):
        # A trailing * matches both "fox"-containing documents.
        matches, total = self.wrapper.query("fo*")
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 2)

    def testLatin1(self):
        # A word with a Latin-1 accented character (Fran\xe7ois) can
        # be indexed and found again.
        w = self.wrapper
        doc = u"Fran\xe7ois"
        w.index_doc(1002, [doc])
        matches, total = self.wrapper.query(doc, 0, 10)
        self.assertEqual(total, 1)
        [(docid, rank)] = matches # if this fails there's a problem
        self.assertEqual(docid, 1002)

    def testUnicode(self):
        # Index a Greek word, an em dash and a single Greek letter;
        # the alphabetic tokens are searchable, the dash is not a
        # word, and an unindexed word ("lamda") finds nothing.
        w = self.wrapper
        # Verbose, but easy to debug
        delta  = u"\N{GREEK SMALL LETTER DELTA}"
        delta += u"\N{GREEK SMALL LETTER EPSILON}"
        delta += u"\N{GREEK SMALL LETTER LAMDA}"
        delta += u"\N{GREEK SMALL LETTER TAU}"
        delta += u"\N{GREEK SMALL LETTER ALPHA}"
        assert delta.islower()
        emdash = u"\N{EM DASH}"
        assert not emdash.isalnum()
        alpha  = u"\N{GREEK SMALL LETTER ALPHA}"
        assert alpha.islower()
        lamda  = u"\N{GREEK SMALL LETTER LAMDA}"
        lamda += u"\N{GREEK SMALL LETTER ALPHA}"
        assert lamda.islower()
        doc = delta + emdash + alpha
        w.index_doc(1002, [doc])
        for word in delta, alpha:
            matches, total = self.wrapper.query(word, 0, 10)
            self.assertEqual(total, 1)
            [(docid, rank)] = matches # if this fails there's a problem
            self.assertEqual(docid, 1002)
        # A query that is only punctuation must be rejected.
        self.assertRaises(parsetree.ParseError,
                          self.wrapper.query, emdash, 0, 10)
        matches, total = self.wrapper.query(lamda, 0, 10)
        self.assertEqual(total, 0)

    def testNone(self):
        # A word absent from the corpus yields no matches.
        matches, total = self.wrapper.query(u"dalmatian", 0, 10)
        self.assertEqual(total, 0)
        self.assertEqual(len(matches), 0)

    def testAll(self):
        # "brown fox" occurs in both documents.
        matches, total = self.wrapper.query(u"brown fox", 0, 10)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches), 2)
        matches.sort()
        self.assertEqual(matches[0][0], 1000)
        self.assertEqual(matches[1][0], 1001)

    def testBatching(self):
        # Fetching the two hits one at a time (start=0 then start=1)
        # yields the same docids as one combined query.
        matches1, total = self.wrapper.query(u"brown fox", 0, 1)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches1), 1)
        matches2, total = self.wrapper.query(u"brown fox", 1, 1)
        self.assertEqual(total, 2)
        self.assertEqual(len(matches2), 1)
        matches = matches1 + matches2
        matches.sort()
        self.assertEqual(matches[0][0], 1000)
        self.assertEqual(matches[1][0], 1001)

def test_suite():
    """Return the suite of all TextIndexWrapperTest test methods."""
    suite = unittest.makeSuite(TextIndexWrapperTest)
    return suite

if __name__=='__main__':
    # Allow running this test module directly from the command line.
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/src/zope/index/text/tests/wordstats.py ===
#! /usr/bin/env python
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Dump statistics about each word in the index.

usage: wordstats.py data.fs [index key]
"""

from zodb.storage.file import FileStorage

def main(fspath, key):
    fs = FileStorage(fspath, read_only=1)
    db = ZODB.DB(fs)
    rt = db.open().root()
    index = rt[key]

    lex = index.lexicon
    idx = index.index
    print "Words", lex.length()
    print "Documents", idx.length()

    print "Word frequencies: count, word, wid"
    for word, wid in lex.items():
        docs = idx._wordinfo[wid]
        print len(docs), word, wid

    print "Per-doc scores: wid, (doc, score,)+"
    for wid in lex.wids():
        print wid,
        docs = idx._wordinfo[wid]
        for docid, score in docs.items():
            print docid, score,
        print

if __name__ == "__main__":
    import sys

    args = sys.argv[1:]
    index_key = "index"
    if len(args) == 1:
        fspath = args[0]
    elif len(args) == 2:
        fspath, index_key = args
    else:
        print "Expected 1 or 2 args, got", len(args)
    main(fspath, index_key)