6
6
from sqlalchemy.orm.collections import collection
8
8
import sqlalchemy as sa
9
from test.lib import testing
10
9
from sqlalchemy import Integer, String, ForeignKey, text
11
from test.lib.schema import Table, Column
10
from sqlalchemy.testing.schema import Table, Column
12
11
from sqlalchemy import util, exc as sa_exc
13
12
from sqlalchemy.orm import create_session, mapper, relationship, \
14
13
attributes, instrumentation
15
from test.lib import fixtures
16
from test.lib.testing import eq_, assert_raises, assert_raises_message
14
from sqlalchemy.testing import fixtures
15
from sqlalchemy.testing import assert_raises, assert_raises_message
18
17
class Canary(sa.orm.interfaces.AttributeExtension):
19
18
def __init__(self):
79
78
instrumentation.register_class(Foo)
80
attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
79
attributes.register_attribute(Foo, 'attr', uselist=True,
81
81
typecallable=typecallable, useobject=True)
119
119
canary = Canary()
120
120
instrumentation.register_class(Foo)
121
attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
121
attributes.register_attribute(Foo, 'attr', uselist=True,
122
123
typecallable=typecallable, useobject=True)
155
156
if util.reduce(and_, [hasattr(direct, a) for a in
156
157
('__delitem__', 'insert', '__len__')], True):
157
158
values = [creator(), creator(), creator(), creator()]
158
direct[slice(0,1)] = values
159
control[slice(0,1)] = values
159
direct[slice(0, 1)] = values
160
control[slice(0, 1)] = values
162
163
values = [creator(), creator()]
163
direct[slice(0,-1,2)] = values
164
control[slice(0,-1,2)] = values
164
direct[slice(0, -1, 2)] = values
165
control[slice(0, -1, 2)] = values
167
168
values = [creator()]
168
direct[slice(0,-1)] = values
169
control[slice(0,-1)] = values
169
direct[slice(0, -1)] = values
170
control[slice(0, -1)] = values
172
values = [creator(),creator(),creator()]
173
values = [creator(), creator(), creator()]
173
174
control[:] = values
174
175
direct[:] = values
320
321
canary = Canary()
321
322
instrumentation.register_class(Foo)
322
attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
323
attributes.register_attribute(Foo, 'attr', uselist=True,
323
325
typecallable=typecallable, useobject=True)
488
490
canary = Canary()
489
491
instrumentation.register_class(Foo)
490
attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
492
attributes.register_attribute(Foo, 'attr', uselist=True,
491
494
typecallable=typecallable, useobject=True)
746
749
canary = Canary()
747
750
instrumentation.register_class(Foo)
748
attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
751
attributes.register_attribute(Foo, 'attr', uselist=True,
749
753
typecallable=typecallable, useobject=True)
856
860
canary = Canary()
857
861
instrumentation.register_class(Foo)
858
attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
862
attributes.register_attribute(Foo, 'attr', uselist=True,
859
864
typecallable=typecallable, useobject=True)
978
983
canary = Canary()
979
984
instrumentation.register_class(Foo)
980
attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
985
attributes.register_attribute(Foo, 'attr', uselist=True,
981
987
typecallable=typecallable, useobject=True)
994
1000
obj.attr = like_me
995
1001
self.assert_(obj.attr is not direct)
996
1002
self.assert_(obj.attr is not like_me)
997
self.assert_(set(collections.collection_adapter(obj.attr)) == set([e2]))
1004
set(collections.collection_adapter(obj.attr)) == set([e2]))
998
1005
self.assert_(e1 in canary.removed)
999
1006
self.assert_(e2 in canary.added)
1002
# key validity on bulk assignment is a basic feature of MappedCollection
1003
# but is not present in basic, @converter-less dict collections.
1009
# key validity on bulk assignment is a basic feature of
1010
# MappedCollection but is not present in basic, @converter-less
1005
1013
if isinstance(obj.attr, collections.MappedCollection):
1006
1014
real_dict = dict(badkey=e3)
1035
1043
self.assert_(e4 not in canary.data)
1037
1045
def test_dict(self):
1039
self._test_adapter(dict, self.dictable_entity,
1040
to_set=lambda c: set(c.values()))
1042
except sa_exc.ArgumentError, e:
1043
self.assert_(e.args[0] == 'Type InstrumentedDict must elect an appender method to be a collection class')
1046
assert_raises_message(
1047
sa_exc.ArgumentError,
1048
'Type InstrumentedDict must elect an appender '
1049
'method to be a collection class',
1050
self._test_adapter, dict, self.dictable_entity,
1051
to_set=lambda c: set(c.values())
1046
self._test_dict(dict)
1048
except sa_exc.ArgumentError, e:
1049
self.assert_(e.args[0] == 'Type InstrumentedDict must elect an appender method to be a collection class')
1054
assert_raises_message(
1055
sa_exc.ArgumentError,
1056
'Type InstrumentedDict must elect an appender method '
1057
'to be a collection class',
1058
self._test_dict, dict
1051
1061
def test_dict_subclass(self):
1052
1062
class MyDict(dict):
1088
1098
self._test_dict_bulk(MyOrdered)
1089
1099
self.assert_(getattr(MyOrdered, '_sa_instrumented') == id(MyOrdered))
1101
def test_dict_subclass4(self):
1103
class MyDict(collections.MappedCollection):
1105
super(MyDict, self).__init__(lambda value: "k%d" % value)
1107
@collection.converter
1108
def _convert(self, dictlike):
1109
for key, value in dictlike.iteritems():
1117
instrumentation.register_class(Foo)
1118
attributes.register_attribute(Foo, 'attr', uselist=True,
1120
typecallable=MyDict, useobject=True)
1123
f.attr = {"k1": 1, "k2": 2}
1125
eq_(f.attr, {'k7': 7, 'k6': 6})
1091
1127
def test_dict_duck(self):
1092
1128
class DictLike(object):
1093
1129
def __init__(self):
1177
1213
canary = Canary()
1178
1214
instrumentation.register_class(Foo)
1179
attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
1215
attributes.register_attribute(Foo, 'attr', uselist=True,
1180
1217
typecallable=typecallable, useobject=True)
1313
1350
canary = Canary()
1314
1351
instrumentation.register_class(Foo)
1315
attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
1352
attributes.register_attribute(Foo, 'attr', uselist=True,
1316
1354
typecallable=Custom, useobject=True)
1382
1420
canary = Canary()
1383
1421
creator = self.entity_maker
1384
1422
instrumentation.register_class(Foo)
1385
attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary, useobject=True)
1423
attributes.register_attribute(Foo, 'attr', uselist=True,
1424
extension=canary, useobject=True)
1388
1427
col1 = obj.attr
1414
1453
def define_tables(cls, metadata):
1415
1454
Table('parents', metadata,
1416
Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
1455
Column('id', Integer, primary_key=True,
1456
test_needs_autoincrement=True),
1417
1457
Column('label', String(128)))
1418
1458
Table('children', metadata,
1419
Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
1459
Column('id', Integer, primary_key=True,
1460
test_needs_autoincrement=True),
1420
1461
Column('parent_id', Integer, ForeignKey('parents.id'),
1421
1462
nullable=False),
1422
1463
Column('a', String(128)),
1472
1513
self.assert_(set(p.children.keys()) == set(['foo', 'bar']))
1473
1514
self.assert_(p.children['foo'].id != cid)
1475
self.assert_(len(list(collections.collection_adapter(p.children))) == 2)
1517
len(list(collections.collection_adapter(p.children))) == 2)
1476
1518
session.flush()
1477
1519
session.expunge_all()
1479
1521
p = session.query(Parent).get(pid)
1480
self.assert_(len(list(collections.collection_adapter(p.children))) == 2)
1523
len(list(collections.collection_adapter(p.children))) == 2)
1482
1525
collections.collection_adapter(p.children).remove_with_event(
1483
1526
p.children['foo'])
1485
self.assert_(len(list(collections.collection_adapter(p.children))) == 1)
1529
len(list(collections.collection_adapter(p.children))) == 1)
1486
1530
session.flush()
1487
1531
session.expunge_all()
1489
1533
p = session.query(Parent).get(pid)
1490
self.assert_(len(list(collections.collection_adapter(p.children))) == 1)
1535
len(list(collections.collection_adapter(p.children))) == 1)
1492
1537
del p.children['bar']
1493
self.assert_(len(list(collections.collection_adapter(p.children))) == 0)
1539
len(list(collections.collection_adapter(p.children))) == 0)
1494
1540
session.flush()
1495
1541
session.expunge_all()
1497
1543
p = session.query(Parent).get(pid)
1498
self.assert_(len(list(collections.collection_adapter(p.children))) == 0)
1545
len(list(collections.collection_adapter(p.children))) == 0)
1501
1548
def _test_composite_mapped(self, collection_class):
1523
1570
p = session.query(Parent).get(pid)
1525
self.assert_(set(p.children.keys()) == set([('foo', '1'), ('foo', '2')]))
1573
set(p.children.keys()) == set([('foo', '1'), ('foo', '2')]))
1526
1574
cid = p.children[('foo', '1')].id
1528
1576
collections.collection_adapter(p.children).append_with_event(
1534
1582
p = session.query(Parent).get(pid)
1536
self.assert_(set(p.children.keys()) == set([('foo', '1'), ('foo', '2')]))
1585
set(p.children.keys()) == set([('foo', '1'), ('foo', '2')]))
1537
1586
self.assert_(p.children[('foo', '1')].id != cid)
1539
self.assert_(len(list(collections.collection_adapter(p.children))) == 2)
1589
len(list(collections.collection_adapter(p.children))) == 2)
1541
1591
def test_mapped_collection(self):
1542
1592
collection_class = collections.mapped_collection(lambda c: c.a)
1551
1601
self._test_scalar_mapped(collection_class)
1553
1603
def test_declarative_column_mapped(self):
1554
"""test that uncompiled attribute usage works with column_mapped_collection"""
1604
"""test that uncompiled attribute usage works with
1605
column_mapped_collection"""
1556
1607
from sqlalchemy.ext.declarative import declarative_base
1644
1695
Bar = self.classes.Bar
1645
1696
bar = self.tables["x.bar"]
1646
1697
mapper(Foo, self.tables.foo, properties={
1647
"foo_id":self.tables.foo.c.id
1698
"foo_id": self.tables.foo.c.id
1649
1700
mapper(Bar, bar, inherits=Foo, properties={
1653
1704
bar_spec = Bar(foo_id=1, bar_id=2, bat_id=3)
1671
1722
def _run_test(self, specs):
1672
from test.lib.util import picklers
1723
from sqlalchemy.testing.util import picklers
1673
1724
for spec, obj, expected in specs:
1674
1725
coll = collections.column_mapped_collection(spec)()
1690
1741
def define_tables(cls, metadata):
1691
1742
Table('sometable', metadata,
1692
Column('col1',Integer, primary_key=True, test_needs_autoincrement=True),
1743
Column('col1', Integer, primary_key=True,
1744
test_needs_autoincrement=True),
1693
1745
Column('data', String(30)))
1694
1746
Table('someothertable', metadata,
1695
Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
1747
Column('col1', Integer, primary_key=True,
1748
test_needs_autoincrement=True),
1696
1749
Column('scol1', Integer,
1697
1750
ForeignKey('sometable.col1')),
1698
1751
Column('data', String(20)))
1700
1753
def test_basic(self):
1701
someothertable, sometable = self.tables.someothertable, self.tables.sometable
1754
someothertable, sometable = self.tables.someothertable, \
1755
self.tables.sometable
1703
1757
class MyList(list):
1710
1764
mapper(Foo, sometable, properties={
1711
'bars':relationship(Bar, collection_class=MyList)
1765
'bars': relationship(Bar, collection_class=MyList)
1713
1767
mapper(Bar, someothertable)
1717
1771
def test_lazyload(self):
1718
1772
"""test that a 'set' can be used as a collection and can lazyload."""
1720
someothertable, sometable = self.tables.someothertable, self.tables.sometable
1774
someothertable, sometable = self.tables.someothertable, \
1775
self.tables.sometable
1722
1777
class Foo(object):
1724
1779
class Bar(object):
1726
1781
mapper(Foo, sometable, properties={
1727
'bars':relationship(Bar, collection_class=set)
1782
'bars': relationship(Bar, collection_class=set)
1729
1784
mapper(Bar, someothertable)
1741
1796
def test_dict(self):
1742
1797
"""test that a 'dict' can be used as a collection and can lazyload."""
1744
someothertable, sometable = self.tables.someothertable, self.tables.sometable
1799
someothertable, sometable = self.tables.someothertable, \
1800
self.tables.sometable
1747
1803
class Foo(object):
1758
1814
del self[id(item)]
1760
1816
mapper(Foo, sometable, properties={
1761
'bars':relationship(Bar, collection_class=AppenderDict)
1817
'bars': relationship(Bar, collection_class=AppenderDict)
1763
1819
mapper(Bar, someothertable)
1775
1831
def test_dict_wrapper(self):
1776
"""test that the supplied 'dict' wrapper can be used as a collection and can lazyload."""
1832
"""test that the supplied 'dict' wrapper can be used as a
1833
collection and can lazyload."""
1778
someothertable, sometable = self.tables.someothertable, self.tables.sometable
1835
someothertable, sometable = self.tables.someothertable, \
1836
self.tables.sometable
1781
1839
class Foo(object):
1850
1908
self._test_list(ListLike)
1852
1910
def _test_list(self, listcls):
1853
someothertable, sometable = self.tables.someothertable, self.tables.sometable
1911
someothertable, sometable = self.tables.someothertable, \
1912
self.tables.sometable
1855
1914
class Parent(object):
1860
1919
mapper(Parent, sometable, properties={
1861
'children':relationship(Child, collection_class=listcls)
1920
'children': relationship(Child, collection_class=listcls)
1863
1922
mapper(Child, someothertable)
1973
2032
assert control == list(p.children)
1975
2034
def test_custom(self):
1976
someothertable, sometable = self.tables.someothertable, self.tables.sometable
2035
someothertable, sometable = self.tables.someothertable, \
2036
self.tables.sometable
1978
2038
class Parent(object):
1994
2054
return iter(self.data)
1996
2056
mapper(Parent, sometable, properties={
1997
'children':relationship(Child, collection_class=MyCollection)
2057
'children': relationship(Child, collection_class=MyCollection)
1999
2059
mapper(Child, someothertable)
2039
2099
assert not hasattr(Touchy, 'no_touch')
2040
2100
assert 'no_touch' in dir(Touchy)
2042
instrumented = collections._instrument_class(Touchy)
2102
collections._instrument_class(Touchy)
2104
def test_name_setup(self):
2107
@collection.iterator
2108
def base_iterate(self, x):
2109
return "base_iterate"
2111
@collection.appender
2112
def base_append(self, x):
2113
return "base_append"
2115
@collection.converter
2116
def base_convert(self, x):
2117
return "base_convert"
2120
def base_remove(self, x):
2121
return "base_remove"
2124
from sqlalchemy.orm.collections import _instrument_class
2125
_instrument_class(Base)
2127
eq_(Base._sa_remover(Base(), 5), "base_remove")
2128
eq_(Base._sa_appender(Base(), 5), "base_append")
2129
eq_(Base._sa_iterator(Base(), 5), "base_iterate")
2130
eq_(Base._sa_converter(Base(), 5), "base_convert")
2133
@collection.converter
2134
def base_convert(self, x):
2135
return "sub_convert"
2138
def sub_remove(self, x):
2140
_instrument_class(Sub)
2142
eq_(Sub._sa_appender(Sub(), 5), "base_append")
2143
eq_(Sub._sa_remover(Sub(), 5), "sub_remove")
2144
eq_(Sub._sa_iterator(Sub(), 5), "base_iterate")
2145
eq_(Sub._sa_converter(Sub(), 5), "sub_convert")
2147
def test_link_event(self):
2149
class Collection(list):
2151
def _on_link(self, obj):
2157
instrumentation.register_class(Foo)
2158
attributes.register_attribute(Foo, 'attr', uselist=True,
2159
typecallable=Collection, useobject=True)
2164
eq_(canary, [f1.attr._sa_adapter])
2165
adapter_1 = f1.attr._sa_adapter
2169
eq_(canary, [adapter_1, f1.attr._sa_adapter, None])