[zopeorg-checkins] CVS: Products/ZopeOrg-NV/Extensions - NZOMigrate.py:1.1

Sidnei da Silva sidnei at x3ng.com.br
Tue Dec 3 14:36:24 EST 2002


Update of /cvs-zopeorg/Products/ZopeOrg-NV/Extensions
In directory cvs.zope.org:/tmp/cvs-serv3847/Extensions

Added Files:
	NZOMigrate.py 
Log Message:
migration script

=== Added File Products/ZopeOrg-NV/Extensions/NZOMigrate.py ===
from StringIO import StringIO

from Acquisition import aq_base
from Acquisition import aq_inner, aq_parent
from Products.CMFCore.utils import getToolByName
from ZODB.POSException import POSKeyError
import zLOG

def migrate(source, dest, ignore_path=None, type_map=None):
    '''
    Migrate the objects contained in ``source`` into ``dest``.

    source      -- container whose children are migrated
    dest        -- container receiving the converted objects
    ignore_path -- optional sequence of paths to skip; each entry is
                   either a '/'-separated string or a tuple/list of
                   path segments.  Entries of any other type are
                   dropped.  The sequence passed in is not modified.
    type_map    -- optional dict mapping a source meta_type to the
                   target type name; a built-in table is used when
                   omitted.

    Returns whatever ``Transmutator.run()`` returns.
    '''
    dest_path = tuple(dest.getPhysicalPath())

    # Build a fresh list of path tuples.  The destination is appended
    # *after* normalisation: it is a tuple, so filtering on strings
    # afterwards would silently drop it again and the destination
    # could end up being migrated into itself.
    paths = []
    for p in ignore_path or []:
        if type(p) == type(''):
            paths.append(tuple(p.split('/')))
        elif type(p) in (type(()), type([])):
            paths.append(tuple(p))
    if dest_path not in paths:
        paths.append(dest_path)

    if type_map is None:
        type_map = {"Formatted Document" : "CMF Default Document",
                    "STX Document" : "CMF Default Document",
                    "HTML Document" : "CMF Default Document",
                    "Link" : "CMF Default Link",
                    "News Item": "CMF Default News Item",
                    "Folder": "CMF Portal Folder",
                    "Image": "CMF Default Image",
                    "File": "CMF Default File",
                    "Tip": "ZopeOrg Tip",
                    "HowTo": "ZopeOrg HowTo",
                    "Software Product": "CMF Software Package",
                    "Product Release": "CMF Software Release",
                    "Tracker": "CMF Collector",
                    "User Folder": "User Folder",
                    "Members Folder": "User Folder",
                    "ZCatalog": "ZCatalog",
                    "BTree Folder": "CMF BTree Folder",
                    "ZWiki Page": "ZWiki Page",
                    "Broken Because Product is Gone": "BBR",
                    "XML Document": "XML Document",
                    "BackTalk Book": "CMF BackTalk Book",
                    "BackTalk Document": "CMF BackTalk Document"
                    }

    tmut = Transmutator(source, dest, paths, type_map)
    return tmut.run()

def normalizeMetaType(meta_type):
    '''
    Return ``meta_type`` with every space character removed, e.g.
    'News Item' becomes 'NewsItem'.
    '''
    pieces = meta_type.split(' ')
    return ''.join(pieces)

def fixZWikiPortalType(self):
    '''
    Set ``portal_type`` to 'ZWiki Page' on every object the portal
    catalog reports with meta_type 'ZWiki Page', reindexing the
    ``portal_type`` index for each one.  A subtransaction commit is
    issued after every 100 objects.  Always returns 'ok'.
    '''
    catalog = self.portal_catalog
    processed = 0
    for brain in catalog(meta_type='ZWiki Page'):
        page = catalog.getobject(brain.data_record_id_)
        page.portal_type = 'ZWiki Page'
        catalog.reindexObject(page, idxs=['portal_type'])
        processed = processed + 1
        if processed % 100 == 0:
            get_transaction().commit(1)
    return 'ok'

