~cbehrens/nova/lp844160-build-works-with-zones

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/udp.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_udp -*-
 
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
Various asynchronous UDP classes.
 
7
 
 
8
Please do not use this module directly.
 
9
 
 
10
Maintainer: Itamar Shtull-Trauring
 
11
"""
 
12
 
 
13
# System Imports
 
14
import socket
 
15
import operator
 
16
import struct
 
17
import warnings
 
18
 
 
19
from zope.interface import implements
 
20
 
 
21
from twisted.python.runtime import platformType
 
22
if platformType == 'win32':
 
23
    from errno import WSAEWOULDBLOCK as EWOULDBLOCK
 
24
    from errno import WSAEINTR as EINTR
 
25
    from errno import WSAEMSGSIZE as EMSGSIZE
 
26
    from errno import WSAECONNREFUSED as ECONNREFUSED
 
27
    from errno import WSAECONNRESET
 
28
    EAGAIN=EWOULDBLOCK
 
29
else:
 
30
    from errno import EWOULDBLOCK, EINTR, EMSGSIZE, ECONNREFUSED, EAGAIN
 
31
 
 
32
# Twisted Imports
 
33
from twisted.internet import base, defer, address
 
34
from twisted.python import log, reflect, failure
 
35
from twisted.internet import abstract, error, interfaces
 
36
 
 
37
 
 
38
class Port(base.BasePort):
 
39
    """UDP port, listening for packets."""
 
40
 
 
41
    implements(interfaces.IUDPTransport, interfaces.ISystemHandle)
 
42
 
 
43
    addressFamily = socket.AF_INET
 
44
    socketType = socket.SOCK_DGRAM
 
45
    maxThroughput = 256 * 1024 # max bytes we read in one eventloop iteration
 
46
 
 
47
    # Actual port number being listened on, only set to a non-None
 
48
    # value when we are actually listening.
 
49
    _realPortNumber = None
 
50
 
 
51
    def __init__(self, port, proto, interface='', maxPacketSize=8192, reactor=None):
 
52
        """Initialize with a numeric port to listen on.
 
53
        """
 
54
        base.BasePort.__init__(self, reactor)
 
55
        self.port = port
 
56
        self.protocol = proto
 
57
        self.maxPacketSize = maxPacketSize
 
58
        self.interface = interface
 
59
        self.setLogStr()
 
60
        self._connectedAddr = None
 
61
 
 
62
    def __repr__(self):
 
63
        if self._realPortNumber is not None:
 
64
            return "<%s on %s>" % (self.protocol.__class__, self._realPortNumber)
 
65
        else:
 
66
            return "<%s not connected>" % (self.protocol.__class__,)
 
67
 
 
68
    def getHandle(self):
 
69
        """Return a socket object."""
 
70
        return self.socket
 
71
 
 
72
    def startListening(self):
 
73
        """Create and bind my socket, and begin listening on it.
 
74
 
 
75
        This is called on unserialization, and must be called after creating a
 
76
        server to begin listening on the specified port.
 
