~divmod-dev/divmod.org/1304710-storeless-adapter

« back to all changes in this revision

Viewing changes to Vertex/vertex/test/test_sigma.py

  • Committer: cyli
  • Date: 2013-06-27 06:02:46 UTC
  • mto: This revision was merged to the branch mainline in revision 2702.
  • Revision ID: cyli-20130627060246-ciict8hwvjuy9d81
Move Vertex out of the Divmod.org repository

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2005 Divmod, Inc.  See LICENSE file for details
2
 
 
3
 
from cStringIO import StringIO
4
 
 
5
 
from twisted.internet.protocol import FileWrapper
6
 
from twisted.internet import defer
7
 
from twisted.python.failure import Failure
8
 
from twisted.python.filepath import FilePath
9
 
 
10
 
from twisted.trial import unittest
11
 
 
12
 
from twisted.test.iosim import connectedServerAndClient, FakeTransport
13
 
 
14
 
from vertex.q2q import Q2QAddress
15
 
from vertex import sigma
16
 
 
17
 
from vertex.test.mock_data import data as TEST_DATA
18
 
 
19
 
class FakeQ2QTransport(FakeTransport):
20
 
 
21
 
    def __init__(self, q2qhost, q2qpeer):
22
 
        FakeTransport.__init__(self)
23
 
        self.q2qhost = q2qhost
24
 
        self.q2qpeer = q2qpeer
25
 
 
26
 
    def getQ2QPeer(self):
27
 
        return self.q2qpeer
28
 
 
29
 
    def getQ2QHost(self):
30
 
        return self.q2qhost
31
 
 
32
 
class FakeDelayedCall:
33
 
    def __init__(self, fqs, tup):
34
 
        self.fqs = fqs
35
 
        self.tup = tup
36
 
 
37
 
    def cancel(self):
38
 
        self.fqs.calls.remove(self.tup)
39
 
 
40
 
class FakeQ2QService:
41
 
    # XXX TODO: move this into test_q2q and make sure that all the q2q tests
42
 
    # run with it in order to verify that the test harness is not broken.
43
 
 
44
 
    def __init__(self):
45
 
        self.listeners = {}     # map listening {(q2qid, protocol name):(protocol
46
 
                                # factory, protocol description)}
47
 
        self.pumps = []         # a list of IOPumps that we have to flush
48
 
        self.calls = []
49
 
        self.time = 0
50
 
 
51
 
    def callLater(self, s, f, *a, **k):
52
 
        # XXX TODO: return canceller
53
 
        assert f is not None
54
 
        tup = (self.time + s, f, a, k)
55
 
        self.calls.append(tup)
56
 
        self.calls.sort()
57
 
        return FakeDelayedCall(self, tup)
58
 
 
59
 
    def flush(self, debug=False):
60
 
        result = True
61
 
        while result:
62
 
            self.time += 1
63
 
            result = False
64
 
            for x in range(2):
65
 
                # run twice so that timed functions can interact with I/O
66
 
                for pump in self.pumps:
67
 
                    if pump.flush(debug):
68
 
                        result = True
69
 
                if debug:
70
 
                    print 'iteration finished.  continuing?', result
71
 
                c = self.calls
72
 
                self.calls = []
73
 
                for s, f, a, k in c:
74
 
                    if debug:
75
 
                        print 'timed event', s, f, a, k
76
 
                    f(*a,**k)
77
 
        return result
78
 
 
79
 
    def listenQ2Q(self, fromAddress, protocolsToFactories, serverDescription):
80
 
        for pname, pfact in protocolsToFactories.items():
81
 
            self.listeners[fromAddress, pname] = pfact, serverDescription
82
 
        return defer.succeed(None)
83
 
 
84
 
    def connectQ2Q(self, fromAddress, toAddress,
85
 
                   protocolName, protocolFactory,
86
 
                   chooser=lambda x: x and [x[0]]):
87
 
        # XXX update this when q2q is updated to return a connector rather than
88
 
        # a Deferred.
89
 
 
90
 
        # XXX this isn't really dealing with the multiple-connectors use case
91
 
        # now.  sigma doesn't need this functionality, but we will need to
92
 
        # update this class to do it properly before using it to test other Q2Q
93
 
        # code.
94
 
 
95
 
        listener, description = self.listeners.get((toAddress, protocolName))
96
 
        if listener is None:
97
 
            print 'void listener', fromAddress, toAddress, self.listeners, self.listener
98
 
            reason = Failure(KeyError())
99
 
            protocolFactory.clientConnectionFailed(None, reason)
100
 
            return defer.fail(reason)
101
 
        else:
102
 
            def makeFakeClient(c):
103
 
                ft = FakeQ2QTransport(fromAddress, toAddress)
