~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/test/test_application.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Tests for L{twisted.application} and its interaction with
 
6
L{twisted.persisted.sob}.
 
7
"""
 
8
 
 
9
import sys, copy, os, pickle
 
10
from StringIO import StringIO
 
11
 
 
12
from twisted.trial import unittest, util
 
13
from twisted.application import service, internet, app
 
14
from twisted.persisted import sob
 
15
from twisted.python import usage
 
16
from twisted.python.runtime import platform
 
17
from twisted.internet import interfaces, defer
 
18
from twisted.protocols import wire, basic
 
19
from twisted.internet import protocol, reactor
 
20
from twisted.internet.utils import getProcessOutputAndValue
 
21
from twisted.application import reactors
 
22
from twisted.test.proto_helpers import MemoryReactor
 
23
 
 
24
 
 
25
oldAppSuppressions = [util.suppress(message='twisted.internet.app is deprecated',
 
26
                                    category=DeprecationWarning)]
 
27
 
 
28
skipWindowsNopywin32 = None
 
29
if platform.isWindows():
 
30
    try:
 
31
        import win32process
 
32
    except ImportError:
 
33
        skipWindowsNopywin32 = ("On windows, spawnProcess is not available "
 
34
                                "in the absence of win32process.")
 
35
 
 
36
class Dummy:
 
37
    processName=None
 
38
 
 
39
class TestService(unittest.TestCase):
 
40
 
 
41
    def testName(self):
 
42
        s = service.Service()
 
43
        s.setName("hello")
 
44
        self.failUnlessEqual(s.name, "hello")
 
45
 
 
46
    def testParent(self):
 
47
        s = service.Service()
 
48
        p = service.MultiService()
 
49
        s.setServiceParent(p)
 
50
        self.failUnlessEqual(list(p), [s])
 
51
        self.failUnlessEqual(s.parent, p)
 
52
 
 
53
    def testApplicationAsParent(self):
 
54
        s = service.Service()
 
55
        p = service.Application("")
 
56
        s.setServiceParent(p)
 
57
        self.failUnlessEqual(list(service.IServiceCollection(p)), [s])
 
58
        self.failUnlessEqual(s.parent, service.IServiceCollection(p))
 
59
 
 
60
    def testNamedChild(self):
 
61
        s = service.Service()
 
62
        p = service.MultiService()
 
63
        s.setName("hello")
 
64
        s.setServiceParent(p)
 
65
        self.failUnlessEqual(list(p), [s])
 
66
        self.failUnlessEqual(s.parent, p)
 
67
        self.failUnlessEqual(p.getServiceNamed("hello"), s)
 
68
 
 
69
    def testDoublyNamedChild(self):
 
70
        s = service.Service()
 
71
        p = service.MultiService()
 
72
        s.setName("hello")
 
73
        s.setServiceParent(p)
 
74
        self.failUnlessRaises(RuntimeError, s.setName, "lala")
 
75
 
 
76
    def testDuplicateNamedChild(self):
 
77
        s = service.Service()
 
78
        p = service.MultiService()
 
79
        s.setName("hello")
 
80
        s.setServiceParent(p)
 
81
        s = service.Service()
 
82
        s.setName("hello")
 
83
        self.failUnlessRaises(RuntimeError, s.setServiceParent, p)
 
84
 
 
85
    def testDisowning(self):
 
86
        s = service.Service()
 
87
        p = service.MultiService()
 
88
        s.setServiceParent(p)
 
89
        self.failUnlessEqual(list(p), [s])
 
90
        self.failUnlessEqual(s.parent, p)
 
91
        s.disownServiceParent()
 
92
        self.failUnlessEqual(list(p), [])
 
93
        self.failUnlessEqual(s.parent, None)
 
94
 
 
95
    def testRunning(self):
 
96
        s = service.Service()
 
97
        self.assert_(not s.running)
 
98
        s.startService()
 
99
        self.assert_(s.running)
 
100
        s.stopService()
 
101
        self.assert_(not s.running)
 
102
 
 
103
    def testRunningChildren1(self):
 
104
        s = service.Service()
 
105
        p = service.MultiService()
 
106
        s.setServiceParent(p)
 
107
        self.assert_(not s.running)
 
108
        self.assert_(not p.running)
 
109
        p.startService()
 
110
        self.assert_(s.running)
 
111
        self.assert_(p.running)
 
112
        p.stopService()
 
113
        self.assert_(not s.running)
 
114
        self.assert_(not p.running)
 
115
 
 
116
    def testRunningChildren2(self):
 
117
        s = service.Service()
 
118
        def checkRunning():
 
119
            self.assert_(s.running)
 
120
        t = service.Service()
 
121
        t.stopService = checkRunning
 
122
        t.startService = checkRunning
 
123
        p = service.MultiService()
 
124
        s.setServiceParent(p)
 
125
        t.setServiceParent(p)
 
126
        p.startService()
 
127
        p.stopService()
 
128
 
 
129
    def testAddingIntoRunning(self):
 
130
        p = service.MultiService()
 
131
        p.startService()
 
132
        s = service.Service()
 
133
        self.assert_(not s.running)
 
134
        s.setServiceParent(p)
 
135
        self.assert_(s.running)
 
136
        s.disownServiceParent()
 
137
        self.assert_(not s.running)
 
138
 
 
139
    def testPrivileged(self):
 
140
        s = service.Service()
 
141
        def pss():
 
142
            s.privilegedStarted = 1
 
143
        s.privilegedStartService = pss
 
144
        s1 = service.Service()
 
145
        p = service.MultiService()
 
146
        s.setServiceParent(p)
 
147
        s1.setServiceParent(p)
 
148
        p.privilegedStartService()
 
149
        self.assert_(s.privilegedStarted)
 
150
 
 
151
    def testCopying(self):
 
152
        s = service.Service()
 
153
        s.startService()
 
154
        s1 = copy.copy(s)
 
155
        self.assert_(not s1.running)
 
156
        self.assert_(s.running)
 
157
 
 
158
 
 
159
if hasattr(os, "getuid"):
 
160
    curuid = os.getuid()
 
161
    curgid = os.getgid()
 
162
else:
 
163
    curuid = curgid = 0
 
164
 
 
165
 
 
166
class TestProcess(unittest.TestCase):
 
167
 
 
168
    def testID(self):
 
169
        p = service.Process(5, 6)
 
170
        self.assertEqual(p.uid, 5)
 
171
        self.assertEqual(p.gid, 6)
 
172
 
 
173
    def testDefaults(self):
 
174
        p = service.Process(5)
 
175
        self.assertEqual(p.uid, 5)
 
176
        self.assertEqual(p.gid, None)
 
177
        p = service.Process(gid=5)
 
178
        self.assertEqual(p.uid, None)
 
179
        self.assertEqual(p.gid, 5)
 
180
        p = service.Process()
 
181
        self.assertEqual(p.uid, None)
 
182
        self.assertEqual(p.gid, None)
 
183
 
 
184
    def testProcessName(self):
 
185
        p = service.Process()
 
186
        self.assertEqual(p.processName, None)
 
187
        p.processName = 'hello'
 
188
        self.assertEqual(p.processName, 'hello')
 
189
 
 
190
 
 
191
class TestInterfaces(unittest.TestCase):
 
192
 
 
193
    def testService(self):
 
194
        self.assert_(service.IService.providedBy(service.Service()))
 
195
 
 
196
    def testMultiService(self):
 
197
        self.assert_(service.IService.providedBy(service.MultiService()))
 
198
        self.assert_(service.IServiceCollection.providedBy(service.MultiService()))
 
199
 
 
200
    def testProcess(self):
 
201
        self.assert_(service.IProcess.providedBy(service.Process()))
 
202
 
 
203
 
 
204
class TestApplication(unittest.TestCase):
 
205
 
 
206
    def testConstructor(self):
 
207
        service.Application("hello")
 
208
        service.Application("hello", 5)
 
209
        service.Application("hello", 5, 6)
 
210
 
 
211
    def testProcessComponent(self):
 
212
        a = service.Application("hello")
 
213
        self.assertEqual(service.IProcess(a).uid, None)
 
214
        self.assertEqual(service.IProcess(a).gid, None)
 
215
        a = service.Application("hello", 5)
 
216
        self.assertEqual(service.IProcess(a).uid, 5)
 
217
        self.assertEqual(service.IProcess(a).gid, None)
 
218
        a = service.Application("hello", 5, 6)
 
219
        self.assertEqual(service.IProcess(a).uid, 5)
 
220
        self.assertEqual(service.IProcess(a).gid, 6)
 
221
 
 
222
    def testServiceComponent(self):
 
223
        a = service.Application("hello")
 
224
        self.assert_(service.IService(a) is service.IServiceCollection(a))
 
225
        self.assertEqual(service.IService(a).name, "hello")
 
226
        self.assertEqual(service.IService(a).parent, None)
 
227
 
 
228
    def testPersistableComponent(self):
 
229
        a = service.Application("hello")
 
230
        p = sob.IPersistable(a)
 
231
        self.assertEqual(p.style, 'pickle')
 
232
        self.assertEqual(p.name, 'hello')
 
233
        self.assert_(p.original is a)
 
234
 
 
235
class TestLoading(unittest.TestCase):
 
236
 
 
237
    def test_simpleStoreAndLoad(self):
 
238
        a = service.Application("hello")
 
239
        p = sob.IPersistable(a)
 
240
        for style in 'source pickle'.split():
 
241
            p.setStyle(style)
 
242
            p.save()
 
243
            a1 = service.loadApplication("hello.ta"+style[0], style)
 
244
            self.assertEqual(service.IService(a1).name, "hello")
 
245
        f = open("hello.tac", 'w')
 
246
        f.writelines([
 
247
        "from twisted.application import service\n",
 
248
        "application = service.Application('hello')\n",
 
249
        ])
 
250
        f.close()
 
251
        a1 = service.loadApplication("hello.tac", 'python')
 
252
        self.assertEqual(service.IService(a1).name, "hello")
 
253
 
 
254
 
 
255
 
 
256
class TestAppSupport(unittest.TestCase):
 
257
 
 
258
    def testPassphrase(self):
 
259
        self.assertEqual(app.getPassphrase(0), None)
 
260
 
 
261
    def testLoadApplication(self):
 
262
        """
 
