[Zope-CVS] CVS: Products/Basket - basket.py:1.1 unzip.py:1.1 __init__.py:1.23

Chris McDonough chrism at plope.com
Thu Nov 10 13:21:59 EST 2005


Update of /cvs-repository/Products/Basket
In directory cvs.zope.org:/tmp/cvs-serv15635

Modified Files:
	__init__.py 
Added Files:
	basket.py unzip.py 
Log Message:
Break out Basket class and support functions into a basket.py module.

If an egg is marked explicitly as "not-zip-safe", unzip it to a tempdir and add the tempdir to sys.path.


=== Added File Products/Basket/basket.py ===
import sys
import os
import traceback
import textwrap
import atexit
import shutil
import tempfile
import unzip

import zLOG
import App

from utils import EggProductContext
from utils import EggProduct

import pkg_resources

# Name of the entry point group that identifies a Zope product distribution.
# It is used both to iterate entry points during initialization and to match
# section names in a distribution's entry_points.txt metadata.
entrypoint_group = 'zope2.initialize'

class Basket(object):
    """Locate egg-packaged Zope products and initialize them.

    Distributions are taken either from the PRODUCT_DISTRIBUTIONS.txt file
    (when it exists) or from every distribution found by pkg_resources that
    declares the product entry point group.
    """

    def __init__(self):
        # Set to True once preinitialize() has run.
        self.pre_initialized = False
        # Temporary directories created for unzipped eggs; see cleanup().
        self.tempdirs = []
        # NOTE: automatic cleanup at interpreter exit is currently disabled;
        # cleanup() has to be called explicitly to remove self.tempdirs.
        #atexit.register(self.cleanup)
        try:
            etc = os.path.join(INSTANCE_HOME, 'etc')
        except NameError: # INSTANCE_HOME may not be available?
            etc = ''
        # Path of the optional file listing required product distributions.
        self.pdist_fname = os.path.join(etc, 'PRODUCT_DISTRIBUTIONS.txt')

    def require(self, distro_str):
        """ Specifically require a distribution specification """
        pkg_resources.require(distro_str)

    def parse_product_distributions_file(self, fp):
        """ Return the non-blank lines of ``fp`` with surrounding
        whitespace stripped.

        ``fp`` may be any iterable of strings (for example an open file).
        """
        L = []
        for distro_str in fp:
            distro_str = distro_str.strip()
            if distro_str:
                L.append(distro_str)
        return L

    def initialize(self, context):
        """ Register the EggProduct class and install every product that
        exposes an entry point in the product entry point group.

        Returns a list holding the result of each product context's
        ``install`` call.  Entry points whose containing package cannot be
        imported are logged and skipped, or re-raised in debug mode.
        """
        context.registerClass(EggProduct, constructors = (('dummy',None),),
                              visibility=None, icon='icon_egg.gif')
        # Grab app from Zope product context
        # It's a "protected" attribute, hence the name mangling
        app = context._ProductContext__app
        debug_mode = App.config.getConfiguration().debug_mode

        if not self.pre_initialized: # this services unit testing
            self.preinitialize()

        data = []
        points = pkg_resources.iter_entry_points(entrypoint_group)

        for point in points:
            # XXX deal with duplicate product names by raising an exception
            # somewhere in here.
            eggname = ' '.join(textwrap.wrap(point.dist.location, 80))
            try:
                product_pkg = get_containing_package(point.module_name)
            except:
                # Deliberately broad: any failure while importing a product
                # is logged and, outside debug mode, must not stop the
                # remaining products from being initialized.
                zLOG.LOG('Egg Product Init', zLOG.ERROR,
                         'Problem initializing product with entry point '
                         '"%s" in module "%s"' % (point.name,point.module_name),
                         error=sys.exc_info())
                if debug_mode:
                    raise
                else:
                    continue

            productname = product_pkg.__name__.split('.')[-1]
            initializer = get_initializer(point, productname, debug_mode)
            # Use a distinct name so the ``context`` argument is not rebound.
            egg_context = EggProductContext(productname, initializer, app,
                                            product_pkg, eggname)
            returned = egg_context.install(debug_mode)
            data.append(returned)
        return data

    def product_distributions_by_dwim(self):
        """ Return all product distributions which have an appropriate
        entry point group on sys.path """
        environment = pkg_resources.Environment()
        product_distros = []
        for project_name in environment:
            distributions = environment[project_name]
            for distribution in distributions:
                if is_product_distribution(distribution):
                    product_distros.append(distribution)
        return product_distros

    def product_distributions_by_require(self):
        """ Return all product distributions which are listed in the
        product distributions file.

        Requirements that resolve to a distribution which is not a product
        distribution are logged as errors and left out of the result.
        """
        pdist_file = open(self.pdist_fname, 'r')
        try:
            strings = self.parse_product_distributions_file(pdist_file)
        finally:
            # Always release the file handle, even if parsing fails.
            pdist_file.close()
        product_distros = []
        for string in strings:
            distribution = pkg_resources.get_distribution(string)
            if is_product_distribution(distribution):
                product_distros.append(distribution)
            else:
                zLOG.LOG('Egg Product Init',
                         zLOG.ERROR,
                         'A requirement was listed in %s that is not a Zope '
                         'product package: %s' % (self.pdist_fname, string))
        return product_distros

    def preinitialize(self):
        """ Add every product distribution to the pkg_resources working set.

        Zipped distributions flagged as not zip-safe are first extracted
        into a fresh temporary directory (recorded in ``self.tempdirs``) and
        the extracted copy is activated instead of the zip file.
        """
        pdist_fname = self.pdist_fname
        if pdist_fname and os.path.exists(pdist_fname):
            distributions = self.product_distributions_by_require()
        else:
            distributions = self.product_distributions_by_dwim()

        for distribution in distributions:
            location = distribution.location
            # A distribution that already lives in a directory needs no
            # unzipping (and cannot be opened as a zip file anyway).
            if (is_zip_safe_distribution(distribution)
                or os.path.isdir(location)):
                pkg_resources.working_set.add(distribution)
                continue

            # if it's not zip-safe, blast it out to a tempdir and create
            # new distro out of the file-based egg; the tempdir is removed
            # by cleanup()
            tempdir = tempfile.mkdtemp()
            # Record the tempdir right away so cleanup() removes it even
            # when extraction fails part-way through.
            self.tempdirs.append(tempdir)
            eggname = os.path.basename(location)
            eggdir = os.path.join(tempdir, eggname)
            os.makedirs(eggdir)
            un = unzip.unzip()
            un.extract(location, eggdir)
            new_distro = pkg_resources.Distribution.from_filename(eggdir)
            # XXX this is nasty... create an API for this
            # The removals are guarded because the zipped distribution may
            # not be present in every one of these structures.
            working_set = pkg_resources.working_set
            if location in working_set.entries:
                working_set.entries.remove(location)
            if distribution.key in working_set.by_key:
                del working_set.by_key[distribution.key]
            working_set.entry_keys[location] = []
            if location in sys.path:
                sys.path.remove(location)
            working_set.add(new_distro)

        self.pre_initialized = True

    def cleanup(self):
        """ Remove every temporary directory recorded in ``self.tempdirs``,
        ignoring errors raised while deleting. """
        for tempdir in self.tempdirs:
            shutil.rmtree(tempdir, ignore_errors=True)

