1
# Copyright (c) 2007-2009 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Test the memcache client protocol.
8
from twisted.internet.error import ConnectionDone
10
from twisted.protocols.memcache import MemCacheProtocol, NoSuchCommand
11
from twisted.protocols.memcache import ClientError, ServerError
13
from twisted.trial.unittest import TestCase
14
from twisted.test.proto_helpers import StringTransportWithDisconnection
15
from twisted.internet.task import Clock
16
from twisted.internet.defer import Deferred, gatherResults, TimeoutError
17
from twisted.internet.defer import DeferredList
23
Setup and tests for basic invocation of L{MemCacheProtocol} commands.
26
def _test(self, d, send, recv, result):
28
Helper test method to test the resulting C{Deferred} of a
29
L{MemCacheProtocol} command.
31
raise NotImplementedError()
36
L{MemCacheProtocol.get} returns a L{Deferred} which is called back with
37
the value and the flag associated with the given key if the server
38
returns a successful result.
40
return self._test(self.proto.get("foo"), "get foo\r\n",
41
"VALUE foo 0 3\r\nbar\r\nEND\r\n", (0, "bar"))
44
def test_emptyGet(self):
46
Test getting a non-available key: it succeeds but return C{None} as
47
value and C{0} as flag.
49
return self._test(self.proto.get("foo"), "get foo\r\n",
53
def test_getMultiple(self):
55
L{MemCacheProtocol.getMultiple} returns a L{Deferred} which is called
56
back with a dictionary of flag, value for each given key.
58
return self._test(self.proto.getMultiple(['foo', 'cow']),
60
"VALUE foo 0 3\r\nbar\r\nVALUE cow 0 7\r\nchicken\r\nEND\r\n",
61
{'cow': (0, 'chicken'), 'foo': (0, 'bar')})
64
def test_getMultipleWithEmpty(self):
66
When L{MemCacheProtocol.getMultiple} is called with non-available keys,
67
the corresponding tuples are (0, None).
69
return self._test(self.proto.getMultiple(['foo', 'cow']),
71
"VALUE cow 1 3\r\nbar\r\nEND\r\n",
72
{'cow': (1, 'bar'), 'foo': (0, None)})
77
L{MemCacheProtocol.set} returns a L{Deferred} which is called back with
78
C{True} when the operation succeeds.
80
return self._test(self.proto.set("foo", "bar"),
81
"set foo 0 0 3\r\nbar\r\n", "STORED\r\n", True)
86
L{MemCacheProtocol.add} returns a L{Deferred} which is called back with
87
C{True} when the operation succeeds.
89
return self._test(self.proto.add("foo", "bar"),
90
"add foo 0 0 3\r\nbar\r\n", "STORED\r\n", True)
93
def test_replace(self):
95
L{MemCacheProtocol.replace} returns a L{Deferred} which is called back
96
with C{True} when the operation succeeds.
98
return self._test(self.proto.replace("foo", "bar"),
99
"replace foo 0 0 3\r\nbar\r\n", "STORED\r\n", True)
102
def test_errorAdd(self):
104
Test an erroneous add: if a L{MemCacheProtocol.add} is called but the
105
key already exists on the server, it returns a B{NOT STORED} answer,
106
which calls back the resulting L{Deferred} with C{False}.
108
return self._test(self.proto.add("foo", "bar"),
109
"add foo 0 0 3\r\nbar\r\n", "NOT STORED\r\n", False)
112
def test_errorReplace(self):
114
Test an erroneous replace: if a L{MemCacheProtocol.replace} is called
115
but the key doesn't exist on the server, it returns a B{NOT STORED}
116
answer, which calls back the resulting L{Deferred} with C{False}.
118
return self._test(self.proto.replace("foo", "bar"),
119
"replace foo 0 0 3\r\nbar\r\n", "NOT STORED\r\n", False)
122
def test_delete(self):
124
L{MemCacheProtocol.delete} returns a L{Deferred} which is called back
125
with C{True} when the server notifies a success.
127
return self._test(self.proto.delete("bar"), "delete bar\r\n",
131
def test_errorDelete(self):
133
Test a error during a delete: if key doesn't exist on the server, it
134
returns a B{NOT FOUND} answer which calls back the resulting L{Deferred}
137
return self._test(self.proto.delete("bar"), "delete bar\r\n",
138
"NOT FOUND\r\n", False)
141
def test_increment(self):
143
Test incrementing a variable: L{MemCacheProtocol.increment} returns a
144
L{Deferred} which is called back with the incremented value of the
147
return self._test(self.proto.increment("foo"), "incr foo 1\r\n",
151
def test_decrement(self):
153
Test decrementing a variable: L{MemCacheProtocol.decrement} returns a
154
L{Deferred} which is called back with the decremented value of the
158
self.proto.decrement("foo"), "decr foo 1\r\n", "5\r\n", 5)
161
def test_incrementVal(self):
163
L{MemCacheProtocol.increment} takes an optional argument C{value} which
164
replaces the default value of 1 when specified.
166
return self._test(self.proto.increment("foo", 8), "incr foo 8\r\n",
170
def test_decrementVal(self):
172
L{MemCacheProtocol.decrement} takes an optional argument C{value} which
173
replaces the default value of 1 when specified.
175
return self._test(self.proto.decrement("foo", 3), "decr foo 3\r\n",
179
def test_stats(self):
181
Test retrieving server statistics via the L{MemCacheProtocol.stats}
182
command: it parses the data sent by the server and calls back the
183
resulting L{Deferred} with a dictionary of the received statistics.
185
return self._test(self.proto.stats(), "stats\r\n",
186
"STAT foo bar\r\nSTAT egg spam\r\nEND\r\n",
187
{"foo": "bar", "egg": "spam"})
190
def test_statsWithArgument(self):
192
L{MemCacheProtocol.stats} takes an optional C{str} argument which,
193
if specified, is sent along with the I{STAT} command. The I{STAT}
194
responses from the server are parsed as key/value pairs and returned
195
as a C{dict} (as in the case where the argument is not specified).
197
return self._test(self.proto.stats("blah"), "stats blah\r\n",
198
"STAT foo bar\r\nSTAT egg spam\r\nEND\r\n",
199
{"foo": "bar", "egg": "spam"})
202
def test_version(self):
204
Test version retrieval via the L{MemCacheProtocol.version} command: it
205
returns a L{Deferred} which is called back with the version sent by the
208
return self._test(self.proto.version(), "version\r\n",
209
"VERSION 1.1\r\n", "1.1")
212
def test_flushAll(self):
214
L{MemCacheProtocol.flushAll} returns a L{Deferred} which is called back
215
with C{True} if the server acknowledges success.
217
return self._test(self.proto.flushAll(), "flush_all\r\n",
222
class MemCacheTestCase(CommandMixin, TestCase):
224
Test client protocol class L{MemCacheProtocol}.
229
Create a memcache client, connect it to a string protocol, and make it
230
use a deterministic clock.
232
self.proto = MemCacheProtocol()
234
self.proto.callLater = self.clock.callLater
235
self.transport = StringTransportWithDisconnection()
236
self.transport.protocol = self.proto
237
self.proto.makeConnection(self.transport)
240
def _test(self, d, send, recv, result):
242
Implementation of C{_test} which checks that the command sends C{send}
243
data, and that upon reception of C{recv} the result is C{result}.
245
@param d: the resulting deferred from the memcache command.
248
@param send: the expected data to be sent.
251
@param recv: the data to simulate as reception.
254
@param result: the expected result.
258
self.assertEquals(res, result)
259
self.assertEquals(self.transport.value(), send)
261
self.proto.dataReceived(recv)
265
def test_invalidGetResponse(self):
267
If the value returned doesn't match the expected key of the current
268
C{get} command, an error is raised in L{MemCacheProtocol.dataReceived}.
270
self.proto.get("foo")
272
self.assertRaises(RuntimeError,
273
self.proto.dataReceived,
274
"VALUE bar 0 %s\r\n%s\r\nEND\r\n" % (len(s), s))
277
def test_invalidMultipleGetResponse(self):
279
If the value returned doesn't match one the expected keys of the
280
current multiple C{get} command, an error is raised error in
281
L{MemCacheProtocol.dataReceived}.
283
self.proto.getMultiple(["foo", "bar"])
285
self.assertRaises(RuntimeError,
286
self.proto.dataReceived,
287
"VALUE egg 0 %s\r\n%s\r\nEND\r\n" % (len(s), s))
290
def test_timeOut(self):
292
Test the timeout on outgoing requests: when timeout is detected, all
293
current commands fail with a L{TimeoutError}, and the connection is
296
d1 = self.proto.get("foo")
297
d2 = self.proto.get("bar")
299
self.proto.connectionLost = d3.callback
301
self.clock.advance(self.proto.persistentTimeOut)
302
self.assertFailure(d1, TimeoutError)
303
self.assertFailure(d2, TimeoutError)
304
def checkMessage(error):
305
self.assertEquals(str(error), "Connection timeout")
306
d1.addCallback(checkMessage)
307
return gatherResults([d1, d2, d3])
310
def test_timeoutRemoved(self):
312
When a request gets a response, no pending timeout call remains around.
314
d = self.proto.get("foo")
316
self.clock.advance(self.proto.persistentTimeOut - 1)
317
self.proto.dataReceived("VALUE foo 0 3\r\nbar\r\nEND\r\n")
320
self.assertEquals(result, (0, "bar"))
321
self.assertEquals(len(self.clock.calls), 0)
326
def test_timeOutRaw(self):
328
Test the timeout when raw mode was started: the timeout is not reset
329
until all the data has been received, so we can have a L{TimeoutError}
330
when waiting for raw data.
332
d1 = self.proto.get("foo")
334
self.proto.connectionLost = d2.callback
336
self.proto.dataReceived("VALUE foo 0 10\r\n12345")
337
self.clock.advance(self.proto.persistentTimeOut)
338
self.assertFailure(d1, TimeoutError)
339
return gatherResults([d1, d2])
342
def test_timeOutStat(self):
344
Test the timeout when stat command has started: the timeout is not
345
reset until the final B{END} is received.
347
d1 = self.proto.stats()
349
self.proto.connectionLost = d2.callback
351
self.proto.dataReceived("STAT foo bar\r\n")
352
self.clock.advance(self.proto.persistentTimeOut)
353
self.assertFailure(d1, TimeoutError)
354
return gatherResults([d1, d2])
357
def test_timeoutPipelining(self):
359
When two requests are sent, a timeout call remains around for the
360
second request, and its timeout time is correct.
362
d1 = self.proto.get("foo")
363
d2 = self.proto.get("bar")
365
self.proto.connectionLost = d3.callback
367
self.clock.advance(self.proto.persistentTimeOut - 1)
368
self.proto.dataReceived("VALUE foo 0 3\r\nbar\r\nEND\r\n")
371
self.assertEquals(result, (0, "bar"))
372
self.assertEquals(len(self.clock.calls), 1)
373
for i in range(self.proto.persistentTimeOut):
374
self.clock.advance(1)
375
return self.assertFailure(d2, TimeoutError).addCallback(checkTime)
376
def checkTime(ignored):
377
# Check that the timeout happened C{self.proto.persistentTimeOut}
378
# after the last response
380
self.clock.seconds(), 2 * self.proto.persistentTimeOut - 1)
381
d1.addCallback(check)
385
def test_timeoutNotReset(self):
387
Check that timeout is not resetted for every command, but keep the
388
timeout from the first command without response.
390
d1 = self.proto.get("foo")
392
self.proto.connectionLost = d3.callback
394
self.clock.advance(self.proto.persistentTimeOut - 1)
395
d2 = self.proto.get("bar")
396
self.clock.advance(1)
397
self.assertFailure(d1, TimeoutError)
398
self.assertFailure(d2, TimeoutError)
399
return gatherResults([d1, d2, d3])
402
def test_timeoutCleanDeferreds(self):
404
C{timeoutConnection} cleans the list of commands that it fires with
405
C{TimeoutError}: C{connectionLost} doesn't try to fire them again, but
406
sets the disconnected state so that future commands fail with a
409
d1 = self.proto.get("foo")
410
self.clock.advance(self.proto.persistentTimeOut)
411
self.assertFailure(d1, TimeoutError)
412
d2 = self.proto.get("bar")
413
self.assertFailure(d2, RuntimeError)
414
return gatherResults([d1, d2])
417
def test_connectionLost(self):
419
When disconnection occurs while commands are still outstanding, the
422
d1 = self.proto.get("foo")
423
d2 = self.proto.get("bar")
424
self.transport.loseConnection()
425
done = DeferredList([d1, d2], consumeErrors=True)
426
def checkFailures(results):
427
for success, result in results:
428
self.assertFalse(success)
429
result.trap(ConnectionDone)
430
return done.addCallback(checkFailures)
433
def test_tooLongKey(self):
435
An error is raised when trying to use a too long key: the called
436
command returns a L{Deferred} which fails with a L{ClientError}.
438
d1 = self.assertFailure(self.proto.set("a" * 500, "bar"), ClientError)
439
d2 = self.assertFailure(self.proto.increment("a" * 500), ClientError)
440
d3 = self.assertFailure(self.proto.get("a" * 500), ClientError)
441
d4 = self.assertFailure(
442
self.proto.append("a" * 500, "bar"), ClientError)
443
d5 = self.assertFailure(
444
self.proto.prepend("a" * 500, "bar"), ClientError)
445
d6 = self.assertFailure(
446
self.proto.getMultiple(["foo", "a" * 500]), ClientError)
447
return gatherResults([d1, d2, d3, d4, d5, d6])
450
def test_invalidCommand(self):
452
When an unknown command is sent directly (not through public API), the
453
server answers with an B{ERROR} token, and the command fails with
456
d = self.proto._set("egg", "foo", "bar", 0, 0, "")
457
self.assertEquals(self.transport.value(), "egg foo 0 0 3\r\nbar\r\n")
458
self.assertFailure(d, NoSuchCommand)
459
self.proto.dataReceived("ERROR\r\n")
463
def test_clientError(self):
465
Test the L{ClientError} error: when the server sends a B{CLIENT_ERROR}
466
token, the originating command fails with L{ClientError}, and the error
467
contains the text sent by the server.
470
d = self.proto.set("foo", a)
471
self.assertEquals(self.transport.value(),
472
"set foo 0 0 8\r\neggspamm\r\n")
473
self.assertFailure(d, ClientError)
475
self.assertEquals(str(err), "We don't like egg and spam")
477
self.proto.dataReceived("CLIENT_ERROR We don't like egg and spam\r\n")
481
def test_serverError(self):
483
Test the L{ServerError} error: when the server sends a B{SERVER_ERROR}
484
token, the originating command fails with L{ServerError}, and the error
485
contains the text sent by the server.
488
d = self.proto.set("foo", a)
489
self.assertEquals(self.transport.value(),
490
"set foo 0 0 8\r\neggspamm\r\n")
491
self.assertFailure(d, ServerError)
493
self.assertEquals(str(err), "zomg")
495
self.proto.dataReceived("SERVER_ERROR zomg\r\n")
499
def test_unicodeKey(self):
501
Using a non-string key as argument to commands raises an error.
503
d1 = self.assertFailure(self.proto.set(u"foo", "bar"), ClientError)
504
d2 = self.assertFailure(self.proto.increment(u"egg"), ClientError)
505
d3 = self.assertFailure(self.proto.get(1), ClientError)
506
d4 = self.assertFailure(self.proto.delete(u"bar"), ClientError)
507
d5 = self.assertFailure(self.proto.append(u"foo", "bar"), ClientError)
508
d6 = self.assertFailure(self.proto.prepend(u"foo", "bar"), ClientError)
509
d7 = self.assertFailure(
510
self.proto.getMultiple(["egg", 1]), ClientError)
511
return gatherResults([d1, d2, d3, d4, d5, d6, d7])
514
def test_unicodeValue(self):
516
Using a non-string value raises an error.
518
return self.assertFailure(self.proto.set("foo", u"bar"), ClientError)
521
def test_pipelining(self):
523
Multiple requests can be sent subsequently to the server, and the
524
protocol orders the responses correctly and dispatch to the
525
corresponding client command.
527
d1 = self.proto.get("foo")
528
d1.addCallback(self.assertEquals, (0, "bar"))
529
d2 = self.proto.set("bar", "spamspamspam")
530
d2.addCallback(self.assertEquals, True)
531
d3 = self.proto.get("egg")
532
d3.addCallback(self.assertEquals, (0, "spam"))
533
self.assertEquals(self.transport.value(),
534
"get foo\r\nset bar 0 0 12\r\nspamspamspam\r\nget egg\r\n")
535
self.proto.dataReceived("VALUE foo 0 3\r\nbar\r\nEND\r\n"
537
"VALUE egg 0 4\r\nspam\r\nEND\r\n")
538
return gatherResults([d1, d2, d3])
541
def test_getInChunks(self):
543
If the value retrieved by a C{get} arrive in chunks, the protocol
544
is able to reconstruct it and to produce the good value.
546
d = self.proto.get("foo")
547
d.addCallback(self.assertEquals, (0, "0123456789"))
548
self.assertEquals(self.transport.value(), "get foo\r\n")
549
self.proto.dataReceived("VALUE foo 0 10\r\n0123456")
550
self.proto.dataReceived("789")
551
self.proto.dataReceived("\r\nEND")
552
self.proto.dataReceived("\r\n")
556
def test_append(self):
558
L{MemCacheProtocol.append} behaves like a L{MemCacheProtocol.set}
559
method: it returns a L{Deferred} which is called back with C{True} when
560
the operation succeeds.
562
return self._test(self.proto.append("foo", "bar"),
563
"append foo 0 0 3\r\nbar\r\n", "STORED\r\n", True)
566
def test_prepend(self):
568
L{MemCacheProtocol.prepend} behaves like a L{MemCacheProtocol.set}
569
method: it returns a L{Deferred} which is called back with C{True} when
570
the operation succeeds.
572
return self._test(self.proto.prepend("foo", "bar"),
573
"prepend foo 0 0 3\r\nbar\r\n", "STORED\r\n", True)
578
L{MemCacheProtocol.get} handles an additional cas result when
579
C{withIdentifier} is C{True} and forward it in the resulting
582
return self._test(self.proto.get("foo", True), "gets foo\r\n",
583
"VALUE foo 0 3 1234\r\nbar\r\nEND\r\n", (0, "1234", "bar"))
586
def test_emptyGets(self):
588
Test getting a non-available key with gets: it succeeds but return
589
C{None} as value, C{0} as flag and an empty cas value.
591
return self._test(self.proto.get("foo", True), "gets foo\r\n",
592
"END\r\n", (0, "", None))
595
def test_getsMultiple(self):
597
L{MemCacheProtocol.getMultiple} handles an additional cas field in the
598
returned tuples if C{withIdentifier} is C{True}.
600
return self._test(self.proto.getMultiple(["foo", "bar"], True),
602
"VALUE foo 0 3 1234\r\negg\r\nVALUE bar 0 4 2345\r\nspam\r\nEND\r\n",
603
{'bar': (0, '2345', 'spam'), 'foo': (0, '1234', 'egg')})
606
def test_getsMultipleWithEmpty(self):
608
When getting a non-available key with L{MemCacheProtocol.getMultiple}
609
when C{withIdentifier} is C{True}, the other keys are retrieved
610
correctly, and the non-available key gets a tuple of C{0} as flag,
611
C{None} as value, and an empty cas value.
613
return self._test(self.proto.getMultiple(["foo", "bar"], True),
615
"VALUE foo 0 3 1234\r\negg\r\nEND\r\n",
616
{'bar': (0, '', None), 'foo': (0, '1234', 'egg')})
619
def test_checkAndSet(self):
621
L{MemCacheProtocol.checkAndSet} passes an additional cas identifier
622
that the server handles to check if the data has to be updated.
624
return self._test(self.proto.checkAndSet("foo", "bar", cas="1234"),
625
"cas foo 0 0 3 1234\r\nbar\r\n", "STORED\r\n", True)
628
def test_casUnknowKey(self):
630
When L{MemCacheProtocol.checkAndSet} response is C{EXISTS}, the
631
resulting L{Deferred} fires with C{False}.
633
return self._test(self.proto.checkAndSet("foo", "bar", cas="1234"),
634
"cas foo 0 0 3 1234\r\nbar\r\n", "EXISTS\r\n", False)
638
class CommandFailureTests(CommandMixin, TestCase):
640
Tests for correct failure of commands on a disconnected
646
Create a disconnected memcache client, using a deterministic clock.
648
self.proto = MemCacheProtocol()
650
self.proto.callLater = self.clock.callLater
651
self.transport = StringTransportWithDisconnection()
652
self.transport.protocol = self.proto
653
self.proto.makeConnection(self.transport)
654
self.transport.loseConnection()
657
def _test(self, d, send, recv, result):
659
Implementation of C{_test} which checks that the command fails with
660
C{RuntimeError} because the transport is disconnected. All the
661
parameters except C{d} are ignored.
663
return self.assertFailure(d, RuntimeError)