~dustin-spy/twisted/dustin

« back to all changes in this revision

Viewing changes to twisted/test/test_application.py

  • Committer: moshez
  • Date: 2003-09-19 08:27:11 UTC
  • Revision ID: vcs-imports@canonical.com-20030919082711-7c4f41d51a909f9d
First stage of application integration

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Twisted, the Framework of Your Internet
 
2
# Copyright (C) 2001-2003 Matthew W. Lefkowitz
 
3
#
 
4
# This library is free software; you can redistribute it and/or
 
5
# modify it under the terms of version 2.1 of the GNU Lesser General Public
 
6
# License as published by the Free Software Foundation.
 
7
#
 
8
# This library is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
11
# Lesser General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU Lesser General Public
 
14
# License along with this library; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
#
 
17
from twisted.trial import unittest
 
18
from twisted.application import service, compat, internet, app
 
19
from twisted.persisted import sob
 
20
from twisted.python import components
 
21
from twisted.internet import utils
 
22
from twisted.protocols import wire, basic
 
23
from twisted.internet import protocol, reactor
 
24
import copy, os, pickle, sys
 
25
 
 
26
class Dummy:
 
27
    processName=None
 
28
 
 
29
class TestService(unittest.TestCase):
 
30
 
 
31
    def testName(self):
 
32
        s = service.Service()
 
33
        s.setName("hello")
 
34
        self.failUnlessEqual(s.name, "hello")
 
35
 
 
36
    def testParent(self):
 
37
        s = service.Service()
 
38
        p = service.MultiService()
 
39
        s.setServiceParent(p)
 
40
        self.failUnlessEqual(list(p), [s])
 
41
        self.failUnlessEqual(s.parent, p)
 
42
 
 
43
    def testNamedChild(self):
 
44
        s = service.Service()
 
45
        p = service.MultiService()
 
46
        s.setName("hello")
 
47
        s.setServiceParent(p)
 
48
        self.failUnlessEqual(list(p), [s])
 
49
        self.failUnlessEqual(s.parent, p)
 
50
        self.failUnlessEqual(p.getServiceNamed("hello"), s)
 
51
 
 
52
    def testDoublyNamedChild(self):
 
53
        s = service.Service()
 
54
        p = service.MultiService()
 
55
        s.setName("hello")
 
56
        s.setServiceParent(p)
 
57
        self.failUnlessRaises(RuntimeError, s.setName, "lala")
 
58
 
 
59
    def testDuplicateNamedChild(self):
 
60
        s = service.Service()
 
61
        p = service.MultiService()
 
62
        s.setName("hello")
 
63
        s.setServiceParent(p)
 
64
        s = service.Service()
 
65
        s.setName("hello")
 
66
        self.failUnlessRaises(RuntimeError, s.setServiceParent, p)
 
67
 
 
68
    def testDisowning(self):
 
69
        s = service.Service()
 
70
        p = service.MultiService()
 
71
        s.setServiceParent(p)
 
72
        self.failUnlessEqual(list(p), [s])
 
73
        self.failUnlessEqual(s.parent, p)
 
74
        s.disownServiceParent()
 
75
        self.failUnlessEqual(list(p), [])
 
76
        self.failUnlessEqual(s.parent, None)
 
77
 
 
78
    def testRunning(self):
 
79
        s = service.Service()
 
80
        self.assert_(not s.running)
 
81
        s.startService()
 
82
        self.assert_(s.running)
 
83
        s.stopService()
 
84
        self.assert_(not s.running)
 
85
 
 
86
    def testRunningChildren(self):
 
87
        s = service.Service()
 
88
        p = service.MultiService()
 
89
        s.setServiceParent(p)
 
90
        self.assert_(not s.running)
 
91
        self.assert_(not p.running)
 
92
        p.startService()
 
93
        self.assert_(s.running)
 
94
        self.assert_(p.running)
 
95
        p.stopService()
 
96
        self.assert_(not s.running)
 
97
        self.assert_(not p.running)
 
98
 
 
99
    def testAddingIntoRunning(self):
 
100
        p = service.MultiService()
 
