~canonical-livepatch-dependencies/canonical-livepatch-service-dependencies/alembic

« back to all changes in this revision

Viewing changes to alembic/util/langhelpers.py

  • Committer: Free Ekanayaka
  • Author(s): Ondřej Nový
  • Date: 2016-04-21 20:04:57 UTC
  • Revision ID: free.ekanayaka@canonical.com-20160421200457-b4x0zmvn1nhc094p
Tags: upstream-0.8.6
Import upstream version 0.8.6

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import textwrap
 
2
import warnings
 
3
import inspect
 
4
import uuid
 
5
import collections
 
6
 
 
7
from .compat import callable, exec_, string_types, with_metaclass
 
8
 
 
9
from sqlalchemy.util import format_argspec_plus, update_wrapper
 
10
from sqlalchemy.util.compat import inspect_getfullargspec
 
11
 
 
12
 
 
13
class _ModuleClsMeta(type):
 
14
    def __setattr__(cls, key, value):
 
15
        super(_ModuleClsMeta, cls).__setattr__(key, value)
 
16
        cls._update_module_proxies(key)
 
17
 
 
18
 
 
19
class ModuleClsProxy(with_metaclass(_ModuleClsMeta)):
 
20
    """Create module level proxy functions for the
 
21
    methods on a given class.
 
22
 
 
23
    The functions will have a compatible signature
 
24
    as the methods.
 
25
 
 
26
    """
 
27
 
 
28
    _setups = collections.defaultdict(lambda: (set(), []))
 
29
 
 
30
    @classmethod
 
31
    def _update_module_proxies(cls, name):
 
32
        attr_names, modules = cls._setups[cls]
 
33
        for globals_, locals_ in modules:
 
34
            cls._add_proxied_attribute(name, globals_, locals_, attr_names)
 
35
 
 
36
    def _install_proxy(self):
 
37
        attr_names, modules = self._setups[self.__class__]
 
38
        for globals_, locals_ in modules:
 
39
            globals_['_proxy'] = self
 
40
            for attr_name in attr_names:
 
41
                globals_[attr_name] = getattr(self, attr_name)
 
42
 
 
43
    def _remove_proxy(self):
 
44
        attr_names, modules = self._setups[self.__class__]
 
45
        for globals_, locals_ in modules:
 
46
            globals_['_proxy'] = None
 
47
            for attr_name in attr_names:
 
48
                del globals_[attr_name]
 
49
 
 
50
    @classmethod
 
51
    def create_module_class_proxy(cls, globals_, locals_):
 
52
        attr_names, modules = cls._setups[cls]
 
53
        modules.append(
 
54
            (globals_, locals_)
 
55
        )
 
56
        cls._setup_proxy(globals_, locals_, attr_names)
 
57
 
 
58
    @classmethod
 
59
    def _setup_proxy(cls, globals_, locals_, attr_names):
 
60
        for methname in dir(cls):
 
61
            cls._add_proxied_attribute(methname, globals_, locals_, attr_names)
 
62
 
 
63
    @classmethod
 
64
    def _add_proxied_attribute(cls, methname, globals_, locals_, attr_names):
 
65
        if not methname.startswith('_'):
 
66
            meth = getattr(cls, methname)
 
67
            if callable(meth):
 
68
                locals_[methname] = cls._create_method_proxy(
 
69
                    methname, globals_, locals_)
 
70
            else:
 
71
                attr_names.add(methname)
 
72
 
 
73
    @classmethod
 
74
    def _create_method_proxy(cls, name, globals_, locals_):
 
75
        fn = getattr(cls, name)
 
76
        spec = inspect.getargspec(fn)
 
77
        if spec[0] and spec[0][0] == 'self':
 
78
            spec[0].pop(0)
 
79
        args = inspect.formatargspec(*spec)
 
80
        num_defaults = 0
 
81
        if spec[3]:
 
82
            num_defaults += len(spec[3])
 
83
        name_args = spec[0]
 
84
        if num_defaults:
 
85
            defaulted_vals = name_args[0 - num_defaults:]
 
86
        else:
 
87
            defaulted_vals = ()
 
88
 
 
89
        apply_kw = inspect.formatargspec(
 
90
            name_args, spec[1], spec[2],
 
91
            defaulted_vals,
 
92
            formatvalue=lambda x: '=' + x)
 
