[zopeorg-checkins] CVS: Products/Extensions - Catalog.py:1.1 DocSurvey.py:1.1 Expander.py:1.1 Expander.sh:1.1 ExportToLDAP.py:1.1 Hacks.py:1.1 MemberCleanout.py:1.1 MemberInfo.py:1.1 Membership.py:1.1 MyCatalog.py:1.1 Project.py:1.1 ProjectLib.py:1.1 TrackerMethods.py:1.1 ZopeSite.py:1.1 commit_version.py:1.1 countDoubles.py:1.1 fixfiles.py:1.1 fixids.py:1.1 getcwd.py:1.1 move_news.py:1.1 ownerous.py:1.1 plaintext.py:1.1 rconsole.py:1.1 rfc1123.py:1.1 rungc.py:1.1 set_title.py:1.1 sort_link.py:1.1 sort_link.py.old:1.1 test.py:1.1 which_node.py:1.1 wikimnt.py:1.1
Sidnei da Silva
sidnei at x3ng.com.br
Fri May 30 12:19:37 EDT 2003
Update of /cvs-zopeorg/Products/Extensions
In directory cvs.zope.org:/tmp/cvs-serv29717/Extensions
Added Files:
Catalog.py DocSurvey.py Expander.py Expander.sh
ExportToLDAP.py Hacks.py MemberCleanout.py MemberInfo.py
Membership.py MyCatalog.py Project.py ProjectLib.py
TrackerMethods.py ZopeSite.py commit_version.py
countDoubles.py fixfiles.py fixids.py getcwd.py move_news.py
ownerous.py plaintext.py rconsole.py rfc1123.py rungc.py
set_title.py sort_link.py sort_link.py.old test.py
which_node.py wikimnt.py
Log Message:
Add external methods from old zope.org
=== Added File Products/Extensions/Catalog.py ===
"""
Catalog management
"""
import string, pdb
def catalogedIn(self, catalog):
    """
    Tell whether this object's url() is registered as a uid in `catalog`.
    """
    known_uids = catalog._catalog.uids
    return known_uids.has_key(self.url())
# Mail body template used by manageCatalog for the ' Request Entry ' action.
# It is %-formatted with a mapping and reads the keys username, url and queue.
manageCatalog_request_letter = """
%(username)s requests that an object be cataloged.
View it:
%(url)s
Catalog it
%(url)s/manageCatalogForm
Make sure to be logged in as superuser to
perform catalog entry adding.
-------------------------------------------------------------------------------
Manage the items in the reviewing queue at http://www.zope.org/UnreviewedIndex/
%(queue)s
"""
def manageCatalog(self, submit, REQUEST):
"""
Change catalog entry for an object.
"""
# NOTE(review): the indentation of this function is missing in this copy, so
# the nesting of the statements below (in particular of the Owner check and
# the notification under ' Add Entry ') has to be confirmed against the
# original file before the code is re-indented.
path=self.url()
catalog=self.SiteIndex
user=REQUEST.AUTHENTICATED_USER
# Only users holding the Owner, Manager or Reviewer role on this object get
# past this check; everybody else is sent back to the form with a message.
if not (user.has_role('Owner', self) or
user.has_role('Manager', self) or
user.has_role('Reviewer', self)):
return self.manageCatalogForm(self, REQUEST,
manage_tabs_message='Cannot change catalog entry. You must be the Owner of this object.')
# `submit` is compared against the exact button captions, spaces included.
if submit==' Add Entry ':
if user.has_role('Contributor', self) or \
user.has_role('Reviewer', self) or \
user.has_role('Manager', self):
self.unindex_object(self, 'UnreviewedIndex')
self.index_object(self, 'SiteIndex')
if not user.has_role('Owner', self):
# Notification is best effort: any error raised by notify_cataloged is
# silently ignored.
try: notify_cataloged(self, REQUEST)
except: pass
elif submit==' Update Entry ':
if user.has_role('Contributor', self) or catalog._catalog.uids.has_key(path):
self.index_object(self)
elif submit==' Remove Entry ':
if user.has_role('Contributor', self) or catalog._catalog.uids.has_key(path):
self.unindex_object(self)
elif submit==' Request Entry ':
# The queue report is taken before this object is added to UnreviewedIndex.
unreviewedQueue = self.UnreviewedIndex.reportUnreviewedQueue()
self.index_object(self, 'UnreviewedIndex')
# The mapping carries fullname and email as well, although the letter
# template only reads username, url and queue.
self.Utilities.mail_reviewers(self, REQUEST,
subject="Catalog Request",
body=manageCatalog_request_letter % {
'fullname': user.full_name,
'email': user.email,
'username': user.getUserName(),
'url': self.absolute_url(),
'queue': unreviewedQueue
}
)
return self.manageCatalogForm(self, REQUEST, manage_tabs_message='Your request has been submitted.')
return self.manageCatalogForm(self, REQUEST, manage_tabs_message='Catalog entry changed.')
def notify_cataloged(self, REQUEST):
    """
    Mail the creator of this object that the authenticated user cataloged it.

    The message (headers included) is handed to self.MailHost.send().
    """
    manager = REQUEST.AUTHENTICATED_USER
    owner = self.acl_users.getUser(self.creator())
    template = """\
From: "%(manager)s" <%(manageremail)s>
To: "%(user)s" <%(useremail)s>
Subject: [Zope Site] Object cataloged
%(manager)s has cataloged your object at:
%(url)s
"""
    fields = {}
    fields['manager'] = manager.full_name
    fields['manageremail'] = manager.email
    fields['user'] = owner.full_name
    fields['useremail'] = owner.email
    fields['url'] = self.absolute_url()
    self.MailHost.send(template % fields)
def reportUnreviewedQueue(self, REQUEST=None):
    """
    Return a plain-text report of everything self.searchResults() returns.

    The advertised total is one higher than the number of listed entries.
    Falls back to self.REQUEST when no (or a false) REQUEST is passed.
    """
    if not REQUEST:
        REQUEST = self.REQUEST
    pending = self.searchResults()
    if not pending:
        return 'There are currently no other items awaiting review.'
    pieces = ['Total items awaiting review: %s' % (len(pending) + 1),
              '\n\nOther items still waiting:\n']
    position = 0
    for record in pending:
        position = position + 1
        location = '%s/%s' % (REQUEST.BASE0,
                              self.getpath(record.data_record_id_))
        pieces.append('%s: %s\n' % (position, location))
    return ''.join(pieces)
=== Added File Products/Extensions/DocSurvey.py ===
# Path of the marshal file that process() writes and results() reads.
file="/usr/local/dc/www.zope.org/var/docsurvey.data"
import marshal
import os.path
import string
import fcntl
import FCNTL
def process(REQUEST):
    """
    process survey results

    Collects the answers stored in REQUEST under the keys '1'..'12' and
    'comments' (answers that parse as integers are converted) and appends
    them to the marshalled results kept in `file`.

    The whole read-modify-write cycle runs under one exclusive lock, and the
    data file is never truncated before that lock is held, so concurrent
    submissions cannot overwrite each other and readers holding a shared
    lock never see an empty file.
    """
    keys = [str(i) for i in range(1, 13)] + ['comments']
    answers = {}
    for k in keys:
        if REQUEST.has_key(k):
            value = REQUEST[k]
            try:
                value = string.atoi(value)
            except (ValueError, TypeError):
                # Not an integer (e.g. free-text comments): store it as given.
                pass
            answers[k] = value
    # 0x1b6 == octal 666, the permissions open() would request; O_CREAT
    # creates the file on first use without truncating an existing one.
    fd = os.open(file, os.O_RDWR | os.O_CREAT, 0x1b6)
    f = os.fdopen(fd, 'r+b')
    try:
        fcntl.flock(f.fileno(), FCNTL.LOCK_EX)
        try:
            f.seek(0, 2)
            if f.tell() == 0:
                # Brand-new (empty) data file: start with empty answer lists.
                results = {}
                for k in keys:
                    results[k] = []
            else:
                f.seek(0)
                results = marshal.load(f)
            for k, v in answers.items():
                results[k].append(v)
            f.seek(0)
            f.truncate()
            marshal.dump(results, f)
            # Make sure the data is on disk before the lock is released.
            f.flush()
        finally:
            fcntl.flock(f.fileno(), FCNTL.LOCK_UN)
    finally:
        f.close()
def results():
    """
    survey results

    Returns the object unmarshalled from `file`, read under a shared lock.
    The lock is released and the file closed even when loading fails; in
    the previous version both statements sat after the `return` and were
    never executed.
    """
    f = open(file, 'rb')
    try:
        fcntl.flock(f.fileno(), FCNTL.LOCK_SH)
        try:
            return marshal.load(f)
        finally:
            fcntl.flock(f.fileno(), FCNTL.LOCK_UN)
    finally:
        f.close()
def averages():
    """
    average scores for survey results

    Maps every question key except 'comments' to the mean of its answers,
    or to the string 'n/a' when the question has no answers yet.
    """
    scores = results()
    del scores['comments']
    means = {}
    for question, answers in scores.items():
        if len(answers) != 0:
            running = 0
            for answer in answers:
                running = running + answer
            means[question] = running / float(len(answers))
        else:
            means[question] = 'n/a'
    return means
def table():
    """
    A table showing responses to question 1

    Returns an HTML bar chart with one column per score 1..5; every column
    is 200 units high, split into a spacer cell and a coloured bar cell.

    Answers that are not one of the integers 1..5 are ignored instead of
    raising KeyError, and an empty result set yields empty bars instead of
    a ZeroDivisionError.
    """
    counts = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
    for answer in results()['1']:
        if answer in (1, 2, 3, 4, 5):
            counts[answer] = counts[answer] + 1
    highest = max(counts.values())
    if highest:
        scale = 200.0 / highest
    else:
        # No usable answers yet: draw every bar with height 0.
        scale = 0.0
    parts = ['<table><tr align="center" valign="bottom">']
    for score in range(1, 6):
        bar = int(round(scale * counts[score]))
        parts.append('<td>\n<table width="70" height="200" cellspacing="0" '
                     'cellpadding="0" border="0">')
        parts.append('\n<tr><td width="70" height="%s"> </td></tr>'
                     % (200 - bar,))
        parts.append('\n<tr><td bgcolor="#DDDDFF" width="70" height="%s">'
                     '<font size="1"> </font></td></tr>' % bar)
        parts.append('</table> (%s) </td>' % counts[score])
    parts.append('</tr><tr><td colspan="2"><font size="-1">Very useful<br>'
                 'and effective</font></td>\n'
                 '<td></td><td colspan="2" align="right"><font size="-1">'
                 'Very unhelpful<br>and ineffective</font></td></tr></table>')
    return ''.join(parts)
=== Added File Products/Extensions/Expander.py ===
import os
def Expander():
    """Run the Expander.sh helper script through os.system()."""
    script = '/usr/local/dc/org/Extensions/Expander.sh'
    os.system(script)
=== Added File Products/Extensions/Expander.sh ===
#!/bin/sh
# Fetch every *.tar.gz listed in the ZopeSrcDocs FTP directory and unpack it
# into the Browsable directory.
#
# NOTE(review): the FTP user name and password are hard-coded below and are
# visible to anyone who can read this script or the process list; they should
# be moved to a protected configuration file and rotated.

# Never download or unpack into whatever directory we happened to start in.
cd /usr/local/dc/apache/htdocs/Browsable || exit 1

# grep -F matches the literal text ".tar.gz" (the dots are not wildcards).
for i in $(ncftpls -u mindlace -p 6vSuG -1 \
        ftp://www.zope.org:1321/Documentation/Developer/ZopeSrcDocs/ \
        | grep -F '.tar.gz'); do
    # Only unpack archives that were actually downloaded.
    ncftpget -u mindlace -p 6vSuG -P 1321 www.zope.org ./ \
        "/Documentation/Developer/ZopeSrcDocs/$i" \
        && tar xzf "$i"
