~launchpad-committers/storm/lp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2006, 2007 Canonical
#
# Written by Gustavo Niemeyer <gustavo@niemeyer.net>
#
# This file is part of Storm Object Relational Mapper.
#
# Storm is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 2.1 of
# the License, or (at your option) any later version.
#
# Storm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import print_function

from datetime import datetime, date, time, timedelta
import shutil
import sys
import os

import six
from six.moves import cPickle as pickle

from storm.uri import URI
from storm.expr import Select, Column, SQLToken, SQLRaw, Count, Alias
from storm.variables import (Variable, PickleVariable, BytesVariable,
                             DecimalVariable, DateTimeVariable, DateVariable,
                             TimeVariable, TimeDeltaVariable)
from storm.database import *
from storm.xid import Xid
from storm.event import EventSystem
from storm.exceptions import (
    DatabaseError, DatabaseModuleError, ConnectionBlockedError,
    DisconnectionError, Error, OperationalError, ProgrammingError)
from storm.tests.databases.proxy import ProxyTCPServer
from storm.tests.helper import MakePath


class Marker(object):
    """Empty class whose instances serve as unique sentinel objects."""


# Shared sentinel; the tests below pass it around and compare against it.
marker = Marker()


class DatabaseTest(object):
    """Generic tests exercising a database backend through a connection.

    This class is a mixin: it calls C{setUp()}/C{tearDown()} on its
    superclass and uses C{assert*} methods and C{addCleanup()} that it does
    not define itself, so it has to be combined with a test case class
    providing them.

    Subclasses must implement L{create_database}, which is expected to set
    C{self.database}, and L{create_tables}, which is expected to create the
    C{number}, C{test}, C{datetime_test} and C{bin_test} tables used below.
    """

    # Subclasses set this to False when the backend does not keep
    # microseconds; the date/time tests then compare against values
    # truncated to whole seconds.
    supports_microseconds = True

    def setUp(self):
        """Create the database, a connection, the tables and sample rows."""
        super(DatabaseTest, self).setUp()
        self.create_database()
        self.create_connection()
        # Drop first so that create_tables() starts from a clean slate.
        self.drop_tables()
        self.create_tables()
        self.create_sample_data()

    def tearDown(self):
        """Undo setUp() in reverse order."""
        self.drop_sample_data()
        self.drop_tables()
        self.drop_connection()
        self.drop_database()
        super(DatabaseTest, self).tearDown()

    def create_database(self):
        """Hook for subclasses; expected to set C{self.database}."""
        raise NotImplementedError

    def create_connection(self):
        """Open C{self.connection} on C{self.database}."""
        self.connection = self.database.connect()

    def create_tables(self):
        """Hook for subclasses; expected to create the test tables."""
        raise NotImplementedError

    def create_sample_data(self):
        """Insert and commit the rows most tests rely on."""
        self.connection.execute("INSERT INTO number VALUES (1, 2, 3)")
        self.connection.execute("INSERT INTO test VALUES (10, 'Title 10')")
        self.connection.execute("INSERT INTO test VALUES (20, 'Title 20')")
        self.connection.commit()

    def drop_sample_data(self):
        """Hook for subclasses; does nothing by default."""
        pass

    def drop_tables(self):
        """Drop every test table, ignoring tables that can't be dropped."""
        for table in ["number", "test", "datetime_test", "bin_test"]:
            try:
                self.connection.execute("DROP TABLE " + table)
                self.connection.commit()
            except Exception:
                # Best effort: the table may simply not exist.  Only
                # Exception is caught so that KeyboardInterrupt and
                # SystemExit still propagate.
                self.connection.rollback()

    def drop_connection(self):
        """Close C{self.connection}."""
        self.connection.close()

    def drop_database(self):
        """Hook for subclasses; does nothing by default."""
        pass

    def test_create(self):
        """C{self.database} is a C{Database} instance."""
        self.assertTrue(isinstance(self.database, Database))

    def test_get_uri(self):
        """
        The get_uri() method returns the URI the database was created with.
        """
        uri = self.database.get_uri()
        self.assertIsNotNone(uri.scheme)

    def test_connection(self):
        """C{Database.connect()} returns a C{Connection} instance."""
        self.assertTrue(isinstance(self.connection, Connection))

    def test_rollback(self):
        """A row inserted and then rolled back can't be selected."""
        self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
        self.connection.rollback()
        result = self.connection.execute("SELECT id FROM test WHERE id=30")
        self.assertFalse(result.get_one())

    def test_rollback_twice(self):
        """Calling rollback() twice in a row works like calling it once."""
        self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
        self.connection.rollback()
        self.connection.rollback()
        result = self.connection.execute("SELECT id FROM test WHERE id=30")
        self.assertFalse(result.get_one())

    def test_commit(self):
        """A committed row is still there after a later rollback()."""
        self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
        self.connection.commit()
        self.connection.rollback()
        result = self.connection.execute("SELECT id FROM test WHERE id=30")
        self.assertTrue(result.get_one())

    def test_commit_twice(self):
        """Calling commit() twice in a row works like calling it once."""
        self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
        self.connection.commit()
        self.connection.commit()
        result = self.connection.execute("SELECT id FROM test WHERE id=30")
        self.assertTrue(result.get_one())

    def test_execute_result(self):
        """execute() returns a C{Result} from which a row can be read."""
        result = self.connection.execute("SELECT 1")
        self.assertTrue(isinstance(result, Result))
        self.assertTrue(result.get_one())

    def test_execute_unicode_result(self):
        """A unicode statement works and text columns come back as text."""
        result = self.connection.execute(u"SELECT title FROM test")
        self.assertTrue(isinstance(result, Result))
        row = result.get_one()
        self.assertEqual(row, ("Title 10",))
        self.assertTrue(isinstance(row[0], six.text_type))

    def test_execute_params(self):
        """The C{?} parameter mark is replaced by the given parameter."""
        result = self.connection.execute("SELECT one FROM number "
                                         "WHERE 1=?", (1,))
        self.assertTrue(result.get_one())
        result = self.connection.execute("SELECT one FROM number "
                                         "WHERE 1=?", (2,))
        self.assertFalse(result.get_one())

    def test_execute_empty_params(self):
        """An empty parameter tuple is accepted."""
        result = self.connection.execute("SELECT one FROM number", ())
        self.assertTrue(result.get_one())

    def test_execute_expression(self):
        """execute() accepts an expression instead of a statement string."""
        result = self.connection.execute(Select(1))
        # assertEqual, not assertTrue: the latter would treat (1,) as the
        # failure message and never compare it with the row.
        self.assertEqual(result.get_one(), (1,))

    def test_execute_expression_empty_params(self):
        """execute() accepts an expression that yields no parameters."""
        result = self.connection.execute(Select(SQLRaw("1")))
        # See test_execute_expression() about assertEqual.
        self.assertEqual(result.get_one(), (1,))

    def test_get_one(self):
        """get_one() returns the first row as a tuple."""
        result = self.connection.execute("SELECT * FROM test ORDER BY id")
        self.assertEqual(result.get_one(), (10, "Title 10"))

    def test_get_all(self):
        """get_all() returns a list with all rows."""
        result = self.connection.execute("SELECT * FROM test ORDER BY id")
        self.assertEqual(result.get_all(),
                         [(10, "Title 10"), (20, "Title 20")])

    def test_iter(self):
        """Iterating over a result yields all rows in order."""
        result = self.connection.execute("SELECT * FROM test ORDER BY id")
        self.assertEqual(list(result),
                         [(10, "Title 10"), (20, "Title 20")])

    def test_simultaneous_iter(self):
        """Two results of one connection can be iterated interleaved."""
        result1 = self.connection.execute("SELECT * FROM test "
                                          "ORDER BY id ASC")
        result2 = self.connection.execute("SELECT * FROM test "
                                          "ORDER BY id DESC")
        iter1 = iter(result1)
        iter2 = iter(result2)
        self.assertEqual(next(iter1), (10, "Title 10"))
        self.assertEqual(next(iter2), (20, "Title 20"))
        self.assertEqual(next(iter1), (20, "Title 20"))
        self.assertEqual(next(iter2), (10, "Title 10"))
        self.assertRaises(StopIteration, next, iter1)
        self.assertRaises(StopIteration, next, iter2)

    def test_get_insert_identity(self):
        """get_insert_identity() yields a condition matching the new row."""
        result = self.connection.execute("INSERT INTO test (title) "
                                         "VALUES ('Title 30')")
        primary_key = (Column("id", SQLToken("test")),)
        primary_variables = (Variable(),)
        expr = result.get_insert_identity(primary_key, primary_variables)
        select = Select(Column("title", SQLToken("test")), expr)
        result = self.connection.execute(select)
        self.assertEqual(result.get_one(), ("Title 30",))

    def test_get_insert_identity_composed(self):
        """get_insert_identity() works with a two-column primary key."""
        result = self.connection.execute("INSERT INTO test (title) "
                                         "VALUES ('Title 30')")
        primary_key = (Column("id", SQLToken("test")),
                       Column("title", SQLToken("test")))
        primary_variables = (Variable(), Variable(u"Title 30"))
        expr = result.get_insert_identity(primary_key, primary_variables)
        select = Select(Column("title", SQLToken("test")), expr)
        result = self.connection.execute(select)
        self.assertEqual(result.get_one(), ("Title 30",))

    def test_datetime(self):
        """A datetime round-trips through the C{dt} column."""
        value = datetime(1977, 4, 5, 12, 34, 56, 78)
        self.connection.execute("INSERT INTO datetime_test (dt) VALUES (?)",
                                (value,))
        result = self.connection.execute("SELECT dt FROM datetime_test")
        variable = DateTimeVariable()
        result.set_variable(variable, result.get_one()[0])
        if not self.supports_microseconds:
            value = value.replace(microsecond=0)
        self.assertEqual(variable.get(), value)

    def test_date(self):
        """A date round-trips through the C{d} column."""
        value = date(1977, 4, 5)
        self.connection.execute("INSERT INTO datetime_test (d) VALUES (?)",
                                (value,))
        result = self.connection.execute("SELECT d FROM datetime_test")
        variable = DateVariable()
        result.set_variable(variable, result.get_one()[0])
        self.assertEqual(variable.get(), value)

    def test_time(self):
        """A time round-trips through the C{t} column."""
        value = time(12, 34, 56, 78)
        self.connection.execute("INSERT INTO datetime_test (t) VALUES (?)",
                                (value,))
        result = self.connection.execute("SELECT t FROM datetime_test")
        variable = TimeVariable()
        result.set_variable(variable, result.get_one()[0])
        if not self.supports_microseconds:
            value = value.replace(microsecond=0)
        self.assertEqual(variable.get(), value)

    def test_timedelta(self):
        """A timedelta round-trips through the C{td} column."""
        value = timedelta(12, 34, 56)
        self.connection.execute("INSERT INTO datetime_test (td) VALUES (?)",
                                (value,))
        result = self.connection.execute("SELECT td FROM datetime_test")
        variable = TimeDeltaVariable()
        result.set_variable(variable, result.get_one()[0])
        self.assertEqual(variable.get(), value)

    def test_pickle(self):
        """A pickled value stored in C{bin_test} loads via PickleVariable."""
        value = {"a": 1, "b": 2}
        value_dump = pickle.dumps(value, -1)
        self.connection.execute("INSERT INTO bin_test (b) VALUES (?)",
                                (value_dump,))
        result = self.connection.execute("SELECT b FROM bin_test")
        variable = PickleVariable()
        result.set_variable(variable, result.get_one()[0])
        self.assertEqual(variable.get(), value)

    def test_binary(self):
        """Ensure database works with high bits and embedded zeros."""
        value = b"\xff\x00\xff\x00"
        self.connection.execute("INSERT INTO bin_test (b) VALUES (?)",
                                (value,))
        result = self.connection.execute("SELECT b FROM bin_test")
        variable = BytesVariable()
        result.set_variable(variable, result.get_one()[0])
        self.assertEqual(variable.get(), value)

    def test_binary_ascii(self):
        """Some databases like pysqlite2 may return unicode for strings."""
        self.connection.execute("INSERT INTO bin_test VALUES (10, 'Value')")
        result = self.connection.execute("SELECT b FROM bin_test")
        variable = BytesVariable()
        # If the following doesn't raise a TypeError we're good.
        result.set_variable(variable, result.get_one()[0])
        self.assertEqual(variable.get(), b"Value")

    def test_order_by_group_by(self):
        """An aggregate can be used in ORDER BY together with GROUP BY."""
        self.connection.execute("INSERT INTO test VALUES (100, 'Title 10')")
        self.connection.execute("INSERT INTO test VALUES (101, 'Title 10')")
        id_column = Column("id", "test")
        title_column = Column("title", "test")
        expr = Select(Count(id_column), group_by=title_column,
                      order_by=Count(id_column))
        result = self.connection.execute(expr)
        # One row has 'Title 20'; three rows now have 'Title 10'.
        self.assertEqual(result.get_all(), [(1,), (3,)])

    def test_set_decimal_variable_from_str_column(self):
        """A DecimalVariable can be set from a string column's value."""
        self.connection.execute("INSERT INTO test VALUES (40, '40.5')")
        variable = DecimalVariable()
        result = self.connection.execute("SELECT title FROM test WHERE id=40")
        # Passing means set_variable() did not raise.
        result.set_variable(variable, result.get_one()[0])

    def test_get_decimal_variable_to_str_column(self):
        """A DecimalVariable used as a parameter is stored as its string."""
        variable = DecimalVariable()
        variable.set("40.5", from_db=True)
        self.connection.execute("INSERT INTO test VALUES (40, ?)", (variable,))
        result = self.connection.execute("SELECT title FROM test WHERE id=40")
        self.assertEqual(result.get_one()[0], "40.5")

    def test_quoting(self):
        """Names with spaces, quote characters or keywords are usable."""
        # FIXME "with'quote" should be in the list below, but it doesn't
        #       work because it breaks the parameter mark translation.
        for reserved_name in ["with space", 'with`"escape', "SELECT"]:
            reserved_name = SQLToken(reserved_name)
            expr = Select(reserved_name,
                          tables=Alias(Select(Alias(1, reserved_name))))
            result = self.connection.execute(expr)
            self.assertEqual(result.get_one(), (1,))

    def test_concurrent_behavior(self):
        """The default behavior should be to handle transactions in isolation.

        Data committed in one transaction shouldn't be visible to another
        running transaction before the latter is committed or aborted.  If
        this isn't the case, the caching made by Storm (or by anything
        that works with data in memory, in fact) becomes a dangerous thing.

        For PostgreSQL, isolation level must be SERIALIZABLE.
        For MySQL, isolation level must be REPEATABLE READ (the default),
        and the InnoDB engine must be in use.
        For SQLite, the isolation level already is SERIALIZABLE when not
        in autocommit mode.  OTOH, PySQLite is nuts regarding transactional
        behavior, and will easily offer READ COMMITTED behavior inside a
        "transaction" (it didn't tell SQLite to open a transaction, in fact).
        """
        connection1 = self.connection
        connection2 = self.database.connect()
        # Don't leak the second connection, whatever the outcome.
        self.addCleanup(connection2.close)
        try:
            result = connection1.execute("SELECT title FROM test WHERE id=10")
            self.assertEqual(result.get_one(), ("Title 10",))
            try:
                connection2.execute("UPDATE test SET title='Title 100' "
                                    "WHERE id=10")
                connection2.commit()
            except OperationalError as e:
                self.assertEqual(str(e), "database is locked") # SQLite blocks
            result = connection1.execute("SELECT title FROM test WHERE id=10")
            self.assertEqual(result.get_one(), ("Title 10",))
        finally:
            connection1.rollback()

    def test_wb_connect_sets_event_system(self):
        """The object given to connect() is stored as C{_event}."""
        connection = self.database.connect(marker)
        self.addCleanup(connection.close)
        self.assertEqual(connection._event, marker)

    def test_execute_sends_event(self):
        """execute() emits C{register-transaction} with the event owner."""
        event = EventSystem(marker)
        calls = []
        def register_transaction(owner):
            calls.append(owner)
        event.hook("register-transaction", register_transaction)

        connection = self.database.connect(event)
        self.addCleanup(connection.close)
        connection.execute("SELECT 1")
        self.assertEqual(len(calls), 1)
        self.assertEqual(calls[0], marker)

    def from_database(self, row):
        """Return C{row} as a list of integers, each incremented by one.

        Installed as C{Result.from_database} by the tests below, so that
        they can tell whether rows went through that hook.
        """
        return [int(item)+1 for item in row]

    def test_wb_result_get_one_goes_through_from_database(self):
        """get_one() passes the row through C{from_database}."""
        result = self.connection.execute("SELECT one, two FROM number")
        result.from_database = self.from_database
        self.assertEqual(result.get_one(), (2, 3))

    def test_wb_result_get_all_goes_through_from_database(self):
        """get_all() passes each row through C{from_database}."""
        result = self.connection.execute("SELECT one, two FROM number")
        result.from_database = self.from_database
        self.assertEqual(result.get_all(), [(2, 3)])

    def test_wb_result_iter_goes_through_from_database(self):
        """Iteration passes each row through C{from_database}."""
        result = self.connection.execute("SELECT one, two FROM number")
        result.from_database = self.from_database
        self.assertEqual(next(iter(result)), (2, 3))

    def test_rowcount_insert(self):
        """C{rowcount} is 1 after inserting a single row."""
        # All supported backends support rowcount, so far.
        result = self.connection.execute(
            "INSERT INTO test VALUES (999, '999')")
        self.assertEqual(result.rowcount, 1)

    def test_rowcount_delete(self):
        """C{rowcount} is the number of deleted rows."""
        # All supported backends support rowcount, so far.
        result = self.connection.execute("DELETE FROM test")
        self.assertEqual(result.rowcount, 2)

    def test_rowcount_update(self):
        """C{rowcount} is the number of updated rows."""
        # All supported backends support rowcount, so far.
        result = self.connection.execute(
            "UPDATE test SET title='whatever'")
        self.assertEqual(result.rowcount, 2)

    def test_expr_startswith(self):
        """startswith() matches C{_} and C{%} literally, not as wildcards."""
        self.connection.execute("INSERT INTO test VALUES (30, '!!_%blah')")
        self.connection.execute("INSERT INTO test VALUES (40, '!!blah')")
        id_column = Column("id", SQLToken("test"))
        title_column = Column("title", SQLToken("test"))
        expr = Select(id_column, title_column.startswith(u"!!_%"))
        result = list(self.connection.execute(expr))
        self.assertEqual(result, [(30,)])

    def test_expr_endswith(self):
        """endswith() matches C{_} and C{%} literally, not as wildcards."""
        self.connection.execute("INSERT INTO test VALUES (30, 'blah_%!!')")
        self.connection.execute("INSERT INTO test VALUES (40, 'blah!!')")
        id_column = Column("id", SQLToken("test"))
        title_column = Column("title", SQLToken("test"))
        expr = Select(id_column, title_column.endswith(u"_%!!"))
        result = list(self.connection.execute(expr))
        self.assertEqual(result, [(30,)])

    def test_expr_contains_string(self):
        """contains_string() matches C{_} and C{%} literally."""
        self.connection.execute("INSERT INTO test VALUES (30, 'blah_%!!x')")
        self.connection.execute("INSERT INTO test VALUES (40, 'blah!!x')")
        id_column = Column("id", SQLToken("test"))
        title_column = Column("title", SQLToken("test"))
        expr = Select(id_column, title_column.contains_string(u"_%!!"))
        result = list(self.connection.execute(expr))
        self.assertEqual(result, [(30,)])

    def test_block_access(self):
        """Access to the connection is blocked by block_access()."""
        self.connection.execute("SELECT 1")
        self.connection.block_access()
        self.assertRaises(ConnectionBlockedError,
                          self.connection.execute, "SELECT 1")
        self.assertRaises(ConnectionBlockedError, self.connection.commit)
        # Allow rolling back a blocked connection.
        self.connection.rollback()
        # Unblock the connection, allowing access again.
        self.connection.unblock_access()
        self.connection.execute("SELECT 1")

    def test_wrap_exception_subclasses(self):
        """Subclasses of the generic DB-API exception types are wrapped."""
        db_api_operational_error = (
            self.database._exception_module.OperationalError)
        # Two ad-hoc subclasses of the backend's own OperationalError.
        operational_error_types = [
            type(name, (db_api_operational_error,), {})
            for name in ('A', 'B')]
        for error_type in operational_error_types:
            error = error_type('error message')
            wrapped = self.database._wrap_exception(OperationalError, error)
            self.assertTrue(isinstance(wrapped, error_type))
            self.assertTrue(isinstance(wrapped, OperationalError))
            self.assertEqual(error_type.__name__, wrapped.__class__.__name__)
            self.assertEqual(('error message',), wrapped.args)


