~sidnei/storm/mssql-support

« back to all changes in this revision

Viewing changes to tests/store/base.py

  • Committer: Sidnei da Silva
  • Date: 2008-05-23 06:53:19 UTC
  • mfrom: (173.2.54 trunk)
  • Revision ID: sidnei@enfoldsystems.com-20080523065319-6rao99dgz5iil8mf
- Merged from trunk
- Renamed connect() to raw_connect() and adjusted
- Added 'selfref' table to setup

Tests are now pretty much at the same state they were in September. Yay!

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
1
2
#
2
3
# Copyright (c) 2006, 2007 Canonical
3
4
#
31
32
from storm.exceptions import *
32
33
from storm.store import *
33
34
 
 
35
from tests.info import Wrapper
34
36
from tests.helper import run_this
35
37
 
36
38
 
57
59
    foo_id = Int()
58
60
    bar_id = Int()
59
61
 
 
62
class SelfRef(object):
 
63
    __storm_table__ = "selfref"
 
64
    id = Int(primary=True)
 
65
    title = Unicode()
 
66
    selfref_id = Int()
 
67
    selfref = Reference(selfref_id, id)
 
68
    selfref_on_remote = Reference(id, selfref_id, on_remote=True)
 
69
 
60
70
class FooRef(Foo):
61
71
    bar = Reference(Foo.id, Bar.foo_id)
62
72
 
107
117
    title = Property(variable_class=DecorateVariable)
108
118
 
109
119
 
110
 
class Wrapper(object):
111
 
 
112
 
    def __init__(self, obj):
113
 
        self.obj = obj
114
 
 
115
 
    __storm_object_info__ = property(lambda self:
116
 
                                     self.obj.__storm_object_info__)
117
 
 
118
 
 
119
120
class StoreTest(object):
120
121
 
121
122
    def setUp(self):
 
123
        self.store = None
 
124
        self.stores = []
122
125
        self.create_database()
123
126
        self.drop_tables()
124
127
        self.create_tables()
125
128
        self.create_sample_data()
126
129
        self.create_store()
127
 
    
 
130
 
128
131
    def tearDown(self):
129
132
        self.drop_store()
130
133
        self.drop_sample_data()
159
162
        connection.execute("INSERT INTO link VALUES (20, 200)")
160
163
        connection.execute("INSERT INTO link VALUES (30, 300)")
161
164
        connection.execute("INSERT INTO money VALUES (10, '12.3455')")
 
165
        connection.execute("INSERT INTO selfref VALUES (15, 'SelfRef 15', NULL)")
 
166
        connection.execute("INSERT INTO selfref VALUES (25, 'SelfRef 25', NULL)")
 
167
        connection.execute("INSERT INTO selfref VALUES (35, 'SelfRef 35', 15)")
162
168
 
163
169
    def create_store(self):
164
 
        self.store = Store(self.database)
 
170
        store = Store(self.database)
 
171
        self.stores.append(store)
 
172
        if self.store is None:
 
173
            self.store = store
 
174
        return store
165
175
 
166
176
    def drop_store(self):
167
 
        self.store.rollback()
 
177
        for store in self.stores:
 
178
            store.rollback()
168
179
 
169
 
        # Closing the store is needed because testcase objects are all
170
 
        # instantiated at once, and thus connections are kept open.
171
 
        self.store.close()
 
180
            # Closing the store is needed because testcase objects are all
 
181
            # instantiated at once, and thus connections are kept open.
 
182
            store.close()
172
183
 
173
184
    def drop_sample_data(self):
174
185
        pass
175
186
 
176
187
    def drop_tables(self):
177
 
        for table in ["foo", "bar", "bin", "link", "money"]:
178
 
            connection = self.database.connect()
 
188
        connection = self.database.connect()
 
189
        for table in ["foo", "bar", "bin", "link", "money", "selfref"]:
179
190
            try:
180
191
                connection.execute("DROP TABLE %s" % table)
181
192
                connection.commit()
196
207
        result = connection.execute("SELECT * FROM foo ORDER BY id")
197
208
        return list(result)
198
209
 
 
210
    def get_cache(self, store):
 
211
        # We don't offer a public API for this just yet.
 
212
        return store._cache
199
213
 
200
214
    def test_execute(self):
201
215
        result = self.store.execute("SELECT 1")
202
216
        self.assertTrue(isinstance(result, Result))
203
217
        self.assertEquals(result.get_one(), (1,))
204
 
        
 
218
 
205
219
        result = self.store.execute("SELECT 1", noresult=True)
206
220
        self.assertEquals(result, None)
207
221
 
246
260
        self.store._connection = connection
247
261
 
248
262
    def test_cache_cleanup(self):
 
263
        # Disable the cache, which holds strong references.
 
264
        self.get_cache(self.store).set_size(0)
 
265
 
249
266
        foo = self.store.get(Foo, 10)
250
267
        foo.taint = True
251
268
 
286
303
    def test_obj_info_with_deleted_object(self):
287
304
        # Let's try to put Storm in trouble by killing the object
288
305
        # while still holding a reference to the obj_info.
 
306
 
 
307
        # Disable the cache, which holds strong references.
 
308
        self.get_cache(self.store).set_size(0)
 
309
 
289
310
        class MyFoo(Foo):
290
311
            loaded = False
291
312
            def __storm_loaded__(self):
297
318
 
298
319
        del foo
299
320
        gc.collect()
300
 
        
 
321
 
301
322
        self.assertEquals(obj_info.get_obj(), None)
302
323
 
303
324
        foo = self.store.find(MyFoo, id=20).one()
309
330
 
310
331
    def test_obj_info_with_deleted_object_with_get(self):
311
332
        # Same thing, but using get rather than find.
 
333
 
 
334
        # Disable the cache, which holds strong references.
 
335
        self.get_cache(self.store).set_size(0)
 
336
 
312
337
        foo = self.store.get(Foo, 20)
313
338
        foo.tainted = True
314
339
        obj_info = get_obj_info(foo)
315
340
 
316
341
        del foo
317
342
        gc.collect()
318
 
        
 
343
 