done
=== Added File Products/Extensions/ExportToLDAP.py ===
# Export the user folder data to an LDAP importfile, to be downloaded through
# the TTW interface.
#
# We map the user properties as follows:
#
# Property LDAP field
#
# name cn
# email mail
# full_name gn and sn (split on first occurring space)
# company o
# public_email public
#
# last_visit and prev_visit are exported as lastLogin and prevLogin (see
# LDAP_MEMBER_ENTRY below).
#
# We first write to a temp file, then write the contents of that file to the
# browser using RESPONSE.write
import string
import tempfile, thread, sha, base64, binascii
import re
import time
# Attribute layout of one member record: (LDAP attribute name, key into the
# per-user `data` dictionary built in exportToLDAP), in output order.
LDAP_MEMBER_ENTRY = (
('objectClass', 'top'),
('objectClass', 'zopeOrgPerson'),
('cn', 'id'),
('sn', 'last_name'),
('gn', 'first_name'),
('o', 'company'),
('mail', 'email'),
('public', 'public'),
('lastLogin', 'last_visit'),
('prevLogin', 'prev_visit'),
('userPassword', 'password'),
)
# LDIF text for the fixed directory skeleton; exportToLDAP writes it (stripped
# of surrounding whitespace) before any member or role record.
# NOTE(review): LDIF separates entries with a blank line; none are present
# between the entries in this copy of the file - confirm against the original.
LDAP_STRUCTURAL_ENTRIES = """dn: dc=zope,dc=org
objectClass: top
objectClass: dcObject
dc: zope
dn: ou=people,dc=zope,dc=org
objectClass: top
objectClass: organizationalUnit
ou: people
description: All user records are stored here
dn: ou=groups,dc=zope,dc=org
objectClass: top
objectClass: organizationalUnit
ou: groups
description: All groupings are stored underneath this unit
dn: ou=websites,ou=groups,dc=zope,dc=org
objectClass: top
objectClass: organizationalUnit
ou: websites
description: All website membership groups are underneath this unit
dn: ou=maillists,ou=groups,dc=zope,dc=org
objectClass: top
objectClass: organizationalUnit
ou: maillists
description: All mailing list membership groups go here
dn: ou=www.zope.org,ou=websites,ou=groups,dc=zope,dc=org
objectClass: top
objectClass: organizationalUnit
ou: www.zope.org
description: membership groups for www.zope.org
"""
# Attribute layout of one role (group) record: (LDAP attribute name, key into
# the per-role `data` dictionary built in exportToLDAP), in output order.
LDAP_ROLE_ENTRY = (
('objectClass', 'top'),
('objectClass', 'groupOfUniqueNames'),
('cn', 'role'),
)
# LDAP safe-string, any attribute value matching this RE will not have to be
# base64 encoded.
safeString = re.compile(
    # Start with any ascii < 127 except NUL, LF, CR, SPACE, ':' or '<'.
    '[\x01-\x09\x0B-\x0C\x0E-\x1F\x21-\x39\x3B\x3D-\x7F]'
    # Followed by any ascii < 127 except NUL, LF or CR.
    '[\x01-\x09\x0B-\x0C\x0E-\x7F]*'
    # This must match the whole value.  '\Z' is used rather than '$' because
    # '$' also matches just before a trailing newline, which would let a
    # value such as 'abc\n' through as "safe".
    r'\Z'
    )
def encryptPassword(password):
return '{SHA}%s' % binascii.b2a_base64(sha.new(password).digest())[:-1]
def formatGeneralizedTime(dt):
    """
    Format `dt` as 'YYYYMMDDHHMMSSZ' using its strftime() method.

    When `dt` cannot be formatted (for instance because it has no
    strftime method) the current time is used instead.  The fallback is
    taken from time.gmtime(): the trailing 'Z' designates UTC, so stamping
    local time with it would be wrong on any host not running in UTC.
    """
    layout = '%Y%m%d%H%M%SZ'
    try:
        return dt.strftime(layout)
    except:
        # Deliberately broad: a record with an unusable timestamp must not
        # abort the export.
        return time.strftime(layout, time.gmtime(time.time()))
def createAttrVal(attr, val, commaAllowed=0):
    """
    Return the LDIF line(s) for attribute `attr` with value `val`.

    Empty values yield ''.  Values matching safeString are written as
    'attr: value'; everything else is base64 encoded and written as
    'attr:: ...' with continuation lines indented by one space.
    """
    if not val:
        # Empty values should not be included.
        return ''
    # Escape commas; unless it is explicitly allowed (dn).
    if not commaAllowed and ',' in str(val):
        val = string.replace(val, ',', '\\,')
    if not safeString.match(val):
        encoded = base64.encodestring(val)
        body = encoded[:-1]
        # Multiline base64 values are okay, just make sure there is an extra
        # space at the start of all the continuation lines.
        if '\n' in body:
            encoded = string.replace(body, '\n', '\n ') + '\n'
        return '%s:: %s' % (attr, encoded)
    # The value doesn't need to be base64 encoded.
    return '%s: %s\n' % (attr, val)
def createCnValue(Cn, template):
    """Insert `Cn` into `template` after backslash-escaping its commas."""
    escaped = Cn.replace(',', '\\,')
    return template % escaped
def createMemberDistinguishedName(id):
    """Return the LDIF 'dn' line of the member `id` below ou=people."""
    return createAttrVal(
        'dn',
        createCnValue(id, 'cn=%s,ou=people,dc=zope,dc=org'),
        commaAllowed=1)
def createRoleDistinguishedName(role):
    """Return the LDIF 'dn' line of the group `role` below ou=www.zope.org."""
    template = 'cn=%s,ou=www.zope.org,ou=websites,ou=groups,dc=zope,dc=org'
    return createAttrVal('dn', createCnValue(role, template), commaAllowed=1)
def createUniqueMember(id):
    """Return the LDIF 'uniqueMember' line pointing at the member `id`."""
    return createAttrVal(
        'uniqueMember',
        createCnValue(id, 'cn=%s,ou=people,dc=zope,dc=org'),
        commaAllowed=1)
def exportToLDAP(self, max=0):
    """
    Stream an LDIF export of self.acl_users to the browser.

    The structural entries are written first, then one record per user and
    finally one group record per role found on the exported users.  The
    data is assembled in a temporary file so that the content-length header
    can be set before anything is written to the RESPONSE.

    `max`, when non-zero, limits the number of exported users (for testing:
    add ?max:int=<number> to the URL).

    The temporary file is closed even when the export fails half-way.
    """
    temp = tempfile.TemporaryFile(suffix='LDAPExport%s' % thread.get_ident())
    try:
        # The LDIF version line ('version: 1') is intentionally not written.
        # Write the structural entries for the directory at the top.
        temp.write(string.strip(LDAP_STRUCTURAL_ENTRIES))
        count = 0
        roles = {}
        for user in self.acl_users.getUsers():
            # full_name is split on the first space into first/last name.
            parts = string.split(user.full_name, ' ', 1)
            first_name = parts[0]
            if len(parts) > 1:
                last_name = parts[1]
            else:
                last_name = ''
            # Force public to be 1 or 0 (strings).
            public = user.public_email and '1' or '0'
            # Build record.
            data = {
                'top': 'top',
                'zopeOrgPerson': 'zopeOrgPerson',
                'id': user.name,
                'last_name': last_name,
                'first_name': first_name,
                'company': user.company,
                'email': user.email,
                'public': public,
                'last_visit': formatGeneralizedTime(user.last_visit),
                'prev_visit': formatGeneralizedTime(user.prev_visit),
                'password': encryptPassword(user._pw_),
                }
            entry = createMemberDistinguishedName(user.name)
            for attr, key in LDAP_MEMBER_ENTRY:
                entry = '%s%s' % (entry, createAttrVal(attr, data[key]))
            # Add extra newline to delimit records.
            if entry:
                entry = '\n%s' % entry
            temp.write(entry)
            # Collect roles information.
            for role in user.roles:
                if not roles.has_key(role):
                    roles[role] = []
                roles[role].append(user.name)
            if max:
                count = count + 1
                if count >= max:
                    break
        # Process the collected roles info.
        for role, members in roles.items():
            data = {
                'top': 'top',
                'groupOfUniqueNames': 'groupOfUniqueNames',
                'role': role,
                }
            entry = createRoleDistinguishedName(role)
            for attr, key in LDAP_ROLE_ENTRY:
                entry = '%s%s' % (entry, createAttrVal(attr, data[key]))
            for member in members:
                entry = '%s%s' % (entry, createUniqueMember(member))
            # Add extra newline to delimit records.
            if entry:
                entry = '\n%s' % entry
            temp.write(entry)
        # Write collected data to browser.
        length = temp.tell()
        temp.seek(0)
        resp = self.REQUEST.RESPONSE
        resp.setHeader('content-type', 'text/directory')
        resp.setHeader('content-length', length)
        while 1:
            chunk = temp.read(1024)
            if not chunk:
                break
            resp.write(chunk)
    finally:
        temp.close()
=== Added File Products/Extensions/Hacks.py ===
def convert(self, REQUEST):
    '''Catalog every object listed in SiteIndex into the Foo catalog.

    Each path is resolved through REQUEST.resolve_url(); a path that
    resolves to None aborts the run with the 'DumnAssError' string exception.
    '''
    base = 'http://www.zope.org:18200/'
    for path in self.SiteIndex._catalog.paths.values():
        target = REQUEST.resolve_url(base + path)
        if target is None:
            raise 'DumnAssError'
        self.Foo._catalog.catalogObject(target, path)
=== Added File Products/Extensions/MemberCleanout.py ===
# External methods used to clean out old Members (no login for more than
# 90 days, the default threshold used by memberIsStale).
import tempfile, thread
from DateTime import DateTime
from cPickle import Pickler
# Determine if a member is stale; hasn't logged in for a number of days.
def memberIsStale(context, member, staleDate=None):
    """
    Tell whether `member` is stale.

    A member is stale when member.last_visit is earlier than `staleDate`
    and context.SiteIndex holds no record created by the member whose url
    starts with 'Members'.

    `staleDate` defaults to 90 days before the moment of the call.  (A
    `DateTime()-90` default in the signature is evaluated only once, when
    the module is loaded, and silently ages in a long-running process.)
    """
    if staleDate is None:
        staleDate = DateTime() - 90
    if not member.last_visit.lessThan(staleDate):
        return 0
    owned = context.SiteIndex(creator=member.getUserName())
    memberContent = 0
    for record in owned:
        if record.url[:7] == 'Members':
            memberContent = memberContent + 1
    return not memberContent
# Export a pickle file with member data of stale members to generate warning
# emails.
def exportStaleMembers(self, max=0):
    """
    Stream a binary pickle describing all stale members to the browser.

    The pickle holds a list of (name, full_name, email, last_visit) tuples,
    one per user for which memberIsStale() is true.  `max` is accepted but
    not used.

    The temporary file is closed even when writing the response fails.
    """
    stale = []
    for user in self.acl_users.getUsers():
        if memberIsStale(self, user):
            stale.append(
                (user.name, user.full_name, user.email, user.last_visit))
    # Write collected data to browser.
    temp = tempfile.TemporaryFile(
        suffix='StaleMembers%s' % thread.get_ident())
    try:
        # Binary pickles please (saves a *lot* of bandwidth).
        Pickler(temp, 1).dump(stale)
        length = temp.tell()
        temp.seek(0)
        resp = self.REQUEST.RESPONSE
        resp.setHeader('content-type', 'text/directory')
        resp.setHeader('content-length', length)
        while 1:
            chunk = temp.read(256)
            if not chunk:
                break
            resp.write(chunk)
    finally:
        temp.close()
# Remove still existing domains fields
def cleanDomainsCruft(self):
    """
    Reset the domains of every user that still has some to an empty tuple.

    Returns a message with the number of users changed.
    """
    folder = self.acl_users
    cleared = 0
    for account in folder.getUsers():
        if not account.domains:
            continue
        self.acl_users.changeUser(
            account.name, account._pw_, account.roles, ())
        cleared = cleared + 1
    return '%s domains fields cleared out.' % cleared
# Remove all accounts on "the date", which is 7 days after the warning mails
# have gone out.
# All five values below are computed once, when this module is loaded; they
# are not refreshed on later calls to deleteStaleMembers.
now = DateTime()
# Today's date without the time of day.
today = DateTime(now.Date())
theDate = DateTime('2001/04/12')
# Negative once theDate has passed, 0 on theDate itself.
daysTillTheDate = theDate - today
# The 90-day staleness threshold, shifted by the time elapsed since the
# warning mails went out (7 days before theDate).
staleDate = now - (90 + (7 - daysTillTheDate))
def deleteStaleMembers(self):
    """
    Delete all stale members on "the date"; before it, only report counts.

    Uses the module-level daysTillTheDate, theDate and staleDate values.
    After the date has passed nothing is done and a message asking for
    reconfiguration is returned.
    """
    # Only on or before the set date.
    if daysTillTheDate < 0:
        return ('The date (%s) has passed. Please reconfigure the External '
                'Method.' % theDate.Date())
    users = self.acl_users.getUsers()
    # Store the totals before any user is removed.
    total = len(users)
    stale = [user.name for user in users
             if memberIsStale(self, user, staleDate=staleDate)]
    totalRemove = len(stale)
    figures = (totalRemove, total, total - totalRemove)
    if not daysTillTheDate:
        self.acl_users.delUser(stale)
        return ("Removed %s users out of %s total Members, "
                "leaving %s active Members." % figures)
    # Not yet the date: only return a count.
    return ("Today, we'd be removing %s users out of %s total Members, "
            "leaving %s active Members." % figures)