101
        p.startService()
 
102
        s = service.Service()
 
103
        self.assert_(not s.running)
 
104
        s.setServiceParent(p)
 
105
        self.assert_(s.running)
 
106
        s.disownServiceParent()
 
107
        self.assert_(not s.running)
 
108
 
 
109
    def testPrivileged(self):
 
110
        s = service.Service()
 
111
        def pss():
 
112
            s.privilegedStarted = 1
 
113
        s.privilegedStartService = pss
 
114
        s1 = service.Service()
 
115
        p = service.MultiService()
 
116
        s.setServiceParent(p)
 
117
        s1.setServiceParent(p)
 
118
        p.privilegedStartService()
 
119
        self.assert_(s.privilegedStarted)
 
120
 
 
121
    def testCopying(self):
 
122
        s = service.Service()
 
123
        s.startService()
 
124
        s1 = copy.copy(s)
 
125
        self.assert_(not s1.running)
 
126
        self.assert_(s.running)
 
127
 
 
128
        
 
129
class TestProcess(unittest.TestCase):
 
130
 
 
131
    def testID(self):
 
132
        p = service.Process(5, 6)
 
133
        self.assertEqual(p.uid, 5)
 
134
        self.assertEqual(p.gid, 6)
 
135
 
 
136
    def testDefaults(self):
 
137
        p = service.Process(5)
 
138
        self.assertEqual(p.uid, 5)
 
139
        self.assertEqual(p.gid, 0)
 
140
        p = service.Process(gid=5)
 
141
        self.assertEqual(p.uid, 0)
 
142
        self.assertEqual(p.gid, 5)
 
143
        p = service.Process()
 
144
        self.assertEqual(p.uid, 0)
 
145
        self.assertEqual(p.gid, 0)
 
146
 
 
147
    def testProcessName(self):
 
148
        p = service.Process()
 
149
        self.assertEqual(p.processName, None)
 
150
        p.processName = 'hello'
 
151
        self.assertEqual(p.processName, 'hello')
 
152
 
 
153
 
 
154
class TestInterfaces(unittest.TestCase):
 
155
 
 
156
    def testService(self):
 
157
        self.assert_(components.implements(service.Service(),
 
158
                                           service.IService))
 
159
 
 
160
    def testMultiService(self):
 
161
        self.assert_(components.implements(service.MultiService(),
 
162
                                           service.IService))
 
163
        self.assert_(components.implements(service.MultiService(),
 
164
                                           service.IServiceCollection))
 
165
 
 
166
    def testProcess(self):
 
167
        self.assert_(components.implements(service.Process(),
 
168
                                           service.IProcess))
 
169
 
 
170
 
 
171
class TestApplication(unittest.TestCase):
 
172
 
 
173
    def testConstructor(self):
 
174
        service.Application("hello")
 
175
        service.Application("hello", 5)
 
176
        service.Application("hello", 5, 6)
 
177
 
 
178
    def testProcessComponent(self):
 
179
        a = service.Application("hello")
 
180
        self.assertEqual(service.IProcess(a).uid, 0)
 
181
        self.assertEqual(service.IProcess(a).gid, 0)
 
182
        a = service.Application("hello", 5)
 
183
        self.assertEqual(service.IProcess(a).uid, 5)
 
184
        self.assertEqual(service.IProcess(a).gid, 0)
 
185
        a = service.Application("hello", 5, 6)
 
186
        self.assertEqual(service.IProcess(a).uid, 5)
 
187
        self.assertEqual(service.IProcess(a).gid, 6)
 
188
 
 
189
    def testServiceComponent(self):
 
190
        a = service.Application("hello")
 
191
        self.assert_(service.IService(a) is service.IServiceCollection(a))
 
192
        self.assertEqual(service.IService(a).name, "hello")
 
193
        self.assertEqual(service.IService(a).parent, None)
 
194
 
 
195
    def testPersistableComponent(self):
 
196
        a = service.Application("hello")
 
197
        p = sob.IPersistable(a)
 
198
        self.assertEqual(p.style, 'pickle')
 
