~landscape/zope3/ztk-1.1.3

« back to all changes in this revision

Viewing changes to src/twisted/test/test_unix.py

  • Committer: Andreas Hasenack
  • Date: 2009-07-20 17:49:16 UTC
  • Revision ID: andreas@canonical.com-20090720174916-g2tn6qmietz2hn0u
Revert twisted removal, it breaks several dozen tests [trivial]

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_unix -*-
 
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
 
 
6
import stat, os, sys
 
7
import socket
 
8
 
 
9
from twisted.internet import interfaces, reactor, protocol, error, address, defer, utils
 
10
from twisted.python import lockfile, failure
 
11
from twisted.protocols import loopback
 
12
from twisted.trial import unittest
 
13
 
 
14
 
 
15
class MyProtocol(protocol.Protocol):
    """
    Stream protocol used by the tests in this module.

    It fires C{self.deferred} when the connection is made, accumulates all
    received data in C{self.data} and flags a lost connection in
    C{self.closed}.  C{deferred} is not created here: the factories in this
    module assign it in their C{buildProtocol}.
    """
    # Status flags; of these, only C{closed} is updated by this class.
    made = 0
    closed = 0
    failed = 0
    # Everything received so far.
    data = ""

    def connectionMade(self):
        """Fire C{self.deferred} with C{None}."""
        self.deferred.callback(None)

    def dataReceived(self, data):
        """Append C{data} to the accumulated buffer."""
        self.data = self.data + data

    def connectionLost(self, reason):
        """Record that the connection has gone away."""
        self.closed = 1
 
26
 
 
27
class TestClientFactory(protocol.ClientFactory):
    """
    Client factory that verifies the address it is asked to build a protocol
    for and hands out a L{MyProtocol} sharing the factory's Deferred.
    """
    # The most recently built protocol, or C{None} before any was built.
    protocol = None

    def __init__(self, testcase, name):
        """
        @param testcase: object whose C{assertEquals} is used for the address
            check in L{buildProtocol}.
        @param name: path used to build the expected L{address.UNIXAddress}.
        """
        self.deferred = defer.Deferred()
        self.name = name
        self.testcase = testcase

    def buildProtocol(self, addr):
        """
        Assert that C{addr} equals the UNIX address for C{self.name}, then
        build, remember and return a L{MyProtocol} wired to C{self.deferred}.
        """
        expected = address.UNIXAddress(self.name)
        self.testcase.assertEquals(expected, addr)
        proto = MyProtocol()
        proto.deferred = self.deferred
        self.protocol = proto
        return proto
 
40
 
 
41
class Factory(protocol.Factory):
    """
    Server factory that checks the peer address of each connection against
    C{peername} and builds L{MyProtocol} instances sharing its Deferred.
    """
    # Last protocol built by L{buildProtocol}.
    protocol = None
    # Set to C{True} by L{stopFactory}.
    stopped = None

    def __init__(self, testcase, name, peername=''):
        """
        @param testcase: object whose C{assertEquals} is used for the peer
            address check.
        @param name: stored as C{self.name}; not otherwise used by this class.
        @param peername: path the connecting peer is expected to be bound to
            (empty string by default).
        """
        self.deferred = defer.Deferred()
        self.peername = peername
        self.name = name
        self.testcase = testcase

    def stopFactory(self):
        """Remember that the factory has been stopped."""
        self.stopped = True

    def buildProtocol(self, addr):
        """
        Check C{addr} against the expected peer name and return a new
        L{MyProtocol} that fires C{self.deferred} on connection.
        """
        # os.path.samefile fails on ('', ''), so when both names are empty
        # compare the plain strings instead of UNIXAddress objects.
        if not (self.peername or addr.name):
            self.testcase.assertEquals(self.peername, addr.name)
        else:
            self.testcase.assertEquals(
                address.UNIXAddress(self.peername), addr,
                '%r != %r' % (self.peername, addr.name))
        built = MyProtocol()
        self.protocol = built
        built.deferred = self.deferred
        return built
 