319
344
        self.assertEquals(obj_info.get_obj(), None)
320
345
 
321
346
        foo = self.store.get(Foo, 20)
323
348
        self.assertFalse(getattr(foo, "tainted", False))
324
349
 
325
350
    def test_delete_object_when_obj_info_is_dirty(self):
326
 
        # Object should stay in memory if dirty.
 
351
        """Object should stay in memory if dirty."""
 
352
 
 
353
        # Disable the cache, which holds strong references.
 
354
        self.get_cache(self.store).set_size(0)
 
355
 
327
356
        foo = self.store.get(Foo, 20)
328
357
        foo.title = u"Changed"
329
358
        foo.tainted = True
331
360
 
332
361
        del foo
333
362
        gc.collect()
334
 
        
 
363
 
335
364
        self.assertTrue(obj_info.get_obj())
336
365
 
337
366
    def test_get_tuple(self):
505
534
    def test_find_slice_offset(self):
506
535
        result = self.store.find(Foo).order_by(Foo.title)[1:]
507
536
        lst = [(foo.id, foo.title) for foo in result]
508
 
        self.assertEquals(lst, 
 
537
        self.assertEquals(lst,
509
538
                          [(20, "Title 20"),
510
539
                           (10, "Title 30")])
511
540
 
532
561
    def test_find_slice_limit(self):
533
562
        result = self.store.find(Foo).order_by(Foo.title)[:2]
534
563
        lst = [(foo.id, foo.title) for foo in result]
535
 
        self.assertEquals(lst, 
 
564
        self.assertEquals(lst,
536
565
                          [(30, "Title 10"),
537
566
                           (20, "Title 20")])
538
567
 
758
787
        self.assertEquals(self.store.find(Foo, title=u"Title 20").cached(), [])
759
788
 
760
789
    def test_find_cached_with_info_alive_and_object_dead(self):
 
790
        # Disable the cache, which holds strong references.
 
791
        self.get_cache(self.store).set_size(0)
 
792
 
761
793
        foo = self.store.get(Foo, 20)
762
794
        foo.tainted = True
763
795
        obj_info = get_obj_info(foo)
881
913
        self.assertEquals(bar.id, 300)
882
914
        self.assertEquals(bar.title, u"Title 100")
883
915
 
884
 
    def test_find_tuple_first(self):
 
916
    def test_find_tuple_one(self):
885
917
        bar = self.store.get(Bar, 200)
886
918
        bar.foo_id = None
887
919
 
919
951
        result = self.store.using(Foo, Bar).find(Foo)
920
952
        self.assertRaises(FeatureError, result.cached)
921
953
 
 
954
    def test_get_does_not_validate(self):
 
955
        def validator(object, attr, value):
 
956
            self.fail("validator called with arguments (%r, %r, %r)" %
 
957
                      (object, attr, value))
 
958
 
 
959
        class Foo(object):
 
960
            __storm_table__ = "foo"
 
961
            id = Int(primary=True)
 
962
            title = Unicode(validator=validator)
 
963
 
 
964
        foo = self.store.get(Foo, 10)
 
965
        self.assertEqual(foo.title, "Title 30")
 
966
 
 
967
    def test_get_does_not_validate_default_value(self):
 
968
        def validator(object, attr, value):
 
969
            self.fail("validator called with arguments (%r, %r, %r)" %
 
970
                      (object, attr, value))
 
971
 
 
972
        class Foo(object):
 
973
            __storm_table__ = "foo"
 
974
            id = Int(primary=True)
 
975
            title = Unicode(validator=validator, default=u"default value")
 
976
 
 
977
        foo = self.store.get(Foo, 10)
 
978
        self.assertEqual(foo.title, "Title 30")
 
979
 
 
980
    def test_find_does_not_validate(self):
 
981
        def validator(object, attr, value):
 
982
            self.fail("validator called with arguments (%r, %r, %r)" %
 
983
                      (object, attr, value))
 
984
 
 
985
        class Foo(object):
 
986
            __storm_table__ = "foo"
 
987
            id = Int(primary=True)
 
988
            title = Unicode(validator=validator)
 
989
 
 
990
        foo = self.store.find(Foo, Foo.id == 10).one()
 
991
        self.assertEqual(foo.title, "Title 30")
 
992
 
922
993
    def test_add_commit(self):
923
994
        foo = Foo()
924
995
        foo.id = 40
1068
1139
 
1069
1140
        self.store.remove(foo)
1070
1141
        self.store.rollback()
1071
 
        
 
1142
 
1072
1143
        foo.title = u"Title 200"
1073
1144
 
1074
1145
        self.store.flush()
1101
1172
 
1102
1173
        self.store.remove(foo)
1103
1174
        self.store.add(foo)
1104
 
        
 
1175
 
1105
1176
        foo.title = u"Title 200"
1106
1177
 
1107
1178
        self.store.flush()
1118
1189
        self.store.remove(foo)
1119
1190
        self.store.flush()
1120
1191
        self.store.add(foo)
1121
 
        
 
1192
 
1122
1193
        foo.title = u"Title 200"
1123
1194
 
1124
1195
        self.store.flush()
1147
1218
        obj_info = get_obj_info(foo)
1148
1219
        self.store.remove(foo)
1149
1220
        self.store.flush()
1150
 
        
 
1221
 
1151
1222
        foo.title = u"Title 200"
1152
1223
 
1153
1224
        self.assertTrue(obj_info not in self.store._dirty)
1274
1345
 
1275
1346
        # Update twice to see if the notion of primary key for the
1276
1347
        # existent object was updated as well.
1277
 
        
 
1348
 
1278
1349
        foo.id = 27
1279
1350
 
1280
1351
        self.store.commit()
1336
1407
    def test_update_find(self):
1337
1408
        foo = self.store.get(Foo, 20)
1338
1409
        foo.title = u"Title 200"
1339
 
        
 
1410
 
1340
1411
        result = self.store.find(Foo, Foo.title == u"Title 200")
1341
1412
 
1342
1413
        self.assertTrue(result.one() is foo)
1725
1796
                          (30, "Title 10"),
1726
1797
                         ])
