[zopeorg-checkins] CVS: Products/ZopeOrg-NV/Extensions - NZOMigrate.py:1.1
Sidnei da Silva
sidnei at x3ng.com.br
Tue Dec 3 14:36:24 EST 2002
Update of /cvs-zopeorg/Products/ZopeOrg-NV/Extensions
In directory cvs.zope.org:/tmp/cvs-serv3847/Extensions
Added Files:
NZOMigrate.py
Log Message:
migration script
=== Added File Products/ZopeOrg-NV/Extensions/NZOMigrate.py ===
import sys
from StringIO import StringIO

import zLOG
from Acquisition import aq_base, aq_inner, aq_parent
from Products.CMFCore.utils import getToolByName
from ZODB.POSException import POSKeyError
def migrate(source, dest, ignore_path=None, type_map=None):
    """Migrate the contents of ``source`` into ``dest``.

    source      -- container whose children are migrated
    dest        -- container receiving the converted objects; its own
                   physical path is always added to the ignore list
    ignore_path -- optional sequence of paths to skip.  Each entry may be
                   a '/'-separated string or a sequence of path elements;
                   all entries are normalised to tuples.  The caller's
                   sequence is not modified.
    type_map    -- optional dict mapping source meta types to target type
                   names; a built-in default table is used when omitted

    Returns whatever ``Transmutator.run()`` returns.
    """
    # Normalise every entry to a tuple so it can be compared against
    # getPhysicalPath()-style tuples.  The previous implementation kept
    # only string entries, which silently discarded tuple entries --
    # including the destination path it had just appended.
    ignore = []
    for entry in ignore_path or ():
        if hasattr(entry, 'split'):
            entry = tuple(entry.split('/'))
        else:
            entry = tuple(entry)
        if entry not in ignore:
            ignore.append(entry)

    dest_path = tuple(dest.getPhysicalPath())
    if dest_path not in ignore:
        ignore.append(dest_path)

    if type_map is None:
        type_map = {
            "Formatted Document": "CMF Default Document",
            "STX Document": "CMF Default Document",
            "HTML Document": "CMF Default Document",
            "Link": "CMF Default Link",
            "News Item": "CMF Default News Item",
            "Folder": "CMF Portal Folder",
            "Image": "CMF Default Image",
            "File": "CMF Default File",
            "Tip": "ZopeOrg Tip",
            "HowTo": "ZopeOrg HowTo",
            "Software Product": "CMF Software Package",
            "Product Release": "CMF Software Release",
            "Tracker": "CMF Collector",
            "User Folder": "User Folder",
            "Members Folder": "User Folder",
            "ZCatalog": "ZCatalog",
            "BTree Folder": "CMF BTree Folder",
            "ZWiki Page": "ZWiki Page",
            "Broken Because Product is Gone": "BBR",
            "XML Document": "XML Document",
            "BackTalk Book": "CMF BackTalk Book",
            "BackTalk Document": "CMF BackTalk Document",
        }

    return Transmutator(source, dest, ignore, type_map).run()
def normalizeMetaType(meta_type):
    """Return ``meta_type`` with every space character removed."""
    pieces = meta_type.split(' ')
    return ''.join(pieces)
def fixZWikiPortalType(self):
    """Set portal_type to 'ZWiki Page' on every catalogued ZWiki Page.

    ``self`` must provide a ``portal_catalog`` attribute.  Each matching
    object is reindexed on the 'portal_type' index only, and a
    subtransaction is committed after every 100 objects.  Returns 'ok'.
    """
    catalog = self.portal_catalog
    processed = 0
    for brain in catalog(meta_type='ZWiki Page'):
        page = catalog.getobject(brain.data_record_id_)
        page.portal_type = 'ZWiki Page'
        catalog.reindexObject(page, idxs=['portal_type'])
        processed = processed + 1
        if processed % 100 == 0:
            # get_transaction is expected to be provided by the runtime;
            # it is not defined or imported in this file.
            get_transaction().commit(1)
    return 'ok'