63
 
 
64
 
 
65
class FailedConnectionClientFactory(protocol.ClientFactory):
    """
    Client factory that reports a failed connection attempt through an
    object supplied by the caller.
    """

    def __init__(self, onFail):
        """
        @param onFail: object whose C{errback} is called with the failure
            reason when the connection attempt fails.
        """
        self.onFail = onFail

    def clientConnectionFailed(self, connector, reason):
        """Forward C{reason} to C{self.onFail.errback}."""
        notify = self.onFail.errback
        notify(reason)
 
71
 
 
72
 
 
73
class PortCleanerUpper(unittest.TestCase):
    """
    Base test case that tracks ports/transports in C{self.ports} and shuts
    down the ones still connected when the test is torn down.
    """
    # NOTE(review): not read by any code visible in this module; cleanPorts
    # calls loseConnection directly.
    callToLoseCnx = 'loseConnection'

    def setUp(self):
        """Start every test with an empty list of tracked ports."""
        self.ports = []

    def tearDown(self):
        """Clean up every tracked port; returns the resulting Deferred."""
        return self.cleanPorts(*self.ports)

    def _addPorts(self, *ports):
        """Track C{ports} so that L{tearDown} cleans them up."""
        self.ports.extend(ports)

    def cleanPorts(self, *ports):
        """
        Call C{loseConnection} on each of C{ports} whose C{connected}
        attribute is true.

        @return: Deferred gathering the results of all those calls.
        """
        pending = []
        for port in ports:
            if port.connected:
                pending.append(defer.maybeDeferred(port.loseConnection))
        return defer.gatherResults(pending)
 
89
 
 
90
 
 
91
class UnixSocketTestCase(PortCleanerUpper):
 
92
    """Test unix sockets."""
 
93
 
 
94
    def testPeerBind(self):
 
95
        """assert the remote endpoint (getPeer) on the receiving end matches
 
96
           the local endpoint (bind) on the connecting end, for unix sockets"""
 
97
        filename = self.mktemp()
 
98
        peername = self.mktemp()
 
99
        f = Factory(self, filename, peername=peername)
 
100
        l = reactor.listenUNIX(filename, f)
 
101
        self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 
102
        self._sock.bind(peername)
 
103
        self._sock.connect(filename)            
 
104
        d = f.deferred
 
105
        def done(x):
 
106
            self._addPorts(l)
 
107
            self._sock.close()
 
108
            del self._sock
 
109
            return x
 
110
        d.addBoth(done)
 
111
        return d
 
112
 
 
113
    def testDumber(self):
 
114
        filename = self.mktemp()
 
115
        f = Factory(self, filename)
 
116
        l = reactor.listenUNIX(filename, f)
 
117
        tcf = TestClientFactory(self, filename)
 
118
        c = reactor.connectUNIX(filename, tcf)
 
119
        d = defer.gatherResults([f.deferred, tcf.deferred])
 
120
        d.addCallback(lambda x : self._addPorts(l, c.transport,
 
121
                                                tcf.protocol.transport,
 
122
                                                f.protocol.transport))
 
123
        return d
 
124
 
 
125
    def testMode(self):
 
126
        filename = self.mktemp()
 
127
        f = Factory(self, filename)
 
128
        l = reactor.listenUNIX(filename, f, mode = 0600)
 
129
        self.assertEquals(stat.S_IMODE(os.stat(filename)[0]), 0600)
 
130
        tcf = TestClientFactory(self, filename)
 
131
        c = reactor.connectUNIX(filename, tcf)
 
132
        self._addPorts(l, c.transport)
 
133
 
 
134
 
 
135
    def testPIDFile(self):
 
136
        filename = self.mktemp()
 
137
        f = Factory(self, filename)
 
138
        l = reactor.listenUNIX(filename, f, mode = 0600, wantPID=1)
 
139
        self.failUnless(lockfile.isLocked(filename + ".lock"))
 
140
        tcf = TestClientFactory(self, filename)
 
141
        c = reactor.connectUNIX(filename, tcf, checkPID=1)
 
142
        d = defer.gatherResults([f.deferred, tcf.deferred])
 
143
        def _portStuff(ignored):
 