93
 
 
94
        def _name_error(name):
 
95
            raise NameError(
 
96
                "Can't invoke function '%s', as the proxy object has "
 
97
                "not yet been "
 
98
                "established for the Alembic '%s' class.  "
 
99
                "Try placing this code inside a callable." % (
 
100
                    name, cls.__name__
 
101
                ))
 
102
        globals_['_name_error'] = _name_error
 
103
 
 
104
        translations = getattr(fn, "_legacy_translations", [])
 
105
        if translations:
 
106
            outer_args = inner_args = "*args, **kw"
 
107
            translate_str = "args, kw = _translate(%r, %r, %r, args, kw)" % (
 
108
                fn.__name__,
 
109
                tuple(spec),
 
110
                translations
 
111
            )
 
112
 
 
113
            def translate(fn_name, spec, translations, args, kw):
 
114
                return_kw = {}
 
115
                return_args = []
 
116
 
 
117
                for oldname, newname in translations:
 
118
                    if oldname in kw:
 
119
                        warnings.warn(
 
120
                            "Argument %r is now named %r "
 
121
                            "for method %s()." % (
 
122
                                oldname, newname, fn_name
 
123
                            ))
 
124
                        return_kw[newname] = kw.pop(oldname)
 
125
                return_kw.update(kw)
 
126
 
 
127
                args = list(args)
 
128
                if spec[3]:
 
129
                    pos_only = spec[0][:-len(spec[3])]
 
130
                else:
 
131
                    pos_only = spec[0]
 
132
                for arg in pos_only:
 
133
                    if arg not in return_kw:
 
134
                        try:
 
135
                            return_args.append(args.pop(0))
 
136
                        except IndexError:
 
137
                            raise TypeError(
 
138
                                "missing required positional argument: %s"
 
139
                                % arg)
 
140
                return_args.extend(args)
 
141
 
 
142
                return return_args, return_kw
 
143
            globals_['_translate'] = translate
 
144
        else:
 
145
            outer_args = args[1:-1]
 
146
            inner_args = apply_kw[1:-1]
 
147
            translate_str = ""
 
148
 
 
149
        func_text = textwrap.dedent("""\
 
150
        def %(name)s(%(args)s):
 
151
            %(doc)r
 
152
            %(translate)s
 
153
            try:
 
154
                p = _proxy
 
155
            except NameError:
 
156
                _name_error('%(name)s')
 
157
            return _proxy.%(name)s(%(apply_kw)s)
 
158
            e
 
159
        """ % {
 
160
            'name': name,
 
161
            'translate': translate_str,
 
162
            'args': outer_args,
 
163
            'apply_kw': inner_args,
 
164
            'doc': fn.__doc__,
 
165
        })
 
166
        lcl = {}
 
167
        exec_(func_text, globals_, lcl)
 
168
        return lcl[name]
 
169
 
 
170
 
 
171
def _with_legacy_names(translations):
 
172
    def decorate(fn):
 
173
        fn._legacy_translations = translations
 
174
        return fn
 
175
 
 
176
    return decorate
 
177
 
 
178
 
 
179
def asbool(value):
 
180
    return value is not None and \
 
181
        value.lower() == 'true'
 
182
 
 
183
 
 
184
def rev_id():
 
185
    return uuid.uuid4().hex[-12:]
 
186
 
 
187
 
 
188
def to_list(x, default=None):
 
189
    if x is None:
 
190
        return default
 
191
    elif isinstance(x, string_types):
 
192
        return [x]
 
193
    elif isinstance(x, collections.Iterable):
 
194
        return list(x)
 
195
    else:
 
196
        return [x]
 
197
 
 
198
 
 
199
def to_tuple(x, default=None):
 
200
    if x is None:
 
201
        return default
 
202
    elif isinstance(x, string_types):
 
203
        return (x, )
 
204
    elif isinstance(x, collections.Iterable):
 
205
        return tuple(x)
 
206
    else:
 
207
        return (x, )
 
208
 
 
209
 
 
210
def unique_list(seq, hashfunc=None):
 
211
    seen = set()
 
212
    seen_add = seen.add
 
