~aafshar/storm/automatic-table-schema-generation

« back to all changes in this revision

Viewing changes to storm/zope/zstorm.py

  • Committer: Jamu Kakar
  • Date: 2007-08-06 22:48:34 UTC
  • mfrom: (144.3.21 storm-zope)
  • Revision ID: jkakar@kakar.ca-20070806224834-rkwwd619zgglcl6o
Merged storm-zope [r=niemeyer,radix] [f=126613].

This branch adds ZStorm, a component which allows Storm to be used
with Zope 3.  It's in the storm.zope package.  Be sure to read
tests/zope/README.txt for an overview of its operation.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""ZStorm integrates Storm with Zope 3.
 
2
 
 
3
@var global_zstorm: A global L{ZStorm} instance.  It used the
 
4
    L{IZStorm} utility registered in C{configure.zcml}.
 
5
"""
 
6
 
 
7
#
 
8
# Copyright (c) 2006, 2007 Canonical
 
9
#
 
10
# Written by Gustavo Niemeyer <gustavo@niemeyer.net>
 
11
#
 
12
# This file is part of Storm Object Relational Mapper.
 
13
#
 
14
# Storm is free software; you can redistribute it and/or modify
 
15
# it under the terms of the GNU Lesser General Public License as
 
16
# published by the Free Software Foundation; either version 2.1 of
 
17
# the License, or (at your option) any later version.
 
18
#
 
19
# Storm is distributed in the hope that it will be useful,
 
20
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
21
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
22
# GNU Lesser General Public License for more details.
 
23
#
 
24
# You should have received a copy of the GNU Lesser General Public License
 
25
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
26
#
 
27
import threading
 
28
import weakref
 
29
 
 
30
from zope.testing.cleanup import addCleanUp
 
31
from zope.interface import implements
 
32
 
 
33
import transaction
 
34
from transaction.interfaces import IDataManager, ISynchronizer
 
35
from transaction._transaction import TransactionFailedError
 
36
 
 
37
from storm.zope.interfaces import IZStorm, ZStormError
 
38
from storm.database import create_database
 
39
from storm.store import Store
 
40
 
 
41
 
 
42
class ZStorm(object):
 
43
    """A utility which integrates Storm with Zope.
 
44
 
 
45
    Typically, applications will register stores using ZCML similar
 
46
    to:
 
47
 
 
48
      <store name='main' uri='sqlite:' />
 
49
 
 
50
    Application code can then acquire the store by name using code
 
51
    similar to:
 
52
 
 
53
      from zope.component import getUtility
 
54
      from storm.zope.interfaces import IZStorm
 
55
 
 
56
      store = getUtility(IZStorm).get('main')
 
57
    """
 
58
 
 
59
    implements(IZStorm)
 
60
 
 
61
    _databases = {}
 
62
 
 
63
    def __init__(self):
 
64
        self._local = threading.local()
 
65
        self._default_databases = {}
 
66
        self._default_uris = {}
 
67
 
 
68
    def _reset(self):
 
69
        for name, store in self.iterstores():
 
70
            store.close()
 
71
        self._local = threading.local()
 
72
        self._databases.clear()
 
73
        self._default_databases.clear()
 
74
        self._default_uris.clear()
 
75
 
 
76
    @property
 
77
    def _stores(self):
 
78
        try:
 
79
            return self._local.stores
 
80
        except AttributeError:
 
81
            stores = weakref.WeakValueDictionary()
 
82
            return self._local.__dict__.setdefault("stores", stores)
 
83
 
 
84
    @property
 
85
    def _named(self):
 
86
        try:
 
87
            return self._local.named
 
88
        except AttributeError:
 
89
            return self._local.__dict__.setdefault("named", {})
 
90
 
 
91
    def _get_database(self, uri):
 
92
        database = self._databases.get(uri)
 
93
        if database is None:
 
94
            return self._databases.setdefault(uri, create_database(uri))
 
95
        return database
 
96
 
 
97
    def set_default_uri(self, name, default_uri):
 
98
        """Set C{default_uri} as the default URI for stores called C{name}."""
 
99
        self._default_databases[name] = self._get_database(default_uri)
 
100
        self._default_uris[name] = default_uri
 
101
 
 
102
    def create(self, name, uri=None):
 
103
        """Create a new store called C{name}.
 
104
 
 
105
        @param uri: Optionally, the URI to use.
 
106
        @raises ZStormError: Raised if C{uri} is None and no default
 
107
            URI exists for C{name}.  Also raised if a store with
 
108
            C{name} already exists.
 
109
        """
 
110
        if uri is None:
 
111
            database = self._default_databases.get(name)
 
112
            if database is None:
 
113
                raise ZStormError("Store named '%s' not found" % name)
 
114
        else:
 
115
            database = self._get_database(uri)
 
116
        store = Store(database)
 
117
        store.__synchronizer = StoreSynchronizer(store)
 
118
 
 
119
        self._stores[id(store)] = store
 
120
 
 
121
        if name is not None:
 
122
            old_store = self._named.setdefault(name, store)
 
123
            if old_store is not store:
 
124
                raise ZStormError("Store named '%s' already exists" % name)
 
125
        return store
 
126
 
 
127
    def get(self, name, default_uri=None):
 
128
        """Get the store called C{name} or None if one isn't available.
 