class TwoPhaseCommitTest(object):
    """Generic tests for two-phase commit support of a connection.

    This class is a mixin: it calls C{setUp()}/C{tearDown()} on its
    superclass and uses C{assert*} methods and C{addCleanup()} that it does
    not define itself, so it has to be combined with a test case class
    providing them.

    Subclasses must implement L{create_database}, which is expected to set
    C{self.database}, and L{create_tables}, which is expected to create the
    C{test} table used below.
    """

    def setUp(self):
        """Create the database, a connection and a fresh C{test} table."""
        super(TwoPhaseCommitTest, self).setUp()
        self.create_database()
        self.create_connection()
        # Drop first so that create_tables() starts from a clean slate.
        self.drop_tables()
        self.create_tables()

    def tearDown(self):
        """Drop the C{test} table and close the connection."""
        self.drop_tables()
        self.drop_connection()
        super(TwoPhaseCommitTest, self).tearDown()

    def create_database(self):
        """Hook for subclasses; expected to set C{self.database}."""
        raise NotImplementedError

    def create_connection(self):
        """Open C{self.connection} on C{self.database}."""
        self.connection = self.database.connect()

    def create_tables(self):
        """Hook for subclasses; expected to create the C{test} table."""
        raise NotImplementedError

    def drop_tables(self):
        """Drop the C{test} table, ignoring a failure to do so."""
        try:
            self.connection.execute("DROP TABLE test")
            self.connection.commit()
        except Exception:
            # Best effort: the table may simply not exist.  Only Exception
            # is caught so that KeyboardInterrupt and SystemExit still
            # propagate.
            self.connection.rollback()

    def drop_connection(self):
        """Close C{self.connection}."""
        self.connection.close()

    def test_begin(self):
        """
        begin() starts a transaction that can be ended with a two-phase commit.
        """
        xid = Xid(0, "foo", "bar")
        self.connection.begin(xid)
        self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
        self.connection.prepare()
        self.connection.commit()
        # The row must survive a rollback issued after the commit.
        self.connection.rollback()
        result = self.connection.execute("SELECT id FROM test WHERE id=30")
        self.assertTrue(result.get_one())

    def test_begin_inside_a_two_phase_transaction(self):
        """
        begin() can't be used if a two-phase transaction has already started.
        """
        xid1 = Xid(0, "foo", "bar")
        self.connection.begin(xid1)
        xid2 = Xid(1, "egg", "baz")
        self.assertRaises(ProgrammingError, self.connection.begin, xid2)

    def test_begin_after_commit(self):
        """
        After a two phase commit, it's possible to start a new transaction.
        """
        xid = Xid(0, "foo", "bar")
        self.connection.begin(xid)
        self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
        self.connection.commit()
        self.connection.begin(xid)
        result = self.connection.execute("SELECT id FROM test WHERE id=30")
        self.assertTrue(result.get_one())

    def test_begin_after_rollback(self):
        """
        After a tpc rollback, it's possible to start a new transaction.
        """
        xid = Xid(0, "foo", "bar")
        self.connection.begin(xid)
        self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
        self.connection.rollback()
        self.connection.begin(xid)
        result = self.connection.execute("SELECT id FROM test WHERE id=30")
        self.assertFalse(result.get_one())

    def test_prepare_outside_a_two_phase_transaction(self):
        """
        prepare() can't be used if a two-phase transaction has not begun yet.
        """
        self.assertRaises(ProgrammingError, self.connection.prepare)

    def test_rollback_after_prepare(self):
        """
        Calling rollback() after prepare() actually rolls back the changes.
        """
        xid = Xid(0, "foo", "bar")
        self.connection.begin(xid)
        self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
        self.connection.prepare()
        self.connection.rollback()
        result = self.connection.execute("SELECT id FROM test WHERE id=30")
        self.assertFalse(result.get_one())

    def test_mixing_standard_and_two_phase_commits(self):
        """
        It's possible to mix standard and two phase commits across different
        transactions.
        """
        self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
        self.connection.commit()
        xid = Xid(0, "foo", "bar")
        self.connection.begin(xid)
        self.connection.execute("INSERT INTO test VALUES (40, 'Title 40')")
        self.connection.prepare()
        self.connection.commit()
        result = self.connection.execute("SELECT id FROM test "
                                         "WHERE id IN (30, 40)")
        self.assertEqual([(30,), (40,)], result.get_all())

    def test_recover_and_commit(self):
        """
        It's possible to recover and commit pending transactions that were
        prepared but not committed or rolled back.
        """
        # Prepare a transaction but leave it uncommitted
        self.connection.begin(Xid(0, "foo", "bar"))
        self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
        self.connection.prepare()

        # Setup a new connection and recover the prepared transaction
        # committing it
        connection2 = self.database.connect()
        self.addCleanup(connection2.close)
        # The prepared row must not be visible before it is committed.
        result = connection2.execute("SELECT id FROM test WHERE id=30")
        connection2.rollback()
        self.assertFalse(result.get_one())
        [xid] = connection2.recover()
        self.assertEqual(0, xid.format_id)
        self.assertEqual("foo", xid.global_transaction_id)
        self.assertEqual("bar", xid.branch_qualifier)
        connection2.commit(xid)
        self.assertEqual([], connection2.recover())

        # Reconnect, changes are visible
        self.connection.close()
        self.connection = self.database.connect()
        result = self.connection.execute("SELECT id FROM test WHERE id=30")
        self.assertTrue(result.get_one())

    def test_recover_and_rollback(self):
        """
        It's possible to recover and rollback pending transactions that were
        prepared but not committed or rolled back.
        """
        # Prepare a transaction but leave it uncommitted
        self.connection.begin(Xid(0, "foo", "bar"))
        self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
        self.connection.prepare()

        # Setup a new connection and recover the prepared transaction
        # rolling it back
        connection2 = self.database.connect()
        self.addCleanup(connection2.close)
        [xid] = connection2.recover()
        self.assertEqual(0, xid.format_id)
        self.assertEqual("foo", xid.global_transaction_id)
        self.assertEqual("bar", xid.branch_qualifier)
        connection2.rollback(xid)
        self.assertEqual([], connection2.recover())

        # Reconnect, changes were rolled back
        self.connection.close()
        self.connection = self.database.connect()
        result = self.connection.execute("SELECT id FROM test WHERE id=30")
        self.assertFalse(result.get_one())


