~lifeless/storm/bug-620615

« back to all changes in this revision

Viewing changes to tests/store/base.py

Merging fixes from James.

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
from storm.database import Result
27
27
from storm.properties import Int, Float, RawStr, Unicode, Property, Pickle
28
28
from storm.properties import PropertyPublisherMeta, Decimal
29
 
from storm.expr import Asc, Desc, Select, Func, LeftJoin, SQL
 
29
from storm.expr import Asc, Desc, Select, Func, LeftJoin, SQL, Or, And, Eq
30
30
from storm.variables import Variable, UnicodeVariable, IntVariable
31
31
from storm.info import get_obj_info, ClassAlias
32
32
from storm.exceptions import *
123
123
        self.store = None
124
124
        self.stores = []
125
125
        self.create_database()
 
126
        self.connection = self.database.connect()
126
127
        self.drop_tables()
127
128
        self.create_tables()
128
129
        self.create_sample_data()
133
134
        self.drop_sample_data()
134
135
        self.drop_tables()
135
136
        self.drop_database()
 
137
        self.connection.close()
136
138
 
137
139
    def create_database(self):
138
140
        raise NotImplementedError
141
143
        raise NotImplementedError
142
144
 
143
145
    def create_sample_data(self):
144
 
        connection = self.database.connect()
 
146
        connection = self.connection
145
147
        connection.execute("INSERT INTO foo (id, title)"
146
148
                           " VALUES (10, 'Title 30')")
147
149
        connection.execute("INSERT INTO foo (id, title)"
193
195
        pass
194
196
 
195
197
    def drop_tables(self):
196
 
        connection = self.database.connect()
197
198
        for table in ["foo", "bar", "bin", "link", "money", "selfref"]:
198
199
            try:
199
 
                connection.execute("DROP TABLE %s" % table)
200
 
                connection.commit()
 
200
                self.connection.execute("DROP TABLE %s" % table)
 
201
                self.connection.commit()
201
202
            except:
202
 
                connection.rollback()
 
203
                self.connection.rollback()
203
204
 
204
205
    def drop_database(self):
205
206
        pass
4518
4519
 
4519
4520
        self.assertEquals(result3.count(), 2)
4520
4521
 
 
4522
    def test_is_in_empty_result_set(self):
 
4523
        result1 = self.store.find(Foo, Foo.id < 10)
 
4524
        result2 = self.store.find(Foo, Or(Foo.id > 20, Foo.id.is_in(result1)))
 
4525
        self.assertEquals(result2.count(), 1)
 
4526
 
 
4527
    def test_is_in_empty_list(self):
 
4528
        result2 = self.store.find(Foo, Eq(False, And(True, Foo.id.is_in([]))))
 
4529
        self.assertEquals(result2.count(), 3)
 
4530
 
4521
4531
    def test_result_intersection(self):
4522
4532
        if self.__class__.__name__.startswith("MySQL"):
4523
4533
            return
4765
4775
 
4766
4776
    def setUp(self):
4767
4777
        self.create_database()
 
4778
        self.connection = self.database.connect()
4768
4779
        self.drop_tables()
4769
4780
        self.create_tables()
4770
4781
        self.create_store()
4775
4786
        self.drop_store()
4776
4787
        self.drop_tables()
4777
4788
        self.drop_database()
 
4789
        self.connection.close()
4778
4790
 
4779
4791
    def create_database(self):
4780
4792
        raise NotImplementedError
4790
4802
 
4791
4803
    def drop_tables(self):
4792
4804
        for table in ["foo", "bar", "bin", "link"]:
4793
 
            connection = self.database.connect()
4794
4805
            try:
4795
 
                connection.execute("DROP TABLE %s" % table)
4796
 
                connection.commit()
 
4806
                self.connection.execute("DROP TABLE %s" % table)
 
4807
                self.connection.commit()
4797
4808
            except:
4798
 
                connection.rollback()
 
4809
                self.connection.rollback()
4799
4810
 
4800
4811
    def drop_store(self):
4801
4812
        self.store.rollback()