30
30
from storm.properties import PropertyPublisherMeta, Decimal
31
31
from storm.variables import PickleVariable
32
32
from storm.expr import (
33
Asc, Desc, Select, LeftJoin, SQL, Count, Sum, Avg, And, Or, Eq, Lower)
33
Asc, Desc, Select, Func, LeftJoin, SQL, Count, Sum, Avg, And, Or, Eq,
34
35
from storm.variables import Variable, UnicodeVariable, IntVariable
35
36
from storm.info import get_obj_info, ClassAlias
36
37
from storm.exceptions import *
147
148
def test_wb_default_cache_size(self):
148
149
store = Store(DummyDatabase())
149
self.assertEquals(store._cache._size, 1000)
152
class StoreDatabaseTest(TestHelper):
    """Checks the link between a Store and the database it was built on."""

    def test_store_has_reference_to_its_database(self):
        """get_database() hands back the very object passed to Store()."""
        dummy = DummyDatabase()
        self.assertIdentical(Store(dummy).get_database(), dummy)
150
self.assertEquals(store._cache._size, 100)
160
153
class StoreTest(object):
428
421
self.store.flush()
429
422
self.assertEquals(len(events), 1)
431
def test_wb_flush_event_with_deleted_object_before_flush(self):
433
When an object is deleted before flush and it contains mutable
434
variables, those variables unhook from the global event system to
437
class PickleBlob(Blob):
440
# Disable the cache, which holds strong references.
441
self.get_cache(self.store).set_size(0)
443
blob = self.store.get(Blob, 20)
444
blob.bin = "\x80\x02}q\x01U\x01aK\x01s."
449
pickle_blob = self.store.get(PickleBlob, 20)
450
pickle_blob.bin = "foobin"
454
self.assertEquals(self.store._event._hooks["flush"], set())
456
424
def test_obj_info_with_deleted_object_with_get(self):
457
425
# Same thing, but using get rather than find.
873
841
count = self.store.find(Link).count(Link.foo_id, distinct=True)
874
842
self.assertEquals(count, 3)
876
def test_find_limit_count(self):
    """count() on a result configured with limit=2 is expected to be 2."""
    limited = self.store.find(Link.foo_id)
    limited.config(limit=2)
    self.assertEqual(limited.count(), 2)
882
def test_find_offset_count(self):
    """count() on a result configured with offset=3 is expected to be 3."""
    shifted = self.store.find(Link.foo_id)
    shifted.config(offset=3)
    self.assertEqual(shifted.count(), 3)
888
def test_find_sliced_count(self):
    """count() on the [2:4] slice of a result is expected to be 2."""
    window = self.store.find(Link.foo_id)[2:4]
    self.assertEqual(window.count(), 2)
893
def test_find_distinct_count(self):
    """count() on a distinct Link.foo_id result is expected to be 3."""
    unique_ids = self.store.find(Link.foo_id)
    unique_ids.config(distinct=True)
    self.assertEqual(unique_ids.count(), 3)
899
def test_find_distinct_order_by_limit_count(self):
    """count() still works when order_by, distinct and limit are combined.

    The result is ordered by Foo.title and configured with distinct=True
    and limit=3; the expected count is 3.
    """
    foos = self.store.find(Foo)
    foos.order_by(Foo.title)
    foos.config(distinct=True, limit=3)
    self.assertEqual(foos.count(), 3)
906
def test_find_distinct_count_multiple_columns(self):
    """count() on a distinct (foo_id, bar_id) result is expected to be 6."""
    pairs = self.store.find((Link.foo_id, Link.bar_id))
    pairs.config(distinct=True)
    self.assertEqual(pairs.count(), 6)
912
def test_find_count_column_with_implicit_distinct(self):
    """count(column) on a result configured as distinct.

    The result set over Link is configured with distinct=True and then
    counted on Link.foo_id; the expected count is 6.
    """
    links = self.store.find(Link)
    links.config(distinct=True)
    self.assertEqual(links.count(Link.foo_id), 6)