144
            self._addPorts(l, c.transport, tcf.protocol.transport,
 
145
                           f.protocol.transport)
 
146
            return self.cleanPorts(*self.ports)
 
147
        def _check(ignored):
 
148
            self.failIf(lockfile.isLocked(filename + ".lock"), 'locked')
 
149
        d.addCallback(_portStuff)
 
150
        d.addCallback(_check)
 
151
        return d
 
152
    
 
153
 
 
154
    def testSocketLocking(self):
 
155
        filename = self.mktemp()
 
156
        f = Factory(self, filename)
 
157
        l = reactor.listenUNIX(filename, f, wantPID=True)
 
158
 
 
159
        self.assertRaises(
 
160
            error.CannotListenError,
 
161
            reactor.listenUNIX, filename, f, wantPID=True)
 
162
 
 
163
        def stoppedListening(ign):
 
164
            l = reactor.listenUNIX(filename, f, wantPID=True)
 
165
            return l.stopListening()
 
166
 
 
167
        return l.stopListening().addCallback(stoppedListening)
 
168
 
 
169
 
 
170
    def _uncleanSocketTest(self, callback):
 
171
        self.filename = self.mktemp()
 
172
        source = ("from twisted.internet import protocol, reactor\n"
 
173
                  "reactor.listenUNIX(%r, protocol.ServerFactory(), wantPID=True)\n") % (self.filename,)
 
174
        env = {'PYTHONPATH': os.pathsep.join(sys.path)}
 
175
 
 
176
        d = utils.getProcessOutput(sys.executable, ("-u", "-c", source), env=env)
 
177
        d.addCallback(callback)
 
178
        return d
 
179
 
 
180
 
 
181
    def testUncleanServerSocketLocking(self):
 
182
        def ranStupidChild(ign):
 
183
            # If this next call succeeds, our lock handling is correct.
 
184
            p = reactor.listenUNIX(self.filename, Factory(self, self.filename), wantPID=True)
 
185
            return p.stopListening()
 
186
        return self._uncleanSocketTest(ranStupidChild)
 
187
 
 
188
 
 
189
    def testUncleanSocketLockingFromThePerspectiveOfAClientConnectingToTheDeadServerSocket(self):
 
190
        def ranStupidChild(ign):
 
191
            d = defer.Deferred()
 
192
            f = FailedConnectionClientFactory(d)
 
193
            c = reactor.connectUNIX(self.filename, f, checkPID=True)
 
194
            return self.assertFailure(d, error.BadFileError)
 
195
        return self._uncleanSocketTest(ranStupidChild)
 
196
 
 
197
 
 
198
    def testRepr(self):
 
199
        filename = self.mktemp()
 
200
        f = Factory(self, filename)
 
201
        p = reactor.listenUNIX(filename, f)
 
202
        self.failIf(str(p).find(filename) == -1)
 
203
 
 
204
        def stoppedListening(ign):
 
205
            self.failIf(str(p).find(filename) != -1)
 
206
 
 
207
        return defer.maybeDeferred(p.stopListening).addCallback(stoppedListening)
 
208
 
 
209
class ClientProto(protocol.ConnectedDatagramProtocol):
    """
    Connected datagram protocol that records when it is started and
    stopped and remembers the last datagram it received.
    """
    started = False
    stopped = False
    # Payload of the last datagram received, if any.
    gotback = None

    def __init__(self):
        """Create the Deferreds fired on start and on datagram receipt."""
        self.deferredGotBack = defer.Deferred()
        self.deferredStarted = defer.Deferred()

    def stopProtocol(self):
        """Flag the protocol as stopped."""
        self.stopped = True

    def startProtocol(self):
        """Flag the protocol as started and fire C{deferredStarted}."""
        self.started = True
        self.deferredStarted.callback(None)

    def datagramReceived(self, data):
        """Store C{data} in C{gotback} and fire C{deferredGotBack}."""
        self.gotback = data
        self.deferredGotBack.callback(None)
 
