~jk0/nova/xs-ipv6

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/unix.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_unix -*-
 
2
 
 
3
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
4
# See LICENSE for details.
 
5
 
 
6
 
 
7
"""Various asynchronous TCP/IP classes.
 
8
 
 
9
End users shouldn't use this module directly - use the reactor APIs instead.
 
10
 
 
11
Maintainer: Itamar Shtull-Trauring
 
12
"""
 
13
 
 
14
# System imports
 
15
import os, stat, socket
 
16
from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED
 
17
 
 
18
from zope.interface import implements, implementsOnly, implementedBy
 
19
 
 
20
if not hasattr(socket, 'AF_UNIX'):
 
21
    raise ImportError("UNIX sockets not supported on this platform")
 
22
 
 
23
# Twisted imports
 
24
from twisted.internet import base, tcp, udp, error, interfaces, protocol, address
 
25
from twisted.internet.error import CannotListenError
 
26
from twisted.python import lockfile, log, reflect, failure
 
27
 
 
28
 
 
29
class Server(tcp.Server):
 
30
    def __init__(self, sock, protocol, client, server, sessionno, reactor):
 
31
        tcp.Server.__init__(self, sock, protocol, (client, None), server, sessionno, reactor)
 
32
 
 
33
    def getHost(self):
 
34
        return address.UNIXAddress(self.socket.getsockname())
 
35
 
 
36
    def getPeer(self):
 
37
        return address.UNIXAddress(self.hostname)
 
38
 
 
39
 
 
40
class Port(tcp.Port):
 
41
    addressFamily = socket.AF_UNIX
 
42
    socketType = socket.SOCK_STREAM
 
43
 
 
44
    transport = Server
 
45
    lockFile = None
 
46
 
 
47
    def __init__(self, fileName, factory, backlog=50, mode=0666, reactor=None, wantPID = 0):
 
48
        tcp.Port.__init__(self, fileName, factory, backlog, reactor=reactor)
 
49
        self.mode = mode
 
50
        self.wantPID = wantPID
 
51
 
 
52
    def __repr__(self):
 
53
        factoryName = reflect.qual(self.factory.__class__)
 
54
        if hasattr(self, 'socket'):
 
55
            return '<%s on %r>' % (factoryName, self.port)
 
56
        else:
 
57
            return '<%s (not listening)>' % (factoryName,)
 
58
 
 
59
    def _buildAddr(self, name):
 
60
        return address.UNIXAddress(name)
 
61
 
 
62
    def startListening(self):
 
63
        """Create and bind my socket, and begin listening on it.
 
64
 
 
65
        This is called on unserialization, and must be called after creating a
 
66
        server to begin listening on the specified port.
 
67
        """
 
68
        log.msg("%s starting on %r" % (self.factory.__class__, repr(self.port)))
 
69
        if self.wantPID:
 
70
            self.lockFile = lockfile.FilesystemLock(self.port + ".lock")
 
71
            if not self.lockFile.lock():
 
72
                raise CannotListenError, (None, self.port, "Cannot acquire lock")
 
73
            else:
 
74
                if not self.lockFile.clean:
 
75
                    try:
 
76
                        # This is a best-attempt at cleaning up
 
77
                        # left-over unix sockets on the filesystem.
 
78
                        # If it fails, there's not much else we can
 
79
                        # do.  The bind() below will fail with an
 
80
                        # exception that actually propegates.
 
81
                        if stat.S_ISSOCK(os.stat(self.port).st_mode):
 
82
                            os.remove(self.port)
 
83
                    except:
 
84
                        pass
 
85
 
 
86
        self.factory.doStart()
 
87
        try:
 
88
            skt = self.createInternetSocket()
 
89
            skt.bind(self.port)
 
90
        except socket.error, le:
 
91
            raise CannotListenError, (None, self.port, le)
 
92
        else:
 
93
            # Make the socket readable and writable to the world.
 
94
            try:
 
95
                os.chmod(self.port, self.mode)
 
96
            except OSError: # probably not a visible filesystem name
 
