[zopeorg-checkins] CVS: Products/ParsedXML/NodePath - ElementIdPath.py:1.1 NodePath.py:1.1 README.txt:1.1 RobustPath.py:1.1 __init__.py:1.1

Sidnei da Silva sidnei at x3ng.com.br
Fri May 30 11:17:30 EDT 2003


Update of /cvs-zopeorg/Products/ParsedXML/NodePath
In directory cvs.zope.org:/tmp/cvs-serv19195/ParsedXML/NodePath

Added Files:
	ElementIdPath.py NodePath.py README.txt RobustPath.py 
	__init__.py 
Log Message:
Adding products needed for migration of NZO

=== Added File Products/ParsedXML/NodePath/ElementIdPath.py ===
from NodePath import BaseNodePathScheme, registry

class ElementIdPathScheme(BaseNodePathScheme):
    def __init__(self, scheme_name=None):
        scheme_name = scheme_name or 'element_id'
        BaseNodePathScheme.__init__(self, scheme_name)

    def resolve_steps(self, top_node, steps):
        node = top_node
        for step in steps:
            if step[0] == 'e':
                element_id = int(step[1:])
                for child in node.childNodes:
                    if self.get_element_id(child) == element_id:
                        node = child
                        break
                else:
                    # couldn't find node with such element_id
                    return None
            else:
                node = node.childNodes.item(int(step))
                if node is None:
                    return None
        return node # return node

    def create_steps(self, top_node, node):
        steps = []
        while node is not top_node:
            parent = node.parentNode
            if parent is None:
                break
            element_id = self.get_element_id(node)
            if element_id != -1:
                steps.append("e%s" % element_id)
            else:
                steps.append(str(parent.childNodes.index(node)))
            node = parent
        steps.reverse()
        return steps

    def get_element_id(self, node):
        return getattr(node, 'elementId', -1)



=== Added File Products/ParsedXML/NodePath/NodePath.py ===
import string

class NodePathError(Exception):
    pass

class NodePathSchemeRegistry:
    def __init__(self):
        self._schemes = {}
        
    def register_scheme(self, scheme):
        self._schemes[scheme._scheme_name] = scheme

    def resolve_steps(self, top_node, steps, scheme_name):
        """Resolve steps from top_node.
        """
        # if we don't know scheme, return None as node could not be found
        if not self._schemes.has_key(scheme_name):
            raise NodePathError, "Unknown scheme: %s" % scheme_name 
        # we do know the scheme, so look it up
        scheme = self._schemes[scheme_name]
        # now resolve steps using this scheme
        return scheme.resolve_steps(top_node, steps)
    
    def resolve_path(self, top_node, path):
        """Resolve path.
        """
        if not path:
            return top_node
        steps = string.split(path, ',')
        # get scheme as first element of path
        scheme_name, steps = steps[0], steps[1:]
        # resolve steps using scheme
        return self.resolve_steps(top_node, steps, scheme_name)

    def create_steps(self, top_node, node, scheme_name):
        """Construct steps tuple to node.
        """
        return self._schemes[scheme_name].create_steps(top_node, node)
    
    def create_path(self, top_node, node, scheme_name):
        """Construct path to node according to scheme_name.
        """
        steps = self.create_steps(top_node, node, scheme_name)
        if not steps:
            return '' 
        return '%s,%s' % (scheme_name, string.join(steps, ','))

class BaseNodePathScheme:
    def __init__(self, scheme_name):
        self._scheme_name = scheme_name
    
    def resolve_steps(self, top_node, steps):
        """Resolve path from top_node, return node found or None.
        Steps are in order top_node to node.
        """
        pass

    def create_steps(self, top_node, node):
        """Return list of steps from top_node to node.
        Steps returned are in reverse order.
        """
        pass
    
class ChildNodePathScheme(BaseNodePathScheme):
    def __init__(self):
        BaseNodePathScheme.__init__(self, 'child')
        
    def resolve_steps(self, top_node, steps):
        node = top_node
        for step in steps:
            node = node.childNodes.item(int(step))
            if node is None:
                return None
        return node # return node

    def create_steps(self, top_node, node):
        steps = []
        # FIXME: is it safe to compare nodes like this?
        while node is not top_node:
            parent = node.parentNode
            if parent is None:
                break
            # FIXME: can index() be used in all python DOMs?
            steps.append(str(parent.childNodes.index(node)))
            node = parent
        steps.reverse()
        return steps

# create the registry
registry = NodePathSchemeRegistry()


=== Added File Products/ParsedXML/NodePath/README.txt ===
Everything in this directory, including the tests subdirectory, should work
independently of ParsedXML and with another Python DOM as well. The only
change necessary should be to the DOM loading in the unit tests subdir.



=== Added File Products/ParsedXML/NodePath/RobustPath.py ===
from NodePath import BaseNodePathScheme, NodePathError
import random, string, urllib
import xml.dom

words_amount = 5

