3
from twisted.internet import interfaces, defer, main, error, protocol, address
4
from twisted.internet.abstract import isIPAddress
5
from twisted.persisted import styles
6
from twisted.python import log, failure, reflect
8
from ops import ReadFileOp, WriteFileOp, WSARecvFromOp, WSASendToOp
9
from util import StateEventMachineType
11
# UDP port whose behaviour is organised as handlers named
# handle_<state>_<event>, where <state> is the current value of self.state.
class Port(log.Logger, styles.Ephemeral, object):
12
# NOTE(review): StateEventMachineType comes from the project's ``util``
# module and is not visible here.  Presumably it generates one public method
# per entry in ``events`` and dispatches it to the matching
# handle_<state>_<event> handler -- confirm in util.
__metaclass__ = StateEventMachineType
13
# Declares IUDPTransport as the interface this class implements.
__implements__ = interfaces.IUDPTransport
14
# Event names; each one appears as the <event> part of the handler names.
events = ["startListening", "stopListening", "write", "readDone", "readErr", "writeDone", "writeErr", "connect"]
15
# Arguments unpacked into socket.socket() when the port is bound.
sockinfo = (socket.AF_INET, socket.SOCK_DGRAM, 0)
16
# Operation class instantiated once per port for receiving.
read_op_class = WSARecvFromOp
17
# Operation class instantiated once per outgoing datagram.
write_op_class = WSASendToOp
20
def __init__(self, bindAddress, proto, maxPacketSize=8192):
    """Set up an unbound UDP port serving ``proto``.

    bindAddress   -- address later handed to ``socket.bind`` when the port
                     starts listening.
    proto         -- the protocol that receives datagrams; must be a
                     ``protocol.DatagramProtocol`` instance.
    maxPacketSize -- size requested from ``reactor.AllocateReadBuffer`` for
                     the receive buffer.

    The port starts in the "disconnected" state; no socket is created here.
    """
    assert isinstance(proto, protocol.DatagramProtocol)
    self.state = "disconnected"
    # NOTE(review): imported at call time rather than module level,
    # presumably so the reactor is not imported as a side effect of importing
    # this module -- confirm before hoisting.
    from twisted.internet import reactor
    self.bindAddress = bindAddress
    # The protocol must be stored: logstr below and every handler in this
    # class read self.protocol.
    self.protocol = proto
    self.maxPacketSize = maxPacketSize
    self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
    self.read_op = self.read_op_class(self)
    self.readbuf = reactor.AllocateReadBuffer(maxPacketSize)
    self.reactor = reactor
33
return "<%s on %s>" % (self.protocol.__class__, 'port')
35
def handle_listening_connect(self, host, port):
    """Start connecting the listening port to ``(host, port)``.

    Moves the port to the "connecting" state and returns a Deferred whose
    result is whatever ``_connectDone`` returns.  A literal IP address is
    connected to directly; any other host is first passed through
    ``self.reactor.resolve`` and the resolved value is handed to
    ``_connectDone`` as the host.
    """
    self.state = "connecting"
    if isIPAddress(host):
        # No lookup needed; maybeDeferred keeps the return type uniform.
        return defer.maybeDeferred(self._connectDone, host, port)
    d = self.reactor.resolve(host)
    d.addCallback(self._connectDone, port)
    # Return the chain so callers can observe completion or failure.
    return d
44
def _connectDone(self, host, port):
    """Finish a connect: remember the peer, enter "connected", connect the socket.

    Returns the stored ``(host, port)`` peer address.
    """
    peer = (host, port)
    self._connectedAddr = peer
    self.state = "connected"
    self.socket.connect(peer)
    return self._connectedAddr
50
def handle_disconnected_startListening(self):
# NOTE(review): the body of this handler is not visible in this view of the
# file.  Presumably it calls _bindSocket() and _connectSocket() to take the
# port out of the "disconnected" state -- confirm against the full source.
54
def _bindSocket(self):
    """Create the socket described by ``sockinfo`` and bind it.

    On success the bound socket is stored as ``self.socket``, which the
    read, write, connect and address methods of this class rely on.

    Raises ``error.CannotListenError`` (wrapping the ``socket.error``) when
    the socket cannot be created or bound; a socket that was created but
    could not be bound is closed first so its handle is not leaked.
    """
    log.msg("%s starting on %s" % (self.protocol.__class__, 'port'))
    skt = None
    try:
        skt = socket.socket(*self.sockinfo)
        skt.bind(self.bindAddress)
    except socket.error as le:  # "as" form parses on Python 2.6+ and 3
        if skt is not None:
            skt.close()
        raise error.CannotListenError(None, None, le)
    self.socket = skt
64
def _connectSocket(self):
    """Hand this port to the protocol as its transport, then enter "listening"."""
    proto = self.protocol
    proto.makeConnection(self)
    self.state = "listening"
69
def startReading(self):
    """Ask the read operation to start receiving into ``self.readbuf``.

    A ``WindowsError`` raised while initiating the operation is logged and
    swallowed rather than propagated.
    """
    try:
        self.read_op.initiateOp(self.socket.fileno(), self.readbuf)
    except WindowsError as we:  # "as" form parses on Python 2.6+ and 3
        log.msg("initiating read failed with args %s" % (we,))