97
                pass
 
98
            skt.listen(self.backlog)
 
99
            self.connected = True
 
100
            self.socket = skt
 
101
            self.fileno = self.socket.fileno
 
102
            self.numberAccepts = 100
 
103
            self.startReading()
 
104
 
 
105
    def connectionLost(self, reason):
 
106
        os.unlink(self.port)
 
107
        if self.lockFile is not None:
 
108
            self.lockFile.unlock()
 
109
        tcp.Port.connectionLost(self, reason)
 
110
 
 
111
    def getHost(self):
 
112
        """Returns a UNIXAddress.
 
113
 
 
114
        This indicates the server's address.
 
115
        """
 
116
        return address.UNIXAddress(self.socket.getsockname())
 
117
 
 
118
 
 
119
class Client(tcp.BaseClient):
 
120
    """A client for Unix sockets."""
 
121
    addressFamily = socket.AF_UNIX
 
122
    socketType = socket.SOCK_STREAM
 
123
 
 
124
    def __init__(self, filename, connector, reactor=None, checkPID = 0):
 
125
        self.connector = connector
 
126
        self.realAddress = self.addr = filename
 
127
        if checkPID and not lockfile.isLocked(filename + ".lock"):
 
128
            self._finishInit(None, None, error.BadFileError(filename), reactor)
 
129
        self._finishInit(self.doConnect, self.createInternetSocket(),
 
130
                         None, reactor)
 
131
 
 
132
    def getPeer(self):
 
133
        return address.UNIXAddress(self.addr)
 
134
 
 
135
    def getHost(self):
 
136
        return address.UNIXAddress(None)
 
137
 
 
138
 
 
139
class Connector(base.BaseConnector):
 
140
    def __init__(self, address, factory, timeout, reactor, checkPID):
 
141
        base.BaseConnector.__init__(self, factory, timeout, reactor)
 
142
        self.address = address
 
143
        self.checkPID = checkPID
 
144
 
 
145
    def _makeTransport(self):
 
146
        return Client(self.address, self, self.reactor, self.checkPID)
 
147
 
 
148
    def getDestination(self):
 
149
        return address.UNIXAddress(self.address)
 
150
 
 
151
 
 
152
class DatagramPort(udp.Port):
 
153
    """Datagram UNIX port, listening for packets."""
 
154
 
 
155
    implements(interfaces.IUNIXDatagramTransport)
 
156
 
 
157
    addressFamily = socket.AF_UNIX
 
158
 
 
159
    def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, reactor=None):
 
160
        """Initialize with address to listen on.
 
161
        """
 
162
        udp.Port.__init__(self, addr, proto, maxPacketSize=maxPacketSize, reactor=reactor)
 
163
        self.mode = mode
 
164
 
 
165
 
 
166
    def __repr__(self):
 
167
        protocolName = reflect.qual(self.protocol.__class__,)
 
168
        if hasattr(self, 'socket'):
 
169
            return '<%s on %r>' % (protocolName, self.port)
 
170
        else:
 
171
            return '<%s (not listening)>' % (protocolName,)
 
172
 
 
173
 
 
174
    def _bindSocket(self):
 
175
        log.msg("%s starting on %s"%(self.protocol.__class__, repr(self.port)))
 
176
        try:
 
177
            skt = self.createInternetSocket() # XXX: haha misnamed method
 
178
            if self.port:
 
179
                skt.bind(self.port)
 
180
        except socket.error, le:
 
181
            raise error.CannotListenError, (None, self.port, le)
 
182
        if self.port:
 
183
            try:
 
184
                os.chmod(self.port, self.mode)
 
185
            except: # probably not a visible filesystem name
 
186
                pass
 
187
        self.connected = 1
 
188
        self.socket = skt
 
189
        self.fileno = self.socket.fileno
 
190
 
 
191
    def write(self, datagram, address):
 
192
        """Write a datagram."""
 
193
        try:
 
194
            return self.socket.sendto(datagram, address)
 