class RobustPathScheme(BaseNodePathScheme):
    def __init__(self):
        BaseNodePathScheme.__init__(self, 'robust')

    def resolve_steps(self, top_node, steps):
        steps, type_step, context_step = steps[:-2], steps[-2], steps[-1]
        rsteps = []
        # create element steps
        for step in steps:
            nodeName, nodeIndex = string.split(step, "*")
            rsteps.append(GenericStep(xml.dom.Node.ELEMENT_NODE,
                                      nodeName, int(nodeIndex)))
        # now create type and context end steps
        context_parts = string.split(context_step, "*")
        node_type, nodeIndex = string.split(type_step, "*")
        nodeIndex = int(nodeIndex)
        if node_type == 'element':
            type_rstep = GenericStep(xml.dom.Node.ELEMENT_NODE,
                                context_parts[0], nodeIndex)
            context_rstep = ElementEndStep(int(context_parts[1]),
                                           int(context_parts[2]))
        elif node_type == 'text':
            type_rstep = GenericStep(xml.dom.Node.TEXT_NODE,
                                     "#text", nodeIndex)
            words = []
            for i in range(0, len(context_parts), 2):
                words.append((urllib.unquote(context_parts[i]),
                             int(context_parts[i + 1])))
            context_rstep = TextEndStep(words)
        elif node_type == 'empty':
            type_rstep = GenericStep(xml.dom.Node.TEXT_NODE,
                                     "#text", nodeIndex)
            context_rstep = EmptyTextEndStep()
        elif node_type == 'other':
            raise NodePathError, "Cannot handle other path elements yet."
        else:
            raise NodePathError, "Unknown step type in path: %s" % node_type

        rsteps.append(type_rstep)
        rsteps.append(context_rstep)

        # resolve the steps, starting with step 0
        context = Context(0, rsteps)
        node, level = rsteps[0].resolve(context, top_node, 0)
        return node
        
    def create_steps(self, top_node, node):
        # first create last two steps
        parent = node.parentNode
        steps = self.create_end_steps(parent, node)
        node = parent
        # now create element steps
        while node is not top_node:
            parent = node.parentNode
            if parent is None:
                break
            steps.append(self.create_element_step(parent, node))
            node = parent
        steps.reverse()
        return steps
    
    def create_end_steps(self, parent, node):
        nodeType = node.nodeType
        if nodeType == node.ELEMENT_NODE:
            return ['%s*%s*%s' % (node.nodeName,
                                  len(node.attributes),
                                  len(node.childNodes)),
                    'element*%s' % parent.childNodes.index(node) ]
        elif nodeType == node.TEXT_NODE:
            if string.strip(node.data) == '':
                return ['empty',
                        'empty*%s' % parent.childNodes.index(node)]
            else:
                return [string.join(get_words_sample(node.data), "*"),
                        'text*%s' % parent.childNodes.index(node)]
        else:
            # FIXME: simplistic way to deal with other nodes
            return ['%s' % node.nodeType,
                    'other*%s' % parent.childNodes.index(node)]
        
    def create_element_step(self, parent, node):
        return "%s*%s" % (node.nodeName, parent.childNodes.index(node))
    
class Context:
    threshold = 30
    success_threshold = 10
    offset_unreliability = 3
    tree_skip_unreliability = 3
    step_skip_unreliability = 3
    word_max_distance = 7
    word_offset_unreliability = 2
    
    def __init__(self, i, steps):
        self._i = i
        self._steps = steps
            
    def resolve_next(self, node, level):
        steps = self._steps
        i = self._i + 1
        if i < len(steps):
            # prepare new context for next step
            context = Context(i, steps)
            # resolve the next step
            return steps[i].resolve(context, node, level)
        else:
            return node, level

class GenericStep:
    def __init__(self, nodeType, nodeName, index):
        self._nodeType = nodeType
        self._nodeName = nodeName
        self._index = index
    
    def resolve(self, context, node, level):
        nodeType = self._nodeType
        nodeName = self._nodeName
        i = self._index
        childNodes = node.childNodes
        l = len(childNodes)

        results = []
        # try if the indicate node is the one, if so, return it
        if i < l:
            current = childNodes[i]
            if (current.nodeType == nodeType and
                current.nodeName == nodeName):
                 current_node, current_unreliability = context.resolve_next(
                     current, level)
                 if current_unreliability < context.success_threshold:
                     return current_node, current_unreliability
                 results.append((current_node, current_unreliability))
            # we need to go forward and backward from position i
            forward_i = i + 1
            backward_i = i - 1
            can_go_forward = can_go_backward = 1
        else:
            # i is beyond length, so we need to go backward from end
            backward_i = l - 1
            forward_i = l
            can_go_forward = 0
            can_go_backward = 1

        # try nodes forward and backward of this one
        unreliability = level + context.offset_unreliability
        while ((can_go_forward or can_go_backward) and
               unreliability < context.threshold):
            if forward_i < l:
                current = childNodes[forward_i]
                if (current.nodeType == nodeType and
                    current.nodeName == nodeName):
                    current_node, current_unreliability = context.resolve_next(
                        current, unreliability)
                    if current_unreliability < context.success_threshold:
                        return current_node, current_unreliability
                    results.append((current_node, current_unreliability))
                forward_i = forward_i + 1
            else:
                can_go_forward = 0
                
            if backward_i >= 0:
                current = childNodes[backward_i]
                if (current.nodeType == nodeType and
                    current.nodeName == nodeName):
                    current_node, current_unreliability = context.resolve_next(
                        current, unreliability)
                    if current_unreliability < context.success_threshold:
                        return current_node, current_unreliability
                    results.append((current_node, current_unreliability))   
                backward_i = backward_i - 1
            else:
                can_go_backward = 0

            unreliability = unreliability + context.offset_unreliability

        # try skipping level in the tree
        unreliability = level + context.tree_skip_unreliability
        if unreliability < context.threshold:
            for current in childNodes:
                # use same step but with next nodes
                current_node, current_unreliability = self.resolve(
                    context, current, unreliability)
                if current_unreliability < context.success_threshold:
                    return current_node, current_unreliability
                results.append((current_node, current_unreliability))  

        # try skipping this step
        # use same node but with next step
        unreliability = level + context.step_skip_unreliability
        if unreliability < context.threshold:
            current_node, current_unreliability = context.resolve_next(
                node, unreliability)
            if current_unreliability < context.success_threshold:
                return current_node, current_unreliability
            results.append((current_node, current_unreliability))
            
        # no immediate success, so try the best branch
        best_unreliability = context.threshold
        best_node = None
        for found_node, found_unreliability in results:
            if found_unreliability < best_unreliability:
                best_unreliability = found_unreliability
                best_node = found_node
        return best_node, best_unreliability
    