76
def stopReading(self):
# NOTE(review): the body of this method is not visible in this view of the
# file; what it does to the pending read cannot be determined from here --
# confirm against the full source.
79
def handle_listening_readDone(self, bytes, addr = None):
    """Deliver a completed read to the protocol exactly once.

    bytes -- number of bytes received; the first ``bytes`` items of
             ``self.readbuf`` form the datagram.
    addr  -- sender address when the operation reports one.

    With an address the protocol receives ``(data, addr)``; without one it
    receives just the data.
    """
    data = self.readbuf[:bytes]
    if addr:
        self.protocol.datagramReceived(data, addr)
    else:
        self.protocol.datagramReceived(data)
    # NOTE(review): nothing visible here re-issues the read after delivery;
    # confirm whether the caller or the read operation re-arms it.
handle_connecting_readDone = handle_listening_readDone
handle_connected_readDone = handle_listening_readDone
89
def handle_listening_readErr(self, ret, bytes):
    """Log a failed read and report a refused connection to the protocol.

    ret   -- error code reported for the read.
    bytes -- byte count reported with the error (unused here).
    """
    # TODO: use Failures or something
    port_unreachable = 1234  # ERROR_PORT_UNREACHABLE
    message = "read failed with err %s" % (ret,)
    log.msg(message)
    if ret == port_unreachable:
        self.protocol.connectionRefused()
handle_connecting_readErr = handle_listening_readErr
handle_connected_readErr = handle_listening_readErr
99
def handle_disconnected_readErr(self, ret, bytes):
    """Ignore a read error that arrives once the port is disconnected."""
102
def handle_disconnected_readDone(self, bytes, addr = None):
    """Ignore a read completion that arrives once the port is disconnected."""
105
def handle_listening_write(self, data, addr):
    """Send ``data`` to ``addr`` while the port is listening (unconnected)."""
    send = self.performWrite
    send(data, addr)
108
def handle_connected_write(self, data, addr = None):
    """Send ``data`` on a connected port.

    ``addr`` may be omitted; if given it must equal the connected peer
    address, otherwise the assertion fails.
    """
    acceptable = (None, self._connectedAddr)
    assert addr in acceptable
    self.performWrite(data, addr)
112
def performWrite(self, data, addr = None):
    """Start a write operation sending ``data`` to ``addr``.

    data -- the datagram payload.
    addr -- destination address; when omitted, the connected peer stored in
            ``self._connectedAddr`` is used instead.

    A fresh ``write_op_class`` instance is created for each call.  A
    ``WindowsError`` raised while initiating the operation is logged and
    swallowed rather than propagated.
    """
    try:
        write_op = self.write_op_class(self)
        # Only fall back to the connected peer when the caller gave no
        # destination; an explicit address must be honoured as-is.
        if addr is None:
            addr = self._connectedAddr
        write_op.initiateOp(self.socket.fileno(), data, addr)
    except WindowsError as we:  # "as" form parses on Python 2.6+ and 3
        log.msg("initiating write failed with args %s" % (we,))
124
def handle_listening_writeDone(self, bytes):
    """Log the number of bytes reported for a completed write."""
    message = "write success with bytes %s" % (bytes,)
    log.msg(message)
    # Disabled in the original: self.callBufferHandlers(event = "buffer empty")
handle_connecting_writeDone = handle_listening_writeDone
handle_connected_writeDone = handle_listening_writeDone
130
def handle_listening_writeErr(self, ret, bytes):
    """Log a failed write and tear the port down via ``connectionLost``.

    ret   -- error code reported for the write.
    bytes -- byte count reported with the error (unused here).
    """
    message = "write failed with err %s" % (ret,)
    log.msg(message)
    self.connectionLost()
handle_connecting_writeErr = handle_listening_writeErr
handle_connected_writeErr = handle_listening_writeErr
136
def handle_disconnected_writeErr(self, ret, bytes):
    """Ignore a write error that arrives once the port is disconnected."""
139
def handle_disconnected_writeDone(self, bytes):
    """Ignore a write completion that arrives once the port is disconnected."""
142
def writeSequence(self, seq, addr):
    """Concatenate the strings in ``seq`` and write them to ``addr`` as one datagram."""
    payload = "".join(seq)
    self.write(payload, addr)
145
def handle_listening_stopListening(self):
    """Stop listening by tearing the port down through ``connectionLost``."""
    teardown = self.connectionLost
    teardown()
handle_connecting_stopListening = handle_listening_stopListening
handle_connected_stopListening = handle_listening_stopListening
151
def connectionLost(self, reason=None):
    """Shut the port down and return it to the "disconnected" state.

    reason -- accepted for interface compatibility; not used here.

    Logs the closure, tells the protocol to stop via ``doStop`` and closes
    the underlying socket so the OS handle is released.  Read or write
    events that arrive afterwards land in the handle_disconnected_*
    handlers, which ignore them.
    """
    log.msg('(Port %r Closed)' % ('port',))
    self.protocol.doStop()
    # The socket only exists once the port has been bound, so guard the
    # lookup instead of assuming the attribute is present.
    skt = getattr(self, "socket", None)
    if skt is not None:
        skt.close()
    self.state = "disconnected"
162
return address.IPv4Address('UDP', *(self.socket.getsockname() + ('INET_UDP',)))