Source code for b3j0f.utils.proxy

# -*- coding: utf-8 -*-

# --------------------------------------------------------------------
# The MIT License (MIT)
#
# Copyright (c) 2015 Jonathan Labéjof <jonathan.labejof@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# --------------------------------------------------------------------

"""Module in charge of creating proxies like the design pattern ``proxy``.

A proxy is based on a callable element. It respects its signature but not the
implementation.
"""

from __future__ import absolute_import

__all__ = [
    'get_proxy', 'proxify_routine', 'proxify_elt', 'is_proxy', 'proxified_elt'
]

from time import time

from types import MethodType, FunctionType

from opcode import opmap

from random import randint

from sys import maxsize

from inspect import (
    getmembers, isroutine, ismethod, getargspec, getfile, isbuiltin, isclass
)

from six import (
    get_function_closure, get_function_code, get_function_defaults,
    get_function_globals, get_method_function, get_method_self, exec_, PY2, PY3,
    string_types, wraps
)

from .path import lookup
from .runtime import getcodeobj

# consts for interception loading
LOAD_GLOBAL = opmap['LOAD_GLOBAL']
LOAD_CONST = opmap['LOAD_CONST']

#: list of attributes to set after proxifying a function
WRAPPER_ASSIGNMENTS = ['__doc__', '__name__']
#: list of attributes to update after proxifying a function
WRAPPER_UPDATES = ['__dict__']
#: lambda function name
__LAMBDA_NAME__ = (lambda: None).__name__
#: proxy class name
__PROXY_CLASS__ = 'Proxy'
#: attribute name for proxified element
__PROXIFIED__ = '__proxified__'
#: instance method name for delegating proxy generation to the elt to proxify
__GETPROXY__ = '__getproxy__'


