~landscape/zope3/ztk-1.1.3

« back to all changes in this revision

Viewing changes to src/twisted/internet/iocpreactor/udp.py

  • Committer: Andreas Hasenack
  • Date: 2009-07-20 17:49:16 UTC
  • Revision ID: andreas@canonical.com-20090720174916-g2tn6qmietz2hn0u
Revert twisted removal, it breaks several dozen tests [trivial]

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
 
 
5
import socket
 
6
import struct
 
7
import operator
 
8
 
 
9
from twisted.internet import interfaces, defer, error, protocol, address
 
10
from twisted.internet.udp import MulticastMixin
 
11
from twisted.internet.abstract import isIPAddress
 
12
from twisted.persisted import styles
 
13
from twisted.python import log, failure, reflect
 
14
 
 
15
from ops import ReadFileOp, WriteFileOp, WSARecvFromOp, WSASendToOp
 
16
from util import StateEventMachineType
 
17
from zope.interface import implements
 
18
 
 
19
# Numeric error code matching the Windows system error ERROR_PORT_UNREACHABLE.
# Port's readErr/writeErr handlers compare the reported error value against
# it and, on a match, call the protocol's connectionRefused().
ERROR_PORT_UNREACHABLE = 1234
 
20
 
 
21
class Port(log.Logger, styles.Ephemeral, object):
    """
    UDP port for the IOCP reactor.

    The port is a small state machine.  The current state lives in
    C{self.state} and is one of "disconnected", "listening", "connecting"
    or "connected".  Every name in L{events} is served by a
    C{handle_<state>_<event>} method; a state/event pair with no handler
    defined here is not supported in that state.

    NOTE(review): the public entry points named after the events
    (C{startListening}, C{stopListening}, C{write}, C{connect}, ...) are not
    defined in this class -- presumably L{StateEventMachineType} generates
    them and dispatches on C{self.state}.  Confirm against C{util}.
    """
    __metaclass__ = StateEventMachineType
    implements(interfaces.IUDPTransport)
    events = ["startListening", "stopListening", "write", "readDone",
              "readErr", "writeDone", "writeErr", "connect"]
    # (family, type, protocol) arguments used to create the socket.
    sockinfo = (socket.AF_INET, socket.SOCK_DGRAM, 0)
    # Operation classes used for receiving and sending; their behaviour is
    # defined in the ops module, not here.
    read_op_class = WSARecvFromOp
    write_op_class = WSASendToOp
    # True while a completed read should be followed by another read.
    reading = False
    # Actual port number being listened on, only set to a non-None
    # value when we are actually listening.
    _realPortNumber = None
    disconnected = property(lambda self: self.state == "disconnected")

    def __init__(self, bindAddress, proto, maxPacketSize=8192):
        """
        @param bindAddress: C{(host, port)} tuple to bind to.  C{host} may be
            a name; it is resolved through the reactor when listening starts.
        @param proto: the L{protocol.DatagramProtocol} instance that receives
            datagrams and connection notifications.
        @param maxPacketSize: size passed to the reactor when allocating the
            read buffer.
        """
        assert isinstance(proto, protocol.DatagramProtocol)
        self.state = "disconnected"
        # Looked up when the port is created rather than when this module is
        # imported.
        from twisted.internet import reactor
        self.bindAddress = bindAddress
        self._connectedAddr = None
        self.protocol = proto
        self.maxPacketSize = maxPacketSize
        self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
        self.read_op = self.read_op_class(self)
        self.readbuf = reactor.AllocateReadBuffer(maxPacketSize)
        self.reactor = reactor

    def __repr__(self):
        if self._realPortNumber is not None:
            return "<%s on %s>" % (self.protocol.__class__, self._realPortNumber)
        else:
            return "<%s not connected>" % (self.protocol.__class__,)

    def handle_listening_connect(self, host, port):
        """
        Connect the listening socket to a single peer.

        @param host: peer IP address; host names are rejected.
        @param port: peer port number.
        @return: a Deferred that fires with C{(host, port)} once connected,
            or fails with whatever the socket connect raised.
        @raise ValueError: if C{host} is not an IP address.
        """
        if not isIPAddress(host):
            raise ValueError("please pass only IP addresses, not domain names")
        self.state = "connecting"
        return defer.maybeDeferred(self._connectDone, host, port)

    def handle_connecting_connect(self, host, port):
        """
        Reject a second connect attempt.

        @raise RuntimeError: always.
        """
        raise RuntimeError(
            "already connected, reconnecting is not currently supported "
            "(talk to itamar if you want this)")
    handle_connected_connect = handle_connecting_connect

    def _connectDone(self, host, port):
        """
        Perform the socket-level connect and record the peer address.

        The state is only committed after the connect succeeded.  On failure
        the port goes back to "listening" (the only state from which a
        connect is started in this class) so it does not claim a peer it
        never obtained, and the error is re-raised for the caller.

        @return: the C{(host, port)} tuple now stored in C{_connectedAddr}.
        """
        try:
            self.socket.connect((host, port))
        except:
            # Bare except is deliberate: the state is rolled back for any
            # failure and the exception is re-raised untouched.
            self.state = "listening"
            raise
        self._connectedAddr = (host, port)
        self.state = "connected"
        return self._connectedAddr

    def handle_disconnected_startListening(self):
        """
        Bind the socket and begin reading.

        @return: a Deferred that fires once the port is in the "listening"
            state.  If the bind host is not an IP address it is resolved
            through the reactor first.
        @raise error.CannotListenError: if the socket cannot be created or
            bound.
        """
        self._bindSocket()
        host = self.bindAddress[0]
        if isIPAddress(host):
            return defer.maybeDeferred(self._connectSocket, host)
        d = self.reactor.resolve(host)
        d.addCallback(self._connectSocket)
        return d

    def createInternetSocket(self):
        """
        Create the (still unbound) socket described by L{sockinfo}.

        Subclasses may override this to set socket options before the socket
        is bound.
        """
        return socket.socket(*self.sockinfo)

    def _bindSocket(self):
        """
        Create the socket, bind it to C{self.bindAddress} and store it as
        C{self.socket}.

        @raise error.CannotListenError: if creating or binding fails.  The
            half-initialised socket is closed first so it does not leak.
        """
        # sys.exc_info() is used instead of binding the exception in the
        # except clause so the code does not depend on either spelling of
        # that syntax.
        import sys
        skt = None
        try:
            skt = self.createInternetSocket()
            skt.bind(self.bindAddress)
        except socket.error:
            le = sys.exc_info()[1]
            if skt is not None:
                skt.close()
            interface, port = self.bindAddress
            raise error.CannotListenError(interface, port, le)

        # Make sure that if we listened on port 0, we update that to
        # reflect what the OS actually assigned us.
        self._realPortNumber = skt.getsockname()[1]

        log.msg("%s starting on %s"%(self.protocol.__class__, self._realPortNumber))

        self.socket = skt

    def _connectSocket(self, host):
        """
        Finish startListening once the bind host is known as an IP address.

        @param host: the (resolved) IP address; it replaces the host part of
            C{self.bindAddress}.
        """
        self.bindAddress = (host, self.bindAddress[1])
        self.protocol.makeConnection(self)
        self.startReading()
        self.state = "listening"

    def startReading(self):
        """
        Mark the port as reading and initiate a read operation on the socket.

        A C{WindowsError} while initiating the operation is logged, not
        raised.
        """
        import sys
        self.reading = True
        try:
            self.read_op.initiateOp(self.socket.fileno(), self.readbuf)
        except WindowsError:
            we = sys.exc_info()[1]
            log.msg("initiating read failed with args %s" % (we,))

    def stopReading(self):
        """
        Stop re-issuing reads; the read handlers check this flag before
        starting the next one.
        """
        self.reading = False

    def handle_listening_readDone(self, bytes, addr = None):
        """
        Deliver a received datagram to the protocol and re-arm the read.

        @param bytes: number of bytes of C{self.readbuf} that hold the
            datagram.
        @param addr: sender address; when it is not given the protocol is
            called with the data only.
        """
        data = self.readbuf[:bytes]
        try:
            if addr:
                self.protocol.datagramReceived(data, addr)
            else:
                self.protocol.datagramReceived(data)
        except:
            # Whatever the protocol raises is logged and must not prevent
            # the next read from being started below.
            log.err()
        if self.reading:
            self.startReading()
    handle_connecting_readDone = handle_listening_readDone
    handle_connected_readDone = handle_listening_readDone

    def handle_listening_readErr(self, ret, bytes):
        """
        Log a failed read, report a refused datagram to the protocol and
        re-arm the read.

        @param ret: error value of the failed read; compared against
            L{ERROR_PORT_UNREACHABLE}.
        @param bytes: unused.
        """
        log.msg("read failed with err %s" % (ret,))
        # TODO: use Failures or something
        if ret == ERROR_PORT_UNREACHABLE:
            self.protocol.connectionRefused()
        if self.reading:
            self.startReading()
    handle_connecting_readErr = handle_listening_readErr
    handle_connected_readErr = handle_listening_readErr

    def handle_disconnected_readErr(self, ret, bytes):
        pass # no kicking the dead horse

    def handle_disconnected_readDone(self, bytes, addr = None):
        pass # no kicking the dead horse

    def handle_listening_write(self, data, addr):
        """
        Send C{data} to C{addr}; an unconnected port needs an explicit
        address.
        """
        self.performWrite(data, addr)

    def handle_connected_write(self, data, addr = None):
        """
        Send C{data} to the connected peer.

        @param addr: must be C{None} or equal to the connected address.
        """
        assert addr in (None, self._connectedAddr)
        self.performWrite(data, addr)

    def performWrite(self, data, addr = None):
        """
        Initiate a write operation for one datagram.

        @param data: the datagram payload.
        @param addr: destination; falls back to C{self._connectedAddr} when
            false.

        A C{WindowsError} while initiating the operation is logged, not
        raised.
        """
        import sys
        # NOTE(review): nothing in this class ever resets or reads
        # `writing`; kept because code outside this class may rely on it.
        self.writing = True
        try:
            write_op = self.write_op_class(self)
            if not addr:
                addr = self._connectedAddr
            write_op.initiateOp(self.socket.fileno(), data, addr)
        except WindowsError:
            we = sys.exc_info()[1]
            log.msg("initiating write failed with args %s" % (we,))

    def handle_listening_writeDone(self, bytes):
        """
        Log the number of bytes reported for a finished write.
        """
        log.msg("write success with bytes %s" % (bytes,))
    handle_connecting_writeDone = handle_listening_writeDone
    handle_connected_writeDone = handle_listening_writeDone

    def handle_listening_writeErr(self, ret, bytes):
        """
        Log a failed write and report a refused datagram to the protocol.

        @param ret: error value of the failed write; compared against
            L{ERROR_PORT_UNREACHABLE}.
        @param bytes: unused.
        """
        log.msg("write failed with err %s" % (ret,))
        if ret == ERROR_PORT_UNREACHABLE:
            self.protocol.connectionRefused()
    handle_connecting_writeErr = handle_listening_writeErr
    handle_connected_writeErr = handle_listening_writeErr

    def handle_disconnected_writeErr(self, ret, bytes):
        pass # no kicking the dead horse

    def handle_disconnected_writeDone(self, bytes):
        pass # no kicking the dead horse

    def writeSequence(self, seq, addr):
        """
        Join the strings in C{seq} and send them as a single datagram to
        C{addr}.
        """
        self.write("".join(seq), addr)

    def handle_listening_stopListening(self):
        """
        Stop reading and shut the port down.
        """
        self.stopReading()
        self.connectionLost()
    handle_connecting_stopListening = handle_listening_stopListening
    handle_connected_stopListening = handle_listening_stopListening

    def connectionLost(self, reason=None):
        """
        Stop the protocol, close the socket and return to "disconnected".

        @param reason: accepted but not used.
        """
        log.msg('(Port %s Closed)' % self._realPortNumber)
        self._realPortNumber = None
        try:
            self.protocol.doStop()
        finally:
            # The socket is released and the state reset even if the
            # protocol's shutdown hook raises.
            self.socket.close()
            del self.socket
            self.state = "disconnected"

    def logPrefix(self):
        """
        Return the prefix computed in C{__init__}: the qualified protocol
        class name followed by " (UDP)".
        """
        return self.logstr

    def getHost(self):
        """
        Return the local address of the bound socket as an
        L{address.IPv4Address}.
        """
        return address.IPv4Address('UDP', *(self.socket.getsockname() + ('INET_UDP',)))
 