918
844
def test_find_max(self):
919
845
self.assertEquals(self.store.find(Foo).max(Foo.id), 30)
926
852
self.assertEquals(title, "Title 30")
927
853
self.assertTrue(isinstance(title, unicode))
929
def test_find_max_with_empty_result_and_disallow_none(self):
931
__storm_table__ = "bar"
932
id = Int(primary=True)
933
foo_id = Int(allow_none=False)
935
result = self.store.find(Bar, Bar.id > 1000)
936
self.assertTrue(result.is_empty())
937
self.assertEquals(result.max(Bar.foo_id), None)
939
855
def test_find_min(self):
940
856
self.assertEquals(self.store.find(Foo).min(Foo.id), 10)
947
863
self.assertEquals(title, "Title 10")
948
864
self.assertTrue(isinstance(title, unicode))
950
def test_find_min_with_empty_result_and_disallow_none(self):
952
__storm_table__ = "bar"
953
id = Int(primary=True)
954
foo_id = Int(allow_none=False)
956
result = self.store.find(Bar, Bar.id > 1000)
957
self.assertTrue(result.is_empty())
958
self.assertEquals(result.min(Bar.foo_id), None)
960
866
def test_find_avg(self):
961
867
self.assertEquals(self.store.find(Foo).avg(Foo.id), 20)
976
882
def test_find_sum_expr(self):
977
883
self.assertEquals(self.store.find(Foo).sum(Foo.id * 2), 120)
979
def test_find_sum_with_empty_result_and_disallow_none(self):
981
__storm_table__ = "bar"
982
id = Int(primary=True)
983
foo_id = Int(allow_none=False)
985
result = self.store.find(Bar, Bar.id > 1000)
986
self.assertTrue(result.is_empty())
987
self.assertEquals(result.sum(Bar.foo_id), None)
989
885
def test_find_max_order_by(self):
990
886
"""Interaction between order by and aggregation shouldn't break."""
991
887
result = self.store.find(Foo)
2611
2507
bar.foo_id = SQL("20")
2612
2508
self.assertEquals(bar.foo.id, 20)
2614
def test_reference_remote_leak_on_flush_with_changed(self):
2616
"changed" events only hold weak references to remote infos object, thus
2617
not creating a leak when unhooked.
2619
self.get_cache(self.store).set_size(0)
2620
bar = self.store.get(Bar, 100)
2621
bar.foo.title = u"Changed title"
2622
bar_ref = weakref.ref(get_obj_info(bar))
2627
self.assertEquals(bar_ref(), None)
2629
def test_reference_remote_leak_on_flush_with_removed(self):
2631
"removed" events only hold weak references to remote infos objects,
2632
thus not creating a leak when unhooked.
2634
self.get_cache(self.store).set_size(0)
2636
bar = Reference(Foo.id, Bar.foo_id, on_remote=True)
2638
foo = self.store.get(MyFoo, 10)
2639
foo.bar.title = u"Changed title"
2640
foo_ref = weakref.ref(get_obj_info(foo))
2645
self.assertEquals(foo_ref(), None)
2647
2510
def test_reference_break_on_remote_diverged_by_lazy(self):
2648
2511
class MyBar(Bar):
4532
4387
self.store.invalidate()
4534
4389
obj_info = get_obj_info(pickle_blob)
4535
variable = obj_info.variables[PickleBlob.bin]
4390
variable = obj_info.variables[PickleBlob.bin]
4536
4391
var_ref = weakref.ref(variable)
4537
4392
del variable, blob, pickle_blob, obj_info
4571
4426
self.store.invalidate()
4573
4428
obj_info = get_obj_info(pickle_blob)
4574
variable = obj_info.variables[PickleBlob.bin]
4429
variable = obj_info.variables[PickleBlob.bin]
4575
4430
var_ref = weakref.ref(variable)
4576
4431
del variable, blob, pickle_blob, obj_info, foo
5024
4879
obj_info = get_obj_info(foo)
5025
4880
self.assertEquals(obj_info.variables[Foo.title].get_lazy(), AutoReload)
5027
def test_primary_key_reference(self):
5029
When an object references another one using its primary key, it
5030
correctly checks for the invalidated state after the store has been
5031
committed, detecting if the referenced object has been removed behind
5034
class BarOnRemote(object):
5035
__storm_table__ = "bar"
5036
foo_id = Int(primary=True)
5037
foo = Reference(foo_id, Foo.id, on_remote=True)
5038
foo = self.store.get(Foo, 10)
5039
bar = self.store.get(BarOnRemote, 10)
5040
self.assertEqual(bar.foo, foo)
5041
self.store.execute("DELETE FROM foo WHERE id = 10")
5043
self.assertEqual(bar.foo, None)
5045
4882
def test_invalidate_and_get_object(self):
5046
4883
foo = self.store.get(Foo, 20)
5047
4884
self.store.invalidate(foo)
5178
5015
self.store.reset()
5179
5016
self.assertIdentical(Store.of(foo1), None)
5181
def test_result_find(self):
5182
result1 = self.store.find(Foo, Foo.id <= 20)
5183
result2 = result1.find(Foo.id > 10)
5185
self.assertTrue(foo)
5186
self.assertEqual(foo.id, 20)
5188
def test_result_find_kwargs(self):
5189
result1 = self.store.find(Foo, Foo.id <= 20)
5190
result2 = result1.find(id=20)
5192
self.assertTrue(foo)
5193
self.assertEqual(foo.id, 20)
5195
def test_result_find_introduce_join(self):
5196
result1 = self.store.find(Foo, Foo.id <= 20)
5197
result2 = result1.find(Foo.id == Bar.foo_id,
5198
Bar.title == u"Title 300")
5200
self.assertTrue(foo)
5201
self.assertEqual(foo.id, 10)
5203
def test_result_find_tuple(self):
5204
result1 = self.store.find((Foo, Bar), Foo.id == Bar.foo_id)
5205
result2 = result1.find(Bar.title == u"Title 100")
5206
foo_bar = result2.one()
5207
self.assertTrue(foo_bar)
5209
self.assertEqual(foo.id, 30)
5210
self.assertEqual(bar.id, 300)
5212
def test_result_find_undef_where(self):
5213
result = self.store.find(Foo, Foo.id == 20).find()
5215
self.assertTrue(foo)
5216
self.assertEqual(foo.id, 20)
5217
result = self.store.find(Foo).find(Foo.id == 20)
5219
self.assertTrue(foo)
5220
self.assertEqual(foo.id, 20)
5222
def test_result_find_fails_on_set_expr(self):
    """find() on the union of two results raises FeatureError."""
    combined = self.store.find(Foo).union(self.store.find(Foo))
    self.assertRaises(FeatureError, combined.find, Foo.id == 20)
