~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/test/test_reflector.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
 
# See LICENSE for details.
3
 
 
4
 
 
5
 
"""Tests for twisted.enterprise reflectors."""
6
 
 
7
 
from twisted.trial import unittest
8
 
 
9
 
import os, random
10
 
 
11
 
from twisted.internet import reactor, interfaces, defer
12
 
from twisted.enterprise.row import RowObject
13
 
from twisted.enterprise.reflector import *
14
 
from twisted.enterprise.sqlreflector import SQLReflector
15
 
from twisted.enterprise import util
16
 
from twisted.test.test_adbapi import makeSQLTests
17
 
 
18
 
tableName = "testTable"
19
 
childTableName = "childTable"
20
 
 
21
 
class TestRow(RowObject):
22
 
    rowColumns = [("key_string",      "varchar"),
23
 
                  ("col2",            "int"),
24
 
                  ("another_column",  "varchar"),
25
 
                  ("Column4",         "varchar"),
26
 
                  ("column_5_",       "int")]
27
 
    rowKeyColumns = [("key_string", "varchar")]
28
 
    rowTableName  = tableName
29
 
 
30
 
class ChildRow(RowObject):
31
 
    rowColumns    = [("childId",  "int"),
32
 
                     ("foo",      "varchar"),
33
 
                     ("test_key", "varchar"),
34
 
                     ("stuff",    "varchar"),
35
 
                     ("gogogo",   "int"),
36
 
                     ("data",     "varchar")]
37
 
    rowKeyColumns = [("childId", "int")]
38
 
    rowTableName  = childTableName
39
 
    rowForeignKeys = [(tableName,
40
 
                       [("test_key","varchar")],
41
 
                       [("key_string","varchar")],
42
 
                       None, 1)]
43
 
 
44
 
main_table_schema = """
45
 
CREATE TABLE testTable (
46
 
  key_string     varchar(64),
47
 
  col2           integer,
48
 
  another_column varchar(64),
49
 
  Column4        varchar(64),
50
 
  column_5_      integer
51
 
)
52
 
"""
53
 
 
54
 
child_table_schema = """
55
 
CREATE TABLE childTable (
56
 
  childId        integer,
57
 
  foo            varchar(64),
58
 
  test_key       varchar(64),
59
 
  stuff          varchar(64),
60
 
  gogogo         integer,
61
 
  data           varchar(64)
62
 
)
63
 
"""
64
 
 
65
 
def randomizeRow(row, nulls_ok=True, trailing_spaces_ok=True):
66
 
    values = {}
67
 
    for name, type in row.rowColumns:
68
 
        if util.getKeyColumn(row, name):
69
 
            values[name] = getattr(row, name)
70
 
            continue
71
 
        elif nulls_ok and random.randint(0, 9) == 0:
72
 
            value = None # null
73
 
        elif type == 'int':
74
 
            value = random.randint(-10000, 10000)
75
 
        else:
76
 
            if random.randint(0, 9) == 0:
77
 
                value = ''
78
 
            else:
79
 
                value = ''.join(map(lambda i:chr(random.randrange(32,127)),
80
 
                                    xrange(random.randint(1, 64))))
81
 
            if not trailing_spaces_ok:
82
 
                value = value.rstrip()
83
 
        setattr(row, name, value)
84
 
        values[name] = value
85
 
    return values
86
 
 
87
 
def rowMatches(row, values):
88
 
    for name, type in row.rowColumns:
89
 
        if getattr(row, name) != values[name]:
90
 
            print ("Mismatch on column %s: |%s| (row) |%s| (values)" %
91
 
                   (name, getattr(row, name), values[name]))
92
 
            return False
93
 
    return True
94
 
 
95
 
class ReflectorTestBase:
96
 
    """Base class for testing reflectors."""
97
 
 
98
 
    if interfaces.IReactorThreads(reactor, None) is None:
99
 
        skip = "No thread support, no reflector tests"
100
 
 
101
 
    count = 100 # a parameter used for running iterative tests
102
 
 
103
 
    def randomizeRow(self, row):
104
 
        return randomizeRow(row, self.nulls_ok, self.trailing_spaces_ok)
105
 
 
106
 
    def setUp(self):
107
 
        d = self.createReflector()
108
 
        d.addCallback(self._cbSetUp)
109
 
        return d
110
 
 
111
 
    def _cbSetUp(self, reflector):
112
 
        self.reflector = reflector
113
 
 
114
 
    def tearDown(self):
115
 
        return self.destroyReflector()
116
 
 
117
 
    def destroyReflector(self):
118
 
        pass
119
 
 
120
 
    def testReflector(self):
121
 
        # create one row to work with
