~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/redis-py/redis/client.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import datetime
 
2
import errno
 
3
import socket
 
4
import threading
 
5
import time
 
6
import warnings
 
7
from itertools import chain
 
8
from redis.exceptions import ConnectionError, ResponseError, InvalidResponse
 
9
from redis.exceptions import RedisError, AuthenticationError
 
10
 
 
11
 
 
12
class ConnectionPool(threading.local):
 
13
    "Manages a list of connections on the local thread"
 
14
    def __init__(self):
 
15
        self.connections = {}
 
16
 
 
17
    def make_connection_key(self, host, port, db):
 
18
        "Create a unique key for the specified host, port and db"
 
19
        return '%s:%s:%s' % (host, port, db)
 
20
 
 
21
    def get_connection(self, host, port, db, password, socket_timeout):
 
22
        "Return a specific connection for the specified host, port and db"
 
23
        key = self.make_connection_key(host, port, db)
 
24
        if key not in self.connections:
 
25
            self.connections[key] = Connection(
 
26
                host, port, db, password, socket_timeout)
 
27
        return self.connections[key]
 
28
 
 
29
    def get_all_connections(self):
 
30
        "Return a list of all connection objects the manager knows about"
 
31
        return self.connections.values()
 
32
 
 
33
 
 
34
class Connection(object):
 
35
    "Manages TCP communication to and from a Redis server"
 
36
    def __init__(self, host='localhost', port=6379, db=0, password=None,
 
37
                 socket_timeout=None):
 
38
        self.host = host
 
39
        self.port = port
 
40
        self.db = db
 
41
        self.password = password
 
42
        self.socket_timeout = socket_timeout
 
43
        self._sock = None
 
44
        self._fp = None
 
45
 
 
46
    def connect(self, redis_instance):
 
47
        "Connects to the Redis server if not already connected"
 
48
        if self._sock:
 
49
            return
 
50
        try:
 
51
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
52
            sock.connect((self.host, self.port))
 
53
        except socket.error, e:
 
54
            # args for socket.error can either be (errno, "message")
 
55
            # or just "message"
 
56
            if len(e.args) == 1:
 
57
                error_message = "Error connecting to %s:%s. %s." % \
 
58
                    (self.host, self.port, e.args[0])
 
59
            else:
 
60
                error_message = "Error %s connecting %s:%s. %s." % \
 
61
                    (e.args[0], self.host, self.port, e.args[1])
 
62
            raise ConnectionError(error_message)
 
63
        sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
 
64
        sock.settimeout(self.socket_timeout)
 
65
        self._sock = sock
 
66
        self._fp = sock.makefile('r')
 
67
        redis_instance._setup_connection()
 
68
 
 
69
    def disconnect(self):
 
70
        "Disconnects from the Redis server"
 
71
        if self._sock is None:
 
72
            return
 
73
        try:
 
74
            self._sock.close()
 
75
        except socket.error:
 
76
            pass
 
77
        self._sock = None
 
78
        self._fp = None
 
79
 
 
80
    def send(self, command, redis_instance):
 
81
        "Send ``command`` to the Redis server. Return the result."
 
82
        self.connect(redis_instance)
 
83
        try:
 
84
            self._sock.sendall(command)
 
85
        except socket.error, e:
 
86
            if e.args[0] == errno.EPIPE:
 
87
                self.disconnect()
 
88
            raise ConnectionError("Error %s while writing to socket. %s." % \
 
89
                e.args)
 
90
 
 
91
    def read(self, length=None):
 
92
        """
 
93
        Read a line from the socket is length is None,
 
94
        otherwise read ``length`` bytes
 
95
        """
 
96
        try:
 
97
            if length is not None:
 
98
                return self._fp.read(length)
 
99
            return self._fp.readline()
 
100
        except socket.error, e:
 
101
            self.disconnect()
 
102
            if e.args and e.args[0] == errno.EAGAIN:
 
103
                raise ConnectionError("Error while reading from socket: %s" % \
 
104
                    e.args[1])
 
105
        return ''
 
106
 
 
107
def list_or_args(command, keys, args):
 
108
    # returns a single list combining keys and args
 
109
    # if keys is not a list or args has items, issue a
 
110
    # deprecation warning
 
111
    oldapi = bool(args)
 
112
    try:
 
113
        i = iter(keys)
 
114
        # a string can be iterated, but indicates
 
115
        # keys wasn't passed as a list
 
116
        if isinstance(keys, basestring):
 
117
            oldapi = True
 
118
    except TypeError:
 
119
        oldapi = True
 
120
        keys = [keys]
 
121
    if oldapi:
 
122
        warnings.warn(DeprecationWarning(
 
123
            "Passing *args to Redis.%s has been deprecated. "
 
124
            "Pass an iterable to ``keys`` instead" % command
 
125
        ))
 
126
        keys.extend(args)
 
127
    return keys
 
128
 
 
129
def timestamp_to_datetime(response):
 
130
    "Converts a unix timestamp to a Python datetime object"
 
131
    if not response:
 
132
        return None
 
133
    try:
 
134
        response = int(response)
 
135
    except ValueError:
 
136
        return None
 
137
    return datetime.datetime.fromtimestamp(response)
 
138
 
 
139
def string_keys_to_dict(key_string, callback):
 
140
    return dict([(key, callback) for key in key_string.split()])
 
141
 
 
142
def dict_merge(*dicts):
 
143
    merged = {}
 
144
    [merged.update(d) for d in dicts]
 
145
    return merged
 
146
 
 
147
def parse_info(response):
 
148
    "Parse the result of Redis's INFO command into a Python dict"
 
149
    info = {}
 
150
    def get_value(value):
 
151
        if ',' not in value:
 
152
            return value
 
153
        sub_dict = {}
 
154
        for item in value.split(','):
 
155
            k, v = item.split('=')
 
156
            try:
 
157
                sub_dict[k] = int(v)
 
158
            except ValueError:
 
159
                sub_dict[k] = v
 
160
        return sub_dict
 
161
    for line in response.splitlines():
 
162
        key, value = line.split(':')
 
163
        try:
 
164
            info[key] = int(value)
 
165
        except ValueError:
 
166
            info[key] = get_value(value)
 
167
    return info
 
168
 
 
169
def pairs_to_dict(response):
 
170
    "Create a dict given a list of key/value pairs"
 
171
    return dict(zip(response[::2], response[1::2]))
 
172
 
 
173
def zset_score_pairs(response, **options):
 
174
    """
 
175
    If ``withscores`` is specified in the options, return the response as
 
176
    a list of (value, score) pairs
 
177
    """
 