1727
1798
 
 
1799
    def test_wb_block_implicit_flushes(self):
 
1800
        # Make sure calling store.flush() will fail.
 
1801
        def flush():
 
1802
            raise RuntimeError("Flush called")
 
1803
        self.store.flush = flush
 
1804
 
 
1805
        # The following operations do not call flush.
 
1806
        self.store.block_implicit_flushes()
 
1807
        foo = self.store.get(Foo, 20)
 
1808
        foo = self.store.find(Foo, Foo.id == 20).one()
 
1809
        self.store.execute("SELECT title FROM foo WHERE id = 20")
 
1810
 
 
1811
        self.store.unblock_implicit_flushes()
 
1812
        self.assertRaises(RuntimeError, self.store.get, Foo, 20)
 
1813
 
 
1814
    def test_wb_block_implicit_flushes_is_recursive(self):
 
1815
        # Make sure calling store.flush() will fail.
 
1816
        def flush():
 
1817
            raise RuntimeError("Flush called")
 
1818
        self.store.flush = flush
 
1819
 
 
1820
        self.store.block_implicit_flushes()
 
1821
        self.store.block_implicit_flushes()
 
1822
        self.store.unblock_implicit_flushes()
 
1823
        # implicit flushes are still blocked, until unblock() is called again.
 
1824
        foo = self.store.get(Foo, 20)
 
1825
        self.store.unblock_implicit_flushes()
 
1826
        self.assertRaises(RuntimeError, self.store.get, Foo, 20)
 
1827
 
1728
1828
    def test_reload(self):
1729
1829
        foo = self.store.get(Foo, 20)
1730
1830
        self.store.execute("UPDATE foo SET title='Title 40' WHERE id=20")
1760
1860
 
1761
1861
    def test_reload_unknown(self):
1762
1862
        foo = self.store.get(Foo, 20)
1763
 
        store = Store(self.database)
 
1863
        store = self.create_store()
1764
1864
        self.assertRaises(WrongStoreError, store.reload, foo)
1765
1865
 
1766
1866
    def test_wb_reload_not_dirty(self):
1867
1967
        self.assertEquals(bar.title, "Title 500")
1868
1968
 
1869
1969
    def test_find_set_with_info_alive_and_object_dead(self):
 
1970
        # Disable the cache, which holds strong references.
 
1971
        self.get_cache(self.store).set_size(0)
 
1972
 
1870
1973
        foo = self.store.get(Foo, 20)
1871
1974
        foo.tainted = True
1872
1975
        obj_info = get_obj_info(foo)
1882
1985
        self.assertTrue(bar.foo)
1883
1986
        self.assertEquals(bar.foo.title, "Title 30")
1884
1987
 
 
1988
    def test_reference_explicitly_with_wrapper(self):
 
1989
        bar = self.store.get(Bar, 100)
 
1990
        foo = Bar.foo.__get__(Wrapper(bar))
 
1991
        self.assertTrue(foo)
 
1992
        self.assertEquals(foo.title, "Title 30")
 
1993
 
1885
1994
    def test_reference_break_on_local_diverged(self):
1886
1995
        bar = self.store.get(Bar, 100)
1887
1996
        self.assertTrue(bar.foo)
1949
2058
        result = self.store.execute("SELECT foo_id FROM bar WHERE id=100")
1950
2059
        self.assertEquals(result.get_one(), (30,))
1951
2060
 
 
2061
    def test_set_reference_explicitly_with_wrapper(self):
 
2062
        bar = self.store.get(Bar, 100)
 
2063
        self.assertEquals(bar.foo.id, 10)
 
2064
        foo = self.store.get(Foo, 30)
 
2065
        Bar.foo.__set__(Wrapper(bar), Wrapper(foo))
 
2066
        self.assertEquals(bar.foo.id, 30)
 
2067
        result = self.store.execute("SELECT foo_id FROM bar WHERE id=100")
 
2068
        self.assertEquals(result.get_one(), (30,))
 
2069
 
1952
2070
    def test_reference_assign_remote_key(self):
1953
2071
        bar = self.store.get(Bar, 100)
1954
2072
        self.assertEquals(bar.foo.id, 10)
2032
2150
        self.assertEquals(type(bar.id), int)
2033
2151
        self.assertEquals(foo.id, None)
2034
2152
 
 
2153
    def test_reference_assign_none_with_unseen(self):
 
2154
        bar = self.store.get(Bar, 200)
 
2155
        bar.foo = None
 
2156
        self.assertEquals(bar.foo, None)
 
2157
 
2035
2158
    def test_reference_on_added_composed_key(self):
2036
2159
        class Bar(object):
2037
2160
            __storm_table__ = "bar"
2104
2227
        foo.id = 70
2105
2228
        self.assertEquals(bar.foo_id, 60)
2106
2229
 
 
2230
    def test_reference_on_added_unsets_original_key(self):
 
2231
        foo = Foo()
 
2232
        self.store.add(foo)
 
2233
 
 
2234
        bar = Bar()
 
2235
        bar.id = 400
 
2236
        bar.foo_id = 40
 
2237
        bar.foo = foo
 
2238
 
 
2239
        self.assertEquals(bar.foo_id, None)
 
2240
 
2107
2241
    def test_reference_on_two_added(self):
2108
2242
        foo1 = Foo()
2109
2243
        foo1.title = u"Title 40"
2228
2362
        self.assertEquals(type(bar.foo_id), int)
2229
2363
 
2230
2364
    def test_reference_on_added_wrong_store(self):
2231
 
        store = Store(self.database)
 
2365
        store = self.create_store()
2232
2366
 
2233
2367
        foo = Foo()
2234
2368
        foo.title = u"Title 40"
2253
2387
 
2254
2388
        self.store.add(bar)
2255
2389
 
2256
 
        store = Store(self.database)
 
2390
        store = self.create_store()
2257
2391
        store.add(foo1)
2258
2392
 
2259
2393
        self.assertEquals(Store.of(bar), self.store)
2260
2394
        self.assertEquals(Store.of(foo1), store)
