1
# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for implementations of L{IReactorUNIX} and L{IReactorUNIXDatagram}.
8
import stat, os, sys, types
11
from twisted.internet import interfaces, reactor, protocol, error, address, defer, utils
12
from twisted.python import lockfile
13
from twisted.trial import unittest
15
from twisted.test.test_tcp import MyServerFactory, MyClientFactory
18
class FailedConnectionClientFactory(protocol.ClientFactory):
19
def __init__(self, onFail):
22
def clientConnectionFailed(self, connector, reason):
23
self.onFail.errback(reason)
27
class UnixSocketTestCase(unittest.TestCase):
31
def test_peerBind(self):
33
The address passed to the server factory's C{buildProtocol} method and
34
the address returned by the connected protocol's transport's C{getPeer}
35
method match the address the client socket is bound to.
37
filename = self.mktemp()
38
peername = self.mktemp()
39
serverFactory = MyServerFactory()
40
connMade = serverFactory.protocolConnectionMade = defer.Deferred()
41
unixPort = reactor.listenUNIX(filename, serverFactory)
42
self.addCleanup(unixPort.stopListening)
43
unixSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
44
self.addCleanup(unixSocket.close)
45
unixSocket.bind(peername)
46
unixSocket.connect(filename)
47
def cbConnMade(proto):
48
expected = address.UNIXAddress(peername)
49
self.assertEqual(serverFactory.peerAddresses, [expected])
50
self.assertEqual(proto.transport.getPeer(), expected)
51
connMade.addCallback(cbConnMade)
55
def test_dumber(self):
57
L{IReactorUNIX.connectUNIX} can be used to connect a client to a server
58
started with L{IReactorUNIX.listenUNIX}.
60
filename = self.mktemp()
61
serverFactory = MyServerFactory()
62
serverConnMade = defer.Deferred()
63
serverFactory.protocolConnectionMade = serverConnMade
64
unixPort = reactor.listenUNIX(filename, serverFactory)
65
self.addCleanup(unixPort.stopListening)
66
clientFactory = MyClientFactory()
67
clientConnMade = defer.Deferred()
68
clientFactory.protocolConnectionMade = clientConnMade
69
c = reactor.connectUNIX(filename, clientFactory)
70
d = defer.gatherResults([serverConnMade, clientConnMade])
71
def allConnected((serverProtocol, clientProtocol)):
73
# Incidental assertion which may or may not be redundant with some
74
# other test. This probably deserves its own test method.
75
self.assertEqual(clientFactory.peerAddresses,
76
[address.UNIXAddress(filename)])
78
clientProtocol.transport.loseConnection()
79
serverProtocol.transport.loseConnection()
80
d.addCallback(allConnected)
84
def test_pidFile(self):
86
A lockfile is created and locked when L{IReactorUNIX.listenUNIX} is
87
called and released when the Deferred returned by the L{IListeningPort}
88
provider's C{stopListening} method is called back.
90
filename = self.mktemp()
91
serverFactory = MyServerFactory()
92
serverConnMade = defer.Deferred()
93
serverFactory.protocolConnectionMade = serverConnMade
94
unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
95
self.assertTrue(lockfile.isLocked(filename + ".lock"))
97
# XXX This part would test something about the checkPID parameter, but
98
# it doesn't actually. It should be rewritten to test the several
99
# different possible behaviors. -exarkun
100
clientFactory = MyClientFactory()
101
clientConnMade = defer.Deferred()
102
clientFactory.protocolConnectionMade = clientConnMade
103
c = reactor.connectUNIX(filename, clientFactory, checkPID=1)
105
d = defer.gatherResults([serverConnMade, clientConnMade])
106
def _portStuff((serverProtocol, clientProto)):
108
# Incidental assertion which may or may not be redundant with some
109
# other test. This probably deserves its own test method.
110
self.assertEqual(clientFactory.peerAddresses,
111
[address.UNIXAddress(filename)])
113
clientProto.transport.loseConnection()
114
serverProtocol.transport.loseConnection()
115
return unixPort.stopListening()
116
d.addCallback(_portStuff)
119
self.failIf(lockfile.isLocked(filename + ".lock"), 'locked')
120
d.addCallback(_check)
124
def test_socketLocking(self):
126
L{IReactorUNIX.listenUNIX} raises L{error.CannotListenError} if passed
127
the name of a file on which a server is already listening.
129
filename = self.mktemp()
130
serverFactory = MyServerFactory()
131
unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
134
error.CannotListenError,
135
reactor.listenUNIX, filename, serverFactory, wantPID=True)
137
def stoppedListening(ign):
138
unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
139
return unixPort.stopListening()
141
return unixPort.stopListening().addCallback(stoppedListening)
144
def _uncleanSocketTest(self, callback):
145
self.filename = self.mktemp()
146
source = ("from twisted.internet import protocol, reactor\n"
147
"reactor.listenUNIX(%r, protocol.ServerFactory(), wantPID=True)\n") % (self.filename,)
148
env = {'PYTHONPATH': os.pathsep.join(sys.path)}
150
d = utils.getProcessValue(sys.executable, ("-u", "-c", source), env=env)
151
d.addCallback(callback)
155
def test_uncleanServerSocketLocking(self):
157
If passed C{True} for the C{wantPID} parameter, a server can be started
158
listening with L{IReactorUNIX.listenUNIX} when passed the name of a
159
file on which a previous server which has not exited cleanly has been
160
listening using the C{wantPID} option.
162
def ranStupidChild(ign):
163
# If this next call succeeds, our lock handling is correct.
164
p = reactor.listenUNIX(self.filename, MyServerFactory(), wantPID=True)
165
return p.stopListening()
166
return self._uncleanSocketTest(ranStupidChild)
169
def test_connectToUncleanServer(self):
171
If passed C{True} for the C{checkPID} parameter, a client connection
172
attempt made with L{IReactorUNIX.connectUNIX} fails with
173
L{error.BadFileError}.
175
def ranStupidChild(ign):
177
f = FailedConnectionClientFactory(d)
178
c = reactor.connectUNIX(self.filename, f, checkPID=True)
179
return self.assertFailure(d, error.BadFileError)
180
return self._uncleanSocketTest(ranStupidChild)
183
def _reprTest(self, serverFactory, factoryName):
185
Test the C{__str__} and C{__repr__} implementations of a UNIX port when
186
used with the given factory.
188
filename = self.mktemp()
189
unixPort = reactor.listenUNIX(filename, serverFactory)
191
connectedString = "<%s on %r>" % (factoryName, filename)
192
self.assertEqual(repr(unixPort), connectedString)
193
self.assertEqual(str(unixPort), connectedString)
195
d = defer.maybeDeferred(unixPort.stopListening)
196
def stoppedListening(ign):
197
unconnectedString = "<%s (not listening)>" % (factoryName,)
198
self.assertEqual(repr(unixPort), unconnectedString)
199
self.assertEqual(str(unixPort), unconnectedString)
200
d.addCallback(stoppedListening)
204
def test_reprWithClassicFactory(self):
206
The two string representations of the L{IListeningPort} returned by
207
L{IReactorUNIX.listenUNIX} contains the name of the classic factory
208
class being used and the filename on which the port is listening or
209
indicates that the port is not listening.
211
class ClassicFactory:
219
self.assertIsInstance(ClassicFactory, types.ClassType)
221
return self._reprTest(
222
ClassicFactory(), "twisted.test.test_unix.ClassicFactory")
225
def test_reprWithNewStyleFactory(self):
227
The two string representations of the L{IListeningPort} returned by
228
L{IReactorUNIX.listenUNIX} contains the name of the new-style factory
229
class being used and the filename on which the port is listening or
230
indicates that the port is not listening.
232
class NewStyleFactory(object):
240
self.assertIsInstance(NewStyleFactory, type)
242
return self._reprTest(
243
NewStyleFactory(), "twisted.test.test_unix.NewStyleFactory")
247
class ClientProto(protocol.ConnectedDatagramProtocol):
248
started = stopped = False
252
self.deferredStarted = defer.Deferred()
253
self.deferredGotBack = defer.Deferred()
255
def stopProtocol(self):
258
def startProtocol(self):
260
self.deferredStarted.callback(None)
262
def datagramReceived(self, data):
264
self.deferredGotBack.callback(None)
266
class ServerProto(protocol.DatagramProtocol):
267
started = stopped = False
268
gotwhat = gotfrom = None
271
self.deferredStarted = defer.Deferred()
272
self.deferredGotWhat = defer.Deferred()
274
def stopProtocol(self):
277
def startProtocol(self):
279
self.deferredStarted.callback(None)
281
def datagramReceived(self, data, addr):
283
self.transport.write("hi back", addr)
285
self.deferredGotWhat.callback(None)
289
class DatagramUnixSocketTestCase(unittest.TestCase):
291
Test datagram UNIX sockets.
293
def test_exchange(self):
295
Test that a datagram can be sent to and received by a server and vice
298
clientaddr = self.mktemp()
299
serveraddr = self.mktemp()
302
s = reactor.listenUNIXDatagram(serveraddr, sp)
303
self.addCleanup(s.stopListening)
304
c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)
305
self.addCleanup(c.stopListening)
307
d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
309
cp.transport.write("hi")
310
return defer.gatherResults([sp.deferredGotWhat,
313
def _cbTestExchange(ignored):
314
self.failUnlessEqual("hi", sp.gotwhat)
315
self.failUnlessEqual(clientaddr, sp.gotfrom)
316
self.failUnlessEqual("hi back", cp.gotback)
319
d.addCallback(_cbTestExchange)
323
def test_cannotListen(self):
325
L{IReactorUNIXDatagram.listenUNIXDatagram} raises
326
L{error.CannotListenError} if the unix socket specified is already in
331
s = reactor.listenUNIXDatagram(addr, p)
332
self.failUnlessRaises(error.CannotListenError, reactor.listenUNIXDatagram, addr, p)
336
# test connecting to bound and connected (somewhere else) address
338
def _reprTest(self, serverProto, protocolName):
340
Test the C{__str__} and C{__repr__} implementations of a UNIX datagram
341
port when used with the given protocol.
343
filename = self.mktemp()
344
unixPort = reactor.listenUNIXDatagram(filename, serverProto)
346
connectedString = "<%s on %r>" % (protocolName, filename)
347
self.assertEqual(repr(unixPort), connectedString)
348
self.assertEqual(str(unixPort), connectedString)
350
stopDeferred = defer.maybeDeferred(unixPort.stopListening)
351
def stoppedListening(ign):
352
unconnectedString = "<%s (not listening)>" % (protocolName,)
353
self.assertEqual(repr(unixPort), unconnectedString)
354
self.assertEqual(str(unixPort), unconnectedString)
355
stopDeferred.addCallback(stoppedListening)
359
def test_reprWithClassicProtocol(self):
361
The two string representations of the L{IListeningPort} returned by
362
L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
363
classic protocol class being used and the filename on which the port is
364
listening or indicates that the port is not listening.
366
class ClassicProtocol:
367
def makeConnection(self, transport):
374
self.assertIsInstance(ClassicProtocol, types.ClassType)
376
return self._reprTest(
377
ClassicProtocol(), "twisted.test.test_unix.ClassicProtocol")
380
def test_reprWithNewStyleProtocol(self):
382
The two string representations of the L{IListeningPort} returned by
383
L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
384
new-style protocol class being used and the filename on which the port
385
is listening or indicates that the port is not listening.
387
class NewStyleProtocol(object):
388
def makeConnection(self, transport):
395
self.assertIsInstance(NewStyleProtocol, type)
397
return self._reprTest(
398
NewStyleProtocol(), "twisted.test.test_unix.NewStyleProtocol")
402
if not interfaces.IReactorUNIX(reactor, None):
403
UnixSocketTestCase.skip = "This reactor does not support UNIX domain sockets"
404
if not interfaces.IReactorUNIXDatagram(reactor, None):
405
DatagramUnixSocketTestCase.skip = "This reactor does not support UNIX datagram sockets"