77
        """
 
78
        self._bindSocket()
 
79
        self._connectToProtocol()
 
80
 
 
81
    def _bindSocket(self):
 
82
        try:
 
83
            skt = self.createInternetSocket()
 
84
            skt.bind((self.interface, self.port))
 
85
        except socket.error, le:
 
86
            raise error.CannotListenError, (self.interface, self.port, le)
 
87
 
 
88
        # Make sure that if we listened on port 0, we update that to
 
89
        # reflect what the OS actually assigned us.
 
90
        self._realPortNumber = skt.getsockname()[1]
 
91
 
 
92
        log.msg("%s starting on %s"%(self.protocol.__class__, self._realPortNumber))
 
93
 
 
94
        self.connected = 1
 
95
        self.socket = skt
 
96
        self.fileno = self.socket.fileno
 
97
 
 
98
    def _connectToProtocol(self):
 
99
        self.protocol.makeConnection(self)
 
100
        self.startReading()
 
101
 
 
102
 
 
103
    def doRead(self):
 
104
        """Called when my socket is ready for reading."""
 
105
        read = 0
 
106
        while read < self.maxThroughput:
 
107
            try:
 
108
                data, addr = self.socket.recvfrom(self.maxPacketSize)
 
109
            except socket.error, se:
 
110
                no = se.args[0]
 
111
                if no in (EAGAIN, EINTR, EWOULDBLOCK):
 
112
                    return
 
113
                if (no == ECONNREFUSED) or (platformType == "win32" and no == WSAECONNRESET):
 
114
                    if self._connectedAddr:
 
115
                        self.protocol.connectionRefused()
 
116
                else:
 
117
                    raise
 
118
            else:
 
119
                read += len(data)
 
120
                try:
 
121
                    self.protocol.datagramReceived(data, addr)
 
122
                except:
 
123
                    log.err()
 
124
 
 
125
 
 
126
    def write(self, datagram, addr=None):
 
127
        """Write a datagram.
 
128
 
 
129
        @param addr: should be a tuple (ip, port), can be None in connected mode.
 
130
        """
 
131
        if self._connectedAddr:
 
132
            assert addr in (None, self._connectedAddr)
 
133
            try:
 
134
                return self.socket.send(datagram)
 
135
            except socket.error, se:
 
136
                no = se.args[0]
 
137
                if no == EINTR:
 
138
                    return self.write(datagram)
 
139
                elif no == EMSGSIZE:
 
140
                    raise error.MessageLengthError, "message too long"
 
141
                elif no == ECONNREFUSED:
 
142
                    self.protocol.connectionRefused()
 
143
                else:
 
144
                    raise
 
145
        else:
 
146
            assert addr != None
 
147
            if not addr[0].replace(".", "").isdigit():
 
148
                warnings.warn("Please only pass IPs to write(), not hostnames", DeprecationWarning, stacklevel=2)
 
149
            try:
 
150
                return self.socket.sendto(datagram, addr)
 
151
            except socket.error, se:
 
152
                no = se.args[0]
 
153
                if no == EINTR:
 
154
                    return self.write(datagram, addr)
 
155
                elif no == EMSGSIZE:
 
156
                    raise error.MessageLengthError, "message too long"
 
157
                elif no == ECONNREFUSED:
 
158
                    # in non-connected UDP ECONNREFUSED is platform dependent, I think
 
159
                    # and the info is not necessarily useful. Nevertheless maybe we
 
160
                    # should call connectionRefused? XXX
 
161
                    return
 
162
                else:
 
163
                    raise
 
164
 
 
165
    def writeSequence(self, seq, addr):
 
166
        self.write("".join(seq), addr)
 
167
 
 
168
    def connect(self, host, port):
 
169
        """'Connect' to remote server."""
 
170
        if self._connectedAddr:
 
171
            raise RuntimeError, "already connected, reconnecting is not currently supported (talk to itamar if you want this)"
 
172
        if not abstract.isIPAddress(host):
 
173
            raise ValueError, "please pass only IP addresses, not domain names"
 
174
        self._connectedAddr = (host, port)
 
175
        self.socket.connect((host, port))
 
176
 
 
177
    def _loseConnection(self):
 
178
        self.stopReading()
 
179
        if self.connected: # actually means if we are *listening*
 
180
            from twisted.internet import reactor
 
181
            reactor.callLater(0, self.connectionLost)
 
182
 
 
183
    def stopListening(self):
 
184
        if self.connected:
 
185
            result = self.d = defer.Deferred()
 
186
        else:
 
187
            result = None
 
188
        self._loseConnection()
 
189
        return result
 
190
 
 
191
    def loseConnection(self):
 
192
        warnings.warn("Please use stopListening() to disconnect port", DeprecationWarning, stacklevel=2)
 
193
        self.stopListening()
 
194
 
 
195
    def connectionLost(self, reason=None):
 
196
        """Cleans up my socket.
 
