# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for twisted.enterprise reflectors.
"""
import random

from twisted.internet import reactor, interfaces, defer
from twisted.enterprise.row import RowObject
from twisted.enterprise.reflector import EQUAL
from twisted.enterprise.sqlreflector import SQLReflector
from twisted.enterprise import util
from twisted.test.test_adbapi import makeSQLTests
from twisted.trial.util import suppress as suppressWarning
from twisted.trial import unittest
# Names of the two database tables used by the row classes and the tests
# in this module.
tableName = "testTable"
childTableName = "childTable"
24
class TestRow(RowObject):
25
rowColumns = [("key_string", "varchar"),
27
("another_column", "varchar"),
28
("Column4", "varchar"),
30
rowKeyColumns = [("key_string", "varchar")]
31
rowTableName = tableName
34
class ChildRow(RowObject):
35
rowColumns = [("childId", "int"),
37
("test_key", "varchar"),
41
rowKeyColumns = [("childId", "int")]
42
rowTableName = childTableName
43
rowForeignKeys = [(tableName,
44
[("test_key","varchar")],
45
[("key_string","varchar")],
49
main_table_schema = """
50
CREATE TABLE testTable (
51
key_string varchar(64),
53
another_column varchar(64),
59
child_table_schema = """
60
CREATE TABLE childTable (
71
def randomizeRow(row, nulls_ok=True, trailing_spaces_ok=True):
73
for name, type in row.rowColumns:
74
if util.getKeyColumn(row, name):
75
values[name] = getattr(row, name)
77
elif nulls_ok and random.randint(0, 9) == 0:
80
value = random.randint(-10000, 10000)
82
if random.randint(0, 9) == 0:
85
value = ''.join(map(lambda i:chr(random.randrange(32,127)),
86
xrange(random.randint(1, 64))))
87
if not trailing_spaces_ok:
88
value = value.rstrip()
89
setattr(row, name, value)
def rowMatches(row, values):
    """
    Check whether every column of C{row} equals the matching entry of
    C{values}.

    @param row: an object exposing a C{rowColumns} sequence of
        C{(name, type)} pairs and one attribute per column name.
    @param values: a mapping from column name to the expected value.

    @return: C{True} if every column matches; C{False} at the first
        mismatch, which is also reported on standard output.
    """
    # NOTE(review): the two return statements below were absent from the
    # damaged copy of this function; they are restored because callers pass
    # the result straight to failUnless().
    for name, columnType in row.rowColumns:
        actual = getattr(row, name)
        expected = values[name]
        if actual != expected:
            print ("Mismatch on column %s: |%s| (row) |%s| (values)" %
                   (name, actual, expected))
            return False
    return True
# Warning suppressions attached (via the ``suppress`` attribute) to tests
# that exercise the deprecated twisted.enterprise row and reflector APIs.
rowObjectSuppression = suppressWarning(
    message="twisted.enterprise.row is deprecated since Twisted 8.0",
    category=DeprecationWarning)

reflectorSuppression = suppressWarning(
    message="twisted.enterprise.reflector is deprecated since Twisted 8.0",
    category=DeprecationWarning)
113
class ReflectorTestBase:
115
Base class for testing reflectors.
117
@ivar reflector: The reflector created during setup.
120
if interfaces.IReactorThreads(reactor, None) is None:
121
skip = "No thread support, no reflector tests"
123
count = 100 # a parameter used for running iterative tests
125
def randomizeRow(self, row):
126
return randomizeRow(row, self.nulls_ok, self.trailing_spaces_ok)
128
def extraSetUp(self):
130
Create and store a reference to a SQL reflector for use by the tests.
132
d = self.createReflector()
133
d.addCallback(self._cbSetUp)
136
def _cbSetUp(self, reflector):
137
self.reflector = reflector
140
return self.destroyReflector()
142
def destroyReflector(self):
145
def test_reflector(self):
147
Full featured tests of reflector.
149
# create one row to work with
151
row.assignKeyAttr("key_string", "first")
152
values = self.randomizeRow(row)
155
d = self.reflector.insertRow(row)
158
# now load it back in
159
whereClause = [("key_string", EQUAL, "first")]
160
d = self.reflector.loadObjectsFrom(tableName,
161
whereClause=whereClause)
162
return d.addCallback(self.gotData)
165
# make sure it came back as what we saved
166
self.failUnless(len(self.data) == 1, "no row")
167
parent = self.data[0]
168
self.failUnless(rowMatches(parent, values), "no match")
171
d.addCallback(_loadBack)
172
d.addCallback(_getParent)
173
d.addCallback(self._cbTestReflector)
175
test_reflector.suppress = [rowObjectSuppression, reflectorSuppression]
177
def _cbTestReflector(self, parent):
178
# create some child rows
182
for i in range(0, self.num_iterations):
184
row.assignKeyAttr("childId", i)
185
values = self.randomizeRow(row)
186
values['test_key'] = row.test_key = "first"
187
child_values[i] = values
188
inserts.append(self.reflector.insertRow(row))
191
d = defer.gatherResults(inserts)
195
d = self.reflector.loadObjectsFrom(childTableName, parentRow=parent)
196
return d.addCallback(self.gotData)
198
def _checkLoadObjects(_):
199
self.failUnless(len(self.data) == self.num_iterations,
201
self.failUnless(len(parent.childRows) == self.num_iterations,
202
"did not load child rows: %d" % len(parent.childRows))
203
for child in parent.childRows:
204
self.failUnless(rowMatches(child, child_values[child.childId]),
205
"child %d does not match" % child.childId)
207
def _checkLoadObjects2(_):
208
self.failUnless(len(self.data) == self.num_iterations,
210
self.failUnless(len(parent.childRows) == self.num_iterations,
211
"child rows added twice!: %d" % len(parent.childRows))
213
def _changeParent(_):
214
# now change the parent
215
values[0] = self.randomizeRow(parent)
216
return self.reflector.updateRow(parent)
219
# now load it back in
220
whereClause = [("key_string", EQUAL, "first")]
221
d = self.reflector.loadObjectsFrom(tableName, whereClause=whereClause)
222
return d.addCallback(self.gotData)
224
def _checkLoadBack(_):
225
# make sure it came back as what we saved
226
self.failUnless(len(self.data) == 1, "no row")
227
parent = self.data[0]
228
self.failUnless(rowMatches(parent, values[0]), "no match")
230
test_values[parent.key_string] = values[0]
233
def _saveMoreTestRows(_):
234
# save some more test rows
236
for i in range(0, self.num_iterations):
238
row.assignKeyAttr("key_string", "bulk%d"%i)
239
test_values[row.key_string] = self.randomizeRow(row)
240
ds.append(self.reflector.insertRow(row))
241
return defer.gatherResults(ds)
243
def _loadRowsBack(_):
244
# now load them all back in
245
d = self.reflector.loadObjectsFrom("testTable")
246
return d.addCallback(self.gotData)
248
def _checkRowsBack(_):
249
# make sure they are the same
250
self.failUnless(len(self.data) == self.num_iterations + 1,
251
"query did not get rows")
252
for row in self.data:
253
self.failUnless(rowMatches(row, test_values[row.key_string]),
254
"child %s does not match" % row.key_string)
257
# now change them all
259
for row in self.data:
260
test_values[row.key_string] = self.randomizeRow(row)
261
ds.append(self.reflector.updateRow(row))
262
d = defer.gatherResults(ds)
263
return d.addCallback(_cbChangeRows)
265
def _cbChangeRows(_):
271
for row in self.data:
272
ds.append(self.reflector.deleteRow(row))
273
d = defer.gatherResults(ds)
274
return d.addCallback(_cbChangeRows)
276
def _checkRowsDeleted(_):
277
self.failUnless(len(self.data) == 0, "rows were not deleted")
279
d.addCallback(_loadObjects)
280
d.addCallback(_checkLoadObjects)
281
d.addCallback(_loadObjects)
282
d.addCallback(_checkLoadObjects2)
283
d.addCallback(_changeParent)
284
d.addCallback(_loadBack)
285
d.addCallback(_checkLoadBack)
286
d.addCallback(_saveMoreTestRows)
287
d.addCallback(_loadRowsBack)
288
d.addCallback(_checkRowsBack)
289
d.addCallback(_changeRows)
290
d.addCallback(_loadRowsBack)
291
d.addCallback(_checkRowsBack)
292
d.addCallback(_deleteRows)
293
d.addCallback(_loadRowsBack)
294
d.addCallback(_checkRowsDeleted)
298
def test_saveAndDelete(self):
300
Create a row and then try to delete it.
302
# create one row to work with
304
row.assignKeyAttr("key_string", "first")
305
values = self.randomizeRow(row)
307
d = self.reflector.insertRow(row)
310
return self.reflector.deleteRow(row)
311
d.addCallback(_deleteRow)
313
test_saveAndDelete.suppress = [rowObjectSuppression, reflectorSuppression]
316
def gotData(self, data):
# Set on the class after its definition so every reflector test shares it.
# NOTE(review): presumably the per-test timeout, in seconds, honoured by
# trial -- confirm.
ReflectorTestBase.timeout = 30.0
323
class SQLReflectorTestBase(ReflectorTestBase):
325
Base class for the SQL reflector.
328
def createReflector(self):
330
self.dbpool = self.makePool()
334
d = self.dbpool.runOperation('DROP TABLE testTable')
335
d.addCallback(lambda _:
336
self.dbpool.runOperation('DROP TABLE childTable'))
337
d.addErrback(lambda _: None)
339
d = defer.succeed(None)
341
d.addCallback(lambda _: self.dbpool.runOperation(main_table_schema))
342
d.addCallback(lambda _: self.dbpool.runOperation(child_table_schema))
343
reflectorClass = self.escape_slashes and SQLReflector \
344
or NoSlashSQLReflector
345
d.addCallback(lambda _:
346
reflectorClass(self.dbpool, [TestRow, ChildRow]))
349
def destroyReflector(self):
350
d = self.dbpool.runOperation('DROP TABLE testTable')
351
d.addCallback(lambda _:
352
self.dbpool.runOperation('DROP TABLE childTable'))
360
class DeprecationTestCase(unittest.TestCase):
362
Test various deprecations of twisted.enterprise.
365
def test_rowDeprecation(self):
367
Test deprecation of L{RowObject}.
371
self.assertWarns(DeprecationWarning,
372
"twisted.enterprise.row is deprecated since Twisted 8.0",
376
def test_reflectorDeprecation(self):
378
Test deprecation of L{SQLReflector}.
381
return SQLReflector(None, ())
382
from twisted.enterprise import sqlreflector
383
self.assertWarns(DeprecationWarning,
384
"twisted.enterprise.reflector is deprecated since Twisted 8.0",
385
sqlreflector.__file__,
# NOTE(review): presumably this call defines the following classes in this
# module's namespace (it is handed globals()) -- confirm against
# twisted.test.test_adbapi.makeSQLTests:
# GadflyReflectorTestCase SQLiteReflectorTestCase PyPgSQLReflectorTestCase
# PsycopgReflectorTestCase MySQLReflectorTestCase FirebirdReflectorTestCase
makeSQLTests(SQLReflectorTestBase, 'ReflectorTestCase', globals())
class NoSlashSQLReflector(SQLReflector):
    """
    An sql reflector that only escapes single quotes.
    """

    def escape_string(self, text):
        """
        Escape C{text} by doubling every single quote; no other character
        is altered.

        @param text: the string to escape.
        @return: C{text} with each C{'} replaced by C{''}.
        """
        return text.replace("'", "''")