141
141
conn.execute('SELECT * FROM incoming_sync')
143
143
def test_delete_db(self):
144
def init_stub(conn, put_timestamp):
145
conn.execute('CREATE TABLE test (one TEXT)')
146
conn.execute('CREATE TABLE test_stat (id TEXT)')
147
conn.execute('INSERT INTO test_stat (id) VALUES (?)',
149
conn.execute('INSERT INTO test (one) VALUES ("1")')
144
151
stub_called = [False]
145
def stub(*args, **kwargs):
152
def delete_stub(*a, **kw):
146
153
stub_called[0] = True
147
154
broker = DatabaseBroker(':memory:')
148
broker._initialize = stub
155
broker.db_type = 'test'
156
broker._initialize = init_stub
157
# Initializes a good broker for us
149
158
broker.initialize(normalize_timestamp('1'))
150
159
self.assert_(broker.conn is not None)
151
broker._delete_db = stub
160
broker._delete_db = delete_stub
152
161
stub_called[0] = False
153
162
broker.delete_db('2')
154
163
self.assert_(stub_called[0])
155
164
broker = DatabaseBroker(os.path.join(self.testdir, '1.db'))
156
broker._initialize = stub
165
broker.db_type = 'test'
166
broker._initialize = init_stub
157
167
broker.initialize(normalize_timestamp('1'))
158
broker._delete_db = stub
168
broker._delete_db = delete_stub
159
169
stub_called[0] = False
160
170
broker.delete_db('2')
161
171
self.assert_(stub_called[0])
172
# ensure that metadata was cleared
174
self.assert_(not any(v[0] for v in m2.itervalues()))
175
self.assert_(all(v[1] == normalize_timestamp('2')
176
for v in m2.itervalues()))
163
178
def test_get(self):
164
179
broker = DatabaseBroker(':memory:')