2261
2395
 
 
2396
    def test_reference_on_removed_wont_add_back(self):
 
2397
        bar = self.store.get(Bar, 200)
 
2398
        foo = self.store.get(Foo, bar.foo_id)
 
2399
 
 
2400
        self.store.remove(bar)
 
2401
 
 
2402
        self.assertEquals(bar.foo, foo)
 
2403
        self.store.flush()
 
2404
 
 
2405
        self.assertEquals(Store.of(bar), None)
 
2406
        self.assertEquals(Store.of(foo), self.store)
 
2407
 
2262
2408
    def test_reference_equals(self):
2263
2409
        foo = self.store.get(Foo, 10)
2264
2410
 
2284
2430
                                 myself=(link.foo_id, link.bar_id)).one()
2285
2431
        self.assertEquals(link, myself)
2286
2432
 
 
2433
    def test_reference_equals_with_wrapped(self):
 
2434
        foo = self.store.get(Foo, 10)
 
2435
 
 
2436
        bar = self.store.find(Bar, foo=Wrapper(foo)).one()
 
2437
        self.assertTrue(bar)
 
2438
        self.assertEquals(bar.foo, foo)
 
2439
 
2287
2440
    def test_reference_self(self):
 
2441
        selfref = self.store.add(SelfRef())
 
2442
        selfref.id = 400
 
2443
        selfref.title = u"Title 400"
 
2444
        selfref.selfref_id = 25
 
2445
        self.assertEquals(selfref.selfref.id, 25)
 
2446
        self.assertEquals(selfref.selfref.title, "SelfRef 25")
 
2447
 
 
2448
    def get_bar_200_title(self):
 
2449
        connection = self.store._connection
 
2450
        result = connection.execute("SELECT title FROM bar WHERE id=200")
 
2451
        return result.get_one()[0]
 
2452
 
 
2453
    def test_reference_wont_touch_store_when_key_is_none(self):
 
2454
        bar = self.store.get(Bar, 200)
 
2455
        bar.foo_id = None
 
2456
        bar.title = u"Don't flush this!"
 
2457
 
 
2458
        self.assertEquals(bar.foo, None)
 
2459
 
 
2460
        # Bypass the store to prevent flushing.
 
2461
        self.assertEquals(self.get_bar_200_title(), "Title 200")
 
2462
 
 
2463
    def test_reference_wont_touch_store_when_key_is_unset(self):
 
2464
        bar = self.store.get(Bar, 200)
 
2465
        del bar.foo_id
 
2466
        bar.title = u"Don't flush this!"
 
2467
 
 
2468
        self.assertEquals(bar.foo, None)
 
2469
 
 
2470
        # Bypass the store to prevent flushing.
 
2471
        connection = self.store._connection
 
2472
        result = connection.execute("SELECT title FROM bar WHERE id=200")
 
2473
        self.assertEquals(result.get_one()[0], "Title 200")
 
2474
 
 
2475
    def test_reference_wont_touch_store_with_composed_key_none(self):
2288
2476
        class Bar(object):
2289
2477
            __storm_table__ = "bar"
2290
2478
            id = Int(primary=True)
 
2479
            foo_id = Int()
2291
2480
            title = Unicode()
2292
 
            bar_id = Int("foo_id")
2293
 
            bar = Reference(bar_id, id)
2294
 
 
2295
 
        bar = self.store.add(Bar())
2296
 
        bar.id = 400
2297
 
        bar.title = u"Title 400"
2298
 
        bar.bar_id = 100
2299
 
        self.assertEquals(bar.bar.id, 100)
2300
 
        self.assertEquals(bar.bar.title, "Title 300")
 
2481
            foo = Reference((foo_id, title), (Foo.id, Foo.title))
 
2482
 
 
2483
        bar = self.store.get(Bar, 200)
 
2484
        bar.foo_id = None
 
2485
        bar.title = None
 
2486
 
 
2487
        self.assertEquals(bar.foo, None)
 
2488
 
 
2489
        # Bypass the store to prevent flushing.
 
2490
        self.assertEquals(self.get_bar_200_title(), "Title 200")
 
2491
 
 
2492
    def test_reference_will_resolve_auto_reload(self):
 
2493
        bar = self.store.get(Bar, 200)
 
2494
        bar.foo_id = AutoReload
 
2495
        self.assertTrue(bar.foo)
2301
2496
 
2302
2497
    def test_back_reference(self):
2303
2498
        class MyFoo(Foo):
2359
2554
                                    "foo.title = 'Title 40'")
2360
2555
        self.assertEquals(result.get_one(), ("Title 400",))
2361
2556
 
 
2557
    def test_back_reference_assign_none_with_unseen(self):
 
2558
        class MyFoo(Foo):
 
2559
            bar = Reference(Foo.id, Bar.foo_id, on_remote=True)
 
2560
        foo = self.store.get(MyFoo, 20)
 
2561
        foo.bar = None
 
2562
        self.assertEquals(foo.bar, None)
 
2563
 
 
2564
    def test_back_reference_assign_none_from_none(self):
 
2565
        class MyFoo(Foo):
 
2566
            bar = Reference(Foo.id, Bar.foo_id, on_remote=True)
 
2567
        self.store.execute("INSERT INTO foo (id, title)"
 
2568
                           " VALUES (40, 'Title 40')")
 
2569
        self.store.commit()
 
2570
        foo = self.store.get(MyFoo, 40)
 
2571
        foo.bar = None
 
2572
        self.assertEquals(foo.bar, None)
 
2573
 
 
2574
    def test_back_reference_on_added_unsets_original_key(self):
 
2575
        class MyFoo(Foo):
 
2576
            bar = Reference(Foo.id, Bar.foo_id, on_remote=True)
 
2577
 
 
2578
        foo = MyFoo()
 
2579
 
 
2580
        bar = Bar()
 
2581
        bar.id = 400
 
2582
        bar.foo_id = 40
 
2583
 
 
2584
        foo.bar = bar
 
2585
 
 
2586
        self.assertEquals(bar.foo_id, None)
 
2587
 
2362
2588
    def test_back_reference_on_added_no_store(self):