[docs]def proxify_elt(elt, bases=None, _dict=None, public=False): """Proxify input elt. :param elt: elt to proxify. :param bases: elt class base classes. If None, use elt type. :param dict _dict: specific elt class content to use. :param bool public: if True (default False), proxify only public members (where name starts with the character '_'). :return: proxified element. :raises: TypeError if elt does not implement all routines of bases and _dict. """ # ensure _dict is a dictionary proxy_dict = {} if _dict is None else _dict.copy() # set of proxified attribute names which are proxified during bases parsing # and avoid to proxify them twice during _dict parsing proxified_attribute_names = set() # ensure bases is a tuple of types if bases is None: bases = (elt if isclass(elt) else elt.__class__,) if isinstance(bases, string_types): bases = (lookup(bases),) elif isclass(bases): bases = (bases,) else: bases = tuple(bases) # fill proxy_dict with routines of bases for base in bases: # exclude object if base is object: continue for name, member in getmembers(base, isroutine): # check if name is public if public and not name.startswith('_'): continue eltmember = getattr(elt, name, None) if eltmember is None: raise TypeError( 'Wrong elt {0}. Must implement {1} ({2}) of {3}.'. format(elt, name, member, base) ) # proxify member if member is not a constructor if name not in ['__new__', '__init__']: # get routine from proxy_dict or eltmember routine = proxy_dict.get(name, eltmember) # exclude object methods if getattr(routine, '__objclass__', None) is not object: # get routine proxy routine_proxy = proxify_routine(routine) if ismethod(routine_proxy): routine_proxy = get_method_function(routine_proxy) # update proxy_dict proxy_dict[name] = routine_proxy # and save the proxified attribute flag proxified_attribute_names.add(name) # proxify proxy_dict for name in proxy_dict: value = proxy_dict[name] if not hasattr(elt, name): raise TypeError( 'Wrong elt {0}. Must implement {1} ({2}).'.format( elt, name, value ) ) if isroutine(value): # if member has not already been proxified if name not in proxified_attribute_names: # proxify it value = proxify_routine(value) proxy_dict[name] = value # set default constructors if not present in proxy_dict if '__new__' not in proxy_dict: proxy_dict['__new__'] = object.__new__ if '__init__' not in proxy_dict: proxy_dict['__init__'] = object.__init__ # generate a new proxy class cls = type('Proxy', bases, proxy_dict) # instantiate proxy cls result = cls if isclass(elt) else cls() # bind elt to proxy setattr(result, __PROXIFIED__, elt) return result
[docs]def proxify_routine(routine, impl=None): """Proxify a routine with input impl. :param routine: routine to proxify. :param impl: new impl to use. If None, use routine. """ # init impl impl = routine if impl is None else impl is_method = ismethod(routine) if is_method: function = get_method_function(routine) else: function = routine # flag which indicates that the function is not a pure python function # and has to be wrapped wrap_function = not hasattr(function, '__code__') try: # get params from routine args, varargs, kwargs, _ = getargspec(function) except TypeError: # in case of error, wrap the function wrap_function = True if wrap_function: # if function is not pure python, create a generic one # with assignments assigned = [] for wrapper_assignment in WRAPPER_ASSIGNMENTS: if hasattr(function, wrapper_assignment): assigned.append(wrapper_assignment) # and updates updated = [] for wrapper_update in WRAPPER_UPDATES: if hasattr(function, wrapper_update): updated.append(wrapper_update) @wraps(function, assigned=assigned, updated=updated) def wrappedfunction(*args, **kwargs): """Default wrap function.""" function = wrappedfunction # get params from function args, varargs, kwargs, _ = getargspec(function) name = function.__name__ result = _compilecode( function=function, name=name, impl=impl, args=args, varargs=varargs, kwargs=kwargs ) # set wrapping assignments for wrapper_assignment in WRAPPER_ASSIGNMENTS: try: value = getattr(function, wrapper_assignment) except AttributeError: pass else: setattr(result, wrapper_assignment, value) # set proxy module result.__module__ = proxify_routine.__module__ # update wrapping updating for wrapper_update in WRAPPER_UPDATES: try: value = getattr(function, wrapper_update) except AttributeError: pass else: getattr(result, wrapper_update).update(value) # set proxyfied element on proxy setattr(result, __PROXIFIED__, routine) if is_method: # create a new method args = [result, get_method_self(routine)] if PY2: args.append(routine.im_class) result = MethodType(*args) return result
def _compilecode(function, name, impl, args, varargs, kwargs): """Get generated code. :return: function proxy generated code. :rtype: str """ newcodestr, generatedname, impl_name = _generatecode( function=function, name=name, impl=impl, args=args, varargs=varargs, kwargs=kwargs ) try: __file__ = getfile(function) except TypeError: __file__ = '<string>' # compile newcodestr code = compile(newcodestr, __file__, 'single') # define the code with the new function _globals = {} exec_(code, _globals) # get new code _var = _globals[generatedname] newco = get_function_code(_var) # get new consts list newconsts = list(newco.co_consts) if PY3: newcode = list(newco.co_code) else: newcode = [ord(co) for co in newco.co_code] consts_values = {impl_name: impl} # change LOAD_GLOBAL to LOAD_CONST index = 0 newcodelen = len(newcode) while index < newcodelen: if newcode[index] == LOAD_GLOBAL: oparg = newcode[index + 1] + (newcode[index + 2] << 8) name = newco.co_names[oparg] if name in consts_values: const_value = consts_values[name] if const_value in newconsts: pos = newconsts.index(const_value) else: pos = len(newconsts) newconsts.append(consts_values[name]) newcode[index] = LOAD_CONST newcode[index + 1] = pos & 0xFF newcode[index + 2] = pos >> 8 index += 1 codeobj = getcodeobj(newconsts, newcode, newco, get_function_code(function)) # instanciate a new function if function is None or isbuiltin(function): result = FunctionType(codeobj, {}) else: result = type(function)( codeobj, get_function_globals(function), function.__name__, get_function_defaults(function), get_function_closure(function) ) return result def _generatecode(function, name, impl, args, varargs, kwargs): code = '' # flag for lambda function islambda = __LAMBDA_NAME__ == name if islambda: generatedname = '_{0}'.format(int(time())) else: generatedname = name # get join method for reducing concatenation time execution join = "".join # default indentation indent = ' ' if islambda: code = '{0} = lambda '.format(generatedname) else: code = 'def {0}('.format(generatedname) if args: code = join((code, '{0}'.format(args[0]))) for arg in args[1:]: code = join((code, ', {0}'.format(arg))) if varargs is not None: if args: code = join((code, ', ')) code = join((code, '*{0}'.format(varargs))) if kwargs is not None: if args or varargs is not None: code = join((code, ', ')) code = join((code, '**{0}'.format(kwargs))) impl_name = '_{0}'.format(randint(0, maxsize)) # insert impl call if islambda: code = join((code, ': {0}('.format(impl_name))) else: code = join( ( code, '):\n{0}return {1}('.format(indent, impl_name) ) ) impl_args = args[1:] if ismethod(impl) else args if impl_args: code = join((code, '{0}'.format(impl_args[0]))) for arg in impl_args[1:]: code = join((code, ', {0}'.format(arg))) if varargs is not None: if args: code = join((code, ', ')) code = join((code, '*{0}'.format(varargs))) if kwargs is not None: if args or varargs is not None: code = join((code, ', ')) code = join((code, '**{0}'.format(kwargs))) code = join((code, ')\n')) result = code, generatedname, impl_name return result
[docs]def get_proxy(elt, bases=None, _dict=None): """Get proxy from an elt. If elt implements the proxy generator method (named ``__getproxy__``), use it instead of using this module functions. :param elt: elt to proxify. :type elt: object or function/method :param bases: base types to enrich in the result cls if not None. :param _dict: class members to proxify if not None. """ # try to find an instance proxy generator proxygenerator = getattr(elt, __GETPROXY__, None) # if a proxy generator is not found, use this module if proxygenerator is None: if isroutine(elt): result = proxify_routine(elt) else: # in case of object, result is a Proxy result = proxify_elt(elt, bases=bases, _dict=_dict) else: # otherwise, use the specific proxy generator result = proxygenerator() return result
[docs]def proxified_elt(proxy): """Get proxified element. :param proxy: proxy element from where get proxified element. :return: proxified element. None if proxy is not proxified. """ if ismethod(proxy): proxy = get_method_function(proxy) result = getattr(proxy, __PROXIFIED__, None) return result
[docs]def is_proxy(elt): """Return True if elt is a proxy. :param elt: elt to check such as a proxy. :return: True iif elt is a proxy. :rtype: bool """ if ismethod(elt): elt = get_method_function(elt) result = hasattr(elt, __PROXIFIED__) return result