5
import psycopg2.extensions
9
# NOTE(review): this listing is a sparse excerpt -- the bare integers between
# the code lines look like line numbers of the original file, and several
# lines of this class are not shown.
class ConnectionStub(object):
10
"""A `connection` wrapper allowing analysis of the `poll()` calls."""
11
# Receives the connection object to wrap; the constructor body is not visible
# in this excerpt.
def __init__(self, conn):
16
# Delegates to the wrapped connection's fileno(). The enclosing `def` line is
# not shown -- presumably a `fileno` method; confirm against the full file.
return self.conn.fileno()
23
# Test case for code paths that run with a psycopg2 wait callback installed.
# NOTE(review): the `def` lines of the fragments below are not visible in this
# excerpt; the method names mentioned in the comments are assumptions.
class GreenTests(unittest.TestCase):
25
# Opens a new connection using the DSN taken from the `tests` module.
return psycopg2.connect(tests.dsn)
28
# Remember whichever wait callback is currently installed (presumably in
# setUp -- confirm) ...
self._cb = psycopg2.extensions.get_wait_callback()
29
# ... then install psycopg2.extras.wait_select as the wait callback.
psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
32
# Put back the callback saved in self._cb (presumably in tearDown -- confirm).
psycopg2.extensions.set_wait_callback(self._cb)
34
# Wrap `conn` in a ConnectionStub and install a wait callback that always
# waits on that stub via psycopg2.extras.wait_select.
# NOTE(review): test_flush_on_write assigns this method's result to `stub`, but
# no `return` statement is visible in this excerpt -- confirm the full file
# returns the stub.
def set_stub_wait_callback(self, conn):
35
stub = ConnectionStub(conn)
36
# The lambda's own `conn` parameter is unused: whatever connection the
# callback is handed, wait_select is called on `stub`.
psycopg2.extensions.set_wait_callback(
37
lambda conn: psycopg2.extras.wait_select(stub))
40
# Send progressively larger queries and inspect how many POLL_WRITE entries
# the stub recorded while each one was executed.
# NOTE(review): the lines defining `conn` and `curs`, and the body of the `if`
# below, are not visible in this excerpt.
def test_flush_on_write(self):
41
# a very large query requires a flush loop to be sent to the backend
43
stub = self.set_stub_wait_callback(conn)
45
# Parameter sizes are tried in increasing order: 1, 5, 10, 20 and 50 MiB.
for mb in 1, 5, 10, 20, 50:
46
size = mb * 1024 * 1024
48
# Pass a string of `size` characters as the query parameter ...
curs.execute("select %s;", ('x' * size,))
49
# ... and check the value fetched back has the same length.
self.assertEqual(size, len(curs.fetchone()[0]))
50
# True when stub.polls holds more than one POLL_WRITE entry. What the branch
# does is not shown; presumably it ends the test successfully -- confirm.
if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1:
53
# Explicit failure with a message saying no block on write was triggered.
# NOTE(review): whether this sits after the loop cannot be told from here.
self.fail("sending a large query didn't trigger block on write.")
55
# Check that an exception raised inside the wait callback reaches the caller
# of execute() and that the cursor can run a query afterwards.
# NOTE(review): the lines creating `curs` (and any others between the visible
# ones) are not shown in this excerpt.
def test_error_in_callback(self):
58
curs.execute("select 1") # have a BEGIN
61
# now try to do something that will fail in the callback
62
# The callback raises ZeroDivisionError as soon as it is invoked.
psycopg2.extensions.set_wait_callback(lambda conn: 1/0)
63
self.assertRaises(ZeroDivisionError, curs.execute, "select 2")
65
# check that the connection is left in a usable state
66
# Re-install the working wait_select callback before querying again.
psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
68
curs.execute("select 2")
69
self.assertEqual(2, curs.fetchone()[0])
73
return unittest.TestLoader().loadTestsFromName(__name__)
75
if __name__ == "__main__":