1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for L{twisted.application} and its interaction with
6
L{twisted.persisted.sob}.
9
import sys, copy, os, pickle
10
from StringIO import StringIO
12
from twisted.trial import unittest, util
13
from twisted.application import service, internet, app
14
from twisted.persisted import sob
15
from twisted.python import usage
16
from twisted.python.runtime import platform
17
from twisted.internet import interfaces, defer
18
from twisted.protocols import wire, basic
19
from twisted.internet import protocol, reactor
20
from twisted.internet.utils import getProcessOutputAndValue
21
from twisted.application import reactors
22
from twisted.test.proto_helpers import MemoryReactor
25
oldAppSuppressions = [util.suppress(message='twisted.internet.app is deprecated',
26
category=DeprecationWarning)]
28
skipWindowsNopywin32 = None
29
if platform.isWindows():
33
skipWindowsNopywin32 = ("On windows, spawnProcess is not available "
34
"in the absence of win32process.")
39
class TestService(unittest.TestCase):
44
self.failUnlessEqual(s.name, "hello")
48
p = service.MultiService()
50
self.failUnlessEqual(list(p), [s])
51
self.failUnlessEqual(s.parent, p)
53
def testApplicationAsParent(self):
55
p = service.Application("")
57
self.failUnlessEqual(list(service.IServiceCollection(p)), [s])
58
self.failUnlessEqual(s.parent, service.IServiceCollection(p))
60
def testNamedChild(self):
62
p = service.MultiService()
65
self.failUnlessEqual(list(p), [s])
66
self.failUnlessEqual(s.parent, p)
67
self.failUnlessEqual(p.getServiceNamed("hello"), s)
69
def testDoublyNamedChild(self):
71
p = service.MultiService()
74
self.failUnlessRaises(RuntimeError, s.setName, "lala")
76
def testDuplicateNamedChild(self):
78
p = service.MultiService()
83
self.failUnlessRaises(RuntimeError, s.setServiceParent, p)
85
def testDisowning(self):
87
p = service.MultiService()
89
self.failUnlessEqual(list(p), [s])
90
self.failUnlessEqual(s.parent, p)
91
s.disownServiceParent()
92
self.failUnlessEqual(list(p), [])
93
self.failUnlessEqual(s.parent, None)
95
def testRunning(self):
97
self.assert_(not s.running)
99
self.assert_(s.running)
101
self.assert_(not s.running)
103
def testRunningChildren1(self):
104
s = service.Service()
105
p = service.MultiService()
106
s.setServiceParent(p)
107
self.assert_(not s.running)
108
self.assert_(not p.running)
110
self.assert_(s.running)
111
self.assert_(p.running)
113
self.assert_(not s.running)
114
self.assert_(not p.running)
116
def testRunningChildren2(self):
117
s = service.Service()
119
self.assert_(s.running)
120
t = service.Service()
121
t.stopService = checkRunning
122
t.startService = checkRunning
123
p = service.MultiService()
124
s.setServiceParent(p)
125
t.setServiceParent(p)
129
def testAddingIntoRunning(self):
130
p = service.MultiService()
132
s = service.Service()
133
self.assert_(not s.running)
134
s.setServiceParent(p)
135
self.assert_(s.running)
136
s.disownServiceParent()
137
self.assert_(not s.running)
139
def testPrivileged(self):
140
s = service.Service()
142
s.privilegedStarted = 1
143
s.privilegedStartService = pss
144
s1 = service.Service()
145
p = service.MultiService()
146
s.setServiceParent(p)
147
s1.setServiceParent(p)
148
p.privilegedStartService()
149
self.assert_(s.privilegedStarted)
151
def testCopying(self):
152
s = service.Service()
155
self.assert_(not s1.running)
156
self.assert_(s.running)
159
if hasattr(os, "getuid"):
166
class TestProcess(unittest.TestCase):
169
p = service.Process(5, 6)
170
self.assertEqual(p.uid, 5)
171
self.assertEqual(p.gid, 6)
173
def testDefaults(self):
    """
    L{service.Process} leaves C{uid} and C{gid} as C{None} unless a value
    is passed for them, either positionally or by keyword.
    """
    # Each case is (positional args, keyword args, expected uid,
    # expected gid).
    cases = [
        ((5,), {}, 5, None),
        ((), {'gid': 5}, None, 5),
        ((), {}, None, None),
    ]
    for args, kwargs, expectedUid, expectedGid in cases:
        process = service.Process(*args, **kwargs)
        self.assertEqual(process.uid, expectedUid)
        self.assertEqual(process.gid, expectedGid)