263
        Test loading an application file in different dump format.
 
264
        """
 
265
        a = service.Application("hello")
 
266
        baseconfig = {'file': None, 'source': None, 'python':None}
 
267
        for style in 'source pickle'.split():
 
268
            config = baseconfig.copy()
 
269
            config[{'pickle': 'file'}.get(style, style)] = 'helloapplication'
 
270
            sob.IPersistable(a).setStyle(style)
 
271
            sob.IPersistable(a).save(filename='helloapplication')
 
272
            a1 = app.getApplication(config, None)
 
273
            self.assertEqual(service.IService(a1).name, "hello")
 
274
        config = baseconfig.copy()
 
275
        config['python'] = 'helloapplication'
 
276
        f = open("helloapplication", 'w')
 
277
        f.writelines([
 
278
        "from twisted.application import service\n",
 
279
        "application = service.Application('hello')\n",
 
280
        ])
 
281
        f.close()
 
282
        a1 = app.getApplication(config, None)
 
283
        self.assertEqual(service.IService(a1).name, "hello")
 
284
 
 
285
    def test_convertStyle(self):
 
286
        appl = service.Application("lala")
 
287
        for instyle in 'source pickle'.split():
 
288
            for outstyle in 'source pickle'.split():
 
289
                sob.IPersistable(appl).setStyle(instyle)
 
290
                sob.IPersistable(appl).save(filename="converttest")
 
291
                app.convertStyle("converttest", instyle, None,
 
292
                                 "converttest.out", outstyle, 0)
 
293
                appl2 = service.loadApplication("converttest.out", outstyle)
 
294
                self.assertEqual(service.IService(appl2).name, "lala")
 
295
 
 
296
    def test_getLogFile(self):
 
297
        """
 