=== Added File Products/Extensions/MemberInfo.py ===
###
# Methods for collecting and caching info on the
# Member Folders
#
# MJ
###
from time import time
from string import find, upper
# Return info hash on Member Folders
# Create cache if needed.
def memberInfo(self, max_cache_time=60*60, max_active_age=2*7*24*60*60):
    """
    Return a dictionary with statistics on the Member Folders.

    Keys: 'timestamp', 'number_of_members' (count of Folder objects),
    'listed_members' (tuple of (id, title) pairs of folders with a true
    `listed` attribute, sorted by id) and 'active_members' (users whose
    last_visit is at most `max_active_age` seconds old).

    The result is kept in self._v_meminfo_cache and reused while it is not
    older than `max_cache_time` seconds.
    """
    try:
        if hasattr(self, '_v_meminfo_cache'):
            cached = self._v_meminfo_cache
            if time() - cached['timestamp'] <= max_cache_time:
                return cached
    except:
        # Any problem with the cached value just forces a rebuild.
        pass
    # cache empty or expired
    info = {}
    self._v_meminfo_cache = info
    info['timestamp'] = time()
    # Member folder statistics and listed members list
    folders = self.objectValues(['Folder'])
    listed = [(folder.id, folder.title) for folder in folders
              if hasattr(folder, 'listed') and folder.listed]
    info['number_of_members'] = len(folders)
    listed.sort(lambda a, b: cmp(a[0], b[0]))
    info['listed_members'] = tuple(listed)
    # Active members
    active = 0
    for member in self.acl_users.data.values():
        if time() - member.last_visit.timeTime() <= max_active_age:
            active = active + 1
    info['active_members'] = active
    return info
def filterMembers(self, member_filter):
    """
    Return the listed members whose id contains `member_filter`.

    The comparison ignores case; the entries come from
    self.memberInfo()['listed_members'].
    """
    needle = member_filter.upper()
    def matches(entry, needle=needle):
        # `needle` is bound as a default argument, like in the lambda this
        # replaces, so no nested-scope support is needed.
        return entry[0].upper().find(needle) != -1
    candidates = self.memberInfo()['listed_members']
    return filter(matches, candidates)
=== Added File Products/Extensions/Membership.py ===
# This module implements various external methods used
# by our Membership and registration objects.
import sys, os, string, time, random, re
from DateTime import DateTime
from DocumentTemplate import HTML
from App.Dialogs import MessageDialog
# Registration methods
# --------------------
def added_props(self):
    """Return the ids of the Member CommonProperties that are not standard."""
    standard = ('name', 'email', 'last_visit', 'prev_visit', 'full_name',
                'company')
    zclass = self.Control_Panel.Products.ZopeSite.MemberFolder
    zclass = zclass.propertysheets.methods._getOb('Member')
    sheet = zclass.propertysheets.common.CommonProperties
    return [pid for pid in sheet.propertyIds() if pid not in standard]
# Alphabet used by gen_password: all letters and digits ...
chars=string.letters+string.digits
# remove characters that cause confusion in certain fonts
# this will cut down on support for 'wrong' passwords. MJ
# (the identity translation table is only there so that the third argument,
# the characters to delete, can be passed)
chars=string.translate(chars,string.maketrans('',''),'iIloO01')
def gen_password(choose=random.choice, chars=chars, join=string.join):
    """
    Return a password of 5 characters picked from `chars` with `choose`.

    NOTE(review): the default picker is random.choice, which is not meant
    for security-sensitive values.
    """
    picked = [choose(chars) for n in range(5)]
    return join(picked, '')
def nameUsed(name, names, lower=string.lower):
return lower(name) in map(lower, names)
def register(self, username='', full_name='', company='', email='',
             REQUEST=None):
    """
    Register a new member and create the member's folder.

    Every validation failure re-renders 'register.html' with an `error`
    message.  On success the user is added with the Member role, the
    property sheets are filled from REQUEST, a folder named after the user
    is created in self.Members (when that exists), self.notify is called
    with the password and 'success.html' is rendered.
    """
    form = getattr(self, 'register.html')
    if not (username and email):
        return form(self, REQUEST,
                    error=('You must enter a value for Name and a valid Email '
                           'Address.'))
    objection = username_policy(username)
    if objection:
        return form(self, REQUEST, error=(objection))
    if nameUsed(username, self.acl_users.data.keys()):
        return form(self, REQUEST,
                    error=('The login name you selected is already in use. '
                           'Please choose another.'))
    # A password is generated when the form did not supply one.
    password = REQUEST.get('password', gen_password())
    objection = password_policy(password)
    if objection:
        return form(self, REQUEST, error=(objection))
    if REQUEST.get('confirm', password) != password:
        return form(self, REQUEST,
                    error=('Your password and confirmation did not match. '
                           'Please try again.'))
    # Add the member
    self.acl_users.addUser(username, password, ['Member'], [])
    member = self.acl_users.getUser(username)
    for sheet in member.propertysheets:
        sheet.manage_changeProperties(REQUEST)
    # Here we add a new Folder for this user in Members,
    # giving the user the local role 'Owner' in his Folder.
    try:
        members = self.Members
    except AttributeError:
        members = None
    if members is not None:
        members.manage_addFolder(username)
        home = members[username]
        if REQUEST:
            # Drop the local roles of whoever is performing the request.
            current = REQUEST['AUTHENTICATED_USER'].getUserName()
            home.manage_delLocalRoles([current])
        home.manage_setLocalRoles(username, ['Owner'])
    # Generate email notification
    self.notify(self, REQUEST, password=password)
    return getattr(self, 'success.html')(self, REQUEST)
def password_policy(candidate):
    """Implement here the password characteristics policy for your site.

    Returns an error message on failure, or None if all is ok."""
    too_short = len(candidate) < 5
    if too_short:
        return 'Your password must contain at least 5 characters.'
    return None
def username_policy(candidate,
                    username_match=re.compile(r'^[-_a-zA-Z0-9]+\Z').match):
    """Check for illegal characters in usernames.

    Returns an error message when `candidate` contains anything but
    letters, digits, dashes and underscores (or is empty), and None when
    the name is acceptable.

    The pattern ends in '\\Z' rather than '$': '$' also matches just
    before a trailing newline, which would accept a name such as 'bob\\n'.
    """
    if not username_match(candidate):
        return "You can only use letters, digits, dashes and underscores" +\
               " in your username. Please choose another name."
    return None
def updateMember(self, REQUEST):
    """
    Store the submitted properties on the authenticated user.

    Users without property sheets cannot be changed; in both cases
    update_html is rendered with a matching message.
    """
    member = REQUEST['AUTHENTICATED_USER']
    name = member.getUserName()
    if not hasattr(member, 'propertysheets'):
        return self.update_html(self, REQUEST,
                                message='Cannot set non-member %s' % member)
    for sheet in member.propertysheets:
        sheet.manage_changeProperties(REQUEST)
    return self.update_html(self, REQUEST,
                            message='Your changes have been saved.')
# MemberFolder methods
# --------------------
# Roll a member's visit dates forward: when last_visit is not on the current
# day it is copied to prev_visit and last_visit is set to the start of today.
# Always returns the user it was given.
# NOTE(review): the indentation is missing in this copy; whether the commit
# is meant to run only when the dates were changed has to be confirmed
# against the original file.
def fixup_user(user):
if hasattr(user, 'last_visit'):
try:
if not user.last_visit.isCurrentDay():
user.prev_visit=user.last_visit
user.last_visit=DateTime().earliestTime()
get_transaction().commit()
except:
pass # don't screw login if in read-only mode
return user
def addMemberFolder(self, REQUEST=None):
    """
    Install a new MemberFolder as 'acl_users' in self.Destination().

    Refuses with a MessageDialog when self already has its own acl_users.
    """
    if hasattr(self.aq_base, 'acl_users'):
        return MessageDialog(
            title='Item Exists',
            message='This object already contains a User Folder',
            action='%s/manage_main' % REQUEST['URL1'])
    container = self.Destination()
    folder = self.MemberFolder()
    container._setObject('acl_users', folder)
    # Make the new user folder the one that guards the container.
    container.__allow_groups__ = container.acl_users
    if REQUEST:
        return container.manage_main(container, REQUEST, update_menu=1)
def addUser(self, name, password, roles, domains, REQUEST=None):
    """
    Create a Member for `name` and store it in self.data.

    Renders manage_main when a REQUEST is given.
    """
    member = self.Member(name, password, roles, domains)
    member.id = name
    self.data[name] = member
    if REQUEST is None:
        return None
    return self.manage_main(self, REQUEST=REQUEST)
def getUser(self, name):
    """
    Return the user `name` wrapped in the context of self, or None.
    """
    if not self.data.has_key(name):
        return None
    # fixup_user() is deliberately *not* applied here, only on login:
    # otherwise every access to the user (like checking when the last visit
    # was) would update the last_visit property.
    return self.data[name].__of__(self)
def changeUser_Form(self, names, REQUEST=None):
    """
    Render changeUserForm for the first user named in `names`.
    """
    member = self.data[names[0]].__of__(self)
    current_password = member._getPassword()
    return self.changeUserForm(self, REQUEST=REQUEST, user=member,
                               password=current_password)
def changeUser(self, name, password, roles, domains, REQUEST=None):
    """
    Set the password, roles and domains of the user `name`.

    When a REQUEST is given the change form for that user is rendered
    again.  changeUser_Form looks its `names` up as keys of self.data, so
    it is handed the user's name (it used to be handed the user object,
    which is not a key of self.data).
    """
    user = self.data[name]
    user._pw_ = password
    user.roles = roles
    user.domains = domains
    if REQUEST is not None:
        return self.changeUser_Form(self, names=(name,), REQUEST=REQUEST)
def changeUserPassword(self, name, password, domains, REQUEST=None):
    """
    Set the password and domains of the user `name`.

    Renders manage_main when a REQUEST is given.
    """
    member = self.data[name]
    member._pw_, member.domains = password, domains
    if REQUEST is None:
        return None
    return self.manage_main(self, REQUEST=REQUEST)
# Remove the users listed in `names` from self.data and delete the
# same-named objects from self.Members, when a Members attribute exists.
# `names` is iterated, so it has to be a sequence of names.
# NOTE(review): the indentation is missing in this copy; whether the member
# folder is removed only for names that were found in self.data has to be
# confirmed against the original file.
def delUser(self, names, REQUEST=None):
""" """
# Delete and clean up after a user
try:
members=self.Members
except AttributeError:
members = None
for name in names:
if self.data.has_key(name):
del self.data[name]
# aq_base is used so that only objects the Members folder itself holds
# count, not acquired ones.
if (members is not None) and hasattr(members.aq_base, name):
members._delObject(name)
if REQUEST is not None:
return self.manage_main(self, REQUEST=REQUEST)
def getUserNames(self):
    """Return a list of usernames"""
    # self.data is a BTree, so its keys are copied into a real list first.
    usernames = []
    usernames.extend(self.data.keys())
    usernames.sort()
    return usernames
def getUsers(self):
    """Return a list of user objects"""
    store = self.data
    return [store[username] for username in self.getUserNames()]
# Rename the users in `names` to the names at the same positions in
# `new_names`, and carry the member folder and the cataloged objects along.
# NOTE(review): the indentation is missing in this copy; in particular the
# extent of each try/except pair in the two re-indexing loops has to be
# confirmed against the original file before the code is re-indented.
def renameUsers(self, names=[], new_names=[], REQUEST=None):
"""Rename users"""
if len(names) != len(new_names):
raise 'Bad Request','Please rename each listed user.'
try: members = self.Members
except AttributeError:
members = None
data = self.data
si = self.SiteIndex
ur = self.UnreviewedIndex
for i in range(len(names)):
name, new_name = names[i], new_names[i]
if name != new_name:
# The new name must not clash (case-insensitively) with an existing user.
if nameUsed(new_name, data.keys()):
# NOTE(review): no except clause is visible around this raise, so
# sys.exc_info()[1] presumably does not hold a useful message here.
raise 'Rename Error', MessageDialog(
title='Invalid User Id',
message=sys.exc_info()[1],
action ='manage_main')
user = data[name]
user.name = new_name
data[new_name] = user
del data[name]
# Store objects that have been catalogued
indexed = si(creator=name)
indexed = map(lambda o, go=si.getobject: go(o.data_record_id_),
indexed)
unreviewed = ur(creator=name)
unreviewed = map(lambda o, go=ur.getobject: go(o.data_record_id_),
unreviewed)
# Rename the Member Folder
if members is not None and hasattr(members.aq_base, name):
memberFolder = getattr(members, name)
fixupLocalRolesAndOwnership(memberFolder, name, new_name,
user.__of__(self))
members.manage_renameObject(name, new_name)
# Fix catalogued stuff
# Both calling conventions of (un)index_object are tried: first with the
# object passed explicitly, then without it.
for ob in indexed:
try:
ob.unindex_object(ob, 'SiteIndex')
except:
ob.unindex_object('SiteIndex')
fixupLocalRolesAndOwnership(ob, name, new_name,
user.__of__(self))
try:
ob.index_object(ob, 'SiteIndex')
except:
ob.index_object('SiteIndex')
for ob in unreviewed:
try:
ob.unindex_object(ob, 'UnreviewedIndex')
except:
ob.unindex_object('UnreviewedIndex')
fixupLocalRolesAndOwnership(ob, name, new_name,
user.__of__(self))
try:
ob.index_object(ob, 'UnreviewedIndex')
except:
ob.index_object('UnreviewedIndex')
if REQUEST is not None:
return self.manage_main(self, REQUEST)
return None
# Utility methods
# ---------------
def fixupLocalRolesAndOwnership(base, oldUser, newUser, user):
    """
    Move ownership and local roles from `oldUser` to `newUser`, recursively.

    Ownership is changed to `user` where `base` is explicitly owned by
    `oldUser`; local roles held by `oldUser` are re-assigned to `newUser`.
    The same is then done for everything in base.objectValues().
    """
    plain = base.aq_base
    if hasattr(plain, 'owner_info'):
        ownership = base.owner_info()
        if ownership and ownership['explicit'] and ownership['id'] == oldUser:
            base.changeOwnership(user, recursive=1)
    if hasattr(base.aq_base, 'get_local_roles_for_userid'):
        held = base.get_local_roles_for_userid(oldUser)
        if held:
            base.manage_delLocalRoles([oldUser])
            base.manage_setLocalRoles(newUser, list(held))
    if hasattr(base.aq_base, 'objectValues'):
        for child in base.objectValues():
            fixupLocalRolesAndOwnership(child, oldUser, newUser, user)
