1
# -*- test-case-name: twisted.test.test_unix -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
9
from twisted.internet import interfaces, reactor, protocol, error, address, defer, utils
10
from twisted.python import lockfile, failure
11
from twisted.protocols import loopback
12
from twisted.trial import unittest
15
class MyProtocol(protocol.Protocol):
16
made = closed = failed = 0
18
def connectionMade(self):
19
self.deferred.callback(None)
21
def dataReceived(self, data):
24
def connectionLost(self, reason):
27
class TestClientFactory(protocol.ClientFactory):
30
def __init__(self, testcase, name):
31
self.testcase = testcase
33
self.deferred = defer.Deferred()
35
def buildProtocol(self, addr):
36
self.testcase.assertEquals(address.UNIXAddress(self.name), addr)
37
self.protocol = MyProtocol()
38
self.protocol.deferred = self.deferred
41
class Factory(protocol.Factory):
42
protocol = stopped = None
44
def __init__(self, testcase, name, peername=''):
45
self.testcase = testcase
47
self.peername = peername
48
self.deferred = defer.Deferred()
50
def stopFactory(self):
53
def buildProtocol(self, addr):
54
# os.path.samefile fails on ('', '')
55
if self.peername or addr.name:
56
self.testcase.assertEquals(address.UNIXAddress(self.peername), addr,
57
'%r != %r' % (self.peername, addr.name))
59
self.testcase.assertEquals(self.peername, addr.name)
60
self.protocol = p = MyProtocol()
61
self.protocol.deferred = self.deferred
65
class FailedConnectionClientFactory(protocol.ClientFactory):
66
def __init__(self, onFail):
69
def clientConnectionFailed(self, connector, reason):
70
self.onFail.errback(reason)
73
class PortCleanerUpper(unittest.TestCase):
74
callToLoseCnx = 'loseConnection'
79
return self.cleanPorts(*self.ports)
81
def _addPorts(self, *ports):
85
def cleanPorts(self, *ports):
86
ds = [ defer.maybeDeferred(p.loseConnection)
87
for p in ports if p.connected ]
88
return defer.gatherResults(ds)
91
class UnixSocketTestCase(PortCleanerUpper):
92
"""Test unix sockets."""
94
def testPeerBind(self):
95
"""assert the remote endpoint (getPeer) on the receiving end matches
96
the local endpoint (bind) on the connecting end, for unix sockets"""
97
filename = self.mktemp()
98
peername = self.mktemp()
99
f = Factory(self, filename, peername=peername)
100
l = reactor.listenUNIX(filename, f)
101
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
102
self._sock.bind(peername)
103
self._sock.connect(filename)
113
def testDumber(self):
114
filename = self.mktemp()
115
f = Factory(self, filename)
116
l = reactor.listenUNIX(filename, f)
117
tcf = TestClientFactory(self, filename)
118
c = reactor.connectUNIX(filename, tcf)
119
d = defer.gatherResults([f.deferred, tcf.deferred])
120
d.addCallback(lambda x : self._addPorts(l, c.transport,
121
tcf.protocol.transport,
122
f.protocol.transport))
126
filename = self.mktemp()
127
f = Factory(self, filename)
128
l = reactor.listenUNIX(filename, f, mode = 0600)
129
self.assertEquals(stat.S_IMODE(os.stat(filename)[0]), 0600)
130
tcf = TestClientFactory(self, filename)
131
c = reactor.connectUNIX(filename, tcf)
132
self._addPorts(l, c.transport)
135
def testPIDFile(self):
136
filename = self.mktemp()
137
f = Factory(self, filename)
138
l = reactor.listenUNIX(filename, f, mode = 0600, wantPID=1)
139
self.failUnless(lockfile.isLocked(filename + ".lock"))
140
tcf = TestClientFactory(self, filename)
141
c = reactor.connectUNIX(filename, tcf, checkPID=1)
142
d = defer.gatherResults([f.deferred, tcf.deferred])
143
def _portStuff(ignored):
144
self._addPorts(l, c.transport, tcf.protocol.transport,
145
f.protocol.transport)
146
return self.cleanPorts(*self.ports)
148
self.failIf(lockfile.isLocked(filename + ".lock"), 'locked')
149
d.addCallback(_portStuff)
150
d.addCallback(_check)
154
def testSocketLocking(self):
155
filename = self.mktemp()
156
f = Factory(self, filename)
157
l = reactor.listenUNIX(filename, f, wantPID=True)
160
error.CannotListenError,
161
reactor.listenUNIX, filename, f, wantPID=True)
163
def stoppedListening(ign):
164
l = reactor.listenUNIX(filename, f, wantPID=True)
165
return l.stopListening()
167
return l.stopListening().addCallback(stoppedListening)
170
def _uncleanSocketTest(self, callback):
171
self.filename = self.mktemp()
172
source = ("from twisted.internet import protocol, reactor\n"
173
"reactor.listenUNIX(%r, protocol.ServerFactory(), wantPID=True)\n") % (self.filename,)
174
env = {'PYTHONPATH': os.pathsep.join(sys.path)}
176
d = utils.getProcessOutput(sys.executable, ("-u", "-c", source), env=env)
177
d.addCallback(callback)
181
def testUncleanServerSocketLocking(self):
182
def ranStupidChild(ign):
183
# If this next call succeeds, our lock handling is correct.
184
p = reactor.listenUNIX(self.filename, Factory(self, self.filename), wantPID=True)
185
return p.stopListening()
186
return self._uncleanSocketTest(ranStupidChild)
189
def testUncleanSocketLockingFromThePerspectiveOfAClientConnectingToTheDeadServerSocket(self):
190
def ranStupidChild(ign):
192
f = FailedConnectionClientFactory(d)
193
c = reactor.connectUNIX(self.filename, f, checkPID=True)
194
return self.assertFailure(d, error.BadFileError)
195
return self._uncleanSocketTest(ranStupidChild)
199
filename = self.mktemp()
200
f = Factory(self, filename)
201
p = reactor.listenUNIX(filename, f)
202
self.failIf(str(p).find(filename) == -1)
204
def stoppedListening(ign):
205
self.failIf(str(p).find(filename) != -1)
207
return defer.maybeDeferred(p.stopListening).addCallback(stoppedListening)
209
class ClientProto(protocol.ConnectedDatagramProtocol):
210
started = stopped = False
214
self.deferredStarted = defer.Deferred()
215
self.deferredGotBack = defer.Deferred()
217
def stopProtocol(self):
220
def startProtocol(self):
222
self.deferredStarted.callback(None)
224
def datagramReceived(self, data):
226
self.deferredGotBack.callback(None)
228
class ServerProto(protocol.DatagramProtocol):
229
started = stopped = False
230
gotwhat = gotfrom = None
233
self.deferredStarted = defer.Deferred()
234
self.deferredGotWhat = defer.Deferred()
236
def stopProtocol(self):
239
def startProtocol(self):
241
self.deferredStarted.callback(None)
243
def datagramReceived(self, data, addr):
245
self.transport.write("hi back", addr)
247
self.deferredGotWhat.callback(None)
249
class DatagramUnixSocketTestCase(PortCleanerUpper):
250
"""Test datagram UNIX sockets."""
251
def testExchange(self):
252
clientaddr = self.mktemp()
253
serveraddr = self.mktemp()
256
s = reactor.listenUNIXDatagram(serveraddr, sp)
257
c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress = clientaddr)
259
d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
261
cp.transport.write("hi")
262
return defer.gatherResults([sp.deferredGotWhat,
265
def cleanup(ignored):
266
d1 = defer.maybeDeferred(s.stopListening)
267
d1.addCallback(lambda x : os.unlink(clientaddr))
268
d2 = defer.maybeDeferred(c.stopListening)
269
d2.addCallback(lambda x : os.unlink(serveraddr))
270
return defer.gatherResults([d1, d2])
272
def _cbTestExchange(ignored):
273
self.failUnlessEqual("hi", sp.gotwhat)
274
self.failUnlessEqual(clientaddr, sp.gotfrom)
275
self.failUnlessEqual("hi back", cp.gotback)
278
d.addCallback(cleanup)
279
d.addCallback(_cbTestExchange)
282
def testCannotListen(self):
285
s = reactor.listenUNIXDatagram(addr, p)
286
self.failUnlessRaises(error.CannotListenError, reactor.listenUNIXDatagram, addr, p)
289
# test connecting to bound and connected (somewhere else) address
292
filename = self.mktemp()
294
p = reactor.listenUNIXDatagram(filename, f)
295
self.failIf(str(p).find(filename) == -1)
297
def stoppedListening(ign):
298
self.failIf(str(p).find(filename) != -1)
300
return defer.maybeDeferred(p.stopListening).addCallback(stoppedListening)
303
if not interfaces.IReactorUNIX(reactor, None):
304
UnixSocketTestCase.skip = "This reactor does not support UNIX domain sockets"
305
if not interfaces.IReactorUNIXDatagram(reactor, None):
306
DatagramUnixSocketTestCase.skip = "This reactor does not support UNIX datagram sockets"