7
from itertools import chain
8
from redis.exceptions import ConnectionError, ResponseError, InvalidResponse
9
from redis.exceptions import RedisError, AuthenticationError
12
class ConnectionPool(threading.local):
13
"Manages a list of connections on the local thread"
17
def make_connection_key(self, host, port, db):
    """
    Create a unique key for the specified host, port and db.

    Returns the string ``'<host>:<port>:<db>'``.
    """
    return '%s:%s:%s' % (host, port, db)
21
def get_connection(self, host, port, db, password, socket_timeout):
    """
    Return a specific connection for the specified host, port and db.

    A ``Connection`` is created and stored in ``self.connections`` the
    first time a host/port/db combination is requested; later calls return
    the stored object, so ``password`` and ``socket_timeout`` are only used
    when the connection is first created.
    """
    key = self.make_connection_key(host, port, db)
    if key not in self.connections:
        self.connections[key] = Connection(
            host, port, db, password, socket_timeout)
    return self.connections[key]
29
def get_all_connections(self):
    "Return all connection objects the manager knows about"
    return self.connections.values()
34
class Connection(object):
35
"Manages TCP communication to and from a Redis server"
# NOTE(review): this listing of the class is incomplete. The interleaved
# original line numbers skip values, so some statements (for example the
# ``try:`` lines that the ``except`` clauses below pair with) are not
# visible here. Restore them from the real file before relying on this code.
36
# Stores the connection settings; only the ``password`` and
# ``socket_timeout`` assignments are visible in this listing.
def __init__(self, host='localhost', port=6379, db=0, password=None,
41
self.password = password
42
self.socket_timeout = socket_timeout
46
# Opens a TCP socket to (self.host, self.port). A socket.error is re-raised
# as ConnectionError with a readable message. On success TCP_NODELAY is set,
# the timeout is applied, a file object is created for reading and
# ``redis_instance._setup_connection()`` is invoked.
def connect(self, redis_instance):
47
"Connects to the Redis server if not already connected"
51
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
52
sock.connect((self.host, self.port))
53
except socket.error, e:
54
# args for socket.error can either be (errno, "message")
57
# message built from a single-element ``e.args`` (message only)
error_message = "Error connecting to %s:%s. %s." % \
58
(self.host, self.port, e.args[0])
60
# message built from a two-element ``e.args`` (errno, message)
error_message = "Error %s connecting %s:%s. %s." % \
61
(e.args[0], self.host, self.port, e.args[1])
62
raise ConnectionError(error_message)
63
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
64
sock.settimeout(self.socket_timeout)
66
# text-mode file object used by read() below
self._fp = sock.makefile('r')
67
redis_instance._setup_connection()
70
# NOTE(review): the ``def`` line of the method documented by the next string
# is not visible in this listing.
"Disconnects from the Redis server"
71
if self._sock is None:
80
# Connects if needed (connect() is called first), then writes ``command`` with
# sendall(); EPIPE is special-cased and a ConnectionError is raised for
# socket errors.
def send(self, command, redis_instance):
81
"Send ``command`` to the Redis server. Return the result."
82
self.connect(redis_instance)
84
self._sock.sendall(command)
85
except socket.error, e:
86
if e.args[0] == errno.EPIPE:
88
raise ConnectionError("Error %s while writing to socket. %s." % \
91
# Reads ``length`` bytes from the file object when ``length`` is given,
# otherwise a single line; EAGAIN is reported as ConnectionError.
def read(self, length=None):
93
Read a line from the socket is length is None,
94
otherwise read ``length`` bytes
97
if length is not None:
98
return self._fp.read(length)
99
return self._fp.readline()
100
except socket.error, e:
102
if e.args and e.args[0] == errno.EAGAIN:
103
raise ConnectionError("Error while reading from socket: %s" % \
107
def list_or_args(command, keys, args):
108
# NOTE(review): several statements of this helper are not visible in this
# listing (the original line numbers skip 111-113, 117-121 and 125 onward),
# including whatever it returns. Restore them before use.
# returns a single list combining keys and args
109
# if keys is not a list or args has items, issue a
110
# deprecation warning
114
# a string can be iterated, but indicates
115
# keys wasn't passed as a list
116
if isinstance(keys, basestring):
122
# ``command`` is interpolated into the message as the Redis method name
warnings.warn(DeprecationWarning(
123
"Passing *args to Redis.%s has been deprecated. "
124
"Pass an iterable to ``keys`` instead" % command
129
def timestamp_to_datetime(response):
130
"Converts a unix timestamp to a Python datetime object"
# NOTE(review): original lines 131-133 and 135-136 are not visible here, so
# any guards around the conversion below cannot be confirmed from this
# listing.
134
# coerce the reply to an integer number of seconds
response = int(response)
137
# local-time datetime for that timestamp
return datetime.datetime.fromtimestamp(response)
139
def string_keys_to_dict(key_string, callback):
    """
    Map every whitespace-separated name in ``key_string`` to ``callback``.

    Returns a new dict, e.g. ``'A B'`` -> ``{'A': callback, 'B': callback}``.
    """
    return dict.fromkeys(key_string.split(), callback)
142
def dict_merge(*dicts):
    """
    Merge ``dicts`` from left to right into a single new dict.

    Later dicts win when a key appears more than once. The inputs are not
    modified. Returns ``{}`` when called without arguments.
    """
    merged = {}
    for d in dicts:
        merged.update(d)
    return merged
147
def parse_info(response):
148
"Parse the result of Redis's INFO command into a Python dict"
# NOTE(review): the initialisation of ``info``, the branching between the two
# assignments below and the return statement are not visible in this
# listing.
150
# Nested helper: splits a value on ',' and each item on '=' into k/v pairs.
def get_value(value):
154
for item in value.split(','):
155
k, v = item.split('=')
161
# each reply line is split on ':' into key and value
for line in response.splitlines():
162
key, value = line.split(':')
164
# value stored as an integer
info[key] = int(value)
166
# value stored via the nested helper
info[key] = get_value(value)
169
def pairs_to_dict(response):
    """
    Create a dict given a flat list of alternating keys and values.

    ``['k1', 'v1', 'k2', 'v2']`` becomes ``{'k1': 'v1', 'k2': 'v2'}``.
    """
    return dict(zip(response[::2], response[1::2]))
173
def zset_score_pairs(response, **options):
    """
    If ``withscores`` is specified in the options, return the response as
    a list of (value, score) pairs with the scores converted to floats.
    Otherwise (or when the response is empty) return the response unchanged.
    """
    # .get() so that a call without the option behaves like withscores=False
    if not response or not options.get('withscores'):
        return response
    # list() keeps the result a real list even where zip() is lazy
    return list(zip(response[::2], map(float, response[1::2])))
182
def int_or_none(response):
    """
    Return ``response`` converted to an int, or None when it is None.

    NOTE(review): only the ``def`` line was visible in this span; the body
    follows the function's name and its sibling ``float_or_none`` -- confirm.
    """
    if response is None:
        return None
    return int(response)
187
def float_or_none(response):
    "Return ``response`` converted to a float, or None when it is None"
    # a null reply must stay None rather than fail in float()
    if response is None:
        return None
    return float(response)
193
class Redis(threading.local):
195
Implementation of the Redis protocol.
197
This abstract class provides a Python interface to all Redis commands
198
and an implementation of the Redis protocol.
200
Connection and Pipeline derive from this, implementing how
201
the commands are sent and received to the Redis server
203
# Maps a command name to the callable that post-processes its raw reply.
# NOTE(review): several arguments of this dict_merge() call are not visible in
# this listing (the callbacks paired with the first key groups, the
# string_keys_to_dict(...) wrappers around them, the opening of the literal
# dict and the closing brackets).
RESPONSE_CALLBACKS = dict_merge(
205
'AUTH DEL EXISTS EXPIRE EXPIREAT HDEL HEXISTS HMSET MOVE MSETNX '
206
'RENAMENX SADD SISMEMBER SMOVE SETEX SETNX SREM ZADD ZREM',
210
'DECRBY HLEN INCRBY LLEN SCARD SDIFFSTORE SINTERSTORE '
211
'SUNIONSTORE ZCARD ZREMRANGEBYSCORE ZREVRANK',
215
# these return OK, or int if redis-server is >=1.3.4
217
# yields the int itself when it is a non-zero int, else whether r == 'OK'
lambda r: isinstance(r, int) and r or r == 'OK'
219
string_keys_to_dict('ZSCORE ZINCRBY', float_or_none),
221
'FLUSHALL FLUSHDB LSET LTRIM MSET RENAME '
222
'SAVE SELECT SET SHUTDOWN',
225
string_keys_to_dict('SDIFF SINTER SMEMBERS SUNION',
228
string_keys_to_dict('ZRANGE ZRANGEBYSCORE ZREVRANGE', zset_score_pairs),
230
'BGREWRITEAOF': lambda r: \
231
r == 'Background rewriting of AOF file started',
232
'BGSAVE': lambda r: r == 'Background saving started',
233
# any falsy reply becomes an empty dict
'HGETALL': lambda r: r and pairs_to_dict(r) or {},
235
'LASTSAVE': timestamp_to_datetime,
236
'PING': lambda r: r == 'PONG',
237
# any falsy reply (e.g. an empty string) becomes None
'RANDOMKEY': lambda r: r and r or None,
238
# NOTE(review): because of the and/or idiom a reply of 0 is also mapped to
# None, not only -1.
'TTL': lambda r: r != -1 and r or None,
239
'ZRANK': int_or_none,
243
# commands that should NOT pull data off the network buffer when executed
244
SUBSCRIPTION_COMMANDS = set(['SUBSCRIBE', 'UNSUBSCRIBE'])
246
def __init__(self, host='localhost', port=6379,
             db=0, password=None, socket_timeout=None,
             connection_pool=None,
             charset='utf-8', errors='strict'):
    """
    Set up the client state and select the initial connection.

    ``charset`` and ``errors`` are stored for ``encode()``. When no
    ``connection_pool`` is given a new ``ConnectionPool`` is created. The
    connection itself is obtained through ``select()``.
    """
    self.encoding = charset
    # encode() reads self.errors, so it must be stored alongside the charset
    self.errors = errors
    self.connection = None
    self.subscribed = False
    self.connection_pool = connection_pool or ConnectionPool()
    self.select(db, host, port, password, socket_timeout)
257
#### Legacy accessors of connection information ####
259
def _get_host(self):
    "Return the host of the current connection"
    return self.connection.host
host = property(_get_host)

def _get_port(self):
    "Return the port of the current connection"
    return self.connection.port
port = property(_get_port)

def _get_db(self):
    "Return the db of the current connection"
    return self.connection.db
db = property(_get_db)
270
def pipeline(self, transaction=True):
    """
    Return a new pipeline object that can queue multiple commands for
    later execution. ``transaction`` indicates whether all commands
    should be executed atomically. Apart from multiple atomic operations,
    pipelines are useful for batch loading of data as they reduce the
    number of back and forth network operations between client and server.
    """
    # NOTE(review): the body was not visible in this span; the arguments
    # follow Pipeline.__init__(connection, transaction, charset, errors)
    # as declared later in this file -- confirm.
    return Pipeline(
        self.connection,
        transaction,
        self.encoding,
        self.errors)
286
#### COMMAND EXECUTION AND PROTOCOL PARSING ####
287
def _execute_command(self, command_name, command, **options):
288
# NOTE(review): the ``try:`` that pairs with the ``except`` below and the
# statements inside the two ``if subscription_command:`` branches are not
# visible in this listing.
# ``command`` is the already-encoded request; ``command_name`` selects the
# reply handling.
subscription_command = command_name in self.SUBSCRIPTION_COMMANDS
289
# once subscribed, only (UN)SUBSCRIBE may be issued
if self.subscribed and not subscription_command:
290
raise RedisError("Cannot issue commands other than SUBSCRIBE and "
291
"UNSUBSCRIBE while channels are open")
293
self.connection.send(command, self)
294
if subscription_command:
296
return self.parse_response(command_name, **options)
297
# on a connection failure: drop the connection and send the command again
except ConnectionError:
298
self.connection.disconnect()
299
self.connection.send(command, self)
300
if subscription_command:
302
return self.parse_response(command_name, **options)
304
def execute_command(self, *args, **options):
305
"Sends the command to the redis server and returns it's response"
# NOTE(review): the initialisation of ``cmds``, the loop header over the
# arguments and the remaining arguments of the _execute_command() call are
# not visible in this listing.
306
cmd_count = len(args)
309
enc_value = self.encode(i)
310
# each argument is framed as "$<length>\r\n<value>\r\n"
cmds.append('$%s\r\n%s\r\n' % (len(enc_value), enc_value))
311
return self._execute_command(
313
'*%s\r\n%s' % (cmd_count, ''.join(cmds)),
317
def _parse_response(self, command_name, catch_errors):
318
# NOTE(review): most of the branching of this parser (the tests on ``byte``
# and the ``catch_errors`` handling) is not visible in this listing; the
# comments below describe only the statements that are.
conn = self.connection
319
response = conn.read()[:-2] # strip last two characters (\r\n)
321
self.connection.disconnect()
322
raise ConnectionError("Socket closed on remote end")
324
# server returned a null value
325
if response in ('$-1', '*-1'):
327
# first character is the reply-type marker, the rest is the payload
byte, response = response[0], response[1:]
329
# server returned an error
331
# strip the leading 'ERR ' before raising
if response.startswith('ERR '):
332
response = response[4:]
333
raise ResponseError(response)
342
length = int(response)
345
# a zero length yields '' without reading the payload
response = length and conn.read(length) or ''
346
conn.read(2) # read the \r\n delimiter
348
# multi-bulk response
350
length = int(response)
354
# each element is parsed recursively
return [self._parse_response(command_name, catch_errors)
355
for i in range(length)]
357
# for pipelines, we need to read everything,
358
# including response errors. otherwise we'd
359
# completely mess up the receive buffer
361
for i in range(length):
364
self._parse_response(command_name, catch_errors)
370
raise InvalidResponse("Unknown response type for: %s" % command_name)
372
def parse_response(self, command_name, catch_errors=False, **options):
    """
    Parses a response from the Redis server.

    The raw reply is read with ``_parse_response``. If ``command_name`` has
    an entry in ``RESPONSE_CALLBACKS`` the reply is passed through that
    callback (together with ``options``); otherwise it is returned as is.
    """
    response = self._parse_response(command_name, catch_errors)
    if command_name in self.RESPONSE_CALLBACKS:
        return self.RESPONSE_CALLBACKS[command_name](response, **options)
    # commands without a callback (e.g. GET) return the raw reply
    return response
379
def encode(self, value):
    """
    Encode ``value`` using the instance's charset.

    Byte strings are returned unchanged, unicode objects are encoded with
    ``self.encoding`` / ``self.errors`` and anything else is converted
    with ``str()``.
    """
    if isinstance(value, str):
        return value
    if isinstance(value, unicode):
        return value.encode(self.encoding, self.errors)
    # not a string or unicode, attempt to convert to a string
    return str(value)
388
#### CONNECTION HANDLING ####
389
def get_connection(self, host, port, db, password, socket_timeout):
    "Returns a connection object from the connection pool"
    conn = self.connection_pool.get_connection(
        host, port, db, password, socket_timeout)
    # if for whatever reason the connection gets a bad password, make
    # sure a subsequent attempt with the right password makes its way
    # to the connection
    conn.password = password
    return conn
399
def _setup_connection(self):
    """
    After successfully opening a socket to the Redis server, the
    connection object calls this method to authenticate and select
    the appropriate database.

    Raises AuthenticationError when AUTH is rejected.
    """
    if self.connection.password:
        if not self.execute_command('AUTH', self.connection.password):
            raise AuthenticationError("Invalid Password")
    self.execute_command('SELECT', self.connection.db)
410
def select(self, db, host=None, port=None, password=None,
411
socket_timeout=None):
# NOTE(review): the docstring delimiters, the conditions guarding the host
# and port fallbacks, the tail of the first error message and anything after
# the get_connection() call are not visible in this listing.
413
Switch to a different Redis connection.
415
If the host and port aren't provided and there's an existing
416
connection, use the existing connection's host and port instead.
418
Note this method actually replaces the underlying connection object
419
prior to issuing the SELECT command. This makes sure we protect
420
the thread-safe connections
423
# host fallback: requires an existing connection to copy the host from
if self.connection is None:
424
raise RedisError("A valid hostname or IP address "
426
host = self.connection.host
428
# port fallback: requires an existing connection to copy the port from
if self.connection is None:
429
raise RedisError("A valid port must be specified")
430
port = self.connection.port
432
# swap in the connection for the requested host/port/db
self.connection = self.get_connection(
433
host, port, db, password, socket_timeout)
436
#### SERVER INFORMATION ####
437
def bgrewriteaof(self):
    "Tell the Redis server to rewrite the AOF file from data in memory."
    return self.execute_command('BGREWRITEAOF')
def bgsave(self):
    """
    Tell the Redis server to save its data to disk. Unlike save(),
    this method is asynchronous and returns immediately.
    """
    # NOTE(review): the ``def`` line was absent from this span; the name
    # follows the file's lowercase-command convention -- confirm.
    return self.execute_command('BGSAVE')
def dbsize(self):
    "Returns the number of keys in the current database"
    # NOTE(review): the ``def`` line was absent from this span; the name
    # follows the file's lowercase-command convention -- confirm.
    return self.execute_command('DBSIZE')
452
def delete(self, *names):
    "Delete one or more keys specified by ``names``"
    return self.execute_command('DEL', *names)
457
def flush(self, all_dbs=False):
    """
    Deprecated: delete all keys in the current database, or in every
    database when ``all_dbs`` is true.

    Use Redis.flushdb() or Redis.flushall() instead.
    """
    warnings.warn(DeprecationWarning(
        "'flush' has been deprecated. "
        "Use Redis.flushdb() or Redis.flushall() instead"))
    if all_dbs:
        return self.flushall()
    return self.flushdb()
def flushall(self):
    "Delete all keys in all databases on the current host"
    # NOTE(review): the ``def`` line was absent from this span; the name is
    # the one flush() calls -- confirm the signature.
    return self.execute_command('FLUSHALL')
def flushdb(self):
    "Delete all keys in the current database"
    # NOTE(review): the ``def`` line was absent from this span; the name is
    # the one flush() calls -- confirm the signature.
    return self.execute_command('FLUSHDB')
def info(self):
    "Returns a dictionary containing information about the Redis server"
    # NOTE(review): the ``def`` line was absent from this span; the name
    # follows the file's lowercase-command convention -- confirm.
    return self.execute_command('INFO')
def lastsave(self):
    """
    Return a Python datetime object representing the last time the
    Redis database was saved to disk
    """
    # NOTE(review): the ``def`` line was absent from this span; the name
    # follows the file's lowercase-command convention -- confirm.
    return self.execute_command('LASTSAVE')
def ping(self):
    "Ping the Redis server"
    # NOTE(review): the ``def`` line was absent from this span; the name
    # follows the file's lowercase-command convention -- confirm.
    return self.execute_command('PING')
def save(self):
    """
    Tell the Redis server to save its data to disk,
    blocking until the save is complete
    """
    # NOTE(review): the ``def`` line was absent from this span; the name
    # is the one the bgsave docstring refers to -- confirm the signature.
    return self.execute_command('SAVE')
495
#### BASIC KEY COMMANDS ####
496
def append(self, key, value):
    """
    Appends the string ``value`` to the value at ``key``. If ``key``
    doesn't already exist, create it with a value of ``value``.
    Returns the new length of the value at ``key``.
    """
    return self.execute_command('APPEND', key, value)
504
def decr(self, name, amount=1):
    """
    Decrements the value of key ``name`` by ``amount``. If no key exists,
    the value will be initialized as 0 - ``amount``
    """
    return self.execute_command('DECRBY', name, amount)
511
def exists(self, name):
    "Returns a boolean indicating whether key ``name`` exists"
    return self.execute_command('EXISTS', name)
# allows ``name in client``
__contains__ = exists
516
def expire(self, name, time):
    "Set an expire flag on key ``name`` for ``time`` seconds"
    return self.execute_command('EXPIRE', name, time)
520
def expireat(self, name, when):
    """
    Set an expire flag on key ``name``. ``when`` can be represented
    as an integer indicating unix time or a Python datetime object.
    """
    if isinstance(when, datetime.datetime):
        # datetimes are converted to integer unix time via local time
        when = int(time.mktime(when.timetuple()))
    return self.execute_command('EXPIREAT', name, when)
def get(self, name):
    """
    Return the value at key ``name``, or None if the key doesn't exist
    """
    # NOTE(review): the ``def`` line was absent from this span; the name
    # follows the file's lowercase-command convention -- confirm.
    return self.execute_command('GET', name)
536
def getset(self, name, value):
    """
    Set the value at key ``name`` to ``value`` and atomically
    return the value the key held before
    """
    return self.execute_command('GETSET', name, value)
543
def incr(self, name, amount=1):
    """
    Increments the value of key ``name`` by ``amount``. If no key exists,
    the value will be initialized as ``amount``
    """
    return self.execute_command('INCRBY', name, amount)
550
def keys(self, pattern='*'):
    "Returns a list of keys matching ``pattern``"
    return self.execute_command('KEYS', pattern)
554
def mget(self, keys, *args):
    """
    Returns a list of values ordered identically to ``keys``

    * Passing *args to this method has been deprecated *
    """
    keys = list_or_args('mget', keys, args)
    return self.execute_command('MGET', *keys)
563
def mset(self, mapping):
    "Sets each key in the ``mapping`` dict to its corresponding value"
    # flatten {k: v, ...} into [k, v, ...] as MSET expects
    items = []
    for pair in mapping.items():
        items.extend(pair)
    return self.execute_command('MSET', *items)
569
def msetnx(self, mapping):
    """
    Sets each key in the ``mapping`` dict to its corresponding value if
    none of the keys are already set
    """
    # flatten {k: v, ...} into [k, v, ...] as MSETNX expects
    items = []
    for pair in mapping.items():
        items.extend(pair)
    return self.execute_command('MSETNX', *items)
578
def move(self, name, db):
    "Moves the key ``name`` to a different Redis database ``db``"
    return self.execute_command('MOVE', name, db)
def randomkey(self):
    "Returns the name of a random key"
    # NOTE(review): the ``def`` line was absent from this span; the name
    # follows the file's lowercase-command convention -- confirm.
    return self.execute_command('RANDOMKEY')
586
def rename(self, src, dst, **kwargs):
    """
    Rename key ``src`` to ``dst``

    * The following flags have been deprecated *
    If ``preserve`` is True, rename the key only if the destination name
    doesn't already exist
    """
    if 'preserve' in kwargs:
        warnings.warn(DeprecationWarning(
            "preserve option to 'rename' is deprecated, "
            "use Redis.renamenx instead"))
        if kwargs['preserve']:
            return self.renamenx(src, dst)
    return self.execute_command('RENAME', src, dst)
603
def renamenx(self, src, dst):
    "Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist"
    return self.execute_command('RENAMENX', src, dst)
608
def set(self, name, value, **kwargs):
    """
    Set the value at key ``name`` to ``value``

    * The following flags have been deprecated *
    If ``preserve`` is True, set the value only if key doesn't already
    exist
    If ``getset`` is True, delegate to ``getset()`` and return its result
    """
    if 'getset' in kwargs:
        warnings.warn(DeprecationWarning(
            "getset option to 'set' is deprecated, "
            "use Redis.getset() instead"))
        if kwargs['getset']:
            return self.getset(name, value)
    if 'preserve' in kwargs:
        warnings.warn(DeprecationWarning(
            "preserve option to 'set' is deprecated, "
            "use Redis.setnx() instead"))
        if kwargs['preserve']:
            return self.setnx(name, value)
    return self.execute_command('SET', name, value)
634
def setex(self, name, value, time):
    """
    Set the value of key ``name`` to ``value``
    that expires in ``time`` seconds
    """
    # the SETEX command takes the expiry before the value
    return self.execute_command('SETEX', name, time, value)
641
def setnx(self, name, value):
    "Set the value of key ``name`` to ``value`` if key doesn't exist"
    return self.execute_command('SETNX', name, value)
645
def substr(self, name, start, end=-1):
    """
    Return a substring of the string at key ``name``. ``start`` and ``end``
    are 0-based integers specifying the portion of the string to return.
    """
    return self.execute_command('SUBSTR', name, start, end)
def ttl(self, name):
    "Returns the number of seconds until the key ``name`` will expire"
    # NOTE(review): the ``def`` line was absent from this span; the name
    # follows the file's lowercase-command convention -- confirm.
    return self.execute_command('TTL', name)
656
def type(self, name):
    "Returns the type of key ``name``"
    return self.execute_command('TYPE', name)
661
#### LIST COMMANDS ####
662
def blpop(self, keys, timeout=0):
    """
    LPOP a value off of the first non-empty list
    named in the ``keys`` list.

    If none of the lists in ``keys`` has a value to LPOP, then block
    for ``timeout`` seconds, or until a value gets pushed on to one
    of the lists.

    If timeout is 0, then block indefinitely.
    """
    # BLPOP takes the timeout as its final argument; copy so the caller's
    # list is not modified
    keys = list(keys)
    keys.append(timeout)
    return self.execute_command('BLPOP', *keys)
677
def brpop(self, keys, timeout=0):
    """
    RPOP a value off of the first non-empty list
    named in the ``keys`` list.

    If none of the lists in ``keys`` has a value to RPOP, then block
    for ``timeout`` seconds, or until a value gets pushed on to one
    of the lists.

    If timeout is 0, then block indefinitely.
    """
    # BRPOP takes the timeout as its final argument; copy so the caller's
    # list is not modified
    keys = list(keys)
    keys.append(timeout)
    return self.execute_command('BRPOP', *keys)
692
def lindex(self, name, index):
    """
    Return the item from list ``name`` at position ``index``

    Negative indexes are supported and count from the end of the list
    """
    return self.execute_command('LINDEX', name, index)
701
def llen(self, name):
    "Return the length of the list ``name``"
    return self.execute_command('LLEN', name)
705
def lpop(self, name):
    "Remove and return the first item of the list ``name``"
    return self.execute_command('LPOP', name)
709
def lpush(self, name, value):
    "Push ``value`` onto the head of the list ``name``"
    return self.execute_command('LPUSH', name, value)
713
def lrange(self, name, start, end):
    """
    Return a slice of the list ``name`` between
    position ``start`` and ``end``

    ``start`` and ``end`` can be negative numbers just like
    Python slicing notation
    """
    return self.execute_command('LRANGE', name, start, end)
723
def lrem(self, name, value, num=0):
    """
    Remove the first ``num`` occurrences of ``value`` from list ``name``

    If ``num`` is 0, then all occurrences will be removed
    """
    # the LREM command takes the count before the value
    return self.execute_command('LREM', name, num, value)
731
def lset(self, name, index, value):
    "Set position ``index`` of list ``name`` to ``value``"
    return self.execute_command('LSET', name, index, value)
735
def ltrim(self, name, start, end):
    """
    Trim the list ``name``, removing all values not within the slice
    between ``start`` and ``end``

    ``start`` and ``end`` can be negative numbers just like
    Python slicing notation
    """
    return self.execute_command('LTRIM', name, start, end)
745
def pop(self, name, tail=False):
    """
    Pop and return the first or last element of list ``name``

    * This method has been deprecated,
      use Redis.lpop or Redis.rpop instead *
    """
    warnings.warn(DeprecationWarning(
        "Redis.pop has been deprecated, "
        "use Redis.lpop or Redis.rpop instead"))
    if tail:
        return self.rpop(name)
    return self.lpop(name)
759
def push(self, name, value, head=False):
    """
    Push ``value`` onto list ``name``; onto the head when ``head`` is
    true, otherwise onto the tail.

    * This method has been deprecated,
      use Redis.lpush or Redis.rpush instead *
    """
    warnings.warn(DeprecationWarning(
        "Redis.push has been deprecated, "
        "use Redis.lpush or Redis.rpush instead"))
    if head:
        return self.lpush(name, value)
    return self.rpush(name, value)
773
def rpop(self, name):
    "Remove and return the last item of the list ``name``"
    return self.execute_command('RPOP', name)
777
def rpoplpush(self, src, dst):
    """
    RPOP a value off of the ``src`` list and atomically LPUSH it
    on to the ``dst`` list.  Returns the value.
    """
    return self.execute_command('RPOPLPUSH', src, dst)
784
def rpush(self, name, value):
    "Push ``value`` onto the tail of the list ``name``"
    return self.execute_command('RPUSH', name, value)
788
def sort(self, name, start=None, num=None, by=None, get=None,
789
desc=False, alpha=False, store=None):
# NOTE(review): the docstring delimiters, the initialisation of ``pieces``,
# the BY/GET handling, the values appended after LIMIT and STORE, and the
# conditions guarding DESC and ALPHA are not visible in this listing.
791
Sort and return the list, set or sorted set at ``name``.
793
``start`` and ``num`` allow for paging through the sorted data
795
``by`` allows using an external key to weight and sort the items.
796
Use an "*" to indicate where in the key the item value is located
798
``get`` allows for returning items from external keys rather than the
799
sorted data itself. Use an "*" to indicate where int he key
800
the item value is located
802
``desc`` allows for reversing the sort
804
``alpha`` allows for sorting lexicographically rather than numerically
806
``store`` allows for storing the result of the sort into
809
# paging needs both bounds: reject a call that gives only one of them
if (start is not None and num is None) or \
810
(num is not None and start is None):
811
raise RedisError("``start`` and ``num`` must both be specified")
817
if start is not None and num is not None:
818
pieces.append('LIMIT')
825
pieces.append('DESC')
827
pieces.append('ALPHA')
828
if store is not None:
829
pieces.append('STORE')
831
# ``pieces`` holds the SORT arguments in command order
return self.execute_command('SORT', *pieces)
834
#### SET COMMANDS ####
835
def sadd(self, name, value):
    "Add ``value`` to set ``name``"
    return self.execute_command('SADD', name, value)
839
def scard(self, name):
    "Return the number of elements in set ``name``"
    return self.execute_command('SCARD', name)
843
def sdiff(self, keys, *args):
    """
    Return the difference of sets specified by ``keys``

    ``keys`` and ``args`` are combined by ``list_or_args``.
    """
    keys = list_or_args('sdiff', keys, args)
    return self.execute_command('SDIFF', *keys)
848
def sdiffstore(self, dest, keys, *args):
    """
    Store the difference of sets specified by ``keys`` into a new
    set named ``dest``.  Returns the number of keys in the new set.
    """
    keys = list_or_args('sdiffstore', keys, args)
    return self.execute_command('SDIFFSTORE', dest, *keys)
856
def sinter(self, keys, *args):
    """
    Return the intersection of sets specified by ``keys``

    ``keys`` and ``args`` are combined by ``list_or_args``.
    """
    keys = list_or_args('sinter', keys, args)
    return self.execute_command('SINTER', *keys)
861
def sinterstore(self, dest, keys, *args):
    """
    Store the intersection of sets specified by ``keys`` into a new
    set named ``dest``.  Returns the number of keys in the new set.
    """
    keys = list_or_args('sinterstore', keys, args)
    return self.execute_command('SINTERSTORE', dest, *keys)
869
def sismember(self, name, value):
    "Return a boolean indicating if ``value`` is a member of set ``name``"
    return self.execute_command('SISMEMBER', name, value)
873
def smembers(self, name):
    "Return all members of the set ``name``"
    return self.execute_command('SMEMBERS', name)
877
def smove(self, src, dst, value):
    "Move ``value`` from set ``src`` to set ``dst`` atomically"
    return self.execute_command('SMOVE', src, dst, value)
881
def spop(self, name):
    "Remove and return a random member of set ``name``"
    return self.execute_command('SPOP', name)
885
def srandmember(self, name):
    "Return a random member of set ``name``"
    return self.execute_command('SRANDMEMBER', name)
889
def srem(self, name, value):
    "Remove ``value`` from set ``name``"
    return self.execute_command('SREM', name, value)
893
def sunion(self, keys, *args):
    """
    Return the union of sets specified by ``keys``

    ``keys`` and ``args`` are combined by ``list_or_args``.
    """
    keys = list_or_args('sunion', keys, args)
    return self.execute_command('SUNION', *keys)
898
def sunionstore(self, dest, keys, *args):
    """
    Store the union of sets specified by ``keys`` into a new
    set named ``dest``.  Returns the number of keys in the new set.
    """
    keys = list_or_args('sunionstore', keys, args)
    return self.execute_command('SUNIONSTORE', dest, *keys)
907
#### SORTED SET COMMANDS ####
908
def zadd(self, name, value, score):
    "Add member ``value`` with score ``score`` to sorted set ``name``"
    # the ZADD command takes the score before the member
    return self.execute_command('ZADD', name, score, value)
912
def zcard(self, name):
    "Return the number of elements in the sorted set ``name``"
    return self.execute_command('ZCARD', name)
916
def zincr(self, key, member, value=1):
    "This has been deprecated, use zincrby instead"
    warnings.warn(DeprecationWarning(
        "Redis.zincr has been deprecated, use Redis.zincrby instead"
        ))
    return self.zincrby(key, member, value)
923
def zincrby(self, name, value, amount=1):
    "Increment the score of ``value`` in sorted set ``name`` by ``amount``"
    # the ZINCRBY command takes the increment before the member
    return self.execute_command('ZINCRBY', name, amount, value)
927
def zinter(self, dest, keys, aggregate=None):
    """
    Intersect multiple sorted sets specified by ``keys`` into
    a new sorted set, ``dest``. Scores in the destination will be
    aggregated based on the ``aggregate``, or SUM if none is provided.
    """
    return self._zaggregate('ZINTER', dest, keys, aggregate)
935
def zrange(self, name, start, end, desc=False, withscores=False):
    """
    Return a range of values from sorted set ``name`` between
    ``start`` and ``end`` sorted in ascending order.

    ``start`` and ``end`` can be negative, indicating the end of the range.

    ``desc`` indicates to sort in descending order.

    ``withscores`` indicates to return the scores along with the values.
    The return type is a list of (value, score) pairs
    """
    if desc:
        return self.zrevrange(name, start, end, withscores)
    pieces = ['ZRANGE', name, start, end]
    if withscores:
        pieces.append('withscores')
    # the option is also forwarded so the response callback can pair scores
    return self.execute_command(*pieces, **{'withscores': withscores})
954
def zrangebyscore(self, name, min, max,
                  start=None, num=None, withscores=False):
    """
    Return a range of values from the sorted set ``name`` with scores
    between ``min`` and ``max``.

    If ``start`` and ``num`` are specified, then return a slice of the range.

    ``withscores`` indicates to return the scores along with the values.
    The return type is a list of (value, score) pairs

    Raises RedisError when only one of ``start`` / ``num`` is given.
    """
    # paging needs both bounds or neither
    if (start is None) != (num is None):
        raise RedisError("``start`` and ``num`` must both be specified")
    pieces = ['ZRANGEBYSCORE', name, min, max]
    if start is not None and num is not None:
        pieces.extend(['LIMIT', start, num])
    if withscores:
        pieces.append('withscores')
    # the option is also forwarded so the response callback can pair scores
    return self.execute_command(*pieces, **{'withscores': withscores})
975
def zrank(self, name, value):
    """
    Returns a 0-based value indicating the rank of ``value`` in sorted set
    ``name``
    """
    return self.execute_command('ZRANK', name, value)
982
def zrem(self, name, value):
    "Remove member ``value`` from sorted set ``name``"
    return self.execute_command('ZREM', name, value)
986
def zremrangebyscore(self, name, min, max):
    """
    Remove all elements in the sorted set ``name`` with scores
    between ``min`` and ``max``
    """
    return self.execute_command('ZREMRANGEBYSCORE', name, min, max)
993
def zrevrange(self, name, start, num, withscores=False):
    """
    Return a range of values from sorted set ``name`` between
    ``start`` and ``num`` sorted in descending order.

    ``start`` and ``num`` can be negative, indicating the end of the range.

    ``withscores`` indicates to return the scores along with the values.
    The return type is a list of (value, score) pairs
    """
    pieces = ['ZREVRANGE', name, start, num]
    if withscores:
        pieces.append('withscores')
    # the option is also forwarded so the response callback can pair scores
    return self.execute_command(*pieces, **{'withscores': withscores})
1008
def zrevrank(self, name, value):
    """
    Returns a 0-based value indicating the descending rank of
    ``value`` in sorted set ``name``
    """
    return self.execute_command('ZREVRANK', name, value)
1015
def zscore(self, name, value):
    "Return the score of element ``value`` in sorted set ``name``"
    return self.execute_command('ZSCORE', name, value)
1019
def zunion(self, dest, keys, aggregate=None):
    """
    Union multiple sorted sets specified by ``keys`` into
    a new sorted set, ``dest``. Scores in the destination will be
    aggregated based on the ``aggregate``, or SUM if none is provided.
    """
    return self._zaggregate('ZUNION', dest, keys, aggregate)
1027
def _zaggregate(self, command, dest, keys, aggregate=None):
    """
    Build and send a sorted-set aggregation command (``command``).

    ``keys`` is either an iterable of key names or a dict mapping key name
    to weight; in the dict case a WEIGHTS clause is sent. ``aggregate``,
    when given, is sent as the AGGREGATE clause.

    NOTE(review): the else branch, the extension of ``pieces`` with the
    keys and the two guards were not visible in this span; they are
    reconstructed from the command syntax -- confirm.
    """
    pieces = [command, dest, len(keys)]
    if isinstance(keys, dict):
        items = list(keys.items())
        keys = [i[0] for i in items]
        weights = [i[1] for i in items]
    else:
        weights = None
    pieces.extend(keys)
    if weights:
        pieces.append('WEIGHTS')
        pieces.extend(weights)
    if aggregate:
        pieces.append('AGGREGATE')
        pieces.append(aggregate)
    return self.execute_command(*pieces)
1044
#### HASH COMMANDS ####
1045
def hdel(self, name, key):
    "Delete ``key`` from hash ``name``"
    return self.execute_command('HDEL', name, key)
1049
def hexists(self, name, key):
    "Returns a boolean indicating if ``key`` exists within hash ``name``"
    return self.execute_command('HEXISTS', name, key)
1053
def hget(self, name, key):
    "Return the value of ``key`` within the hash ``name``"
    return self.execute_command('HGET', name, key)
1057
def hgetall(self, name):
    "Return a Python dict of the hash's name/value pairs"
    return self.execute_command('HGETALL', name)
1061
def hincrby(self, name, key, amount=1):
    "Increment the value of ``key`` in hash ``name`` by ``amount``"
    return self.execute_command('HINCRBY', name, key, amount)
1065
def hkeys(self, name):
    "Return the list of keys within hash ``name``"
    return self.execute_command('HKEYS', name)
1069
def hlen(self, name):
    "Return the number of elements in hash ``name``"
    return self.execute_command('HLEN', name)
1073
def hset(self, name, key, value):
    """
    Set ``key`` to ``value`` within hash ``name``
    Returns 1 if HSET created a new field, otherwise 0
    """
    return self.execute_command('HSET', name, key, value)
1080
def hmset(self, name, mapping):
    """
    Sets each key in the ``mapping`` dict to its corresponding value
    in the hash ``name``
    """
    # flatten {k: v, ...} into [k, v, ...] as HMSET expects
    items = []
    for pair in mapping.items():
        items.extend(pair)
    return self.execute_command('HMSET', name, *items)
1089
def hmget(self, name, keys):
    "Returns a list of values ordered identically to ``keys``"
    return self.execute_command('HMGET', name, *keys)
1093
def hvals(self, name):
    "Return the list of values within hash ``name``"
    return self.execute_command('HVALS', name)
1099
def psubscribe(self, patterns):
    """
    Subscribe to all channels matching any pattern in ``patterns``.

    ``patterns`` may be a single string or an iterable of strings.
    Returns the response of the PSUBSCRIBE command.
    """
    if isinstance(patterns, basestring):
        patterns = [patterns]
    response = self.execute_command('PSUBSCRIBE', *patterns)
    # this is *after* the SUBSCRIBE in order to allow for lazy and broken
    # connections that need to issue AUTH and SELECT commands
    self.subscribed = True
    return response
1109
def punsubscribe(self, patterns=[]):
    """
    Unsubscribe from any channel matching any pattern in ``patterns``.
    If empty, unsubscribe from all channels.

    ``patterns`` may be a single string or an iterable of strings; the
    default list is never modified.
    """
    if isinstance(patterns, basestring):
        patterns = [patterns]
    return self.execute_command('PUNSUBSCRIBE', *patterns)
1118
def subscribe(self, channels):
    """
    Subscribe to ``channels``, waiting for messages to be published.

    ``channels`` may be a single string or an iterable of strings.
    Returns the response of the SUBSCRIBE command.
    """
    if isinstance(channels, basestring):
        channels = [channels]
    response = self.execute_command('SUBSCRIBE', *channels)
    # this is *after* the SUBSCRIBE in order to allow for lazy and broken
    # connections that need to issue AUTH and SELECT commands
    self.subscribed = True
    return response
1128
def unsubscribe(self, channels=[]):
    """
    Stop receiving messages from ``channels``. If empty, unsubscribe
    from all channels.
    """
    # a lone string is promoted to a one-item list; the default list is
    # only ever read here, never modified
    targets = [channels] if isinstance(channels, basestring) else channels
    return self.execute_command('UNSUBSCRIBE', *targets)
1137
def publish(self, channel, message):
    """
    Send ``message`` to everyone listening on ``channel``.
    Returns the number of subscribers the message was delivered to.
    """
    delivered = self.execute_command('PUBLISH', channel, message)
    return delivered
def listen(self):
    """
    Listen for messages on channels this client has been subscribed to

    Generator yielding a ``(message_type, channel, message)`` tuple built
    from the first three items of each parsed reply. Iteration stops once
    an unsubscribe reply reports that no subscriptions remain.
    """
    while self.subscribed:
        r = self.parse_response('LISTEN')
        message_type, channel, message = r[0], r[1], r[2]
        yield (message_type, channel, message)
        # the count carried by unsubscribe replies covers channels and
        # patterns together, so a PUNSUBSCRIBE can also be the reply that
        # brings it to zero; without this check the loop would keep
        # waiting for a reply after the last subscription is gone
        if message_type in ('unsubscribe', 'punsubscribe') and message == 0:
            self.subscribed = False
1154
class Pipeline(Redis):
1156
Pipelines provide a way to transmit multiple commands to the Redis server
1157
in one transmission. This is convenient for batch processing, such as
1158
saving all the values in a list to Redis.
1160
All commands executed within a pipeline are wrapped with MULTI and EXEC
1161
calls. This guarantees all commands executed in the pipeline will be
1162
executed atomically.
1164
Any command raising an exception does *not* halt the execution of
1165
subsequent commands in the pipeline. Instead, the exception is caught
1166
and its instance is placed into the response list returned by execute().
1167
Code iterating over the response list should be able to deal with an
1168
instance of an exception as a potential value. In general, these will be
1169
ResponseError exceptions, such as those raised when issuing a command
1170
on a key of a different datatype.
1172
def __init__(self, connection, transaction, charset, errors):
    # staged (command_name, command, options) tuples waiting for execute()
    self.command_stack = []
    self.connection, self.transaction = connection, transaction
    # ``charset`` is kept under the name ``encoding``
    self.encoding, self.errors = charset, errors
    # NOTE not in use, but necessary
    self.subscribed = False
1183
def _execute_command(self, command_name, command, **options):
    """
    Stage a command to be executed when execute() is next called

    Returns the current Pipeline object back so commands can be
    chained together, such as:

    pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

    At some other point, you can then run: pipe.execute(),
    which will execute all commands queued in the pipe.
    """
    # if the command_name is 'AUTH' or 'SELECT', then this command
    # must have originated after a socket connection and a call to
    # _setup_connection(). run these commands immediately without
    # queuing them in the pipeline.
    if command_name in ('AUTH', 'SELECT'):
        return super(Pipeline, self)._execute_command(
            command_name, command, **options)
    # everything else waits in the queue as (name, raw command, options)
    self.command_stack.append((command_name, command, options))
    # returning the pipeline itself is what makes the chaining promised
    # in the docstring work
    return self
1206
def _execute_transaction(self, commands):
    """
    Send ``commands`` wrapped in MULTI ... EXEC and return their results

    ``commands`` is a sequence of (command_name, command, options)
    tuples. The returned list holds one entry per command: the EXEC
    result item, passed through the matching RESPONSE_CALLBACKS entry
    when one exists, or the exception instance if that command failed.

    Raises ResponseError when EXEC returns a different number of items
    than there are commands.
    """
    # wrap the commands in MULTI ... EXEC statements to indicate an
    # atomic operation, and send everything in a single request
    payload = ['MULTI\r\n']
    payload.extend([raw for _name, raw, _options in commands])
    payload.append('EXEC\r\n')
    self.connection.send(''.join(payload), self)
    # parse off the response for MULTI and all commands prior to EXEC
    for _ in range(len(commands) + 1):
        self.parse_response('_')
    # parse the EXEC. we want errors returned as items in the response
    response = self.parse_response('_', catch_errors=True)
    if len(response) != len(commands):
        raise ResponseError("Wrong number of response items from "
            "pipeline execution")
    # Run any callbacks for the commands run in the pipeline
    data = []
    for r, cmd in zip(response, commands):
        # exception instances are passed through as-is
        if not isinstance(r, Exception):
            if cmd[0] in self.RESPONSE_CALLBACKS:
                r = self.RESPONSE_CALLBACKS[cmd[0]](r, **cmd[2])
        data.append(r)
    return data
1232
def _execute_pipeline(self, commands):
    """
    Send ``commands`` in one request, without MULTI/EXEC, and return
    the list of parsed responses in command order.

    ``commands`` is a sequence of (command_name, command, options)
    tuples. Each reply is parsed with ``catch_errors=True`` plus the
    options the command was staged with.
    """
    # build up all commands into a single request to increase network perf
    all_cmds = ''.join([raw for _name, raw, _options in commands])
    self.connection.send(all_cmds, self)
    # one reply per command, read back in the order they were sent; the
    # results are collected so the caller actually receives them
    data = []
    for command_name, _, options in commands:
        data.append(
            self.parse_response(command_name, catch_errors=True, **options))
    return data
def execute(self):
    """
    Execute all the commands in the current pipeline

    Returns the response list produced by ``_execute_transaction`` when
    ``self.transaction`` is true, otherwise by ``_execute_pipeline``.
    A ConnectionError triggers one disconnect-and-retry; a second
    failure propagates to the caller.
    """
    # take the queued commands and start a fresh queue so a later
    # execute() does not send them again; the local reference keeps
    # them available for the retry below
    stack = self.command_stack
    self.command_stack = []
    if self.transaction:
        execute = self._execute_transaction
    else:
        execute = self._execute_pipeline
    try:
        return execute(stack)
    except ConnectionError:
        # disconnect, then send the whole batch once more
        self.connection.disconnect()
        return execute(stack)
1257
def select(self, *args, **kwargs):
    "Always refuse: a pipeline cannot switch to another database"
    # the arguments are accepted but ignored
    reason = "Cannot select a different database from a pipeline"
    raise RedisError(reason)