178
    if not response or not options['withscores']:
 
179
        return response
 
180
    return zip(response[::2], map(float, response[1::2]))
 
181
 
 
182
def int_or_none(response):
 
183
    if response is None:
 
184
        return None
 
185
    return int(response)
 
186
 
 
187
def float_or_none(response):
 
188
    if response is None:
 
189
        return None
 
190
    return float(response)
 
191
 
 
192
 
 
193
class Redis(threading.local):
 
194
    """
 
195
    Implementation of the Redis protocol.
 
196
 
 
197
    This abstract class provides a Python interface to all Redis commands
 
198
    and an implementation of the Redis protocol.
 
199
 
 
200
    Connection and Pipeline derive from this, implementing how
 
201
    the commands are sent and received to the Redis server
 
202
    """
 
203
    RESPONSE_CALLBACKS = dict_merge(
 
204
        string_keys_to_dict(
 
205
            'AUTH DEL EXISTS EXPIRE EXPIREAT HDEL HEXISTS HMSET MOVE MSETNX '
 
206
            'RENAMENX SADD SISMEMBER SMOVE SETEX SETNX SREM ZADD ZREM',
 
207
            bool
 
208
            ),
 
209
        string_keys_to_dict(
 
210
            'DECRBY HLEN INCRBY LLEN SCARD SDIFFSTORE SINTERSTORE '
 
211
            'SUNIONSTORE ZCARD ZREMRANGEBYSCORE ZREVRANK',
 
212
            int
 
213
            ),
 
214
        string_keys_to_dict(
 
215
            # these return OK, or int if redis-server is >=1.3.4
 
216
            'LPUSH RPUSH',
 
217
            lambda r: isinstance(r, int) and r or r == 'OK'
 
218
            ),
 
219
        string_keys_to_dict('ZSCORE ZINCRBY', float_or_none),
 
220
        string_keys_to_dict(
 
221
            'FLUSHALL FLUSHDB LSET LTRIM MSET RENAME '
 
222
            'SAVE SELECT SET SHUTDOWN',
 
223
            lambda r: r == 'OK'
 
224
            ),
 
225
        string_keys_to_dict('SDIFF SINTER SMEMBERS SUNION',
 
226
            lambda r: set(r)
 
227
            ),
 
228
        string_keys_to_dict('ZRANGE ZRANGEBYSCORE ZREVRANGE', zset_score_pairs),
 
229
        {
 
230
            'BGREWRITEAOF': lambda r: \
 
231
                r == 'Background rewriting of AOF file started',
 
232
            'BGSAVE': lambda r: r == 'Background saving started',
 
233
            'HGETALL': lambda r: r and pairs_to_dict(r) or {},
 
234
            'INFO': parse_info,
 
235
            'LASTSAVE': timestamp_to_datetime,
 
236
            'PING': lambda r: r == 'PONG',
 
237
            'RANDOMKEY': lambda r: r and r or None,
 
238
            'TTL': lambda r: r != -1 and r or None,
 
239
            'ZRANK': int_or_none,
 
240
        }
 
241
        )
 
242
 
 
243
    # commands that should NOT pull data off the network buffer when executed
 
244
    SUBSCRIPTION_COMMANDS = set(['SUBSCRIBE', 'UNSUBSCRIBE'])
 
245
 
 
246
    def __init__(self, host='localhost', port=6379,
 
247
                 db=0, password=None, socket_timeout=None,
 
248
                 connection_pool=None,
 
249
                 charset='utf-8', errors='strict'):
 
250
        self.encoding = charset
 
251
        self.errors = errors
 
252
        self.connection = None
 
253
        self.subscribed = False
 
254
        self.connection_pool = connection_pool and connection_pool or ConnectionPool()
 
255
        self.select(db, host, port, password, socket_timeout)
 
256
 
 
257
    #### Legacty accessors of connection information ####
 
258
    def _get_host(self):
 
259
        return self.connection.host
 
260
    host = property(_get_host)
 
261
 
 
262
    def _get_port(self):
 
263
        return self.connection.port
 
264
    port = property(_get_port)
 
265
 
 
266
    def _get_db(self):
 
267
        return self.connection.db
 
268
    db = property(_get_db)
 
269
 
 
270
    def pipeline(self, transaction=True):
 
271
        """
 
272
        Return a new pipeline object that can queue multiple commands for
 
273
        later execution. ``transaction`` indicates whether all commands
 
274
        should be executed atomically. Apart from multiple atomic operations,
 
275
        pipelines are useful for batch loading of data as they reduce the
 
276
        number of back and forth network operations between client and server.
 
277
        """
 
278
        return Pipeline(
 
279
            self.connection,
 
280
            transaction,
 
281
            self.encoding,
 
282
            self.errors
 
283
            )
 
284
 
 
285
 
 
286
    #### COMMAND EXECUTION AND PROTOCOL PARSING ####
 
287
    def _execute_command(self, command_name, command, **options):
 
288
        subscription_command = command_name in self.SUBSCRIPTION_COMMANDS
 
289
        if self.subscribed and not subscription_command:
 
290
            raise RedisError("Cannot issue commands other than SUBSCRIBE and "
 
291
                "UNSUBSCRIBE while channels are open")
 
292
        try:
 
293
            self.connection.send(command, self)
 
294
            if subscription_command:
 
295
                return None
 
296
            return self.parse_response(command_name, **options)
 
297
        except ConnectionError:
 
298
            self.connection.disconnect()
 
299
            self.connection.send(command, self)
 
300
            if subscription_command:
 
301
                return None
 
302
            return self.parse_response(command_name, **options)
 
303
 
 
304
    def execute_command(self, *args, **options):
 
305
        "Sends the command to the redis server and returns it's response"
 
306
        cmd_count = len(args)
 
307
        cmds = []
 
308
        for i in args:
 
309
            enc_value = self.encode(i)
 
310
            cmds.append('$%s\r\n%s\r\n' % (len(enc_value), enc_value))
 
311
        return self._execute_command(
 
312
            args[0],
 
313
            '*%s\r\n%s' % (cmd_count, ''.join(cmds)),
 
314
            **options
 
315
            )
 
316
 
 
317
    def _parse_response(self, command_name, catch_errors):
 
318
        conn = self.connection
 
319
        response = conn.read()[:-2] # strip last two characters (\r\n)
 
320
        if not response:
 
321
            self.connection.disconnect()
 
322
            raise ConnectionError("Socket closed on remote end")
 
323
 
 
324
        # server returned a null value
 
325
        if response in ('$-1', '*-1'):
 
326
            return None
 
327
        byte, response = response[0], response[1:]
 