199
        self.assertEqual(p.name, 'hello')
 
200
        self.assert_(p.original is a)
 
201
 
 
202
class TestLoading(unittest.TestCase):
 
203
 
 
204
    def test_simpleStoreAndLoad(self):
 
205
        a = service.Application("hello")
 
206
        p = sob.IPersistable(a)
 
207
        for style in 'xml source pickle'.split():
 
208
            p.setStyle(style)
 
209
            p.save()
 
210
            a1 = service.loadApplication("hello.ta"+style[0], style)
 
211
            self.assertEqual(service.IService(a1).name, "hello")
 
212
        open("hello.tac", 'w').writelines([
 
213
        "from twisted.application import service\n",
 
214
        "application = service.Application('hello')\n",
 
215
        ])
 
216
        a1 = service.loadApplication("hello.tac", 'python')
 
217
        self.assertEqual(service.IService(a1).name, "hello")
 
218
 
 
219
    def test_implicitConversion(self):
 
220
        a = Dummy()
 
221
        a.__dict__ = {'udpConnectors': [], 'unixConnectors': [],
 
222
                      '_listenerDict': {}, 'name': 'dummy',
 
223
                      'sslConnectors': [], 'unixPorts': [],
 
224
                      '_extraListeners': {}, 'sslPorts': [], 'tcpPorts': [],
 
225
                      'services': {}, 'gid': 0, 'tcpConnectors': [],
 
226
                      'extraConnectors': [], 'udpPorts': [], 'extraPorts': [],
 
227
                      'uid': 0}
 
228
        pickle.dump(a, open("file.tap", 'w'))
 
229
        a1 = service.loadApplication("file.tap", "pickle", None)
 
230
        self.assertEqual(service.IService(a1).name, "dummy")
 
231
        self.assertEqual(list(service.IServiceCollection(a1)), [])
 
232
 
 
233
 
 
234
class TestAppSupport(unittest.TestCase):
 
235
 
 
236
    def testPassphrase(self):
 
237
        self.assertEqual(app.getPassphrase(0), None)
 
238
 
 
239
    def testLoadApplication(self):
 
240
        a = service.Application("hello")
 
241
        #baseconfig = {'file': None, 'xml': None, 'source': None, 'python':None}
 
242
        #for style in 'source xml pickle'.split():
 
243
        #    config = baseconfig.copy()
 
244
        #    config[{'pickle': 'file'}.get(style, style)] = 'helloapplication'
 
245
        #    sob.IPersistable(a).setStyle(style)
 
246
        #    sob.IPersistable(a).save(filename='helloapplication')
 
247
        #    a1 = app.getApplication(config, None)
 
248
        #    self.assertEqual(service.IService(a1).name, "hello")
 
249
        #config = baseconfig.copy()
 
250
        #config['python'] = 'helloapplication'
 
251
        #open("helloapplication", 'w').writelines([
 
252
        #"from twisted.application import service\n",
 
253
        #"application = service.Application('hello')\n",
 
254
        #])
 
255
        #a1 = app.getApplication(config, None)
 
256
        #self.assertEqual(service.IService(a1).name, "hello")
 
257
 
 
258
    def test_convertStyle(self):
 
259
        appl = service.Application("lala")
 
260
        for instyle in 'xml source pickle'.split():
 
261
            for outstyle in 'xml source pickle'.split():
 
262
                sob.IPersistable(appl).setStyle(instyle)
 
263
                sob.IPersistable(appl).save(filename="converttest")
 
264
                app.convertStyle("converttest", instyle, None,
 
265
                                 "converttest.out", outstyle, 0)
 
266
                appl2 = service.loadApplication("converttest.out", outstyle)
 
267
                self.assertEqual(service.IService(appl2).name, "lala")
 
268
 
 
269
    def test_getLogFile(self):
 
270
        os.mkdir("logfiledir")
 
271
        l = app.getLogFile(os.path.join("logfiledir", "lala"))
 
272
        self.assertEqual(l.path,
 
273
                         os.path.abspath(os.path.join("logfiledir", "lala")))
 
