~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/internet/iocpreactor/udp.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
 
# See LICENSE for details.
3
 
 
4
 
 
5
 
import socket
6
 
import struct
7
 
import operator
8
 
 
9
 
from twisted.internet import interfaces, defer, error, protocol, address
10
 
from twisted.internet.udp import MulticastMixin
11
 
from twisted.internet.abstract import isIPAddress
12
 
from twisted.persisted import styles
13
 
from twisted.python import log, failure, reflect
14
 
 
15
 
from ops import ReadFileOp, WriteFileOp, WSARecvFromOp, WSASendToOp
16
 
from util import StateEventMachineType
17
 
from zope.interface import implements
18
 
 
19
 
# Win32 system error code ERROR_PORT_UNREACHABLE (winerror.h value 1234).
# The read/write error handlers compare the completion error against this
# value and notify the protocol via connectionRefused() when it matches.
ERROR_PORT_UNREACHABLE = 1234
20
 
 
21
 
class Port(log.Logger, styles.Ephemeral, object):
    """
    UDP port whose reads and writes are issued as overlapped operations
    (C{WSARecvFromOp} / C{WSASendToOp}).

    C{self.state} is one of C{"disconnected"}, C{"listening"},
    C{"connecting"} or C{"connected"}.  The C{handle_<state>_<event>}
    methods below are presumably dispatched by C{StateEventMachineType}
    according to the current state for each name in L{events} (the
    metaclass is defined elsewhere -- confirm there).

    @ivar bindAddress: C{(host, port)} tuple the socket is bound to.
    @ivar protocol: the C{DatagramProtocol} that receives datagrams.
    @ivar maxPacketSize: size of the receive buffer requested from the
        reactor.
    @ivar reading: True while a new read should be issued after each
        completed one.
    """
    __metaclass__ = StateEventMachineType
    implements(interfaces.IUDPTransport)
    events = ["startListening", "stopListening", "write", "readDone", "readErr", "writeDone", "writeErr", "connect"]
    sockinfo = (socket.AF_INET, socket.SOCK_DGRAM, 0)
    read_op_class = WSARecvFromOp
    write_op_class = WSASendToOp
    reading = False
    # Actual port number being listened on, only set to a non-None
    # value when we are actually listening.
    _realPortNumber = None
    disconnected = property(lambda self: self.state == "disconnected")

    def __init__(self, bindAddress, proto, maxPacketSize=8192):
        """
        @param bindAddress: C{(host, port)} tuple to bind to.
        @param proto: a C{protocol.DatagramProtocol} instance.
        @param maxPacketSize: size of the read buffer to allocate.
        """
        assert isinstance(proto, protocol.DatagramProtocol)
        self.state = "disconnected"
        # Imported here rather than at module level so that importing this
        # module does not itself import the global reactor.
        from twisted.internet import reactor
        self.bindAddress = bindAddress
        self._connectedAddr = None
        self.protocol = proto
        self.maxPacketSize = maxPacketSize
        self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
        self.read_op = self.read_op_class(self)
        self.readbuf = reactor.AllocateReadBuffer(maxPacketSize)
        self.reactor = reactor

    def __repr__(self):
        if self._realPortNumber is not None:
            return "<%s on %s>" % (self.protocol.__class__, self._realPortNumber)
        else:
            return "<%s not connected>" % (self.protocol.__class__,)

    def handle_listening_connect(self, host, port):
        """
        Begin connecting the listening port to a single peer.

        @param host: dotted-quad IP address of the peer.
        @param port: peer port number.
        @return: a Deferred firing with the C{(host, port)} tuple.
        @raise ValueError: if C{host} is not an IP address.
        """
        if not isIPAddress(host):
            raise ValueError("please pass only IP addresses, not domain names")
        self.state = "connecting"
        return defer.maybeDeferred(self._connectDone, host, port)

    def handle_connecting_connect(self, host, port):
        """
        Reject a second connect request.

        @raise RuntimeError: always.
        """
        raise RuntimeError("already connected, reconnecting is not currently supported (talk to itamar if you want this)")
    handle_connected_connect = handle_connecting_connect

    def _connectDone(self, host, port):
        """
        Connect the socket to C{(host, port)} and enter the "connected"
        state.

        The socket is connected before any state is recorded; if the
        connect fails the port goes back to "listening" so that it is not
        left claiming a connection it does not have.

        @return: the connected C{(host, port)} tuple.
        """
        try:
            self.socket.connect((host, port))
        except:
            # Roll back the "connecting" state set by our caller, then let
            # the error propagate (maybeDeferred turns it into a Failure).
            self.state = "listening"
            raise
        self._connectedAddr = (host, port)
        self.state = "connected"
        return self._connectedAddr

    def handle_disconnected_startListening(self):
        """
        Bind the socket and start reading, resolving the bind host first
        if it is not already an IP address.

        @return: a Deferred that fires once the port is listening.
        @raise error.CannotListenError: synchronously, if the socket cannot
            be created or bound.
        """
        self._bindSocket()
        host = self.bindAddress[0]
        if isIPAddress(host):
            return defer.maybeDeferred(self._connectSocket, host)
        d = self.reactor.resolve(host)
        d.addCallback(self._connectSocket)
        return d

    def createInternetSocket(self):
        """
        Create (but do not bind) the socket described by C{sockinfo}.

        Subclasses can extend this to set socket options before the bind.

        @return: a new, unbound socket object.
        """
        return socket.socket(*self.sockinfo)

    def _bindSocket(self):
        """
        Create the socket, bind it to C{bindAddress} and record the port
        number actually assigned.

        @raise error.CannotListenError: if creating or binding the socket
            fails; the half-set-up socket is closed first.
        """
        skt = None
        try:
            skt = self.createInternetSocket()
            skt.bind(self.bindAddress)
        except socket.error as le:
            # Do not leak the descriptor of a socket we could not bind.
            if skt is not None:
                skt.close()
            interface, port = self.bindAddress
            raise error.CannotListenError(interface, port, le)

        # Make sure that if we listened on port 0, we update that to
        # reflect what the OS actually assigned us.
        self._realPortNumber = skt.getsockname()[1]

        log.msg("%s starting on %s"%(self.protocol.__class__, self._realPortNumber))

        self.socket = skt

    def _connectSocket(self, host):
        """
        Finish start-up once the bind host is known as an IP address:
        attach the protocol, issue the first read and enter "listening".

        @param host: the (resolved) IP address of the bind host.
        """
        self.bindAddress = (host, self.bindAddress[1])
        self.protocol.makeConnection(self)
        self.startReading()
        self.state = "listening"

    def startReading(self):
        """
        Mark the port as reading and issue a read operation into
        C{readbuf}.  A C{WindowsError} from initiating the operation is
        logged, not raised.
        """
        self.reading = True
        try:
            self.read_op.initiateOp(self.socket.fileno(), self.readbuf)
        except WindowsError as we:
            log.msg("initiating read failed with args %s" % (we,))

    def stopReading(self):
        """
        Stop re-issuing reads; an already pending read is not cancelled
        here.
        """
        self.reading = False

    def handle_listening_readDone(self, bytes, addr = None):
        """
        Deliver a received datagram to the protocol and re-arm the read.

        An exception raised while delivering is logged in both branches,
        so that a faulty protocol cannot stop the port from reading.

        @param bytes: number of bytes received into C{readbuf}.
        @param addr: sender address, or a false value if none was
            reported; in that case the protocol is called with the
            datagram only.
        """
        try:
            if addr:
                self.protocol.datagramReceived(self.readbuf[:bytes], addr)
            else:
                self.protocol.datagramReceived(self.readbuf[:bytes])
        except:
            # Deliberately broad: whatever the protocol raises is logged
            # and the read loop below keeps going.
            log.err()
        if self.reading:
            self.startReading()
    handle_connecting_readDone = handle_listening_readDone
    handle_connected_readDone = handle_listening_readDone

    def handle_listening_readErr(self, ret, bytes):
        """
        Log a failed read, report an unreachable port to the protocol and
        re-arm the read.

        @param ret: error code of the failed read.
        @param bytes: unused.
        """
        log.msg("read failed with err %s" % (ret,))
        # TODO: use Failures or something
        if ret == ERROR_PORT_UNREACHABLE:
            self.protocol.connectionRefused()
        if self.reading:
            self.startReading()
    handle_connecting_readErr = handle_listening_readErr
    handle_connected_readErr = handle_listening_readErr

    def handle_disconnected_readErr(self, ret, bytes):
        """Ignore a read error reported while disconnected."""
        pass # no kicking the dead horse

    def handle_disconnected_readDone(self, bytes, addr = None):
        """Ignore a read completion reported while disconnected."""
        pass # no kicking the dead horse

    def handle_listening_write(self, data, addr):
        """
        Send C{data} to C{addr} from an unconnected port.
        """
        self.performWrite(data, addr)

    def handle_connected_write(self, data, addr = None):
        """
        Send C{data} to the connected peer.

        @param addr: must be None or equal to the connected address.
        """
        assert addr in (None, self._connectedAddr)
        self.performWrite(data, addr)

    def performWrite(self, data, addr = None):
        """
        Issue a write operation for C{data}.

        @param addr: destination address; a false value means the
            connected address.  A C{WindowsError} from initiating the
            operation is logged, not raised.
        """
        self.writing = True
        try:
            write_op = self.write_op_class(self)
            if not addr:
                addr = self._connectedAddr
            write_op.initiateOp(self.socket.fileno(), data, addr)
        except WindowsError as we:
            log.msg("initiating write failed with args %s" % (we,))

    def handle_listening_writeDone(self, bytes):
        """
        Log a completed write.

        @param bytes: number of bytes written.
        """
        log.msg("write success with bytes %s" % (bytes,))
    handle_connecting_writeDone = handle_listening_writeDone
    handle_connected_writeDone = handle_listening_writeDone

    def handle_listening_writeErr(self, ret, bytes):
        """
        Log a failed write and report an unreachable port to the protocol.

        @param ret: error code of the failed write.
        @param bytes: unused.
        """
        log.msg("write failed with err %s" % (ret,))
        if ret == ERROR_PORT_UNREACHABLE:
            self.protocol.connectionRefused()
    handle_connecting_writeErr = handle_listening_writeErr
    handle_connected_writeErr = handle_listening_writeErr

    def handle_disconnected_writeErr(self, ret, bytes):
        """Ignore a write error reported while disconnected."""
        pass # no kicking the dead horse

    def handle_disconnected_writeDone(self, bytes):
        """Ignore a write completion reported while disconnected."""
        pass # no kicking the dead horse

    def writeSequence(self, seq, addr):
        """
        Join the strings in C{seq} and write them as one datagram to
        C{addr}.
        """
        self.write("".join(seq), addr)

    def handle_listening_stopListening(self):
        """
        Stop reading and shut the port down.
        """
        self.stopReading()
        self.connectionLost()
    handle_connecting_stopListening = handle_listening_stopListening
    handle_connected_stopListening = handle_listening_stopListening

    def connectionLost(self, reason=None):
        """
        Stop the protocol, close the socket and enter "disconnected".

        The socket is closed and the state reset even if the protocol's
        C{doStop} raises; the exception still propagates afterwards.

        @param reason: unused.
        """
        log.msg('(Port %s Closed)' % self._realPortNumber)
        self._realPortNumber = None
        try:
            self.protocol.doStop()
        finally:
            self.socket.close()
            del self.socket
            self.state = "disconnected"

    def logPrefix(self):
        """
        @return: the log prefix built in C{__init__} (qualified protocol
            class name followed by " (UDP)").
        """
        return self.logstr

    def getHost(self):
        """
        @return: an C{address.IPv4Address} built from the socket's local
            name.
        """
        return address.IPv4Address('UDP', *(self.socket.getsockname() + ('INET_UDP',)))