def get_containing_package(module_name):
    """Import ``module_name`` and return the nearest enclosing package.

    Starting with ``module_name`` itself, each dotted name is imported and
    the first module that has a ``__path__`` attribute is returned.
    Import errors raised for any of the names propagate to the caller.
    """
    current = module_name
    while True:
        __import__(current)
        candidate = sys.modules[current]
        if hasattr(candidate, '__path__'):
            return candidate
        # Drop the last dotted component and try the parent next.
        parent = '.'.join(current.split('.')[:-1])
        if parent == current:
            return None
        current = parent

def _loaded_containing_package(module_name):
    """Return the nearest already-imported package enclosing
    ``module_name`` (looked up in sys.modules only), or None."""
    name = module_name
    while name:
        module = sys.modules.get(name)
        if module is not None and hasattr(module, '__path__'):
            return module
        name = '.'.join(name.split('.')[:-1])
    return None

def get_initializer(point, productname, debug_mode, product_pkg=None):
    """Load and return the object an entry point refers to.

    ``point`` must provide a ``load()`` method.  When loading fails the
    error is logged, the formatted traceback is stored on the product
    package as ``__import_error__`` and None is returned; in debug mode
    the original exception is re-raised instead.

    ``product_pkg`` is the package that receives ``__import_error__``.
    When omitted, the nearest already-imported package enclosing
    ``point.module_name`` is used, if there is one.
    """
    initializer = None
    try:
        # this will raise an import error if the initializer can't
        # be imported (presumably because of a module-scope error)
        initializer = point.load()
    except:
        # Deliberately broad: whatever goes wrong while loading a product
        # is recorded, and only propagated in debug mode.
        exc = sys.exc_info()
        zLOG.LOG('Zope', zLOG.ERROR, 'Could not import %s' % productname,
                 error=exc)
        # Format from the captured exc_info so no file-like buffer is needed.
        tb_text = ''.join(
            traceback.format_exception(exc[0], exc[1], exc[2], 100))
        if product_pkg is None:
            product_pkg = _loaded_containing_package(
                getattr(point, 'module_name', ''))
        if product_pkg is not None:
            product_pkg.__import_error__ = tb_text
        if debug_mode:
            # Bare raise re-raises the exception being handled with its
            # original traceback.
            raise
    return initializer