122
 
        row = TestRow()
123
 
        row.assignKeyAttr("key_string", "first")
124
 
        values = self.randomizeRow(row)
125
 
 
126
 
        # save it
127
 
        d = self.reflector.insertRow(row)
128
 
 
129
 
        def _loadBack(_):
130
 
            # now load it back in
131
 
            whereClause = [("key_string", EQUAL, "first")]
132
 
            d = self.reflector.loadObjectsFrom(tableName,
133
 
                                               whereClause=whereClause)
134
 
            return d.addCallback(self.gotData)
135
 
 
136
 
        def _getParent(_):
137
 
            # make sure it came back as what we saved
138
 
            self.failUnless(len(self.data) == 1, "no row")
139
 
            parent = self.data[0]
140
 
            self.failUnless(rowMatches(parent, values), "no match")
141
 
            return parent
142
 
 
143
 
        d.addCallback(_loadBack)
144
 
        d.addCallback(_getParent)
145
 
        d.addCallback(self._cbTestReflector)
146
 
        return d
147
 
 
148
 
    def _cbTestReflector(self, parent):
149
 
        # create some child rows
150
 
        test_values = {}
151
 
        inserts = []
152
 
        child_values = {}
153
 
        for i in range(0, self.num_iterations):
154
 
            row = ChildRow()
155
 
            row.assignKeyAttr("childId", i)
156
 
            values = self.randomizeRow(row)
157
 
            values['test_key'] = row.test_key = "first"
158
 
            child_values[i] = values
159
 
            inserts.append(self.reflector.insertRow(row))
160
 
            row = None
161
 
        #del inserts
162
 
        d = defer.gatherResults(inserts)
163
 
        values = [None]
164
 
 
165
 
        def _loadObjects(_):
166
 
            d = self.reflector.loadObjectsFrom(childTableName, parentRow=parent)
167
 
            return d.addCallback(self.gotData)
168
 
 
169
 
        def _checkLoadObjects(_):
170
 
            self.failUnless(len(self.data) == self.num_iterations,
171
 
                            "no rows on query")
172
 
            self.failUnless(len(parent.childRows) == self.num_iterations,
173
 
                            "did not load child rows: %d" % len(parent.childRows))
174
 
            for child in parent.childRows:
175
 
                self.failUnless(rowMatches(child, child_values[child.childId]),
176
 
                                "child %d does not match" % child.childId)
177
 
 
178
 
        def _checkLoadObjects2(_):
179
 
            self.failUnless(len(self.data) == self.num_iterations,
180
 
                            "no rows on query")
181
 
            self.failUnless(len(parent.childRows) == self.num_iterations,
182
 
                            "child rows added twice!: %d" % len(parent.childRows))
183
 
 
184
 
        def _changeParent(_):
185
 
            # now change the parent
186
 
            values[0] = self.randomizeRow(parent)
187
 
            return self.reflector.updateRow(parent)
188
 
 
189
 
        def _loadBack(_):
190
 
            # now load it back in
191
 
            whereClause = [("key_string", EQUAL, "first")]
192
 
            d = self.reflector.loadObjectsFrom(tableName, whereClause=whereClause)
193
 
            return d.addCallback(self.gotData)
194
 
 
195
 
        def _checkLoadBack(_):
196
 
            # make sure it came back as what we saved
197
 
            self.failUnless(len(self.data) == 1, "no row")
198
 
            parent = self.data[0]
199
 
            self.failUnless(rowMatches(parent, values[0]), "no match")
200
 
            # save parent
201
 
            test_values[parent.key_string] = values[0]
202
 
            parent = None
203
 
 
204
 
        def _saveMoreTestRows(_):
205
 
            # save some more test rows
206
 
            ds = []
207
 
            for i in range(0, self.num_iterations):
208
 
                row = TestRow()
209
 
                row.assignKeyAttr("key_string", "bulk%d"%i)
210
 
                test_values[row.key_string] = self.randomizeRow(row)
211
 
                ds.append(self.reflector.insertRow(row))
212
 
            return defer.gatherResults(ds)
213
 
 
214
 
        def _loadRowsBack(_):
215
 
            # now load them all back in
216
 
            d = self.reflector.loadObjectsFrom("testTable")
217
 
            return d.addCallback(self.gotData)
218
 
 
219
 
        def _checkRowsBack(_):
220
 
            # make sure they are the same
221
 
            self.failUnless(len(self.data) == self.num_iterations + 1,
222
 
                            "query did not get rows")
223
 
            for row in self.data:
224
 
                self.failUnless(rowMatches(row, test_values[row.key_string]),
225
 
                                "child %s does not match" % row.key_string)