def mail_password(self, forgotten_username, REQUEST):
    """Email a forgotten password to a member"""
    member = self.acl_users.getUser(forgotten_username)
    if member is None:
        return MessageDialog(
            title='Username not found',
            message='The username you entered could not be found.',
            action='mail_password_form')
    if member.email:
        return self.mail_password_template(
            self, REQUEST,
            full_name=member.full_name,
            email=member.email,
            password=member._getPassword())
    # Known user, but there is nowhere to send the password to.
    return MessageDialog(
        title='User has no email address',
        message='This user has no email address and cannot be mailed a '
                'password. Please file an isssue in the <a href="/Tracker">'
                'Zope.org Tracker</a> for assistance.',
        action='mail_password_form')
=== Added File Products/Extensions/MyCatalog.py ===
def myUpdate(self, submit, REQUEST, urls=None):
    """
    Update my Catalog entries

    `submit` is the caption of the pressed button: ' Update Selected ' and
    ' Remove Selected ' work on `urls`, ' Update All ' and ' Remove All '
    on every record whose creator is the authenticated user.  Only objects
    on which the user has the Owner role are touched.

    Any other `submit` value changes nothing and reports 'No action taken.'
    (it used to fail because `message` was never assigned).

    Returns the myObjects page with a message stating how many objects
    were updated or removed.
    """
    user = REQUEST.AUTHENTICATED_USER
    if submit in (" Update Selected ", " Update All "):
        remove = 0
    elif submit in (" Remove Selected ", " Remove All "):
        remove = 1
    else:
        return self.myObjects(self, REQUEST, message='No action taken.')
    if submit in (" Update All ", " Remove All "):
        # Collect the paths first so the catalog is not modified while its
        # search results are still being walked.
        urls = []
        for record in self.searchResults(creator=user.getUserName()):
            urls.append(self.getpath(record.data_record_id_))
    elif urls is None:
        urls = []
    n = 0
    for url in urls:
        ob = self.resolve_url(url, REQUEST)
        if ob and user.has_role('Owner', ob):
            if remove:
                self.uncatalog_object(url)
            else:
                self.catalog_object(ob, url)
            n = n + 1
    if remove:
        message = 'Removed %d objects.' % n
    else:
        message = 'Updated %d objects.' % n
    return self.myObjects(self, REQUEST, message=message)
=== Added File Products/Extensions/Project.py ===
import sys, os, string, time, random, urllib
from DateTime import DateTime
def sort_key(self):
    """
    Return str(self.sort_key_) when the parent's aq_base has a `sort_key_`
    attribute, and self.id otherwise.
    """
    container = self.aq_parent.aq_base
    if not hasattr(container, 'sort_key_'):
        return self.id
    return str(self.sort_key_)
def addProject(self, id, title='', REQUEST=None):
    """
    Add a Project with its four phases to self.Destination().

    Every phase gets a sort_key_ (0..3) and a first iteration
    'Iteration_1'; the Inception phase and its first iteration are marked
    'Active'.  With a REQUEST the browser is redirected to manage_workspace.
    """
    container = self.Destination()
    project = self.Project(id)
    project.id = id
    project.title = title
    container._setObject(id, project)
    # Continue with the object as it now lives in its container.
    project = getattr(container, id)
    phase_names = ('Inception', 'Elaboration', 'Construction', 'Transition')
    for order in range(len(phase_names)):
        phase_name = phase_names[order]
        phase = project.Phase(phase_name)
        phase.id = phase_name
        phase.sort_key_ = order
        project._setObject(phase_name, phase)
        addIteration(getattr(project, phase_name), 'Iteration_1')
    project.Inception.status = 'Active'
    project.Inception.Iteration_1.status = 'Active'
    if REQUEST:
        try:
            target = self.DestinationURL()
        except:
            target = REQUEST['URL1']
        REQUEST.RESPONSE.redirect(target + '/manage_workspace')
def addIteration(self, id, title='', REQUEST=None):
    """
    Add an Iteration named `id` to self.

    With a REQUEST the browser is redirected to manage_workspace.
    """
    iteration = self.Iteration(id)
    iteration.id = id
    iteration.title = title
    self._setObject(id, iteration)
    if REQUEST:
        try:
            target = self.DestinationURL()
        except:
            target = REQUEST['URL1']
        REQUEST.RESPONSE.redirect(target + '/manage_workspace')
def addTask(self, id, title='', REQUEST=None):
    """
    Add a Task named `id` to self.

    With a REQUEST the browser is redirected to manage_workspace.
    """
    task = self.Task(id)
    task.id = id
    task.title = title
    self._setObject(id, task)
    if REQUEST:
        try:
            target = self.DestinationURL()
        except:
            target = REQUEST['URL1']
        REQUEST.RESPONSE.redirect(target + '/manage_workspace')
# Findable object methods. These are methods common to objects
# which subclass Findable, which provide implementations for
# obtaining certain metadata (such as creator), as well as
# common methods that handle the details of indexing and
# unindexing Findable objects.
def onDeleteObject(self):
    """Object delete handler: unindex the object that is being deleted."""
    unindex = self.unindex_object
    unindex()
def creator(self):
    """Return the names of the users who have the local Owner role, as one
    string joined with ', '.  The local roles are acquired through the
    parent of the object.  The name creator is used for this method to
    conform to Dublin Core."""
    local_roles = self.aq_parent.aq_acquire('__ac_local_roles__') or {}
    owners = [userid for userid, held in local_roles.items()
              if 'Owner' in held]
    return ', '.join(owners)
def url(self, ftype=urllib.splittype, fhost=urllib.splithost):
"""Return a SCRIPT_NAME-based url for an object."""
if hasattr(self, 'DestinationURL') and \
callable(self.DestinationURL):
url='%s/%s' % (self.DestinationURL(), self.id)
else: url=self.absolute_url()
type, uri=ftype(url)
host, uri=fhost(uri)
script_name=self.REQUEST['SCRIPT_NAME']
__traceback_info__=(`uri`, `script_name`)
if script_name:
uri=filter(None, string.split(uri, script_name))[0]
uri=uri or '/'
if uri[0]=='/': uri=uri[1:]
return uri
def summary(self, num=200):
    """Return at most the first `num` characters of the object's
    text_content (called when it is callable), or '' when the object has
    no text_content."""
    if not hasattr(self, 'text_content'):
        return ''
    source = getattr(self, 'text_content')
    if callable(source):
        source = source()
    limit = min(num, len(source))
    return source[:limit]
def index_object(self):
    """A common method to allow Findables to index themselves in SiteIndex."""
    site_index = self.SiteIndex
    site_index.indexObject(self)
def unindex_object(self):
    """A common method to allow Findables to remove themselves from SiteIndex."""
    site_index = self.SiteIndex
    site_index.unindexObject(self)
# STX Document methods
# ---------------------
# DTML source handed to self.STXDocument() by addSTXDocument: it renders the
# `content` variable as structured text between the standard header/footer.
default_format="""<!--#var standard_html_header-->
<!--#var content fmt="structured-text"-->
<!--#var standard_html_footer-->"""
def addSTXDocument(self, id, title='', submit='', REQUEST=None):
    """
    Add an STXDocument named `id` to self.Destination() and index it.

    The Findables and Basic property sheets are filled from REQUEST, to
    which a 'date' entry is added: DateTime(REQUEST['date_override']) when
    that is given, the current time otherwise.

    REQUEST may be omitted.  A fresh mapping is then used for the property
    values (the former `REQUEST={}` default was one dictionary shared and
    modified by all calls) and no redirect is attempted.  When REQUEST has
    a RESPONSE the browser is redirected to manage_workspace of the
    container, or of the new document for ' Add and Edit '.
    """
    if REQUEST is None:
        REQUEST = {}
    dest = self.Destination()
    ob = self.STXDocument(default_format, __name__=id)
    ob.title = title
    dest._setObject(id, ob)
    # Continue with the object as it now lives in its container.
    ob = getattr(dest, id)
    ds = REQUEST.get('date_override', '')
    if ds:
        date = DateTime(ds)
    else:
        date = DateTime()
    REQUEST['date'] = date
    ob.propertysheets.Findables.manage_changeProperties(REQUEST)
    ob.propertysheets.Basic.manage_changeProperties(REQUEST)
    ob.index_object()
    # Only a real request object can redirect; a plain mapping cannot.
    if hasattr(REQUEST, 'RESPONSE'):
        try:
            url = self.DestinationURL()
        except:
            # Best effort: fall back to the URL the request supplies.
            url = REQUEST['URL1']
        if submit == " Add and Edit ":
            url = "%s/%s" % (url, urllib.quote(id))
        REQUEST.RESPONSE.redirect(url + '/manage_workspace')
def STXDocument_text_content(self):
    """Return the text content of the object: its raw `content` attribute."""
    stx_source = self.content
    return stx_source
def STXDocument_onEditBegin(self):
    """Remove the document from SiteIndex before it is edited."""
    site_index = self.SiteIndex
    site_index.unindexObject(self)
def STXDocument_onEditComplete(self):
    """Put the document back into SiteIndex after it has been edited."""
    site_index = self.SiteIndex
    site_index.indexObject(self)
# HTML Document methods
# ---------------------
def addHTMLDocument(self, id, title='', file='', submit='', REQUEST=None):
    """Create an HTMLDocument from *file* in the destination and index it.

    The document date comes from ``REQUEST['date_override']`` when given,
    otherwise it is the current time.  When a request carrying a RESPONSE
    is supplied the browser is redirected to the management workspace.
    """
    # The old ``REQUEST={}`` default was mutated below, so the shared dict
    # leaked between calls and the redirect was attempted on a plain dict.
    response = getattr(REQUEST, 'RESPONSE', None)
    if REQUEST is None:
        REQUEST = {}
    dest = self.Destination()
    ob = self.HTMLDocument(file, __name__=id)
    ob.title = title
    dest._setObject(id, ob)
    ob = getattr(dest, id)
    ds = REQUEST.get('date_override', '')
    if ds:
        date = DateTime(ds)
    else:
        date = DateTime()
    REQUEST['date'] = date
    ob.propertysheets.Findables.manage_changeProperties(REQUEST)
    ob.index_object()
    if response is not None:
        # Best effort: fall back to the request URL if DestinationURL fails.
        try:
            url = self.DestinationURL()
        except:
            url = REQUEST['URL1']
        if submit == " Add and Edit ":
            url = "%s/%s" % (url, urllib.quote(id))
        response.redirect(url + '/manage_workspace')
def HTMLDocument_text_content(self):
    """Return the unrendered source of the document, as given by read()."""
    # The source is used rather than the rendered page so that material
    # pulled in through var tags and shared by many pages is left out.
    source = self.read()
    return source
def HTMLDocument_onEditBegin(self):
    """Remove the document from SiteIndex before it is edited."""
    catalog = self.SiteIndex
    catalog.unindexObject(self)
def HTMLDocument_onEditComplete(self):
    """Put the document back into SiteIndex after an edit."""
    catalog = self.SiteIndex
    catalog.indexObject(self)
# NewsItem methods
# ----------------
def NewsItem_text_content(self):
    """Return the ``text`` attribute of the news item."""
    body = self.text
    return body
def approveNewsItem(self, approve=0, REQUEST=None):
    """Apply REQUEST to every property sheet and index the item if approved.

    The original ended with ``self.title=title`` although no ``title``
    name exists in scope, so every call raised NameError.  The title is
    now taken from REQUEST when one is supplied there.
    NOTE(review): confirm REQUEST is the intended source of the title.
    """
    for ps in self.propertysheets.values():
        ps.manage_changeProperties(REQUEST=REQUEST)
    if REQUEST is not None:
        title = REQUEST.get('title')
        if title is not None:
            self.title = title
    if approve:
        # Add to catalog here...
        self.index_object()