class UnsupportedDatabaseTest(object):
    """Check that a backend reports a missing DB-API module properly.

    This class is a mixin: it uses C{make_path()} and C{assertRaises()},
    which it does not define itself.  Subclasses set L{db_module_name} and
    L{dbapi_module_names}.
    """

    # NOTE(review): MakePath presumably provides the make_path() method used
    # below -- confirm in storm.tests.helper.
    helpers = [MakePath]

    # Names of the top-level modules to hide; each one is replaced by a
    # package whose import raises ImportError.
    dbapi_module_names = []
    # Name of the backend module inside the storm.databases package.
    db_module_name = None

    def test_exception_when_unsupported(self):
        """
        create_from_uri() raises DatabaseModuleError when the DB-API
        modules the backend needs can't be imported.
        """
        # Install a directory in front of the search path.
        module_dir = self.make_path()
        os.mkdir(module_dir)

        # Real DB-API modules temporarily removed from sys.modules.
        dbapi_modules = {}

        sys.path.insert(0, module_dir)
        try:
            # Copy the real module over to a new place, since the old one is
            # already using the real module, if it's available.
            db_module = __import__("storm.databases."+self.db_module_name,
                                   None, None, [""])
            db_module_filename = db_module.__file__
            if db_module_filename.endswith(".pyc"):
                # Turn "module.pyc" into "module.py".
                db_module_filename = db_module_filename[:-1]
            shutil.copyfile(db_module_filename,
                            os.path.join(module_dir, "_fake_.py"))

            for dbapi_module_name in self.dbapi_module_names:

                # If the real module is available, remove it from
                # sys.modules.
                dbapi_module = sys.modules.pop(dbapi_module_name, None)
                if dbapi_module is not None:
                    dbapi_modules[dbapi_module_name] = dbapi_module

                # Create a module which raises ImportError when imported, to
                # fake a missing module.
                dirname = self.make_path(path=os.path.join(module_dir,
                                                           dbapi_module_name))
                os.mkdir(dirname)
                self.make_path("raise ImportError",
                               os.path.join(module_dir, dbapi_module_name,
                                            "__init__.py"))

            # Finally, test it.
            import _fake_
            uri = URI("_fake_://db")
            self.assertRaises(DatabaseModuleError,
                              _fake_.create_from_uri, uri)
        finally:
            # Unhack the environment.  This runs even when the setup above
            # fails half-way, so it must not assume that every step was
            # reached.
            if module_dir in sys.path:
                sys.path.remove(module_dir)
            sys.modules.pop("_fake_", None)

            sys.modules.update(dbapi_modules)


