~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/redis-py/tests/pipeline.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import redis
 
2
import unittest
 
3
 
 
4
class PipelineTestCase(unittest.TestCase):
 
5
    def setUp(self):
 
6
        self.client = redis.Redis(host='localhost', port=6379, db=9)
 
7
        self.client.flushdb()
 
8
 
 
9
    def tearDown(self):
 
10
        self.client.flushdb()
 
11
 
 
12
    def test_pipeline(self):
 
13
        pipe = self.client.pipeline()
 
14
        pipe.set('a', 'a1').get('a').zadd('z', 'z1', 1).zadd('z', 'z2', 4)
 
15
        pipe.zincrby('z', 'z1').zrange('z', 0, 5, withscores=True)
 
16
        self.assertEquals(pipe.execute(),
 
17
            [
 
18
                True,
 
19
                'a1',
 
20
                True,
 
21
                True,
 
22
                2.0,
 
23
                [('z1', 2.0), ('z2', 4)],
 
24
            ]
 
25
            )
 
26
 
 
27
    def test_invalid_command_in_pipeline(self):
 
28
        # all commands but the invalid one should be excuted correctly
 
29
        self.client['c'] = 'a'
 
30
        pipe = self.client.pipeline()
 
31
        pipe.set('a', 1).set('b', 2).lpush('c', 3).set('d', 4)
 
32
        result = pipe.execute()
 
33
 
 
34
        self.assertEquals(result[0], True)
 
35
        self.assertEquals(self.client['a'], '1')
 
36
        self.assertEquals(result[1], True)
 
37
        self.assertEquals(self.client['b'], '2')
 
38
        # we can't lpush to a key that's a string value, so this should
 
39
        # be a ResponseError exception
 
40
        self.assert_(isinstance(result[2], redis.ResponseError))
 
41
        self.assertEquals(self.client['c'], 'a')
 
42
        self.assertEquals(result[3], True)
 
43
        self.assertEquals(self.client['d'], '4')
 
44
 
 
45
        # make sure the pipe was restored to a working state
 
46
        self.assertEquals(pipe.set('z', 'zzz').execute(), [True])
 
47
        self.assertEquals(self.client['z'], 'zzz')
 
48
 
 
49
    def test_pipeline_cannot_select(self):
 
50
        pipe = self.client.pipeline()
 
51
        self.assertRaises(redis.RedisError,
 
52
            pipe.select, 'localhost', 6379, db=9)
 
53
 
 
54
    def test_pipeline_no_transaction(self):
 
55
        pipe = self.client.pipeline(transaction=False)
 
56
        pipe.set('a', 'a1').set('b', 'b1').set('c', 'c1')
 
57
        self.assertEquals(pipe.execute(), [True, True, True])
 
58
        self.assertEquals(self.client['a'], 'a1')
 
59
        self.assertEquals(self.client['b'], 'b1')
 
60
        self.assertEquals(self.client['c'], 'c1')
 
61