def findAllMemberNames(self):
    """Return the user names of all acl_users entries with the Member role."""
    # XXX This should go up the folder tree...
    return [user.getUserName()
            for user in self.acl_users.getUsers()
            if user.has_role('Member')]
def foo(self):
    # Debug helper: list the reprs found while walking up aq_parent links.
    # After 11 steps with a parent still available, "!!" is appended and
    # the walk stops.
    chain = []
    current = self
    for _step in range(11):
        if not hasattr(current, 'aq_parent'):
            break
        chain.append(repr(current))
        current = current.aq_parent
    else:
        chain.append("!!")
    return "<h3>Self gots .%s.</h3>" % ', '.join(chain)
def getOwners(self, join=string.join):
    # Returns a ', '-joined string of the keys of the parent's
    # __ac_local_roles__ mapping whose role list contains 'Owner'.
    parent = self.aq_parent  # XXX Workaround
    parent.aq_acquire('__ac_local_roles__')
    local_roles = parent.__ac_local_roles__ or {}  # XXX Workaround
    owners = [name for name, roles in local_roles.items()
              if 'Owner' in roles]
    return join(owners, ', ')
def aq_base(ob):
    # Return ob.aq_base when the object has one, otherwise ob itself.
    if not hasattr(ob, 'aq_base'):
        return ob
    return ob.aq_base
def reindex_all(self, obj=None):
    """Call index_object() on *obj* (default: self) and all its descendants.

    Attributes are looked up on the aq_base of each object; children come
    from objectValues().  Always returns 'done!'.
    """
    if obj is None:
        obj = self
    base = aq_base(obj)
    if hasattr(base, 'index_object'):
        obj.index_object()
    if hasattr(base, 'objectValues'):
        # objectValues() used to be called twice; one result was discarded.
        for item in obj.objectValues():
            reindex_all(self, item)
    return 'done!'
=== Added File Products/Extensions/ProjectLib.py ===
import sys, os, string, time, random, urllib
from DateTime.DateTime import DateTime
def gen_class_id(self):
    # Build an id of the form '*' + base64(md5(absolute_url + current time)).
    import md5, base64, time
    digest = md5.new()
    digest.update(self.absolute_url())
    digest.update(str(time.time()))
    encoded = base64.encodestring(digest.digest())
    return '*' + encoded.strip()
def sort_key(self):
    # Return str(self.sort_key_) when the parent's aq_base has a
    # 'sort_key_' attribute, otherwise self.id.
    # NOTE(review): the test looks at the parent but the value is read
    # from self -- confirm the parent check is intended.
    parent_base = self.aq_parent.aq_base
    if not hasattr(parent_base, 'sort_key_'):
        return self.id
    return str(self.sort_key_)
def getValidUserNames(self):
    """Return the user names of all acl_users entries with the Member role."""
    # XXX This should go up the folder tree...
    return [user.getUserName()
            for user in self.acl_users.getUsers()
            if user.has_role('Member')]
def assignedTo(self):
    # Hard-coded: always returns a one-element list containing 'brian'.
    assignees = ['brian']
    return assignees
def index_object(self):
    """Catalog a 'Community' project in SiteIndex under self.url().

    Any error raised along the way is silently ignored.
    """
    try:
        is_community = self.project_type == 'Community'
        if is_community:
            self.SiteIndex.catalog_object(self, self.url())
    except:
        pass
def unindex_object(self):
    """Remove a 'Community' project from SiteIndex, keyed by self.url().

    Best effort: any error is ignored.  The original called the
    misspelled ``uncatlog_object``; the resulting AttributeError was
    swallowed by the bare except, so nothing was ever uncataloged.
    """
    try:
        if self.project_type == 'Community':
            self.SiteIndex.uncatalog_object(self.url())
    except:
        pass
def nid(self):
    """Return the object's ``_p_oid`` attribute."""
    oid = self._p_oid
    return oid
def getArtifacts(self, subs=0, obs=None):
    # Collect 'Generic Artifact' and 'File' children into obs (a new list
    # when none is passed).  With subs true, 'Project', 'Phase' and
    # 'Iteration' children are asked for their artifacts as well.
    if obs is None:
        obs = []
    for artifact in self.objectValues(['Generic Artifact', 'File']):
        obs.append(artifact)
    if not subs:
        return obs
    for container in self.objectValues(['Project', 'Phase', 'Iteration']):
        obs = container.getArtifacts(subs=1, obs=obs)
    return obs
def status_change(self, status, req=None, zero=DateTime('1/1/1970')):
    # Write into req the act_start_date / act_finish_date values implied
    # by moving self to *status*; *zero* marks an unset date.  Nothing is
    # written when the status is unchanged.
    #   Pending   -> both dates reset to zero
    #   Active    -> start set to now if unset, finish reset if set
    #   Completed -> start and finish set to now if unset
    # The old default ``req={}`` was one dict shared by every call.
    if req is None:
        req = {}
    sd, fd = self.act_start_date, self.act_finish_date
    date = DateTime()
    if status == self.status:
        return
    elif status == 'Pending':
        req['act_start_date'] = zero
        req['act_finish_date'] = zero
    elif status == 'Active':
        if sd == zero:
            req['act_start_date'] = date
        if fd != zero:
            req['act_finish_date'] = zero
    elif status == 'Completed':
        if sd == zero:
            req['act_start_date'] = date
        if fd == zero:
            req['act_finish_date'] = date
def addProject(self, id, title, project_type,
               est_start_date, est_finish_date,
               est_effort, est_value, status,
               abstract, customer='',
               REQUEST=None):
    """Create a Project with its four phases, set its properties, index it.

    Each phase gets an initial 'Iteration_1'.  When a request carrying a
    RESPONSE is supplied the browser is redirected to manage_workspace.
    """
    # The old ``REQUEST={}`` default was mutated below, so the shared dict
    # leaked between calls and the redirect was attempted on a plain dict.
    response = getattr(REQUEST, 'RESPONSE', None)
    if REQUEST is None:
        REQUEST = {}
    # Projects are created in Project Libraries, so self in
    # this case is the Project factory, which is contained
    # in the ProjectLibrary ZClass - self.aq_parent.aq_parent
    # is the Project Library instance where we are creating
    # the new Project.
    dest = self.aq_parent.aq_parent
    ob = self.Project(id)
    ob.id = id
    ob.title = title
    dest._setObject(id, ob)
    ob = getattr(dest, id)
    k = 0
    for n in ('Inception', 'Elaboration', 'Construction', 'Transition'):
        p = ob.Phase(n)
        p.id = n
        p.sort_key_ = k
        k = k + 1
        ob._setObject(n, p)
        p = getattr(ob, n)
        addIteration(p, 'Iteration_1')
    REQUEST['date'] = DateTime()
    status_change(ob, status, REQUEST)
    ob.propertysheets.Findables.manage_changeProperties(REQUEST)
    ob.propertysheets.ProjectProperties.manage_changeProperties(REQUEST)
    ob.propertysheets.TrackableProperties.manage_changeProperties(REQUEST)
    ob.index_object()
    if response is not None:
        # Best effort: fall back to the request URL if DestinationURL fails.
        try:
            url = self.DestinationURL()
        except:
            url = REQUEST['URL2']
        response.redirect(url + '/manage_workspace')
def editProject(self, title, project_type,
                est_start_date, est_finish_date,
                est_effort, est_value, status,
                abstract, customer='',
                REQUEST=None):
    """Apply REQUEST to the project's property sheets and reindex it.

    When a request carrying a RESPONSE is supplied the browser is
    redirected to manage_workspace.
    """
    # status_change writes into REQUEST; with the old ``REQUEST={}``
    # default that polluted a shared dict and then tried to redirect on it.
    response = getattr(REQUEST, 'RESPONSE', None)
    if REQUEST is None:
        REQUEST = {}
    self.unindex_object()
    status_change(self, status, REQUEST)
    self.propertysheets.ProjectProperties.manage_changeProperties(REQUEST)
    self.propertysheets.TrackableProperties.manage_changeProperties(REQUEST)
    self.propertysheets.Findables.manage_changeProperties(REQUEST)
    self.index_object()
    if response is not None:
        # Best effort: fall back to the request URL if DestinationURL fails.
        try:
            url = self.DestinationURL()
        except:
            url = REQUEST['URL1']
        response.redirect(url + '/manage_workspace')
def addIteration(self, id, title='', REQUEST=None):
    """Create an Iteration inside self, date it now and apply REQUEST to it.

    With a true REQUEST the browser is redirected to manage_workspace.
    """
    new_iteration = self.Iteration(id)
    new_iteration.id = id
    new_iteration.title = title
    self._setObject(id, new_iteration)
    # Re-fetch through the container before touching property sheets.
    stored = getattr(self, id)
    stored.date = DateTime()
    stored.propertysheets.TrackableProperties.manage_changeProperties(REQUEST)
    if not REQUEST:
        return
    try:
        url = self.DestinationURL()
    except:
        url = REQUEST['URL2']
    REQUEST.RESPONSE.redirect(url + '/manage_workspace')
def editIteration(self, title,
                  est_start_date, est_finish_date,
                  est_effort, act_effort, status,
                  REQUEST=None):
    """Set the title, apply the status change and the Trackable properties.

    When a request carrying a RESPONSE is supplied the browser is
    redirected to manage_workspace.
    """
    # status_change writes into REQUEST; with the old ``REQUEST={}``
    # default that polluted a shared dict and then tried to redirect on it.
    response = getattr(REQUEST, 'RESPONSE', None)
    if REQUEST is None:
        REQUEST = {}
    self.title = title
    status_change(self, status, REQUEST)
    self.propertysheets.TrackableProperties.manage_changeProperties(REQUEST)
    if response is not None:
        # Best effort: fall back to the request URL if DestinationURL fails.
        try:
            url = self.DestinationURL()
        except:
            url = REQUEST['URL1']
        response.redirect(url + '/manage_workspace')
def addTask(self, id, title, priority, assigned_to, est_start_date,
            est_finish_date, est_effort, status, desc,
            REQUEST=None):
    """Create a Task inside self, set its properties and index it.

    When a request carrying a RESPONSE is supplied the browser is
    redirected to manage_workspace.
    """
    # The old ``REQUEST={}`` default was mutated below, so the shared dict
    # leaked between calls and the redirect was attempted on a plain dict.
    response = getattr(REQUEST, 'RESPONSE', None)
    if REQUEST is None:
        REQUEST = {}
    ob = self.Task(id)
    ob.id = id
    self._setObject(id, ob)
    ob = getattr(self, id)
    REQUEST['date'] = DateTime()
    # The status transition is evaluated against the new task, as
    # addProject does for a new project; the original passed the
    # container (self) here.  NOTE(review): confirm this was a slip.
    status_change(ob, status, REQUEST)
    ob.propertysheets.TaskProperties.manage_changeProperties(REQUEST)
    ob.propertysheets.TrackableProperties.manage_changeProperties(REQUEST)
    ob.propertysheets.Findables.manage_changeProperties(REQUEST)
    ob.index_object()
    if response is not None:
        # Best effort: fall back to the request URL if DestinationURL fails.
        try:
            url = self.DestinationURL()
        except:
            url = REQUEST['URL2']
        response.redirect(url + '/manage_workspace')
def editTask(self, title, priority, assigned_to, est_start_date,
             est_finish_date, est_effort, status, desc,
             REQUEST=None):
    """Apply REQUEST to the task's property sheets and reindex it.

    When a request carrying a RESPONSE is supplied the browser is
    redirected to manage_workspace.
    """
    # status_change writes into REQUEST; with the old ``REQUEST={}``
    # default that polluted a shared dict and then tried to redirect on it.
    response = getattr(REQUEST, 'RESPONSE', None)
    if REQUEST is None:
        REQUEST = {}
    self.unindex_object()
    status_change(self, status, REQUEST)
    self.propertysheets.TaskProperties.manage_changeProperties(REQUEST)
    self.propertysheets.TrackableProperties.manage_changeProperties(REQUEST)
    self.propertysheets.Findables.manage_changeProperties(REQUEST)
    self.index_object()
    if response is not None:
        # Best effort: fall back to the request URL if DestinationURL fails.
        try:
            url = self.DestinationURL()
        except:
            url = REQUEST['URL1']
        response.redirect(url + '/manage_workspace')