def _cleanupOwnership(ob, res, cleanup_children):
    '''
    If the user name of the owner of the referenced object
    is not found in its current user database but is found
    in the local user database, this function changes the
    ownership of the object to the local database.
    '''
    try: changed = ob._p_changed
    except: changed = 0

    owner = getattr(ob, '_owner', None)
    if owner:
        udb, uid = owner
        #res.append('Owner of %s is %s!%s' % (
        #    join(ob.getPhysicalPath(), '/'), join(udb, '/'), uid,))
        root = ob.getPhysicalRoot()
        try:
            db = root.unrestrictedTraverse(udb, None)
            user = db.getUserById(uid)
            if hasattr(ob, 'aq_inContextOf'):
                ucontext = aq_parent(aq_inner(db))
                if not ob.aq_inContextOf(ucontext):
                    # Not in the right context.
                    user = None
        except:
            user = None
        if user is None:
            # Try to change to a local database.
            p = ob
            old_udb = udb
            udb = None
            while p is not None:
                if hasattr(p, 'acl_users'):
                    acl_users = p.acl_users
                    try:
                        user = acl_users.getUserById(uid)
                    except:
                        user = None
                    if user is not None:
                        # Found the right database.
                        udb = acl_users.getPhysicalPath()[1:]
                        break
                p = aq_parent(aq_inner(p))
            if udb is not None:
                ob._owner = udb, uid
                res.append('Changed ownership of %s from %s!%s to %s!%s' %
                           (join(ob.getPhysicalPath(), '/'),
                            join(old_udb, '/'), uid,
                            join(udb, '/'), uid,))
            else:
                res.append('Could not fix the ownership of %s, '
                           'which is set to %s!%s' %
                           (join(ob.getPhysicalPath(), '/'),
                            join(old_udb, '/'), uid,))
                
    if cleanup_children:
        if hasattr(ob, 'objectValues'):
            for subob in ob.objectValues():
                _cleanupOwnership(subob, res, 1)

    # Deactivate object if possible.
    if changed is None: ob._p_deactivate()
    
    return res