184
def testProcessName(self):
    """
    The C{processName} attribute of a fresh L{service.Process} is C{None}
    and reflects whatever value is later assigned to it.
    """
    process = service.Process()
    self.assertEqual(process.processName, None)
    newName = 'hello'
    process.processName = newName
    self.assertEqual(process.processName, newName)
191
class TestInterfaces(unittest.TestCase):
    """
    Check that the concrete service classes provide the interfaces declared
    in L{twisted.application.service}.
    """

    def testService(self):
        """
        A L{service.Service} instance provides L{service.IService}.
        """
        instance = service.Service()
        self.failUnless(service.IService.providedBy(instance))


    def testMultiService(self):
        """
        A L{service.MultiService} instance provides both L{service.IService}
        and L{service.IServiceCollection}.
        """
        for iface in (service.IService, service.IServiceCollection):
            # A fresh instance is checked against each interface.
            self.failUnless(iface.providedBy(service.MultiService()))


    def testProcess(self):
        """
        A L{service.Process} instance provides L{service.IProcess}.
        """
        instance = service.Process()
        self.failUnless(service.IProcess.providedBy(instance))
204
class TestApplication(unittest.TestCase):
    """
    Tests for L{service.Application} and the components it can be adapted
    to.
    """

    def testConstructor(self):
        """
        L{service.Application} can be called with a name alone, or with a
        name followed by one or two further positional arguments.
        """
        for extra in ((), (5,), (5, 6)):
            service.Application("hello", *extra)


    def testProcessComponent(self):
        """
        Adapting an application to L{service.IProcess} exposes the uid and
        gid given to L{service.Application}; each is C{None} when omitted.
        """
        # Each case is (extra constructor args, expected uid, expected gid).
        cases = [
            ((), None, None),
            ((5,), 5, None),
            ((5, 6), 5, 6),
        ]
        for extra, expectedUid, expectedGid in cases:
            application = service.Application("hello", *extra)
            self.assertEqual(service.IProcess(application).uid, expectedUid)
            self.assertEqual(service.IProcess(application).gid, expectedGid)


    def testServiceComponent(self):
        """
        The L{service.IService} and L{service.IServiceCollection} adapters
        of an application are the same object, named after the application
        and without a parent.
        """
        application = service.Application("hello")
        self.assertIdentical(service.IService(application),
                             service.IServiceCollection(application))
        self.assertEqual(service.IService(application).name, "hello")
        self.assertEqual(service.IService(application).parent, None)


    def testPersistableComponent(self):
        """
        The L{sob.IPersistable} adapter of an application uses the
        C{'pickle'} style, carries the application's name and refers back
        to the application through C{original}.
        """
        application = service.Application("hello")
        persistable = sob.IPersistable(application)
        self.assertEqual(persistable.style, 'pickle')
        self.assertEqual(persistable.name, 'hello')
        self.assertIdentical(persistable.original, application)
235
class TestLoading(unittest.TestCase):
237
def test_simpleStoreAndLoad(self):
238
a = service.Application("hello")
239
p = sob.IPersistable(a)
240
for style in 'source pickle'.split():
243
a1 = service.loadApplication("hello.ta"+style[0], style)
244
self.assertEqual(service.IService(a1).name, "hello")
245
f = open("hello.tac", 'w')
247
"from twisted.application import service\n",
248
"application = service.Application('hello')\n",
251
a1 = service.loadApplication("hello.tac", 'python')
252
self.assertEqual(service.IService(a1).name, "hello")
256
class TestAppSupport(unittest.TestCase):
258
def testPassphrase(self):
    """
    L{app.getPassphrase} returns C{None} when called with C{0}.
    """
    passphrase = app.getPassphrase(0)
    self.assertEqual(passphrase, None)