class ElementEndStep:
    """Not very robust, but representable in a url pretty easily.
    """
    def __init__(self, attributeAmount, childAmount):
        self._attributeAmount = attributeAmount
        self._childAmount = childAmount

    def resolve(self, context, node, level):
        if node.nodeType != node.ELEMENT_NODE:
            return None, context.threshold
        if (len(node.attributes) != self._attributeAmount or 
            len(node.childNodes) != self._childAmount):
            return None, context.threshold
        return node, level

class EmptyTextEndStep:
    
    def resolve(self, context, node, level):
        if node.nodeType != node.TEXT_NODE:
            return None, context.threshold
        if string.strip(node.data) == "":
            return node, level
        return None, context.threshold
    
class TextEndStep:
    def __init__(self, words):
        self._words = words
   
    def resolve(self, context, node, level):
        if node.nodeType != node.TEXT_NODE:
            return None, context.threshold
        words = string.split(node.data)
        if len(words) == 0:
            return None, context.threshold

        unreliability = level
        for word, nr in self._words:
            unreliability = unreliability + (
                find_word_distance(words, word, nr,
                                   context.word_max_distance) *
                context.word_offset_unreliability )
            if unreliability >= context.threshold:
                return None, context.threshold
        return node, unreliability

def find_word_distance(words, word, i, max_distance):
    """Find distance of word in words from expected location i.
    If word could not be found or is further than max_distance, return -1.
    """
    try:
        if words[i] == word:
            return 0
    except IndexError:
        l = len(words)
        distance = i - l + 1
        forward_i = l
        backward_i = l - 1
        can_go_forward = 0
        can_go_backward = 1
    else:
        l = len(words)
        distance = 1
        forward_i = i + 1
        backward_i = i - 1
        can_go_forward = can_go_backward = 1
    while (can_go_forward or can_go_backward) and (distance < max_distance):
        if forward_i < l:
            if words[forward_i] == word:
                return distance
            forward_i = forward_i + 1
        else:
            can_go_forward = 0
        if backward_i >= 0:
            if words[backward_i] == word:
                return distance
            backward_i = backward_i - 1
        else:
            can_go_backward = 0

        distance = distance + 1

    return max_distance

def get_words_sample(text,
                     words_amount=words_amount,
                     quote=urllib.quote):
    words = string.split(text)
    result = []

    if len(words) <= words_amount:
        # get all words if there are only a few
        for i in range(len(words)):
            # don't add any words with separator in it, so as not to confuse
            if "*" not in words[i] and "," not in words[i]:
                qword = quote(words[i])
                result.append(qword + "*" + str(i))
    else:
        # otherwise, take a random sample of words from the text
        nrs = []
        l = len(words) - 1
        for i in range(words_amount):
            while 1:
                r = random.randint(0, l)
                # if we selected new word from sample, done
                # NOTE: would introduce subtle bug if we checked for
                # * or , here
                if r not in nrs:
                    break
            nrs.append(r)
        nrs.sort()
        for nr in nrs:
            # don't want any word with separator in it
            if "*" not in words[nr] and "," not in words[nr]:
                qword = quote(words[nr])
                result.append(qword + "*" + str(nr))
    return result


=== Added File Products/ParsedXML/NodePath/__init__.py ===
import NodePath, RobustPath, ElementIdPath
from NodePath import registry, BaseNodePathScheme

registry.register_scheme(NodePath.ChildNodePathScheme())
registry.register_scheme(RobustPath.RobustPathScheme())
registry.register_scheme(ElementIdPath.ElementIdPathScheme())





More information about the zopeorg-checkins mailing list