~certify-web-dev/storm/certify-staging

« back to all changes in this revision

Viewing changes to storm/zope/zstorm.py

  • Committer: Jamu Kakar
  • Date: 2007-07-17 04:42:52 UTC
  • mto: This revision was merged to the branch mainline in revision 159.
  • Revision ID: jkakar@kakar.ca-20070717044252-k13a6522lk2ep7g2
- Migrated canonical.zstorm code into storm.zope.  Other than updating
  imports the code is unchanged.  ZStormTest.is_supported needs to be
  fleshed out to do the right thing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import threading
 
2
import weakref
 
3
 
 
4
from zope.testing.cleanup import addCleanUp
 
5
from zope.interface import implements
 
6
 
 
7
import transaction
 
8
from transaction.interfaces import IDataManager, ISynchronizer
 
9
from transaction._transaction import TransactionFailedError
 
10
 
 
11
from storm.zope.interfaces import IZStorm, ZStormError
 
12
from storm.database import create_database
 
13
from storm.store import Store
 
14
 
 
15
 
 
16
class ZStorm(object):
 
17
 
 
18
    implements(IZStorm)
 
19
 
 
20
    _databases = {}
 
21
 
 
22
    def __init__(self):
 
23
        self._local = threading.local()
 
24
        self._default_databases = {}
 
25
        self._default_uris = {}
 
26
 
 
27
    def _reset(self):
 
28
        for name, store in self.iterstores():
 
29
            store.close()
 
30
        self._local = threading.local()
 
31
        self._databases.clear()
 
32
        self._default_databases.clear()
 
33
        self._default_uris.clear()
 
34
 
 
35
    @property
 
36
    def _stores(self):
 
37
        try:
 
38
            return self._local.stores
 
39
        except AttributeError:
 
40
            stores = weakref.WeakValueDictionary()
 
41
            return self._local.__dict__.setdefault("stores", stores)
 
42
 
 
43
    @property
 
44
    def _named(self):
 
45
        try:
 
46
            return self._local.named
 
47
        except AttributeError:
 
48
            return self._local.__dict__.setdefault("named", {})
 
49
 
 
50
    def _get_database(self, uri):
 
51
        database = self._databases.get(uri)
 
52
        if database is None:
 
53
            return self._databases.setdefault(uri, create_database(uri))
 
54
        return database
 
55
 
 
56
    def set_default_uri(self, name, default_uri):
 
57
        self._default_databases[name] = self._get_database(default_uri)
 
58
        self._default_uris[name] = default_uri
 
59
 
 
60
    def create(self, name, uri=None):
 
61
        if uri is None:
 
62
            database = self._default_databases.get(name)
 
63
            if database is None:
 
64
                raise ZStormError("Store named '%s' not found" % name)
 
65
        else:
 
66
            database = self._get_database(uri)
 
67
        store = Store(database)
 
68
        store.__synchronizer = StoreSynchronizer(store)
 
69
 
 
70
        self._stores[id(store)] = store
 
71
 
 
72
        if name is not None:
 
73
            old_store = self._named.setdefault(name, store)
 
74
            if old_store is not store:
 
75
                raise ZStormError("Store named '%s' already exists" % name)
 
76
        return store
 
77
 
 
78
    def get(self, name, default_uri=None):
 
79
        store = self._named.get(name)
 
80
        if not store:
 
81
            return self.create(name, default_uri)
 
82
        return store
 
83
 
 
84
    def remove(self, store):
 
85
        """Remove the given store from ZStorm.
 
86
 
 
87
        This removes any management of the store from ZStorm.
 
88
 
 
89
        Notice that if the store was used inside the current
 
90
        transaction, it's probably joined the transaction system as
 
91
        a resource already, and thus it will commit/rollback when
 
92
        the transaction system requests so.
 
93
 
 
94
        This method will unlink the *synchronizer* from the transaction
 
95
        system, so that once the current transaction is over it won't
 
96
        link back to it in future transactions.
 
97
        """
 
98
        del self._stores[id(store)]
 
99
        for name, named_store in self._named.items():
 