261
def testLoadApplication(self):
263
Test loading an application file in different dump formats.
265
a = service.Application("hello")
266
baseconfig = {'file': None, 'source': None, 'python':None}
267
for style in 'source pickle'.split():
268
config = baseconfig.copy()
269
config[{'pickle': 'file'}.get(style, style)] = 'helloapplication'
270
sob.IPersistable(a).setStyle(style)
271
sob.IPersistable(a).save(filename='helloapplication')
272
a1 = app.getApplication(config, None)
273
self.assertEqual(service.IService(a1).name, "hello")
274
config = baseconfig.copy()
275
config['python'] = 'helloapplication'
276
f = open("helloapplication", 'w')
278
"from twisted.application import service\n",
279
"application = service.Application('hello')\n",
282
a1 = app.getApplication(config, None)
283
self.assertEqual(service.IService(a1).name, "hello")
285
def test_convertStyle(self):
    """
    L{app.convertStyle} converts a saved application between every
    combination of the C{source} and C{pickle} styles, and the converted
    file loads back as an application with the original name.
    """
    styles = 'source pickle'.split()
    appl = service.Application("lala")
    # Same ordering as a nested loop: input style outermost.
    conversions = [(src, dst) for src in styles for dst in styles]
    for instyle, outstyle in conversions:
        # Re-save in the input style before every conversion.
        sob.IPersistable(appl).setStyle(instyle)
        sob.IPersistable(appl).save(filename="converttest")
        app.convertStyle("converttest", instyle, None,
                         "converttest.out", outstyle, 0)
        converted = service.loadApplication("converttest.out", outstyle)
        self.assertEqual(service.IService(converted).name, "lala")
296
def test_getLogFile(self):
298
Test L{app.getLogFile}, verifying the LogFile instance it returns.
300
os.mkdir("logfiledir")
301
l = app.getLogFile(os.path.join("logfiledir", "lala"))
302
self.assertEqual(l.path,
303
os.path.abspath(os.path.join("logfiledir", "lala")))
304
self.assertEqual(l.name, "lala")
305
self.assertEqual(l.directory, os.path.abspath("logfiledir"))
307
test_getLogFile.suppress = [
308
util.suppress(message="app.getLogFile is deprecated. Use "
309
"twisted.python.logfile.LogFile.fromFullPath instead",
310
category=DeprecationWarning)]
312
def test_startApplication(self):
    """
    After L{app.startApplication} the application's L{service.IService}
    component reports itself as running.
    """
    appl = service.Application("lala")
    app.startApplication(appl, 0)
    started = service.IService(appl)
    self.failUnless(started.running)
318
class Foo(basic.LineReceiver):
    """
    Client protocol which sends a single line, records the first line it
    receives on its factory and then closes the connection.

    The factory must carry a Deferred in its C{d} attribute; it is fired
    from L{connectionLost} with the recorded line.
    """

    def connectionMade(self):
        """
        Send the test line as soon as the connection is established.
        """
        self.transport.write('lalala\r\n')


    def lineReceived(self, line):
        """
        Store C{line} on the factory and drop the connection.

        @param line: the line received from the peer.
        """
        self.factory.line = line
        self.transport.loseConnection()


    def connectionLost(self, reason):
        """
        Fire the factory's Deferred with the line recorded by
        L{lineReceived}, or with C{None} if no line was received.

        @param reason: why the connection was lost; not inspected here.
        """
        # C{line} is only set by lineReceived.  If the connection dropped
        # before any line arrived, reading it directly would raise
        # AttributeError here and the Deferred would never fire, so the
        # waiting test would hang until timeout.  Fire with None instead,
        # letting the test's own assertion report the failure.
        self.factory.d.callback(getattr(self.factory, 'line', None))
330
def addService(self, service):
331
self.services[service.name] = service
332
def removeService(self, service):
333
del self.services[service.name]
339
def append(self, what):
342
class TestEcho(wire.Echo):
    """
    Echo protocol which fires the Deferred stored in its C{d} attribute
    with C{True} once the connection is lost.
    """

    def connectionLost(self, reason):
        """
        Signal through C{self.d} that the connection has gone away.

        @param reason: why the connection was lost; not inspected here.
        """
        finished = self.d
        finished.callback(True)