104
 
                ft.isServer = False
105
 
                ft.protocol = c
106
 
                return ft
107
 
 
108
 
            def makeFakeServer(s):
109
 
                ft = FakeQ2QTransport(toAddress, fromAddress)
110
 
                ft.isServer = True
111
 
                ft.protocol = s
112
 
                return ft
113
 
 
114
 
            client, server, pump = connectedServerAndClient(
115
 
                lambda: listener.buildProtocol(fromAddress),
116
 
                lambda: protocolFactory.buildProtocol(toAddress),
117
 
                makeFakeClient,
118
 
                makeFakeServer)
119
 
            self.pumps.append(pump)
120
 
 
121
 
            return defer.succeed(client)
122
 
 
123
 
 
124
 
sender = Q2QAddress("sending-data.net", "sender")
125
 
receiver = Q2QAddress("receiving-data.org", "receiver")
126
 
 
127
 
class TestBase(unittest.TestCase):
128
 
    def setUp(self):
129
 
        self.realChunkSize = sigma.CHUNK_SIZE
130
 
        sigma.CHUNK_SIZE = 100
131
 
        svc = self.service = FakeQ2QService()
132
 
        fname = self.mktemp()
133
 
 
134
 
        sf = self.sfile = FilePath(fname)
135
 
        if not sf.parent().isdir():
136
 
            sf.parent().makedirs()
137
 
        sf.open('w').write(TEST_DATA)
138
 
        self.senderNexus = sigma.Nexus(svc, sender,
139
 
                                       sigma.BaseNexusUI(self.mktemp()),
140
 
                                       svc.callLater)
141
 
 
142
 
    def tearDown(self):
143
 
        self.senderNexus.stopService()
144
 
        sigma.CHUNK_SIZE = self.realChunkSize
145
 
 
146
 
 
147
 
class BasicTransferTest(TestBase):
148
 
    def setUp(self):
149
 
        TestBase.setUp(self)
150
 
        self.stoppers = []
151
 
        self.receiverNexus = sigma.Nexus(self.service, receiver,
152
 
                                         sigma.BaseNexusUI(self.mktemp()),
153
 
                                         self.service.callLater)
154
 
        self.stoppers.append(self.receiverNexus)
155
 
 
156
 
 
157
 
    def tearDown(self):
158
 
        TestBase.tearDown(self)
159
 
        for stopper in self.stoppers:
160
 
            stopper.stopService()
161
 
 
162
 
 
163
 
    def testOneSenderOneRecipient(self):
164
 
        self.senderNexus.push(self.sfile, 'TESTtoTEST', [receiver])
165
 
        self.service.flush()
166
 
        peerThingyoes = childrenOf(self.receiverNexus.ui.basepath)
167
 
        self.assertEquals(len(peerThingyoes), 1)
168
 
        rfiles = childrenOf(peerThingyoes[0])
169
 
        self.assertEquals(len(rfiles), 1)
170
 
        rfile = rfiles[0]
171
 
        rfdata = rfile.open().read()
172
 
        self.assertEquals(len(rfdata),
173
 
                          len(TEST_DATA))
174
 
        self.assertEquals(rfdata, TEST_DATA,
175
 
                          "file values unequal")
176
 
 
177
 
    def testOneSenderManyRecipients(self):
178
 
        raddresses = [Q2QAddress("receiving-data.org", "receiver%d" % (x,))
179
 
                      for x in range(10)]
180
 
 
181
 
        nexi = [sigma.Nexus(self.service,
182
 
                            radr,
183
 
                            sigma.BaseNexusUI(self.mktemp()),
184
 
                            self.service.callLater) for radr in raddresses]
185
 
 
186
 
        self.stoppers.extend(nexi)
187
 
 
188
 
        self.senderNexus.push(self.sfile, 'TESTtoTEST', raddresses)
189
 
        self.service.flush()
190
 
 
191
 
        receivedIntroductions = 0
192
 
 
193
 
        for nexium in nexi:
194
 
            receivedIntroductions += nexium.ui.receivedIntroductions
195
 
        self.failUnless(receivedIntroductions > 1)
196
 
 
197
 
        for nexium in nexi:
198
 
            peerFiles = childrenOf(nexium.ui.basepath)
199
 
            self.assertEquals(len(peerFiles), 1)
200
 
            rfiles = childrenOf(peerFiles[0])
201
 
            self.assertEquals(len(rfiles), 1, rfiles)
202
 
            rfile = rfiles[0]
203
 
            self.assertEquals(rfile.open().read(),
204
 
                              TEST_DATA,
205
 
                              "file value mismatch")
206
 
 
207
 
 
208
 
def childrenOf(x):
209
 
    # this should be a part of FilePath, but hey
210
 
    return map(x.child, x.listdir())