100
            if store == named_store:
 
101
                del self._named[name]
 
102
        transaction.manager.unregisterSynch(store.__synchronizer)
 
103
 
 
104
    def iterstores(self):
 
105
        names = {}
 
106
        for name, store in self._named.items():
 
107
            names[id(store)] = name
 
108
        for store in self._stores.values():
 
109
            yield names.get(id(store)), store
 
110
 
 
111
    def get_default_uris(self):
 
112
        """
 
113
        Return a list of name, uri tuples that are named as the default
 
114
        databases for those names.
 
115
        """
 
116
        return self._default_uris.copy()
 
117
 
 
118
 
 
119
class StoreSynchronizer(object):
 
120
    """This class takes cares of plugging the store in new transactions.
 
121
 
 
122
    Garbage collection should work fine, because the transaction manager
 
123
    stores synchronizers as weak references. Even then, we give a hand
 
124
    to the garbage collector by avoiding strong circular references
 
125
    on the store.
 
126
    """
 
127
 
 
128
    implements(ISynchronizer)
 
129
 
 
130
    def __init__(self, store):
 
131
        data_manager = StoreDataManager(store)
 
132
 
 
133
        self._store_ref = weakref.ref(store)
 
134
        self._data_manager_ref = weakref.ref(data_manager)
 
135
 
 
136
        # Join now ...
 
137
        data_manager.join(transaction.get())
 
138
 
 
139
        # ... and in the future.
 
140
        transaction.manager.registerSynch(self)
 
141
 
 
142
    def _join(self, trans):
 
143
        # If the store is still alive and the transaction is in this thread.
 
144
        store = self._store_ref()
 
145
        if store and trans is transaction.get():
 
146
            data_manager = self._data_manager_ref()
 
147
            if data_manager is None:
 
148
                data_manager = StoreDataManager(store)
 
149
                self._data_manager_ref = weakref.ref(data_manager)
 
150
            try:
 
151
                data_manager.join(trans)
 
152
            except TransactionFailedError:
 
153
                # It means that an *already failed* transaction is trying
 
154
                # to join us to notify that it is indeed failed.  We don't
 
155
                # care about these (see the double_abort test case).
 
156
                pass
 
157
 
 
158
    def beforeCompletion(self, trans):
 
159
        self._join(trans)
 
160
 
 
161
    def afterCompletion(self, trans):
 
162
        pass
 
163
 
 
164
    def newTransaction(self, trans):
 
165
        self._join(trans)
 
166
 
 
167
 
 
168
class StoreDataManager(object):
 
169
 
 
170
    implements(IDataManager)
 
171
 
 
172
    transaction_manager = transaction.manager
 
173
 
 
174
    def __init__(self, store):
 
175
        self._store = store
 
176
        self._trans = None
 
177
 
 
178
    def join(self, trans):
 
179
        if trans is not self._trans:
 
180
            self._trans = trans
 
181
            trans.join(self)
 
182
 
 
183
    def abort(self, txn):
 
184
        self._trans = None
 
185
        self._store.rollback()
 
186
 
 
187
    def tpc_begin(self, txn):
 
188
        # Zope's transaction system will call tpc_begin() on all
 
189
        # managers before calling commit, so flushing here may help
 
190
        # in cases where there are two stores with changes, and one
 
191
        # of them will fail.  In such cases, flushing earlier will
 
192
        # ensure that both transactions will be rolled back, instead
 
193
        # of one committed and one rolled back.
 
194
        self._store.flush()
 
195
 
 
196
    def commit(self, txn):
 
197
        self._trans = None
 
198
        self._store.commit()
 
199
 
 
200
    def tpc_vote(self, txn):
 
201
        pass
 
202
 
 
203
    def tpc_finish(self, txn):
 
204
        pass
 
205
 
 
206
    def tpc_abort(self, txn):
 
207
        pass
 
208
 
 
209
    def sortKey(self):
 
210
        return "store_%d" % id(self)
 
211
 
 
212
 
 
213
global_zstorm = ZStorm()
 
214
 
 
215
addCleanUp(global_zstorm._reset)