328
 
 
329
        # server returned an error
 
330
        if byte == '-':
 
331
            if response.startswith('ERR '):
 
332
                response = response[4:]
 
333
            raise ResponseError(response)
 
334
        # single value
 
335
        elif byte == '+':
 
336
            return response
 
337
        # int value
 
338
        elif byte == ':':
 
339
            return int(response)
 
340
        # bulk response
 
341
        elif byte == '$':
 
342
            length = int(response)
 
343
            if length == -1:
 
344
                return None
 
345
            response = length and conn.read(length) or ''
 
346
            conn.read(2) # read the \r\n delimiter
 
347
            return response
 
348
        # multi-bulk response
 
349
        elif byte == '*':
 
350
            length = int(response)
 
351
            if length == -1:
 
352
                return None
 
353
            if not catch_errors:
 
354
                return [self._parse_response(command_name, catch_errors)
 
355
                    for i in range(length)]
 
356
            else:
 
357
                # for pipelines, we need to read everything,
 
358
                # including response errors. otherwise we'd
 
359
                # completely mess up the receive buffer
 
360
                data = []
 
361
                for i in range(length):
 
362
                    try:
 
363
                        data.append(
 
364
                            self._parse_response(command_name, catch_errors)
 
365
                            )
 
366
                    except Exception, e:
 
367
                        data.append(e)
 
368
                return data
 
369
 
 
370
        raise InvalidResponse("Unknown response type for: %s" % command_name)
 
371
 
 
372
    def parse_response(self, command_name, catch_errors=False, **options):
 
373
        "Parses a response from the Redis server"
 
374
        response = self._parse_response(command_name, catch_errors)
 
375
        if command_name in self.RESPONSE_CALLBACKS:
 
376
            return self.RESPONSE_CALLBACKS[command_name](response, **options)
 
377
        return response
 
378
 
 
379
    def encode(self, value):
 
380
        "Encode ``value`` using the instance's charset"
 
381
        if isinstance(value, str):
 
382
            return value
 
383
        if isinstance(value, unicode):
 
384
            return value.encode(self.encoding, self.errors)
 
385
        # not a string or unicode, attempt to convert to a string
 
386
        return str(value)
 
387
 
 
388
    #### CONNECTION HANDLING ####
 
389
    def get_connection(self, host, port, db, password, socket_timeout):
 
390
        "Returns a connection object"
 
391
        conn = self.connection_pool.get_connection(
 
392
            host, port, db, password, socket_timeout)
 
393
        # if for whatever reason the connection gets a bad password, make
 
394
        # sure a subsequent attempt with the right password makes its way
 
395
        # to the connection
 
396
        conn.password = password
 
397
        return conn
 
398
 
 
399
    def _setup_connection(self):
 
400
        """
 
401
        After successfully opening a socket to the Redis server, the
 
402
        connection object calls this method to authenticate and select
 
403
        the appropriate database.
 
404
        """
 
405
        if self.connection.password:
 
406
            if not self.execute_command('AUTH', self.connection.password):
 
407
                raise AuthenticationError("Invalid Password")
 
408
        self.execute_command('SELECT', self.connection.db)
 
409
 
 
410
    def select(self, db, host=None, port=None, password=None,
 
411
            socket_timeout=None):
 
412
        """
 
413
        Switch to a different Redis connection.
 
414
 
 
415
        If the host and port aren't provided and there's an existing
 
416
        connection, use the existing connection's host and port instead.
 
417
 
 
418
        Note this method actually replaces the underlying connection object
 
419
        prior to issuing the SELECT command.  This makes sure we protect
 
420
        the thread-safe connections
 
421
        """
 
422
        if host is None:
 
423
            if self.connection is None:
 
424
                raise RedisError("A valid hostname or IP address "
 
425
                    "must be specified")
 
426
            host = self.connection.host
 
427
        if port is None:
 
428
            if self.connection is None:
 
429
                raise RedisError("A valid port must be specified")
 
430
            port = self.connection.port
 
431
 
 
432
        self.connection = self.get_connection(
 
433
            host, port, db, password, socket_timeout)
 
434
 
 
435
 
 
436
    #### SERVER INFORMATION ####
 
437
    def bgrewriteaof(self):
 
438
        "Tell the Redis server to rewrite the AOF file from data in memory."
 
439
        return self.execute_command('BGREWRITEAOF')
 
440
 
 
441
    def bgsave(self):
 
442
        """
 
443
        Tell the Redis server to save its data to disk.  Unlike save(),
 
444
        this method is asynchronous and returns immediately.
 
445
        """
 
446
        return self.execute_command('BGSAVE')
 
447
 
 
448
    def dbsize(self):
 
449
        "Returns the number of keys in the current database"
 
450
        return self.execute_command('DBSIZE')
 
451
 
 
452
    def delete(self, *names):
 
453
        "Delete one or more keys specified by ``names``"
 
454
        return self.execute_command('DEL', *names)
 
455
    __delitem__ = delete
 
456
 
 
457
    def flush(self, all_dbs=False):
 
458
        warnings.warn(DeprecationWarning(
 
459
            "'flush' has been deprecated. "
 
460
            "Use Redis.flushdb() or Redis.flushall() instead"))
 
461
        if all_dbs:
 
462
            return self.flushall()
 
463
        return self.flushdb()
 
464
 
 
465
    def flushall(self):
 
466
        "Delete all keys in all databases on the current host"
 
467
        return self.execute_command('FLUSHALL')
 
468
 
 
469
    def flushdb(self):
 
470
        "Delete all keys in the current database"
 
471
        return self.execute_command('FLUSHDB')
 
472
 
 
473
    def info(self):
 
474
        "Returns a dictionary containing information about the Redis server"
 
475
        return self.execute_command('INFO')
 
476
 
 
477
    def lastsave(self):
 
478
        """
 
479
        Return a Python datetime object representing the last time the
 
480
        Redis database was saved to disk
 
481
        """
 
482
        return self.execute_command('LASTSAVE')
 
483
 
 
484
    def ping(self):
 
485
        "Ping the Redis server"
 
486
        return self.execute_command('PING')
 
487
 
 
488
    def save(self):
 
489
        """
 
490
        Tell the Redis server to save its data to disk,
 
491
        blocking until the save is complete
 
492
        """
 
493
        return self.execute_command('SAVE')
 
494
 
 
495
    #### BASIC KEY COMMANDS ####
 
496
    def append(self, key, value):
 
497
        """
 
498
        Appends the string ``value`` to the value at ``key``. If ``key``
 
499
        doesn't already exist, create it with a value of ``value``.
 
500
        Returns the new length of the value at ``key``.
 
501
        """
 