274
        self.assertEqual(l.name, "lala")
 
275
        self.assertEqual(l.directory, os.path.abspath("logfiledir"))
 
276
 
 
277
    def test_startApplication(self):
 
278
        appl = service.Application("lala")
 
279
        app.startApplication(appl, 0)
 
280
        self.assert_(service.IService(appl).running)
 
281
 
 
282
class TestInternet(unittest.TestCase):
 
283
 
 
284
    def testUNIX(self):
 
285
        s = service.MultiService()
 
286
        c = compat.IOldApplication(s)
 
287
        factory = protocol.ServerFactory()
 
288
        factory.protocol = wire.Echo
 
289
        if os.path.exists('.hello.skt'):
 
290
            os.remove('hello.skt')
 
291
        c.listenUNIX('./hello.skt', factory)
 
292
        class Foo(basic.LineReceiver):
 
293
            def connectionMade(self):
 
294
                self.transport.write('lalala\r\n')
 
295
            def lineReceived(self, line):
 
296
                self.factory.line = line
 
297
        factory = protocol.ClientFactory()
 
298
        factory.protocol = Foo
 
299
        factory.line = None
 
300
        c.connectUNIX('./hello.skt', factory)
 
301
        s.privilegedStartService()
 
302
        s.startService()
 
303
        while factory.line is None:
 
304
            reactor.iterate(0.1)
 
305
        s.stopService()
 
306
        self.assertEqual(factory.line, 'lalala')
 
307
 
 
308
    def testTCP(self):
 
309
        s = service.MultiService()
 
310
        c = compat.IOldApplication(s)
 
311
        factory = protocol.ServerFactory()
 
312
        factory.protocol = wire.Echo
 
313
        c.listenTCP(0, factory)
 
314
        s.privilegedStartService()
 
315
        s.startService()
 
316
        num = list(s)[0]._port.getHost()[2]
 
317
        class Foo(basic.LineReceiver):
 
318
            def connectionMade(self):
 
319
                self.transport.write('lalala\r\n')
 
320
            def lineReceived(self, line):
 
321
                self.factory.line = line
 
322
        factory = protocol.ClientFactory()
 
323
        factory.protocol = Foo
 
324
        factory.line = None
 
325
        c.connectTCP('localhost', num, factory)
 
326
        while factory.line is None:
 
327
            reactor.iterate(0.1)
 
328
        s.stopService()
 
329
        self.assertEqual(factory.line, 'lalala')
 
330
 
 
331
    def testCalling(self):
 
332
        s = service.MultiService()
 
333
        c = compat.IOldApplication(s)
 
334
        c.listenWith(None)
 
335
        self.assertEqual(list(s)[0].args[0], None)
 
336
        c.listenTCP(None, None)
 
337
        self.assertEqual(list(s)[1].args[:2], (None,)*2)
 
338
        c.listenSSL(None, None, None)
 
339
        self.assertEqual(list(s)[2].args[:3], (None,)*3)
 
340
        c.listenUDP(None, None)
 
341
        self.assertEqual(list(s)[3].args[:2], (None,)*2)
 
342
        c.listenUNIX(None, None)
 
343
        self.assertEqual(list(s)[4].args[:2], (None,)*2)
 
344
        for ch in s:
 
345
            self.assert_(ch.privileged)
 
346
        c.connectWith(None)
 
347
        self.assertEqual(list(s)[5].args[0], None)
 
348
        c.connectTCP(None, None, None)
 
349
        self.assertEqual(list(s)[6].args[:3], (None,)*3)
 
350
        c.connectSSL(None, None, None, None)
 
351
        self.assertEqual(list(s)[7].args[:4], (None,)*4)
 
352
        c.connectUDP(None, None, None)
 
353
        self.assertEqual(list(s)[8].args[:3], (None,)*3)
 
354
        c.connectUNIX(None, None)
 
355
        self.assertEqual(list(s)[9].args[:2], (None,)*2)
 
356
        self.assertEqual(len(list(s)), 10)
 
357
        for ch in s:
 
358
            self.failIf(ch.kwargs)
 