# DTML source handed to the GenericArtifact constructor by
# addGenericArtifact: renders ``content`` as structured text, as
# preformatted plain text, or unchanged, depending on ``format``.
default_format="""<!--#var standard_html_header-->
<!--#if "format=='Structured Text'"-->
<!--#var content fmt="structured-text"-->
<!--#elif "format=='Plain Text'"-->
<pre><!--#var content--></pre>
<!--#else-->
<!--#var content-->
<!--#/if-->
<!--#var standard_html_footer-->"""
def addGenericArtifact(self, id, title, format, content='',
                       submit='', REQUEST=None):
    """Create a GenericArtifact inside self, set its properties, index it.

    When a request carrying a RESPONSE is supplied the browser is
    redirected to manage_workspace (of the new object for " Add and Edit ").
    """
    # The old ``REQUEST={}`` default was mutated below, so the shared dict
    # leaked between calls and the redirect was attempted on a plain dict.
    response = getattr(REQUEST, 'RESPONSE', None)
    if REQUEST is None:
        REQUEST = {}
    ob = self.GenericArtifact(default_format, __name__=id)
    ob.id = id
    ob.title = title
    self._setObject(id, ob)
    ob = getattr(self, id)
    REQUEST['date'] = DateTime()
    ob.propertysheets.Basic.manage_changeProperties(REQUEST)
    ob.propertysheets.Findables.manage_changeProperties(REQUEST)
    ob.index_object()
    if response is not None:
        # Best effort: fall back to the request URL if DestinationURL fails.
        try:
            url = self.DestinationURL()
        except:
            url = REQUEST['URL2']
        if submit == " Add and Edit ":
            url = "%s/%s" % (url, urllib.quote(id))
        response.redirect(url + '/manage_workspace')
def onDeleteObject(self):
    """Delete handler: unindex the object by calling its unindex_object()."""
    unindex = self.unindex_object
    unindex()
def findAllMemberNames(self):
    """Return the user names of all acl_users entries with the Member role."""
    # XXX This should go up the folder tree...
    return [user.getUserName()
            for user in self.acl_users.getUsers()
            if user.has_role('Member')]
def getOwners(self, join=string.join):
    # Returns a ', '-joined string of the keys of the parent's
    # __ac_local_roles__ mapping whose role list contains 'Owner'.
    parent = self.aq_parent  # XXX Workaround
    parent.aq_acquire('__ac_local_roles__')
    local_roles = parent.__ac_local_roles__ or {}  # XXX Workaround
    owners = [name for name, roles in local_roles.items()
              if 'Owner' in roles]
    return join(owners, ', ')
def aq_base(ob):
    # Return ob.aq_base when the object has one, otherwise ob itself.
    if not hasattr(ob, 'aq_base'):
        return ob
    return ob.aq_base
=== Added File Products/Extensions/TrackerMethods.py === (2194/2294 lines abridged)
"""External methods for Tracker-related ZClasses and Products."""
#import Products.TrackerBase.TrackerMisc # DEBUG
#reload(Products.TrackerBase.TrackerMisc) # DEBUG
from Products.TrackerBase.TrackerMisc import *
import OFS.Image
import sys, time
# Data-format version numbers.  addTracker stamps TRACKER_DATA_VERSION on
# each new tracker as DATA_VERSION; the issue and item constants are
# presumably used the same way for those objects -- TODO confirm.
TRACKER_DATA_VERSION = 1
ISSUE_DATA_VERSION = 1
ITEM_DATA_VERSION = 1
def addTracker(self, id, REQUEST=None):
""" """
# self = Factory - actually, FactoryDispatcher
# Destination() = the folder in which the obj is being created.
if not id:
raise ValueError, "You must specify an id."
inst = self.Tracker(id)
dest = self.Destination()
dest._setObject(id, inst)
tracker = getattr(dest, id)
tracker.DATA_VERSION = TRACKER_DATA_VERSION
user = (REQUEST and `REQUEST.AUTHENTICATED_USER`) or ''
tracker.manage_addLocalRoles(user, ['TrackerOwner'])
for role in unique(ROLE_CATEGORIES['open']
+ ROLE_CATEGORIES['dedicated']):
if not tracker._has_user_defined_role(role):
tracker._addRole(role)
setTrackerRules(tracker)
# Use the default, not the local host name
# tracker.smtphost = socket.gethostname()
tracker.smtpport = 25
for ps in tracker.propertysheets:
ps.manage_changeProperties(REQUEST)
tracker.supporters = []
tracker.clients = []
tracker.peers = []
tracker.open_subscribe = tracker.default_open_subscribe
[-=- -=- -=- 2194 lines omitted -=- -=- -=-]
## (INCLUDE, 'overdue', 'ownership')))),
( # Clear up ownership pending state when suitable.
(CONDITION, (NOT, (STATE, 'stage', 'pending'))),
( (CONDITION, (CONTAINS, 'due', 'ownership')),
(ACTIONS, (REMOVE, 'due', 'ownership'))),
( (CONDITION, (CONTAINS, 'overdue', 'ownership')),
(ACTIONS, (REMOVE, 'overdue', 'ownership')))),
( # Clear up rsvp pending state when suitable.
(CONDITION, (AND, (OR, (CONTAINS, 'due', 'rsvp'),
(CONTAINS, 'overdue', 'rsvp')),
(NOT, (STATE, 'msgfromrole', 'Requester', 0)))),
( (CONDITION, (CONTAINS, 'due', 'rsvp')),
(ACTIONS, (REMOVE, 'due', 'rsvp'))),
( (CONDITION, (CONTAINS, 'overdue', 'rsvp')),
(ACTIONS, (REMOVE, 'overdue', 'rsvp')))),
( # Alert to languishing reply.
(CONDITION,
# The last correspondence is from the requester:
(AND,
(OR, ('<',
(STATE, 'msgfromrole', ['Requester', 'TrackerOwner'], 0),
(STATE, 'msgfromrole', 'Supporter', 0)),
('>', (STATE, 'msgfromrole', 'Requester', 0), 3*ADAY)))),
# Send daily alerts to issue owner, after initial grace period:
( (CONDITION, (OR,
(NOT, (STATE, 'alert', 'rsvp due', 0)),
('>', (STATE, 'alert', 'rsvp due', 0), 1*ADAY))),
(ACTIONS,
(SET, 'alert', 'rsvp due'),
(NOTIFY, 'IssueOwner',
"Issue %[var title]s %[var id]s correspondence pending",
"Issue %[var issueIdentity]s requester is owed"
" correspondence.")
)),
# This has been going on too long - alert the tracker owner once:
( (CONDITION, (AND,
('>', (STATE, 'alert', 'rsvp due', 0), 7*ADAY),
(NOT, (STATE, 'alert', 'rsvp overdue', 0)))),
(ACTIONS,
(SET, 'alert', 'rsvp overdue'),
(NOTIFY, 'TrackerOwner',
"Issue %[var title]s %[var id]s correspondence WAY pending",
"Issue %[var issueIdentity]s requester has been owed"
" correspondence for more than 7 days."))))
]
return standard
=== Added File Products/Extensions/ZopeSite.py ===
import sys, os, string, time, random, urllib
from DateTime import DateTime
import pdb
# Findable object methods. These are methods common to objects
# which subclass Findable, which provide implementations for
# obtaining certain metadata (such as creator), as well as
# common methods that handle the details of indexing and
# unindexing Findable objects.
def manage_afterAdd(self, item, container):
    # Index self, then forward the hook to every child.  A child whose
    # _p_changed was None before the call is deactivated again afterwards.
    self.index_object(self)
    for child in self.objectValues():
        try:
            state = child._p_changed
        except:
            state = 0
        child.manage_afterAdd(item, container)
        if state is None:
            child._p_deactivate()
def manage_afterClone(self, item):
    # Clones are deliberately not cataloged here: they are not acquiring
    # yet, so indexing cannot work at this point.  The hook is only
    # forwarded to every child; a child whose _p_changed was None before
    # the call is deactivated again afterwards.
    for child in self.objectValues():
        try:
            state = child._p_changed
        except:
            state = 0
        child.manage_afterClone(item)
        if state is None:
            child._p_deactivate()
def manage_beforeDelete(self, item, container):
    # Unindex self, then forward the hook to every child.  A child whose
    # _p_changed was None before the call is deactivated again afterwards.
    self.unindex_object(self)
    for child in self.objectValues():
        try:
            state = child._p_changed
        except:
            state = 0
        child.manage_beforeDelete(item, container)
        if state is None:
            child._p_deactivate()
def creator(self, first_only=0):
    """Return the users holding the local Owner role, joined with ', '.

    The name follows Dublin Core.  With a true *first_only* only the
    first such user name is returned (when there is one).
    """
    owners = [user for user, roles in self.get_local_roles()
              if 'Owner' in roles]
    if first_only and owners:
        return owners[0]
    return ', '.join(owners)
def url(self, ftype=urllib.splittype, fhost=urllib.splithost):
    """Return the object's path relative to SCRIPT_NAME, without leading '/'.

    The URL is ``DestinationURL()/id`` when the object has a callable
    DestinationURL, otherwise ``absolute_url()``.  Scheme and host are
    removed with *ftype* and *fhost*.
    """
    if hasattr(self, 'DestinationURL') and callable(self.DestinationURL):
        url = '%s/%s' % (self.DestinationURL(), self.id)
    else:
        url = self.absolute_url()
    scheme, rest = ftype(url)
    host, uri = fhost(rest)
    script_name = self.REQUEST['SCRIPT_NAME']
    # Strip SCRIPT_NAME only as a prefix.  Splitting on it anywhere
    # truncated paths that repeat it and raised IndexError when the path
    # was exactly SCRIPT_NAME.
    if script_name and uri[:len(script_name)] == script_name:
        uri = uri[len(script_name):]
    uri = uri or '/'
    if uri[0] == '/':
        uri = uri[1:]
    return uri
def summary(self, num=200):
    """Return the leading part of the object's text, at most *num* long.

    Objects without a ``text_content`` attribute yield an empty string;
    a callable ``text_content`` is called to obtain the text.
    """
    if not hasattr(self, 'text_content'):
        return ''
    text = self.text_content
    if callable(text):
        text = text()
    limit = min(num, len(text))
    return text[:limit]
def getIndex(self, REQUEST=None):
    # Pick the catalog for the current user: SiteIndex when the
    # authenticated user has the 'Contributor' role, else UnreviewedIndex.
    # REQUEST defaults to self.REQUEST.
    request = REQUEST
    if request is None:
        request = self.REQUEST
    user = request.AUTHENTICATED_USER
    if user.has_role('Contributor'):
        return self.SiteIndex
    return self.UnreviewedIndex
def in_catalogs(self):
    """Return the ids of the catalogs whose uid table contains self.url().

    The candidate names come from ``SiteCatalogs`` and default to
    SiteIndex and UnreviewedIndex.
    """
    names = getattr(self, 'SiteCatalogs', ['SiteIndex', 'UnreviewedIndex'])
    candidates = [getattr(self, name) for name in names]
    holding = [cat for cat in candidates
               if cat._catalog.uids.has_key(self.url())]
    return [cat.id for cat in holding]
def index_object(self, catalog=None):
    """Refresh self in every catalog it is in; optionally add it to one more.

    Errors while refreshing existing entries are ignored.  For the extra
    *catalog* path only a failing uncatalog is ignored.
    """
    def recatalog(catobj):
        # Drop any stale entry (best effort), then catalog afresh.
        try:
            catobj.uncatalog_object(self.absolute_url(1))
        except:
            pass
        catobj.catalog_object(self, self.absolute_url(1))

    current = self.in_catalogs()
    for path in current:
        try:
            recatalog(resolve_path(self, path))
        except:
            pass
    # Was an additional catalog requested?
    if catalog is not None and catalog not in current:
        recatalog(resolve_path(self, catalog))
def unindex_object(self, catalog=None):
    """Remove self from *catalog*, or from every catalog it is in.

    A failing uncatalog call is ignored; a path that cannot be resolved
    is not.
    """
    if catalog is not None:
        targets = [catalog]
    else:
        targets = self.in_catalogs()
    for path in targets:
        catobj = resolve_path(self, path)
        try:
            catobj.uncatalog_object(self.absolute_url(1))
        except:
            pass
def date(self):
    """Return the modification date from bobobase_modification_time()."""
    modified = self.bobobase_modification_time()
    return modified
# Utility functions
# -----------------
def resolve_path(obj, path):
while hasattr(obj, 'aq_parent'):
obj = obj.aq_parent
if hasattr(obj, 'aq_base'):
b = obj.aq_base
else:
b = obj
if hasattr(b, 'isTopLevelPrincipiaApplicationObject') and \
b.isTopLevelPrincipiaApplicationObject:
break
try:
for id in string.split(path, '/'):
try:
obj=getattr(obj, id)
except:
obj=obj[id]
return obj
except:
raise 'Catalog error', 'Can\'t resolve path: ' + `path`
def fix_catalog_items(self, REQUEST):
    """Ensure each cataloged object lists this catalog's URL in ``catalogs``.

    self should be a catalog.  Returns a report with one line per object
    that was fixed or could not be fixed.
    NOTE(review): nesting was reconstructed from a flattened listing.
    """
    url = self.absolute_url(1)
    write = REQUEST.RESPONSE.write
    report = []
    for item in self.searchResults():
        try:
            obj = self.getobject(item.data_record_id_, REQUEST)
            catalogs = getattr(obj, 'aq_base', obj).catalogs
            try:
                if url not in catalogs:
                    catalogs.append(url)
                    report.append('fixed %s\n' % obj.absolute_url(1))
            except:
                # The existing value could not be used; start a new list.
                catalogs = [url]
            obj.catalogs = catalogs
        except:
            report.append('<b>can\'t fix %s</b>\n'
                          % self.getpath(item.data_record_id_))
    return ''.join(report)
