~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/words/test/test_msn.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2005 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Test cases for twisted.words.protocols.msn
 
6
"""
 
7
 
 
8
# System imports
 
9
import StringIO, sys
 
10
 
 
11
# Twisted imports
 
12
 
 
13
# t.w.p.msn requires an HTTP client
 
14
try:
 
15
    # So try to get one - do it directly instead of catching an ImportError
 
16
    # from t.w.p.msn so that other problems which cause that module to fail
 
17
    # to import don't cause the tests to be skipped.
 
18
    from twisted.web import client
 
19
except ImportError:
 
20
    # If there isn't one, we're going to skip all the tests.
 
21
    msn = None
 
22
else:
 
23
    # Otherwise importing it should work, so do it.
 
24
    from twisted.words.protocols import msn
 
25
 
 
26
 
 
27
from twisted.protocols import loopback
 
28
from twisted.internet.defer import Deferred
 
29
from twisted.trial import unittest
 
30
 
 
31
def printError(f):
 
32
    print f
 
33
 
 
34
class StringIOWithoutClosing(StringIO.StringIO):
 
35
    disconnecting = 0
 
36
    def close(self): pass
 
37
    def loseConnection(self): pass
 
38
 
 
39
class PassportTests(unittest.TestCase):
 
40
 
 
41
    def setUp(self):
 
42
        self.result = []
 
43
        self.deferred = Deferred()
 
44
        self.deferred.addCallback(lambda r: self.result.append(r))
 
45
        self.deferred.addErrback(printError)
 
46
 
 
47
    def testNexus(self):
 
48
        protocol = msn.PassportNexus(self.deferred, 'https://foobar.com/somepage.quux')
 
49
        headers = {
 
50
            'Content-Length' : '0',
 
51
            'Content-Type'   : 'text/html',
 
52
            'PassportURLs'   : 'DARealm=Passport.Net,DALogin=login.myserver.com/,DAReg=reg.myserver.com'
 
53
        }
 
54
        transport = StringIOWithoutClosing()
 
55
        protocol.makeConnection(transport)
 
56
        protocol.dataReceived('HTTP/1.0 200 OK\r\n')
 
57
        for (h,v) in headers.items(): protocol.dataReceived('%s: %s\r\n' % (h,v))
 
58
        protocol.dataReceived('\r\n')
 
59
        self.failUnless(self.result[0] == "https://login.myserver.com/")
 
60
 
 
61
    def _doLoginTest(self, response, headers):
 
62
        protocol = msn.PassportLogin(self.deferred,'foo@foo.com','testpass','https://foo.com/', 'a')
 
63
        protocol.makeConnection(StringIOWithoutClosing())
 
64
        protocol.dataReceived(response)
 
65
        for (h,v) in headers.items(): protocol.dataReceived('%s: %s\r\n' % (h,v))
 
66
        protocol.dataReceived('\r\n')
 
67
 
 
68
    def testPassportLoginSuccess(self):
 
69
        headers = {
 
70
            'Content-Length'      : '0',
 
71
            'Content-Type'        : 'text/html',
 
72
            'Authentication-Info' : "Passport1.4 da-status=success,tname=MSPAuth," +
 
73
                                    "tname=MSPProf,tname=MSPSec,from-PP='somekey'," +
 
74
                                    "ru=http://messenger.msn.com"
 
75
        }
 
76
        self._doLoginTest('HTTP/1.1 200 OK\r\n', headers)
 
77
        self.failUnless(self.result[0] == (msn.LOGIN_SUCCESS, 'somekey'))
 
78
 
 
79
    def testPassportLoginFailure(self):
 
80
        headers = {
 
81
            'Content-Type'     : 'text/html',
 
82
            'WWW-Authenticate' : 'Passport1.4 da-status=failed,' +
 
83
                                 'srealm=Passport.NET,ts=-3,prompt,cburl=http://host.com,' +
 
84
                                 'cbtxt=the%20error%20message'
 
85
        }
 
86
        self._doLoginTest('HTTP/1.1 401 Unauthorized\r\n', headers)
 
87
        self.failUnless(self.result[0] == (msn.LOGIN_FAILURE, 'the error message'))
 
88
 
 
89
    def testPassportLoginRedirect(self):
 
90
        headers = {
 
91
            'Content-Type'        : 'text/html',
 
92
            'Authentication-Info' : 'Passport1.4 da-status=redir',
 
93
            'Location'            : 'https://newlogin.host.com/'
 
94
        }
 
95
        self._doLoginTest('HTTP/1.1 302 Found\r\n', headers)
 
96
        self.failUnless(self.result[0] == (msn.LOGIN_REDIRECT, 'https://newlogin.host.com/', 'a'))
 
97
 
 
98
 
 
99
if msn is not None:
 
100
    class DummySwitchboardClient(msn.SwitchboardClient):
 
101
        def userTyping(self, message):
 
102
            self.state = 'TYPING'
 
103
 
 
104
        def gotSendRequest(self, fileName, fileSize, cookie, message):
 
105
            if fileName == 'foobar.ext' and fileSize == 31337 and cookie == 1234: self.state = 'INVITATION'
 
106
 
 
107
 
 
108
    class DummyNotificationClient(msn.NotificationClient):
 
109
        def loggedIn(self, userHandle, screenName, verified):
 
110
            if userHandle == 'foo@bar.com' and screenName == 'Test Screen Name' and verified:
 
111
                self.state = 'LOGIN'
 
112
 
 
113
        def gotProfile(self, message):
 
114
            self.state = 'PROFILE'
 
115
 
 
116
        def gotContactStatus(self, code, userHandle, screenName):
 
117
            if code == msn.STATUS_AWAY and userHandle == "foo@bar.com" and screenName == "Test Screen Name":
 
118
                self.state = 'INITSTATUS'
 
119
 
 
120
        def contactStatusChanged(self, code, userHandle, screenName):
 
121
            if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name":
 
122
                self.state = 'NEWSTATUS'
 
123
 
 
124
        def contactOffline(self, userHandle):
 
125
            if userHandle == "foo@bar.com": self.state = 'OFFLINE'
 
126
 
 
127
        def statusChanged(self, code):
 
128
            if code == msn.STATUS_HIDDEN: self.state = 'MYSTATUS'
 
129
 
 
130
        def listSynchronized(self, *args):
 
131
            self.state = 'GOTLIST'
 
132
 
 
133
        def gotPhoneNumber(self, listVersion, userHandle, phoneType, number):
 
134
            msn.NotificationClient.gotPhoneNumber(self, listVersion, userHandle, phoneType, number)
 
135
            self.state = 'GOTPHONE'
 
136
 
 
137
        def userRemovedMe(self, userHandle, listVersion):
 
138
            msn.NotificationClient.userRemovedMe(self, userHandle, listVersion)
 
139
            c = self.factory.contacts.getContact(userHandle)
 
140
            if not c and self.factory.contacts.version == listVersion: self.state = 'USERREMOVEDME'
 
141
 
 
142
        def userAddedMe(self, userHandle, screenName, listVersion):
 
143
            msn.NotificationClient.userAddedMe(self, userHandle, screenName, listVersion)
 
144
            c = self.factory.contacts.getContact(userHandle)
 
145
            if c and (c.lists | msn.REVERSE_LIST) and (self.factory.contacts.version == listVersion) and \
 
146
               (screenName == 'Screen Name'):
 
147
                self.state = 'USERADDEDME'
 
148
 
 
149
        def gotSwitchboardInvitation(self, sessionID, host, port, key, userHandle, screenName):
 
150
            if sessionID == 1234 and \
 
151
               host == '192.168.1.1' and \
 
152
               port == 1863 and \
 
153
               key == '123.456' and \
 
154
               userHandle == 'foo@foo.com' and \
 
155
               screenName == 'Screen Name':
 
156
                self.state = 'SBINVITED'
 
157
 
 
158
class NotificationTests(unittest.TestCase):
 
159
    """ testing the various events in NotificationClient """
 
160
 
 
161
    def setUp(self):
 
162
        self.client = DummyNotificationClient()
 
163
        self.client.factory = msn.NotificationFactory()
 
164
        self.client.state = 'START'
 
165
 
 
166
    def tearDown(self):
 
167
        self.client = None
 
168
 
 
169
    def testLogin(self):
 
170
        self.client.lineReceived('USR 1 OK foo@bar.com Test%20Screen%20Name 1 0')
 
171
        self.failUnless((self.client.state == 'LOGIN'), msg='Failed to detect successful login')
 
172
 
 
173
    def testProfile(self):
 
174
        m = 'MSG Hotmail Hotmail 353\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsprofile; charset=UTF-8\r\n'
 
175
        m += 'LoginTime: 1016941010\r\nEmailEnabled: 1\r\nMemberIdHigh: 40000\r\nMemberIdLow: -600000000\r\nlang_preference: 1033\r\n'
 
176
        m += 'preferredEmail: foo@bar.com\r\ncountry: AU\r\nPostalCode: 90210\r\nGender: M\r\nKid: 0\r\nAge:\r\nsid: 400\r\n'
 
177
        m += 'kv: 2\r\nMSPAuth: 2CACCBCCADMoV8ORoz64BVwmjtksIg!kmR!Rj5tBBqEaW9hc4YnPHSOQ$$\r\n\r\n'
 
178
        map(self.client.lineReceived, m.split('\r\n')[:-1])
 
179
        self.failUnless((self.client.state == 'PROFILE'), msg='Failed to detect initial profile')
 
180
 
 
181
    def testStatus(self):
 
182
        t = [('ILN 1 AWY foo@bar.com Test%20Screen%20Name 0', 'INITSTATUS', 'Failed to detect initial status report'),
 
183
             ('NLN LUN foo@bar.com Test%20Name 0', 'NEWSTATUS', 'Failed to detect contact status change'),
 
184
             ('FLN foo@bar.com', 'OFFLINE', 'Failed to detect contact signing off'),
 
185
             ('CHG 1 HDN 0', 'MYSTATUS', 'Failed to detect my status changing')]
 
186
        for i in t:
 
187
            self.client.lineReceived(i[0])
 
188
            self.failUnless((self.client.state == i[1]), msg=i[2])
 
189
 
 
190
    def testListSync(self):
 
191
        # currently this test does not take into account the fact
 
192
        # that BPRs sent as part of the SYN reply may not be interpreted
 
193
        # as such if they are for the last LST -- maybe I should
 
194
        # factor this in later.
 
195
        self.client.makeConnection(StringIOWithoutClosing())
 
196
        msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 'foobar', 1)
 
197
        lines = [
 
198
            "SYN %s 100 1 1" % self.client.currentID,
 
199
            "GTC A",
 
200
            "BLP AL",
 
201
            "LSG 0 Other%20Contacts 0",
 
202
            "LST userHandle@email.com Some%20Name 11 0"
 
203
        ]
 
204
        map(self.client.lineReceived, lines)
 
205
        contacts = self.client.factory.contacts
 
206
        contact = contacts.getContact('userHandle@email.com')
 
207
        self.failUnless(contacts.version == 100, "Invalid contact list version")
 
208
        self.failUnless(contact.screenName == 'Some Name', "Invalid screen-name for user")
 
209
        self.failUnless(contacts.groups == {0 : 'Other Contacts'}, "Did not get proper group list")
 
210
        self.failUnless(contact.groups == [0] and contact.lists == 11, "Invalid contact list/group info")
 
211
        self.failUnless(self.client.state == 'GOTLIST', "Failed to call list sync handler")
 
212
 
 
213
    def testAsyncPhoneChange(self):
 
214
        c = msn.MSNContact(userHandle='userHandle@email.com')
 
215
        self.client.factory.contacts = msn.MSNContactList()
 
216
        self.client.factory.contacts.addContact(c)
 
217
        self.client.makeConnection(StringIOWithoutClosing())
 
218
        self.client.lineReceived("BPR 101 userHandle@email.com PHH 123%20456")
 
219
        c = self.client.factory.contacts.getContact('userHandle@email.com')
 
220
        self.failUnless(self.client.state == 'GOTPHONE', "Did not fire phone change callback")
 
221
        self.failUnless(c.homePhone == '123 456', "Did not update the contact's phone number")
 
222
        self.failUnless(self.client.factory.contacts.version == 101, "Did not update list version")
 
223
 
 
224
    def testLateBPR(self):
 
225
        """
 