359
            self.assertEqual(ch.name, None)
 
360
 
 
361
    def testUnlistenersCallable(self):
 
362
        s = service.MultiService()
 
363
        c = compat.IOldApplication(s)
 
364
        self.assert_(callable(c.unlistenTCP))
 
365
        self.assert_(callable(c.unlistenUNIX))
 
366
        self.assert_(callable(c.unlistenUDP))
 
367
        self.assert_(callable(c.unlistenSSL))
 
368
 
 
369
    def testServices(self):
 
370
        s = service.MultiService()
 
371
        c = compat.IOldApplication(s)
 
372
        ch = service.Service()
 
373
        ch.setName("lala")
 
374
        ch.setServiceParent(c)
 
375
        self.assertEqual(c.getServiceNamed("lala"), ch)
 
376
        ch.disownServiceParent()
 
377
        self.assertEqual(list(s), [])
 
378
 
 
379
    def testInterface(self):
 
380
        s = service.MultiService()
 
381
        c = compat.IOldApplication(s)
 
382
        for key in compat.IOldApplication.__dict__.keys():
 
383
            if callable(getattr(compat.IOldApplication, key)):
 
384
                self.assert_(callable(getattr(c, key)))
 
385
 
 
386
 
 
387
class DummyApp:
 
388
    processName = None
 
389
    def addService(self, service):
 
390
        self.services[service.name] = service
 
391
    def removeService(self, service):
 
392
        del self.services[service.name]
 
393
    
 
394
 
 
395
class TestConvert(unittest.TestCase):
 
396
 
 
397
    def testSimpleInternet(self):
 
398
        s = "(dp0\nS'udpConnectors'\np1\n(lp2\nsS'unixConnectors'\np3\n(lp4\nsS'twisted.internet.app.Application.persistenceVersion'\np5\nI12\nsS'name'\np6\nS'web'\np7\nsS'sslConnectors'\np8\n(lp9\nsS'sslPorts'\np10\n(lp11\nsS'tcpPorts'\np12\n(lp13\n(I8080\n(itwisted.web.server\nSite\np14\n(dp16\nS'resource'\np17\n(itwisted.web.test\nTest\np18\n(dp19\nS'files'\np20\n(lp21\nsS'paths'\np22\n(dp23\nsS'tmpl'\np24\n(lp25\nS'\\n    Congratulations, twisted.web appears to work!\\n    <ul>\\n    <li>Funky Form:\\n    '\np26\naS'self.funkyForm()'\np27\naS'\\n    <li>Exception Handling:\\n    '\np28\naS'self.raiseHell()'\np29\naS'\\n    </ul>\\n    '\np30\nasS'widgets'\np31\n(dp32\nsS'variables'\np33\n(dp34\nsS'modules'\np35\n(lp36\nsS'children'\np37\n(dp38\nsbsS'logPath'\np39\nNsS'timeOut'\np40\nI43200\nsS'sessions'\np41\n(dp42\nsbI5\nS''\np43\ntp44\nasS'unixPorts'\np45\n(lp46\nsS'services'\np47\n(dp48\nsS'gid'\np49\nI1000\nsS'tcpConnectors'\np50\n(lp51\nsS'extraConnectors'\np52\n(lp53\nsS'udpPorts'\np54\n(lp55\nsS'extraPorts'\np56\n(lp57\nsS'persistStyle'\np58\nS'pickle'\np59\nsS'uid'\np60\nI1000\ns."
 
399
        d = pickle.loads(s)
 
400
        a = Dummy()
 
401
        a.__dict__ = d
 
402
        appl = compat.convert(a)
 
403
        self.assertEqual(service.IProcess(appl).uid, 1000)
 
404
        self.assertEqual(service.IProcess(appl).gid, 1000)
 
405
        self.assertEqual(service.IService(appl).name, "web")
 
406
        services = list(service.IServiceCollection(appl))
 
407
        self.assertEqual(len(services), 1)
 
408
        s = services[0]
 
409
        self.assertEqual(s.parent, service.IServiceCollection(appl))
 
410
        self.assert_(s.privileged)
 