502
        return self.execute_command('APPEND', key, value)
 
503
 
 
504
    def decr(self, name, amount=1):
 
505
        """
 
506
        Decrements the value of ``key`` by ``amount``.  If no key exists,
 
507
        the value will be initialized as 0 - ``amount``
 
508
        """
 
509
        return self.execute_command('DECRBY', name, amount)
 
510
 
 
511
    def exists(self, name):
 
512
        "Returns a boolean indicating whether key ``name`` exists"
 
513
        return self.execute_command('EXISTS', name)
 
514
    __contains__ = exists
 
515
 
 
516
    def expire(self, name, time):
 
517
        "Set an expire flag on key ``name`` for ``time`` seconds"
 
518
        return self.execute_command('EXPIRE', name, time)
 
519
 
 
520
    def expireat(self, name, when):
 
521
        """
 
522
        Set an expire flag on key ``name``. ``when`` can be represented
 
523
        as an integer indicating unix time or a Python datetime object.
 
524
        """
 
525
        if isinstance(when, datetime.datetime):
 
526
            when = int(time.mktime(when.timetuple()))
 
527
        return self.execute_command('EXPIREAT', name, when)
 
528
 
 
529
    def get(self, name):
 
530
        """
 
531
        Return the value at key ``name``, or None of the key doesn't exist
 
532
        """
 
533
        return self.execute_command('GET', name)
 
534
    __getitem__ = get
 
535
 
 
536
    def getset(self, name, value):
 
537
        """
 
538
        Set the value at key ``name`` to ``value`` if key doesn't exist
 
539
        Return the value at key ``name`` atomically
 
540
        """
 
541
        return self.execute_command('GETSET', name, value)
 
542
 
 
543
    def incr(self, name, amount=1):
 
544
        """
 
545
        Increments the value of ``key`` by ``amount``.  If no key exists,
 
546
        the value will be initialized as ``amount``
 
547
        """
 
548
        return self.execute_command('INCRBY', name, amount)
 
549
 
 
550
    def keys(self, pattern='*'):
 
551
        "Returns a list of keys matching ``pattern``"
 
552
        return self.execute_command('KEYS', pattern)
 
553
 
 
554
    def mget(self, keys, *args):
 
555
        """
 
556
        Returns a list of values ordered identically to ``keys``
 
557
 
 
558
        * Passing *args to this method has been deprecated *
 
559
        """
 
560
        keys = list_or_args('mget', keys, args)
 
561
        return self.execute_command('MGET', *keys)
 
562
 
 
563
    def mset(self, mapping):
 
564
        "Sets each key in the ``mapping`` dict to its corresponding value"
 
565
        items = []
 
566
        [items.extend(pair) for pair in mapping.iteritems()]
 
567
        return self.execute_command('MSET', *items)
 
568
 
 
569
    def msetnx(self, mapping):
 
570
        """
 
571
        Sets each key in the ``mapping`` dict to its corresponding value if
 
572
        none of the keys are already set
 
573
        """
 
574
        items = []
 
575
        [items.extend(pair) for pair in mapping.iteritems()]
 
576
        return self.execute_command('MSETNX', *items)
 
577
 
 
578
    def move(self, name, db):
 
579
        "Moves the key ``name`` to a different Redis database ``db``"
 
580
        return self.execute_command('MOVE', name, db)
 
581
 
 
582
    def randomkey(self):
 
583
        "Returns the name of a random key"
 
584
        return self.execute_command('RANDOMKEY')
 
585
 
 
586
    def rename(self, src, dst, **kwargs):
 
587
        """
 
588
        Rename key ``src`` to ``dst``
 
589
 
 
590
        * The following flags have been deprecated *
 
591
        If ``preserve`` is True, rename the key only if the destination name
 
592
            doesn't already exist
 
593
        """
 
594
        if kwargs:
 
595
            if 'preserve' in kwargs:
 
596
                warnings.warn(DeprecationWarning(
 
597
                    "preserve option to 'rename' is deprecated, "
 
598
                    "use Redis.renamenx instead"))
 
599
                if kwargs['preserve']:
 
600
                    return self.renamenx(src, dst)
 
601
        return self.execute_command('RENAME', src, dst)
 
602
 
 
603
    def renamenx(self, src, dst):
 
604
        "Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist"
 
605
        return self.execute_command('RENAMENX', src, dst)
 
606
 
 
607
 
 
608
    def set(self, name, value, **kwargs):
 
609
        """
 
610
        Set the value at key ``name`` to ``value``
 
611
 
 
612
        * The following flags have been deprecated *
 
613
        If ``preserve`` is True, set the value only if key doesn't already
 
614
        exist
 
615
        If ``getset`` is True, set the value only if key doesn't already exist
 
616
        and return the resulting value of key
 
617
        """
 
618
        if kwargs:
 
619
            if 'getset' in kwargs:
 
620
                warnings.warn(DeprecationWarning(
 
621
                    "getset option to 'set' is deprecated, "
 
622
                    "use Redis.getset() instead"))
 
623
                if kwargs['getset']:
 
624
                    return self.getset(name, value)
 
625
            if 'preserve' in kwargs:
 
626
                warnings.warn(DeprecationWarning(
 
627
                    "preserve option to 'set' is deprecated, "
 
628
                    "use Redis.setnx() instead"))
 
629
                if kwargs['preserve']:
 
630
                    return self.setnx(name, value)
 
631
        return self.execute_command('SET', name, value)
 
632
    __setitem__ = set
 
633
 
 
634
    def setex(self, name, value, time):
 
635
        """
 
636
        Set the value of key ``name`` to ``value``
 
637
        that expires in ``time`` seconds
 
638
        """
 
639
        return self.execute_command('SETEX', name, time, value)
 
640
 
 
641
    def setnx(self, name, value):
 
642
        "Set the value of key ``name`` to ``value`` if key doesn't exist"
 
643
        return self.execute_command('SETNX', name, value)
 
644
 
 
645
    def substr(self, name, start, end=-1):
 
646
        """
 
647
        Return a substring of the string at key ``name``. ``start`` and ``end``
 
648
        are 0-based integers specifying the portion of the string to return.
 
649
        """
 
650
        return self.execute_command('SUBSTR', name, start, end)
 
651
 
 
652
    def ttl(self, name):
 
653
        "Returns the number of seconds until the key ``name`` will expire"
 
654
        return self.execute_command('TTL', name)
 
655
 
 
656
    def type(self, name):
 
657
        "Returns the type of key ``name``"
 
