~certify-web-dev/twisted/certify-staging

« back to all changes in this revision

Viewing changes to twisted/test/test_policies.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2010-01-02 19:38:17 UTC
  • mfrom: (2.2.4 sid)
  • Revision ID: james.westby@ubuntu.com-20100102193817-jphp464ppwh7dulg
Tags: 9.0.0-1
* python-twisted: Depend on the python-twisted-* 9.0 packages.
* python-twisted: Depend on python-zope.interface only. Closes: #557781.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
 
1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
2
2
# See LICENSE for details.
3
3
 
4
4
"""
5
5
Test code for policies.
6
6
"""
7
7
 
 
8
from zope.interface import Interface, implements, implementedBy
 
9
 
8
10
from StringIO import StringIO
9
11
 
10
12
from twisted.trial import unittest
 
13
from twisted.test.proto_helpers import StringTransport
11
14
from twisted.test.proto_helpers import StringTransportWithDisconnection
12
15
 
13
16
from twisted.internet import protocol, reactor, address, defer, task
15
18
 
16
19
 
17
20
 
18
 
class StringIOWithoutClosing(StringIO):
19
 
    def close(self): pass
20
 
 
21
 
 
22
 
 
23
21
class SimpleProtocol(protocol.Protocol):
24
22
 
25
23
    connected = disconnected = 0
122
120
 
123
121
 
124
122
 
125
 
class PausableStringTransport(StringTransportWithDisconnection):
126
 
    """
127
 
    A string transport saving the current production state.
128
 
 
129
 
    @ivar paused: whether the production is paused or not.
130
 
    @type paused: C{bool}
131
 
    """
132
 
    paused = False
133
 
 
134
 
    def pauseProducing(self):
135
 
        """
136
 
        Notification of production pause: set C{self.paused}.
137
 
        """
138
 
        self.paused = True
139
 
 
140
 
 
141
 
    def resumeProducing(self):
142
 
        """
143
 
        Notification of production restart: unset C{self.paused}.
144
 
        """
145
 
        self.paused = False
146
 
 
147
 
 
148
 
 
149
123
class WrapperTestCase(unittest.TestCase):
150
 
    def testProtocolFactoryAttribute(self):
 
124
    """
 
125
    Tests for L{WrappingFactory} and L{ProtocolWrapper}.
 
126
    """
 
127
    def test_protocolFactoryAttribute(self):
151
128
        """
152
129
        Make sure protocol.factory is the wrapped factory, not the wrapping
153
130
        factory.
158
135
        self.assertIdentical(p.wrappedProtocol.factory, f)
159
136
 
160
137
 
 
138
    def test_transportInterfaces(self):
 
139
        """
 
140
        The transport wrapper passed to the wrapped protocol's
 
141
        C{makeConnection} provides the same interfaces as are provided by the
 
142
        original transport.
 