class Transmutator:
    '''
    Migrates the direct children of a source container into a
    destination container, then recurses into every child that is
    itself a container and was recreated successfully.

    For each child a converter method is chosen by name: the child's
    meta_type and the mapped target type (from ``type_map``) are
    stripped of spaces and joined with '2', e.g. 'News Item' mapped to
    'CMF Default News Item' selects ``NewsItem2CMFDefaultNewsItem``.
    Children with an unmapped meta_type, or without a matching method,
    go through ``defaultMigrationMethod``.

    Every converter takes ``(obj, source, dest)`` and returns the new
    object, or None when nothing was created.
    '''

    def __init__(self, source, dest, ignore_path, type_map):
        '''
        source      -- container whose children are migrated
        dest        -- container receiving the migrated objects
        ignore_path -- list of physical-path tuples to skip; extended
                       in place by the cleanup helpers during run()
        type_map    -- dict mapping source meta_type to target type
        '''
        self._source = source
        self._dest = dest
        self._ignore_path = ignore_path
        self._type_map = type_map
        self._out = StringIO()

    def log(self, message, summary='', severity=0, dup=1):
        '''
        Append ``message`` to the in-memory log buffer and, unless
        ``dup`` is false, also send it to zLOG.
        '''
        self._out.write(message)
        if dup:
            zLOG.LOG('NZOMigration: ',severity,summary,message)

    def getMigrationMethod(self, obj):
        '''
        Return the bound converter method to use for ``obj`` and log
        the choice.
        '''
        if hasattr(obj, 'meta_type'):
            mt = getattr(obj, 'meta_type')
            if callable(mt):
                mt = mt()
        else:
            # No meta_type at all: fall back to the class name.
            mt = obj.__class__.__name__
        dt = self._type_map.get(mt, None)
        if dt is None:
            mmt = 'defaultMigrationMethod'
        else:
            mmt = '%s2%s' % (normalizeMetaType(mt), normalizeMetaType(dt))
        obj_url = obj.absolute_url(relative=1)
        self.log('Will use %s to migrate %s (%s)\n' % \
                        (mmt, obj_url, mt))
        # A mapped type without a dedicated converter still gets the
        # default treatment.
        return getattr(self, mmt, self.defaultMigrationMethod)

    def defaultMigrationMethod(self, obj, source, dest):
        '''
        Copy ``obj`` unchanged into ``dest`` under the same id.
        Returns the copy, or None (after logging) if copying failed.
        '''
        try:
            copy = obj._getCopy(dest)
            dest._setObject(obj.getId(), copy)
        except:
            # Best-effort: a failed copy is reported and skipped so
            # the rest of the migration can continue.
            self.log('Could not copy %s (%s). Possibly duplicate id.\n'\
                     % (obj.getId(), obj.absolute_url(relative=1)))
            return None
        return getattr(dest, obj.getId())

    def BrokenBecauseProductisGone2BBR(self, obj, source, dest):
        '''Skip broken objects; returns None.'''
        self.log('Ignoring Broken Beyond Repair.\n')
        return None

    def ZWikiPage2ZWikiPage(self, obj, source, dest):
        '''Copy a ZWiki page as-is via the default method.'''
        obj = self.defaultMigrationMethod(obj, source, dest)
        return obj

    def ZCatalog2ZCatalog(self, obj, source, dest):
        '''Skip ZCatalogs; returns None.'''
        self.log('Ignoring ZCatalog for now.\n')
        return None

    def UserFolder2UserFolder(self, obj, source, dest):
        '''Skip user folders; returns None.'''
        self.log('Ignoring User Folder for now.\n')
        return None

    def MembersFolder2UserFolder(self, obj, source, dest):
        '''Skip members folders; returns None.'''
        self.log('Ignoring Members Folder for now.\n')
        return None

    def SoftwareProduct2CMFSoftwarePackage(self, obj, source, dest):
        '''Skip software products; returns None.'''
        self.log('Ignoring Software Product for now.\n')
        return None

    def ProductRelease2CMFSoftwareRelease(self, obj, source, dest):
        '''Skip product releases; returns None.'''
        self.log('Ignoring Product Release for now.\n')
        return None

    def Tracker2CMFCollector(self, obj, source, dest):
        '''Skip trackers; returns None.'''
        self.log('Ignoring Tracker for now.\n')
        return None

    def XMLDocument2XMLDocument(self, obj, source, dest):
        '''Skip XML documents; returns None.'''
        self.log('Ignoring XML Document for now.\n')
        return None

    def BackTalkBook2CMFBackTalkBook(self, obj, source, dest):
        '''
        Create a 'BackTalk Book' in ``dest`` and copy the properties
        of ``obj`` onto it.  'header' is stored as 'book_header' and
        'summary' becomes the description.
        '''
        dest.invokeFactory(id=obj.getId(), type_name='BackTalk Book')
        book = getattr(dest, obj.getId())
        for prop in obj.propertyIds():
            if prop in ['header', 'summary']:
                continue
            value = obj.getProperty(prop)
            if value is not None:
                book._updateProperty(prop, value)
        if obj.hasProperty('header'):
            book._updateProperty('book_header', obj.getProperty('header'))
        if obj.hasProperty('summary'):
            book.setDescription(obj.getProperty('summary'))
        return book

    def BackTalkDocument2CMFBackTalkDocument(self, obj, source, dest):
        '''
        Create a 'BackTalk Document' in ``dest``, copy the properties
        of ``obj`` ('summary' becomes the description) and feed the
        source text to the new document's ``munge``.
        '''
        dest.invokeFactory(id=obj.getId(), type_name='BackTalk Document')
        doc = getattr(dest, obj.getId())
        for prop in obj.propertyIds():
            if prop in ['summary']:
                continue
            value = obj.getProperty(prop)
            if value is not None:
                doc._updateProperty(prop, value)
        if obj.hasProperty('summary'):
            doc.setDescription(obj.getProperty('summary'))
        doc.munge(obj.read())
        return doc

    def _migrateFileLike(self, obj, dest, type_name):
        '''
        Shared body of the File and Image converters: create an object
        of ``type_name`` in ``dest`` and copy data, content type and
        title from ``obj``.
        '''
        dest.invokeFactory(id=obj.getId(), type_name=type_name)
        f = getattr(dest, obj.getId())
        title = obj.title or ''
        content_type = obj.content_type
        data, size = f._read_data(obj.data)
        f.update_data(data, content_type, size)
        f.setTitle(title)
        f.setFormat(content_type)
        return f

    def _subjectWithTopics(self, subject, topics):
        '''
        Return a NEW list holding the source subject keywords followed
        by ``topics``.

        A list subject is copied so the source object's own list is
        never extended in place.  A non-empty string subject is kept as
        a single keyword (NOTE(review): assumed to be the intent; the
        previous code raised on string subjects).  Any other type
        counts as "no subject".
        '''
        if type(subject) == type([]):
            keywords = list(subject)
        elif type(subject) == type('') and subject:
            keywords = [subject]
        else:
            keywords = []
        keywords.extend(topics)
        return keywords

    def File2CMFDefaultFile(self, obj, source, dest):
        '''Recreate ``obj`` as a 'File' in ``dest``.'''
        return self._migrateFileLike(obj, dest, 'File')

    def Image2CMFDefaultImage(self, obj, source, dest):
        '''Recreate ``obj`` as an 'Image' in ``dest``.'''
        return self._migrateFileLike(obj, dest, 'Image')

    def NewsItem2CMFDefaultNewsItem(self, obj, source, dest):
        '''
        Recreate ``obj`` as a 'News Item' in ``dest``; the subject is
        the source subject plus ``NewsItem_topics``.
        '''
        dest.invokeFactory(id=obj.getId(), type_name='News Item')
        news = getattr(dest, obj.getId())
        title = obj.title or ''
        text = obj.text
        format = obj.format
        subject = self._subjectWithTopics(obj.subject, obj.NewsItem_topics)
        news.setTitle(title)
        news.edit(text_format=format, text=text)
        news.setSubject(subject)
        return news

    def Link2CMFDefaultLink(self, obj, source, dest):
        '''
        Recreate ``obj`` as a 'Link' in ``dest``; the subject is the
        source subject plus ``Link_topics`` and the ``zope_powered``
        flag is carried over as a boolean property.
        '''
        dest.invokeFactory(id=obj.getId(), type_name='Link')
        link = getattr(dest, obj.getId())
        title = obj.title or ''
        # Only a plain string is accepted as description.
        description = (type(obj.description) == type('')) and \
                      obj.description or ''
        subject = self._subjectWithTopics(obj.subject, obj.Link_topics)
        remote_url = obj.remote_url
        zope_powered = obj.zope_powered or 0
        link.setTitle(title)
        link.setDescription(description)
        link.edit(remote_url=remote_url)
        link.setSubject(subject)
        link.manage_addProperty('zope_powered', zope_powered, 'boolean')
        return link

    def FormattedDocument2CMFDefaultDocument(self, obj, source, dest):
        '''
        Recreate ``obj`` as a 'Document' in ``dest``.  The text format
        defaults to 'structured-text' when the source has none.
        '''
        dest.invokeFactory(id=obj.getId(), type_name='Document')
        doc = getattr(dest, obj.getId())
        title = obj.title or ''
        description = (type(obj.description) == type('')) and \
                      obj.description or ''
        text = obj.content
        format = hasattr(obj, 'format') and obj.format or 'structured-text'
        # The subject is passed through untouched (string or list).
        subject = (type(obj.subject) in [type(''), type([])]) and \
                  obj.subject or []
        doc.setTitle(title)
        doc.setDescription(description)
        doc.edit(text_format=format, text=text)
        doc.setSubject(subject)
        return doc

    # STX and HTML documents are converted exactly like formatted ones.
    STXDocument2CMFDefaultDocument = FormattedDocument2CMFDefaultDocument
    HTMLDocument2CMFDefaultDocument = FormattedDocument2CMFDefaultDocument

    def Tip2ZopeOrgTip(self, obj, source, dest):
        '''
        Recreate ``obj`` as a 'Tip' in ``dest``: ``tip`` becomes the
        title, ``details`` the description, and the subject is the
        source subject plus ``tip_topics``.
        '''
        dest.invokeFactory(id=obj.getId(), type_name='Tip')
        tip = getattr(dest, obj.getId())
        title = obj.tip or ''
        description = obj.details or ''
        text = obj.content
        format = obj.format
        subject = self._subjectWithTopics(obj.subject, obj.tip_topics)
        tip.setTitle(title)
        tip.setDescription(description)
        tip.edit(text_format=format, text=text)
        tip.setSubject(subject)
        return tip

    def HowTo2ZopeOrgHowTo(self, obj, source, dest):
        '''
        Recreate ``obj`` as a 'HowTo' in ``dest``; the subject is the
        source subject plus ``HowTo_topics``.
        '''
        dest.invokeFactory(id=obj.getId(), type_name='HowTo')
        howto = getattr(dest, obj.getId())
        title = obj.title or ''
        text = obj.content
        format = obj.format
        subject = self._subjectWithTopics(obj.subject, obj.HowTo_topics)
        howto.setTitle(title)
        howto.edit(text_format=format, text=text)
        howto.setSubject(subject)
        return howto

    def Folder2CMFPortalFolder(self, obj, source, dest):
        '''
        Make sure ``dest`` has a folder with the id of ``obj`` and
        return it; returns None unless the resulting object has
        portal_type "Folder".
        '''
        if dest is None or obj is None: return None
        try:
            dest.manage_addFolder(obj.getId(), obj.title)
        except:
            # Best-effort: creation may fail (presumably because the
            # id is already taken); whatever now lives under that id
            # is validated below.
            pass
        new = getattr(dest, obj.getId(), None)
        if new is not None and hasattr(new, 'portal_type') and\
               new.portal_type == "Folder":
            return new
        return None

    def BTreeFolder2CMFBTreeFolder(self, obj, source, dest):
        '''
        Make sure ``dest`` has a CMF BTree folder with the id of
        ``obj`` and return it; returns None unless the resulting
        object has portal_type "Folder".
        '''
        if dest is None or obj is None: return None
        try:
            bt2 = dest.manage_addProduct['BTreeFolder2']
            bt2.manage_addCMFBTreeFolder(obj.getId(), obj.title)
        except:
            # Best-effort, same reasoning as Folder2CMFPortalFolder.
            pass
        new = getattr(dest, obj.getId(), None)
        if new is not None and hasattr(new, 'portal_type') and\
               new.portal_type == "Folder":
            return new
        return None

    def WikiMethodsCleanup(self, obj, ignore_path):
        '''
        If ``obj`` looks like a wiki folder (contains 'FrontPage',
        'standard_wiki_header' or 'standard_wiki_page'), add the paths
        of its wiki support methods to ``ignore_path`` so they are not
        migrated.  Returns ``ignore_path`` (modified in place).
        '''
        obj_ids = []
        if hasattr(obj, 'keys'):
            obj_ids = obj.keys()
        elif hasattr(obj, 'objectIds'):
            obj_ids = obj.objectIds()
        if obj_ids:
            if 'FrontPage' in obj_ids  or \
            'standard_wiki_header' in obj_ids or \
            'standard_wiki_page' in obj_ids:
                obj_path = obj.getPhysicalPath()
                for dm in ('standard_wiki_header', 'standard_wiki_footer',
                           'editform', 'commentform', 'backlinks',
                           'advancedform', 'pagehistory', 'search',
                           'standard_wiki_page', 'title_prefix'):
                    # We can possibly remove those later also
                    #       'AllPages', 'BackLinks', 'HelpPage'
                    #       'HowDoIEdit', 'HowDoINavigate', 'JumpTo',
                    #       'RecentChanges', 'RegulatingYourPages',
                    #       'RemoteWikiLinks', 'RemoteWikiURL',
                    #       'SandBox', 'SearchPage', 'StructuredText',
                    #       'StructuredTextExamples', 'StructuredTextRules',
                    #       'TextFormattingRiges', 'VisitorLog', 'WikiName',
                    #       'WikiWikiWeb', 'ZWiki', 'ZWikiLinks', 'ZWikiWeb'
                    if dm in obj_ids:
                        ignore_path.append(obj_path + (dm,))
        return ignore_path

    def DefaultDTMLCleanup(self, obj, ignore_path):
        '''
        If ``obj`` has an 'index_html' that is a DTML Method or DTML
        Document, add the paths of the standard DTML layout objects it
        contains to ``ignore_path``.  Returns ``ignore_path``
        (modified in place).
        '''
        # Initialised up front so objects exposing neither ``keys``
        # nor ``objectIds`` are simply left alone.
        obj_ids = []
        if hasattr(obj, 'keys'):
            obj_ids = obj.keys()
        elif hasattr(obj, 'objectIds'):
            obj_ids = obj.objectIds()

        if obj_ids:
            if 'index_html' in obj_ids  and \
            obj.index_html.meta_type in ['DTML Method', 'DTML Document'] :
                obj_path = obj.getPhysicalPath()
                for dm in ('index_html', 'local_nav', 'page_title',
                           'standard_html_footer', 'standard_html_header',
                           'local_css', 'custom_html_header',
                           'custom_html_footer', 'local_exit'):
                    if dm in obj_ids:
                        ignore_path.append(obj_path + (dm,))
        return ignore_path

    def fixOwnership(self, orig, new):
        '''
        Copy the ``_owner`` of ``orig`` (if any) onto ``new``.
        Failures are ignored.  Returns ``new``.
        '''
        owner = getattr(orig, '_owner', None)
        if owner is not None:
            try:
                # Retain ownership.
                new._owner = owner
            except: pass
        return new

    def fixModificationDate(self, orig, new):
        '''
        Give ``new`` the modification time of ``orig`` (as an ISO
        string) when ``new`` itself, not an acquired parent, provides
        ``setModificationDate``.  Returns ``new``.
        '''
        mod_date = orig.bobobase_modification_time().ISO()
        if hasattr(aq_base(new), 'setModificationDate'):
            new.setModificationDate(mod_date)
        return new

    def run(self):
        '''
        Migrate every non-ignored child of the source container, in
        id order, recursing into containers that were recreated.

        A subtransaction commit is issued before each recursion and a
        full commit once all children have been processed.  Returns
        'Ok.\\n'.
        '''
        source = self._source
        dest = self._dest
        ignore = self._ignore_path
        if hasattr(source, 'objectIds') and \
               hasattr(source, 'getPhysicalPath'):
            path = source.getPhysicalPath()
            oids = [o for o in source.objectIds() \
                       if (path + (o, )) not in ignore]
            oids.sort()
            for oid in oids:
                obj = getattr(source, oid)
                try:
                    # We may want to catch POSKeyErrors early,
                    # so lets prod the object and see what happens
                    obj_url = obj.absolute_url(relative=1)
                except POSKeyError:
                    # Skip this object
                    self.log('POSKeyError while trying to access %s.\n'
                             % '/'.join(path + (oid, )))
                    continue
                mm = self.getMigrationMethod(obj)
                try:
                    new_obj = mm(obj, source, dest)
                except:
                    # Best-effort: one failing object must not abort
                    # the whole migration; it is reported just below.
                    new_obj = None

                # Only None means failure.  A freshly created, still
                # empty container may well be falsy and must not be
                # mistaken for a failed conversion.
                if new_obj is None:
                    self.log('Result of conversion of %s: Failed.\n'
                             % obj_url)
                else:
                    new_obj = self.fixOwnership(obj, new_obj)
                    new_obj = self.fixModificationDate(obj, new_obj)
                    ct = getToolByName(new_obj, 'portal_catalog', None)
                    if ct is not None:
                        # Index the migrated object, not the source.
                        try: ct.reindexObject(new_obj)
                        except: pass
                    self.log('Result of conversion of %s: Success.\n'
                             % obj_url)

                if hasattr(obj, 'objectIds') and new_obj is not None:
                    ignore = self.WikiMethodsCleanup(obj, ignore)
                    ignore = self.DefaultDTMLCleanup(obj, ignore)
                    get_transaction().commit(1)
                    # The child run already reported to zLOG, so only
                    # its return value is recorded here (dup=0).
                    self.log(Transmutator(obj, new_obj, \
                                          ignore, self._type_map).run(), dup=0)

            get_transaction().commit()

        # NOTE(review): this used to read
        # "'Ok.\n' or self._out.getvalue()", which always evaluates to
        # 'Ok.\n'.  The return value is kept; confirm whether the
        # accumulated log was meant to be returned instead.
        return 'Ok.\n'


if __name__ == '__main__':
    # Running the module directly only confirms that it loads.
    print('ok')





More information about the zopeorg-checkins mailing list