658
        return self.execute_command('TYPE', name)
 
659
 
 
660
 
 
661
    #### LIST COMMANDS ####
 
662
    def blpop(self, keys, timeout=0):
 
663
        """
 
664
        LPOP a value off of the first non-empty list
 
665
        named in the ``keys`` list.
 
666
 
 
667
        If none of the lists in ``keys`` has a value to LPOP, then block
 
668
        for ``timeout`` seconds, or until a value gets pushed on to one
 
669
        of the lists.
 
670
 
 
671
        If timeout is 0, then block indefinitely.
 
672
        """
 
673
        keys = list(keys)
 
674
        keys.append(timeout)
 
675
        return self.execute_command('BLPOP', *keys)
 
676
 
 
677
    def brpop(self, keys, timeout=0):
 
678
        """
 
679
        RPOP a value off of the first non-empty list
 
680
        named in the ``keys`` list.
 
681
 
 
682
        If none of the lists in ``keys`` has a value to LPOP, then block
 
683
        for ``timeout`` seconds, or until a value gets pushed on to one
 
684
        of the lists.
 
685
 
 
686
        If timeout is 0, then block indefinitely.
 
687
        """
 
688
        keys = list(keys)
 
689
        keys.append(timeout)
 
690
        return self.execute_command('BRPOP', *keys)
 
691
 
 
692
    def lindex(self, name, index):
 
693
        """
 
694
        Return the item from list ``name`` at position ``index``
 
695
 
 
696
        Negative indexes are supported and will return an item at the
 
697
        end of the list
 
698
        """
 
699
        return self.execute_command('LINDEX', name, index)
 
700
 
 
701
    def llen(self, name):
 
702
        "Return the length of the list ``name``"
 
703
        return self.execute_command('LLEN', name)
 
704
 
 
705
    def lpop(self, name):
 
706
        "Remove and return the first item of the list ``name``"
 
707
        return self.execute_command('LPOP', name)
 
708
 
 
709
    def lpush(self, name, value):
 
710
        "Push ``value`` onto the head of the list ``name``"
 
711
        return self.execute_command('LPUSH', name, value)
 
712
 
 
713
    def lrange(self, name, start, end):
 
714
        """
 
715
        Return a slice of the list ``name`` between
 
716
        position ``start`` and ``end``
 
717
 
 
718
        ``start`` and ``end`` can be negative numbers just like
 
719
        Python slicing notation
 
720
        """
 
721
        return self.execute_command('LRANGE', name, start, end)
 
722
 
 
723
    def lrem(self, name, value, num=0):
 
724
        """
 
725
        Remove the first ``num`` occurrences of ``value`` from list ``name``
 
726
 
 
727
        If ``num`` is 0, then all occurrences will be removed
 
728
        """
 
729
        return self.execute_command('LREM', name, num, value)
 
730
 
 
731
    def lset(self, name, index, value):
 
732
        "Set ``position`` of list ``name`` to ``value``"
 
733
        return self.execute_command('LSET', name, index, value)
 
734
 
 
735
    def ltrim(self, name, start, end):
 
736
        """
 
737
        Trim the list ``name``, removing all values not within the slice
 
738
        between ``start`` and ``end``
 
739
 
 
740
        ``start`` and ``end`` can be negative numbers just like
 
741
        Python slicing notation
 
742
        """
 
743
        return self.execute_command('LTRIM', name, start, end)
 
744
 
 
745
    def pop(self, name, tail=False):
 
746
        """
 
747
        Pop and return the first or last element of list ``name``
 
748
 
 
749
        * This method has been deprecated,
 
750
          use Redis.lpop or Redis.rpop instead *
 
751
        """
 
752
        warnings.warn(DeprecationWarning(
 
753
            "Redis.pop has been deprecated, "
 
754
            "use Redis.lpop or Redis.rpop instead"))
 
755
        if tail:
 
756
            return self.rpop(name)
 
757
        return self.lpop(name)
 
758
 
 
759
    def push(self, name, value, head=False):
 
760
        """
 
761
        Push ``value`` onto list ``name``.
 
762
 
 
763
        * This method has been deprecated,
 
764
          use Redis.lpush or Redis.rpush instead *
 
765
        """
 
766
        warnings.warn(DeprecationWarning(
 
767
            "Redis.push has been deprecated, "
 
768
            "use Redis.lpush or Redis.rpush instead"))
 
769
        if head:
 
770
            return self.lpush(name, value)
 
771
        return self.rpush(name, value)
 
772
 
 
773
    def rpop(self, name):
 
774
        "Remove and return the last item of the list ``name``"
 
775
        return self.execute_command('RPOP', name)
 
776
 
 
777
    def rpoplpush(self, src, dst):
 
778
        """
 
779
        RPOP a value off of the ``src`` list and atomically LPUSH it
 
780
        on to the ``dst`` list.  Returns the value.
 
781
        """
 
782
        return self.execute_command('RPOPLPUSH', src, dst)
 
783
 
 
784
    def rpush(self, name, value):
 
785
        "Push ``value`` onto the tail of the list ``name``"
 
786
        return self.execute_command('RPUSH', name, value)
 
787
 
 
788
    def sort(self, name, start=None, num=None, by=None, get=None,
 
789
            desc=False, alpha=False, store=None):
 
790
        """
 
791
        Sort and return the list, set or sorted set at ``name``.
 
792
 
 
793
        ``start`` and ``num`` allow for paging through the sorted data
 
794
 
 
795
        ``by`` allows using an external key to weight and sort the items.
 
796
            Use an "*" to indicate where in the key the item value is located
 
797
 
 
798
        ``get`` allows for returning items from external keys rather than the
 
799
            sorted data itself.  Use an "*" to indicate where int he key
 
800
            the item value is located
 
801
 
 
802
        ``desc`` allows for reversing the sort
 
803
 
 
804
        ``alpha`` allows for sorting lexicographically rather than numerically
 
805
 
 
806
        ``store`` allows for storing the result of the sort into
 
807
            the key ``store``
 
808
        """
 
809
        if (start is not None and num is None) or \
 
810
                (num is not None and start is None):
 
811
            raise RedisError("``start`` and ``num`` must both be specified")
 
812
 
 
813
        pieces = [name]
 
814
        if by is not None:
 
815
            pieces.append('BY')
 
816
            pieces.append(by)
 
817
        if start is not None and num is not None:
 
818
            pieces.append('LIMIT')
 
819
            pieces.append(start)
 
820
            pieces.append(num)
 
821
        if get is not None:
 
822
            pieces.append('GET')
 
823
            pieces.append(get)
 
824
        if desc:
 
