7
from .compat import callable, exec_, string_types, with_metaclass
9
from sqlalchemy.util import format_argspec_plus, update_wrapper
10
from sqlalchemy.util.compat import inspect_getfullargspec
13
class _ModuleClsMeta(type):
14
def __setattr__(cls, key, value):
15
super(_ModuleClsMeta, cls).__setattr__(key, value)
16
cls._update_module_proxies(key)
19
class ModuleClsProxy(with_metaclass(_ModuleClsMeta)):
20
"""Create module level proxy functions for the
21
methods on a given class.
23
The functions will have a compatible signature
28
_setups = collections.defaultdict(lambda: (set(), []))
31
def _update_module_proxies(cls, name):
32
attr_names, modules = cls._setups[cls]
33
for globals_, locals_ in modules:
34
cls._add_proxied_attribute(name, globals_, locals_, attr_names)
36
def _install_proxy(self):
37
attr_names, modules = self._setups[self.__class__]
38
for globals_, locals_ in modules:
39
globals_['_proxy'] = self
40
for attr_name in attr_names:
41
globals_[attr_name] = getattr(self, attr_name)
43
def _remove_proxy(self):
44
attr_names, modules = self._setups[self.__class__]
45
for globals_, locals_ in modules:
46
globals_['_proxy'] = None
47
for attr_name in attr_names:
48
del globals_[attr_name]
51
def create_module_class_proxy(cls, globals_, locals_):
52
attr_names, modules = cls._setups[cls]
56
cls._setup_proxy(globals_, locals_, attr_names)
59
def _setup_proxy(cls, globals_, locals_, attr_names):
60
for methname in dir(cls):
61
cls._add_proxied_attribute(methname, globals_, locals_, attr_names)
64
def _add_proxied_attribute(cls, methname, globals_, locals_, attr_names):
65
if not methname.startswith('_'):
66
meth = getattr(cls, methname)
68
locals_[methname] = cls._create_method_proxy(
69
methname, globals_, locals_)
71
attr_names.add(methname)
74
def _create_method_proxy(cls, name, globals_, locals_):
75
fn = getattr(cls, name)
76
spec = inspect.getargspec(fn)
77
if spec[0] and spec[0][0] == 'self':
79
args = inspect.formatargspec(*spec)
82
num_defaults += len(spec[3])
85
defaulted_vals = name_args[0 - num_defaults:]
89
apply_kw = inspect.formatargspec(
90
name_args, spec[1], spec[2],
92
formatvalue=lambda x: '=' + x)
94
def _name_error(name):
96
"Can't invoke function '%s', as the proxy object has "
98
"established for the Alembic '%s' class. "
99
"Try placing this code inside a callable." % (
102
globals_['_name_error'] = _name_error
104
translations = getattr(fn, "_legacy_translations", [])
106
outer_args = inner_args = "*args, **kw"
107
translate_str = "args, kw = _translate(%r, %r, %r, args, kw)" % (
113
def translate(fn_name, spec, translations, args, kw):
117
for oldname, newname in translations:
120
"Argument %r is now named %r "
121
"for method %s()." % (
122
oldname, newname, fn_name
124
return_kw[newname] = kw.pop(oldname)
129
pos_only = spec[0][:-len(spec[3])]
133
if arg not in return_kw:
135
return_args.append(args.pop(0))
138
"missing required positional argument: %s"
140
return_args.extend(args)
142
return return_args, return_kw
143
globals_['_translate'] = translate
145
outer_args = args[1:-1]
146
inner_args = apply_kw[1:-1]
149
func_text = textwrap.dedent("""\
150
def %(name)s(%(args)s):
156
_name_error('%(name)s')
157
return _proxy.%(name)s(%(apply_kw)s)
161
'translate': translate_str,
163
'apply_kw': inner_args,
167
exec_(func_text, globals_, lcl)
171
def _with_legacy_names(translations):
173
fn._legacy_translations = translations
180
return value is not None and \
181
value.lower() == 'true'
185
return uuid.uuid4().hex[-12:]
188
def to_list(x, default=None):
191
elif isinstance(x, string_types):
193
elif isinstance(x, collections.Iterable):
199
def to_tuple(x, default=None):
202
elif isinstance(x, string_types):
204
elif isinstance(x, collections.Iterable):
210
def unique_list(seq, hashfunc=None):
214
return [x for x in seq
218
return [x for x in seq
219
if hashfunc(x) not in seen
220
and not seen_add(hashfunc(x))]
223
def dedupe_tuple(tup):
224
return tuple(unique_list(tup))
228
class memoized_property(object):
230
"""A read-only @property that is only evaluated once."""
232
def __init__(self, fget, doc=None):
234
self.__doc__ = doc or fget.__doc__
235
self.__name__ = fget.__name__
237
def __get__(self, obj, cls):
240
obj.__dict__[self.__name__] = result = self.fget(obj)
244
class immutabledict(dict):
246
def _immutable(self, *arg, **kw):
247
raise TypeError("%s object is immutable" % self.__class__.__name__)
249
__delitem__ = __setitem__ = __setattr__ = \
250
clear = pop = popitem = setdefault = \
253
def __new__(cls, *args):
254
new = dict.__new__(cls)
255
dict.__init__(new, *args)
258
def __init__(self, *args):
261
def __reduce__(self):
262
return immutabledict, (dict(self), )
266
return immutabledict(d)
268
d2 = immutabledict(self)
273
return "immutabledict(%s)" % dict.__repr__(self)
276
class Dispatcher(object):
277
def __init__(self, uselist=False):
279
self.uselist = uselist
281
def dispatch_for(self, target, qualifier='default'):
284
self._registry.setdefault((target, qualifier), []).append(fn)
286
assert (target, qualifier) not in self._registry
287
self._registry[(target, qualifier)] = fn
291
def dispatch(self, obj, qualifier='default'):
293
if isinstance(obj, string_types):
295
elif isinstance(obj, type):
296
targets = obj.__mro__
298
targets = type(obj).__mro__
300
for spcls in targets:
301
if qualifier != 'default' and (spcls, qualifier) in self._registry:
302
return self._fn_or_list(
303
self._registry[(spcls, qualifier)])
304
elif (spcls, 'default') in self._registry:
305
return self._fn_or_list(
306
self._registry[(spcls, 'default')])
308
raise ValueError("no dispatch function for object: %s" % obj)
310
def _fn_or_list(self, fn_or_list):
313
for fn in fn_or_list:
320
"""Return a copy of this dispatcher that is independently
326
(k, [fn for fn in self._registry[k]])
327
for k in self._registry
330
d._registry.update(self._registry)