227
 
 
228
class ServerProto(protocol.DatagramProtocol):
    """
    Datagram protocol that answers every datagram with C{"hi back"} and
    records what it received and from which address.
    """
    started = False
    stopped = False
    # Payload and sender address of the last datagram received.
    gotwhat = None
    gotfrom = None

    def __init__(self):
        """Create the Deferreds fired on start and on datagram receipt."""
        self.deferredGotWhat = defer.Deferred()
        self.deferredStarted = defer.Deferred()

    def stopProtocol(self):
        """Flag the protocol as stopped."""
        self.stopped = True

    def startProtocol(self):
        """Flag the protocol as started and fire C{deferredStarted}."""
        self.started = True
        self.deferredStarted.callback(None)

    def datagramReceived(self, data, addr):
        """
        Remember the sender, reply to it with C{"hi back"}, remember the
        payload and fire C{deferredGotWhat}.
        """
        self.gotfrom = addr
        reply = "hi back"
        self.transport.write(reply, addr)
        self.gotwhat = data
        self.deferredGotWhat.callback(None)
 
248
 
 
249
class DatagramUnixSocketTestCase(PortCleanerUpper):
    """Test datagram UNIX sockets."""

    def testExchange(self):
        """
        Send C{"hi"} from a connected client to a listening server and check
        that the server saw the payload and the client's address, and that
        the client received the server's C{"hi back"} reply.
        """
        clientaddr = self.mktemp()
        serveraddr = self.mktemp()
        sp = ServerProto()
        cp = ClientProto()
        s = reactor.listenUNIXDatagram(serveraddr, sp)
        c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress = clientaddr)

        d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
        def write(ignored):
            cp.transport.write("hi")
            return defer.gatherResults([sp.deferredGotWhat,
                                        cp.deferredGotBack])

        def cleanup(ignored):
            # Stop each endpoint, then remove the socket file it was bound to.
            d1 = defer.maybeDeferred(s.stopListening)
            d1.addCallback(lambda x : os.unlink(serveraddr))
            d2 = defer.maybeDeferred(c.stopListening)
            d2.addCallback(lambda x : os.unlink(clientaddr))
            return defer.gatherResults([d1, d2])

        def _cbTestExchange(ignored):
            self.failUnlessEqual("hi", sp.gotwhat)
            self.failUnlessEqual(clientaddr, sp.gotfrom)
            self.failUnlessEqual("hi back", cp.gotback)

        # The assertions run after cleanup so the sockets are released first.
        d.addCallback(write)
        d.addCallback(cleanup)
        d.addCallback(_cbTestExchange)
        return d

    def testCannotListen(self):
        """
        Listening twice on the same datagram socket path must raise
        L{error.CannotListenError}.

        @return: Deferred that fires once the first port has stopped and its
            socket file has been removed.
        """
        addr = self.mktemp()
        p = ServerProto()
        s = reactor.listenUNIXDatagram(addr, p)
        self.failUnlessRaises(error.CannotListenError, reactor.listenUNIXDatagram, addr, p)
        # Wait for the port to finish closing before removing its file, and
        # hand the Deferred to trial so the test does not end early.
        d = defer.maybeDeferred(s.stopListening)
        d.addCallback(lambda ign : os.unlink(addr))
        return d
    # test connecting to bound and connected (somewhere else) address

    def testRepr(self):
        """
        C{str()} of a listening datagram port contains the socket path;
        after the port has stopped listening it no longer does.
        """
        filename = self.mktemp()
        f = ServerProto()
        p = reactor.listenUNIXDatagram(filename, f)
        self.failIf(str(p).find(filename) == -1)

        def stoppedListening(ign):
            self.failIf(str(p).find(filename) != -1)

        return defer.maybeDeferred(p.stopListening).addCallback(stoppedListening)
 
301
 
 
302
 
 
303
# Skip whole suites when the running reactor cannot be adapted to the
# interface the suite exercises.
if not interfaces.IReactorUNIX(reactor, None):
    UnixSocketTestCase.skip = (
        "This reactor does not support UNIX domain sockets")
if not interfaces.IReactorUNIXDatagram(reactor, None):
    DatagramUnixSocketTestCase.skip = (
        "This reactor does not support UNIX datagram sockets")
 
307