825
            pieces.append('DESC')
 
826
        if alpha:
 
827
            pieces.append('ALPHA')
 
828
        if store is not None:
 
829
            pieces.append('STORE')
 
830
            pieces.append(store)
 
831
        return self.execute_command('SORT', *pieces)
 
832
 
 
833
 
 
834
    #### SET COMMANDS ####
 
835
    def sadd(self, name, value):
 
836
        "Add ``value`` to set ``name``"
 
837
        return self.execute_command('SADD', name, value)
 
838
 
 
839
    def scard(self, name):
 
840
        "Return the number of elements in set ``name``"
 
841
        return self.execute_command('SCARD', name)
 
842
 
 
843
    def sdiff(self, keys, *args):
 
844
        "Return the difference of sets specified by ``keys``"
 
845
        keys = list_or_args('sdiff', keys, args)
 
846
        return self.execute_command('SDIFF', *keys)
 
847
 
 
848
    def sdiffstore(self, dest, keys, *args):
 
849
        """
 
850
        Store the difference of sets specified by ``keys`` into a new
 
851
        set named ``dest``.  Returns the number of keys in the new set.
 
852
        """
 
853
        keys = list_or_args('sdiffstore', keys, args)
 
854
        return self.execute_command('SDIFFSTORE', dest, *keys)
 
855
 
 
856
    def sinter(self, keys, *args):
 
857
        "Return the intersection of sets specified by ``keys``"
 
858
        keys = list_or_args('sinter', keys, args)
 
859
        return self.execute_command('SINTER', *keys)
 
860
 
 
861
    def sinterstore(self, dest, keys, *args):
 
862
        """
 
863
        Store the intersection of sets specified by ``keys`` into a new
 
864
        set named ``dest``.  Returns the number of keys in the new set.
 
865
        """
 
866
        keys = list_or_args('sinterstore', keys, args)
 
867
        return self.execute_command('SINTERSTORE', dest, *keys)
 
868
 
 
869
    def sismember(self, name, value):
 
870
        "Return a boolean indicating if ``value`` is a member of set ``name``"
 
871
        return self.execute_command('SISMEMBER', name, value)
 
872
 
 
873
    def smembers(self, name):
 
874
        "Return all members of the set ``name``"
 
875
        return self.execute_command('SMEMBERS', name)
 
876
 
 
877
    def smove(self, src, dst, value):
 
878
        "Move ``value`` from set ``src`` to set ``dst`` atomically"
 
879
        return self.execute_command('SMOVE', src, dst, value)
 
880
 
 
881
    def spop(self, name):
 
882
        "Remove and return a random member of set ``name``"
 
883
        return self.execute_command('SPOP', name)
 
884
 
 
885
    def srandmember(self, name):
 
886
        "Return a random member of set ``name``"
 
887
        return self.execute_command('SRANDMEMBER', name)
 
888
 
 
889
    def srem(self, name, value):
 
890
        "Remove ``value`` from set ``name``"
 
891
        return self.execute_command('SREM', name, value)
 
892
 
 
893
    def sunion(self, keys, *args):
 
894
        "Return the union of sets specifiued by ``keys``"
 
895
        keys = list_or_args('sunion', keys, args)
 
896
        return self.execute_command('SUNION', *keys)
 
897
 
 
898
    def sunionstore(self, dest, keys, *args):
 
899
        """
 
900
        Store the union of sets specified by ``keys`` into a new
 
901
        set named ``dest``.  Returns the number of keys in the new set.
 
902
        """
 
903
        keys = list_or_args('sunionstore', keys, args)
 
904
        return self.execute_command('SUNIONSTORE', dest, *keys)
 
905
 
 
906
 
 
907
    #### SORTED SET COMMANDS ####
 
908
    def zadd(self, name, value, score):
 
909
        "Add member ``value`` with score ``score`` to sorted set ``name``"
 
910
        return self.execute_command('ZADD', name, score, value)
 
911
 
 
912
    def zcard(self, name):
 
913
        "Return the number of elements in the sorted set ``name``"
 
914
        return self.execute_command('ZCARD', name)
 
915
 
 
916
    def zincr(self, key, member, value=1):
 
917
        "This has been deprecated, use zincrby instead"
 
918
        warnings.warn(DeprecationWarning(
 
919
            "Redis.zincr has been deprecated, use Redis.zincrby instead"
 
920
            ))
 
921
        return self.zincrby(key, member, value)
 
922
 
 
923
    def zincrby(self, name, value, amount=1):
 
924
        "Increment the score of ``value`` in sorted set ``name`` by ``amount``"
 
925
        return self.execute_command('ZINCRBY', name, amount, value)
 
926
 
 
927
    def zinter(self, dest, keys, aggregate=None):
 
928
        """
 
929
        Intersect multiple sorted sets specified by ``keys`` into
 
930
        a new sorted set, ``dest``. Scores in the destination will be
 
931
        aggregated based on the ``aggregate``, or SUM if none is provided.
 
932
        """
 
933
        return self._zaggregate('ZINTER', dest, keys, aggregate)
 
934
 
 
935
    def zrange(self, name, start, end, desc=False, withscores=False):
 
936
        """
 
937
        Return a range of values from sorted set ``name`` between
 
938
        ``start`` and ``end`` sorted in ascending order.
 
939
 
 
940
        ``start`` and ``end`` can be negative, indicating the end of the range.
 
941
 
 
942
        ``desc`` indicates to sort in descending order.
 
943
 
 
944
        ``withscores`` indicates to return the scores along with the values.
 
945
            The return type is a list of (value, score) pairs
 
946
        """
 
947
        if desc:
 
948
            return self.zrevrange(name, start, end, withscores)
 
949
        pieces = ['ZRANGE', name, start, end]
 
950
        if withscores:
 
951
            pieces.append('withscores')
 
952
        return self.execute_command(*pieces, **{'withscores': withscores})
 
953
 
 
954
    def zrangebyscore(self, name, min, max,
 
955
            start=None, num=None, withscores=False):
 
956
        """
 
957
        Return a range of values from the sorted set ``name`` with scores
 
958
        between ``min`` and ``max``.
 
959
 
 
960
        If ``start`` and ``num`` are specified, then return a slice of the range.
 
961
 
 
962
        ``withscores`` indicates to return the scores along with the values.
 
963
            The return type is a list of (value, score) pairs
 
964
        """
 
965
        if (start is not None and num is None) or \
 
966
                (num is not None and start is None):
 
967
            raise RedisError("``start`` and ``num`` must both be specified")
 
968
        pieces = ['ZRANGEBYSCORE', name, min, max]
 