195
        except socket.error, se:
 
196
            no = se.args[0]
 
197
            if no == EINTR:
 
198
                return self.write(datagram, address)
 
199
            elif no == EMSGSIZE:
 
200
                raise error.MessageLengthError, "message too long"
 
201
            elif no == EAGAIN:
 
202
                # oh, well, drop the data. The only difference from UDP
 
203
                # is that UDP won't ever notice.
 
204
                # TODO: add TCP-like buffering
 
205
                pass
 
206
            else:
 
207
                raise
 
208
 
 
209
    def connectionLost(self, reason=None):
 
210
        """Cleans up my socket.
 
211
        """
 
212
        log.msg('(Port %s Closed)' % repr(self.port))
 
213
        base.BasePort.connectionLost(self, reason)
 
214
        if hasattr(self, "protocol"):
 
215
            # we won't have attribute in ConnectedPort, in cases
 
216
            # where there was an error in connection process
 
217
            self.protocol.doStop()
 
218
        self.connected = 0
 
219
        self.socket.close()
 
220
        del self.socket
 
221
        del self.fileno
 
222
        if hasattr(self, "d"):
 
223
            self.d.callback(None)
 
224
            del self.d
 
225
 
 
226
    def setLogStr(self):
 
227
        self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
 
228
 
 
229
    def getHost(self):
 
230
        return address.UNIXAddress(self.socket.getsockname())
 
231
 
 
232
 
 
233
class ConnectedDatagramPort(DatagramPort):
 
234
    """A connected datagram UNIX socket."""
 
235
 
 
236
    implementsOnly(interfaces.IUNIXDatagramConnectedTransport,
 
237
                   *(implementedBy(base.BasePort)))
 
238
 
 
239
    def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None):
 
240
        assert isinstance(proto, protocol.ConnectedDatagramProtocol)
 
241
        DatagramPort.__init__(self, bindAddress, proto, maxPacketSize, mode, reactor)
 
242
        self.remoteaddr = addr
 
243
 
 
244
    def startListening(self):
 
245
        try:
 
246
            self._bindSocket()
 
247
            self.socket.connect(self.remoteaddr)
 
248
            self._connectToProtocol()
 
249
        except:
 
250
            self.connectionFailed(failure.Failure())
 
251
 
 
252
    def connectionFailed(self, reason):
 
253
        self.loseConnection()
 
254
        self.protocol.connectionFailed(reason)
 
255
        del self.protocol
 
256
 
 
257
    def doRead(self):
 
258
        """Called when my socket is ready for reading."""
 
259
        read = 0
 
260
        while read < self.maxThroughput:
 
261
            try:
 
262
                data, addr = self.socket.recvfrom(self.maxPacketSize)
 
263
                read += len(data)
 
264
                self.protocol.datagramReceived(data)
 
265
            except socket.error, se:
 
266
                no = se.args[0]
 
267
                if no in (EAGAIN, EINTR, EWOULDBLOCK):
 
268
                    return
 
269
                if no == ECONNREFUSED:
 
270
                    self.protocol.connectionRefused()
 
271
                else:
 
272
                    raise
 
273
            except:
 
274
                log.deferr()
 
275
 
 
276
    def write(self, data):
 
277
        """Write a datagram."""
 
278
        try:
 
279
            return self.socket.send(data)
 
280
        except socket.error, se:
 
281
            no = se.args[0]
 
282
            if no == EINTR:
 
283
                return self.write(data)
 
284
            elif no == EMSGSIZE:
 
285
                raise error.MessageLengthError, "message too long"
 
286
            elif no == ECONNREFUSED:
 
287
                self.protocol.connectionRefused()
 
288
            elif no == EAGAIN:
 
289
                # oh, well, drop the data. The only difference from UDP
 
290
                # is that UDP won't ever notice.
 
291
                # TODO: add TCP-like buffering
 
292
                pass
 
293
            else:
 
294
                raise
 
295
 
 
296
    def getPeer(self):
 
297
        return address.UNIXAddress(self.remoteaddr)