226
        This test makes sure that if a BPR response that was meant
 
227
        to be part of a SYN response (but came after the last LST)
 
228
        is received, the correct contact is updated and all is well
 
229
        """
 
230
        self.client.makeConnection(StringIOWithoutClosing())
 
231
        msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 'foo', 1)
 
232
        lines = [
 
233
            "SYN %s 100 1 1" % self.client.currentID,
 
234
            "GTC A",
 
235
            "BLP AL",
 
236
            "LSG 0 Other%20Contacts 0",
 
237
            "LST userHandle@email.com Some%20Name 11 0",
 
238
            "BPR PHH 123%20456"
 
239
        ]
 
240
        map(self.client.lineReceived, lines)
 
241
        contact = self.client.factory.contacts.getContact('userHandle@email.com')
 
242
        self.failUnless(contact.homePhone == '123 456', "Did not update contact's phone number")
 
243
 
 
244
    def testUserRemovedMe(self):
 
245
        self.client.factory.contacts = msn.MSNContactList()
 
246
        contact = msn.MSNContact(userHandle='foo@foo.com')
 
247
        contact.addToList(msn.REVERSE_LIST)
 
248
        self.client.factory.contacts.addContact(contact)
 
249
        self.client.lineReceived("REM 0 RL 100 foo@foo.com")
 
250
        self.failUnless(self.client.state == 'USERREMOVEDME', "Failed to remove user from reverse list")
 
251
 
 
252
    def testUserAddedMe(self):
 
253
        self.client.factory.contacts = msn.MSNContactList()
 
254
        self.client.lineReceived("ADD 0 RL 100 foo@foo.com Screen%20Name")
 
255
        self.failUnless(self.client.state == 'USERADDEDME', "Failed to add user to reverse lise")
 
256
 
 
257
    def testAsyncSwitchboardInvitation(self):
 
258
        self.client.lineReceived("RNG 1234 192.168.1.1:1863 CKI 123.456 foo@foo.com Screen%20Name")
 
259
        self.failUnless(self.client.state == "SBINVITED")
 
260
 
 
261
    def testCommandFailed(self):
 
262
        """
 