def is_product_distribution(distribution):
    """Return True if ``distribution`` declares the product entry point
    group in its ``entry_points.txt`` metadata, False otherwise.

    ``distribution`` must provide ``has_metadata`` and ``get_metadata``.
    """
    entry_meta = 'entry_points.txt'
    if not distribution.has_metadata(entry_meta):
        return False
    inifile = distribution.get_metadata(entry_meta)
    for section, content in pkg_resources.split_sections(inifile):
        if section == entrypoint_group:
            return True
    # Explicit False so the predicate always returns a boolean.
    return False

def is_zip_safe_distribution(distribution):
    """Return False when ``distribution`` carries 'not-zip-safe' metadata,
    True otherwise."""
    if distribution.has_metadata('not-zip-safe'):
        return False
    return True




=== Added File Products/Basket/unzip.py ===
""" unzip.py
    Version: 1.1

    Extract a zipfile to the directory provided
    It first creates the directory structure to house the files
    then it extracts the files to it.

    Sample usage:
    command line
    unzip.py -p 10 -z c:\testfile.zip -o c:\testoutput

    python class
    import unzip
    un = unzip.unzip()
    un.extract(r'c:\testfile.zip', r'c:\testoutput')
    

    By Doug Tolton
"""

import sys
import zipfile
import os
import os.path
import getopt

class unzip:
    """Extract the contents of a zip archive into a directory."""

    def extract(self, file, dir):
        """Extract every file in the zip archive ``file`` below ``dir``.

        ``dir`` is created when it does not exist (unless it ends with
        ':'), then the archive's directory structure is created and each
        non-directory entry is written out in binary mode.
        """
        if not dir.endswith(':') and not os.path.exists(dir):
            os.mkdir(dir)

        # create directory structure to house files
        self._createstructure(file, dir)

        zf = zipfile.ZipFile(file)
        try:
            # extract files to directory structure
            for name in zf.namelist():
                if name.endswith('/'):
                    # directory entry; already handled by _createstructure
                    continue
                outfile = open(os.path.join(dir, name), 'wb')
                try:
                    outfile.write(zf.read(name))
                finally:
                    # close the output file even if reading/writing fails
                    outfile.close()
        finally:
            zf.close()


    def _createstructure(self, file, dir):
        """ Create below ``dir`` every directory used inside ``file`` """
        self._makedirs(self._listdirs(file), dir)


    def _makedirs(self, directories, basedir):
        """ Create any directories that don't currently exist """
        for dir in directories:
            curdir = os.path.join(basedir, dir)
            if not os.path.exists(curdir):
                os.makedirs(curdir)

    def _listdirs(self, file):
        """ Grabs all the directories in the zip structure
        This is necessary to create the structure before trying
        to extract the file to it.

        Returns a sorted list holding the directory part of each archive
        entry name (duplicates are kept). """
        zf = zipfile.ZipFile(file)
        try:
            names = zf.namelist()
        finally:
            zf.close()

        dirs = []

        for name in names:
            dirname = os.path.dirname(name)
            if dirname:
                dirs.append(dirname)

        dirs.sort()
        return dirs



=== Products/Basket/__init__.py 1.22 => 1.23 ===
--- Products/Basket/__init__.py:1.22	Wed Nov  9 22:13:52 2005
+++ Products/Basket/__init__.py	Thu Nov 10 13:21:28 2005
@@ -1,7 +1,4 @@
 import sys
-import os
-import traceback
-import textwrap
 
 # pkg_resource monkeypatching (if necessary) needs to happen before
 # Products.Basket.utils is imported
@@ -14,153 +11,7 @@
     import pkg_resources_0_6a7 as pkg_resources
     sys.modules['pkg_resources'] = pkg_resources
 
