1
# Twisted, the Framework of Your Internet
2
# Copyright (C) 2001-2003 Matthew W. Lefkowitz
4
# This library is free software; you can redistribute it and/or
5
# modify it under the terms of version 2.1 of the GNU Lesser General Public
6
# License as published by the Free Software Foundation.
8
# This library is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11
# Lesser General Public License for more details.
13
# You should have received a copy of the GNU Lesser General Public
14
# License along with this library; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
from twisted.trial import unittest
18
from twisted.application import service, compat, internet, app
19
from twisted.persisted import sob
20
from twisted.python import components
21
from twisted.internet import utils
22
from twisted.protocols import wire, basic
23
from twisted.internet import protocol, reactor
24
import copy, os, pickle, sys
29
class TestService(unittest.TestCase):
34
self.failUnlessEqual(s.name, "hello")
38
p = service.MultiService()
40
self.failUnlessEqual(list(p), [s])
41
self.failUnlessEqual(s.parent, p)
43
def testNamedChild(self):
45
p = service.MultiService()
48
self.failUnlessEqual(list(p), [s])
49
self.failUnlessEqual(s.parent, p)
50
self.failUnlessEqual(p.getServiceNamed("hello"), s)
52
def testDoublyNamedChild(self):
54
p = service.MultiService()
57
self.failUnlessRaises(RuntimeError, s.setName, "lala")
59
def testDuplicateNamedChild(self):
61
p = service.MultiService()
66
self.failUnlessRaises(RuntimeError, s.setServiceParent, p)
68
def testDisowning(self):
70
p = service.MultiService()
72
self.failUnlessEqual(list(p), [s])
73
self.failUnlessEqual(s.parent, p)
74
s.disownServiceParent()
75
self.failUnlessEqual(list(p), [])
76
self.failUnlessEqual(s.parent, None)
78
def testRunning(self):
80
self.assert_(not s.running)
82
self.assert_(s.running)
84
self.assert_(not s.running)
86
def testRunningChildren(self):
88
p = service.MultiService()
90
self.assert_(not s.running)
91
self.assert_(not p.running)
93
self.assert_(s.running)
94
self.assert_(p.running)
96
self.assert_(not s.running)
97
self.assert_(not p.running)
99
def testAddingIntoRunning(self):
100
p = service.MultiService()
102
s = service.Service()
103
self.assert_(not s.running)
104
s.setServiceParent(p)
105
self.assert_(s.running)
106
s.disownServiceParent()
107
self.assert_(not s.running)
109
def testPrivileged(self):
110
s = service.Service()
112
s.privilegedStarted = 1
113
s.privilegedStartService = pss
114
s1 = service.Service()
115
p = service.MultiService()
116
s.setServiceParent(p)
117
s1.setServiceParent(p)
118
p.privilegedStartService()
119
self.assert_(s.privilegedStarted)
121
def testCopying(self):
122
s = service.Service()
125
self.assert_(not s1.running)
126
self.assert_(s.running)
129
class TestProcess(unittest.TestCase):
132
p = service.Process(5, 6)
133
self.assertEqual(p.uid, 5)
134
self.assertEqual(p.gid, 6)
136
def testDefaults(self):
137
p = service.Process(5)
138
self.assertEqual(p.uid, 5)
139
self.assertEqual(p.gid, 0)
140
p = service.Process(gid=5)
141
self.assertEqual(p.uid, 0)
142
self.assertEqual(p.gid, 5)
143
p = service.Process()
144
self.assertEqual(p.uid, 0)
145
self.assertEqual(p.gid, 0)
147
def testProcessName(self):
148
p = service.Process()
149
self.assertEqual(p.processName, None)
150
p.processName = 'hello'
151
self.assertEqual(p.processName, 'hello')
154
class TestInterfaces(unittest.TestCase):
156
def testService(self):
157
self.assert_(components.implements(service.Service(),
160
def testMultiService(self):
161
self.assert_(components.implements(service.MultiService(),
163
self.assert_(components.implements(service.MultiService(),
164
service.IServiceCollection))
166
def testProcess(self):
167
self.assert_(components.implements(service.Process(),
171
class TestApplication(unittest.TestCase):
173
def testConstructor(self):
174
service.Application("hello")
175
service.Application("hello", 5)
176
service.Application("hello", 5, 6)
178
def testProcessComponent(self):
179
a = service.Application("hello")
180
self.assertEqual(service.IProcess(a).uid, 0)
181
self.assertEqual(service.IProcess(a).gid, 0)
182
a = service.Application("hello", 5)
183
self.assertEqual(service.IProcess(a).uid, 5)
184
self.assertEqual(service.IProcess(a).gid, 0)
185
a = service.Application("hello", 5, 6)
186
self.assertEqual(service.IProcess(a).uid, 5)
187
self.assertEqual(service.IProcess(a).gid, 6)
189
def testServiceComponent(self):
190
a = service.Application("hello")
191
self.assert_(service.IService(a) is service.IServiceCollection(a))
192
self.assertEqual(service.IService(a).name, "hello")
193
self.assertEqual(service.IService(a).parent, None)
195
def testPersistableComponent(self):
196
a = service.Application("hello")
197
p = sob.IPersistable(a)
198
self.assertEqual(p.style, 'pickle')
199
self.assertEqual(p.name, 'hello')
200
self.assert_(p.original is a)
202
class TestLoading(unittest.TestCase):
204
def test_simpleStoreAndLoad(self):
205
a = service.Application("hello")
206
p = sob.IPersistable(a)
207
for style in 'xml source pickle'.split():
210
a1 = service.loadApplication("hello.ta"+style[0], style)
211
self.assertEqual(service.IService(a1).name, "hello")
212
open("hello.tac", 'w').writelines([
213
"from twisted.application import service\n",
214
"application = service.Application('hello')\n",
216
a1 = service.loadApplication("hello.tac", 'python')
217
self.assertEqual(service.IService(a1).name, "hello")
219
def test_implicitConversion(self):
221
a.__dict__ = {'udpConnectors': [], 'unixConnectors': [],
222
'_listenerDict': {}, 'name': 'dummy',
223
'sslConnectors': [], 'unixPorts': [],
224
'_extraListeners': {}, 'sslPorts': [], 'tcpPorts': [],
225
'services': {}, 'gid': 0, 'tcpConnectors': [],
226
'extraConnectors': [], 'udpPorts': [], 'extraPorts': [],
228
pickle.dump(a, open("file.tap", 'w'))
229
a1 = service.loadApplication("file.tap", "pickle", None)
230
self.assertEqual(service.IService(a1).name, "dummy")
231
self.assertEqual(list(service.IServiceCollection(a1)), [])
234
class TestAppSupport(unittest.TestCase):
236
def testPassphrase(self):
237
self.assertEqual(app.getPassphrase(0), None)
239
def testLoadApplication(self):
240
a = service.Application("hello")
241
#baseconfig = {'file': None, 'xml': None, 'source': None, 'python':None}
242
#for style in 'source xml pickle'.split():
243
# config = baseconfig.copy()
244
# config[{'pickle': 'file'}.get(style, style)] = 'helloapplication'
245
# sob.IPersistable(a).setStyle(style)
246
# sob.IPersistable(a).save(filename='helloapplication')
247
# a1 = app.getApplication(config, None)
248
# self.assertEqual(service.IService(a1).name, "hello")
249
#config = baseconfig.copy()
250
#config['python'] = 'helloapplication'
251
#open("helloapplication", 'w').writelines([
252
#"from twisted.application import service\n",
253
#"application = service.Application('hello')\n",
255
#a1 = app.getApplication(config, None)
256
#self.assertEqual(service.IService(a1).name, "hello")
258
def test_convertStyle(self):
259
appl = service.Application("lala")
260
for instyle in 'xml source pickle'.split():
261
for outstyle in 'xml source pickle'.split():
262
sob.IPersistable(appl).setStyle(instyle)
263
sob.IPersistable(appl).save(filename="converttest")
264
app.convertStyle("converttest", instyle, None,
265
"converttest.out", outstyle, 0)
266
appl2 = service.loadApplication("converttest.out", outstyle)
267
self.assertEqual(service.IService(appl2).name, "lala")
269
def test_getLogFile(self):
270
os.mkdir("logfiledir")
271
l = app.getLogFile(os.path.join("logfiledir", "lala"))
272
self.assertEqual(l.path,
273
os.path.abspath(os.path.join("logfiledir", "lala")))
274
self.assertEqual(l.name, "lala")
275
self.assertEqual(l.directory, os.path.abspath("logfiledir"))
277
def test_startApplication(self):
278
appl = service.Application("lala")
279
app.startApplication(appl, 0)
280
self.assert_(service.IService(appl).running)
282
class TestInternet(unittest.TestCase):
285
s = service.MultiService()
286
c = compat.IOldApplication(s)
287
factory = protocol.ServerFactory()
288
factory.protocol = wire.Echo
289
if os.path.exists('.hello.skt'):
290
os.remove('hello.skt')
291
c.listenUNIX('./hello.skt', factory)
292
class Foo(basic.LineReceiver):
293
def connectionMade(self):
294
self.transport.write('lalala\r\n')
295
def lineReceived(self, line):
296
self.factory.line = line
297
factory = protocol.ClientFactory()
298
factory.protocol = Foo
300
c.connectUNIX('./hello.skt', factory)
301
s.privilegedStartService()
303
while factory.line is None:
306
self.assertEqual(factory.line, 'lalala')
309
s = service.MultiService()
310
c = compat.IOldApplication(s)
311
factory = protocol.ServerFactory()
312
factory.protocol = wire.Echo
313
c.listenTCP(0, factory)
314
s.privilegedStartService()
316
num = list(s)[0]._port.getHost()[2]
317
class Foo(basic.LineReceiver):
318
def connectionMade(self):
319
self.transport.write('lalala\r\n')
320
def lineReceived(self, line):
321
self.factory.line = line
322
factory = protocol.ClientFactory()
323
factory.protocol = Foo
325
c.connectTCP('localhost', num, factory)
326
while factory.line is None:
329
self.assertEqual(factory.line, 'lalala')
331
def testCalling(self):
332
s = service.MultiService()
333
c = compat.IOldApplication(s)
335
self.assertEqual(list(s)[0].args[0], None)
336
c.listenTCP(None, None)
337
self.assertEqual(list(s)[1].args[:2], (None,)*2)
338
c.listenSSL(None, None, None)
339
self.assertEqual(list(s)[2].args[:3], (None,)*3)
340
c.listenUDP(None, None)
341
self.assertEqual(list(s)[3].args[:2], (None,)*2)
342
c.listenUNIX(None, None)
343
self.assertEqual(list(s)[4].args[:2], (None,)*2)
345
self.assert_(ch.privileged)
347
self.assertEqual(list(s)[5].args[0], None)
348
c.connectTCP(None, None, None)
349
self.assertEqual(list(s)[6].args[:3], (None,)*3)
350
c.connectSSL(None, None, None, None)
351
self.assertEqual(list(s)[7].args[:4], (None,)*4)
352
c.connectUDP(None, None, None)
353
self.assertEqual(list(s)[8].args[:3], (None,)*3)
354
c.connectUNIX(None, None)
355
self.assertEqual(list(s)[9].args[:2], (None,)*2)
356
self.assertEqual(len(list(s)), 10)
358
self.failIf(ch.kwargs)
359
self.assertEqual(ch.name, None)
361
def testUnlistenersCallable(self):
362
s = service.MultiService()
363
c = compat.IOldApplication(s)
364
self.assert_(callable(c.unlistenTCP))
365
self.assert_(callable(c.unlistenUNIX))
366
self.assert_(callable(c.unlistenUDP))
367
self.assert_(callable(c.unlistenSSL))
369
def testServices(self):
370
s = service.MultiService()
371
c = compat.IOldApplication(s)
372
ch = service.Service()
374
ch.setServiceParent(c)
375
self.assertEqual(c.getServiceNamed("lala"), ch)
376
ch.disownServiceParent()
377
self.assertEqual(list(s), [])
379
def testInterface(self):
380
s = service.MultiService()
381
c = compat.IOldApplication(s)
382
for key in compat.IOldApplication.__dict__.keys():
383
if callable(getattr(compat.IOldApplication, key)):
384
self.assert_(callable(getattr(c, key)))
389
def addService(self, service):
390
self.services[service.name] = service
391
def removeService(self, service):
392
del self.services[service.name]
395
class TestConvert(unittest.TestCase):
397
def testSimpleInternet(self):
398
s = "(dp0\nS'udpConnectors'\np1\n(lp2\nsS'unixConnectors'\np3\n(lp4\nsS'twisted.internet.app.Application.persistenceVersion'\np5\nI12\nsS'name'\np6\nS'web'\np7\nsS'sslConnectors'\np8\n(lp9\nsS'sslPorts'\np10\n(lp11\nsS'tcpPorts'\np12\n(lp13\n(I8080\n(itwisted.web.server\nSite\np14\n(dp16\nS'resource'\np17\n(itwisted.web.test\nTest\np18\n(dp19\nS'files'\np20\n(lp21\nsS'paths'\np22\n(dp23\nsS'tmpl'\np24\n(lp25\nS'\\n Congratulations, twisted.web appears to work!\\n <ul>\\n <li>Funky Form:\\n '\np26\naS'self.funkyForm()'\np27\naS'\\n <li>Exception Handling:\\n '\np28\naS'self.raiseHell()'\np29\naS'\\n </ul>\\n '\np30\nasS'widgets'\np31\n(dp32\nsS'variables'\np33\n(dp34\nsS'modules'\np35\n(lp36\nsS'children'\np37\n(dp38\nsbsS'logPath'\np39\nNsS'timeOut'\np40\nI43200\nsS'sessions'\np41\n(dp42\nsbI5\nS''\np43\ntp44\nasS'unixPorts'\np45\n(lp46\nsS'services'\np47\n(dp48\nsS'gid'\np49\nI1000\nsS'tcpConnectors'\np50\n(lp51\nsS'extraConnectors'\np52\n(lp53\nsS'udpPorts'\np54\n(lp55\nsS'extraPorts'\np56\n(lp57\nsS'persistStyle'\np58\nS'pickle'\np59\nsS'uid'\np60\nI1000\ns."
402
appl = compat.convert(a)
403
self.assertEqual(service.IProcess(appl).uid, 1000)
404
self.assertEqual(service.IProcess(appl).gid, 1000)
405
self.assertEqual(service.IService(appl).name, "web")
406
services = list(service.IServiceCollection(appl))
407
self.assertEqual(len(services), 1)
409
self.assertEqual(s.parent, service.IServiceCollection(appl))
410
self.assert_(s.privileged)
411
self.assert_(isinstance(s, internet.TCPServer))
413
self.assertEqual(args[0], 8080)
414
args[1].resource.template
415
self.assertEqual(args[3], '')
417
def testSimpleUNIX(self):
418
s = "(dp0\nS'udpConnectors'\np1\n(lp2\nsS'unixConnectors'\np3\n(lp4\nsS'twisted.internet.app.Application.persistenceVersion'\np5\nI12\nsS'name'\np6\nS'web'\np7\nsS'sslConnectors'\np8\n(lp9\nsS'sslPorts'\np10\n(lp11\nsS'tcpPorts'\np12\n(lp13\nsS'unixPorts'\np14\n(lp15\n(S'/home/moshez/.twistd-web-pb'\np16\n(itwisted.spread.pb\nBrokerFactory\np17\n(dp19\nS'objectToBroker'\np20\n(itwisted.web.distrib\nResourcePublisher\np21\n(dp22\nS'twisted.web.distrib.ResourcePublisher.persistenceVersion'\np23\nI2\nsS'site'\np24\n(itwisted.web.server\nSite\np25\n(dp26\nS'resource'\np27\n(itwisted.web.static\nFile\np28\n(dp29\nS'ignoredExts'\np30\n(lp31\nsS'defaultType'\np32\nS'text/html'\np33\nsS'registry'\np34\n(itwisted.web.static\nRegistry\np35\n(dp36\nS'twisted.web.static.Registry.persistenceVersion'\np37\nI1\nsS'twisted.python.components.Componentized.persistenceVersion'\np38\nI1\nsS'_pathCache'\np39\n(dp40\nsS'_adapterCache'\np41\n(dp42\nS'twisted.internet.interfaces.IServiceCollection'\np43\n(itwisted.internet.app\nApplication\np44\n(dp45\ng1\ng2\nsg3\ng4\nsg5\nI12\nsg6\ng7\nsg8\ng9\nsg10\ng11\nsg12\ng13\nsg14\ng15\nsS'extraPorts'\np46\n(lp47\nsS'gid'\np48\nI1053\nsS'tcpConnectors'\np49\n(lp50\nsS'extraConnectors'\np51\n(lp52\nsS'udpPorts'\np53\n(lp54\nsS'services'\np55\n(dp56\nsS'persistStyle'\np57\nS'pickle'\np58\nsS'delayeds'\np59\n(lp60\nsS'uid'\np61\nI1053\nsbssbsS'encoding'\np62\nNsS'twisted.web.static.File.persistenceVersion'\np63\nI6\nsS'path'\np64\nS'/home/moshez/public_html.twistd'\np65\nsS'type'\np66\ng33\nsS'children'\np67\n(dp68\nsS'processors'\np69\n(dp70\nS'.php3'\np71\nctwisted.web.twcgi\nPHP3Script\np72\nsS'.rpy'\np73\nctwisted.web.script\nResourceScript\np74\nsS'.php'\np75\nctwisted.web.twcgi\nPHPScript\np76\nsS'.cgi'\np77\nctwisted.web.twcgi\nCGIScript\np78\nsS'.epy'\np79\nctwisted.web.script\nPythonScript\np80\nsS'.trp'\np81\nctwisted.web.trp\nResourceUnpickler\np82\nssbsS'logPath'\np83\nNsS'sessions'\np84\n(dp85\nsbsbsS'twisted.spread.pb.BrokerFactory.persistenceVersion'\np86\nI3\nsbI5\nI438\ntp87\nasg55\ng56\nsg48\nI1053\nsg49\ng50\nsg51\ng52\nsg53\ng54\nsg46\ng47\nsg57\ng58\nsg61\nI1053\nsg59\ng60\ns."
422
appl = compat.convert(a)
423
self.assertEqual(service.IProcess(appl).uid, 1053)
424
self.assertEqual(service.IProcess(appl).gid, 1053)
425
self.assertEqual(service.IService(appl).name, "web")
426
services = list(service.IServiceCollection(appl))
427
self.assertEqual(len(services), 1)
429
self.assertEqual(s.parent, service.IServiceCollection(appl))
430
self.assert_(s.privileged)
431
self.assert_(isinstance(s, internet.UNIXServer))
433
self.assertEqual(args[0], '/home/moshez/.twistd-web-pb')
435
def testSimpleService(self):
437
a.__dict__ = {'udpConnectors': [], 'unixConnectors': [],
438
'_listenerDict': {}, 'name': 'dummy',
439
'sslConnectors': [], 'unixPorts': [],
440
'_extraListeners': {}, 'sslPorts': [], 'tcpPorts': [],
441
'services': {}, 'gid': 0, 'tcpConnectors': [],
442
'extraConnectors': [], 'udpPorts': [], 'extraPorts': [],
444
s = service.Service()
446
s.setServiceParent(a)
447
appl = compat.convert(a)
448
services = list(service.IServiceCollection(appl))
449
self.assertEqual(len(services), 1)
451
self.assertEqual(s, s1)
453
class TestInternet(unittest.TestCase):
456
s = service.MultiService()
458
factory = protocol.ServerFactory()
459
factory.protocol = wire.Echo
460
t = internet.TCPServer(0, factory)
461
t.setServiceParent(s)
462
num = t._port.getHost()[2]
463
class Foo(basic.LineReceiver):
464
def connectionMade(self):
465
self.transport.write('lalala\r\n')
466
def lineReceived(self, line):
467
self.factory.line = line
468
factory = protocol.ClientFactory()
469
factory.protocol = Foo
471
internet.TCPClient('localhost', num, factory).setServiceParent(s)
472
while factory.line is None:
474
self.assertEqual(factory.line, 'lalala')
476
def testPrivileged(self):
477
factory = protocol.ServerFactory()
478
factory.protocol = wire.Echo
479
t = internet.TCPServer(0, factory)
481
t.privilegedStartService()
482
num = t._port.getHost()[2]
483
class Foo(basic.LineReceiver):
484
def connectionMade(self):
485
self.transport.write('lalala\r\n')
486
def lineReceived(self, line):
487
self.factory.line = line
488
factory = protocol.ClientFactory()
489
factory.protocol = Foo
491
internet.TCPClient('localhost', num, factory).startService()
492
while factory.line is None:
494
self.assertEqual(factory.line, 'lalala')
496
def testConnectionGettingRefused(self):
497
factory = protocol.ServerFactory()
498
factory.protocol = wire.Echo
499
t = internet.TCPServer(0, factory)
501
num = t._port.getHost()[2]
504
factory = protocol.ClientFactory()
505
factory.clientConnectionFailed = lambda *args: l.append(None)
506
c = internet.TCPClient('localhost', num, factory)
510
self.assertEqual(l, [None])
513
s = service.MultiService()
515
factory = protocol.ServerFactory()
516
factory.protocol = wire.Echo
517
t = internet.UNIXServer('echo.skt', factory)
518
t.setServiceParent(s)
519
class Foo(basic.LineReceiver):
520
def connectionMade(self):
521
self.transport.write('lalala\r\n')
522
def lineReceived(self, line):
523
self.factory.line = line
524
factory = protocol.ClientFactory()
525
factory.protocol = Foo
527
internet.UNIXClient('echo.skt', factory).setServiceParent(s)
528
while factory.line is None:
530
self.assertEqual(factory.line, 'lalala')
533
def testVolatile(self):
534
factory = protocol.ServerFactory()
535
factory.protocol = wire.Echo
536
t = internet.UNIXServer('echo.skt', factory)
538
self.assert_(hasattr(t, '_port'))
540
self.assert_(not hasattr(t1, '_port'))
542
t = internet.TimerService(1, lambda:None)
544
self.assert_(hasattr(t, '_call'))
546
self.assert_(not hasattr(t1, '_call'))
548
factory = protocol.ClientFactory()
549
factory.protocol = wire.Echo
550
t = internet.UNIXClient('echo.skt', factory)
552
self.assert_(hasattr(t, '_connection'))
554
self.assert_(not hasattr(t1, '_connection'))
556
self.failIf(t.running)
558
def testStoppingServer(self):
559
factory = protocol.ServerFactory()
560
factory.protocol = wire.Echo
561
t = internet.UNIXServer('echo.skt', factory)
564
self.failIf(t.running)
565
factory = protocol.ClientFactory()
567
factory.clientConnectionFailed = lambda *args: l.append(None)
568
reactor.connectUNIX('echo.skt', factory)
571
self.assertEqual(l, [None])
575
t = internet.TimerService(1, l.append, "hello")
580
self.failIf(t.running)
581
self.assertEqual(l, ["hello"])
583
t = internet.TimerService(0.01, l.append, "hello")
588
self.assertEqual(l, ["hello"]*10)
590
def testEverythingThere(self):
591
for tran in 'Generic TCP UNIX SSL UDP UNIXDatagram Multicast'.split():
592
for side in 'Server Client'.split():
593
self.assert_(hasattr(internet, tran+side))
594
method = getattr(internet, tran+side).method
595
prefix = {'Server': 'listen', 'Client': 'connect'}[side]
596
self.assert_(hasattr(reactor, prefix+method))
597
o = getattr(internet, tran+side)
598
self.assertEqual(service.IService(o), o)