class DatabaseDisconnectionMixin(object):
    """Fixture that connects to a database through a TCP proxy.

    setUp() creates `self.proxy`, `self.proxy_uri`, `self.database` and
    `self.connection`; tearDown() releases them again.  Subclasses must set
    `environment_variable`, and `default_port` when the configured URI may
    leave the port out.
    """

    # Name of the environment variable holding the database URI.
    environment_variable = ""
    # Name of an environment variable holding a URI that must include a
    # host.  When it is set, it takes precedence over environment_variable.
    host_environment_variable = ""
    # Port assigned to URIs which name a TCP host but no port.
    default_port = None

    def setUp(self):
        super(DatabaseDisconnectionMixin, self).setUp()
        self.create_database_and_proxy()
        self.create_connection()

    def tearDown(self):
        self.drop_connection()
        self.drop_database()
        self.proxy.close()
        super(DatabaseDisconnectionMixin, self).tearDown()

    def is_supported(self):
        """Return True if get_uri() yields a usable URI."""
        return bool(self.get_uri())

    def get_uri(self):
        """Return URI instance with a defined host (and port, for TCP).

        The URI named by `host_environment_variable` is preferred; the one
        named by `environment_variable` is used otherwise, provided that it
        includes a host.

        @return: The URI, or None if no suitable URI is configured.
        @raises RuntimeError: If `environment_variable` is not defined, if
            the URI in `host_environment_variable` has no host, or if a
            port is needed and `default_port` is not defined.
        """
        if not self.environment_variable:
            raise RuntimeError(
                "Define at least %s.environment_variable" %
                type(self).__name__)
        uri_str = os.environ.get(self.host_environment_variable)
        if uri_str:
            uri = URI(uri_str)
            if not uri.host:
                raise RuntimeError("The URI in %s must include a host." %
                                   self.host_environment_variable)
            return self._apply_default_port(uri)
        uri_str = os.environ.get(self.environment_variable)
        if uri_str:
            uri = URI(uri_str)
            # Without a host there is nothing to point a proxy at.
            if uri.host:
                return self._apply_default_port(uri)
        return None

    def _apply_default_port(self, uri):
        """Set `default_port` on `uri` if it names a TCP host without port.

        A host starting with "/" is left alone (presumably it is the path
        of a local socket rather than a TCP host).

        @return: The same `uri` object.
        @raises RuntimeError: If a port is needed and `default_port` is not
            defined.
        """
        if not uri.host.startswith("/") and not uri.port:
            if not self.default_port:
                # This must be spelled __name__: inside a class body a plain
                # "__name" is name-mangled and raises AttributeError.
                raise RuntimeError(
                    "Define at least %s.default_port" % type(self).__name__)
            uri.port = self.default_port
        return uri

    def create_proxy(self, uri):
        """Create a TCP proxy forwarding requests to `uri`."""
        return ProxyTCPServer((uri.host, uri.port))

    def create_database_and_proxy(self):
        """Set up the TCP proxy and database object.

        The TCP proxy should forward requests on to the database.  The
        database object should point at the TCP proxy.
        """
        uri = self.get_uri()
        self.proxy = self.create_proxy(uri)
        # From here on the URI refers to the proxy, not to the real server.
        uri.host, uri.port = self.proxy.server_address
        self.proxy_uri = uri
        self.database = create_database(uri)

    def create_connection(self):
        """Open `self.connection` on `self.database`."""
        self.connection = self.database.connect()

    def drop_connection(self):
        """Close `self.connection`."""
        self.connection.close()

    def drop_database(self):
        """Hook called by tearDown(); does nothing by default."""
        pass