298
        Test L{app.getLogFile}, veryfying the LogFile instance it returns.
 
299
        """
 
300
        os.mkdir("logfiledir")
 
301
        l = app.getLogFile(os.path.join("logfiledir", "lala"))
 
302
        self.assertEqual(l.path,
 
303
                         os.path.abspath(os.path.join("logfiledir", "lala")))
 
304
        self.assertEqual(l.name, "lala")
 
305
        self.assertEqual(l.directory, os.path.abspath("logfiledir"))
 
306
 
 
307
    test_getLogFile.suppress = [
 
308
        util.suppress(message="app.getLogFile is deprecated. Use "
 
309
                      "twisted.python.logfile.LogFile.fromFullPath instead",
 
310
                      category=DeprecationWarning)]
 
311
 
 
312
    def test_startApplication(self):
 
313
        appl = service.Application("lala")
 
314
        app.startApplication(appl, 0)
 
315
        self.assert_(service.IService(appl).running)
 
316
 
 
317
 
 
318
class Foo(basic.LineReceiver):
 
319
    def connectionMade(self):
 
320
        self.transport.write('lalala\r\n')
 
321
    def lineReceived(self, line):
 
322
        self.factory.line = line
 
323
        self.transport.loseConnection()
 
324
    def connectionLost(self, reason):
 
325
        self.factory.d.callback(self.factory.line)
 
326
 
 
327
 
 
328
class DummyApp:
 
329
    processName = None
 
330
    def addService(self, service):
 
331
        self.services[service.name] = service
 
332
    def removeService(self, service):
 
333
        del self.services[service.name]
 
334
 
 
335
 
 
336
class TimerTarget:
 
337
    def __init__(self):
 
338
        self.l = []
 
339
    def append(self, what):
 
340
        self.l.append(what)
 
341
 
 
342
class TestEcho(wire.Echo):
 
343
    def connectionLost(self, reason):
 
344
        self.d.callback(True)
 
345
 
 
346
class TestInternet2(unittest.TestCase):
 
347
 
 
348
    def testTCP(self):
 
349
        s = service.MultiService()
 
350
        s.startService()
 
351
        factory = protocol.ServerFactory()
 
352
        factory.protocol = TestEcho
 
353
        TestEcho.d = defer.Deferred()
 
354
        t = internet.TCPServer(0, factory)
 
355
        t.setServiceParent(s)
 
356
        num = t._port.getHost().port
 
357
        factory = protocol.ClientFactory()
 
358
        factory.d = defer.Deferred()
 
359
        factory.protocol = Foo
 
360
        factory.line = None
 
361
        internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s)
 
362
        factory.d.addCallback(self.assertEqual, 'lalala')
 
363
        factory.d.addCallback(lambda x : s.stopService())
 
364
        factory.d.addCallback(lambda x : TestEcho.d)
 
365
        return factory.d
 
366
 
 
367
 
 
368
    def test_UDP(self):
 
369
        """
 