2363
2589
        class MyFoo(Foo):
2364
2590
            bar = Reference(Foo.id, Bar.foo_id, on_remote=True)
2399
2625
 
2400
2626
        self.assertEquals(type(bar.foo_id), int)
2401
2627
 
 
2628
    def test_back_reference_remove_remote(self):
 
2629
        class MyFoo(Foo):
 
2630
            bar = Reference(Foo.id, Bar.foo_id, on_remote=True)
 
2631
 
 
2632
        bar = Bar()
 
2633
        bar.title = u"Title 400"
 
2634
 
 
2635
        foo = MyFoo()
 
2636
        foo.title = u"Title 40"
 
2637
        foo.bar = bar
 
2638
 
 
2639
        self.store.add(foo)
 
2640
        self.store.flush()
 
2641
 
 
2642
        self.assertEquals(foo.bar, bar)
 
2643
        self.store.remove(bar)
 
2644
        self.assertEquals(foo.bar, None)
 
2645
 
 
2646
    def test_back_reference_remove_remote_pending_add(self):
 
2647
        class MyFoo(Foo):
 
2648
            bar = Reference(Foo.id, Bar.foo_id, on_remote=True)
 
2649
 
 
2650
        bar = Bar()
 
2651
        bar.title = u"Title 400"
 
2652
 
 
2653
        foo = MyFoo()
 
2654
        foo.title = u"Title 40"
 
2655
        foo.bar = bar
 
2656
 
 
2657
        self.store.add(foo)
 
2658
 
 
2659
        self.assertEquals(foo.bar, bar)
 
2660
        self.store.remove(bar)
 
2661
        self.assertEquals(foo.bar, None)
 
2662
 
 
2663
    def test_reference_loop_with_undefined_keys_fails(self):
 
2664
        """A loop of references with undefined keys raises OrderLoopError."""
 
2665
        ref1 = SelfRef()
 
2666
        self.store.add(ref1)
 
2667
        ref2 = SelfRef()
 
2668
        ref2.selfref = ref1
 
2669
        ref1.selfref = ref2
 
2670
 
 
2671
        self.assertRaises(OrderLoopError, self.store.flush)
 
2672
 
 
2673
    def test_reference_loop_with_dirty_keys_fails(self):
 
2674
        ref1 = SelfRef()
 
2675
        self.store.add(ref1)
 
2676
        ref1.id = 42
 
2677
        ref2 = SelfRef()
 
2678
        ref2.id = 43
 
2679
        ref2.selfref = ref1
 
2680
        ref1.selfref = ref2
 
2681
 
 
2682
        self.assertRaises(OrderLoopError, self.store.flush)
 
2683
 
 
2684
    def test_reference_loop_with_dirty_keys_changed_later_fails(self):
 
2685
        ref1 = SelfRef()
 
2686
        ref2 = SelfRef()
 
2687
        self.store.add(ref1)
 
2688
        self.store.add(ref2)
 
2689
        self.store.flush()
 
2690
        ref2.selfref = ref1
 
2691
        ref1.selfref = ref2
 
2692
        ref1.id = 42
 
2693
        ref2.id = 43
 
2694
 
 
2695
        self.assertRaises(OrderLoopError, self.store.flush)
 
2696
 
 
2697
    def test_reference_loop_with_dirty_keys_on_remote_fails(self):
 
2698
        ref1 = SelfRef()
 
2699
        self.store.add(ref1)
 
2700
        ref1.id = 42
 
2701
        ref2 = SelfRef()
 
2702
        ref2.id = 43
 
2703
        ref2.selfref_on_remote = ref1
 
2704
        ref1.selfref_on_remote = ref2
 
2705
 
 
2706
        self.assertRaises(OrderLoopError, self.store.flush)
 
2707
 
 
2708
    def test_reference_loop_with_dirty_keys_on_remote_changed_later_fails(self):
 
2709
        ref1 = SelfRef()
 
2710
        ref2 = SelfRef()
 
2711
        self.store.add(ref1)
 
2712
        self.store.flush()
 
2713
        ref2.selfref_on_remote = ref1
 
2714
        ref1.selfref_on_remote = ref2
 
2715
        ref1.id = 42
 
2716
        ref2.id = 43
 
2717
 
 
2718
        self.assertRaises(OrderLoopError, self.store.flush)
 
2719
 
 
2720
    def test_reference_loop_with_unchanged_keys_succeeds(self):
 
2721
        ref1 = SelfRef()
 
2722
        self.store.add(ref1)
 
2723
        ref1.id = 42
 
2724
        ref2 = SelfRef()
 
2725
        self.store.add(ref2)
 
2726
        ref1.id = 43
 
2727
 
 
2728
        self.store.flush()
 
2729
 
 
2730
        # As ref1 and ref2 have been flushed to the database, so these
 
2731
        # changes can be flushed.
 
2732
        ref2.selfref = ref1
 
2733
        ref1.selfref = ref2
 
2734
        self.store.flush()
 
2735
 
 
2736
    def test_reference_loop_with_one_unchanged_key_succeeds(self):
 
2737
        ref1 = SelfRef()
 
2738
        self.store.add(ref1)
 
2739
        self.store.flush()
 
2740
 
 
2741
        ref2 = SelfRef()
 
2742
        ref2.selfref = ref1
 
2743
        ref1.selfref = ref2
 
2744
 
 
2745
        # As ref1 and ref2 have been flushed to the database, so these
 
2746
        # changes can be flushed.
 
2747
        self.store.flush()
 
2748
 
 
2749
    def test_reference_loop_with_key_changed_later_succeeds(self):
 
2750
        ref1 = SelfRef()
 
2751
        self.store.add(ref1)
 
2752
        self.store.flush()
 
2753
 
 
2754
        ref2 = SelfRef()
 
2755
        ref1.selfref = ref2
 
2756
        ref2.id = 42
 
2757
 
 
2758
        self.store.flush()
 
2759
 
 
2760
    def test_reference_loop_with_key_changed_later_on_remote_succeeds(self):
 
2761
        ref1 = SelfRef()
 
