1
# -*- test-case-name: twisted.test.test_unix -*-
3
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
4
# See LICENSE for details.
7
"""Various asynchronous UNIX socket classes.
9
End users shouldn't use this module directly - use the reactor APIs instead.
11
Maintainer: Itamar Shtull-Trauring
15
import os, stat, socket
16
from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED
18
from zope.interface import implements, implementsOnly, implementedBy
20
# Everything below depends on UNIX-domain sockets, so refuse to import on
# platforms whose socket module does not define AF_UNIX.
try:
    socket.AF_UNIX
except AttributeError:
    raise ImportError("UNIX sockets not supported on this platform")
24
from twisted.internet import base, tcp, udp, error, interfaces, protocol, address
25
from twisted.internet.error import CannotListenError
26
from twisted.python import lockfile, log, reflect, failure
29
# Stream transport for UNIX sockets, built on tcp.Server.
class Server(tcp.Server):
30
# Delegates to tcp.Server.__init__, passing the peer as the tuple
# (client, None) where tcp.Server takes its client argument.
def __init__(self, sock, protocol, client, server, sessionno, reactor):
31
tcp.Server.__init__(self, sock, protocol, (client, None), server, sessionno, reactor)
34
# NOTE(review): the `def` line for this return is absent from this extract.
# It wraps the socket's own name in a UNIXAddress (presumably getHost --
# confirm against upstream).
return address.UNIXAddress(self.socket.getsockname())
37
# NOTE(review): the `def` line for this return is also absent. It wraps
# self.hostname in a UNIXAddress (presumably getPeer -- confirm).
return address.UNIXAddress(self.hostname)
41
addressFamily = socket.AF_UNIX
42
socketType = socket.SOCK_STREAM
47
def __init__(self, fileName, factory, backlog=50, mode=0666, reactor=None, wantPID = 0):
48
tcp.Port.__init__(self, fileName, factory, backlog, reactor=reactor)
50
self.wantPID = wantPID
53
factoryName = reflect.qual(self.factory.__class__)
54
if hasattr(self, 'socket'):
55
return '<%s on %r>' % (factoryName, self.port)
57
return '<%s (not listening)>' % (factoryName,)
59
def _buildAddr(self, name):
    """Return name wrapped in an address.UNIXAddress."""
    unixAddress = address.UNIXAddress(name)
    return unixAddress
62
# Logs the start, takes a FilesystemLock on "<port>.lock", starts the
# factory, creates the socket, chmods the path to self.mode, listens with
# self.backlog, and records fileno / numberAccepts.
# Raises CannotListenError when the lock cannot be acquired or when a
# socket.error is caught.
# NOTE(review): this extract is missing lines -- both `except` clauses below
# have no visible `try:`, the docstring has no closing quotes, and the
# bind() call mentioned in the comment is not shown. Confirm against
# upstream before editing the logic.
def startListening(self):
63
"""Create and bind my socket, and begin listening on it.
65
This is called on unserialization, and must be called after creating a
66
server to begin listening on the specified port.
68
# NOTE(review): repr(self.port) is formatted with %r, so the path is
# repr'd twice in the log line -- confirm whether that is intended.
log.msg("%s starting on %r" % (self.factory.__class__, repr(self.port)))
70
self.lockFile = lockfile.FilesystemLock(self.port + ".lock")
71
if not self.lockFile.lock():
72
raise CannotListenError, (None, self.port, "Cannot acquire lock")
74
if not self.lockFile.clean:
76
# This is a best-effort attempt at cleaning up
77
# left-over unix sockets on the filesystem.
78
# If it fails, there's not much else we can
79
# do. The bind() below will fail with an
80
# exception that actually propagates.
81
# NOTE(review): the body of this check is not present in this extract.
if stat.S_ISSOCK(os.stat(self.port).st_mode):
86
self.factory.doStart()
88
skt = self.createInternetSocket()
90
except socket.error, le:
91
raise CannotListenError, (None, self.port, le)
93
# Apply the configured permission bits (self.mode) to the socket path.
95
os.chmod(self.port, self.mode)
96
except OSError: # probably not a visible filesystem name
98
skt.listen(self.backlog)
101
self.fileno = self.socket.fileno
102
self.numberAccepts = 100
105
def connectionLost(self, reason):
    """Release the lock file, if there is one, then run tcp.Port's cleanup.

    reason is passed through unchanged to tcp.Port.connectionLost.
    """
    # NOTE(review): only the lock release and the base-class call are
    # visible for this method; confirm against upstream that no other
    # teardown step belongs here.
    heldLock = self.lockFile
    if heldLock is not None:
        heldLock.unlock()
    tcp.Port.connectionLost(self, reason)