370
        Test L{internet.UDPServer} with a random port: starting the service
 
371
        should give it valid port, and stopService should free it so that we
 
372
        can start a server on the same port again.
 
373
        """
 
374
        if not interfaces.IReactorUDP(reactor, None):
 
375
            raise unittest.SkipTest("This reactor does not support UDP sockets")
 
376
        p = protocol.DatagramProtocol()
 
377
        t = internet.UDPServer(0, p)
 
378
        t.startService()
 
379
        num = t._port.getHost().port
 
380
        self.assertNotEquals(num, 0)
 
381
        def onStop(ignored):
 
382
            t = internet.UDPServer(num, p)
 
383
            t.startService()
 
384
            return t.stopService()
 
385
        return defer.maybeDeferred(t.stopService).addCallback(onStop)
 
386
 
 
387
 
 
388
    def testPrivileged(self):
 
389
        factory = protocol.ServerFactory()
 
390
        factory.protocol = TestEcho
 
391
        TestEcho.d = defer.Deferred()
 
392
        t = internet.TCPServer(0, factory)
 
393
        t.privileged = 1
 
394
        t.privilegedStartService()
 
395
        num = t._port.getHost().port
 
396
        factory = protocol.ClientFactory()
 
397
        factory.d = defer.Deferred()
 
398
        factory.protocol = Foo
 
399
        factory.line = None
 
400
        c = internet.TCPClient('127.0.0.1', num, factory)
 
401
        c.startService()
 
402
        factory.d.addCallback(self.assertEqual, 'lalala')
 
403
        factory.d.addCallback(lambda x : c.stopService())
 
404
        factory.d.addCallback(lambda x : t.stopService())
 
405
        factory.d.addCallback(lambda x : TestEcho.d)
 
406
        return factory.d
 
407
 
 
408
    def testConnectionGettingRefused(self):
 
409
        factory = protocol.ServerFactory()
 
410
        factory.protocol = wire.Echo
 
411
        t = internet.TCPServer(0, factory)
 
412
        t.startService()
 
413
        num = t._port.getHost().port
 
414
        t.stopService()
 
415
        d = defer.Deferred()
 
416
        factory = protocol.ClientFactory()
 
417
        factory.clientConnectionFailed = lambda *args: d.callback(None)
 
418
        c = internet.TCPClient('127.0.0.1', num, factory)
 
419
        c.startService()
 
420
        return d
 
421
 
 
422
    def testUNIX(self):
 
423
        # FIXME: This test is far too dense.  It needs comments.
 
424
        #  -- spiv, 2004-11-07
 
425
        if not interfaces.IReactorUNIX(reactor, None):
 
426
            raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
 
427
        s = service.MultiService()
 
428
        s.startService()
 
429
        factory = protocol.ServerFactory()
 
430
        factory.protocol = TestEcho
 
431
        TestEcho.d = defer.Deferred()
 
432
        t = internet.UNIXServer('echo.skt', factory)
 
433
        t.setServiceParent(s)
 
434
        factory = protocol.ClientFactory()
 
435
        factory.protocol = Foo
 
436
        factory.d = defer.Deferred()
 
437
        factory.line = None
 
438
        internet.UNIXClient('echo.skt', factory).setServiceParent(s)
 
439
        factory.d.addCallback(self.assertEqual, 'lalala')
 
440
        factory.d.addCallback(lambda x : s.stopService())
 
441
        factory.d.addCallback(lambda x : TestEcho.d)
 
442
        factory.d.addCallback(self._cbTestUnix, factory, s)
 
443
        return factory.d
 
444
 
 
445
    def _cbTestUnix(self, ignored, factory, s):
 
446
        TestEcho.d = defer.Deferred()
 
447
        factory.line = None
 
448
        factory.d = defer.Deferred()
 
449
        s.startService()
 
450
        factory.d.addCallback(self.assertEqual, 'lalala')
 
451
        factory.d.addCallback(lambda x : s.stopService())
 
452
        factory.d.addCallback(lambda x : TestEcho.d)
 
453
        return factory.d
 
454
 
 
455
    def testVolatile(self):
 
456
        if not interfaces.IReactorUNIX(reactor, None):
 
457
            raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
 
458
        factory = protocol.ServerFactory()
 
459
        factory.protocol = wire.Echo
 
460
        t = internet.UNIXServer('echo.skt', factory)
 
461
        t.startService()
 
462
        self.failIfIdentical(t._port, None)
 
463
        t1 = copy.copy(t)
 
464
        self.assertIdentical(t1._port, None)
 
465
        t.stopService()
 
466
        self.assertIdentical(t._port, None)
 
467
        self.failIf(t.running)
 
468
 
 
469
        factory = protocol.ClientFactory()
 
470
        factory.protocol = wire.Echo
 
471
        t = internet.UNIXClient('echo.skt', factory)
 
472
        t.startService()
 
473
        self.failIfIdentical(t._connection, None)
 
474
        t1 = copy.copy(t)
 
475
        self.assertIdentical(t1._connection, None)
 
476
        t.stopService()
 
477
        self.assertIdentical(t._connection, None)
 
478
        self.failIf(t.running)
 
479
 
 
480
    def testStoppingServer(self):
 
481
        if not interfaces.IReactorUNIX(reactor, None):
 
482
            raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
 
483
        factory = protocol.ServerFactory()
 
484
        factory.protocol = wire.Echo
 
485
        t = internet.UNIXServer('echo.skt', factory)
 
486
        t.startService()
 
487
        t.stopService()
 
488
        self.failIf(t.running)
 
489
        factory = protocol.ClientFactory()
 
490
        d = defer.Deferred()
 
491
        factory.clientConnectionFailed = lambda *args: d.callback(None)
 
492
        reactor.connectUNIX('echo.skt', factory)
 
493
        return d
 
494
 
 
495
    def testPickledTimer(self):
 
496
        target = TimerTarget()
 
497
        t0 = internet.TimerService(1, target.append, "hello")
 
498
        t0.startService()
 
499
        s = pickle.dumps(t0)
 
500
        t0.stopService()
 
501
 
 
502
        t = pickle.loads(s)
 
503
        self.failIf(t.running)
 
504
 
 
505
    def testBrokenTimer(self):
 
506
        d = defer.Deferred()
 
507
        t = internet.TimerService(1, lambda: 1 / 0)
 
508
        oldFailed = t._failed
 
509
        def _failed(why):
 
510
            oldFailed(why)
 
511
            d.callback(None)
 
512
        t._failed = _failed
 
513
        t.startService()
 
514
        d.addCallback(lambda x : t.stopService)
 
515
        d.addCallback(lambda x : self.assertEqual(
 
516
            [ZeroDivisionError],
 
517
            [o.value.__class__ for o in self.flushLoggedErrors(ZeroDivisionError)]))
 
518
        return d
 
519
 
 
520
    def testEverythingThere(self):
 
521
        trans = 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split()
 
522
        for tran in trans[:]:
 
523
            if not getattr(interfaces, "IReactor"+tran)(reactor, None):
 
524
                trans.remove(tran)
 
525
        if interfaces.IReactorArbitrary(reactor, None) is not None:
 
526
            trans.insert(0, "Generic")
 
527
        for tran in trans:
 
528
            for side in 'Server Client'.split():
 
529
                if tran == "Multicast" and side == "Client":
 
530
                    continue
 
531
                self.assert_(hasattr(internet, tran+side))
 
532
                method = getattr(internet, tran+side).method
 
533
                prefix = {'Server': 'listen', 'Client': 'connect'}[side]
 
534
                self.assert_(hasattr(reactor, prefix+method) or
 
535
                        (prefix == "connect" and method == "UDP"))
 
536
                o = getattr(internet, tran+side)()
 
537
                self.assertEqual(service.IService(o), o)
 
538
 
 
539
 
 
540
    def test_reactorParametrizationInServer(self):
 
541
        """
 
