1
# -*- test-case-name: twisted.test.test_udp -*-
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Various asynchronous UDP classes.
8
Please do not use this module directly.
10
Maintainer: Itamar Shtull-Trauring
19
from zope.interface import implements
21
from twisted.python.runtime import platformType
22
if platformType == 'win32':
23
from errno import WSAEWOULDBLOCK as EWOULDBLOCK
24
from errno import WSAEINTR as EINTR
25
from errno import WSAEMSGSIZE as EMSGSIZE
26
from errno import WSAECONNREFUSED as ECONNREFUSED
27
from errno import WSAECONNRESET
30
from errno import EWOULDBLOCK, EINTR, EMSGSIZE, ECONNREFUSED, EAGAIN
33
from twisted.internet import base, defer, address
34
from twisted.python import log, reflect, failure
35
from twisted.internet import abstract, error, interfaces
38
class Port(base.BasePort):
39
"""UDP port, listening for packets."""
41
implements(interfaces.IUDPTransport, interfaces.ISystemHandle)
43
addressFamily = socket.AF_INET
44
socketType = socket.SOCK_DGRAM
45
maxThroughput = 256 * 1024 # max bytes we read in one eventloop iteration
47
# Actual port number being listened on, only set to a non-None
48
# value when we are actually listening.
49
_realPortNumber = None
51
def __init__(self, port, proto, interface='', maxPacketSize=8192, reactor=None):
52
"""Initialize with a numeric port to listen on.
54
base.BasePort.__init__(self, reactor)
57
self.maxPacketSize = maxPacketSize
58
self.interface = interface
60
self._connectedAddr = None
63
if self._realPortNumber is not None:
64
return "<%s on %s>" % (self.protocol.__class__, self._realPortNumber)
66
return "<%s not connected>" % (self.protocol.__class__,)
69
"""Return a socket object."""
72
def startListening(self):
73
"""Create and bind my socket, and begin listening on it.
75
This is called on unserialization, and must be called after creating a
76
server to begin listening on the specified port.
79
self._connectToProtocol()
81
def _bindSocket(self):
83
skt = self.createInternetSocket()
84
skt.bind((self.interface, self.port))
85
except socket.error, le:
86
raise error.CannotListenError, (self.interface, self.port, le)
88
# Make sure that if we listened on port 0, we update that to
89
# reflect what the OS actually assigned us.
90
self._realPortNumber = skt.getsockname()[1]
92
log.msg("%s starting on %s"%(self.protocol.__class__, self._realPortNumber))
96
self.fileno = self.socket.fileno
98
def _connectToProtocol(self):
99
self.protocol.makeConnection(self)
104
"""Called when my socket is ready for reading."""
106
while read < self.maxThroughput:
108
data, addr = self.socket.recvfrom(self.maxPacketSize)
109
except socket.error, se:
111
if no in (EAGAIN, EINTR, EWOULDBLOCK):
113
if (no == ECONNREFUSED) or (platformType == "win32" and no == WSAECONNRESET):
114
if self._connectedAddr:
115
self.protocol.connectionRefused()
121
self.protocol.datagramReceived(data, addr)
126
def write(self, datagram, addr=None):
129
@param addr: should be a tuple (ip, port), can be None in connected mode.
131
if self._connectedAddr:
132
assert addr in (None, self._connectedAddr)
134
return self.socket.send(datagram)
135
except socket.error, se:
138
return self.write(datagram)
140
raise error.MessageLengthError, "message too long"
141
elif no == ECONNREFUSED:
142
self.protocol.connectionRefused()
147
if not addr[0].replace(".", "").isdigit():
148
warnings.warn("Please only pass IPs to write(), not hostnames", DeprecationWarning, stacklevel=2)
150
return self.socket.sendto(datagram, addr)
151
except socket.error, se:
154
return self.write(datagram, addr)
156
raise error.MessageLengthError, "message too long"
157
elif no == ECONNREFUSED:
158
# in non-connected UDP ECONNREFUSED is platform dependent, I think
159
# and the info is not necessarily useful. Nevertheless maybe we
160
# should call connectionRefused? XXX
165
def writeSequence(self, seq, addr):
166
self.write("".join(seq), addr)
168
def connect(self, host, port):
169
"""'Connect' to remote server."""
170
if self._connectedAddr:
171
raise RuntimeError, "already connected, reconnecting is not currently supported (talk to itamar if you want this)"
172
if not abstract.isIPAddress(host):
173
raise ValueError, "please pass only IP addresses, not domain names"
174
self._connectedAddr = (host, port)
175
self.socket.connect((host, port))
177
def _loseConnection(self):
179
if self.connected: # actually means if we are *listening*
180
from twisted.internet import reactor
181
reactor.callLater(0, self.connectionLost)
183
def stopListening(self):
185
result = self.d = defer.Deferred()
188
self._loseConnection()
191
def loseConnection(self):
192
warnings.warn("Please use stopListening() to disconnect port", DeprecationWarning, stacklevel=2)
195
def connectionLost(self, reason=None):
196
"""Cleans up my socket.
198
log.msg('(Port %s Closed)' % self._realPortNumber)
199
self._realPortNumber = None
200
base.BasePort.connectionLost(self, reason)
201
self.protocol.doStop()
206
if hasattr(self, "d"):
207
self.d.callback(None)
211
self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
214
"""Returns the name of my class, to prefix log entries with.
220
Returns an IPv4Address.
222
This indicates the address from which I am connecting.
224
return address.IPv4Address('UDP', *(self.socket.getsockname() + ('INET_UDP',)))
228
class MulticastMixin:
229
"""Implement multicast functionality."""
231
def getOutgoingInterface(self):
232
i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF)
233
return socket.inet_ntoa(struct.pack("@i", i))
235
def setOutgoingInterface(self, addr):
236
"""Returns Deferred of success."""
237
return self.reactor.resolve(addr).addCallback(self._setInterface)
239
def _setInterface(self, addr):
240
i = socket.inet_aton(addr)
241
self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i)
244
def getLoopbackMode(self):
245
return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP)
247
def setLoopbackMode(self, mode):
248
mode = struct.pack("b", operator.truth(mode))
249
self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, mode)
252
return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL)
254
def setTTL(self, ttl):
255
ttl = struct.pack("B", ttl)
256
self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
258
def joinGroup(self, addr, interface=""):
259
"""Join a multicast group. Returns Deferred of success."""
260
return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 1)
262
def _joinAddr1(self, addr, interface, join):
263
return self.reactor.resolve(interface).addCallback(self._joinAddr2, addr, join)
265
def _joinAddr2(self, interface, addr, join):
266
addr = socket.inet_aton(addr)
267
interface = socket.inet_aton(interface)
269
cmd = socket.IP_ADD_MEMBERSHIP
271
cmd = socket.IP_DROP_MEMBERSHIP
273
self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
274
except socket.error, e:
275
return failure.Failure(error.MulticastJoinError(addr, interface, *e.args))
277
def leaveGroup(self, addr, interface=""):
278
"""Leave multicast group, return Deferred of success."""
279
return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 0)
282
class MulticastPort(MulticastMixin, Port):
283
"""UDP Port that supports multicasting."""
285
implements(interfaces.IMulticastTransport)
287
def __init__(self, port, proto, interface='', maxPacketSize=8192, reactor=None, listenMultiple=False):
288
Port.__init__(self, port, proto, interface, maxPacketSize, reactor)
289
self.listenMultiple = listenMultiple
291
def createInternetSocket(self):
292
skt = Port.createInternetSocket(self)
293
if self.listenMultiple:
294
skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
295
if hasattr(socket, "SO_REUSEPORT"):
296
skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)