# STX Document methods
# ---------------------
# DTML source handed to the STXDocument constructor by addSTXDocument:
# renders the ``content`` variable as structured text between the
# standard header and footer.
default_format="""<!--#var standard_html_header-->
<!--#var content fmt="structured-text"-->
<!--#var standard_html_footer-->"""
def addSTXDocument(self, id, title='', submit='', REQUEST=None):
    """Create an STXDocument, apply REQUEST to it and add it to the destination.

    When a request carrying a RESPONSE is supplied the browser is
    redirected to manage_workspace (of the new object for " Add and Edit ").
    """
    # The old ``REQUEST={}`` default was mutated below, so the shared dict
    # leaked between calls and the redirect was attempted on a plain dict.
    response = getattr(REQUEST, 'RESPONSE', None)
    if REQUEST is None:
        REQUEST = {}
    dest = self.Destination()
    ob = self.STXDocument(default_format, __name__=id)
    REQUEST['date'] = DateTime()
    ob.propertysheets.Findables.manage_changeProperties(REQUEST)
    ob.propertysheets.Basic.manage_changeProperties(REQUEST)
    dest._setObject(id, ob)
    if response is not None:
        # Best effort: fall back to the request URL if DestinationURL fails.
        try:
            url = self.DestinationURL()
        except:
            url = REQUEST['URL1']
        if submit == " Add and Edit ":
            url = "%s/%s" % (url, urllib.quote(id))
        response.redirect(url + '/manage_workspace')
def manage_edit_ex(self, data, title, SUBMIT='Change',
                   dtpref_cols='50',
                   dtpref_rows='20',
                   REQUEST=None):
    """Run the inherited manage_edit between an unindex and a reindex.

    With a true REQUEST the management page is returned with a
    "Content changed." message.
    NOTE(review): unindex_object is called with REQUEST=...; confirm the
    bound method accepts that keyword.
    """
    self.unindex_object(REQUEST=REQUEST)
    inherited_edit = self.__class__.inheritedAttribute('manage_edit')
    inherited_edit(self, data, title, SUBMIT, dtpref_cols, dtpref_rows)
    self.index_object()
    if not REQUEST:
        return
    message = "Content changed."
    return self.manage_main(self, REQUEST, manage_tabs_message=message)
def manage_upload_ex(self, file='', REQUEST=None):
    """Run the inherited manage_upload between an unindex and a reindex.

    With a true REQUEST the management page is returned with a
    "Content changed." message.
    NOTE(review): unindex_object is called with REQUEST=...; confirm the
    bound method accepts that keyword.
    """
    self.unindex_object(REQUEST=REQUEST)
    inherited_upload = self.__class__.inheritedAttribute('manage_upload')
    inherited_upload(self, file)
    self.index_object()
    if not REQUEST:
        return
    message = "Content changed."
    return self.manage_main(self, REQUEST, manage_tabs_message=message)
def STXDocument_text_content(self):
    """Return the structured-text content followed by a space and the title."""
    body = self.content
    heading = self.title
    return body + ' ' + heading
# HTML Document methods
# ---------------------
def addHTMLDocument(self, id, title='', file='', submit='', REQUEST=None):
    """Create an HTMLDocument from *file*, apply REQUEST and add it.

    When a request carrying a RESPONSE is supplied the browser is
    redirected to manage_workspace (of the new object for " Add and Edit ").
    """
    # The old ``REQUEST={}`` default was mutated below, so the shared dict
    # leaked between calls and the redirect was attempted on a plain dict.
    response = getattr(REQUEST, 'RESPONSE', None)
    if REQUEST is None:
        REQUEST = {}
    dest = self.Destination()
    ob = self.HTMLDocument(file, __name__=id)
    REQUEST['date'] = DateTime()
    ob.propertysheets.Findables.manage_changeProperties(REQUEST)
    dest._setObject(id, ob)
    if response is not None:
        # Best effort: fall back to the request URL if DestinationURL fails.
        try:
            url = self.DestinationURL()
        except:
            url = REQUEST['URL1']
        if submit == " Add and Edit ":
            url = "%s/%s" % (url, urllib.quote(id))
        response.redirect(url + '/manage_workspace')
def HTMLDocument_text_content(self):
    """Return the source from read(), or "" if that raises AttributeError."""
    try:
        source = self.read()
    except AttributeError:
        source = ""
    return source
# NewsItem methods
# ----------------
def NewsItem_text_content(self):
    """Return the news text followed by a space and the title."""
    body = self.text
    heading = self.title
    return body + ' ' + heading
# Link methods
# ------------
def Link_text_content(self):
    """Return the link description followed by a space and the title."""
    blurb = self.description
    heading = self.title
    return blurb + ' ' + heading
# How-To methods
# --------------
def HowTo_text_content(self):
    """Return the How-To content followed by a space and the title."""
    body = self.content
    heading = self.title
    return body + ' ' + heading
# Tip methods
# -----------
def Tip_text_content(self):
    """Return the tip immediately followed by its details (no separator)."""
    headline = self.tip
    elaboration = self.details
    return headline + elaboration
# Case Study methods
# ------------------
def CaseStudy_text_content(self):
    """Return summary, facts, problem, solution and URL joined by newlines."""
    sections = (
        self.case_study_summary,
        self.facts,
        self.problem,
        self.solution,
        self.external_url,
    )
    return '\n'.join(sections)
# Software methods
# ----------------
def Software_text_content(self):
    """Return the software description followed by a space and the title."""
    blurb = self.description
    heading = self.title
    return blurb + ' ' + heading
def findAllMemberNames(self):
    """Return the user names of all acl_users entries with the Member role."""
    # XXX This should go up the folder tree...
    return [user.getUserName()
            for user in self.acl_users.getUsers()
            if user.has_role('Member')]
def foo(self):
    # Debug helper: list the reprs found while walking up aq_parent links.
    # After 11 steps with a parent still available, "!!" is appended and
    # the walk stops.
    chain = []
    current = self
    for _step in range(11):
        if not hasattr(current, 'aq_parent'):
            break
        chain.append(repr(current))
        current = current.aq_parent
    else:
        chain.append("!!")
    return "<h3>Self gots .%s.</h3>" % ', '.join(chain)
def getOwners(self, join=string.join):
    # Returns a ', '-joined string of the keys of the parent's
    # __ac_local_roles__ mapping whose role list contains 'Owner'.
    parent = self.aq_parent  # XXX Workaround
    parent.aq_acquire('__ac_local_roles__')
    local_roles = parent.__ac_local_roles__ or {}  # XXX Workaround
    owners = [name for name, roles in local_roles.items()
              if 'Owner' in roles]
    return join(owners, ', ')
def reindex_all(self, obj=None):
    """Call index_object() on *obj* (default: self) and all its descendants.

    Attributes are looked up on the object's aq_base when it has one, so
    acquired attributes do not count.  Children come from objectValues().
    Always returns 'done!'.
    """
    if obj is None:
        obj = self
    # This module defines no aq_base() helper, so the original call
    # raised NameError; the unwrapping is done inline instead.
    base = getattr(obj, 'aq_base', obj)
    if hasattr(base, 'index_object'):
        obj.index_object()
    if hasattr(base, 'objectValues'):
        for item in obj.objectValues():
            reindex_all(self, item)
    return 'done!'
=== Added File Products/Extensions/commit_version.py ===
"""Put this in the Extensions directory and create an external method in the
Zope root named "commit_version" with a function name of "commit_version".
Then fire up the debugger and do import Zope; app=Zope.app()
Then grab a hold of the object that's locked in the database and assign it
to a local variable, e.g.:
a = app.Wikis.locked
Then do:
app.commit_version('version_name', a)
This should commit the version."""
def commit_version(self, version=None, ob=None):
    """
    Commit the named version in the database that holds *ob*.

    version should be a string which contains the version name
    that is to be committed.
    ob should be any object out of the database which houses
    the version to be committed.  A good choice of objects is the
    one on which a "versionlock" icon is showing.

    Raises ValueError when either argument is missing (the original
    raised bare strings, which are not valid exceptions in later
    Python versions).
    """
    if version is None:
        raise ValueError("You must provide a version name")
    if ob is None:
        raise ValueError("You must pass in an object that resides in the same"
                         " database as the one in which the version resides.")
    db = ob._p_jar._db
    db.commitVersion(version)
    get_transaction().commit()
=== Added File Products/Extensions/countDoubles.py ===
#
# countDoubles: count the number of user names that are the same and only differ in capitalization
#
import string
def countDoubles(self):
    # Report user names that differ only in capitalization.
    #
    # Returns a text report: a header with the total number of users and
    # the number of doubled names, then one section per doubled name
    # (lower-cased, with the number of occurrences) listing every user
    # found under one of the actual spellings.
    #
    # Names are grouped by lower-case form in one pass instead of calling
    # list.count() per name.  Users are looked up by the spellings that
    # really exist; the original tried lower/capitalized/upper variants,
    # which missed mixed-case names and repeated a user when the variants
    # coincided.
    uf = self.acl_users
    user_list = uf.getUserNames()
    total = len(user_list)
    spellings = {}
    for name in user_list:
        spellings.setdefault(name.lower(), []).append(name)
    doubles_list = [key for key, names in spellings.items()
                    if len(names) > 1]
    doubles_list.sort()
    parts = ['Total users: %d\nNumber of doubles: %d\n\n'
             % (total, len(doubles_list))]
    for key in doubles_list:
        names = spellings[key]
        parts.append(key + ' (%d):\n' % len(names))
        seen = {}
        for name in names:
            if name in seen:
                continue
            seen[name] = 1
            user = uf.getUser(name)
            if user is None:
                continue
            parts.append('%s is: %s (%s), last visit %s\n' % (
                getattr(user, 'name', 'n/a'),
                getattr(user, 'full_name', 'n/a'),
                getattr(user, 'email', 'n/a'),
                user.last_visit.Date(),
            ))
        parts.append('\n\n')
    return ''.join(parts)
=== Added File Products/Extensions/fixfiles.py ===
from types import StringType
def fixfiles(self):
    # Re-upload the data of every 'Software Release File' whose data is a
    # plain string of 64 KiB or more, committing a subtransaction after
    # each re-upload.
    # NOTE(review): nesting was reconstructed from a flattened listing;
    # confirm commit(1) belongs with the upload and _p_deactivate() runs
    # for every record.
    site = self.getPhysicalRoot().nzo
    for rec in site.portal_catalog(Type='Software Release File'):
        release_file = rec.getObject()
        payload_is_big_string = (isinstance(release_file.data, StringType)
                                 and len(release_file.data) >= 65536)
        if payload_is_big_string:
            release_file.manage_upload(release_file.data)
            get_transaction().commit(1)
        release_file._p_deactivate()
=== Added File Products/Extensions/fixids.py ===
from string import join
def _fix(obj, fixed):
if hasattr(obj, 'objectIds'):
for id in obj.objectIds():
subobj = getattr(obj, id)
if hasattr(subobj, '__name__') and
subobj.__name__ ==
'<string>':
# Correct this one.
subobj.__name__ = id
fixed.append(subobj.absolute_url())
_fix(subobj, fixed)
return fixed
def fixids(self):
    # Run _fix over self and report the corrected URLs, one per line.
    corrected_urls = _fix(self, [])
    listing = '\n'.join(corrected_urls)
    return 'Corrections:\n%s\n\nDone.' % listing
=== Added File Products/Extensions/getcwd.py ===
import os
def getcwd():
    # Report the current working directory of the server process.
    cwd = os.getcwd()
    return cwd
=== Added File Products/Extensions/move_news.py ===
import sys, os, string, md5
from webdav.client import Resource
from base64 import encodestring
from urllib import quote
# Target News folder and the account used by send_item to post items.
# NOTE(review): credentials are hard-coded in source; they should be
# supplied from outside the file and this password treated as exposed.
dest_url='http://tarzan.digicool.com/dev/zope/News'
username='Brian Lloyd'
password='123'
def auth_gen(name, pw):
    # hack - create a MemberFolder auth cookie value:
    # quote(base64("<name>:<hex md5 of 'name:pw'>")).
    hasher = md5.new()
    hasher.update('%s:%s' % (name, pw))
    hex_digest = ''.join(['%02x' % ord(c) for c in hasher.digest()])
    credentials = '%s:%s' % (name, hex_digest)
    return quote(encodestring(credentials))
