~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/internet/iocpreactor/udp.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2004-06-21 22:01:11 UTC
  • mto: (2.2.3 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20040621220111-vkf909euqnyrp3nr
Tags: upstream-1.3.0
Import upstream version 1.3.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import socket
 
2
 
 
3
from twisted.internet import interfaces, defer, main, error, protocol, address
 
4
from twisted.internet.abstract import isIPAddress
 
5
from twisted.persisted import styles
 
6
from twisted.python import log, failure, reflect
 
7
 
 
8
from ops import ReadFileOp, WriteFileOp, WSARecvFromOp, WSASendToOp
 
9
from util import StateEventMachineType
 
10
 
 
11
class Port(log.Logger, styles.Ephemeral, object):
 
12
    __metaclass__ = StateEventMachineType
 
13
    __implements__ = interfaces.IUDPTransport
 
14
    events = ["startListening", "stopListening", "write", "readDone", "readErr", "writeDone", "writeErr", "connect"]
 
15
    sockinfo = (socket.AF_INET, socket.SOCK_DGRAM, 0)
 
16
    read_op_class = WSARecvFromOp
 
17
    write_op_class = WSASendToOp
 
18
    reading = False
 
19
 
 
20
    def __init__(self, bindAddress, proto, maxPacketSize=8192):
 
21
        assert isinstance(proto, protocol.DatagramProtocol)
 
22
        self.state = "disconnected"
 
23
        from twisted.internet import reactor
 
24
        self.bindAddress = bindAddress
 
25
        self.protocol = proto
 
26
        self.maxPacketSize = maxPacketSize
 
27
        self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
 
28
        self.read_op = self.read_op_class(self)
 
29
        self.readbuf = reactor.AllocateReadBuffer(maxPacketSize)
 
30
        self.reactor = reactor
 
31
 
 
32
    def __repr__(self):
 
33
        return "<%s on %s>" % (self.protocol.__class__, 'port')
 
34
 
 
35
    def handle_listening_connect(self, host, port):
 
36
        self.state = "connecting"
 
37
        if isIPAddress(host):
 
38
             return defer.maybeDeferred(self._connectDone, host, port)
 
39
        else:
 
40
            d = self.reactor.resolve(host)
 
41
            d.addCallback(self._connectDone, port)
 
42
            return d        
 
43
 
 
44
    def _connectDone(self, host, port):
 
45
        self._connectedAddr = (host, port)
 
46
        self.state = "connected"
 
47
        self.socket.connect((host, port))
 
48
        return self._connectedAddr
 
49
 
 
50
    def handle_disconnected_startListening(self):
 
51
        self._bindSocket()
 
52
        self._connectSocket()
 
53
 
 
54
    def _bindSocket(self):
 
55
        log.msg("%s starting on %s" % (self.protocol.__class__, 'port'))
 
56
        try:
 
57
            skt = socket.socket(*self.sockinfo)
 
58
            skt.bind(self.bindAddress)
 
59
#            print "bound %s to %s" % (skt.fileno(), self.bindAddress)
 
60
        except socket.error, le:
 
61
            raise error.CannotListenError, (None, None, le)
 
62
        self.socket = skt
 
63
 
 
64
    def _connectSocket(self):
 
65
        self.protocol.makeConnection(self)
 
66
        self.startReading()
 
67
        self.state = "listening"
 
68
 
 
69
    def startReading(self):
 
70
        self.reading = True
 
71
        try:
 
72
            self.read_op.initiateOp(self.socket.fileno(), self.readbuf)
 
73
        except WindowsError, we:
 
74
            log.msg("initiating read failed with args %s" % (we,))
 
75
 
 
76
    def stopReading(self):
 
77
        self.reading = False
 
78
 
 
79
    def handle_listening_readDone(self, bytes, addr = None):
 
80
        if addr:
 
81
            self.protocol.datagramReceived(self.readbuf[:bytes], addr)
 
82
        else:
 
83
            self.protocol.datagramReceived(self.readbuf[:bytes])
 
84
        if self.reading:
 
85
            self.startReading()
 
86
    handle_connecting_readDone = handle_listening_readDone
 
87
    handle_connected_readDone = handle_listening_readDone
 
88
 
 
89
    def handle_listening_readErr(self, ret, bytes):
 
90
        log.msg("read failed with err %s" % (ret,))
 
91
        # TODO: use Failures or something
 
92
        if ret == 1234: # ERROR_PORT_UNREACHABLE
 
93
            self.protocol.connectionRefused()
 
94
        if self.reading:
 
95
            self.startReading()
 
96
    handle_connecting_readErr = handle_listening_readErr
 
97
    handle_connected_readErr = handle_listening_readErr
 
98
 
 
99
    def handle_disconnected_readErr(self, ret, bytes):
 
100
        pass # no kicking the dead horse
 
101
 
 
102
    def handle_disconnected_readDone(self, bytes, addr = None):
 
103
        pass # no kicking the dead horse
 
104
 
 
105
    def handle_listening_write(self, data, addr):
 
106
        self.performWrite(data, addr)
 
107
 
 
108
    def handle_connected_write(self, data, addr = None):
 
109
        assert addr in (None, self._connectedAddr)
 
110
        self.performWrite(data, addr)
 
111
 
 
112
    def performWrite(self, data, addr = None):
 
113
#        print "performing write on", data, addr
 
114
        self.writing = True
 
115
        try:
 
116
            write_op = self.write_op_class(self)
 
117
            if not addr:
 
118
                addr = self._connectedAddr
 
119
            write_op.initiateOp(self.socket.fileno(), data, addr)
 
120
#            print "initiating write_op to", addr
 
121
        except WindowsError, we:
 
122
            log.msg("initiating write failed with args %s" % (we,))
 
123
 
 
124
    def handle_listening_writeDone(self, bytes):
 
125
        log.msg("write success with bytes %s" % (bytes,))
 
126
#        self.callBufferHandlers(event = "buffer empty")
 
127
    handle_connecting_writeDone = handle_listening_writeDone
 
128
    handle_connected_writeDone = handle_listening_writeDone
 
129
 
 
130
    def handle_listening_writeErr(self, ret, bytes):
 
131
        log.msg("write failed with err %s" % (ret,))
 
132
        self.connectionLost()
 
133
    handle_connecting_writeErr = handle_listening_writeErr
 
134
    handle_connected_writeErr = handle_listening_writeErr
 
135
 
 
136
    def handle_disconnected_writeErr(self, ret, bytes):
 
137
        pass # no kicking the dead horse
 
138
 
 
139
    def handle_disconnected_writeDone(self, bytes):
 
140
        pass # no kicking the dead horse
 
141
 
 
142
    def writeSequence(self, seq, addr):
 
143
        self.write("".join(seq), addr)
 
144
 
 
145
    def handle_listening_stopListening(self):
 
146
        self.stopReading()
 
147
        self.connectionLost()
 
148
    handle_connecting_stopListening = handle_listening_stopListening
 
149
    handle_connected_stopListening = handle_listening_stopListening
 
150
 
 
151
    def connectionLost(self, reason=None):
 
152
        log.msg('(Port %r Closed)' % ('port',))
 
153
        self.protocol.doStop()
 
154
        self.socket.close()
 
155
        del self.socket
 
156
        self.state = "disconnected"
 
157
 
 
158
    def logPrefix(self):
 
159
        return self.logstr
 
160
 
 
161
    def getHost(self):
 
162
        return address.IPv4Address('UDP', *(self.socket.getsockname() + ('INET_UDP',)))
 
163