-from Products.Basket.utils import EggProductContext
-from Products.Basket.utils import EggProduct
-import zLOG
-import App
-
-entrypoint_group = 'zope2.initialize'
-
-class Basket(object):
-    def __init__(self):
-        self.pre_initialized = False
-        
-    def require(self, distro_str):
-        """ Specifically require a distribution specification """
-        pkg_resources.require(distro_str)
-
-    def parse_product_distributions_file(self, fp):
-        L = []
-        for distro_str in fp:
-            distro_str = distro_str.strip()
-            if distro_str:
-                L.append(distro_str)
-        return L
-
-    def initialize(self, context):
-        context.registerClass(EggProduct, constructors = (('dummy',None),),
-                              visibility=None, icon='icon_egg.gif')
-        # Grab app from Zope product context
-        # It's a "protected" attribute, hence the name mangling
-        app = context._ProductContext__app
-        debug_mode = App.config.getConfiguration().debug_mode
-
-        if not self.pre_initialized:
-            try:
-                etc = os.path.join(INSTANCE_HOME, 'etc')
-            except NameError: # INSTANCE_HOME may not be available
-                etc = ''
-            pdist_fname = os.path.join(etc, 'PRODUCT_DISTRIBUTIONS.txt')
-            self.preinitialize(pdist_fname)
-
-        data = []
-        points = pkg_resources.iter_entry_points(entrypoint_group)
-        meta_types = []
-
-        for point in points:
-            # XXX deal with duplicate product names by raising an exception
-            # somewhere in here.
-            eggname = ' '.join(textwrap.wrap(point.dist.location, 80))
-            try:
-                product_pkg = get_containing_package(point.module_name)
-            except:
-                zLOG.LOG('Egg Product Init', zLOG.ERROR,
-                         'Problem initializing product with entry point '
-                         '"%s" in module "%s"' % (point.name,point.module_name),
-                         error=sys.exc_info())
-                if debug_mode:
-                    raise
-                else:
-                    continue
-                
-            productname = product_pkg.__name__.split('.')[-1]
-            initializer = get_initializer(point, productname, debug_mode)
-            context = EggProductContext(productname, initializer, app,
-                                        product_pkg, eggname)
-            returned = context.install(debug_mode)
-            data.append(returned)
-        return data
-
-    def product_distributions_by_dwim(self):
-        """ Return all product distributions which have an appropriate
-        entry point group on sys.path """
-        environment = pkg_resources.Environment()
-        product_distros = []
-        for project_name in environment:
-            distributions = environment[project_name]
-            for distribution in distributions:
-                if is_product_distribution(distribution):
-                    product_distros.append(distribution)
-        return product_distros
-
-    def product_distributions_by_require(self, pdist_fname):
-        """ Return all product distributions which are listed in the
-        product distributions file """
-        pdist_file = open(pdist_fname, 'r')
-        strings = self.parse_product_distributions_file(pdist_file)
-        product_distros = []
-        for string in strings:
-            distribution = pkg_resources.get_distribution(string)
-            if is_product_distribution(distribution):
-                product_distros.append(distribution)
-            else:
-                zLOG.LOG('Egg Product Init',
-                         zLOG.ERROR,
-                         'A requirement was listed in %s that is not a Zope '
-                         'product package: %s' % (pdist_fname, string))
-        return product_distros
-
-    def preinitialize(self, pdist_fname=None):
-        if pdist_fname and os.path.exists(pdist_fname):
-            distributions = self.product_distributions_by_require(pdist_fname)
-        else:
-            distributions = self.product_distributions_by_dwim()
-
-        for distribution in distributions:
-            pkg_resources.working_set.add(distribution)
-
-        self.pre_initialized = True
-
-def get_containing_package(module_name):
-    __import__(module_name)
-    thing = sys.modules[module_name]
-    if hasattr(thing, '__path__'):
-        return thing
-    new = '.'.join(module_name.split('.')[:-1])
-    if new == module_name:
-        return None
-    return get_containing_package(new)
-
-def get_initializer(point, productname, debug_mode):
-    initializer = None
-    try:
-        # this will raise an import error if the initializer can't
-        # be imported (presumably because of a module-scope error)
-        initializer = point.load()
-    except:
-        exc = sys.exc_info()
-        zLOG.LOG('Zope', zLOG.ERROR, 'Could not import %s' % productname,
-                 error=exc)
-        f = StringIO()
-        traceback.print_exc(100, f)
-        product_pkg.__import_error__ = f.getvalue()
-        if debug_mode:
-            raise exc[0], exc[1], exc[2]
-    return initializer
-
-def is_product_distribution(distribution): 
-    entry_meta = 'entry_points.txt'
-    if distribution.has_metadata(entry_meta):
-        inifile = distribution.get_metadata(entry_meta)
-        sections = pkg_resources.split_sections(inifile)
-        for section, content in sections:
-            if section == entrypoint_group:
-                return True
-
-def is_zip_safe_distribution(distribution):
-    zip_safe_meta = 'zip-safe'
-    if distribution.has_metadata(zip_safe_meta):
-        return True
+from Products.Basket.basket import Basket
 
 basket = Basket()
 initialize = basket.initialize
@@ -180,4 +31,3 @@
 del resource.ImageResource
 del resource.DTMLResource
 del resource.PageTemplateResource
-



More information about the Zope-CVS mailing list