411
        self.assert_(isinstance(s, internet.TCPServer))
 
412
        args = s.args
 
413
        self.assertEqual(args[0], 8080)
 
414
        args[1].resource.template
 
415
        self.assertEqual(args[3], '')
 
416
 
 
417
    def testSimpleUNIX(self):
 
418
        s = "(dp0\nS'udpConnectors'\np1\n(lp2\nsS'unixConnectors'\np3\n(lp4\nsS'twisted.internet.app.Application.persistenceVersion'\np5\nI12\nsS'name'\np6\nS'web'\np7\nsS'sslConnectors'\np8\n(lp9\nsS'sslPorts'\np10\n(lp11\nsS'tcpPorts'\np12\n(lp13\nsS'unixPorts'\np14\n(lp15\n(S'/home/moshez/.twistd-web-pb'\np16\n(itwisted.spread.pb\nBrokerFactory\np17\n(dp19\nS'objectToBroker'\np20\n(itwisted.web.distrib\nResourcePublisher\np21\n(dp22\nS'twisted.web.distrib.ResourcePublisher.persistenceVersion'\np23\nI2\nsS'site'\np24\n(itwisted.web.server\nSite\np25\n(dp26\nS'resource'\np27\n(itwisted.web.static\nFile\np28\n(dp29\nS'ignoredExts'\np30\n(lp31\nsS'defaultType'\np32\nS'text/html'\np33\nsS'registry'\np34\n(itwisted.web.static\nRegistry\np35\n(dp36\nS'twisted.web.static.Registry.persistenceVersion'\np37\nI1\nsS'twisted.python.components.Componentized.persistenceVersion'\np38\nI1\nsS'_pathCache'\np39\n(dp40\nsS'_adapterCache'\np41\n(dp42\nS'twisted.internet.interfaces.IServiceCollection'\np43\n(itwisted.internet.app\nApplication\np44\n(dp45\ng1\ng2\nsg3\ng4\nsg5\nI12\nsg6\ng7\nsg8\ng9\nsg10\ng11\nsg12\ng13\nsg14\ng15\nsS'extraPorts'\np46\n(lp47\nsS'gid'\np48\nI1053\nsS'tcpConnectors'\np49\n(lp50\nsS'extraConnectors'\np51\n(lp52\nsS'udpPorts'\np53\n(lp54\nsS'services'\np55\n(dp56\nsS'persistStyle'\np57\nS'pickle'\np58\nsS'delayeds'\np59\n(lp60\nsS'uid'\np61\nI1053\nsbssbsS'encoding'\np62\nNsS'twisted.web.static.File.persistenceVersion'\np63\nI6\nsS'path'\np64\nS'/home/moshez/public_html.twistd'\np65\nsS'type'\np66\ng33\nsS'children'\np67\n(dp68\nsS'processors'\np69\n(dp70\nS'.php3'\np71\nctwisted.web.twcgi\nPHP3Script\np72\nsS'.rpy'\np73\nctwisted.web.script\nResourceScript\np74\nsS'.php'\np75\nctwisted.web.twcgi\nPHPScript\np76\nsS'.cgi'\np77\nctwisted.web.twcgi\nCGIScript\np78\nsS'.epy'\np79\nctwisted.web.script\nPythonScript\np80\nsS'.trp'\np81\nctwisted.web.trp\nResourceUnpickler\np82\nssbsS'logPath'\np83\nNsS'sessions'\np84\n(dp85\nsbsbsS'twisted.spread.pb.BrokerFactory.persistenceVersion'\np86\nI3\nsbI5\nI438\ntp87\nasg55\ng56\nsg48\nI1053\nsg49\ng50\nsg51\ng52\nsg53\ng54\nsg46\ng47\nsg57\ng58\nsg61\nI1053\nsg59\ng60\ns."
 
419
        d = pickle.loads(s)
 
420
        a = Dummy()
 
421
        a.__dict__ = d
 
422
        appl = compat.convert(a)
 
423
        self.assertEqual(service.IProcess(appl).uid, 1053)
 