197
        """
 
198
        log.msg('(Port %s Closed)' % self._realPortNumber)
 
199
        self._realPortNumber = None
 
200
        base.BasePort.connectionLost(self, reason)
 
201
        self.protocol.doStop()
 
202
        self.connected = 0
 
203
        self.socket.close()
 
204
        del self.socket
 
205
        del self.fileno
 
206
        if hasattr(self, "d"):
 
207
            self.d.callback(None)
 
208
            del self.d
 
209
 
 
210
    def setLogStr(self):
 
211
        self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
 
212
 
 
213
    def logPrefix(self):
 
214
        """Returns the name of my class, to prefix log entries with.
 
215
        """
 
216
        return self.logstr
 
217
 
 
218
    def getHost(self):
 
219
        """
 
220
        Returns an IPv4Address.
 
221
 
 
222
        This indicates the address from which I am connecting.
 
223
        """
 
224
        return address.IPv4Address('UDP', *(self.socket.getsockname() + ('INET_UDP',)))
 
225
 
 
226
 
 
227
 
 
228
class MulticastMixin:
 
229
    """Implement multicast functionality."""
 
230
 
 
231
    def getOutgoingInterface(self):
 
232
        i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF)
 
233
        return socket.inet_ntoa(struct.pack("@i", i))
 
234
 
 
235
    def setOutgoingInterface(self, addr):
 
236
        """Returns Deferred of success."""
 
237
        return self.reactor.resolve(addr).addCallback(self._setInterface)
 
238
 
 
239
    def _setInterface(self, addr):
 
240
        i = socket.inet_aton(addr)
 
241
        self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i)
 
242
        return 1
 
243
 
 
244
    def getLoopbackMode(self):
 
245
        return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP)
 
246
 
 
247
    def setLoopbackMode(self, mode):
 
248
        mode = struct.pack("b", operator.truth(mode))
 
249
        self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, mode)
 
250
 
 
251
    def getTTL(self):
 
252
        return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL)
 
253
 
 
254
    def setTTL(self, ttl):
 
255
        ttl = struct.pack("B", ttl)
 
256
        self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
 
257
 
 
258
    def joinGroup(self, addr, interface=""):
 
259
        """Join a multicast group. Returns Deferred of success."""
 
260
        return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 1)
 
261
 
 
262
    def _joinAddr1(self, addr, interface, join):
 
263
        return self.reactor.resolve(interface).addCallback(self._joinAddr2, addr, join)
 
264
 
 
265
    def _joinAddr2(self, interface, addr, join):
 
266
        addr = socket.inet_aton(addr)
 
267
        interface = socket.inet_aton(interface)
 
268
        if join:
 
269
            cmd = socket.IP_ADD_MEMBERSHIP
 
270
        else:
 
271
            cmd = socket.IP_DROP_MEMBERSHIP
 
272
        try:
 
273
            self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
 
274
        except socket.error, e:
 
275
            return failure.Failure(error.MulticastJoinError(addr, interface, *e.args))
 
276
 
 
277
    def leaveGroup(self, addr, interface=""):
 
278
        """Leave multicast group, return Deferred of success."""
 
279
        return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 0)
 
280
 
 
281
 
 
282
class MulticastPort(MulticastMixin, Port):
 
283
    """UDP Port that supports multicasting."""
 
284
 
 
285
    implements(interfaces.IMulticastTransport)
 
286
 
 
287
    def __init__(self, port, proto, interface='', maxPacketSize=8192, reactor=None, listenMultiple=False):
 
288
        Port.__init__(self, port, proto, interface, maxPacketSize, reactor)
 
289
        self.listenMultiple = listenMultiple
 
290
 
 
291
    def createInternetSocket(self):
 
292
        skt = Port.createInternetSocket(self)
 
293
        if self.listenMultiple:
 
294
            skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
295
            if hasattr(socket, "SO_REUSEPORT"):
 
296
                skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
 
297
        return skt