1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
# See LICENSE for details.
9
from twisted.internet import interfaces, defer, error, protocol, address
10
from twisted.internet.udp import MulticastMixin
11
from twisted.internet.abstract import isIPAddress
12
from twisted.persisted import styles
13
from twisted.python import log, failure, reflect
15
from ops import ReadFileOp, WriteFileOp, WSARecvFromOp, WSASendToOp
16
from util import StateEventMachineType
17
from zope.interface import implements
19
ERROR_PORT_UNREACHABLE = 1234
21
class Port(log.Logger, styles.Ephemeral, object):
22
__metaclass__ = StateEventMachineType
23
implements(interfaces.IUDPTransport)
24
events = ["startListening", "stopListening", "write", "readDone", "readErr", "writeDone", "writeErr", "connect"]
25
sockinfo = (socket.AF_INET, socket.SOCK_DGRAM, 0)
26
read_op_class = WSARecvFromOp
27
write_op_class = WSASendToOp
29
# Actual port number being listened on, only set to a non-None
30
# value when we are actually listening.
31
_realPortNumber = None
32
disconnected = property(lambda self: self.state == "disconnected")
34
def __init__(self, bindAddress, proto, maxPacketSize=8192):
35
assert isinstance(proto, protocol.DatagramProtocol)
36
self.state = "disconnected"
37
from twisted.internet import reactor
38
self.bindAddress = bindAddress
39
self._connectedAddr = None
41
self.maxPacketSize = maxPacketSize
42
self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
43
self.read_op = self.read_op_class(self)
44
self.readbuf = reactor.AllocateReadBuffer(maxPacketSize)
45
self.reactor = reactor
48
if self._realPortNumber is not None:
49
return "<%s on %s>" % (self.protocol.__class__, self._realPortNumber)
51
return "<%s not connected>" % (self.protocol.__class__,)
53
def handle_listening_connect(self, host, port):
54
if not isIPAddress(host):
55
raise ValueError, "please pass only IP addresses, not domain names"
56
self.state = "connecting"
57
return defer.maybeDeferred(self._connectDone, host, port)
59
def handle_connecting_connect(self, host, port):
60
raise RuntimeError, "already connected, reconnecting is not currently supported (talk to itamar if you want this)"
61
handle_connected_connect = handle_connecting_connect
63
def _connectDone(self, host, port):
64
self._connectedAddr = (host, port)
65
self.state = "connected"
66
self.socket.connect((host, port))
67
return self._connectedAddr
69
def handle_disconnected_startListening(self):
71
host, port = self.bindAddress
73
return defer.maybeDeferred(self._connectSocket, host)
75
d = self.reactor.resolve(host)
76
d.addCallback(self._connectSocket)
79
def _bindSocket(self):
81
skt = socket.socket(*self.sockinfo)
82
skt.bind(self.bindAddress)
83
# print "bound %s to %s" % (skt.fileno(), self.bindAddress)
84
except socket.error, le:
85
raise error.CannotListenError, (None, None, le)
87
# Make sure that if we listened on port 0, we update that to
88
# reflect what the OS actually assigned us.
89
self._realPortNumber = skt.getsockname()[1]
91
log.msg("%s starting on %s"%(self.protocol.__class__, self._realPortNumber))
95
def _connectSocket(self, host):
96
self.bindAddress = (host, self.bindAddress[1])
97
self.protocol.makeConnection(self)
99
self.state = "listening"
101
def startReading(self):
104
self.read_op.initiateOp(self.socket.fileno(), self.readbuf)
105
except WindowsError, we:
106
log.msg("initiating read failed with args %s" % (we,))
108
def stopReading(self):
111
def handle_listening_readDone(self, bytes, addr = None):
114
self.protocol.datagramReceived(self.readbuf[:bytes], addr)
118
self.protocol.datagramReceived(self.readbuf[:bytes])
121
handle_connecting_readDone = handle_listening_readDone
122
handle_connected_readDone = handle_listening_readDone
124
def handle_listening_readErr(self, ret, bytes):
125
log.msg("read failed with err %s" % (ret,))
126
# TODO: use Failures or something
127
if ret == ERROR_PORT_UNREACHABLE:
128
self.protocol.connectionRefused()
131
handle_connecting_readErr = handle_listening_readErr
132
handle_connected_readErr = handle_listening_readErr
134
def handle_disconnected_readErr(self, ret, bytes):
135
pass # no kicking the dead horse
137
def handle_disconnected_readDone(self, bytes, addr = None):
138
pass # no kicking the dead horse
140
def handle_listening_write(self, data, addr):
141
self.performWrite(data, addr)
143
def handle_connected_write(self, data, addr = None):
144
assert addr in (None, self._connectedAddr)
145
self.performWrite(data, addr)
147
def performWrite(self, data, addr = None):
148
# print "performing write on", data, addr
151
write_op = self.write_op_class(self)
153
addr = self._connectedAddr
154
write_op.initiateOp(self.socket.fileno(), data, addr)
155
# print "initiating write_op to", addr
156
except WindowsError, we:
157
log.msg("initiating write failed with args %s" % (we,))
159
def handle_listening_writeDone(self, bytes):
160
log.msg("write success with bytes %s" % (bytes,))
161
# self.callBufferHandlers(event = "buffer empty")
162
handle_connecting_writeDone = handle_listening_writeDone
163
handle_connected_writeDone = handle_listening_writeDone
165
def handle_listening_writeErr(self, ret, bytes):
166
log.msg("write failed with err %s" % (ret,))
167
if ret == ERROR_PORT_UNREACHABLE:
168
self.protocol.connectionRefused()
169
handle_connecting_writeErr = handle_listening_writeErr
170
handle_connected_writeErr = handle_listening_writeErr
172
def handle_disconnected_writeErr(self, ret, bytes):
173
pass # no kicking the dead horse
175
def handle_disconnected_writeDone(self, bytes):
176
pass # no kicking the dead horse
178
def writeSequence(self, seq, addr):
179
self.write("".join(seq), addr)
181
def handle_listening_stopListening(self):
183
self.connectionLost()
184
handle_connecting_stopListening = handle_listening_stopListening
185
handle_connected_stopListening = handle_listening_stopListening
187
def connectionLost(self, reason=None):
188
log.msg('(Port %s Closed)' % self._realPortNumber)
189
self._realPortNumber = None
190
self.protocol.doStop()
193
self.state = "disconnected"
199
return address.IPv4Address('UDP', *(self.socket.getsockname() + ('INET_UDP',)))
202
class MulticastPort(MulticastMixin, Port):
203
"""UDP Port that supports multicasting."""
205
implements(interfaces.IMulticastTransport)
207
def __init__(self, bindAddress, proto, maxPacketSize=8192, listenMultiple=False):
208
Port.__init__(self, bindAddress, proto, maxPacketSize)
209
self.listenMultiple = listenMultiple
211
def createInternetSocket(self):
212
skt = Port.createInternetSocket(self)
213
if self.listenMultiple:
214
skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
215
if hasattr(socket, "SO_REUSEPORT"):
216
skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)