424
        self.assertEqual(service.IProcess(appl).gid, 1053)
 
425
        self.assertEqual(service.IService(appl).name, "web")
 
426
        services = list(service.IServiceCollection(appl))
 
427
        self.assertEqual(len(services), 1)
 
428
        s = services[0]
 
429
        self.assertEqual(s.parent, service.IServiceCollection(appl))
 
430
        self.assert_(s.privileged)
 
431
        self.assert_(isinstance(s, internet.UNIXServer))
 
432
        args = s.args
 
433
        self.assertEqual(args[0], '/home/moshez/.twistd-web-pb')
 
434
 
 
435
    def testSimpleService(self):
 
436
        a = DummyApp()
 
437
        a.__dict__ = {'udpConnectors': [], 'unixConnectors': [],
 
438
                      '_listenerDict': {}, 'name': 'dummy',
 
439
                      'sslConnectors': [], 'unixPorts': [],
 
440
                      '_extraListeners': {}, 'sslPorts': [], 'tcpPorts': [],
 
441
                      'services': {}, 'gid': 0, 'tcpConnectors': [],
 
442
                      'extraConnectors': [], 'udpPorts': [], 'extraPorts': [],
 
443
                      'uid': 0}
 
444
        s = service.Service()
 
445
        s.setName("lala")
 
446
        s.setServiceParent(a)
 
447
        appl = compat.convert(a)
 
448
        services = list(service.IServiceCollection(appl))
 
449
        self.assertEqual(len(services), 1)
 
450
        s1 = services[0]
 
451
        self.assertEqual(s, s1)
 
452
 
 
453
class TestInternet(unittest.TestCase):
 
454
 
 
455
    def testTCP(self):
 
456
        s = service.MultiService()
 
457
        s.startService()
 
458
        factory = protocol.ServerFactory()
 
459
        factory.protocol = wire.Echo
 
460
        t = internet.TCPServer(0, factory)
 
461
        t.setServiceParent(s)
 
462
        num = t._port.getHost()[2]
 
463
        class Foo(basic.LineReceiver):
 
464
            def connectionMade(self):
 
465
                self.transport.write('lalala\r\n')
 
466
            def lineReceived(self, line):
 
467
                self.factory.line = line
 
468
        factory = protocol.ClientFactory()
 
469
        factory.protocol = Foo
 
470
        factory.line = None
 
471
        internet.TCPClient('localhost', num, factory).setServiceParent(s)
 
472
        while factory.line is None:
 
473
            reactor.iterate(0.1)
 
474
        self.assertEqual(factory.line, 'lalala')
 
475
 
 
476
    def testPrivileged(self):
 
477
        factory = protocol.ServerFactory()
 
478
        factory.protocol = wire.Echo
 
479
        t = internet.TCPServer(0, factory)
 
480
        t.privileged = 1
 
481
        t.privilegedStartService()
 
482
        num = t._port.getHost()[2]
 
483
        class Foo(basic.LineReceiver):
 
484
            def connectionMade(self):
 
485
                self.transport.write('lalala\r\n')
 
486
            def lineReceived(self, line):
 
487
                self.factory.line = line
 
488
        factory = protocol.ClientFactory()
 
489
        factory.protocol = Foo
 
490
        factory.line = None
 
491
        internet.TCPClient('localhost', num, factory).startService()
 
492
        while factory.line is None:
 
493
            reactor.iterate(0.1)
 
494
        self.assertEqual(factory.line, 'lalala')
 
495
 
 
496
    def testConnectionGettingRefused(self):
 
497
        factory = protocol.ServerFactory()
 
498
        factory.protocol = wire.Echo
 
499
        t = internet.TCPServer(0, factory)
 
500
        t.startService()
 
501
        num = t._port.getHost()[2]
 
502
        t.stopService()
 
503
        l = []
 
504
        factory = protocol.ClientFactory()
 
505
        factory.clientConnectionFailed = lambda *args: l.append(None)
 
506
        c = internet.TCPClient('localhost', num, factory)
 
507
        c.startService()
 
508
        while not l:
 
509
            reactor.iterate(0.1)
 