129
 
 
130
        @param default_uri: Optionally, the URI to use to create a
 
131
           store called C{name} when one doesn't already exist.
 
132
        """
 
133
        store = self._named.get(name)
 
134
        if not store:
 
135
            return self.create(name, default_uri)
 
136
        return store
 
137
 
 
138
    def remove(self, store):
 
139
        """Remove the given store from ZStorm.
 
140
 
 
141
        This removes any management of the store from ZStorm.
 
142
 
 
143
        Notice that if the store was used inside the current
 
144
        transaction, it's probably joined the transaction system as
 
145
        a resource already, and thus it will commit/rollback when
 
146
        the transaction system requests so.
 
147
 
 
148
        This method will unlink the *synchronizer* from the transaction
 
149
        system, so that once the current transaction is over it won't
 
150
        link back to it in future transactions.
 
151
        """
 
152
        del self._stores[id(store)]
 
153
        for name, named_store in self._named.items():
 
154
            if store == named_store:
 
155
                del self._named[name]
 
156
        transaction.manager.unregisterSynch(store.__synchronizer)
 
157
 
 
158
    def iterstores(self):
 
159
        """Iterate C{name, store} 2-tuples."""
 
160
        names = {}
 
161
        for name, store in self._named.items():
 
162
            names[id(store)] = name
 
163
        for store in self._stores.values():
 
164
            yield names.get(id(store)), store
 
165
 
 
166
    def get_default_uris(self):
 
167
        """
 
168
        Return a list of name, uri tuples that are named as the default
 
169
        databases for those names.
 
170
        """
 
171
        return self._default_uris.copy()
 
172
 
 
173
 
 
174
class StoreSynchronizer(object):
 
175
    """This class takes cares of plugging the store in new transactions.
 
176
 
 
177
    Garbage collection should work fine, because the transaction manager
 
178
    stores synchronizers as weak references. Even then, we give a hand
 
179
    to the garbage collector by avoiding strong circular references
 
180
    on the store.
 
181
    """
 
182
 
 
183
    implements(ISynchronizer)
 
184
 
 
185
    def __init__(self, store):
 
186
        data_manager = StoreDataManager(store)
 
187
 
 
188
        self._store_ref = weakref.ref(store)
 
189
        self._data_manager_ref = weakref.ref(data_manager)
 
190
 
 
191
        # Join now ...
 
192
        data_manager.join(transaction.get())
 
193
 
 
194
        # ... and in the future.
 
195
        transaction.manager.registerSynch(self)
 
196
 
 
197
    def _join(self, trans):
 
198
        # If the store is still alive and the transaction is in this thread.
 
199
        store = self._store_ref()
 
200
        if store and trans is transaction.get():
 
201
            data_manager = self._data_manager_ref()
 
202
            if data_manager is None:
 
203
                data_manager = StoreDataManager(store)
 
204
                self._data_manager_ref = weakref.ref(data_manager)
 
205
            try:
 
206
                data_manager.join(trans)
 
207
            except TransactionFailedError:
 
208
                # It means that an *already failed* transaction is trying
 
209
                # to join us to notify that it is indeed failed.  We don't
 
210
                # care about these (see the double_abort test case).
 
211
                pass
 
212
 
 
213
    def beforeCompletion(self, trans):
 
214
        self._join(trans)
 
215
 
 
216
    def afterCompletion(self, trans):
 
217
        pass
 
218
 
 
219
    def newTransaction(self, trans):
 
220
        self._join(trans)
 
221
 
 
222
 
 
223
class StoreDataManager(object):
 
224
    """An L{IDataManager} implementation for C{ZStorm}."""
 
225
 
 
226
    implements(IDataManager)
 
227
 
 
228
    transaction_manager = transaction.manager
 
229
 
 
230
    def __init__(self, store):
 
231
        self._store = store
 
232
        self._trans = None
 
233
 
 
234
    def join(self, trans):
 
235
        if trans is not self._trans:
 
236
            self._trans = trans
 
237
            trans.join(self)
 
238
 
 
239
    def abort(self, txn):
 
240
        self._trans = None
 
241
        self._store.rollback()
 
242
 
 
243
    def tpc_begin(self, txn):
 
244
        # Zope's transaction system will call tpc_begin() on all
 
245
        # managers before calling commit, so flushing here may help
 
246
        # in cases where there are two stores with changes, and one
 
247
        # of them will fail.  In such cases, flushing earlier will
 
248
        # ensure that both transactions will be rolled back, instead
 
249
        # of one committed and one rolled back.
 
250
        self._store.flush()
 
251
 
 
252
    def commit(self, txn):
 
253
        self._trans = None
 
254
        self._store.commit()
 
255
 
 
256
    def tpc_vote(self, txn):
 
257
        pass
 
258
 
 
259
    def tpc_finish(self, txn):
 
260
        pass
 
261
 
 
262
    def tpc_abort(self, txn):
 
263
        pass
 
264
 
 
265
    def sortKey(self):
 
266
        return "store_%d" % id(self)
 
267
 
 
268
 
 
269
global_zstorm = ZStorm()
 
270
 
 
271
addCleanUp(global_zstorm._reset)