5228
def test_result_find_fails_on_slice(self):
    """find() on a sliced result raises FeatureError."""
    everything = self.store.find(Foo)
    sliced = everything[1:2]
    self.assertRaises(FeatureError, sliced.find, Foo.id == 20)
5232
def test_result_find_fails_on_group_by(self):
    """find() on a result that has had group_by() applied raises FeatureError."""
    grouped = self.store.find(Foo)
    grouped.group_by(Foo)
    refine = grouped.find
    self.assertRaises(FeatureError, refine, Foo.id == 20)
5237
5018
def test_result_union(self):
5238
5019
result1 = self.store.find(Foo, id=30)
5239
5020
result2 = self.store.find(Foo, id=10)
5719
5500
self.assertEqual(len(calls), 1)
5720
5501
self.assertEqual(calls[0], self.store)
5722
def test_rowcount_remove(self):
    """remove() on find(Foo, Foo.id <= 30) is expected to return 3."""
    # All supported backends support rowcount, so far.
    doomed = self.store.find(Foo, Foo.id <= 30)
    self.assertEqual(doomed.remove(), 3)
5728
5503
class EmptyResultSetTest(object):
5730
5505
def setUp(self):
5825
5600
self.assertEquals(self.empty.order_by(Foo.title), self.empty)
5827
5602
def test_remove(self):
5828
self.assertEquals(self.result.remove(), 0)
5829
self.assertEquals(self.empty.remove(), 0)
5603
self.assertEquals(self.result.remove(), None)
5604
self.assertEquals(self.empty.remove(), None)
5831
5606
def test_count(self):
5832
5607
self.assertEquals(self.result.count(), 0)
5866
5641
self.assertEquals(self.result.cached(), [])
5867
5642
self.assertEquals(self.empty.cached(), [])
5869
def test_find(self):
    """find() on self.result and on self.empty both list to []."""
    for result_set in (self.result, self.empty):
        matches = list(result_set.find(Foo.title == u"foo"))
        self.assertEqual(matches, [])
5873
5644
def test_union(self):
5874
5645
self.assertEquals(self.empty.union(self.empty), self.empty)
5875
5646
self.assertEquals(type(self.empty.union(self.result)),