200
 
 
201
 
 
202
 
class MulticastPort(MulticastMixin, Port):
    """
    L{Port} combined with C{MulticastMixin}, i.e. a UDP port with
    multicast support.

    @ivar listenMultiple: when true, L{createInternetSocket} enables
        address (and, where the platform defines it, port) reuse on the
        socket it returns.
    """

    implements(interfaces.IMulticastTransport)

    def __init__(self, bindAddress, proto, maxPacketSize=8192, listenMultiple=False):
        """
        @param bindAddress: passed through to C{Port.__init__}.
        @param proto: passed through to C{Port.__init__}.
        @param maxPacketSize: passed through to C{Port.__init__}.
        @param listenMultiple: whether the socket should allow several
            listeners on the same address.
        """
        Port.__init__(self, bindAddress, proto, maxPacketSize)
        self.listenMultiple = listenMultiple

    def createInternetSocket(self):
        """
        Obtain a socket from C{Port.createInternetSocket} and, if
        C{listenMultiple} is set, turn on C{SO_REUSEADDR} plus
        C{SO_REUSEPORT} when the C{socket} module provides it.

        @return: the socket object.
        """
        sock = Port.createInternetSocket(self)
        if not self.listenMultiple:
            return sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # SO_REUSEPORT is not defined on every platform.
        reuse_port = getattr(socket, "SO_REUSEPORT", None)
        if reuse_port is not None:
            sock.setsockopt(socket.SOL_SOCKET, reuse_port, 1)
        return sock