1
# -*- test-case-name: twisted.test.test_amp.TLSTest -*-
2
"""Utilities and helpers for simulating a network
8
from zope.interface import implements, directlyProvides
10
from twisted.internet import error
11
from twisted.internet import interfaces
12
from OpenSSL.SSL import Error as NativeOpenSSLError
14
from twisted.internet._sslverify import OpenSSLVerifyError
17
def __init__(self, obj, connectState):
19
self.connectState = connectState
21
self.readyToSend = connectState
24
return 'TLSNegotiation(%r)' % (self.obj,)
26
def pretendToVerify(self, other, tpt):
    """Simulate certificate verification against the peer's negotiation.

    C{self.obj.iosimVerify} is asked to verify C{other.obj}.  When it
    reports success nothing is touched.  When it reports failure, a fake
    L{OpenSSLVerifyError} is appended to C{tpt.problems} and
    C{tpt.disconnectReason} is replaced with a L{NativeOpenSSLError}.

    @param other: the negotiation object received from the peer.
    @param tpt: the transport on which a failure is recorded.
    """
    # Open questions carried over from the original author: should the
    # transport's problems list be set here?  What about disconnections?
    # Negative-path tests are still needed.
    if self.obj.iosimVerify(other.obj):
        return
    fakeProblem = OpenSSLVerifyError("fake cert", "fake errno", "fake depth")
    tpt.problems.append(fakeProblem)
    tpt.disconnectReason = NativeOpenSSLError()
37
"""A wrapper around a file-like object to make it behave as a Transport.
39
This doesn't actually stream the file to the attached protocol,
40
and is thus useful mainly as a utility for debugging protocols.
43
implements(interfaces.ITransport,
44
interfaces.ITLSTransport) # ha ha not really
46
_nextserial = itertools.count().next
50
disconnectReason = error.ConnectionDone("Connection done")
58
self.serial = self._nextserial()
61
return 'FakeTransport<%s,%s,%s>' % (
62
self.isServer and 'S' or 'C', self.serial,
63
self.protocol.__class__.__name__)
65
def write(self, data):
66
if self.tls is not None:
67
self.tlsbuf.append(data)
69
self.stream.append(data)
71
def _checkProducer(self):
72
# Cheating; this is called at "idle" times to allow producers to be
73
# found and dealt with
75
self.producer.resumeProducing()
77
def registerProducer(self, producer, streaming):
78
"""From abstract.FileDescriptor
80
self.producer = producer
81
self.streamingProducer = streaming
83
producer.resumeProducing()
85
def unregisterProducer(self):
88
def stopConsuming(self):
89
self.unregisterProducer()
92
def writeSequence(self, iovec):
    """Write a sequence of string chunks as one contiguous write.

    The chunks in C{iovec} are concatenated and the result is handed to
    C{self.write} in a single call.

    @param iovec: an iterable of strings to join and write.
    """
    joined = "".join(iovec)
    self.write(joined)
95
def loseConnection(self):
    """Flag this transport as disconnecting.

    Only the C{disconnecting} attribute is set here; this method does not
    itself notify the protocol or tear anything down.
    """
    self.disconnecting = True
98
def reportDisconnect(self):
99
if self.tls is not None:
100
# We were in the middle of negotiating! Must have been a TLS problem.
101
err = NativeOpenSSLError()
103
err = self.disconnectReason
104
self.protocol.connectionLost(err)
107
# XXX: According to ITransport, this should return an IAddress!
108
return 'file', 'file'
111
# XXX: According to ITransport, this should return an IAddress!
114
def resumeProducing(self):
115
# Never sends data anyways
118
def pauseProducing(self):
119
# Never sends data anyways
122
def stopProducing(self):
    """Stop producing by asking the transport to drop its connection.

    This simply delegates to C{self.loseConnection}.
    """
    self.loseConnection()
125
def startTLS(self, contextFactory, beNormal=True):
    """Begin a simulated TLS negotiation on this transport.

    A L{TLSNegotiation} wrapping C{contextFactory} is stored on
    C{self.tls}.  Its connect state is C{self.isServer ^ beNormal}, so
    with the default C{beNormal=True} it is true for a client and false
    for a server.

    @param contextFactory: the object handed to L{TLSNegotiation}.
    @param beNormal: undocumented second argument, defaulting to true.
        Nothing uses it yet; when false, servers behave like clients and
        clients behave like servers.
    """
    self.tls = TLSNegotiation(contextFactory, self.isServer ^ beNormal)
133
def getOutBuffer(self):
138
elif self.tls is not None:
139
if self.tls.readyToSend:
140
# Only _send_ the TLS negotiation "packet" if I'm ready to.
148
def bufferReceived(self, buf):
149
if isinstance(buf, TLSNegotiation):
150
assert self.tls is not None # By the time you're receiving a
151
# negotiation, you have to have called
154
self.tls.pretendToVerify(buf, self)
155
self.tls = None # we're done with the handshake if we've gotten
156
# this far... although maybe it failed...?
157
# TLS started! Unbuffer...
158
b, self.tlsbuf = self.tlsbuf, None
159
self.writeSequence(b)
160
directlyProvides(self, interfaces.ISSLTransport)
162
# We haven't sent our own TLS negotiation: time to do that!
163
self.tls.readyToSend = True
165
self.protocol.dataReceived(buf)
168
# This next bit exists only to fake out problemsFromTransport, which is a
169
# poorly designed API anyway. Remove it once problemsFromTransport itself is removed. -glyph
173
get_context = getHandle
174
get_app_data = getHandle
176
# end of gross problemsFromTransport stuff
178
def makeFakeClient(c):
184
def makeFakeServer(s):
191
"""Utility to pump data between clients and servers for protocol testing.
193
Perhaps this is a utility worthy of being in protocol.py?
195
def __init__(self, client, server, clientIO, serverIO, debug):
198
self.clientIO = clientIO
199
self.serverIO = serverIO
202
def flush(self, debug=False):
203
"""Pump until there is no more input or output.
205
Returns whether any data was moved.
208
for x in range(1000):
218
def pump(self, debug=False):
219
"""Move data back and forth.
221
Returns whether any data was moved.
223
if self.debug or debug:
225
sData = self.serverIO.getOutBuffer()
226
cData = self.clientIO.getOutBuffer()
227
self.clientIO._checkProducer()
228
self.serverIO._checkProducer()
229
if self.debug or debug:
231
# XXX slightly buggy in the face of incremental output
233
print 'C: '+repr(cData)
235
print 'S: '+repr(sData)
237
self.serverIO.bufferReceived(cData)
239
self.clientIO.bufferReceived(sData)
242
if (self.serverIO.disconnecting and
243
not self.serverIO.disconnected):
244
if self.debug or debug:
246
self.serverIO.disconnected = True
247
self.clientIO.disconnecting = True
248
self.clientIO.reportDisconnect()
250
if self.clientIO.disconnecting and not self.clientIO.disconnected:
251
if self.debug or debug:
253
self.clientIO.disconnected = True
254
self.serverIO.disconnecting = True
255
self.serverIO.reportDisconnect()
260
def connectedServerAndClient(ServerClass, ClientClass,
261
clientTransportFactory=makeFakeClient,
262
serverTransportFactory=makeFakeServer,
264
"""Returns a 3-tuple: (client, server, pump)
268
cio = clientTransportFactory(c)
269
sio = serverTransportFactory(s)
270
c.makeConnection(cio)
271
s.makeConnection(sio)
272
pump = IOPump(c, s, cio, sio, debug)
273
# kick off server greeting, etc