class DatabaseDisconnectionTest(DatabaseDisconnectionMixin):
    """Tests of the connection behaviour once the proxy is cut off."""

    def _assert_execute_disconnected(self):
        """Check that executing a statement raises DisconnectionError."""
        self.assertRaises(DisconnectionError,
                          self.connection.execute, "SELECT 1")

    def test_proxy_works(self):
        """A query sent through the proxy reaches the database."""
        reply = self.connection.execute("SELECT 1")
        row = reply.get_one()
        self.assertEqual(row, (1,))

    def test_catch_disconnect_on_execute(self):
        """execute() reports a disconnection as DisconnectionError."""
        reply = self.connection.execute("SELECT 1")
        self.assertTrue(reply.get_one())
        self.proxy.restart()
        self._assert_execute_disconnected()

    def test_catch_disconnect_on_commit(self):
        """commit() reports a disconnection as DisconnectionError."""
        reply = self.connection.execute("SELECT 1")
        self.assertTrue(reply.get_one())
        self.proxy.restart()
        self.assertRaises(DisconnectionError, self.connection.commit)

    def test_wb_catch_already_disconnected_on_rollback(self):
        """rollback() copes with a disconnection Storm has not seen yet.

        The disconnection is triggered through the raw connection, behind
        Storm's back.  The raw rollback() then fails with a disconnection
        error, while Connection.rollback() must swallow it.
        """
        reply = self.connection.execute("SELECT 1")
        self.assertTrue(reply.get_one())
        self.proxy.restart()

        # Use the raw connection, so that the failure goes unnoticed by
        # Storm.
        try:
            raw_cursor = self.connection._raw_connection.cursor()
            raw_cursor.execute("SELECT 1")
            raw_cursor.fetchone()
        except Error as failure:
            self.assertTrue(self.connection.is_disconnection_error(failure))
        else:
            self.fail("Disconnection was not caught.")

        # The raw rollback() is expected to fail in the same way.
        try:
            self.connection._raw_connection.rollback()
        except Error as failure:
            self.assertTrue(self.connection.is_disconnection_error(failure))
        else:
            self.fail("Disconnection was not raised.")

        # Storm's own rollback() must not let that error through.
        self.connection.rollback()

    def test_wb_catch_already_disconnected(self):
        """execute() notices a disconnection that happened behind its back.

        The disconnection is triggered through the raw connection, so
        Storm does not see it happen.  The next execute() must still raise
        DisconnectionError.
        """
        reply = self.connection.execute("SELECT 1")
        self.assertTrue(reply.get_one())
        self.proxy.restart()

        # Use the raw connection, so that the failure goes unnoticed by
        # Storm.
        try:
            raw_cursor = self.connection._raw_connection.cursor()
            raw_cursor.execute("SELECT 1")
            raw_cursor.fetchone()
        except DatabaseError as failure:
            self.assertTrue(self.connection.is_disconnection_error(failure))
        else:
            self.fail("Disconnection was not caught.")
        self._assert_execute_disconnected()

    def test_connection_stays_disconnected_in_transaction(self):
        """A second execute() after a disconnection fails as well."""
        reply = self.connection.execute("SELECT 1")
        self.assertTrue(reply.get_one())
        self.proxy.restart()
        self._assert_execute_disconnected()
        self._assert_execute_disconnected()

    def test_reconnect_after_rollback(self):
        """Queries work again once the connection was rolled back."""
        reply = self.connection.execute("SELECT 1")
        self.assertTrue(reply.get_one())
        self.proxy.restart()
        self._assert_execute_disconnected()
        self.connection.rollback()
        reply = self.connection.execute("SELECT 1")
        self.assertTrue(reply.get_one())

    def test_catch_disconnect_on_reconnect(self):
        """A failed reconnection is reported as DisconnectionError."""
        reply = self.connection.execute("SELECT 1")
        self.assertTrue(reply.get_one())
        self.proxy.stop()
        self._assert_execute_disconnected()
        # The proxy is still down after the rollback, so the next
        # statement has to fail in the same way.
        self.connection.rollback()
        self._assert_execute_disconnected()

    def test_close_connection_after_disconnect(self):
        """close() can be called on a disconnected connection."""
        reply = self.connection.execute("SELECT 1")
        self.assertTrue(reply.get_one())
        self.proxy.stop()
        self._assert_execute_disconnected()
        self.connection.close()


class TwoPhaseCommitDisconnectionTest(object):
    """Two-phase commit tests involving a lost connection."""

    def _begin_and_query(self, xid):
        """Begin a two-phase transaction with `xid` and run a query in it."""
        self.connection.begin(xid)
        reply = self.connection.execute("SELECT 1")
        self.assertTrue(reply.get_one())

    def test_begin_after_rollback_with_disconnection_error(self):
        """
        A new two-phase transaction can be started after a rollback which
        was performed while the proxy was stopped.
        """
        self.connection.begin(Xid(0, "foo", "bar"))
        self.connection.execute("SELECT 1")
        self.proxy.stop()
        self.connection.rollback()
        self.proxy.start()
        self._begin_and_query(Xid(0, "egg", "baz"))

    def test_begin_after_with_statement_disconnection_error_and_rollback(self):
        """
        A new two-phase transaction can be started when the disconnection
        was already reported by a statement before the rollback.
        """
        self.connection.begin(Xid(0, "foo", "bar"))
        self.proxy.close()
        self.assertRaises(DisconnectionError,
                          self.connection.execute, "SELECT 1")
        self.connection.rollback()
        self.proxy.start()
        self._begin_and_query(Xid(0, "egg", "baz"))