def _cleanupOwnership(ob, res, cleanup_children):
'''
If the user name of the owner of the referenced object
is not found in its current user database but is found
in the local user database, this function changes the
ownership of the object to the local database.
'''
try: changed = ob._p_changed
except: changed = 0
owner = getattr(ob, '_owner', None)
if owner:
udb, uid = owner
#res.append('Owner of %s is %s!%s' % (
# join(ob.getPhysicalPath(), '/'), join(udb, '/'), uid,))
root = ob.getPhysicalRoot()
try:
db = root.unrestrictedTraverse(udb, None)
user = db.getUserById(uid)
if hasattr(ob, 'aq_inContextOf'):
ucontext = aq_parent(aq_inner(db))
if not ob.aq_inContextOf(ucontext):
# Not in the right context.
user = None
except:
user = None
if user is None:
# Try to change to a local database.
p = ob
old_udb = udb
udb = None
while p is not None:
if hasattr(p, 'acl_users'):
acl_users = p.acl_users
try:
user = acl_users.getUserById(uid)
except:
user = None
if user is not None:
# Found the right database.
udb = acl_users.getPhysicalPath()[1:]
break
p = aq_parent(aq_inner(p))
if udb is not None:
ob._owner = udb, uid
res.append('Changed ownership of %s from %s!%s to %s!%s' %
(join(ob.getPhysicalPath(), '/'),
join(old_udb, '/'), uid,
join(udb, '/'), uid,))
else:
res.append('Could not fix the ownership of %s, '
'which is set to %s!%s' %
(join(ob.getPhysicalPath(), '/'),
join(old_udb, '/'), uid,))
if cleanup_children:
if hasattr(ob, 'objectValues'):
for subob in ob.objectValues():
_cleanupOwnership(subob, res, 1)
# Deactivate object if possible.
if changed is None: ob._p_deactivate()
return res
class Transmutator:
    """Migrate the children of one container into another.

    For every child of ``source`` a migration method is selected from
    the child's meta type (see ``getMigrationMethod``) and asked to
    create the equivalent object in ``dest``.  Children that expose
    ``objectIds`` are then processed recursively by a nested
    ``Transmutator``.
    """

    def __init__(self, source, dest, ignore_path, type_map):
        """Store the settings for one migration pass.

        source      -- container whose children are migrated
        dest        -- container that receives the new objects
        ignore_path -- list of physical-path tuples to skip; the list is
                       extended in place while running
        type_map    -- dict mapping source meta types to target names
        """
        self._source = source
        self._dest = dest
        self._ignore_path = ignore_path
        self._type_map = type_map
        self._out = StringIO()

    def log(self, message, summary='', severity=0, dup=1):
        """Write ``message`` to the internal buffer; also to zLOG if ``dup``."""
        self._out.write(message)
        if dup:
            zLOG.LOG('NZOMigration: ', severity, summary, message)

    def getMigrationMethod(self, obj):
        """Return the bound method that should migrate ``obj``.

        The method name is '<source meta type>2<target type>' with the
        spaces removed from both parts.  ``defaultMigrationMethod`` is
        returned when the meta type is not in the type map or when no
        method with the computed name exists.
        """
        if hasattr(obj, 'meta_type'):
            mt = getattr(obj, 'meta_type')
            if callable(mt):
                mt = mt()
        else:
            mt = obj.__class__.__name__

        dt = self._type_map.get(mt, None)
        if dt is None:
            mmt = 'defaultMigrationMethod'
        else:
            mmt = '%s2%s' % (normalizeMetaType(mt), normalizeMetaType(dt))

        obj_url = obj.absolute_url(relative=1)
        self.log('Will use %s to migrate %s (%s)\n' % (mmt, obj_url, mt))
        return getattr(self, mmt, self.defaultMigrationMethod)

    # -- shared helpers ---------------------------------------------------

    def _stringOrEmpty(self, value):
        """Return ``value`` if it is a string, otherwise ''."""
        if isinstance(value, str):
            return value
        return ''

    def _subjectOf(self, obj, topics_attr=None):
        """Return the subject of ``obj``, optionally merged with topics.

        A subject that is neither a string nor a list (or is empty)
        counts as no subject.  When ``topics_attr`` is given, the value
        of that attribute is appended to a *copy* of the subject, so the
        source object's own list is never modified.
        """
        subject = obj.subject
        if not isinstance(subject, (str, list)) or not subject:
            subject = []
        if topics_attr is None:
            return subject
        if isinstance(subject, str):
            # A string cannot be extended.  NOTE(review): splitting on
            # whitespace is an assumption about how keyword strings
            # should be tokenised -- confirm against setSubject().
            merged = subject.split()
        else:
            merged = list(subject)
        merged.extend(getattr(obj, topics_attr))
        return merged

    def _copyProperties(self, obj, new, skip):
        """Copy every non-None property of ``obj`` to ``new`` except ``skip``."""
        for prop in obj.propertyIds():
            if prop in skip:
                continue
            value = obj.getProperty(prop)
            if value is not None:
                new._updateProperty(prop, value)

    def _migrateBinary(self, obj, dest, type_name):
        """Create a ``type_name`` object in ``dest`` carrying ``obj``'s data."""
        dest.invokeFactory(id=obj.getId(), type_name=type_name)
        new = getattr(dest, obj.getId())
        title = obj.title or ''
        content_type = obj.content_type
        data, size = new._read_data(obj.data)
        new.update_data(data, content_type, size)
        new.setTitle(title)
        new.setFormat(content_type)
        return new

    def _folderIn(self, dest, obj):
        """Return ``dest``'s object named like ``obj`` if it is a "Folder"."""
        new = getattr(dest, obj.getId(), None)
        if new is not None and hasattr(new, 'portal_type') and \
           new.portal_type == "Folder":
            return new
        return None

    def _childIds(self, obj):
        """Return the ids contained in ``obj``, or [] if it has none."""
        if hasattr(obj, 'keys'):
            return obj.keys()
        if hasattr(obj, 'objectIds'):
            return obj.objectIds()
        return []

    # -- migration methods ------------------------------------------------
    # Each takes (obj, source, dest) and returns the new object created
    # in ``dest``, or None when nothing was migrated.

    def defaultMigrationMethod(self, obj, source, dest):
        """Copy ``obj`` unchanged into ``dest``; return the copy or None."""
        try:
            copy = obj._getCopy(dest)
            dest._setObject(obj.getId(), copy)
        except:
            # NOTE(review): bare except kept on purpose; narrowing it
            # could miss string exceptions on older interpreters.
            self.log('Could not copy %s (%s). Possibly duplicate id.\n'
                     % (obj.getId(), obj.absolute_url(relative=1)))
            return None
        return getattr(dest, obj.getId())

    def BrokenBecauseProductisGone2BBR(self, obj, source, dest):
        """Skip broken objects."""
        self.log('Ignoring Broken Beyond Repair.\n')
        return None

    def ZWikiPage2ZWikiPage(self, obj, source, dest):
        """Copy a ZWiki page as-is."""
        return self.defaultMigrationMethod(obj, source, dest)

    def ZCatalog2ZCatalog(self, obj, source, dest):
        """Skip catalogs."""
        self.log('Ignoring ZCatalog for now.\n')
        return None

    def UserFolder2UserFolder(self, obj, source, dest):
        """Skip user folders."""
        self.log('Ignoring User Folder for now.\n')
        return None

    def MembersFolder2UserFolder(self, obj, source, dest):
        """Skip members folders."""
        self.log('Ignoring Members Folder for now.\n')
        return None

    def SoftwareProduct2CMFSoftwarePackage(self, obj, source, dest):
        """Skip software products."""
        self.log('Ignoring Software Product for now.\n')
        return None

    def ProductRelease2CMFSoftwareRelease(self, obj, source, dest):
        """Skip product releases."""
        self.log('Ignoring Product Release for now.\n')
        return None

    def Tracker2CMFCollector(self, obj, source, dest):
        """Skip trackers."""
        self.log('Ignoring Tracker for now.\n')
        return None

    def XMLDocument2XMLDocument(self, obj, source, dest):
        """Skip XML documents."""
        self.log('Ignoring XML Document for now.\n')
        return None

    def BackTalkBook2CMFBackTalkBook(self, obj, source, dest):
        """Create a 'BackTalk Book' and copy the properties of ``obj``.

        'header' is stored as the 'book_header' property and 'summary'
        becomes the description.
        """
        dest.invokeFactory(id=obj.getId(), type_name='BackTalk Book')
        book = getattr(dest, obj.getId())
        self._copyProperties(obj, book, ['header', 'summary'])
        if obj.hasProperty('header'):
            book._updateProperty('book_header', obj.getProperty('header'))
        if obj.hasProperty('summary'):
            book.setDescription(obj.getProperty('summary'))
        return book

    def BackTalkDocument2CMFBackTalkDocument(self, obj, source, dest):
        """Create a 'BackTalk Document' with ``obj``'s properties and text."""
        dest.invokeFactory(id=obj.getId(), type_name='BackTalk Document')
        doc = getattr(dest, obj.getId())
        self._copyProperties(obj, doc, ['summary'])
        if obj.hasProperty('summary'):
            doc.setDescription(obj.getProperty('summary'))
        doc.munge(obj.read())
        return doc

    def File2CMFDefaultFile(self, obj, source, dest):
        """Create a 'File' with the data, title and content type of ``obj``."""
        return self._migrateBinary(obj, dest, 'File')

    def Image2CMFDefaultImage(self, obj, source, dest):
        """Create an 'Image' with the data, title and content type of ``obj``."""
        return self._migrateBinary(obj, dest, 'Image')

    def NewsItem2CMFDefaultNewsItem(self, obj, source, dest):
        """Create a 'News Item'; subject is ``subject`` + ``NewsItem_topics``."""
        dest.invokeFactory(id=obj.getId(), type_name='News Item')
        news = getattr(dest, obj.getId())
        title = obj.title or ''
        text = obj.text
        format = obj.format
        subject = self._subjectOf(obj, 'NewsItem_topics')
        news.setTitle(title)
        news.edit(text_format=format, text=text)
        news.setSubject(subject)
        return news

    def Link2CMFDefaultLink(self, obj, source, dest):
        """Create a 'Link'; also records a boolean 'zope_powered' property."""
        dest.invokeFactory(id=obj.getId(), type_name='Link')
        link = getattr(dest, obj.getId())
        title = obj.title or ''
        description = self._stringOrEmpty(obj.description)
        subject = self._subjectOf(obj, 'Link_topics')
        remote_url = obj.remote_url
        zope_powered = obj.zope_powered or 0
        link.setTitle(title)
        link.setDescription(description)
        link.edit(remote_url=remote_url)
        link.setSubject(subject)
        link.manage_addProperty('zope_powered', zope_powered, 'boolean')
        return link

    def FormattedDocument2CMFDefaultDocument(self, obj, source, dest):
        """Create a 'Document' from ``obj``.

        The text format falls back to 'structured-text' when ``obj``
        has no (or an empty) ``format``.
        """
        dest.invokeFactory(id=obj.getId(), type_name='Document')
        doc = getattr(dest, obj.getId())
        title = obj.title or ''
        description = self._stringOrEmpty(obj.description)
        text = obj.content
        format = getattr(obj, 'format', None) or 'structured-text'
        subject = self._subjectOf(obj)
        doc.setTitle(title)
        doc.setDescription(description)
        doc.edit(text_format=format, text=text)
        doc.setSubject(subject)
        return doc

    # STX and HTML documents are migrated exactly like formatted ones.
    STXDocument2CMFDefaultDocument = FormattedDocument2CMFDefaultDocument
    HTMLDocument2CMFDefaultDocument = FormattedDocument2CMFDefaultDocument

    def Tip2ZopeOrgTip(self, obj, source, dest):
        """Create a 'Tip'; ``tip`` becomes the title, ``details`` the description."""
        dest.invokeFactory(id=obj.getId(), type_name='Tip')
        tip = getattr(dest, obj.getId())
        title = obj.tip or ''
        description = obj.details or ''
        text = obj.content
        format = obj.format
        subject = self._subjectOf(obj, 'tip_topics')
        tip.setTitle(title)
        tip.setDescription(description)
        tip.edit(text_format=format, text=text)
        tip.setSubject(subject)
        return tip

    def HowTo2ZopeOrgHowTo(self, obj, source, dest):
        """Create a 'HowTo'; subject is ``subject`` + ``HowTo_topics``."""
        dest.invokeFactory(id=obj.getId(), type_name='HowTo')
        howto = getattr(dest, obj.getId())
        title = obj.title or ''
        text = obj.content
        format = obj.format
        subject = self._subjectOf(obj, 'HowTo_topics')
        howto.setTitle(title)
        howto.edit(text_format=format, text=text)
        howto.setSubject(subject)
        return howto

    def Folder2CMFPortalFolder(self, obj, source, dest):
        """Add a folder named like ``obj`` to ``dest`` and return it.

        Returns None unless ``dest`` ends up with an object of that id
        whose portal_type is "Folder".
        """
        if dest is None or obj is None:
            return None
        try:
            dest.manage_addFolder(obj.getId(), obj.title)
        except:
            # Best effort: a failure here is ignored and whatever is
            # found under that id is validated below.
            pass
        return self._folderIn(dest, obj)

    def BTreeFolder2CMFBTreeFolder(self, obj, source, dest):
        """Add a CMF BTree folder named like ``obj`` to ``dest``.

        Returns None unless ``dest`` ends up with an object of that id
        whose portal_type is "Folder".
        NOTE(review): confirm "Folder" is the portal_type a CMF BTree
        folder actually reports.
        """
        if dest is None or obj is None:
            return None
        try:
            bt2 = dest.manage_addProduct['BTreeFolder2']
            bt2.manage_addCMFBTreeFolder(obj.getId(), obj.title)
        except:
            # Best effort, see Folder2CMFPortalFolder.
            pass
        return self._folderIn(dest, obj)

    # -- ignore-list maintenance ------------------------------------------

    def WikiMethodsCleanup(self, obj, ignore_path):
        """Add the wiki support methods found in ``obj`` to ``ignore_path``.

        Applies only when ``obj`` contains 'FrontPage',
        'standard_wiki_header' or 'standard_wiki_page'.  ``ignore_path``
        is modified in place and returned.
        """
        obj_ids = self._childIds(obj)
        if obj_ids:
            if 'FrontPage' in obj_ids or \
               'standard_wiki_header' in obj_ids or \
               'standard_wiki_page' in obj_ids:
                obj_path = obj.getPhysicalPath()
                for dm in ('standard_wiki_header', 'standard_wiki_footer',
                           'editform', 'commentform', 'backlinks',
                           'advancedform', 'pagehistory', 'search',
                           'standard_wiki_page', 'title_prefix'):
                    # We can possibly remove those later also
                    # 'AllPages', 'BackLinks', 'HelpPage'
                    # 'HowDoIEdit', 'HowDoINavigate', 'JumpTo',
                    # 'RecentChanges', 'RegulatingYourPages',
                    # 'RemoteWikiLinks', 'RemoteWikiURL',
                    # 'SandBox', 'SearchPage', 'StructuredText',
                    # 'StructuredTextExamples', 'StructuredTextRules',
                    # 'TextFormattingRiges', 'VisitorLog', 'WikiName',
                    # 'WikiWikiWeb', 'ZWiki', 'ZWikiLinks', 'ZWikiWeb'
                    if dm in obj_ids:
                        ignore_path.append(obj_path + (dm,))
        return ignore_path

    def DefaultDTMLCleanup(self, obj, ignore_path):
        """Add the default DTML scaffolding in ``obj`` to ``ignore_path``.

        Applies only when ``obj`` contains an 'index_html' that is a
        DTML Method or DTML Document.  ``ignore_path`` is modified in
        place and returned.
        """
        # _childIds yields [] for objects without keys()/objectIds(), so
        # such objects no longer trip over an unbound obj_ids.
        obj_ids = self._childIds(obj)
        if obj_ids:
            if 'index_html' in obj_ids and \
               obj.index_html.meta_type in ['DTML Method', 'DTML Document']:
                obj_path = obj.getPhysicalPath()
                for dm in ('index_html', 'local_nav', 'page_title',
                           'standard_html_footer', 'standard_html_header',
                           'local_css', 'custom_html_header',
                           'custom_html_footer', 'local_exit'):
                    if dm in obj_ids:
                        ignore_path.append(obj_path + (dm,))
        return ignore_path

    # -- post-processing --------------------------------------------------

    def fixOwnership(self, orig, new):
        """Copy ``orig._owner`` (when set) onto ``new``; return ``new``."""
        owner = getattr(orig, '_owner', None)
        if owner is not None:
            try:
                # Retain ownership.
                new._owner = owner
            except:
                # Best effort: objects refusing the attribute are left alone.
                pass
        return new

    def fixModificationDate(self, orig, new):
        """Carry ``orig``'s modification time over to ``new`` if supported."""
        mod_date = orig.bobobase_modification_time().ISO()
        if hasattr(aq_base(new), 'setModificationDate'):
            new.setModificationDate(mod_date)
        return new

    # -- driver -----------------------------------------------------------

    def run(self):
        """Migrate every non-ignored child of the source into the dest.

        Children are processed in sorted id order.  Returns 'Ok.\\n';
        the detailed log stays in ``self._out`` and in zLOG.
        """
        source = self._source
        dest = self._dest
        ignore = self._ignore_path
        if hasattr(source, 'objectIds') and \
           hasattr(source, 'getPhysicalPath'):
            path = source.getPhysicalPath()
            oids = [o for o in source.objectIds()
                    if (path + (o,)) not in ignore]
            oids.sort()
            for oid in oids:
                obj = getattr(source, oid)
                try:
                    # We may want to catch POSKeyErrors early,
                    # so lets prod the object and see what happens
                    obj_url = obj.absolute_url(relative=1)
                except POSKeyError:
                    # Skip this object
                    self.log('POSKeyError while trying to access' +
                             ' %s.\n' % '/'.join(path + (oid,)))
                    continue

                mm = self.getMigrationMethod(obj)
                try:
                    new_obj = mm(obj, source, dest)
                except:
                    # One bad object must not abort the whole run, but
                    # record what went wrong instead of hiding it.
                    err_type, err_value = sys.exc_info()[:2]
                    self.log('Error while converting %s: %s: %s\n'
                             % (obj_url, err_type, err_value))
                    new_obj = None

                # None is the failure marker.  Truthiness is not used
                # because a freshly created, still empty container may
                # evaluate as false if it defines __len__.
                if new_obj is None:
                    self.log('Result of conversion of' +
                             ' %s: Failed.\n' % obj_url)
                else:
                    new_obj = self.fixOwnership(obj, new_obj)
                    new_obj = self.fixModificationDate(obj, new_obj)
                    ct = getToolByName(new_obj, 'portal_catalog', None)
                    if ct is not None:
                        try:
                            # Index the migrated object, not the source.
                            ct.reindexObject(new_obj)
                        except:
                            # Best effort: indexing problems are ignored.
                            pass
                    self.log('Result of conversion of' +
                             ' %s: Success.\n' % obj_url)

                if hasattr(obj, 'objectIds') and new_obj is not None:
                    ignore = self.WikiMethodsCleanup(obj, ignore)
                    ignore = self.DefaultDTMLCleanup(obj, ignore)
                    get_transaction().commit(1)
                    self.log(Transmutator(obj, new_obj,
                                          ignore, self._type_map).run(),
                             dup=0)
        # NOTE(review): the nesting of this commit could not be
        # recovered from the de-indented source; it is placed at
        # function level so every run ends with a commit -- confirm.
        get_transaction().commit()
        # The original expression "'Ok.\n' or self._out.getvalue()"
        # always evaluated to 'Ok.\n'; that result is kept.
        return 'Ok.\n'
# Running the module directly only prints a marker; it performs no migration.
if __name__ == '__main__':
    print('ok')
More information about the zopeorg-checkins
mailing list