~certify-web-dev/storm/certify-staging

« back to all changes in this revision

Viewing changes to tests/zope/zstorm.py

  • Committer: Jamu Kakar
  • Date: 2007-07-17 04:42:52 UTC
  • mto: This revision was merged to the branch mainline in revision 159.
  • Revision ID: jkakar@kakar.ca-20070717044252-k13a6522lk2ep7g2
- Migrated canonical.zstorm code into storm.zope.  Other than updating
  imports the code is unchanged.  ZStormTest.is_supported needs to be
  fleshed out to do the right thing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import unittest
 
2
import thread
 
3
 
 
4
from zope import component
 
5
 
 
6
import transaction
 
7
 
 
8
from storm.exceptions import OperationalError
 
9
from storm.locals import Store, Int
 
10
from storm.zope.interfaces import IZStorm, ZStormError
 
11
from storm.zope.zstorm import ZStorm
 
12
 
 
13
 
 
14
class ZStormTest(unittest.TestCase):
 
15
 
 
16
    def is_supported(self):
 
17
        # FIXME This needs to do the right thing!
 
18
        return True
 
19
 
 
20
    def setUp(self):
 
21
        self.zstorm = ZStorm()
 
22
 
 
23
    def tearDown(self):
 
24
        # Free the transaction to avoid having errors that cross
 
25
        # test cases.
 
26
        transaction.manager.free(transaction.get())
 
27
 
 
28
    def test_utility(self):
 
29
        component.provideUtility(ZStorm())
 
30
        self.assertTrue(isinstance(component.getUtility(IZStorm), ZStorm))
 
31
 
 
32
    def test_create(self):
 
33
        store = self.zstorm.create(None, "sqlite:")
 
34
        self.assertTrue(isinstance(store, Store))
 
35
 
 
36
    def test_create_twice_unnamed(self):
 
37
        store = self.zstorm.create(None, "sqlite:")
 
38
        store.execute("CREATE TABLE test (id INTEGER)")
 
39
        store.commit()
 
40
 
 
41
        store = self.zstorm.create(None, "sqlite:")
 
42
        self.assertRaises(OperationalError,
 
43
                          store.execute, "SELECT * FROM test")
 
44
 
 
45
    def test_create_twice_same_name(self):
 
46
        store = self.zstorm.create("name", "sqlite:")
 
47
        self.assertRaises(ZStormError, self.zstorm.create, "name", "sqlite:")
 
48
 
 
49
    def test_create_and_get_named(self):
 
50
        store = self.zstorm.create("name", "sqlite:")
 
51
        self.assertTrue(self.zstorm.get("name") is store)
 
52
 
 
53
    def test_create_and_get_named_another_thread(self):
 
54
        store = self.zstorm.create("name", "sqlite:")
 
55
 
 
56
        raised = []
 
57
 
 
58
        lock = thread.allocate_lock()
 
59
        lock.acquire()
 
60
        def f():
 
61
            try:
 
62
                try:
 
63
                    self.zstorm.get("name")
 
64
                except ZStormError:
 
65
                    raised.append(True)
 
66
            finally:
 
67
                lock.release()
 
68
        thread.start_new_thread(f, ())
 
69
        lock.acquire()
 
70
 
 
71
        self.assertTrue(raised)
 
72
 
 
73
    def test_get_unexistent(self):
 
74
        self.assertRaises(ZStormError, self.zstorm.get, "name")
 
75
 
 
76
    def test_get_with_uri(self):
 
77
        store = self.zstorm.get("name", "sqlite:")
 
78
        self.assertTrue(isinstance(store, Store))
 
79
        self.assertTrue(self.zstorm.get("name") is store)
 
80
        self.assertTrue(self.zstorm.get("name", "sqlite:") is store)
 
81
 
 
82
    def test_set_default_uri(self):
 
83
        self.zstorm.set_default_uri("name", "sqlite:")
 
84
        store = self.zstorm.get("name")
 
85
        self.assertTrue(isinstance(store, Store))
 
86
 
 
87
    def test_create_default(self):
 
88
        self.zstorm.set_default_uri("name", "sqlite:")
 
89
        store = self.zstorm.create("name")
 
90
        self.assertTrue(isinstance(store, Store))
 
91
 
 
92
    def test_create_default_twice(self):
 
93
        self.zstorm.set_default_uri("name", "sqlite:")
 
94
        self.zstorm.create("name")
 
95
        self.assertRaises(ZStormError, self.zstorm.create, "name")
 
96
 
 
97
    def test_iterstores(self):
 
98
        store1 = self.zstorm.create(None, "sqlite:")
 
99
        store2 = self.zstorm.create(None, "sqlite:")
 
100
        store3 = self.zstorm.create("name", "sqlite:")
 
101
        stores = []
 
102
        for name, store in self.zstorm.iterstores():
 
103
            stores.append((name, store))
 
104
        self.assertEquals(len(stores), 3)
 
105
        self.assertEquals(set(stores),
 
106
                          set([(None, store1), (None, store2),
 
107
                               ("name", store3)]))
 
108
 
 
109
    def test_default_databases(self):
 
110
        self.zstorm.set_default_uri("name1", "sqlite:1")
 
111
        self.zstorm.set_default_uri("name2", "sqlite:2")
 
112
        self.zstorm.set_default_uri("name3", "sqlite:3")
 
113
        default_uris = self.zstorm.get_default_uris()
 
114
        self.assertEquals(default_uris, {"name1": "sqlite:1",
 
115
                                         "name2": "sqlite:2",
 
116
                                         "name3": "sqlite:3"})
 
117
 
 
118
    def test_remove(self):
 
119
        removed_store = self.zstorm.get("name", "sqlite:")
 
120
        self.zstorm.remove(removed_store)
 
121
        for name, store in self.zstorm.iterstores():
 
122
            self.assertNotEquals(store, removed_store)
 
123
        self.assertRaises(ZStormError, self.zstorm.get, "name")
 
124
 
 
125
        # Abort the transaction so that the currently registered
 
126
        # resource detaches.
 
127
        transaction.abort()
 
128
 
 
129
        # Let's try to make storm blow up when committing, so that
 
130
        # we can be sure that the removed store isn't linked to the
 
131
        # transaction system by a synchronizer anymore.
 
132
        class BadTable(object):
 
133
            __storm_table__ = "bad_table"
 
134
            id = Int(primary=True)
 
135
        removed_store.add(BadTable())
 
136
 
 
137
        # If this fails, the store is still linked to the transaction
 
138
        # system.
 
139
        transaction.commit()
 
140
 
 
141
    def test_double_abort(self):
 
142
        """
 
143
        Surprisingly, transaction.abort() won't detach the internal
 
144
        transaction from the manager.  This means that when
 
145
        transaction.begin() runs, it will detect that there's still
 
146
        a Transaction instance around, and will call abort on it once
 
147
        more before instantiating a new Transaction.  The Transaction's
 
148
        abort() method will then call beforeCompletion() on our
 
149
        synchronizer, which then tries to join() on the current
 
150
        transaction, which is still the old one and blows up saying
 
151
        that a previous transaction has failed (we know it, since we
 
152
        *aborted* it).
 
153
        """
 
154
        class BadTable(object):
 
155
            __storm_table__ = "bad_table"
 
156
            id = Int(primary=True, default=1)
 
157
        store = self.zstorm.get("name", "sqlite:")
 
158
        store.add(BadTable())
 
159
        self.assertRaises(OperationalError, transaction.commit)
 
160
        transaction.abort()
 
161
        transaction.abort()