263
        Ensures that error responses from the server fires an errback with
 
264
        MSNCommandFailed.
 
265
        """
 
266
        id, d = self.client._createIDMapping()
 
267
        self.client.lineReceived("201 %s" % id)
 
268
        d = self.assertFailure(d, msn.MSNCommandFailed)
 
269
        def assertErrorCode(exception):
 
270
            self.assertEqual(201, exception.errorCode)
 
271
        return d.addCallback(assertErrorCode)
 
272
 
 
273
 
 
274
class MessageHandlingTests(unittest.TestCase):
 
275
    """ testing various message handling methods from SwichboardClient """
 
276
 
 
277
    def setUp(self):
 
278
        self.client = DummySwitchboardClient()
 
279
        self.client.state = 'START'
 
280
 
 
281
    def tearDown(self):
 
282
        self.client = None
 
283
 
 
284
    def testClientCapabilitiesCheck(self):
 
285
        m = msn.MSNMessage()
 
286
        m.setHeader('Content-Type', 'text/x-clientcaps')
 
287
        self.assertEquals(self.client.checkMessage(m), 0, 'Failed to detect client capability message')
 
288
        
 
289
    def testTypingCheck(self):
 
290
        m = msn.MSNMessage()
 
291
        m.setHeader('Content-Type', 'text/x-msmsgscontrol')
 
292
        m.setHeader('TypingUser', 'foo@bar')
 
293
        self.client.checkMessage(m)
 
294
        self.failUnless((self.client.state == 'TYPING'), msg='Failed to detect typing notification')
 
295
 
 
296
    def testFileInvitation(self, lazyClient=False):
 
297
        m = msn.MSNMessage()
 
298
        m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
 
299
        m.message += 'Application-Name: File Transfer\r\n'
 
300
        if not lazyClient:
 
301
            m.message += 'Application-GUID: {5D3E02AB-6190-11d3-BBBB-00C04F795683}\r\n'
 
302
        m.message += 'Invitation-Command: Invite\r\n'
 
303
        m.message += 'Invitation-Cookie: 1234\r\n'
 
304
        m.message += 'Application-File: foobar.ext\r\n'
 
305
        m.message += 'Application-FileSize: 31337\r\n\r\n'
 
306
        self.client.checkMessage(m)
 
307
        self.failUnless((self.client.state == 'INVITATION'), msg='Failed to detect file transfer invitation')
 
308
 
 
309
    def testFileInvitationMissingGUID(self):
 
310
        return self.testFileInvitation(True)
 
311
 
 
312
    def testFileResponse(self):
 
313
        d = Deferred()
 
314
        d.addCallback(self.fileResponse)
 
315
        self.client.cookies['iCookies'][1234] = (d, None)
 
316
        m = msn.MSNMessage()
 
317
        m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
 
318
        m.message += 'Invitation-Command: ACCEPT\r\n'
 
319
        m.message += 'Invitation-Cookie: 1234\r\n\r\n'
 
320
        self.client.checkMessage(m)
 
321
        self.failUnless((self.client.state == 'RESPONSE'), msg='Failed to detect file transfer response')
 
322
 
 
323
    def testFileInfo(self):
 
324
        d = Deferred()
 
325
        d.addCallback(self.fileInfo)
 
326
        self.client.cookies['external'][1234] = (d, None)
 
327
        m = msn.MSNMessage()
 
328
        m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
 
329
        m.message += 'Invitation-Command: ACCEPT\r\n'
 
330
        m.message += 'Invitation-Cookie: 1234\r\n'
 
331
        m.message += 'IP-Address: 192.168.0.1\r\n'
 
332
        m.message += 'Port: 6891\r\n'
 
333
        m.message += 'AuthCookie: 4321\r\n\r\n'
 
334
        self.client.checkMessage(m)
 
335
        self.failUnless((self.client.state == 'INFO'), msg='Failed to detect file transfer info')
 
336
 
 
337
    def fileResponse(self, (accept, cookie, info)):
 
338
        if accept and cookie == 1234: self.client.state = 'RESPONSE'
 
339
 
 
340
    def fileInfo(self, (accept, ip, port, aCookie, info)):
 
341
        if accept and ip == '192.168.0.1' and port == 6891 and aCookie == 4321: self.client.state = 'INFO'
 
342
 
 
343
 
 
344
class FileTransferTestCase(unittest.TestCase):
 
345
    """ test FileSend against FileReceive """
 
346
 
 
347
    def setUp(self):
 
348
        self.input = StringIOWithoutClosing()
 
349
        self.input.writelines(['a'] * 7000)
 
350
        self.input.seek(0)
 
351
        self.output = StringIOWithoutClosing()
 
352
 
 
353
    def tearDown(self):
 
354
        self.input = None
 
355
        self.output = None
 
356
 
 
357
    def testFileTransfer(self):
 
358
        auth = 1234
 
359
        sender = msn.FileSend(self.input)
 
360
        sender.auth = auth
 
361
        sender.fileSize = 7000
 
362
        client = msn.FileReceive(auth, "foo@bar.com", self.output)
 
363
        client.fileSize = 7000
 
364
        def check(ignored):
 
365
            self.failUnless((client.completed and sender.completed),
 
366
                            msg="send failed to complete")
 
367
            self.failUnless((self.input.getvalue() == self.output.getvalue()),
 
368
                            msg="saved file does not match original")
 
369
        d = loopback.loopbackAsync(sender, client)
 
370
        d.addCallback(check)
 
371
        return d
 
372
 
 
373
 
 
374
if msn is None:
 
375
    for testClass in [PassportTests, NotificationTests,
 
376
                      MessageHandlingTests, FileTransferTestCase]:
 
377
        testClass.skip = (
 
378
            "MSN requires an HTTP client but none is available, "
 
379
            "skipping tests.")