542
        L{internet._AbstractServer} supports a C{reactor} keyword argument
 
543
        that can be used to parametrize the reactor used to listen for
 
544
        connections.
 
545
        """
 
546
        reactor = MemoryReactor()
 
547
 
 
548
        factory = object()
 
549
        t = internet.TCPServer(1234, factory, reactor=reactor)
 
550
        t.startService()
 
551
        self.assertEquals(reactor.tcpServers.pop()[:2], (1234, factory))
 
552
 
 
553
 
 
554
    def test_reactorParametrizationInClient(self):
 
555
        """
 
556
        L{internet._AbstractClient} supports a C{reactor} keyword arguments
 
557
        that can be used to parametrize the reactor used to create new client
 
558
        connections.
 
559
        """
 
560
        reactor = MemoryReactor()
 
561
 
 
562
        factory = object()
 
563
        t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
 
564
        t.startService()
 
565
        self.assertEquals(
 
566
            reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
 
567
 
 
568
 
 
569
    def test_reactorParametrizationInServerMultipleStart(self):
 
570
        """
 
571
        Like L{test_reactorParametrizationInServer}, but stop and restart the
 
572
        service and check that the given reactor is still used.
 
573
        """
 
574
        reactor = MemoryReactor()
 
575
 
 
576
        factory = object()
 
577
        t = internet.TCPServer(1234, factory, reactor=reactor)
 
578
        t.startService()
 
579
        self.assertEquals(reactor.tcpServers.pop()[:2], (1234, factory))
 
580
        t.stopService()
 
581
        t.startService()
 
582
        self.assertEquals(reactor.tcpServers.pop()[:2], (1234, factory))
 
583
 
 
584
 
 
585
    def test_reactorParametrizationInClientMultipleStart(self):
 
586
        """
 
