~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/redis-py/tests/connection_pool.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import redis
 
2
import threading
 
3
import time
 
4
import unittest
 
5
 
 
6
class ConnectionPoolTestCase(unittest.TestCase):
    """Exercise connection sharing through ``redis.ConnectionPool``.

    Both tests issue commands against a Redis server at localhost:6379
    and use databases 9 and 10.
    """

    def test_multiple_connections(self):
        """Clients on one pool share a connection per host/port/db."""
        # 2 clients to the same host/port/db/pool should use the same connection
        pool = redis.ConnectionPool()
        r1 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool)
        r2 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool)
        self.assertEqual(r1.connection, r2.connection)

        # if one of them switches, they should have
        # separate connection objects
        r2.select(db=10, host='localhost', port=6379)
        self.assertNotEqual(r1.connection, r2.connection)

        # Snapshot both connections now, while the two clients still differ,
        # so the pool contents can be compared against them further down.
        conns = [r1.connection, r2.connection]
        conns.sort()

        # but returning to the original state shares the object again
        r2.select(db=9, host='localhost', port=6379)
        self.assertEqual(r1.connection, r2.connection)

        # the connection manager should still have just 2 connections
        mgr_conns = pool.get_all_connections()
        mgr_conns.sort()
        self.assertEqual(conns, mgr_conns)

    def test_threaded_workers(self):
        """Run ``info`` and ``keys`` loops concurrently on one client."""
        r = redis.Redis(host='localhost', port=6379, db=9)
        r.set('a', 'foo')
        r.set('b', 'bar')

        # An exception raised inside a worker thread only terminates that
        # thread; it never reaches the test runner.  Collect failures here
        # and assert on them once both threads have been joined.
        errors = []

        def _run_repeatedly(command):
            # Issue the command 50 times with a short pause between calls so
            # the two workers interleave on the shared client.
            try:
                for _ in range(50):
                    command()
                    time.sleep(0.01)
            except Exception as exc:
                errors.append(exc)

        workers = [
            threading.Thread(target=_run_repeatedly, args=(r.info,)),
            threading.Thread(target=_run_repeatedly, args=(r.keys,)),
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        self.assertEqual([], errors)
 
53