2762
        self.store.add(ref1)
 
2763
        self.store.flush()
 
2764
 
 
2765
        ref2 = SelfRef()
 
2766
        ref2.selfref_on_remote = ref1
 
2767
        ref2.id = 42
 
2768
 
 
2769
        self.store.flush()
 
2770
 
 
2771
    def test_reference_loop_with_undefined_and_changed_keys_fails(self):
 
2772
        ref1 = SelfRef()
 
2773
        self.store.add(ref1)
 
2774
        self.store.flush()
 
2775
 
 
2776
        ref1.id = 400
 
2777
        ref2 = SelfRef()
 
2778
        ref2.selfref = ref1
 
2779
        ref1.selfref = ref2
 
2780
 
 
2781
        self.assertRaises(OrderLoopError, self.store.flush)
 
2782
 
 
2783
    def test_reference_loop_with_undefined_and_changed_keys_fails2(self):
 
2784
        ref1 = SelfRef()
 
2785
        self.store.add(ref1)
 
2786
        self.store.flush()
 
2787
 
 
2788
        ref2 = SelfRef()
 
2789
        ref2.selfref = ref1
 
2790
        ref1.selfref = ref2
 
2791
        ref1.id = 400
 
2792
 
 
2793
        self.assertRaises(OrderLoopError, self.store.flush)
 
2794
 
 
2795
    def test_reference_loop_broken_by_set(self):
 
2796
        ref1 = SelfRef()
 
2797
        ref2 = SelfRef()
 
2798
        ref1.selfref = ref2
 
2799
        ref2.selfref = ref1
 
2800
        self.store.add(ref1)
 
2801
 
 
2802
        ref1.selfref = None
 
2803
        self.store.flush()
 
2804
 
 
2805
    def test_reference_loop_set_only_removes_own_flush_order(self):
 
2806
        ref1 = SelfRef()
 
2807
        ref2 = SelfRef()
 
2808
        self.store.add(ref2)
 
2809
        self.store.flush()
 
2810
 
 
2811
        # The following does not create a loop since the keys are
 
2812
        # dirty (as shown in another test).
 
2813
        ref1.selfref = ref2
 
2814
        ref2.selfref = ref1
 
2815
 
 
2816
        # Now add a flush order loop.
 
2817
        self.store.add_flush_order(ref1, ref2)
 
2818
        self.store.add_flush_order(ref2, ref1)
 
2819
 
 
2820
        # Now break the reference.  This should leave the flush
 
2821
        # ordering loop we previously created in place..
 
2822
        ref1.selfref = None
 
2823
        self.assertRaises(OrderLoopError, self.store.flush)
 
2824
 
2402
2825
    def add_reference_set_bar_400(self):
2403
2826
        bar = Bar()
2404
2827
        bar.id = 400
2421
2844
                          (400, 20, "Title 100"),
2422
2845
                         ])
2423
2846
 
 
2847
    def test_reference_set_explicitly_with_wrapper(self):
 
2848
        self.add_reference_set_bar_400()
 
2849
 
 
2850
        foo = self.store.get(FooRefSet, 20)
 
2851
 
 
2852
        items = []
 
2853
        for bar in FooRefSet.bars.__get__(Wrapper(foo)):
 
2854
            items.append((bar.id, bar.foo_id, bar.title))
 
2855
        items.sort()
 
2856
 
 
2857
        self.assertEquals(items, [
 
2858
                          (200, 20, "Title 200"),
 
2859
                          (400, 20, "Title 100"),
 
2860
                         ])
 
2861
 
2424
2862
    def test_reference_set_with_added(self):
2425
2863
        bar1 = Bar()
2426
2864
        bar1.id = 400
2759
3197
 
2760
3198
        self.store.add(foo)
2761
3199
 
2762
 
        store = Store(self.database)
 
3200
        store = self.create_store()
2763
3201
        store.add(bar1)
2764
3202
 
2765
3203
        self.assertEquals(Store.of(foo), self.store)
2776
3214
                          (200, 20, "Title 200"),
2777
3215
                          (400, 20, "Title 100"),
2778
3216
                         ])
2779
 
        
 
3217
 
2780
3218
 
2781
3219
    def test_indirect_reference_set(self):
2782
3220
        foo = self.store.get(FooIndRefSet, 20)
3032
3470
                          (200, "Title 200"),
3033
3471
                         ])
3034
3472
 
 
3473
    def test_indirect_reference_set_add_remove_with_wrapper(self):
 
3474
        foo = self.store.get(FooIndRefSet, 20)
 
3475
        bar300 = self.store.get(Bar, 300)
 
3476
        bar200 = self.store.get(Bar, 200)
 
3477
 
 
3478
        foo.bars.add(Wrapper(bar300))
 
3479
        foo.bars.remove(Wrapper(bar200))
 
3480
 
 
3481
        items = []
 
3482
        for bar in foo.bars:
 
3483
            items.append((bar.id, bar.title))
 
3484
        items.sort()
 
3485
 
 
3486
        self.assertEquals(items, [
 
3487
                          (100, "Title 300"),
 
3488
                          (300, "Title 100"),
 
3489
                         ])
 
3490
 
3035
3491
    def test_indirect_reference_set_add_remove_with_added(self):
3036
3492
        foo = FooIndRefSet()
3037
3493
        foo.id = 40
3334
3790
        self.store.reload(blob)
3335
3791
        self.assertEquals(blob.bin, "\x80\x02}q\x01(U\x01aK\x01U\x01bK\x02u.")
3336
3792
 
 
3793
    def test_undefined_variables_filled_on_find(self):
 
3794
        """
 
3795
        Check that when data is fetched from the database on a find,
 
3796
        it is used to fill up any undefined variables.
 
3797
        """
 
3798
        # We do a first find to get the object_infos into the cache.
 
3799
        foos = list(self.store.find(Foo, title=u"Title 20"))
 
3800
 
 
3801
        # Commit so that all foos are invalidated and variables are
 
3802
        # set back to AutoReload.
 
3803
        self.store.commit()
 
3804
 
 
3805
        # Another find which should reuse in-memory foos.
 