226
 
 
227
 
        def _changeRows(_):
228
 
            # now change them all
229
 
            ds = []
230
 
            for row in self.data:
231
 
                test_values[row.key_string] = self.randomizeRow(row)
232
 
                ds.append(self.reflector.updateRow(row))
233
 
            d = defer.gatherResults(ds)
234
 
            return d.addCallback(_cbChangeRows)
235
 
 
236
 
        def _cbChangeRows(_):
237
 
            self.data = None
238
 
 
239
 
        def _deleteRows(_):
240
 
            # now delete them
241
 
            ds = []
242
 
            for row in self.data:
243
 
                ds.append(self.reflector.deleteRow(row))
244
 
            d = defer.gatherResults(ds)
245
 
            return d.addCallback(_cbChangeRows)
246
 
 
247
 
        def _checkRowsDeleted(_):
248
 
            self.failUnless(len(self.data) == 0, "rows were not deleted")
249
 
 
250
 
        d.addCallback(_loadObjects)
251
 
        d.addCallback(_checkLoadObjects)
252
 
        d.addCallback(_loadObjects)
253
 
        d.addCallback(_checkLoadObjects2)
254
 
        d.addCallback(_changeParent)
255
 
        d.addCallback(_loadBack)
256
 
        d.addCallback(_checkLoadBack)
257
 
        d.addCallback(_saveMoreTestRows)
258
 
        d.addCallback(_loadRowsBack)
259
 
        d.addCallback(_checkRowsBack)
260
 
        d.addCallback(_changeRows)
261
 
        d.addCallback(_loadRowsBack)
262
 
        d.addCallback(_checkRowsBack)
263
 
        d.addCallback(_deleteRows)
264
 
        d.addCallback(_loadRowsBack)
265
 
        d.addCallback(_checkRowsDeleted)
266
 
        return d
267
 
 
268
 
 
269
 
    def testSaveAndDelete(self):
270
 
        # create one row to work with
271
 
        row = TestRow()
272
 
        row.assignKeyAttr("key_string", "first")
273
 
        values = self.randomizeRow(row)
274
 
        # save it
275
 
        d = self.reflector.insertRow(row)
276
 
        def _deleteRow(_):
277
 
            # delete it
278
 
            return self.reflector.deleteRow(row)
279
 
        d.addCallback(_deleteRow)
280
 
        return d
281
 
 
282
 
 
283
 
    def gotData(self, data):
284
 
        self.data = data
285
 
 
286
 
ReflectorTestBase.timeout = 30.0
287
 
 
288
 
class SQLReflectorTestBase(ReflectorTestBase):
289
 
    """Base class for the SQL reflector."""
290
 
 
291
 
    def createReflector(self):
292
 
        self.startDB()
293
 
        self.dbpool = self.makePool()
294
 
        self.dbpool.start()
295
 
 
296
 
        if self.can_clear:
297
 
            d = self.dbpool.runOperation('DROP TABLE testTable')
298
 
            d.addCallback(lambda _:
299
 
                          self.dbpool.runOperation('DROP TABLE childTable'))
300
 
            d.addErrback(lambda _: None)
301
 
        else:
302
 
            d = defer.succeed(None)
303
 
 
304
 
        d.addCallback(lambda _: self.dbpool.runOperation(main_table_schema))
305
 
        d.addCallback(lambda _: self.dbpool.runOperation(child_table_schema))
306
 
        reflectorClass = self.escape_slashes and SQLReflector \
307
 
                         or NoSlashSQLReflector
308
 
        d.addCallback(lambda _:
309
 
                      reflectorClass(self.dbpool, [TestRow, ChildRow]))
310
 
        return d
311
 
 
312
 
    def destroyReflector(self):
313
 
        d = self.dbpool.runOperation('DROP TABLE testTable')
314
 
        d.addCallback(lambda _:
315
 
                      self.dbpool.runOperation('DROP TABLE childTable'))
316
 
        def close(_):
317
 
            self.dbpool.close()
318
 
            self.stopDB()
319
 
        d.addCallback(close)
320
 
        return d
321
 
 
322
 
# GadflyReflectorTestCase SQLiteReflectorTestCase PyPgSQLReflectorTestCase
323
 
# PsycopgReflectorTestCase MySQLReflectorTestCase FirebirdReflectorTestCase
324
 
makeSQLTests(SQLReflectorTestBase, 'ReflectorTestCase', globals())
325
 
 
326
 
class NoSlashSQLReflector(SQLReflector):
327
 
    """An sql reflector that only escapes single quotes."""
328
 
 
329
 
    def escape_string(self, text):
330
 
        return text.replace("'", "''")