8
from nose.tools import (assert_dict_equal,
10
from nose.tools import (
22
from happybase import Connection, ConnectionPool, NoConnectionsAvailable
20
24
HAPPYBASE_HOST = os.environ.get('HAPPYBASE_HOST')
21
25
HAPPYBASE_PORT = os.environ.get('HAPPYBASE_PORT')
22
26
HAPPYBASE_COMPAT = os.environ.get('HAPPYBASE_COMPAT', '0.92')
27
HAPPYBASE_TRANSPORT = os.environ.get('HAPPYBASE_TRANSPORT', 'buffered')
23
28
KEEP_TABLE = ('HAPPYBASE_NO_CLEANUP' in os.environ)
25
30
TABLE_PREFIX = 'happybase_tests_tmp'
26
31
TEST_TABLE_NAME = 'test1'
33
connection_kwargs = dict(
36
table_prefix=TABLE_PREFIX,
37
compat=HAPPYBASE_COMPAT,
38
transport=HAPPYBASE_TRANSPORT,
28
43
connection = table = None
46
def maybe_delete_table():
50
if TEST_TABLE_NAME in connection.tables():
51
print "Test table already exists; removing it..."
52
connection.delete_table(TEST_TABLE_NAME, disable=True)
31
55
def setup_module():
32
56
global connection, table
33
connection = happybase.Connection(host=HAPPYBASE_HOST,
35
table_prefix=TABLE_PREFIX,
36
compat=HAPPYBASE_COMPAT)
57
connection = Connection(**connection_kwargs)
37
59
assert_is_not_none(connection)
42
65
'cf3': {'max_versions': 1},
44
67
connection.create_table(TEST_TABLE_NAME, families=cfs)
46
69
table = connection.table(TEST_TABLE_NAME)
50
73
def teardown_module():
52
connection.disable_table(TEST_TABLE_NAME)
53
connection.delete_table(TEST_TABLE_NAME)
75
connection.delete_table(TEST_TABLE_NAME, disable=True)
57
79
def test_connection_compat():
58
80
with assert_raises(ValueError):
59
happybase.Connection(compat='0.1.invalid.version')
81
Connection(compat='0.1.invalid.version')
84
def test_timeout_arg():
62
90
def test_enabling():
79
107
assert_equal(connection.table('foobar').name, TABLE_PREFIX + '_foobar')
80
108
assert_equal(connection.table('foobar', use_prefix=False).name, 'foobar')
82
c = happybase.Connection(autoconnect=False)
110
c = Connection(autoconnect=False)
83
111
assert_equal('foo', c._table_name('foo'))
85
113
with assert_raises(TypeError):
86
happybase.Connection(autoconnect=False, table_prefix=123)
114
Connection(autoconnect=False, table_prefix=123)
88
116
with assert_raises(TypeError):
89
happybase.Connection(autoconnect=False, table_prefix_separator=2.1)
117
Connection(autoconnect=False, table_prefix_separator=2.1)
92
120
def test_stringify():
129
157
table.put('r1', {'cf1:c1': 'v1', 'cf1:c2': 'v2', 'cf2:c3': 'v3'})
130
158
table.put('r1', {'cf1:c4': 'v2'}, timestamp=2345678)
159
table.put('r1', {'cf1:c4': 'v2'}, timestamp=1369168852994L)
133
162
def test_atomic_counters():
391
420
scanner = table.scan(row_prefix='row', timestamp=123)
392
421
assert_equal(0, calc_len(scanner))
423
scanner = table.scan(batch_size=20)
427
with assert_raises(StopIteration):
395
431
def test_delete():
396
432
row_key = 'row-test-delete'
418
454
table.delete(row_key)
419
455
assert_dict_equal({}, table.row(row_key))
458
def test_connection_pool_construction():
459
with assert_raises(TypeError):
460
ConnectionPool(size='abc')
462
with assert_raises(ValueError):
463
ConnectionPool(size=0)
466
def test_connection_pool():
468
from thrift.transport.TTransport import TTransportException
471
name = threading.current_thread().name
472
print "Thread %s starting" % name
474
def inner_function():
475
# Nested connection requests must return the same connection
476
with pool.connection() as another_connection:
477
assert connection is another_connection
479
# Fake an exception once in a while
480
if random.random() < .25:
481
print "Introducing random failure"
482
connection.transport.close()
483
raise TTransportException("Fake transport exception")
486
with pool.connection() as connection:
491
except TTransportException:
492
# This error should have been picked up by the
493
# connection pool, and the connection should have
494
# been replaced by a fresh one
499
print "Thread %s done" % name
503
pool = ConnectionPool(size=3, **connection_kwargs)
504
threads = [threading.Thread(target=run) for i in xrange(N_THREADS)]
513
# filter out finished threads
514
threads = [t for t in threads if t.is_alive()]
515
print "%d threads still alive" % len(threads)
518
def test_pool_exhaustion():
519
pool = ConnectionPool(size=1, **connection_kwargs)
522
with assert_raises(NoConnectionsAvailable):
523
with pool.connection(timeout=.1) as connection:
526
with pool.connection():
527
# At this point the only connection is assigned to this thread,
528
# so another thread cannot obtain a connection at this point.
530
t = threading.Thread(target=run)
535
if __name__ == '__main__':
539
# Dump stacktraces using 'kill -USR1', useful for debugging hanging
540
# programs and multi threading issues.
547
faulthandler.register(signal.SIGUSR1)
549
logging.basicConfig(level=logging.DEBUG)
551
method_name = 'test_%s' % sys.argv[1]
552
method = globals()[method_name]