143
        """
 
144
        class IStubTransport(Interface):
 
145
            pass
 
146
 
 
147
        class StubTransport:
 
148
            implements(IStubTransport)
 
149
 
 
150
        # Looking up what ProtocolWrapper implements also mutates the class.
 
151
        # It adds __implemented__ and __providedBy__ attributes to it.  These
 
152
        # prevent __getattr__ from causing the IStubTransport.providedBy call
 
153
        # below from returning True.  If, by accident, nothing else causes
 
154
        # these attributes to be added to ProtocolWrapper, the test will pass,
 
155
        # but the interface will only be provided until something does trigger
 
156
        # their addition.  So we just trigger it right now to be sure.
 
157
        implementedBy(policies.ProtocolWrapper)
 
158
 
 
159
        proto = protocol.Protocol()
 
160
        wrapper = policies.ProtocolWrapper(policies.WrappingFactory(None), proto)
 
161
 
 
162
        wrapper.makeConnection(StubTransport())
 
163
        self.assertTrue(IStubTransport.providedBy(proto.transport))
 
164
 
 
165
 
161
166
 
162
167
class WrappingFactory(policies.WrappingFactory):
163
168
    protocol = lambda s, f, p: p
240
245
        server = Server()
241
246
        tServer = TestableThrottlingFactory(task.Clock(), server, writeLimit=10)
242
247
        port = tServer.buildProtocol(address.IPv4Address('TCP', '127.0.0.1', 0))
243
 
        tr = PausableStringTransport()
 
248
        tr = StringTransportWithDisconnection()
244
249
        tr.protocol = port
245
250
        port.makeConnection(tr)
246
251
        port.producer = port.wrappedProtocol
271
276
        server = Server()
272
277
        tServer = TestableThrottlingFactory(task.Clock(), server, readLimit=10)
273
278
        port = tServer.buildProtocol(address.IPv4Address('TCP', '127.0.0.1', 0))
274
 
        tr = PausableStringTransport()
 
279
        tr = StringTransportWithDisconnection()
275
280
        tr.protocol = port
276
281
        port.makeConnection(tr)
277
282
 
282
287
 
283
288
        tServer.clock.advance(1.05)
284
289
        self.assertEquals(tServer.readThisSecond, 0)
285
 
        self.assertTrue(tr.paused)
 
290
        self.assertEquals(tr.producerState, 'paused')
286
291
 
287
292
        tServer.clock.advance(1.05)
288
293
        self.assertEquals(tServer.readThisSecond, 0)
289
 
        self.assertFalse(tr.paused)
 
294
        self.assertEquals(tr.producerState, 'producing')
290
295
 
291
296
        tr.clear()
292
297
        port.dataReceived("0123456789")
296
301
 
297
302
        tServer.clock.advance(1.05)
298
303
        self.assertEquals(tServer.readThisSecond, 0)
299
 
        self.assertTrue(tr.paused)
 
304
        self.assertEquals(tr.producerState, 'paused')
300
305
 
301
306
        tServer.clock.advance(1.05)
302
307
        self.assertEquals(tServer.readThisSecond, 0)
303
 
        self.assertFalse(tr.paused)
 
308
        self.assertEquals(tr.producerState, 'producing')
304
309
 
305
310
 
306
311
 
472
477
        Check that the protocol does timeout at the time specified by its
473
478
        C{timeOut} attribute.
474
479
        """
475
 
        s = StringIOWithoutClosing()
476
 
        self.proto.makeConnection(protocol.FileWrapper(s))
 
480
        self.proto.makeConnection(StringTransport())
477
481
 
478
482
        # timeOut value is 3
479
483
        self.clock.pump([0, 0.5, 1.0, 1.0])
486
490
        """
487
491
        Check that receiving data is delaying the timeout of the connection.
488
492
        """
489
 
        s = StringIOWithoutClosing()
490
 
        self.proto.makeConnection(protocol.FileWrapper(s))
 
493
        self.proto.makeConnection(StringTransport())
491
494
 
492
495
        self.clock.pump([0, 0.5, 1.0, 1.0])
493
496
        self.failIf(self.proto.timedOut)
504
507
        and install a new timeout.
505
508
        """
506
509
        self.proto.timeOut = None
507
 
        s = StringIOWithoutClosing()
508
 
        self.proto.makeConnection(protocol.FileWrapper(s))
 
510
        self.proto.makeConnection(StringTransport())
509
511
 
510
512
        self.proto.setTimeout(1)
511
513
        self.assertEquals(self.proto.timeOut, 1)
521
523
        Setting the timeout to C{None} cancel any timeout operations.
522
524
        """
523
525
        self.proto.timeOut = 5
524
 
        s = StringIOWithoutClosing()
525
 
        self.proto.makeConnection(protocol.FileWrapper(s))
 
526
        self.proto.makeConnection(StringTransport())
526
527
 
527
528
        self.proto.setTimeout(None)
528
529
        self.assertEquals(self.proto.timeOut, None)