969
        if start is not None and num is not None:
 
970
            pieces.extend(['LIMIT', start, num])
 
971
        if withscores:
 
972
            pieces.append('withscores')
 
973
        return self.execute_command(*pieces, **{'withscores': withscores})
 
974
 
 
975
    def zrank(self, name, value):
 
976
        """
 
977
        Returns a 0-based value indicating the rank of ``value`` in sorted set
 
978
        ``name``
 
979
        """
 
980
        return self.execute_command('ZRANK', name, value)
 
981
 
 
982
    def zrem(self, name, value):
 
983
        "Remove member ``value`` from sorted set ``name``"
 
984
        return self.execute_command('ZREM', name, value)
 
985
 
 
986
    def zremrangebyscore(self, name, min, max):
 
987
        """
 
988
        Remove all elements in the sorted set ``name`` with scores
 
989
        between ``min`` and ``max``
 
990
        """
 
991
        return self.execute_command('ZREMRANGEBYSCORE', name, min, max)
 
992
 
 
993
    def zrevrange(self, name, start, num, withscores=False):
 
994
        """
 
995
        Return a range of values from sorted set ``name`` between
 
996
        ``start`` and ``num`` sorted in descending order.
 
997
 
 
998
        ``start`` and ``num`` can be negative, indicating the end of the range.
 
999
 
 
1000
        ``withscores`` indicates to return the scores along with the values
 
1001
            as a dictionary of value => score
 
1002
        """
 
1003
        pieces = ['ZREVRANGE', name, start, num]
 
1004
        if withscores:
 
1005
            pieces.append('withscores')
 
1006
        return self.execute_command(*pieces, **{'withscores': withscores})
 
1007
 
 
1008
    def zrevrank(self, name, value):
 
1009
        """
 
1010
        Returns a 0-based value indicating the descending rank of
 
1011
        ``value`` in sorted set ``name``
 
1012
        """
 
1013
        return self.execute_command('ZREVRANK', name, value)
 
1014
 
 
1015
    def zscore(self, name, value):
 
1016
        "Return the score of element ``value`` in sorted set ``name``"
 
1017
        return self.execute_command('ZSCORE', name, value)
 
1018
 
 
1019
    def zunion(self, dest, keys, aggregate=None):
 
1020
        """
 
1021
        Union multiple sorted sets specified by ``keys`` into
 
1022
        a new sorted set, ``dest``. Scores in the destination will be
 
1023
        aggregated based on the ``aggregate``, or SUM if none is provided.
 
1024
        """
 
1025
        return self._zaggregate('ZUNION', dest, keys, aggregate)
 
1026
 
 
1027
    def _zaggregate(self, command, dest, keys, aggregate=None):
 
1028
        pieces = [command, dest, len(keys)]
 
1029
        if isinstance(keys, dict):
 
1030
            items = keys.items()
 
1031
            keys = [i[0] for i in items]
 
1032
            weights = [i[1] for i in items]
 
1033
        else:
 
1034
            weights = None
 
1035
        pieces.extend(keys)
 
1036
        if weights:
 
1037
            pieces.append('WEIGHTS')
 
1038
            pieces.extend(weights)
 
1039
        if aggregate:
 
1040
            pieces.append('AGGREGATE')
 
1041
            pieces.append(aggregate)
 
1042
        return self.execute_command(*pieces)
 
1043
 
 
1044
    #### HASH COMMANDS ####
 
1045
    def hdel(self, name, key):
 
1046
        "Delete ``key`` from hash ``name``"
 
1047
        return self.execute_command('HDEL', name, key)
 
1048
 
 
1049
    def hexists(self, name, key):
 
1050
        "Returns a boolean indicating if ``key`` exists within hash ``name``"
 
1051
        return self.execute_command('HEXISTS', name, key)
 
1052
 
 
1053
    def hget(self, name, key):
 
1054
        "Return the value of ``key`` within the hash ``name``"
 
1055
        return self.execute_command('HGET', name, key)
 
1056
 
 
1057
    def hgetall(self, name):
 
1058
        "Return a Python dict of the hash's name/value pairs"
 
1059
        return self.execute_command('HGETALL', name)
 
1060
 
 
1061
    def hincrby(self, name, key, amount=1):
 
1062
        "Increment the value of ``key`` in hash ``name`` by ``amount``"
 
1063
        return self.execute_command('HINCRBY', name, key, amount)
 
1064
 
 
1065
    def hkeys(self, name):
 
1066
        "Return the list of keys within hash ``name``"
 
1067
        return self.execute_command('HKEYS', name)
 
1068
 
 
1069
    def hlen(self, name):
 
1070
        "Return the number of elements in hash ``name``"
 
1071
        return self.execute_command('HLEN', name)
 
1072
 
 
1073
    def hset(self, name, key, value):
 
1074
        """
 
1075
        Set ``key`` to ``value`` within hash ``name``
 
1076
        Returns 1 if HSET created a new field, otherwise 0
 
1077
        """
 
1078
        return self.execute_command('HSET', name, key, value)
 
1079
 
 
1080
    def hmset(self, name, mapping):
 
1081
        """
 
1082
        Sets each key in the ``mapping`` dict to its corresponding value
 
1083
        in the hash ``name``
 
1084
        """
 
1085
        items = []
 
1086
        [items.extend(pair) for pair in mapping.iteritems()]
 
1087
        return self.execute_command('HMSET', name, *items)
 
1088
 
 
1089
    def hmget(self, name, keys):
 
1090
        "Returns a list of values ordered identically to ``keys``"
 
1091
        return self.execute_command('HMGET', name, *keys)
 
1092
 
 
1093
    def hvals(self, name):
 
1094
        "Return the list of values within hash ``name``"
 
1095
        return self.execute_command('HVALS', name)
 
1096
 
 
1097
 
 
1098
    # channels
 
1099
    def psubscribe(self, patterns):
 
1100
        "Subscribe to all channels matching any pattern in ``patterns``"
 
1101
        if isinstance(patterns, basestring):
 
1102
            patterns = [patterns]
 
1103
        response = self.execute_command('PSUBSCRIBE', *patterns)
 
1104
        # this is *after* the SUBSCRIBE in order to allow for lazy and broken
 
1105
        # connections that need to issue AUTH and SELECT commands
 
1106
        self.subscribed = True
 
1107
        return response
 
1108
 
 
1109
    def punsubscribe(self, patterns=[]):
 
1110
        """
 
1111
        Unsubscribe from any channel matching any pattern in ``patterns``.
 
1112
        If empty, unsubscribe from all channels.
 
1113
        """
 