200
 
 
201
 
 
202
class MulticastPort(MulticastMixin, Port):
    """UDP Port that supports multicasting."""

    implements(interfaces.IMulticastTransport)

    def __init__(self, bindAddress, proto, maxPacketSize=8192, listenMultiple=False):
        """
        @param bindAddress: passed through to L{Port.__init__}.
        @param proto: passed through to L{Port.__init__}.
        @param maxPacketSize: passed through to L{Port.__init__}.
        @param listenMultiple: when true, L{createInternetSocket} enables
            address (and, where the platform offers it, port) reuse on the
            socket it creates.
        """
        Port.__init__(self, bindAddress, proto, maxPacketSize)
        self.listenMultiple = listenMultiple

    def createInternetSocket(self):
        """
        Create the socket for this port, applying the reuse options
        requested through C{listenMultiple}.

        @return: a new, unbound socket.
        """
        # Built straight from `sockinfo` so this method does not depend on
        # the base class providing a socket factory of its own.
        skt = socket.socket(*self.sockinfo)
        if self.listenMultiple:
            skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # SO_REUSEPORT does not exist on every platform.
            if hasattr(socket, "SO_REUSEPORT"):
                skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        # NOTE(review): these options only matter when set before bind();
        # confirm that the bind path obtains its socket from this method.
        return skt