1
from test.lib.testing import assert_raises, assert_raises_message
3
3
from sqlalchemy import util, sql, exc
4
from test.lib.testing import eq_, is_, ne_, fails_if
5
from test.lib.util import gc_collect, picklers
4
from sqlalchemy.testing import assert_raises, assert_raises_message, fixtures
5
from sqlalchemy.testing import eq_, is_, ne_, fails_if
6
from sqlalchemy.testing.util import picklers
6
7
from sqlalchemy.util import classproperty
7
from test.lib import fixtures
10
class KeyedTupleTest():
13
keyed_tuple = util.KeyedTuple([])
14
eq_(type(keyed_tuple), util.KeyedTuple)
15
eq_(str(keyed_tuple), '()')
16
eq_(len(keyed_tuple), 0)
18
eq_(keyed_tuple.__dict__, {'_labels': []})
19
eq_(keyed_tuple.keys(), [])
20
eq_(keyed_tuple._fields, ())
21
eq_(keyed_tuple._asdict(), {})
23
def test_values_but_no_labels(self):
24
keyed_tuple = util.KeyedTuple([1, 2])
25
eq_(type(keyed_tuple), util.KeyedTuple)
26
eq_(str(keyed_tuple), '(1, 2)')
27
eq_(len(keyed_tuple), 2)
29
eq_(keyed_tuple.__dict__, {'_labels': []})
30
eq_(keyed_tuple.keys(), [])
31
eq_(keyed_tuple._fields, ())
32
eq_(keyed_tuple._asdict(), {})
34
eq_(keyed_tuple[0], 1)
35
eq_(keyed_tuple[1], 2)
37
def test_basic_creation(self):
38
keyed_tuple = util.KeyedTuple([1, 2], ['a', 'b'])
39
eq_(str(keyed_tuple), '(1, 2)')
40
eq_(keyed_tuple.keys(), ['a', 'b'])
41
eq_(keyed_tuple._fields, ('a', 'b'))
42
eq_(keyed_tuple._asdict(), {'a': 1, 'b': 2})
44
def test_basic_index_access(self):
45
keyed_tuple = util.KeyedTuple([1, 2], ['a', 'b'])
46
eq_(keyed_tuple[0], 1)
47
eq_(keyed_tuple[1], 2)
51
assert_raises(IndexError, should_raise)
53
def test_basic_attribute_access(self):
54
keyed_tuple = util.KeyedTuple([1, 2], ['a', 'b'])
60
assert_raises(AttributeError, should_raise)
62
def test_none_label(self):
63
keyed_tuple = util.KeyedTuple([1, 2, 3], ['a', None, 'b'])
64
eq_(str(keyed_tuple), '(1, 2, 3)')
66
# TODO: consider not allowing None labels
67
expected = {'a': 1, None: 2, 'b': 3, '_labels': ['a', None, 'b']}
68
eq_(keyed_tuple.__dict__, expected)
69
eq_(keyed_tuple.keys(), ['a', 'b'])
70
eq_(keyed_tuple._fields, ('a', 'b'))
71
eq_(keyed_tuple._asdict(), {'a': 1, 'b': 3})
73
# attribute access: can't get at value 2
77
# index access: can get at value 2
78
eq_(keyed_tuple[0], 1)
79
eq_(keyed_tuple[1], 2)
80
eq_(keyed_tuple[2], 3)
82
def test_duplicate_labels(self):
83
keyed_tuple = util.KeyedTuple([1, 2, 3], ['a', 'b', 'b'])
84
eq_(str(keyed_tuple), '(1, 2, 3)')
86
# TODO: consider not allowing duplicate labels
87
expected = {'a': 1, 'b': 3, '_labels': ['a', 'b', 'b']}
88
eq_(keyed_tuple.__dict__, expected)
89
eq_(keyed_tuple.keys(), ['a', 'b', 'b'])
90
eq_(keyed_tuple._fields, ('a', 'b', 'b'))
91
eq_(keyed_tuple._asdict(), {'a': 1, 'b': 3})
93
# attribute access: can't get at value 2
97
# index access: can get at value 2
98
eq_(keyed_tuple[0], 1)
99
eq_(keyed_tuple[1], 2)
100
eq_(keyed_tuple[2], 3)
102
def test_immutable(self):
103
keyed_tuple = util.KeyedTuple([1, 2], ['a', 'b'])
104
eq_(str(keyed_tuple), '(1, 2)')
106
# attribute access: mutable
107
eq_(keyed_tuple.a, 1)
109
eq_(keyed_tuple.a, 100)
111
eq_(keyed_tuple.c, 300)
113
# index access: immutable
116
assert_raises(TypeError, should_raise)
9
119
class OrderedDictTest(fixtures.TestBase):
10
121
def test_odict(self):
11
122
o = util.OrderedDict()
304
439
eq_(ids2, IdentitySet([o2, o3]))
441
def test_dunder_eq(self):
442
_, _, twin1, twin2, unique1, unique2 = self._create_sets()
445
eq_(twin1 == twin2, True)
446
eq_(unique1 == unique2, False)
449
not_an_identity_set = object()
450
eq_(unique1 == not_an_identity_set, False)
452
def test_dunder_ne(self):
453
_, _, twin1, twin2, unique1, unique2 = self._create_sets()
456
eq_(twin1 != twin2, False)
457
eq_(unique1 != unique2, True)
460
not_an_identity_set = object()
461
eq_(unique1 != not_an_identity_set, True)
463
def test_dunder_le(self):
464
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
467
eq_(sub_ <= super_, True)
468
eq_(super_ <= sub_, False)
471
eq_(twin1 <= twin2, True)
472
eq_(twin2 <= twin1, True)
474
# totally different sets
475
eq_(unique1 <= unique2, False)
476
eq_(unique2 <= unique1, False)
480
not_an_identity_set = object()
481
return unique1 <= not_an_identity_set
482
self._assert_unorderable_types(should_raise)
484
def test_dunder_lt(self):
485
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
488
eq_(sub_ < super_, True)
489
eq_(super_ < sub_, False)
492
eq_(twin1 < twin2, False)
493
eq_(twin2 < twin1, False)
495
# totally different sets
496
eq_(unique1 < unique2, False)
497
eq_(unique2 < unique1, False)
501
not_an_identity_set = object()
502
return unique1 < not_an_identity_set
503
self._assert_unorderable_types(should_raise)
505
def test_dunder_ge(self):
506
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
509
eq_(sub_ >= super_, False)
510
eq_(super_ >= sub_, True)
513
eq_(twin1 >= twin2, True)
514
eq_(twin2 >= twin1, True)
516
# totally different sets
517
eq_(unique1 >= unique2, False)
518
eq_(unique2 >= unique1, False)
522
not_an_identity_set = object()
523
return unique1 >= not_an_identity_set
524
self._assert_unorderable_types(should_raise)
526
def test_dunder_gt(self):
527
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
530
eq_(sub_ > super_, False)
531
eq_(super_ > sub_, True)
534
eq_(twin1 > twin2, False)
535
eq_(twin2 > twin1, False)
537
# totally different sets
538
eq_(unique1 > unique2, False)
539
eq_(unique2 > unique1, False)
543
not_an_identity_set = object()
544
return unique1 > not_an_identity_set
545
self._assert_unorderable_types(should_raise)
547
def test_issubset(self):
548
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
551
eq_(sub_.issubset(super_), True)
552
eq_(super_.issubset(sub_), False)
555
eq_(twin1.issubset(twin2), True)
556
eq_(twin2.issubset(twin1), True)
558
# totally different sets
559
eq_(unique1.issubset(unique2), False)
560
eq_(unique2.issubset(unique1), False)
563
not_an_identity_set = object()
564
assert_raises(TypeError, unique1.issubset, not_an_identity_set)
566
def test_issuperset(self):
567
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
570
eq_(sub_.issuperset(super_), False)
571
eq_(super_.issuperset(sub_), True)
574
eq_(twin1.issuperset(twin2), True)
575
eq_(twin2.issuperset(twin1), True)
577
# totally different sets
578
eq_(unique1.issuperset(unique2), False)
579
eq_(unique2.issuperset(unique1), False)
582
not_an_identity_set = object()
583
assert_raises(TypeError, unique1.issuperset, not_an_identity_set)
585
def test_union(self):
586
super_, sub_, twin1, twin2, _, _ = self._create_sets()
589
eq_(sub_.union(super_), super_)
590
eq_(super_.union(sub_), super_)
593
eq_(twin1.union(twin2), twin1)
594
eq_(twin2.union(twin1), twin1)
597
empty = util.IdentitySet([])
598
eq_(empty.union(empty), empty)
600
# totally different sets
601
unique1 = util.IdentitySet([1])
602
unique2 = util.IdentitySet([2])
603
eq_(unique1.union(unique2), util.IdentitySet([1, 2]))
606
not_an_identity_set = object()
607
assert_raises(TypeError, unique1.union, not_an_identity_set)
609
def test_dunder_or(self):
610
super_, sub_, twin1, twin2, _, _ = self._create_sets()
613
eq_(sub_ | super_, super_)
614
eq_(super_ | sub_, super_)
617
eq_(twin1 | twin2, twin1)
618
eq_(twin2 | twin1, twin1)
621
empty = util.IdentitySet([])
622
eq_(empty | empty, empty)
624
# totally different sets
625
unique1 = util.IdentitySet([1])
626
unique2 = util.IdentitySet([2])
627
eq_(unique1 | unique2, util.IdentitySet([1, 2]))
631
not_an_identity_set = object()
632
return unique1 | not_an_identity_set
633
assert_raises(TypeError, should_raise)
635
def test_update(self):
638
def test_dunder_ior(self):
639
super_, sub_, _, _, _, _ = self._create_sets()
647
# totally different sets
648
unique1 = util.IdentitySet([1])
649
unique2 = util.IdentitySet([2])
651
eq_(unique1, util.IdentitySet([1, 2]))
652
eq_(unique2, util.IdentitySet([2]))
656
unique = util.IdentitySet([1])
657
not_an_identity_set = object()
658
unique |= not_an_identity_set
659
assert_raises(TypeError, should_raise)
661
def test_difference(self):
662
_, _, twin1, twin2, _, _ = self._create_sets()
665
set1 = util.IdentitySet([1, 2, 3])
666
set2 = util.IdentitySet([2, 3, 4])
667
eq_(set1.difference(set2), util.IdentitySet([1]))
668
eq_(set2.difference(set1), util.IdentitySet([4]))
671
empty = util.IdentitySet([])
672
eq_(empty.difference(empty), empty)
675
eq_(twin1.difference(twin2), empty)
676
eq_(twin2.difference(twin1), empty)
678
# totally different sets
679
unique1 = util.IdentitySet([1])
680
unique2 = util.IdentitySet([2])
681
eq_(unique1.difference(unique2), util.IdentitySet([1]))
682
eq_(unique2.difference(unique1), util.IdentitySet([2]))
685
not_an_identity_set = object()
686
assert_raises(TypeError, unique1.difference, not_an_identity_set)
688
def test_dunder_sub(self):
689
_, _, twin1, twin2, _, _ = self._create_sets()
692
set1 = util.IdentitySet([1, 2, 3])
693
set2 = util.IdentitySet([2, 3, 4])
694
eq_(set1 - set2, util.IdentitySet([1]))
695
eq_(set2 - set1, util.IdentitySet([4]))
698
empty = util.IdentitySet([])
699
eq_(empty - empty, empty)
702
eq_(twin1 - twin2, empty)
703
eq_(twin2 - twin1, empty)
705
# totally different sets
706
unique1 = util.IdentitySet([1])
707
unique2 = util.IdentitySet([2])
708
eq_(unique1 - unique2, util.IdentitySet([1]))
709
eq_(unique2 - unique1, util.IdentitySet([2]))
713
not_an_identity_set = object()
714
unique1 - not_an_identity_set
715
assert_raises(TypeError, should_raise)
717
def test_difference_update(self):
720
def test_dunder_isub(self):
723
def test_intersection(self):
724
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
727
eq_(sub_.intersection(super_), sub_)
728
eq_(super_.intersection(sub_), sub_)
731
eq_(twin1.intersection(twin2), twin1)
732
eq_(twin2.intersection(twin1), twin1)
735
empty = util.IdentitySet([])
736
eq_(empty.intersection(empty), empty)
738
# totally different sets
739
eq_(unique1.intersection(unique2), empty)
742
not_an_identity_set = object()
743
assert_raises(TypeError, unique1.intersection, not_an_identity_set)
745
def test_dunder_and(self):
746
super_, sub_, twin1, twin2, unique1, unique2 = self._create_sets()
749
eq_(sub_ & super_, sub_)
750
eq_(super_ & sub_, sub_)
753
eq_(twin1 & twin2, twin1)
754
eq_(twin2 & twin1, twin1)
757
empty = util.IdentitySet([])
758
eq_(empty & empty, empty)
760
# totally different sets
761
eq_(unique1 & unique2, empty)
765
not_an_identity_set = object()
766
return unique1 & not_an_identity_set
767
assert_raises(TypeError, should_raise)
769
def test_intersection_update(self):
772
def test_dunder_iand(self):
775
def test_symmetric_difference(self):
776
_, _, twin1, twin2, _, _ = self._create_sets()
779
set1 = util.IdentitySet([1, 2, 3])
780
set2 = util.IdentitySet([2, 3, 4])
781
eq_(set1.symmetric_difference(set2), util.IdentitySet([1, 4]))
782
eq_(set2.symmetric_difference(set1), util.IdentitySet([1, 4]))
785
empty = util.IdentitySet([])
786
eq_(empty.symmetric_difference(empty), empty)
789
eq_(twin1.symmetric_difference(twin2), empty)
790
eq_(twin2.symmetric_difference(twin1), empty)
792
# totally different sets
793
unique1 = util.IdentitySet([1])
794
unique2 = util.IdentitySet([2])
795
eq_(unique1.symmetric_difference(unique2), util.IdentitySet([1, 2]))
796
eq_(unique2.symmetric_difference(unique1), util.IdentitySet([1, 2]))
799
not_an_identity_set = object()
801
TypeError, unique1.symmetric_difference, not_an_identity_set)
803
def test_dunder_xor(self):
804
_, _, twin1, twin2, _, _ = self._create_sets()
807
set1 = util.IdentitySet([1, 2, 3])
808
set2 = util.IdentitySet([2, 3, 4])
809
eq_(set1 ^ set2, util.IdentitySet([1, 4]))
810
eq_(set2 ^ set1, util.IdentitySet([1, 4]))
813
empty = util.IdentitySet([])
814
eq_(empty ^ empty, empty)
817
eq_(twin1 ^ twin2, empty)
818
eq_(twin2 ^ twin1, empty)
820
# totally different sets
821
unique1 = util.IdentitySet([1])
822
unique2 = util.IdentitySet([2])
823
eq_(unique1 ^ unique2, util.IdentitySet([1, 2]))
824
eq_(unique2 ^ unique1, util.IdentitySet([1, 2]))
828
not_an_identity_set = object()
829
return unique1 ^ not_an_identity_set
830
assert_raises(TypeError, should_raise)
832
def test_symmetric_difference_update(self):
835
def _create_sets(self):
836
o1, o2, o3, o4, o5 = object(), object(), object(), object(), object()
837
super_ = util.IdentitySet([o1, o2, o3])
838
sub_ = util.IdentitySet([o2])
839
twin1 = util.IdentitySet([o3])
840
twin2 = util.IdentitySet([o3])
841
unique1 = util.IdentitySet([o4])
842
unique2 = util.IdentitySet([o5])
843
return super_, sub_, twin1, twin2, unique1, unique2
845
def _assert_unorderable_types(self, callable_):
847
#assert_raises_message(
848
# TypeError, 'unorderable types', callable_)
850
assert_raises_message(
851
TypeError, 'cannot compare sets using cmp()', callable_)
306
854
def test_basic_sanity(self):
307
855
IdentitySet = util.IdentitySet
616
1201
assert rt is sym1
617
1202
assert rt is sym2
619
class WeakIdentityMappingTest(fixtures.TestBase):
623
def _some_data(self, some=20):
624
return [self.Data() for _ in xrange(some)]
626
def _fixture(self, some=20):
627
data = self._some_data()
628
wim = util.WeakIdentityMapping()
629
for idx, obj in enumerate(data):
633
def test_delitem(self):
634
data, wim = self._fixture()
638
assert id(needle) in wim.by_id
639
eq_(wim[needle], wim.by_id[id(needle)])
643
assert needle not in wim
644
assert id(needle) not in wim.by_id
645
eq_(len(wim), (len(data) - 1))
649
assert needle not in wim
650
assert id(needle) not in wim.by_id
651
eq_(len(wim), len(data))
653
def test_setitem(self):
654
data, wim = self._fixture()
656
o1, oid1 = data[-1], id(data[-1])
659
assert oid1 in wim.by_id
660
eq_(wim[o1], wim.by_id[oid1])
661
id_keys = set(wim.by_id.keys())
665
assert oid1 in wim.by_id
666
eq_(wim[o1], wim.by_id[oid1])
667
eq_(set(wim.by_id.keys()), id_keys)
674
assert oid2 in wim.by_id
675
eq_(wim[o2], wim.by_id[oid2])
678
data, wim = self._fixture()
683
assert id(needle) in wim.by_id
684
eq_(wim[needle], wim.by_id[id(needle)])
685
eq_(len(wim), (len(data) + 1))
688
assert needle not in wim
689
assert id(needle) not in wim.by_id
690
eq_(len(wim), len(data))
692
def test_pop_default(self):
693
data, wim = self._fixture()
697
x = wim.pop(needle, 123)
700
assert needle not in wim
701
assert id(needle) not in wim.by_id
702
eq_(len(data), (len(wim) + 1))
708
assert id(n2) not in wim.by_id
709
eq_(len(data), (len(wim) + 1))
711
def test_popitem(self):
712
data, wim = self._fixture()
713
(needle, idx) = wim.popitem()
715
assert needle in data
716
eq_(len(data), (len(wim) + 1))
717
assert id(needle) not in wim.by_id
719
def test_setdefault(self):
720
data, wim = self._fixture()
727
res1 = wim.setdefault(o1, 123)
729
assert oid1 in wim.by_id
731
id_keys = set(wim.by_id.keys())
733
res2 = wim.setdefault(o1, 456)
735
assert oid1 in wim.by_id
737
assert set(wim.by_id.keys()) == id_keys
741
assert oid1 not in wim.by_id
742
ne_(set(wim.by_id.keys()), id_keys)
744
res3 = wim.setdefault(o1, 789)
746
assert oid1 in wim.by_id
748
eq_(set(wim.by_id.keys()), id_keys)
750
def test_clear(self):
751
data, wim = self._fixture()
753
assert len(data) == len(wim) == len(wim.by_id)
759
def test_update(self):
760
data, wim = self._fixture()
761
assert_raises(NotImplementedError, wim.update)
763
def test_weak_clear(self):
764
data, wim = self._fixture()
766
assert len(data) == len(wim) == len(wim.by_id)
773
eq_(wim._weakrefs, {})
775
def test_weak_single(self):
776
data, wim = self._fixture()
778
assert len(data) == len(wim) == len(wim.by_id)
784
assert len(data) == len(wim) == len(wim.by_id)
785
assert oid not in wim.by_id
787
def test_weak_threadhop(self):
788
data, wim = self._fixture()
791
cv = threading.Condition()
799
th = threading.Thread(target=empty, args=(data,))
809
eq_(wim._weakrefs, {})
1204
def test_bitflags(self):
1205
sym1 = util.symbol('sym1', canonical=1)
1206
sym2 = util.symbol('sym2', canonical=2)
1209
assert not sym1 & sym2
1210
assert not sym1 & sym1 & sym2
1212
def test_composites(self):
1213
sym1 = util.symbol('sym1', canonical=1)
1214
sym2 = util.symbol('sym2', canonical=2)
1215
sym3 = util.symbol('sym3', canonical=4)
1216
sym4 = util.symbol('sym4', canonical=8)
1218
assert sym1 & (sym2 | sym1 | sym4)
1219
assert not sym1 & (sym2 | sym3)
1221
assert not (sym1 | sym2) & (sym3 | sym4)
1222
assert (sym1 | sym2) & (sym2 | sym4)
812
1225
class TestFormatArgspec(fixtures.TestBase):
813
1227
def test_specs(self):
814
1228
def test(fn, wanted, grouped=None):
815
1229
if grouped is None:
821
1235
test(lambda: None,
822
1236
{'args': '()', 'self_arg': None,
823
'apply_kw': '()', 'apply_pos': '()' })
1237
'apply_kw': '()', 'apply_pos': '()'})
825
1239
test(lambda: None,
826
1240
{'args': '', 'self_arg': None,
827
'apply_kw': '', 'apply_pos': '' },
1241
'apply_kw': '', 'apply_pos': ''},
830
1244
test(lambda self: None,
831
1245
{'args': '(self)', 'self_arg': 'self',
832
'apply_kw': '(self)', 'apply_pos': '(self)' })
1246
'apply_kw': '(self)', 'apply_pos': '(self)'})
834
1248
test(lambda self: None,
835
1249
{'args': 'self', 'self_arg': 'self',
836
'apply_kw': 'self', 'apply_pos': 'self' },
1250
'apply_kw': 'self', 'apply_pos': 'self'},
839
1253
test(lambda *a: None,
840
1254
{'args': '(*a)', 'self_arg': 'a[0]',
841
'apply_kw': '(*a)', 'apply_pos': '(*a)' })
1255
'apply_kw': '(*a)', 'apply_pos': '(*a)'})
843
1257
test(lambda **kw: None,
844
1258
{'args': '(**kw)', 'self_arg': None,
845
'apply_kw': '(**kw)', 'apply_pos': '(**kw)' })
1259
'apply_kw': '(**kw)', 'apply_pos': '(**kw)'})
847
1261
test(lambda *a, **kw: None,
848
1262
{'args': '(*a, **kw)', 'self_arg': 'a[0]',
849
'apply_kw': '(*a, **kw)', 'apply_pos': '(*a, **kw)' })
1263
'apply_kw': '(*a, **kw)', 'apply_pos': '(*a, **kw)'})
851
1265
test(lambda a, *b: None,
852
1266
{'args': '(a, *b)', 'self_arg': 'a',
853
'apply_kw': '(a, *b)', 'apply_pos': '(a, *b)' })
1267
'apply_kw': '(a, *b)', 'apply_pos': '(a, *b)'})
855
1269
test(lambda a, **b: None,
856
1270
{'args': '(a, **b)', 'self_arg': 'a',
857
'apply_kw': '(a, **b)', 'apply_pos': '(a, **b)' })
1271
'apply_kw': '(a, **b)', 'apply_pos': '(a, **b)'})
859
1273
test(lambda a, *b, **c: None,
860
1274
{'args': '(a, *b, **c)', 'self_arg': 'a',
861
'apply_kw': '(a, *b, **c)', 'apply_pos': '(a, *b, **c)' })
1275
'apply_kw': '(a, *b, **c)', 'apply_pos': '(a, *b, **c)'})
863
1277
test(lambda a, b=1, **c: None,
864
1278
{'args': '(a, b=1, **c)', 'self_arg': 'a',
865
'apply_kw': '(a, b=b, **c)', 'apply_pos': '(a, b, **c)' })
1279
'apply_kw': '(a, b=b, **c)', 'apply_pos': '(a, b, **c)'})
867
1281
test(lambda a=1, b=2: None,
868
1282
{'args': '(a=1, b=2)', 'self_arg': 'a',
869
'apply_kw': '(a=a, b=b)', 'apply_pos': '(a, b)' })
1283
'apply_kw': '(a=a, b=b)', 'apply_pos': '(a, b)'})
871
1285
test(lambda a=1, b=2: None,
872
1286
{'args': 'a=1, b=2', 'self_arg': 'a',
873
'apply_kw': 'a=a, b=b', 'apply_pos': 'a, b' },
1287
'apply_kw': 'a=a, b=b', 'apply_pos': 'a, b'},
876
1290
@fails_if(lambda: util.pypy, "object.__init__ is introspectable")