1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
"""
Persistent Globally Unique Identifiers for KupferObjects.
Some objects are assigned identifiers by reference, some are assigned
identifiers containing the whole object data (SerializedObject).
SerializedObject is a saved representation of a KupferObject, i.e. a
data model user-level object.
We unpickle SerializedObjects in an especially conservative way: new
module loading is always refused; this way, we avoid loading parts of
the program that we didn't wish to activate.
"""
import contextlib
import pickle
from kupfer import pretty
from kupfer.core import actioncompat
from kupfer.core import qfurl
from kupfer.core.sources import GetSourceController
from kupfer.conspickle import ConservativeUnpickler
# Public names of this module.
__all__ = [
    "SerializedObject",
    "SERIALIZABLE_ATTRIBUTE",
    "resolve_unique_id",
    "resolve_action_id",
    "get_unique_id",
    "is_reference",
]

# Name of the attribute looked up on objects to decide whether (and with
# which version) they are stored as a SerializedObject.
SERIALIZABLE_ATTRIBUTE = "serializable"
class SerializedObject (object):
    """Pickled snapshot of an object, used as its unique identifier.

    The object's serializable attribute is treated as a version number,
    defined on the class: it is stored next to the pickled data and
    compared again when the object is reconstructed.

    Instances compare equal when both pickled data and version match,
    and hash consistently with that equality.
    """

    def __init__(self, obj):
        """Pickle @obj and record its serialization version.

        Raises AttributeError if @obj lacks the serializable attribute;
        any error from pickle.dumps propagates to the caller.
        """
        self.version = getattr(obj, SERIALIZABLE_ATTRIBUTE)
        self.data = pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)

    def __eq__(self, other):
        return (isinstance(other, type(self)) and
                self.data == other.data and
                self.version == other.version)

    def __ne__(self, other):
        # Python 2 does not derive != from ==; without this, two equal
        # instances would still compare as "not equal".
        return not self.__eq__(other)

    def __hash__(self):
        # Equal instances always carry equal data, so hashing the data
        # keeps hash() consistent with __eq__.
        return hash(self.data)

    def reconstruct(self):
        """Unpickle and return the stored object.

        Unpickling goes through ConservativeUnpickler (see the module
        docstring) rather than plain pickle.loads.

        Raises ValueError if the reconstructed object's version differs
        from the one recorded at serialization time.
        """
        obj = ConservativeUnpickler.loads(self.data)
        if self.version != getattr(obj, SERIALIZABLE_ATTRIBUTE):
            # wrap in a 1-tuple so a tuple-valued obj cannot break the
            # %-formatting of the error message
            raise ValueError("Version mismatch for reconstructed %s" % (obj,))
        return obj
def get_unique_id(obj):
    """Return a persistent unique ID for @obj, or None.

    The ID is chosen as follows:

    * None for a None object
    * the string form of its qfurl if @obj has a "qf_id" attribute
    * a SerializedObject if @obj's serializable attribute is not None
    * otherwise repr(@obj)

    If @obj is marked serializable but cannot be pickled, the error is
    reported and None is returned.
    """
    if obj is None:
        return None
    if hasattr(obj, "qf_id"):
        return str(qfurl.qfurl(obj))
    if getattr(obj, SERIALIZABLE_ATTRIBUTE, None) is not None:
        try:
            return SerializedObject(obj)
        # pickle signals unpicklable objects with TypeError as well as
        # PicklingError; treat both as "no ID available"
        except (pickle.PicklingError, TypeError) as exc:
            pretty.print_error(__name__, type(exc).__name__, exc)
            return None
    return repr(obj)
def is_reference(puid):
    """Return True if @puid is a reference-type ID.

    Any ID that is not a SerializedObject counts as a reference.
    """
    if isinstance(puid, SerializedObject):
        return False
    return True
# Recursion guard used while seeking inside a catalog.
#
# @_excluding is a stack of the sources currently being visited; the
# _exclusion context manager pushes a source on entry and pops it on
# exit, so nested `with _exclusion(...)` blocks stack up naturally.
_excluding = []


@contextlib.contextmanager
def _exclusion(src):
    """Mark @src as being visited for the duration of the with-block."""
    # Push before entering the try block, so that the pop in `finally`
    # can only ever remove the entry pushed here.
    _excluding.append(src)
    try:
        yield
    finally:
        _excluding.pop()
def _is_currently_excluding(src):
    """Return True if @src is on the stack of sources being visited.

    None is never considered excluded.
    """
    if src is None:
        return False
    return src in _excluding
def _find_obj_in_catalog(puid, catalog):
    """Look up the object identified by @puid among the sources in @catalog.

    An ID starting with the qfurl scheme is resolved through qfurl.
    Any other ID is compared against repr() of every leaf of every
    source in @catalog; sources that are currently being visited are
    skipped, and each source is marked as visited while its leaves are
    scanned.

    Return the matching object, or None if the leaf scan finds nothing.
    """
    if puid.startswith(qfurl.QFURL_SCHEME):
        return qfurl.qfurl(url=puid).resolve_in_catalog(catalog)

    for source in catalog:
        if not _is_currently_excluding(source):
            with _exclusion(source):
                for leaf in source.get_leaves():
                    if repr(leaf) == puid:
                        return leaf
    return None
def resolve_unique_id(puid, excluding=None):
    """
    Resolve unique id @puid

    The caller (if a Source) should pass itself as @excluding,
    so that recursion into itself is avoided.

    Return the resolved object, or None if @puid is None, cannot be
    reconstructed, or is not found in any source.
    """
    if excluding is not None:
        # redo the lookup with @excluding marked as being visited
        with _exclusion(excluding):
            return resolve_unique_id(puid, None)

    if puid is None:
        return None
    if isinstance(puid, SerializedObject):
        try:
            return puid.reconstruct()
        except Exception as exc:
            # Best effort: any failure to reconstruct is logged and
            # reported to the caller as "not resolved".
            pretty.print_debug(__name__, type(exc).__name__, exc)
            return None

    sc = GetSourceController()
    # search the first-level sources before all remaining ones
    obj = _find_obj_in_catalog(puid, sc._firstlevel)
    if obj is not None:
        return obj
    other_sources = set(sc.sources) - set(sc._firstlevel)
    return _find_obj_in_catalog(puid, other_sources)
def resolve_action_id(puid, for_item=None):
    """Resolve the action identified by @puid.

    A SerializedObject ID is resolved through resolve_unique_id.
    Otherwise, if @for_item is given, the actions available for that
    item are matched by their unique ID first; after that, every action
    registered in the source controller's action decorators is matched
    by its repr().

    Return the action, or None if @puid is None or nothing matches.
    """
    if puid is None:
        return None
    if isinstance(puid, SerializedObject):
        return resolve_unique_id(puid)

    sc = GetSourceController()
    if for_item is not None:
        for action in actioncompat.actions_for_item(for_item, sc):
            if get_unique_id(action) == puid:
                return action

    # only the action lists are needed here, not the item types they
    # are registered for
    for actions in sc.action_decorators.values():
        for action in actions:
            if repr(action) == puid:
                return action

    pretty.print_debug(__name__, "Unable to resolve %s (%s)" % (puid, for_item))
    return None
|