1
# -*- test-case-name: twisted.test.test_memcache -*-
2
# Copyright (c) 2007-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Memcache client protocol. Memcached is a caching server, storing data in the
7
form of pairs key/value, and memcache is the protocol to talk with it.
9
To connect to a server, create a factory for L{MemCacheProtocol}::
11
from twisted.internet import reactor, protocol
12
from twisted.protocols.memcache import MemCacheProtocol, DEFAULT_PORT
13
d = protocol.ClientCreator(reactor, MemCacheProtocol
14
).connectTCP("localhost", DEFAULT_PORT)
15
def doSomething(proto):
16
# Here you call the memcache operations
17
return proto.set("mykey", "a lot of data")
18
d.addCallback(doSomething)
21
All the operations of the memcache protocol are present, but
22
L{MemCacheProtocol.set} and L{MemCacheProtocol.get} are the more important.
24
See U{http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt} for
25
more information about the protocol.
29
from collections import deque
36
from twisted.protocols.basic import LineReceiver
37
from twisted.protocols.policies import TimeoutMixin
38
from twisted.internet.defer import Deferred, fail, TimeoutError
39
from twisted.python import log
47
class NoSuchCommand(Exception):
49
Exception raised when a non existent command is called.
54
class ClientError(Exception):
56
Error caused by an invalid client call.
61
class ServerError(Exception):
63
Problem happening on the server.
68
class Command(object):
70
Wrap a client action into an object, that holds the values used in the
73
@ivar _deferred: the L{Deferred} object that will be fired when the result
75
@type _deferred: L{Deferred}
77
@ivar command: name of the command sent to the server.
81
def __init__(self, command, **kwargs):
85
@param command: the name of the command.
88
@param kwargs: this values will be stored as attributes of the object
91
self.command = command
92
self._deferred = Deferred()
93
for k, v in kwargs.items():
97
def success(self, value):
99
Shortcut method to fire the underlying deferred.
101
self._deferred.callback(value)
104
def fail(self, error):
106
Make the underlying deferred fails.
108
self._deferred.errback(error)
112
class MemCacheProtocol(LineReceiver, TimeoutMixin):
114
MemCache protocol: connect to a memcached server to store/retrieve values.
116
@ivar persistentTimeOut: the timeout period used to wait for a response.
117
@type persistentTimeOut: C{int}
119
@ivar _current: current list of requests waiting for an answer from the
121
@type _current: C{deque} of L{Command}
123
@ivar _lenExpected: amount of data expected in raw mode, when reading for
125
@type _lenExpected: C{int}
127
@ivar _getBuffer: current buffer of data, used to store temporary data
128
when reading in raw mode.
129
@type _getBuffer: C{list}
131
@ivar _bufferLength: the total amount of bytes in C{_getBuffer}.
132
@type _bufferLength: C{int}
134
@ivar _disconnected: indicate if the connectionLost has been called or not.
135
@type _disconnected: C{bool}
138
_disconnected = False
140
def __init__(self, timeOut=60):
144
@param timeOut: the timeout to wait before detecting that the
145
connection is dead and close it. It's expressed in seconds.
146
@type timeOut: C{int}
148
self._current = deque()
149
self._lenExpected = None
150
self._getBuffer = None
151
self._bufferLength = None
152
self.persistentTimeOut = self.timeOut = timeOut
155
def _cancelCommands(self, reason):
157
Cancel all the outstanding commands, making them fail with C{reason}.
160
cmd = self._current.popleft()
164
def timeoutConnection(self):
166
Close the connection in case of timeout.
168
self._cancelCommands(TimeoutError("Connection timeout"))
169
self.transport.loseConnection()
172
def connectionLost(self, reason):
174
Cause any outstanding commands to fail.
176
self._disconnected = True
177
self._cancelCommands(reason)
178
LineReceiver.connectionLost(self, reason)
181
def sendLine(self, line):
183
Override sendLine to add a timeout to response.
185
if not self._current:
186
self.setTimeout(self.persistentTimeOut)
187
LineReceiver.sendLine(self, line)
190
def rawDataReceived(self, data):
192
Collect data for a get.
195
self._getBuffer.append(data)
196
self._bufferLength += len(data)
197
if self._bufferLength >= self._lenExpected + 2:
198
data = "".join(self._getBuffer)
199
buf = data[:self._lenExpected]
200
rem = data[self._lenExpected + 2:]
202
self._lenExpected = None
203
self._getBuffer = None
204
self._bufferLength = None
205
cmd = self._current[0]
207
flags, cas = cmd.values[cmd.currentKey]
208
cmd.values[cmd.currentKey] = (flags, cas, val)
211
self.setLineMode(rem)
214
def cmd_STORED(self):
216
Manage a success response to a set operation.
218
self._current.popleft().success(True)
221
def cmd_NOT_STORED(self):
223
Manage a specific 'not stored' response to a set operation: this is not
224
an error, but some condition wasn't met.
226
self._current.popleft().success(False)
231
This the end token to a get or a stat operation.
233
cmd = self._current.popleft()
234
if cmd.command == "get":
236
values = dict([(key, val[::2]) for key, val in
237
cmd.values.iteritems()])
240
cmd.success((cmd.flags, cmd.value))
241
elif cmd.command == "gets":
243
cmd.success(cmd.values)
245
cmd.success((cmd.flags, cmd.cas, cmd.value))
246
elif cmd.command == "stats":
247
cmd.success(cmd.values)
250
def cmd_NOT_FOUND(self):
252
Manage error response for incr/decr/delete.
254
self._current.popleft().success(False)
257
def cmd_VALUE(self, line):
259
Prepare the reading a value after a get.
261
cmd = self._current[0]
262
if cmd.command == "get":
263
key, flags, length = line.split()
266
key, flags, length, cas = line.split()
267
self._lenExpected = int(length)
269
self._bufferLength = 0
271
if key not in cmd.keys:
272
raise RuntimeError("Unexpected commands answer.")
274
cmd.values[key] = [int(flags), cas]
277
raise RuntimeError("Unexpected commands answer.")
278
cmd.flags = int(flags)
283
def cmd_STAT(self, line):
285
Reception of one stat line.
287
cmd = self._current[0]
288
key, val = line.split(" ", 1)
289
cmd.values[key] = val
292
def cmd_VERSION(self, versionData):
296
self._current.popleft().success(versionData)
301
An non-existent command has been sent.
303
log.err("Non-existent command sent.")
304
cmd = self._current.popleft()
305
cmd.fail(NoSuchCommand())
308
def cmd_CLIENT_ERROR(self, errText):
310
An invalid input as been sent.
312
log.err("Invalid input: %s" % (errText,))
313
cmd = self._current.popleft()
314
cmd.fail(ClientError(errText))
317
def cmd_SERVER_ERROR(self, errText):
319
An error has happened server-side.
321
log.err("Server error: %s" % (errText,))
322
cmd = self._current.popleft()
323
cmd.fail(ServerError(errText))
326
def cmd_DELETED(self):
328
A delete command has completed successfully.
330
self._current.popleft().success(True)
335
The last command has been completed.
337
self._current.popleft().success(True)
340
def cmd_EXISTS(self):
342
A C{checkAndSet} update has failed.
344
self._current.popleft().success(False)
347
def lineReceived(self, line):
349
Receive line commands from the server.
352
token = line.split(" ", 1)[0]
353
# First manage standard commands without space
354
cmd = getattr(self, "cmd_%s" % (token,), None)
356
args = line.split(" ", 1)[1:]
362
# Then manage commands with space in it
363
line = line.replace(" ", "_")
364
cmd = getattr(self, "cmd_%s" % (line,), None)
368
# Increment/Decrement response
369
cmd = self._current.popleft()
372
if not self._current:
373
# No pending request, remove timeout
374
self.setTimeout(None)
377
def increment(self, key, val=1):
379
Increment the value of C{key} by given value (default to 1).
380
C{key} must be consistent with an int. Return the new value.
382
@param key: the key to modify.
385
@param val: the value to increment.
388
@return: a deferred with will be called back with the new value
389
associated with the key (after the increment).
392
return self._incrdecr("incr", key, val)
395
def decrement(self, key, val=1):
397
Decrement the value of C{key} by given value (default to 1).
398
C{key} must be consistent with an int. Return the new value, coerced to
401
@param key: the key to modify.
404
@param val: the value to decrement.
407
@return: a deferred with will be called back with the new value
408
associated with the key (after the decrement).
411
return self._incrdecr("decr", key, val)
414
def _incrdecr(self, cmd, key, val):
416
Internal wrapper for incr/decr.
418
if self._disconnected:
419
return fail(RuntimeError("not connected"))
420
if not isinstance(key, str):
421
return fail(ClientError(
422
"Invalid type for key: %s, expecting a string" % (type(key),)))
423
if len(key) > self.MAX_KEY_LENGTH:
424
return fail(ClientError("Key too long"))
425
fullcmd = "%s %s %d" % (cmd, key, int(val))
426
self.sendLine(fullcmd)
427
cmdObj = Command(cmd, key=key)
428
self._current.append(cmdObj)
429
return cmdObj._deferred
432
def replace(self, key, val, flags=0, expireTime=0):
434
Replace the given C{key}. It must already exist in the server.
436
@param key: the key to replace.
439
@param val: the new value associated with the key.
442
@param flags: the flags to store with the key.
445
@param expireTime: if different from 0, the relative time in seconds
446
when the key will be deleted from the store.
447
@type expireTime: C{int}
449
@return: a deferred that will fire with C{True} if the operation has
450
succeeded, and C{False} with the key didn't previously exist.
453
return self._set("replace", key, val, flags, expireTime, "")
456
def add(self, key, val, flags=0, expireTime=0):
458
Add the given C{key}. It must not exist in the server.
460
@param key: the key to add.
463
@param val: the value associated with the key.
466
@param flags: the flags to store with the key.
469
@param expireTime: if different from 0, the relative time in seconds
470
when the key will be deleted from the store.
471
@type expireTime: C{int}
473
@return: a deferred that will fire with C{True} if the operation has
474
succeeded, and C{False} with the key already exists.
477
return self._set("add", key, val, flags, expireTime, "")
480
def set(self, key, val, flags=0, expireTime=0):
482
Set the given C{key}.
484
@param key: the key to set.
487
@param val: the value associated with the key.
490
@param flags: the flags to store with the key.
493
@param expireTime: if different from 0, the relative time in seconds
494
when the key will be deleted from the store.
495
@type expireTime: C{int}
497
@return: a deferred that will fire with C{True} if the operation has
501
return self._set("set", key, val, flags, expireTime, "")
504
def checkAndSet(self, key, val, cas, flags=0, expireTime=0):
506
Change the content of C{key} only if the C{cas} value matches the
507
current one associated with the key. Use this to store a value which
508
hasn't been modified since last time you fetched it.
510
@param key: The key to set.
513
@param val: The value associated with the key.
516
@param cas: Unique 64-bit value returned by previous call of C{get}.
519
@param flags: The flags to store with the key.
522
@param expireTime: If different from 0, the relative time in seconds
523
when the key will be deleted from the store.
524
@type expireTime: C{int}
526
@return: A deferred that will fire with C{True} if the operation has
527
succeeded, C{False} otherwise.
530
return self._set("cas", key, val, flags, expireTime, cas)
533
def _set(self, cmd, key, val, flags, expireTime, cas):
535
Internal wrapper for setting values.
537
if self._disconnected:
538
return fail(RuntimeError("not connected"))
539
if not isinstance(key, str):
540
return fail(ClientError(
541
"Invalid type for key: %s, expecting a string" % (type(key),)))
542
if len(key) > self.MAX_KEY_LENGTH:
543
return fail(ClientError("Key too long"))
544
if not isinstance(val, str):
545
return fail(ClientError(
546
"Invalid type for value: %s, expecting a string" %
551
fullcmd = "%s %s %d %d %d%s" % (
552
cmd, key, flags, expireTime, length, cas)
553
self.sendLine(fullcmd)
555
cmdObj = Command(cmd, key=key, flags=flags, length=length)
556
self._current.append(cmdObj)
557
return cmdObj._deferred
560
def append(self, key, val):
562
Append given data to the value of an existing key.
564
@param key: The key to modify.
567
@param val: The value to append to the current value associated with
571
@return: A deferred that will fire with C{True} if the operation has
572
succeeded, C{False} otherwise.
575
# Even if flags and expTime values are ignored, we have to pass them
576
return self._set("append", key, val, 0, 0, "")
579
def prepend(self, key, val):
581
Prepend given data to the value of an existing key.
583
@param key: The key to modify.
586
@param val: The value to prepend to the current value associated with
590
@return: A deferred that will fire with C{True} if the operation has
591
succeeded, C{False} otherwise.
594
# Even if flags and expTime values are ignored, we have to pass them
595
return self._set("prepend", key, val, 0, 0, "")
598
def get(self, key, withIdentifier=False):
600
Get the given C{key}. It doesn't support multiple keys. If
601
C{withIdentifier} is set to C{True}, the command issued is a C{gets},
602
that will return the current identifier associated with the value. This
603
identifier has to be used when issuing C{checkAndSet} update later,
604
using the corresponding method.
606
@param key: The key to retrieve.
609
@param withIdentifier: If set to C{True}, retrieve the current
610
identifier along with the value and the flags.
611
@type withIdentifier: C{bool}
613
@return: A deferred that will fire with the tuple (flags, value) if
614
C{withIdentifier} is C{False}, or (flags, cas identifier, value)
615
if C{True}. If the server indicates there is no value
616
associated with C{key}, the returned value will be C{None} and
617
the returned flags will be C{0}.
620
return self._get([key], withIdentifier, False)
623
def getMultiple(self, keys, withIdentifier=False):
625
Get the given list of C{keys}. If C{withIdentifier} is set to C{True},
626
the command issued is a C{gets}, that will return the identifiers
627
associated with each values. This identifier has to be used when
628
issuing C{checkAndSet} update later, using the corresponding method.
630
@param keys: The keys to retrieve.
631
@type keys: C{list} of C{str}
633
@param withIdentifier: If set to C{True}, retrieve the identifiers
634
along with the values and the flags.
635
@type withIdentifier: C{bool}
637
@return: A deferred that will fire with a dictionary with the elements
638
of C{keys} as keys and the tuples (flags, value) as values if
639
C{withIdentifier} is C{False}, or (flags, cas identifier, value) if
640
C{True}. If the server indicates there is no value associated with
641
C{key}, the returned values will be C{None} and the returned flags
647
return self._get(keys, withIdentifier, True)
649
def _get(self, keys, withIdentifier, multiple):
651
Helper method for C{get} and C{getMultiple}.
653
if self._disconnected:
654
return fail(RuntimeError("not connected"))
656
if not isinstance(key, str):
657
return fail(ClientError(
658
"Invalid type for key: %s, expecting a string" % (type(key),)))
659
if len(key) > self.MAX_KEY_LENGTH:
660
return fail(ClientError("Key too long"))
665
fullcmd = "%s %s" % (cmd, " ".join(keys))
666
self.sendLine(fullcmd)
668
values = dict([(key, (0, "", None)) for key in keys])
669
cmdObj = Command(cmd, keys=keys, values=values, multiple=True)
671
cmdObj = Command(cmd, key=keys[0], value=None, flags=0, cas="",
673
self._current.append(cmdObj)
674
return cmdObj._deferred
676
def stats(self, arg=None):
678
Get some stats from the server. It will be available as a dict.
680
@param arg: An optional additional string which will be sent along
681
with the I{stats} command. The interpretation of this value by
682
the server is left undefined by the memcache protocol
684
@type arg: L{NoneType} or L{str}
686
@return: a deferred that will fire with a C{dict} of the available
694
if self._disconnected:
695
return fail(RuntimeError("not connected"))
697
cmdObj = Command("stats", values={})
698
self._current.append(cmdObj)
699
return cmdObj._deferred
704
Get the version of the server.
706
@return: a deferred that will fire with the string value of the
710
if self._disconnected:
711
return fail(RuntimeError("not connected"))
712
self.sendLine("version")
713
cmdObj = Command("version")
714
self._current.append(cmdObj)
715
return cmdObj._deferred
718
def delete(self, key):
720
Delete an existing C{key}.
722
@param key: the key to delete.
725
@return: a deferred that will be called back with C{True} if the key
726
was successfully deleted, or C{False} if not.
729
if self._disconnected:
730
return fail(RuntimeError("not connected"))
731
if not isinstance(key, str):
732
return fail(ClientError(
733
"Invalid type for key: %s, expecting a string" % (type(key),)))
734
self.sendLine("delete %s" % key)
735
cmdObj = Command("delete", key=key)
736
self._current.append(cmdObj)
737
return cmdObj._deferred
742
Flush all cached values.
744
@return: a deferred that will be called back with C{True} when the
745
operation has succeeded.
748
if self._disconnected:
749
return fail(RuntimeError("not connected"))
750
self.sendLine("flush_all")
751
cmdObj = Command("flush_all")
752
self._current.append(cmdObj)
753
return cmdObj._deferred
757
__all__ = ["MemCacheProtocol", "DEFAULT_PORT", "NoSuchCommand", "ClientError",