1
# Copyright 2005 Divmod, Inc. See LICENSE file for details
3
from cStringIO import StringIO
5
from twisted.internet.protocol import FileWrapper
6
from twisted.internet import defer
7
from twisted.python.failure import Failure
8
from twisted.python.filepath import FilePath
10
from twisted.trial import unittest
12
from twisted.test.iosim import connectedServerAndClient, FakeTransport
14
from vertex.q2q import Q2QAddress
15
from vertex import sigma
17
from vertex.test.mock_data import data as TEST_DATA
19
class FakeQ2QTransport(FakeTransport):
    """
    An in-memory FakeTransport that also remembers a pair of Q2Q addresses.

    The two constructor arguments are stored unchanged on the instance as
    ``q2qhost`` and ``q2qpeer``.
    """

    def __init__(self, q2qhost, q2qpeer):
        """
        Initialise the underlying FakeTransport and record both addresses.

        q2qhost -- value stored as ``self.q2qhost`` (presumably the address
                   of this end of the connection; confirm against callers).
        q2qpeer -- value stored as ``self.q2qpeer`` (presumably the address
                   of the other end; confirm against callers).
        """
        FakeTransport.__init__(self)
        self.q2qhost, self.q2qpeer = q2qhost, q2qpeer
32
class FakeDelayedCall:
33
def __init__(self, fqs, tup):
38
self.fqs.calls.remove(self.tup)
41
# XXX TODO: move this into test_q2q and make sure that all the q2q tests
42
# run with it in order to verify that the test harness is not broken.
45
self.listeners = {} # map listening {(q2qid, protocol name):(protocol
46
# factory, protocol description)}
47
self.pumps = [] # a list of IOPumps that we have to flush
51
def callLater(self, s, f, *a, **k):
52
# XXX TODO: return canceller
54
tup = (self.time + s, f, a, k)
55
self.calls.append(tup)
57
return FakeDelayedCall(self, tup)
59
def flush(self, debug=False):
65
# run twice so that timed functions can interact with I/O
66
for pump in self.pumps:
70
print 'iteration finished. continuing?', result
75
print 'timed event', s, f, a, k
79
def listenQ2Q(self, fromAddress, protocolsToFactories, serverDescription):
    """
    Record a listener for every protocol offered at ``fromAddress``.

    Each entry of ``protocolsToFactories`` (protocol name -> factory) is
    stored in ``self.listeners`` under the key ``(fromAddress, name)`` with
    the value ``(factory, serverDescription)``.  An existing entry with the
    same key is overwritten.

    Returns the result of ``defer.succeed(None)``.
    """
    for protocolName, factory in protocolsToFactories.items():
        listenerKey = (fromAddress, protocolName)
        self.listeners[listenerKey] = (factory, serverDescription)
    return defer.succeed(None)
84
def connectQ2Q(self, fromAddress, toAddress,
85
protocolName, protocolFactory,
86
chooser=lambda x: x and [x[0]]):
87
# XXX update this when q2q is updated to return a connector rather than
90
# XXX this isn't really dealing with the multiple-connectors use case
91
# now. sigma doesn't need this functionality, but we will need to
92
# update this class to do it properly before using it to test other Q2Q
95
listener, description = self.listeners.get((toAddress, protocolName))
97
print 'void listener', fromAddress, toAddress, self.listeners, self.listener
98
reason = Failure(KeyError())
99
protocolFactory.clientConnectionFailed(None, reason)
100
return defer.fail(reason)
102
def makeFakeClient(c):
103
ft = FakeQ2QTransport(fromAddress, toAddress)
108
def makeFakeServer(s):
109
ft = FakeQ2QTransport(toAddress, fromAddress)
114
client, server, pump = connectedServerAndClient(
115
lambda: listener.buildProtocol(fromAddress),
116
lambda: protocolFactory.buildProtocol(toAddress),
119
self.pumps.append(pump)
121
return defer.succeed(client)
124
# Module-level Q2Q addresses shared by the test cases in this file: one for
# the nexus that pushes a file and one for the nexus that receives it.
sender = Q2QAddress("sending-data.net", "sender")
receiver = Q2QAddress("receiving-data.org", "receiver")
127
class TestBase(unittest.TestCase):
129
self.realChunkSize = sigma.CHUNK_SIZE
130
sigma.CHUNK_SIZE = 100
131
svc = self.service = FakeQ2QService()
132
fname = self.mktemp()
134
sf = self.sfile = FilePath(fname)
135
if not sf.parent().isdir():
136
sf.parent().makedirs()
137
sf.open('w').write(TEST_DATA)
138
self.senderNexus = sigma.Nexus(svc, sender,
139
sigma.BaseNexusUI(self.mktemp()),
143
self.senderNexus.stopService()
144
sigma.CHUNK_SIZE = self.realChunkSize
147
class BasicTransferTest(TestBase):
151
self.receiverNexus = sigma.Nexus(self.service, receiver,
152
sigma.BaseNexusUI(self.mktemp()),
153
self.service.callLater)
154
self.stoppers.append(self.receiverNexus)
158
TestBase.tearDown(self)
159
for stopper in self.stoppers:
160
stopper.stopService()
163
def testOneSenderOneRecipient(self):
164
self.senderNexus.push(self.sfile, 'TESTtoTEST', [receiver])
166
peerThingyoes = childrenOf(self.receiverNexus.ui.basepath)
167
self.assertEquals(len(peerThingyoes), 1)
168
rfiles = childrenOf(peerThingyoes[0])
169
self.assertEquals(len(rfiles), 1)
171
rfdata = rfile.open().read()
172
self.assertEquals(len(rfdata),
174
self.assertEquals(rfdata, TEST_DATA,
175
"file values unequal")
177
def testOneSenderManyRecipients(self):
178
raddresses = [Q2QAddress("receiving-data.org", "receiver%d" % (x,))
181
nexi = [sigma.Nexus(self.service,
183
sigma.BaseNexusUI(self.mktemp()),
184
self.service.callLater) for radr in raddresses]
186
self.stoppers.extend(nexi)
188
self.senderNexus.push(self.sfile, 'TESTtoTEST', raddresses)
191
receivedIntroductions = 0
194
receivedIntroductions += nexium.ui.receivedIntroductions
195
self.failUnless(receivedIntroductions > 1)
198
peerFiles = childrenOf(nexium.ui.basepath)
199
self.assertEquals(len(peerFiles), 1)
200
rfiles = childrenOf(peerFiles[0])
201
self.assertEquals(len(rfiles), 1, rfiles)
203
self.assertEquals(rfile.open().read(),
205
"file value mismatch")
209
# this should be a part of FilePath, but hey
210
return map(x.child, x.listdir())