510
        self.assertEqual(l, [None])
 
511
 
 
512
    def testUNIX(self):
 
513
        s = service.MultiService()
 
514
        s.startService()
 
515
        factory = protocol.ServerFactory()
 
516
        factory.protocol = wire.Echo
 
517
        t = internet.UNIXServer('echo.skt', factory)
 
518
        t.setServiceParent(s)
 
519
        class Foo(basic.LineReceiver):
 
520
            def connectionMade(self):
 
521
                self.transport.write('lalala\r\n')
 
522
            def lineReceived(self, line):
 
523
                self.factory.line = line
 
524
        factory = protocol.ClientFactory()
 
525
        factory.protocol = Foo
 
526
        factory.line = None
 
527
        internet.UNIXClient('echo.skt', factory).setServiceParent(s)
 
528
        while factory.line is None:
 
529
            reactor.iterate(0.1)
 
530
        self.assertEqual(factory.line, 'lalala')
 
531
        s.stopService()
 
532
 
 
533
    def testVolatile(self):
 
534
        factory = protocol.ServerFactory()
 
535
        factory.protocol = wire.Echo
 
536
        t = internet.UNIXServer('echo.skt', factory)
 
537
        t.startService()
 
538
        self.assert_(hasattr(t, '_port'))
 
539
        t1 = copy.copy(t)
 
540
        self.assert_(not hasattr(t1, '_port'))
 
541
        t.stopService()
 
542
        t = internet.TimerService(1, lambda:None)
 
543
        t.startService()
 
544
        self.assert_(hasattr(t, '_call'))
 
545
        t1 = copy.copy(t)
 
546
        self.assert_(not hasattr(t1, '_call'))
 
547
        t.stopService()
 
548
        factory = protocol.ClientFactory()
 
549
        factory.protocol = wire.Echo
 
550
        t = internet.UNIXClient('echo.skt', factory)
 
551
        t.startService()
 
552
        self.assert_(hasattr(t, '_connection'))
 
553
        t1 = copy.copy(t)
 
554
        self.assert_(not hasattr(t1, '_connection'))
 
555
        t.stopService()
 
556
        self.failIf(t.running)
 
557
 
 
558
    def testStoppingServer(self):
 
559
        factory = protocol.ServerFactory()
 
560
        factory.protocol = wire.Echo
 
561
        t = internet.UNIXServer('echo.skt', factory)
 
562
        t.startService()
 
563
        t.stopService()
 
564
        self.failIf(t.running)
 
565
        factory = protocol.ClientFactory()
 
566
        l = []
 
567
        factory.clientConnectionFailed = lambda *args: l.append(None)
 
568
        reactor.connectUNIX('echo.skt', factory)
 
569
        while not l:
 
570
            reactor.iterate(0.1)
 
571
        self.assertEqual(l, [None])
 
572
 
 
573
    def testTimer(self):
 
574
        l = []
 
575
        t = internet.TimerService(1, l.append, "hello")
 
576
        t.startService()
 
577
        while not l:
 
578
            reactor.iterate(0.1)
 
579
        t.stopService()
 
580
        self.failIf(t.running)
 
581
        self.assertEqual(l, ["hello"])
 
582
        l = []
 
583
        t = internet.TimerService(0.01, l.append, "hello")
 
584
        t.startService()
 
585
        while len(l)<10:
 
586
            reactor.iterate(0.1)
 
587
        t.stopService()
 
588
        self.assertEqual(l, ["hello"]*10)
 
589
 
 
590
    def testEverythingThere(self):
 
591
        for tran in 'Generic TCP UNIX SSL UDP UNIXDatagram Multicast'.split():
 
592
            for side in 'Server Client'.split():
 
593
                self.assert_(hasattr(internet, tran+side))
 
594
                method = getattr(internet, tran+side).method
 
595
                prefix = {'Server': 'listen', 'Client': 'connect'}[side]
 
596
                self.assert_(hasattr(reactor, prefix+method))
 
597
                o = getattr(internet, tran+side)
 
598
                self.assertEqual(service.IService(o), o)