587
        Like L{test_reactorParametrizationInClient}, but stop and restart the
 
588
        service and check that the given reactor is still used.
 
589
        """
 
590
        reactor = MemoryReactor()
 
591
 
 
592
        factory = object()
 
593
        t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
 
594
        t.startService()
 
595
        self.assertEquals(
 
596
            reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
 
597
        t.stopService()
 
598
        t.startService()
 
599
        self.assertEquals(
 
600
            reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
 
601
 
 
602
 
 
603
 
 
604
class TestTimerBasic(unittest.TestCase):
 
605
 
 
606
    def testTimerRuns(self):
 
607
        d = defer.Deferred()
 
608
        self.t = internet.TimerService(1, d.callback, 'hello')
 
609
        self.t.startService()
 
610
        d.addCallback(self.assertEqual, 'hello')
 
611
        d.addCallback(lambda x : self.t.stopService())
 
612
        d.addCallback(lambda x : self.failIf(self.t.running))
 
613
        return d
 
614
 
 
615
    def tearDown(self):
 
616
        return self.t.stopService()
 
617
 
 
618
    def testTimerRestart(self):
 
619
        # restart the same TimerService
 
620
        d1 = defer.Deferred()
 
621
        d2 = defer.Deferred()
 
622
        work = [(d2, "bar"), (d1, "foo")]
 
623
        def trigger():
 
624
            d, arg = work.pop()
 
625
            d.callback(arg)
 
626
        self.t = internet.TimerService(1, trigger)
 
627
        self.t.startService()
 
628
        def onFirstResult(result):
 
629
            self.assertEqual(result, 'foo')
 
630
            return self.t.stopService()
 
631
        def onFirstStop(ignored):
 
632
            self.failIf(self.t.running)
 
633
            self.t.startService()
 
634
            return d2
 
635
        def onSecondResult(result):
 
636
            self.assertEqual(result, 'bar')
 
637
            self.t.stopService()
 
638
        d1.addCallback(onFirstResult)
 
639
        d1.addCallback(onFirstStop)
 
640
        d1.addCallback(onSecondResult)
 
641
        return d1
 
642
 
 
643
    def testTimerLoops(self):
 
644
        l = []
 
645
        def trigger(data, number, d):
 
646
            l.append(data)
 
647
            if len(l) == number:
 
648
                d.callback(l)
 
649
        d = defer.Deferred()
 
650
        self.t = internet.TimerService(0.01, trigger, "hello", 10, d)
 
651
        self.t.startService()
 
652
        d.addCallback(self.assertEqual, ['hello'] * 10)
 
653
        d.addCallback(lambda x : self.t.stopService())
 
654
        return d
 
655
 
 
656
 
 
657
class FakeReactor(reactors.Reactor):
 
658
    """
 