3806
        for foo in self.store.find(Foo, title=u"Title 20"):
 
3807
            # Make sure we have all variables defined, because
 
3808
            # values were already retrieved by the find's select.
 
3809
            obj_info = get_obj_info(foo)
 
3810
            for column in obj_info.variables:
 
3811
                self.assertTrue(obj_info.variables[column].is_defined())
 
3812
 
 
3813
    def test_defined_variables_not_overridden_on_find(self):
 
3814
        """
 
3815
        Check that the keep_defined=True setting in _load_object()
 
3816
        is in place.  In practice, it ensures that already defined
 
3817
        values aren't replaced during a find, when new data comes
 
3818
        from the database and is used whenever possible.
 
3819
        """
 
3820
        blob = self.store.get(Blob, 20)
 
3821
        blob.bin = "\x80\x02}q\x01U\x01aK\x01s."
 
3822
        class PickleBlob(object):
 
3823
            __storm_table__ = "bin"
 
3824
            id = Int(primary=True)
 
3825
            pickle = Pickle("bin")
 
3826
        blob = self.store.get(PickleBlob, 20)
 
3827
        value = blob.pickle
 
3828
        # Now the find should not destroy our value pointer.
 
3829
        blob = self.store.find(PickleBlob, id=20).one()
 
3830
        self.assertTrue(value is blob.pickle)
 
3831
 
3337
3832
    def test_pickle_variable_with_deleted_object(self):
3338
3833
        class PickleBlob(Blob):
3339
3834
            bin = Pickle()
3423
3918
 
3424
3919
        self.assertEquals(foo.title, "New title")
3425
3920
 
3426
 
 
3427
3921
    def test_expr_values_flush_on_demand(self):
3428
3922
        foo = self.store.get(Foo, 20)
3429
3923
 
3446
3940
                          (30, "Title 10"),
3447
3941
                         ])
3448
3942
 
 
3943
    def test_expr_values_flush_and_load_in_separate_steps(self):
 
3944
        foo = self.store.get(Foo, 20)
 
3945
 
 
3946
        foo.title = SQL("'New title'")
 
3947
 
 
3948
        self.store.flush()
 
3949
 
 
3950
        # It's already in the database.
 
3951
        self.assertEquals(self.get_items(), [
 
3952
                          (10, "Title 30"),
 
3953
                          (20, "New title"),
 
3954
                          (30, "Title 10"),
 
3955
                         ])
 
3956
 
 
3957
        # But our value is now an AutoReload.
 
3958
        lazy_value = get_obj_info(foo).variables[Foo.title].get_lazy()
 
3959
        self.assertTrue(lazy_value is AutoReload)
 
3960
 
 
3961
        # Which gets resolved once touched.
 
3962
        self.assertEquals(foo.title, u"New title")
 
3963
 
3449
3964
    def test_expr_values_flush_on_demand_with_added(self):
3450
3965
        foo = Foo()
3451
3966
        foo.id = 40
3452
3967
        foo.title = SQL("'New title'")
3453
 
        
 
3968
 
3454
3969
        self.store.add(foo)
3455
3970
 
3456
3971
        # No commits yet.
3474
3989
    def test_expr_values_flush_on_demand_with_removed_and_added(self):
3475
3990
        foo = self.store.get(Foo, 20)
3476
3991
        foo.title = SQL("'New title'")
3477
 
        
 
3992
 
3478
3993
        self.store.remove(foo)
3479
3994
        self.store.add(foo)
3480
3995
 
3497
4012
 
3498
4013
    def test_expr_values_flush_on_demand_with_removed_and_rollbacked(self):
3499
4014
        foo = self.store.get(Foo, 20)
3500
 
        
 
4015
 
3501
4016
        self.store.remove(foo)
3502
4017
        self.store.rollback()
3503
4018
 
3671
4186
        self.store.autoreload(foo)
3672
4187
        self.assertTrue(get_obj_info(foo) not in self.store._dirty)
3673
4188
 
 
4189
    def test_autoreload_missing_columns_on_insertion(self):
 
4190
        foo = Foo()
 
4191
        self.store.add(foo)
 
4192
        self.store.flush()
 
4193
        lazy_value = get_obj_info(foo).variables[Foo.title].get_lazy()
 
4194
        self.assertEquals(lazy_value, AutoReload)
 
4195
        self.assertEquals(foo.title, u"Default Title")
 
4196
 
3674
4197
    def test_reference_break_on_local_diverged_doesnt_autoreload(self):
3675
4198
        foo = self.store.get(Foo, 10)
3676
4199
        self.store.autoreload(foo)
3740
4263
        self.store.invalidate(foo)
3741
4264
        self.assertRaises(LostObjectError, setattr, foo, "title", u"Title 40")
3742
4265
 
3743
 
    def test_invalidate_and_get_fills_undefined(self):
 
4266
    def test_invalidate_and_get_returns_autoreloaded(self):
3744
4267
        foo = self.store.get(Foo, 20)
3745
4268
        self.store.invalidate(foo)
3746
 
 
3747
 
        unset_foo = Foo()
3748
 
        unset_state = get_obj_info(unset_foo).variables[Foo.title].get_state()
3749
 
        get_obj_info(foo).variables[Foo.title].set_state(unset_state)
3750
 
 
3751
4269
        foo = self.store.get(Foo, 20)
 
4270
        self.assertEquals(get_obj_info(foo).variables[Foo.title].get_lazy(),
 
4271
                          AutoReload)
3752
4272
        self.assertEquals(foo.title, "Title 20")
3753
4273
 
3754
4274
    def test_invalidated_hook(self):
3766
4286
    def test_invalidated_hook_called_after_all_invalidated(self):
3767
4287
        """
3768
4288
        Ensure that invalidated hooks are called only when all objects have
3769
 
        already been marked as invalidated. See comment in 
 
4289
        already been marked as invalidated. See comment in
3770
4290
        store.py:_mark_autoreload.
3771
4291
        """
3772
4292
        called = []
3997
4517
        variable = MyBarProxy.foo_title.variable_factory(value=u"Hello")