346
class TestInternet2(unittest.TestCase):
349
s = service.MultiService()
351
factory = protocol.ServerFactory()
352
factory.protocol = TestEcho
353
TestEcho.d = defer.Deferred()
354
t = internet.TCPServer(0, factory)
355
t.setServiceParent(s)
356
num = t._port.getHost().port
357
factory = protocol.ClientFactory()
358
factory.d = defer.Deferred()
359
factory.protocol = Foo
361
internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s)
362
factory.d.addCallback(self.assertEqual, 'lalala')
363
factory.d.addCallback(lambda x : s.stopService())
364
factory.d.addCallback(lambda x : TestEcho.d)
370
Test L{internet.UDPServer} with a random port: starting the service
371
should give it valid port, and stopService should free it so that we
372
can start a server on the same port again.
374
if not interfaces.IReactorUDP(reactor, None):
375
raise unittest.SkipTest("This reactor does not support UDP sockets")
376
p = protocol.DatagramProtocol()
377
t = internet.UDPServer(0, p)
379
num = t._port.getHost().port
380
self.assertNotEquals(num, 0)
382
t = internet.UDPServer(num, p)
384
return t.stopService()
385
return defer.maybeDeferred(t.stopService).addCallback(onStop)
388
def testPrivileged(self):
389
factory = protocol.ServerFactory()
390
factory.protocol = TestEcho
391
TestEcho.d = defer.Deferred()
392
t = internet.TCPServer(0, factory)
394
t.privilegedStartService()
395
num = t._port.getHost().port
396
factory = protocol.ClientFactory()
397
factory.d = defer.Deferred()
398
factory.protocol = Foo
400
c = internet.TCPClient('127.0.0.1', num, factory)
402
factory.d.addCallback(self.assertEqual, 'lalala')
403
factory.d.addCallback(lambda x : c.stopService())
404
factory.d.addCallback(lambda x : t.stopService())
405
factory.d.addCallback(lambda x : TestEcho.d)
408
def testConnectionGettingRefused(self):
409
factory = protocol.ServerFactory()
410
factory.protocol = wire.Echo
411
t = internet.TCPServer(0, factory)
413
num = t._port.getHost().port
416
factory = protocol.ClientFactory()
417
factory.clientConnectionFailed = lambda *args: d.callback(None)
418
c = internet.TCPClient('127.0.0.1', num, factory)
423
# FIXME: This test is far too dense. It needs comments.
424
# -- spiv, 2004-11-07
425
if not interfaces.IReactorUNIX(reactor, None):
426
raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
427
s = service.MultiService()
429
factory = protocol.ServerFactory()
430
factory.protocol = TestEcho
431
TestEcho.d = defer.Deferred()
432
t = internet.UNIXServer('echo.skt', factory)
433
t.setServiceParent(s)
434
factory = protocol.ClientFactory()
435
factory.protocol = Foo
436
factory.d = defer.Deferred()
438
internet.UNIXClient('echo.skt', factory).setServiceParent(s)
439
factory.d.addCallback(self.assertEqual, 'lalala')
440
factory.d.addCallback(lambda x : s.stopService())
441
factory.d.addCallback(lambda x : TestEcho.d)
442
factory.d.addCallback(self._cbTestUnix, factory, s)
445
def _cbTestUnix(self, ignored, factory, s):
446
TestEcho.d = defer.Deferred()
448
factory.d = defer.Deferred()
450
factory.d.addCallback(self.assertEqual, 'lalala')
451
factory.d.addCallback(lambda x : s.stopService())
452
factory.d.addCallback(lambda x : TestEcho.d)
455
def testVolatile(self):
456
if not interfaces.IReactorUNIX(reactor, None):
457
raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
458
factory = protocol.ServerFactory()
459
factory.protocol = wire.Echo
460
t = internet.UNIXServer('echo.skt', factory)
462
self.failIfIdentical(t._port, None)
464
self.assertIdentical(t1._port, None)
466
self.assertIdentical(t._port, None)
467
self.failIf(t.running)
469
factory = protocol.ClientFactory()
470
factory.protocol = wire.Echo
471
t = internet.UNIXClient('echo.skt', factory)
473
self.failIfIdentical(t._connection, None)
475
self.assertIdentical(t1._connection, None)
477
self.assertIdentical(t._connection, None)
478
self.failIf(t.running)
480
def testStoppingServer(self):
481
if not interfaces.IReactorUNIX(reactor, None):
482
raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
483
factory = protocol.ServerFactory()
484
factory.protocol = wire.Echo
485
t = internet.UNIXServer('echo.skt', factory)
488
self.failIf(t.running)
489
factory = protocol.ClientFactory()
491
factory.clientConnectionFailed = lambda *args: d.callback(None)
492
reactor.connectUNIX('echo.skt', factory)
495
def testPickledTimer(self):
496
target = TimerTarget()
497
t0 = internet.TimerService(1, target.append, "hello")
503
self.failIf(t.running)
505
def testBrokenTimer(self):
507
t = internet.TimerService(1, lambda: 1 / 0)
508
oldFailed = t._failed
514
d.addCallback(lambda x : t.stopService)
515
d.addCallback(lambda x : self.assertEqual(
517
[o.value.__class__ for o in self.flushLoggedErrors(ZeroDivisionError)]))
520
def testEverythingThere(self):
521
trans = 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split()
522
for tran in trans[:]:
523
if not getattr(interfaces, "IReactor"+tran)(reactor, None):
525
if interfaces.IReactorArbitrary(reactor, None) is not None:
526
trans.insert(0, "Generic")
528
for side in 'Server Client'.split():
529
if tran == "Multicast" and side == "Client":
531
self.assert_(hasattr(internet, tran+side))
532
method = getattr(internet, tran+side).method
533
prefix = {'Server': 'listen', 'Client': 'connect'}[side]
534
self.assert_(hasattr(reactor, prefix+method) or
535
(prefix == "connect" and method == "UDP"))
536
o = getattr(internet, tran+side)()
537
self.assertEqual(service.IService(o), o)
540
def test_reactorParametrizationInServer(self):
542
L{internet._AbstractServer} supports a C{reactor} keyword argument
543
that can be used to parametrize the reactor used to listen for
546
reactor = MemoryReactor()
549
t = internet.TCPServer(1234, factory, reactor=reactor)
551
self.assertEquals(reactor.tcpServers.pop()[:2], (1234, factory))
554
def test_reactorParametrizationInClient(self):
556
L{internet._AbstractClient} supports a C{reactor} keyword argument
557
that can be used to parametrize the reactor used to create new client
560
reactor = MemoryReactor()
563
t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
566
reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
569
def test_reactorParametrizationInServerMultipleStart(self):
571
Like L{test_reactorParametrizationInServer}, but stop and restart the
572
service and check that the given reactor is still used.
574
reactor = MemoryReactor()
577
t = internet.TCPServer(1234, factory, reactor=reactor)
579
self.assertEquals(reactor.tcpServers.pop()[:2], (1234, factory))
582
self.assertEquals(reactor.tcpServers.pop()[:2], (1234, factory))
585
def test_reactorParametrizationInClientMultipleStart(self):
587
Like L{test_reactorParametrizationInClient}, but stop and restart the
588
service and check that the given reactor is still used.
590
reactor = MemoryReactor()
593
t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
596
reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
600
reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
604
class TestTimerBasic(unittest.TestCase):
606
def testTimerRuns(self):
608
self.t = internet.TimerService(1, d.callback, 'hello')
609
self.t.startService()
610
d.addCallback(self.assertEqual, 'hello')
611
d.addCallback(lambda x : self.t.stopService())
612
d.addCallback(lambda x : self.failIf(self.t.running))
616
return self.t.stopService()
618
def testTimerRestart(self):
619
# restart the same TimerService
620
d1 = defer.Deferred()
621
d2 = defer.Deferred()
622
work = [(d2, "bar"), (d1, "foo")]
626
self.t = internet.TimerService(1, trigger)
627
self.t.startService()
628
def onFirstResult(result):
629
self.assertEqual(result, 'foo')
630
return self.t.stopService()
631
def onFirstStop(ignored):
632
self.failIf(self.t.running)
633
self.t.startService()
635
def onSecondResult(result):
636
self.assertEqual(result, 'bar')
638
d1.addCallback(onFirstResult)
639
d1.addCallback(onFirstStop)
640
d1.addCallback(onSecondResult)
643
def testTimerLoops(self):
645
def trigger(data, number, d):
650
self.t = internet.TimerService(0.01, trigger, "hello", 10, d)
651
self.t.startService()
652
d.addCallback(self.assertEqual, ['hello'] * 10)
653
d.addCallback(lambda x : self.t.stopService())
657
class FakeReactor(reactors.Reactor):
659
A fake reactor with a hooked install method.
662
def __init__(self, install, *args, **kwargs):
664
@param install: any callable that will be used as install method.
665
@type install: C{callable}
667
reactors.Reactor.__init__(self, *args, **kwargs)
668
self.install = install
672
class PluggableReactorTestCase(unittest.TestCase):
674
Tests for the reactor discovery/inspection APIs.
679
Override the L{reactors.getPlugins} function, normally bound to
680
L{twisted.plugin.getPlugins}, in order to control which
681
L{IReactorInstaller} plugins are seen as available.
683
C{self.pluginResults} can be customized and will be used as the
684
result of calls to C{reactors.getPlugins}.
686
self.pluginCalls = []
687
self.pluginResults = []
688
self.originalFunction = reactors.getPlugins
689
reactors.getPlugins = self._getPlugins
694
Restore the original L{reactors.getPlugins}.
696
reactors.getPlugins = self.originalFunction
699
def _getPlugins(self, interface, package=None):
701
Stand-in for the real getPlugins method which records its arguments
702
and returns a fixed result.
704
self.pluginCalls.append((interface, package))
705
return list(self.pluginResults)
708
def test_getPluginReactorTypes(self):
710
Test that reactor plugins are returned from L{getReactorTypes}
712
name = 'fakereactortest'
713
package = __name__ + '.fakereactor'
714
description = 'description'
715
self.pluginResults = [reactors.Reactor(name, package, description)]
716
reactorTypes = reactors.getReactorTypes()
720
[(reactors.IReactorInstaller, None)])
722
for r in reactorTypes:
723
if r.shortName == name:
724
self.assertEqual(r.description, description)
727
self.fail("Reactor plugin not present in getReactorTypes() result")
730
def test_reactorInstallation(self):
732
Test that L{reactors.Reactor.install} loads the correct module and
733
calls its install attribute.
737
installed.append(True)
738
installer = FakeReactor(install,
739
'fakereactortest', __name__, 'described')
741
self.assertEqual(installed, [True])
744
def test_installReactor(self):
746
Test that the L{reactors.installReactor} function correctly installs
747
the specified reactor.
751
installed.append(True)
752
name = 'fakereactortest'
754
description = 'description'
755
self.pluginResults = [FakeReactor(install, name, package, description)]
756
reactors.installReactor(name)
757
self.assertEqual(installed, [True])
760
def test_installNonExistentReactor(self):
762
Test that L{reactors.installReactor} raises L{reactors.NoSuchReactor}
763
when asked to install a reactor which it cannot find.
765
self.pluginResults = []
767
reactors.NoSuchReactor,
768
reactors.installReactor, 'somereactor')
771
def test_installNotAvailableReactor(self):
773
Test that L{reactors.installReactor} raises an exception when asked to
774
install a reactor which doesn't work in this environment.
777
raise ImportError("Missing foo bar")
778
name = 'fakereactortest'
780
description = 'description'
781
self.pluginResults = [FakeReactor(install, name, package, description)]
782
self.assertRaises(ImportError, reactors.installReactor, name)
785
def test_reactorSelectionMixin(self):
787
Test that the reactor selected is installed as soon as possible, ie
788
when the option is parsed.
791
INSTALL_EVENT = 'reactor installed'
792
SUBCOMMAND_EVENT = 'subcommands loaded'
794
class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
795
def subCommands(self):
796
executed.append(SUBCOMMAND_EVENT)
797
return [('subcommand', None, lambda: self, 'test subcommand')]
798
subCommands = property(subCommands)
801
executed.append(INSTALL_EVENT)
802
self.pluginResults = [
803
FakeReactor(install, 'fakereactortest', __name__, 'described')
806
options = ReactorSelectionOptions()
807
options.parseOptions(['--reactor', 'fakereactortest', 'subcommand'])
808
self.assertEqual(executed[0], INSTALL_EVENT)
809
self.assertEqual(executed.count(INSTALL_EVENT), 1)
812
def test_reactorSelectionMixinNonExistent(self):
814
Test that the usage mixin exits when trying to use a non existent
815
reactor (the name not matching to any reactor), giving an error
818
class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
820
self.pluginResults = []
822
options = ReactorSelectionOptions()
823
options.messageOutput = StringIO()
824
e = self.assertRaises(usage.UsageError, options.parseOptions,
825
['--reactor', 'fakereactortest', 'subcommand'])
826
self.assertIn("fakereactortest", e.args[0])
827
self.assertIn("help-reactors", e.args[0])
830
def test_reactorSelectionMixinNotAvailable(self):
832
Test that the usage mixin exits when trying to use a reactor not
833
available (the reactor raises an error at installation), giving an
836
class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
838
message = "Missing foo bar"
840
raise ImportError(message)
842
name = 'fakereactortest'
844
description = 'description'
845
self.pluginResults = [FakeReactor(install, name, package, description)]
847
options = ReactorSelectionOptions()
848
options.messageOutput = StringIO()
849
e = self.assertRaises(usage.UsageError, options.parseOptions,
850
['--reactor', 'fakereactortest', 'subcommand'])
851
self.assertIn(message, e.args[0])
852
self.assertIn("help-reactors", e.args[0])
856
class ReportProfileTestCase(unittest.TestCase):
858
Tests for L{app.reportProfile}.
861
def test_deprecation(self):
863
Check that L{app.reportProfile} prints a warning and does nothing else.
865
self.assertWarns(DeprecationWarning,
866
"reportProfile is deprecated and a no-op since Twisted 8.0.",
867
app.__file__, app.reportProfile, None, None)