659
    A fake reactor with a hooked install method.
 
660
    """
 
661
 
 
662
    def __init__(self, install, *args, **kwargs):
 
663
        """
 
664
        @param install: any callable that will be used as install method.
 
665
        @type install: C{callable}
 
666
        """
 
667
        reactors.Reactor.__init__(self, *args, **kwargs)
 
668
        self.install = install
 
669
 
 
670
 
 
671
 
 
672
class PluggableReactorTestCase(unittest.TestCase):
 
673
    """
 
674
    Tests for the reactor discovery/inspection APIs.
 
675
    """
 
676
 
 
677
    def setUp(self):
 
678
        """
 
679
        Override the L{reactors.getPlugins} function, normally bound to
 
680
        L{twisted.plugin.getPlugins}, in order to control which
 
681
        L{IReactorInstaller} plugins are seen as available.
 
682
 
 
683
        C{self.pluginResults} can be customized and will be used as the
 
684
        result of calls to C{reactors.getPlugins}.
 
685
        """
 
686
        self.pluginCalls = []
 
687
        self.pluginResults = []
 
688
        self.originalFunction = reactors.getPlugins
 
689
        reactors.getPlugins = self._getPlugins
 
690
 
 
691
 
 
692
    def tearDown(self):
 
693
        """
 
694
        Restore the original L{reactors.getPlugins}.
 
695
        """
 
696
        reactors.getPlugins = self.originalFunction
 
697
 
 
698
 
 
699
    def _getPlugins(self, interface, package=None):
 
700
        """
 
701
        Stand-in for the real getPlugins method which records its arguments
 
702
        and returns a fixed result.
 
703
        """
 
704
        self.pluginCalls.append((interface, package))
 
705
        return list(self.pluginResults)
 
706
 
 
707
 
 
708
    def test_getPluginReactorTypes(self):
 
709
        """
 
710
        Test that reactor plugins are returned from L{getReactorTypes}
 
711
        """
 
712
        name = 'fakereactortest'
 
713
        package = __name__ + '.fakereactor'
 
714
        description = 'description'
 
715
        self.pluginResults = [reactors.Reactor(name, package, description)]
 
716
        reactorTypes = reactors.getReactorTypes()
 
717
 
 
718
        self.assertEqual(
 
719
            self.pluginCalls,
 
720
            [(reactors.IReactorInstaller, None)])
 
721
 
 
722
        for r in reactorTypes:
 
723
            if r.shortName == name:
 
724
                self.assertEqual(r.description, description)
 
725
                break
 
726
        else:
 
727
            self.fail("Reactor plugin not present in getReactorTypes() result")
 
728
 
 
729
 
 
730
    def test_reactorInstallation(self):
 
731
        """
 
732
        Test that L{reactors.Reactor.install} loads the correct module and
 
733
        calls its install attribute.
 
734
        """
 
735
        installed = []
 
736
        def install():
 
737
            installed.append(True)
 
738
        installer = FakeReactor(install,
 
739
                                'fakereactortest', __name__, 'described')
 
740
        installer.install()
 
741
        self.assertEqual(installed, [True])
 
742
 
 
743
 
 
744
    def test_installReactor(self):
 
745
        """
 
746
        Test that the L{reactors.installReactor} function correctly installs
 
747
        the specified reactor.
 
748
        """
 
749
        installed = []
 
750
        def install():
 
751
            installed.append(True)
 
752
        name = 'fakereactortest'
 
753
        package = __name__
 
754
        description = 'description'
 
755
        self.pluginResults = [FakeReactor(install, name, package, description)]
 
756
        reactors.installReactor(name)
 
757
        self.assertEqual(installed, [True])
 
758
 
 
759
 
 
760
    def test_installNonExistentReactor(self):
 
761
        """
 