112
"""Returns a UNIXAddress.
114
This indicates the server's address.
116
return address.UNIXAddress(self.socket.getsockname())
119
class Client(tcp.BaseClient):
120
"""A client for Unix sockets."""
121
# Socket family/type used when this class creates its socket.
addressFamily = socket.AF_UNIX
122
socketType = socket.SOCK_STREAM
124
# Stores the connector and records filename as both realAddress and addr.
# When checkPID is true and lockfile.isLocked(filename + ".lock") is false,
# initialization is finished with an error.BadFileError and no socket.
def __init__(self, filename, connector, reactor=None, checkPID = 0):
125
self.connector = connector
126
self.realAddress = self.addr = filename
127
if checkPID and not lockfile.isLocked(filename + ".lock"):
128
self._finishInit(None, None, error.BadFileError(filename), reactor)
129
# NOTE(review): whatever separates the error path above from this call
# (likely a `return`) is missing from this extract, and the call below is
# truncated -- its remaining arguments are not shown.
self._finishInit(self.doConnect, self.createInternetSocket(),
133
# NOTE(review): the `def` line for this return is absent; it wraps
# self.addr in a UNIXAddress (presumably getPeer -- confirm).
return address.UNIXAddress(self.addr)
136
# NOTE(review): the `def` line for this return is absent; it returns
# UNIXAddress(None) (presumably getHost -- confirm).
return address.UNIXAddress(None)
139
class Connector(base.BaseConnector):
    """Connector that builds Client transports for a stored address.

    The address and the checkPID flag given at construction are kept on the
    instance and handed to Client each time a transport is made.
    """

    def __init__(self, address, factory, timeout, reactor, checkPID):
        """Initialize the base connector and remember address and checkPID."""
        base.BaseConnector.__init__(self, factory, timeout, reactor)
        self.address, self.checkPID = address, checkPID

    def _makeTransport(self):
        """Create and return a new Client for the stored address."""
        transport = Client(self.address, self, self.reactor, self.checkPID)
        return transport

    def getDestination(self):
        """Return the stored address wrapped in an address.UNIXAddress."""
        destination = self.address
        return address.UNIXAddress(destination)
152
# NOTE(review): this class is incomplete in this extract -- several `def`
# and `try:` lines and closing docstring quotes are missing. The comments
# below describe only what the visible statements do.
class DatagramPort(udp.Port):
153
"""Datagram UNIX port, listening for packets."""
155
implements(interfaces.IUNIXDatagramTransport)
157
addressFamily = socket.AF_UNIX
159
# Delegates to udp.Port.__init__ with addr and proto positionally and
# maxPacketSize / reactor by keyword.
# NOTE(review): mode is accepted but not stored by the visible lines,
# although _bindSocket reads self.mode -- confirm it is assigned.
def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, reactor=None):
160
"""Initialize with address to listen on.
162
udp.Port.__init__(self, addr, proto, maxPacketSize=maxPacketSize, reactor=reactor)
167
# NOTE(review): the `def` line for the following statements is absent
# (presumably __repr__). They format the protocol's qualified class name,
# including self.port when a `socket` attribute exists.
protocolName = reflect.qual(self.protocol.__class__,)
168
if hasattr(self, 'socket'):
169
return '<%s on %r>' % (protocolName, self.port)
171
return '<%s (not listening)>' % (protocolName,)
174
# Logs the start, creates the socket, chmods the path to self.mode and
# records fileno. A caught socket.error is re-raised as
# error.CannotListenError.
# NOTE(review): both `except` clauses below lack a visible `try:`, and no
# bind call or assignment of self.socket is shown.
def _bindSocket(self):
175
log.msg("%s starting on %s"%(self.protocol.__class__, repr(self.port)))
177
skt = self.createInternetSocket() # XXX: haha misnamed method
180
except socket.error, le:
181
raise error.CannotListenError, (None, self.port, le)
184
os.chmod(self.port, self.mode)
185
# NOTE(review): bare except here, whereas the stream port's startListening
# catches only OSError around the same chmod -- consider narrowing.
except: # probably not a visible filesystem name
189
self.fileno = self.socket.fileno
191
# Sends datagram to address with socket.sendto and returns its result.
# Note the `address` parameter shadows the imported `address` module
# inside this method.
# NOTE(review): the errno checks that select between the retry, the
# MessageLengthError and the drop-the-data comment are missing here.
def write(self, datagram, address):
192
"""Write a datagram."""
194
return self.socket.sendto(datagram, address)
195
except socket.error, se:
198
return self.write(datagram, address)
200
raise error.MessageLengthError, "message too long"
202
# oh, well, drop the data. The only difference from UDP
203
# is that UDP won't ever notice.
204
# TODO: add TCP-like buffering
209
# Logs the close, runs base.BasePort.connectionLost, stops the protocol if
# one is attached, and fires self.d with None if that attribute exists.
def connectionLost(self, reason=None):
210
"""Cleans up my socket.
212
log.msg('(Port %s Closed)' % repr(self.port))
213
base.BasePort.connectionLost(self, reason)
214
if hasattr(self, "protocol"):
215
# we won't have this attribute in ConnectedPort, in cases
216
# where there was an error in the connection process
217
self.protocol.doStop()
222
if hasattr(self, "d"):
223
self.d.callback(None)
227
# NOTE(review): the `def` line for this assignment is absent. The log
# prefix is tagged " (UDP)" although this is a UNIX port -- confirm.
self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
230
# NOTE(review): the `def` line for this return is absent; it wraps the
# socket's own name in a UNIXAddress (presumably getHost -- confirm).
return address.UNIXAddress(self.socket.getsockname())
233
# NOTE(review): this class is incomplete in this extract -- several `def`,
# `try:` and assignment lines are missing, and it may continue past the
# last visible line. The comments below describe only the visible
# statements.
class ConnectedDatagramPort(DatagramPort):
233
"""A connected datagram UNIX socket."""
236
# Declares IUNIXDatagramConnectedTransport together with whatever
# base.BasePort implements.
implementsOnly(interfaces.IUNIXDatagramConnectedTransport,
237
*(implementedBy(base.BasePort)))
239
# addr is the remote address to connect to; bindAddress is what gets passed
# to DatagramPort.__init__ as the local address.
# NOTE(review): proto is validated with `assert`, which is skipped when
# Python runs with -O.
def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None):
240
assert isinstance(proto, protocol.ConnectedDatagramProtocol)
241
DatagramPort.__init__(self, bindAddress, proto, maxPacketSize, mode, reactor)
242
self.remoteaddr = addr
244
# Connects the socket to self.remoteaddr and hooks up the protocol.
# NOTE(review): the `try:`/`except` that should guard the connect and route
# to connectionFailed is missing here, as is any earlier bind step.
def startListening(self):
247
self.socket.connect(self.remoteaddr)
248
self._connectToProtocol()
250
self.connectionFailed(failure.Failure())
252
# Drops the connection first, then tells the protocol why it failed.
def connectionFailed(self, reason):
253
self.loseConnection()
254
self.protocol.connectionFailed(reason)
258
# NOTE(review): the `def` line for the read handler below is absent
# (presumably doRead), as are the initialization of `read`, the `try:`,
# and the assignment of `no`.
"""Called when my socket is ready for reading."""
260
while read < self.maxThroughput:
262
data, addr = self.socket.recvfrom(self.maxPacketSize)
264
# Only the payload is delivered; the sender address is not passed on.
self.protocol.datagramReceived(data)
265
except socket.error, se:
267
if no in (EAGAIN, EINTR, EWOULDBLOCK):
269
if no == ECONNREFUSED:
270
self.protocol.connectionRefused()
276
# Sends data on the connected socket and returns the result of socket.send.
# NOTE(review): the `try:`, the assignment of `no`, and the errno checks
# preceding the `elif` below are missing from this extract.
def write(self, data):
277
"""Write a datagram."""
279
return self.socket.send(data)
280
except socket.error, se:
283
return self.write(data)
285
raise error.MessageLengthError, "message too long"
286
elif no == ECONNREFUSED:
287
self.protocol.connectionRefused()
289
# oh, well, drop the data. The only difference from UDP
290
# is that UDP won't ever notice.
291
# TODO: add TCP-like buffering
297
# NOTE(review): the `def` line for this return is absent; it wraps
# self.remoteaddr in a UNIXAddress (presumably getPeer -- confirm).
return address.UNIXAddress(self.remoteaddr)