213
    if not hashfunc:
 
214
        return [x for x in seq
 
215
                if x not in seen
 
216
                and not seen_add(x)]
 
217
    else:
 
218
        return [x for x in seq
 
219
                if hashfunc(x) not in seen
 
220
                and not seen_add(hashfunc(x))]
 
221
 
 
222
 
 
223
def dedupe_tuple(tup):
 
224
    return tuple(unique_list(tup))
 
225
 
 
226
 
 
227
 
 
228
class memoized_property(object):
 
229
 
 
230
    """A read-only @property that is only evaluated once."""
 
231
 
 
232
    def __init__(self, fget, doc=None):
 
233
        self.fget = fget
 
234
        self.__doc__ = doc or fget.__doc__
 
235
        self.__name__ = fget.__name__
 
236
 
 
237
    def __get__(self, obj, cls):
 
238
        if obj is None:
 
239
            return self
 
240
        obj.__dict__[self.__name__] = result = self.fget(obj)
 
241
        return result
 
242
 
 
243
 
 
244
class immutabledict(dict):
 
245
 
 
246
    def _immutable(self, *arg, **kw):
 
247
        raise TypeError("%s object is immutable" % self.__class__.__name__)
 
248
 
 
249
    __delitem__ = __setitem__ = __setattr__ = \
 
250
        clear = pop = popitem = setdefault = \
 
251
        update = _immutable
 
252
 
 
253
    def __new__(cls, *args):
 
254
        new = dict.__new__(cls)
 
255
        dict.__init__(new, *args)
 
256
        return new
 
257
 
 
258
    def __init__(self, *args):
 
259
        pass
 
260
 
 
261
    def __reduce__(self):
 
262
        return immutabledict, (dict(self), )
 
263
 
 
264
    def union(self, d):
 
265
        if not self:
 
266
            return immutabledict(d)
 
267
        else:
 
268
            d2 = immutabledict(self)
 
269
            dict.update(d2, d)
 
270
            return d2
 
271
 
 
272
    def __repr__(self):
 
273
        return "immutabledict(%s)" % dict.__repr__(self)
 
274
 
 
275
 
 
276
class Dispatcher(object):
 
277
    def __init__(self, uselist=False):
 
278
        self._registry = {}
 
279
        self.uselist = uselist
 
280
 
 
281
    def dispatch_for(self, target, qualifier='default'):
 
282
        def decorate(fn):
 
283
            if self.uselist:
 
284
                self._registry.setdefault((target, qualifier), []).append(fn)
 
285
            else:
 
286
                assert (target, qualifier) not in self._registry
 
287
                self._registry[(target, qualifier)] = fn
 
288
            return fn
 
289
        return decorate
 
290
 
 
291
    def dispatch(self, obj, qualifier='default'):
 
292
 
 
293
        if isinstance(obj, string_types):
 
294
            targets = [obj]
 
295
        elif isinstance(obj, type):
 
296
            targets = obj.__mro__
 
297
        else:
 
298
            targets = type(obj).__mro__
 
299
 
 
300
        for spcls in targets:
 
301
            if qualifier != 'default' and (spcls, qualifier) in self._registry:
 
302
                return self._fn_or_list(
 
303
                    self._registry[(spcls, qualifier)])
 
304
            elif (spcls, 'default') in self._registry:
 
305
                return self._fn_or_list(
 
306
                    self._registry[(spcls, 'default')])
 
307
        else:
 
308
            raise ValueError("no dispatch function for object: %s" % obj)
 
309
 
 
310
    def _fn_or_list(self, fn_or_list):
 
311
        if self.uselist:
 
312
            def go(*arg, **kw):
 
313
                for fn in fn_or_list:
 
314
                    fn(*arg, **kw)
 
315
            return go
 
316
        else:
 
317
            return fn_or_list
 
318
 
 
319
    def branch(self):
 
320
        """Return a copy of this dispatcher that is independently
 
321
        writable."""
 
322
 
 
323
        d = Dispatcher()
 
324
        if self.uselist:
 
325
            d._registry.update(
 
326
                (k, [fn for fn in self._registry[k]])
 
327
                for k in self._registry
 
328
            )
 
329
        else:
 
330
            d._registry.update(self._registry)
 
331
        return d