1114
        if isinstance(patterns, basestring):
 
1115
            patterns = [patterns]
 
1116
        return self.execute_command('PUNSUBSCRIBE', *patterns)
 
1117
 
 
1118
    def subscribe(self, channels):
 
1119
        "Subscribe to ``channels``, waiting for messages to be published"
 
1120
        if isinstance(channels, basestring):
 
1121
            channels = [channels]
 
1122
        response = self.execute_command('SUBSCRIBE', *channels)
 
1123
        # this is *after* the SUBSCRIBE in order to allow for lazy and broken
 
1124
        # connections that need to issue AUTH and SELECT commands
 
1125
        self.subscribed = True
 
1126
        return response
 
1127
 
 
1128
    def unsubscribe(self, channels=[]):
 
1129
        """
 
1130
        Unsubscribe from ``channels``. If empty, unsubscribe
 
1131
        from all channels
 
1132
        """
 
1133
        if isinstance(channels, basestring):
 
1134
            channels = [channels]
 
1135
        return self.execute_command('UNSUBSCRIBE', *channels)
 
1136
 
 
1137
    def publish(self, channel, message):
 
1138
        """
 
1139
        Publish ``message`` on ``channel``.
 
1140
        Returns the number of subscribers the message was delivered to.
 
1141
        """
 
1142
        return self.execute_command('PUBLISH', channel, message)
 
1143
 
 
1144
    def listen(self):
 
1145
        "Listen for messages on channels this client has been subscribed to"
 
1146
        while self.subscribed:
 
1147
            r = self.parse_response('LISTEN')
 
1148
            message_type, channel, message = r[0], r[1], r[2]
 
1149
            yield (message_type, channel, message)
 
1150
            if message_type == 'unsubscribe' and message == 0:
 
1151
                self.subscribed = False
 
1152
 
 
1153
 
 
1154
class Pipeline(Redis):
 
1155
    """
 
1156
    Pipelines provide a way to transmit multiple commands to the Redis server
 
1157
    in one transmission.  This is convenient for batch processing, such as
 
1158
    saving all the values in a list to Redis.
 
1159
 
 
1160
    All commands executed within a pipeline are wrapped with MULTI and EXEC
 
1161
    calls. This guarantees all commands executed in the pipeline will be
 
1162
    executed atomically.
 
1163
 
 
1164
    Any command raising an exception does *not* halt the execution of
 
1165
    subsequent commands in the pipeline. Instead, the exception is caught
 
1166
    and its instance is placed into the response list returned by execute().
 
1167
    Code iterating over the response list should be able to deal with an
 
1168
    instance of an exception as a potential value. In general, these will be
 
1169
    ResponseError exceptions, such as those raised when issuing a command
 
1170
    on a key of a different datatype.
 
1171
    """
 
1172
    def __init__(self, connection, transaction, charset, errors):
 
1173
        self.connection = connection
 
1174
        self.transaction = transaction
 
1175
        self.encoding = charset
 
1176
        self.errors = errors
 
1177
        self.subscribed = False # NOTE not in use, but necessary
 
1178
        self.reset()
 
1179
 
 
1180
    def reset(self):
 
1181
        self.command_stack = []
 
1182
 
 
1183
    def _execute_command(self, command_name, command, **options):
 
1184
        """
 
1185
        Stage a command to be executed when execute() is next called
 
1186
 
 
1187
        Returns the current Pipeline object back so commands can be
 
1188
        chained together, such as:
 
1189
 
 
1190
        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
 
1191
 
 
1192
        At some other point, you can then run: pipe.execute(),
 
1193
        which will execute all commands queued in the pipe.
 
1194
        """
 
1195
        # if the command_name is 'AUTH' or 'SELECT', then this command
 
1196
        # must have originated after a socket connection and a call to
 
1197
        # _setup_connection(). run these commands immediately without
 
1198
        # buffering them.
 
1199
        if command_name in ('AUTH', 'SELECT'):
 
1200
            return super(Pipeline, self)._execute_command(
 
1201
                command_name, command, **options)
 
1202
        else:
 
1203
            self.command_stack.append((command_name, command, options))
 
1204
        return self
 
1205
 
 
1206
    def _execute_transaction(self, commands):
 
1207
        # wrap the commands in MULTI ... EXEC statements to indicate an
 
1208
        # atomic operation
 
1209
        all_cmds = ''.join([c for _1, c, _2 in chain(
 
1210
            (('', 'MULTI\r\n', ''),),
 
1211
            commands,
 
1212
            (('', 'EXEC\r\n', ''),)
 
1213
            )])
 
1214
        self.connection.send(all_cmds, self)
 
1215
        # parse off the response for MULTI and all commands prior to EXEC
 
1216
        for i in range(len(commands)+1):
 
1217
            _ = self.parse_response('_')
 
1218
        # parse the EXEC. we want errors returned as items in the response
 
1219
        response = self.parse_response('_', catch_errors=True)
 
1220
        if len(response) != len(commands):
 
1221
            raise ResponseError("Wrong number of response items from "
 
1222
                "pipeline execution")
 
1223
        # Run any callbacks for the commands run in the pipeline
 
1224
        data = []
 
1225
        for r, cmd in zip(response, commands):
 
1226
            if not isinstance(r, Exception):
 
1227
                if cmd[0] in self.RESPONSE_CALLBACKS:
 
1228
                    r = self.RESPONSE_CALLBACKS[cmd[0]](r, **cmd[2])
 
1229
            data.append(r)
 
1230
        return data
 
1231
 
 
1232
    def _execute_pipeline(self, commands):
 
1233
        # build up all commands into a single request to increase network perf
 
1234
        all_cmds = ''.join([c for _1, c, _2 in commands])
 
1235
        self.connection.send(all_cmds, self)
 
1236
        data = []
 
1237
        for command_name, _, options in commands:
 
1238
            data.append(
 
1239
                self.parse_response(command_name, catch_errors=True, **options)
 
1240
                )
 
1241
        return data
 
1242
 
 
1243
    def execute(self):
 
1244
        "Execute all the commands in the current pipeline"
 
1245
        stack = self.command_stack
 
1246
        self.reset()
 
1247
        if self.transaction:
 
1248
            execute = self._execute_transaction
 
1249
        else:
 
1250
            execute = self._execute_pipeline
 
1251
        try:
 
1252
            return execute(stack)
 
1253
        except ConnectionError:
 
1254
            self.connection.disconnect()
 
1255
            return execute(stack)
 
1256
 
 
1257
    def select(self, *args, **kwargs):
 
1258
        raise RedisError("Cannot select a different database from a pipeline")
 
1259