3998
4518
        self.assertTrue(isinstance(variable, UnicodeVariable))
3999
4519
 
 
4520
    def test_proxy_with_extra_table(self):
 
4521
        """
 
4522
        Proxies use a join on auto_tables. It should work even if we have
 
4523
        more tables in the query.
 
4524
        """
 
4525
        result = self.store.find((BarProxy, Link),
 
4526
                                 BarProxy.foo_title == u"Title 20",
 
4527
                                 BarProxy.foo_id == Link.foo_id)
 
4528
        results = list(result)
 
4529
        self.assertEquals(len(results), 2)
 
4530
        for bar, link in results:
 
4531
            self.assertEquals(bar.id, 200)
 
4532
            self.assertEquals(bar.foo_title, u"Title 20")
 
4533
            self.assertEquals(bar.foo_id, 20)
 
4534
            self.assertEquals(link.foo_id, 20)
 
4535
 
4000
4536
    def test_get_decimal_property(self):
4001
4537
        money = self.store.get(Money, 10)
4002
4538
        self.assertEquals(money.value, decimal.Decimal("12.3455"))
4054
4590
        finally:
4055
4591
            store.close()
4056
4592
 
 
4593
    def test_strong_cache_used(self):
 
4594
        """
 
4595
        Objects should be referenced in the cache if not referenced
 
4596
        in application code.
 
4597
        """
 
4598
        foo = self.store.get(Foo, 20)
 
4599
        foo.tainted = True
 
4600
        obj_info = get_obj_info(foo)
 
4601
        del foo
 
4602
        gc.collect()
 
4603
        cached = self.store.find(Foo).cached()
 
4604
        self.assertEquals(len(cached), 1)
 
4605
        foo = self.store.get(Foo, 20)
 
4606
        self.assertEquals(cached, [foo])
 
4607
        self.assertTrue(hasattr(foo, "tainted"))
 
4608
 
 
4609
    def test_strong_cache_cleared_on_invalidate_all(self):
 
4610
        cache = self.get_cache(self.store)
 
4611
        foo = self.store.get(Foo, 20)
 
4612
        self.assertEquals(cache.get_cached(), [get_obj_info(foo)])
 
4613
        self.store.invalidate()
 
4614
        self.assertEquals(cache.get_cached(), [])
 
4615
 
 
4616
    def test_strong_cache_loses_object_on_invalidate(self):
 
4617
        cache = self.get_cache(self.store)
 
4618
        foo = self.store.get(Foo, 20)
 
4619
        self.assertEquals(cache.get_cached(), [get_obj_info(foo)])
 
4620
        self.store.invalidate(foo)
 
4621
        self.assertEquals(cache.get_cached(), [])
 
4622
 
 
4623
    def test_strong_cache_loses_object_on_remove(self):
 
4624
        """
 
4625
        Make sure an object gets removed from the strong reference
 
4626
        cache when removed from the store.
 
4627
        """
 
4628
        cache = self.get_cache(self.store)
 
4629
        foo = self.store.get(Foo, 20)
 
4630
        self.assertEquals(cache.get_cached(), [get_obj_info(foo)])
 
4631
        self.store.remove(foo)
 
4632
        self.store.flush()
 
4633
        self.assertEquals(cache.get_cached(), [])
 
4634
 
 
4635
    def test_strong_cache_renews_object_on_get(self):
 
4636
        cache = self.get_cache(self.store)
 
4637
        foo1 = self.store.get(Foo, 10)
 
4638
        foo2 = self.store.get(Foo, 20)
 
4639
        foo1 = self.store.get(Foo, 10)
 
4640
        self.assertEquals(cache.get_cached(),
 
4641
                          [get_obj_info(foo1), get_obj_info(foo2)])
 
4642
 
 
4643
    def test_strong_cache_renews_object_on_find(self):
 
4644
        cache = self.get_cache(self.store)
 
4645
        foo1 = self.store.find(Foo, id=10).one()
 
4646
        foo2 = self.store.find(Foo, id=20).one()
 
4647
        foo1 = self.store.find(Foo, id=10).one()
 
4648
        self.assertEquals(cache.get_cached(),
 
4649
                          [get_obj_info(foo1), get_obj_info(foo2)])
 
4650
 
 
4651
    def test_unicode(self):
 
4652
        class MyFoo(Foo):
 
4653
            pass
 
4654
        foo = self.store.get(Foo, 20)
 
4655
        myfoo = self.store.get(MyFoo, 20)
 
4656
        for title in [u'Cừơng', u'Đức', u'Hạnh']:
 
4657
            foo.title = title
 
4658
            self.store.commit()
 
4659
            try:
 
4660
                self.assertEquals(myfoo.title, title)
 
4661
            except AssertionError, e:
 
4662
                raise AssertionError(str(e) +
 
4663
                    " (ensure your database was created with CREATE DATABASE"
 
4664
                    " ... CHARACTER SET utf8)")
 
4665
 
4057
4666
 
4058
4667
class EmptyResultSetTest(object):
4059
4668
 
4064
4673
        self.create_store()
4065
4674
        self.empty = EmptyResultSet()
4066
4675
        self.result = self.store.find(Foo)
4067
 
        
 
4676
 
4068
4677
    def tearDown(self):
4069
4678
        self.drop_store()
4070
4679
        self.drop_tables()
4116
4725
 
4117
4726
    def test_any(self):
4118
4727
        self.assertEquals(self.result.any(), None)
4119
 
        self.assertEquals(self.empty.any(), None)        
 
4728
        self.assertEquals(self.empty.any(), None)
4120
4729
 
4121
4730
    def test_first_unordered(self):
4122
4731
        self.assertRaises(UnorderedError, self.result.first)
4182
4791
 
4183
4792
    def test_set_no_args(self):
4184
4793
        self.assertEquals(self.result.set(), None)
4185
 
        self.assertEquals(self.empty.set(), None)        
 
4794
        self.assertEquals(self.empty.set(), None)
4186
4795
 
4187
4796
    def test_cached(self):
4188
4797
        self.assertEquals(self.result.cached(), [])