def send_item(item):
    """Post *item* to the addNewsItem factory under dest_url.

    Keywords are split on ', ' and stripped; empty ones are dropped.
    Returns whatever the webdav Resource.post call returns.
    NOTE(review): '%04s' pads the record id with spaces, not zeros --
    confirm that is the intended id format.
    """
    factory_url = '%s/manage_addProduct/ZopeSite/fNewsItem/addNewsItem' % dest_url
    resource = Resource(factory_url)
    auth_headers = {'Cookie': '__ac="%s"' % auth_gen(username, password)}
    item_id = '%04s' % item.data_record_id_
    keywords = [kw.strip() for kw in item.Keywords.split(', ')]
    subject = [kw for kw in keywords if kw]
    return resource.post(id=item_id,
                         title=item.Title,
                         text=item.Text,
                         subject__list=subject,
                         date_override=str(item.Date),
                         headers=auth_headers,
                         )
def test():
    # Manual smoke test: send one canned item and print the result.
    class foo:
        data_record_id_ = 99
        Text = 'item text'
        Keywords = 'foo, bar'
        Title = 'item title'
        Date = '1999/12/05'
    sample = foo()
    result = send_item(sample)
    print(result)
# Run the smoke test when executed as a script.
if __name__ == '__main__':
    test()
=== Added File Products/Extensions/ownerous.py ===
# External method to fixup membership on zope.org.
import string
def fixup_members(self):
    # For every id in the Members folder that has a matching user, make
    # that user (wrapped in the user folder) owner of the member object.
    # Ids whose lookup raises are skipped.  Returns the handled ids, one
    # per line.
    users = self.acl_users
    Members = self.Members
    changed = []
    for member_id in Members.objectIds():
        try:
            user = users.getUserById(member_id)
            member = getattr(Members, member_id)
        except:
            continue
        if user is None:
            continue
        user = user.__of__(users)
        member.changeOwnership(user)
        changed.append(member_id)
    return '\n'.join(changed)
=== Added File Products/Extensions/plaintext.py ===
import string
import re
# A URL-ish token: scheme, the shortest run of characters, then the
# whitespace or double quote that ends it.
pat=re.compile(r"(http://|https://|ftp://|mailto:)(.+?)(\s|\")")


def linkf(m):
    # Turn one match of ``pat`` into an anchor, keeping the terminator.
    url = m.group(1) + m.group(2)
    return """<a href="%s">%s</a>%s""" % (url, url, m.group(3))


def replace(text):
    "simple minded url linking"
    return pat.sub(linkf, text)


def plaintext(text):
    """simple text formatting

    HTML-escape *text*, link the URLs in it and turn newlines into <BR>.
    As listed, each replacement mapped a character to itself, so nothing
    was escaped.  '&' is escaped first so the entities produced for the
    other characters are not escaped a second time.
    """
    text = text.replace('&', '&amp;')
    text = text.replace('>', '&gt;')
    text = text.replace('<', '&lt;')
    text = text.replace('"', '&quot;')
    text = replace(text)
    text = text.replace('\n', '<BR>\n')
    return text
def strip_html(text):
    "strip html brutally"
    # First remove complete <...> tags, then anything from a leftover
    # '<' to the end of its line.
    without_tags = re.sub('<.*?>', '', text)
    return re.sub('<.*', '', without_tags)
=== Added File Products/Extensions/rconsole.py ===
import socket
from code import InteractiveConsole
from types import IntType, StringType
from thread import start_new_thread, get_ident
import sys
import time
class ThreadedObjectProxy:
    """Proxy that forwards attribute access to a per-thread object.

    A thread that registered an alternative gets that object; every other
    thread gets the default passed to the constructor.
    """

    def __init__(self, default):
        self._default = default
        self._alts = {}

    def setAlternative(self, o):
        """Register *o* as the target for the calling thread."""
        thread_id = get_ident()
        self._alts[thread_id] = o

    def delAlternative(self):
        """Forget the calling thread's target, if it has one."""
        thread_id = get_ident()
        try:
            del self._alts[thread_id]
        except KeyError:
            pass

    def __getattr__(self, name):
        target = self._alts.get(get_ident(), self._default)
        return getattr(target, name)
class RemoteConsole(InteractiveConsole):
    """Interactive console that talks over a socket's file object."""

    def __init__(self, sock, file, filename=None, locals=None):
        if filename is None:
            filename = str(file)
        self.sock = sock
        self.file = file
        InteractiveConsole.__init__(self, locals=locals, filename=filename)

    def raw_input(self, prompt=''):
        """Write *prompt*, read one line and return it right-stripped.

        Raises EOFError on Ctrl-D and when the peer has closed the
        connection.  readline() returns '' at end of file; the original
        passed that on as an empty input line, so interact() kept
        spinning after a disconnect.
        """
        if prompt:
            self.file.write(prompt)
        line = self.file.readline()
        if not line:
            raise EOFError
        s = line.rstrip()
        if s == '\x04':  # Ctrl-D
            raise EOFError
        return s

    def interactAndClose(self):
        """Run the console with this thread's std streams redirected.

        The std streams must already be ThreadedObjectProxy instances
        (see setupStreams).  File and socket are closed afterwards.
        """
        sys.stdout.setAlternative(self.file)
        sys.stderr.setAlternative(self.file)
        sys.stdin.setAlternative(self.file)
        try:
            try:
                self.interact()
            except EOFError:
                pass
        finally:
            sys.stdout.delAlternative()
            sys.stderr.delAlternative()
            sys.stdin.delAlternative()
            self.file.close()
            self.sock.close()
def setupStreams():
    # Wrap sys.stdout, sys.stderr and sys.stdin in ThreadedObjectProxy,
    # skipping any stream that already offers setAlternative.
    for stream_name in ('stdout', 'stderr', 'stdin'):
        stream = getattr(sys, stream_name)
        if not hasattr(stream, 'setAlternative'):
            setattr(sys, stream_name, ThreadedObjectProxy(stream))
def accept_connections(s):
    # Accept clients forever; each one gets a RemoteConsole running in a
    # new thread on an unbuffered read/write file over its socket.
    while 1:
        client_sock, addr = s.accept()
        client_file = client_sock.makefile('w+', 0)
        console = RemoteConsole(client_sock, client_file)
        start_new_thread(console.interactAndClose, ())
def listen(addr, locals=None):
    # Start serving remote consoles on addr in a background thread.
    # A string addr is a UNIX socket path, an int is a port on all
    # interfaces, anything else is passed to bind() as an AF_INET address.
    setupStreams()
    family = socket.AF_INET
    if isinstance(addr, StringType):
        family = socket.AF_UNIX
    elif isinstance(addr, IntType):
        addr = ('', addr)
    s = socket.socket(family, socket.SOCK_STREAM)
    reuse = s.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, reuse)
    s.bind(addr)
    s.listen(1)
    start_new_thread(accept_connections, (s,))
def listenlocal(port=8765):
    # Serve remote consoles on localhost only.
    local_addr = ('localhost', port)
    listen(local_addr)
# Script mode: serve on port 8765 and keep the main thread alive.
if __name__ == '__main__':
    listen(8765)
    while 1:
        time.sleep(1)
=== Added File Products/Extensions/rfc1123.py ===
from webdav.common import rfc1123_date
def rfc1123(self, date):
    """
    Return an RFC 1123 date string for a DateTime object.
    This is useful for HTTP headers.
    """
    seconds = date.timeTime()
    return rfc1123_date(seconds)
=== Added File Products/Extensions/rungc.py ===
def rungc():
    """Run gc.collect() and report its result."""
    import gc
    collected = gc.collect()
    return 'gc results: %s' % repr(collected)
=== Added File Products/Extensions/set_title.py ===
import re
# First <TITLE>...</TITLE> element on a single line, in any letter case.
pat=re.compile(r"<TITLE>(.*?)</TITLE>", re.IGNORECASE)


def set_title(self):
    """
    Set the title of a DTML Document or Method from the first title
    tag found in its source.  Nothing changes when there is no tag.
    """
    match = pat.search(self.read())
    if match is None:
        return
    self.title = match.group(1)
=== Added File Products/Extensions/sort_link.py ===
import re, urllib
sort_pat=re.compile(r'(\&)?sort(-|_)on=([^\&]+)')
order_pat=re.compile(r'(\&)?sort(-|_)order=([^&]+)')
start_pat=re.compile(r'(\&)?query_start=([^&]+)')


def _pop_param(pattern, group, qs):
    # Remove the first match of pattern from qs; return (value, new qs),
    # or (None, qs) when there is no match.
    m = pattern.search(qs)
    if m is None:
        return None, qs
    return m.group(group), qs[:m.start()] + qs[m.end():]


def sort_link(self, name, index, REQUEST):
    """
    Returns Anchor tag with appropriate link to do a sorted search.
    name is the name of the link
    index is the index to sort on

    Existing sort_on, sort_order and query_start parameters are removed
    from the query string.  When the current sort is already on *index*
    the link toggles the order and shows the matching arrow image.
    """
    image = 'spc.gif'
    qs = REQUEST['QUERY_STRING']
    # The three copies of the strip logic are folded into _pop_param; the
    # never-read ``klass`` and ``start`` locals are gone.
    sort, qs = _pop_param(sort_pat, 3, qs)
    order, qs = _pop_param(order_pat, 3, qs)
    _start, qs = _pop_param(start_pat, 2, qs)
    qs = qs + '&sort_on=' + urllib.quote_plus(index)
    if sort == index:
        if order != 'reverse':
            qs = qs + '&sort_order=reverse'
            image = 'dwn_arrow.gif'
        else:
            image = 'up_arrow.gif'
    return '<p class="listheader"><img src="/Images/%s"><a href="%s?%s">%s</a></p>' % (image, REQUEST['URL'], urllib.quote(qs, '/%&='), name)
=== Added File Products/Extensions/sort_link.py.old ===
import re, urllib
sort_pat=re.compile(r'(\&)?sort(-|_)on=([^\&]+)')
order_pat=re.compile(r'(\&)?sort(-|_)order=([^&]+)')
start_pat=re.compile(r'(\&)?query_start=([^&]+)')


def sort_link(self, name, index, REQUEST):
    """
    Returns Anchor tag with appropriate link to do a sorted search.
    name is the name of the link
    index is the index to sort on

    Existing sort_on, sort_order and query_start parameters are removed
    from the query string.  When the current sort is already on *index*
    the link toggles the order and the paragraph class reflects it.
    """
    qs = REQUEST['QUERY_STRING']
    found = {}
    for key, pattern, group in (('sort', sort_pat, 3),
                                ('order', order_pat, 3),
                                ('start', start_pat, 2)):
        m = pattern.search(qs)
        if m is not None:
            found[key] = m.group(group)
            qs = qs[:m.start()] + qs[m.end():]
    qs = qs + '&sort_on=' + urllib.quote_plus(index)
    klass = 'listheader'
    if found.get('sort') == index:
        if found.get('order') != 'reverse':
            qs = qs + '&sort_order=reverse'
            klass = 'listheader_selected'
        else:
            klass = 'listheader_reverse'
    return '<p class="%s"><a href="%s?%s">%s</a></p>' % (klass, REQUEST['URL'], qs, name)
=== Added File Products/Extensions/test.py ===
import sys, string, os
import pdb
def testfunc(self, arg1, arg2=None):
"""A test method"""
return 'ok'
def callmeth(self, REQUEST):
    # Debugging aid: drop into pdb, then call standard_html_header.
    # The header's result is discarded.
    pdb.set_trace()
    render_header = self.standard_html_header
    render_header(REQUEST)
=== Added File Products/Extensions/which_node.py ===
def which_node():
    # Hard-coded label for this server node.
    node_label = 'node 1'
    return node_label
=== Added File Products/Extensions/wikimnt.py ===
# Define the Wiki database mounted database
import ZServer, ZODB, ZEO.ClientStorage
def wikidb():
    # Return a ZODB.DB for the mounted Wiki database, served through a
    # ZEO ClientStorage named 'Wikis' at 127.0.0.1:2222.
    #
    # The disabled branch keeps the FileStorage alternative.  It used
    # ``import ZODB.FileStorage`` here, which makes ``ZODB`` a local name
    # of this function even though the branch never runs, so the final
    # ``ZODB.DB(Storage)`` raised UnboundLocalError.  Importing the class
    # by name leaves the module-level ``ZODB`` in effect.
    if 0:
        import os
        from ZODB.FileStorage import FileStorage
        Storage = FileStorage(
            os.path.join(INSTANCE_HOME, 'var', 'Wikis.fs'),
            #read_only=1
            )
    else:
        Storage = ZEO.ClientStorage.ClientStorage(
            ('127.0.0.1', 2222),
            cache_size=50*1000*1000,
            max_disconnect_poll=40,
            min_disconnect_poll=1,
            storage='Wikis',
            )
    return ZODB.DB(Storage)
More information about the zopeorg-checkins
mailing list