762
        Test that L{reactors.installReactor} raises L{reactors.NoSuchReactor}
 
763
        when asked to install a reactor which it cannot find.
 
764
        """
 
765
        self.pluginResults = []
 
766
        self.assertRaises(
 
767
            reactors.NoSuchReactor,
 
768
            reactors.installReactor, 'somereactor')
 
769
 
 
770
 
 
771
    def test_installNotAvailableReactor(self):
 
772
        """
 
773
        Test that L{reactors.installReactor} raises an exception when asked to
 
774
        install a reactor which doesn't work in this environment.
 
775
        """
 
776
        def install():
 
777
            raise ImportError("Missing foo bar")
 
778
        name = 'fakereactortest'
 
779
        package = __name__
 
780
        description = 'description'
 
781
        self.pluginResults = [FakeReactor(install, name, package, description)]
 
782
        self.assertRaises(ImportError, reactors.installReactor, name)
 
783
 
 
784
 
 
785
    def test_reactorSelectionMixin(self):
 
786
        """
 
787
        Test that the reactor selected is installed as soon as possible, ie
 
788
        when the option is parsed.
 
789
        """
 
790
        executed = []
 
791
        INSTALL_EVENT = 'reactor installed'
 
792
        SUBCOMMAND_EVENT = 'subcommands loaded'
 
793
 
 
794
        class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
 
795
            def subCommands(self):
 
796
                executed.append(SUBCOMMAND_EVENT)
 
797
                return [('subcommand', None, lambda: self, 'test subcommand')]
 
798
            subCommands = property(subCommands)
 
799
 
 
800
        def install():
 
801
            executed.append(INSTALL_EVENT)
 
802
        self.pluginResults = [
 
803
            FakeReactor(install, 'fakereactortest', __name__, 'described')
 
804
        ]
 
805
 
 
806
        options = ReactorSelectionOptions()
 
807
        options.parseOptions(['--reactor', 'fakereactortest', 'subcommand'])
 
808
        self.assertEqual(executed[0], INSTALL_EVENT)
 
809
        self.assertEqual(executed.count(INSTALL_EVENT), 1)
 
810
 
 
811
 
 
812
    def test_reactorSelectionMixinNonExistent(self):
 
813
        """
 
814
        Test that the usage mixin exits when trying to use a non existent
 
815
        reactor (the name not matching to any reactor), giving an error
 
816
        message.
 
817
        """
 
818
        class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
 
819
            pass
 
820
        self.pluginResults = []
 
821
 
 
822
        options = ReactorSelectionOptions()
 
823
        options.messageOutput = StringIO()
 
824
        e = self.assertRaises(usage.UsageError, options.parseOptions,
 
825
                              ['--reactor', 'fakereactortest', 'subcommand'])
 
826
        self.assertIn("fakereactortest", e.args[0])
 
827
        self.assertIn("help-reactors", e.args[0])
 
828
 
 
829
 
 
830
    def test_reactorSelectionMixinNotAvailable(self):
 
831
        """
 
832
        Test that the usage mixin exits when trying to use a reactor not
 
833
        available (the reactor raises an error at installation), giving an
 
834
        error message.
 
835
        """
 
836
        class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
 
837
            pass
 
838
        message = "Missing foo bar"
 
839
        def install():
 
840
            raise ImportError(message)
 
841
 
 
842
        name = 'fakereactortest'
 
843
        package = __name__
 
844
        description = 'description'
 
845
        self.pluginResults = [FakeReactor(install, name, package, description)]
 
846
 
 
847
        options = ReactorSelectionOptions()
 
848
        options.messageOutput = StringIO()
 
849
        e =  self.assertRaises(usage.UsageError, options.parseOptions,
 
850
                               ['--reactor', 'fakereactortest', 'subcommand'])
 
851
        self.assertIn(message, e.args[0])
 
852
        self.assertIn("help-reactors", e.args[0])
 
853
 
 
854
 
 
855
 
 
856
class ReportProfileTestCase(unittest.TestCase):
 
857
    """
 
858
    Tests for L{app.reportProfile}.
 
859
    """
 
860
 
 
861
    def test_deprecation(self):
 
862
        """
 
863
        Check that L{app.reportProfile} prints a warning and does nothing else.
 
864
        """
 
865
        self.assertWarns(DeprecationWarning,
 
866
            "reportProfile is deprecated and a no-op since Twisted 8.0.",
 
867
            app.__file__, app.reportProfile, None, None)