1
# Copyright (c) 2001-2005 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Test cases for twisted.words.protocols.msn
13
# t.w.p.msn requires an HTTP client
15
# So try to get one - do it directly instead of catching an ImportError
16
# from t.w.p.msn so that other problems which cause that module to fail
17
# to import don't cause the tests to be skipped.
18
from twisted.web import client
20
# If there isn't one, we're going to skip all the tests.
23
# Otherwise importing it should work, so do it.
24
from twisted.words.protocols import msn
27
from twisted.protocols import loopback
28
from twisted.internet.defer import Deferred
29
from twisted.trial import unittest
34
class StringIOWithoutClosing(StringIO.StringIO):
37
def loseConnection(self): pass
39
class PassportTests(unittest.TestCase):
43
self.deferred = Deferred()
44
self.deferred.addCallback(lambda r: self.result.append(r))
45
self.deferred.addErrback(printError)
48
protocol = msn.PassportNexus(self.deferred, 'https://foobar.com/somepage.quux')
50
'Content-Length' : '0',
51
'Content-Type' : 'text/html',
52
'PassportURLs' : 'DARealm=Passport.Net,DALogin=login.myserver.com/,DAReg=reg.myserver.com'
54
transport = StringIOWithoutClosing()
55
protocol.makeConnection(transport)
56
protocol.dataReceived('HTTP/1.0 200 OK\r\n')
57
for (h,v) in headers.items(): protocol.dataReceived('%s: %s\r\n' % (h,v))
58
protocol.dataReceived('\r\n')
59
self.failUnless(self.result[0] == "https://login.myserver.com/")
61
def _doLoginTest(self, response, headers):
62
protocol = msn.PassportLogin(self.deferred,'foo@foo.com','testpass','https://foo.com/', 'a')
63
protocol.makeConnection(StringIOWithoutClosing())
64
protocol.dataReceived(response)
65
for (h,v) in headers.items(): protocol.dataReceived('%s: %s\r\n' % (h,v))
66
protocol.dataReceived('\r\n')
68
def testPassportLoginSuccess(self):
70
'Content-Length' : '0',
71
'Content-Type' : 'text/html',
72
'Authentication-Info' : "Passport1.4 da-status=success,tname=MSPAuth," +
73
"tname=MSPProf,tname=MSPSec,from-PP='somekey'," +
74
"ru=http://messenger.msn.com"
76
self._doLoginTest('HTTP/1.1 200 OK\r\n', headers)
77
self.failUnless(self.result[0] == (msn.LOGIN_SUCCESS, 'somekey'))
79
def testPassportLoginFailure(self):
81
'Content-Type' : 'text/html',
82
'WWW-Authenticate' : 'Passport1.4 da-status=failed,' +
83
'srealm=Passport.NET,ts=-3,prompt,cburl=http://host.com,' +
84
'cbtxt=the%20error%20message'
86
self._doLoginTest('HTTP/1.1 401 Unauthorized\r\n', headers)
87
self.failUnless(self.result[0] == (msn.LOGIN_FAILURE, 'the error message'))
89
def testPassportLoginRedirect(self):
91
'Content-Type' : 'text/html',
92
'Authentication-Info' : 'Passport1.4 da-status=redir',
93
'Location' : 'https://newlogin.host.com/'
95
self._doLoginTest('HTTP/1.1 302 Found\r\n', headers)
96
self.failUnless(self.result[0] == (msn.LOGIN_REDIRECT, 'https://newlogin.host.com/', 'a'))
100
class DummySwitchboardClient(msn.SwitchboardClient):
101
def userTyping(self, message):
102
self.state = 'TYPING'
104
def gotSendRequest(self, fileName, fileSize, cookie, message):
105
if fileName == 'foobar.ext' and fileSize == 31337 and cookie == 1234: self.state = 'INVITATION'
108
class DummyNotificationClient(msn.NotificationClient):
109
def loggedIn(self, userHandle, screenName, verified):
110
if userHandle == 'foo@bar.com' and screenName == 'Test Screen Name' and verified:
113
def gotProfile(self, message):
114
self.state = 'PROFILE'
116
def gotContactStatus(self, code, userHandle, screenName):
117
if code == msn.STATUS_AWAY and userHandle == "foo@bar.com" and screenName == "Test Screen Name":
118
self.state = 'INITSTATUS'
120
def contactStatusChanged(self, code, userHandle, screenName):
121
if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name":
122
self.state = 'NEWSTATUS'
124
def contactOffline(self, userHandle):
125
if userHandle == "foo@bar.com": self.state = 'OFFLINE'
127
def statusChanged(self, code):
128
if code == msn.STATUS_HIDDEN: self.state = 'MYSTATUS'
130
def listSynchronized(self, *args):
131
self.state = 'GOTLIST'
133
def gotPhoneNumber(self, listVersion, userHandle, phoneType, number):
134
msn.NotificationClient.gotPhoneNumber(self, listVersion, userHandle, phoneType, number)
135
self.state = 'GOTPHONE'
137
def userRemovedMe(self, userHandle, listVersion):
138
msn.NotificationClient.userRemovedMe(self, userHandle, listVersion)
139
c = self.factory.contacts.getContact(userHandle)
140
if not c and self.factory.contacts.version == listVersion: self.state = 'USERREMOVEDME'
142
def userAddedMe(self, userHandle, screenName, listVersion):
143
msn.NotificationClient.userAddedMe(self, userHandle, screenName, listVersion)
144
c = self.factory.contacts.getContact(userHandle)
145
if c and (c.lists | msn.REVERSE_LIST) and (self.factory.contacts.version == listVersion) and \
146
(screenName == 'Screen Name'):
147
self.state = 'USERADDEDME'
149
def gotSwitchboardInvitation(self, sessionID, host, port, key, userHandle, screenName):
150
if sessionID == 1234 and \
151
host == '192.168.1.1' and \
153
key == '123.456' and \
154
userHandle == 'foo@foo.com' and \
155
screenName == 'Screen Name':
156
self.state = 'SBINVITED'
158
class NotificationTests(unittest.TestCase):
159
""" testing the various events in NotificationClient """
162
self.client = DummyNotificationClient()
163
self.client.factory = msn.NotificationFactory()
164
self.client.state = 'START'
170
self.client.lineReceived('USR 1 OK foo@bar.com Test%20Screen%20Name 1 0')
171
self.failUnless((self.client.state == 'LOGIN'), msg='Failed to detect successful login')
173
def testProfile(self):
174
m = 'MSG Hotmail Hotmail 353\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsprofile; charset=UTF-8\r\n'
175
m += 'LoginTime: 1016941010\r\nEmailEnabled: 1\r\nMemberIdHigh: 40000\r\nMemberIdLow: -600000000\r\nlang_preference: 1033\r\n'
176
m += 'preferredEmail: foo@bar.com\r\ncountry: AU\r\nPostalCode: 90210\r\nGender: M\r\nKid: 0\r\nAge:\r\nsid: 400\r\n'
177
m += 'kv: 2\r\nMSPAuth: 2CACCBCCADMoV8ORoz64BVwmjtksIg!kmR!Rj5tBBqEaW9hc4YnPHSOQ$$\r\n\r\n'
178
map(self.client.lineReceived, m.split('\r\n')[:-1])
179
self.failUnless((self.client.state == 'PROFILE'), msg='Failed to detect initial profile')
181
def testStatus(self):
182
t = [('ILN 1 AWY foo@bar.com Test%20Screen%20Name 0', 'INITSTATUS', 'Failed to detect initial status report'),
183
('NLN LUN foo@bar.com Test%20Name 0', 'NEWSTATUS', 'Failed to detect contact status change'),
184
('FLN foo@bar.com', 'OFFLINE', 'Failed to detect contact signing off'),
185
('CHG 1 HDN 0', 'MYSTATUS', 'Failed to detect my status changing')]
187
self.client.lineReceived(i[0])
188
self.failUnless((self.client.state == i[1]), msg=i[2])
190
def testListSync(self):
191
# currently this test does not take into account the fact
192
# that BPRs sent as part of the SYN reply may not be interpreted
193
# as such if they are for the last LST -- maybe I should
194
# factor this in later.
195
self.client.makeConnection(StringIOWithoutClosing())
196
msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 'foobar', 1)
198
"SYN %s 100 1 1" % self.client.currentID,
201
"LSG 0 Other%20Contacts 0",
202
"LST userHandle@email.com Some%20Name 11 0"
204
map(self.client.lineReceived, lines)
205
contacts = self.client.factory.contacts
206
contact = contacts.getContact('userHandle@email.com')
207
self.failUnless(contacts.version == 100, "Invalid contact list version")
208
self.failUnless(contact.screenName == 'Some Name', "Invalid screen-name for user")
209
self.failUnless(contacts.groups == {0 : 'Other Contacts'}, "Did not get proper group list")
210
self.failUnless(contact.groups == [0] and contact.lists == 11, "Invalid contact list/group info")
211
self.failUnless(self.client.state == 'GOTLIST', "Failed to call list sync handler")
213
def testAsyncPhoneChange(self):
214
c = msn.MSNContact(userHandle='userHandle@email.com')
215
self.client.factory.contacts = msn.MSNContactList()
216
self.client.factory.contacts.addContact(c)
217
self.client.makeConnection(StringIOWithoutClosing())
218
self.client.lineReceived("BPR 101 userHandle@email.com PHH 123%20456")
219
c = self.client.factory.contacts.getContact('userHandle@email.com')
220
self.failUnless(self.client.state == 'GOTPHONE', "Did not fire phone change callback")
221
self.failUnless(c.homePhone == '123 456', "Did not update the contact's phone number")
222
self.failUnless(self.client.factory.contacts.version == 101, "Did not update list version")
224
def testLateBPR(self):
226
This test makes sure that if a BPR response that was meant
227
to be part of a SYN response (but came after the last LST)
228
is received, the correct contact is updated and all is well
230
self.client.makeConnection(StringIOWithoutClosing())
231
msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 'foo', 1)
233
"SYN %s 100 1 1" % self.client.currentID,
236
"LSG 0 Other%20Contacts 0",
237
"LST userHandle@email.com Some%20Name 11 0",
240
map(self.client.lineReceived, lines)
241
contact = self.client.factory.contacts.getContact('userHandle@email.com')
242
self.failUnless(contact.homePhone == '123 456', "Did not update contact's phone number")
244
def testUserRemovedMe(self):
245
self.client.factory.contacts = msn.MSNContactList()
246
contact = msn.MSNContact(userHandle='foo@foo.com')
247
contact.addToList(msn.REVERSE_LIST)
248
self.client.factory.contacts.addContact(contact)
249
self.client.lineReceived("REM 0 RL 100 foo@foo.com")
250
self.failUnless(self.client.state == 'USERREMOVEDME', "Failed to remove user from reverse list")
252
def testUserAddedMe(self):
253
self.client.factory.contacts = msn.MSNContactList()
254
self.client.lineReceived("ADD 0 RL 100 foo@foo.com Screen%20Name")
255
self.failUnless(self.client.state == 'USERADDEDME', "Failed to add user to reverse lise")
257
def testAsyncSwitchboardInvitation(self):
258
self.client.lineReceived("RNG 1234 192.168.1.1:1863 CKI 123.456 foo@foo.com Screen%20Name")
259
self.failUnless(self.client.state == "SBINVITED")
261
def testCommandFailed(self):
263
Ensures that error responses from the server fires an errback with
266
id, d = self.client._createIDMapping()
267
self.client.lineReceived("201 %s" % id)
268
d = self.assertFailure(d, msn.MSNCommandFailed)
269
def assertErrorCode(exception):
270
self.assertEqual(201, exception.errorCode)
271
return d.addCallback(assertErrorCode)
274
class MessageHandlingTests(unittest.TestCase):
275
""" testing various message handling methods from SwichboardClient """
278
self.client = DummySwitchboardClient()
279
self.client.state = 'START'
284
def testClientCapabilitiesCheck(self):
286
m.setHeader('Content-Type', 'text/x-clientcaps')
287
self.assertEquals(self.client.checkMessage(m), 0, 'Failed to detect client capability message')
289
def testTypingCheck(self):
291
m.setHeader('Content-Type', 'text/x-msmsgscontrol')
292
m.setHeader('TypingUser', 'foo@bar')
293
self.client.checkMessage(m)
294
self.failUnless((self.client.state == 'TYPING'), msg='Failed to detect typing notification')
296
def testFileInvitation(self, lazyClient=False):
298
m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
299
m.message += 'Application-Name: File Transfer\r\n'
301
m.message += 'Application-GUID: {5D3E02AB-6190-11d3-BBBB-00C04F795683}\r\n'
302
m.message += 'Invitation-Command: Invite\r\n'
303
m.message += 'Invitation-Cookie: 1234\r\n'
304
m.message += 'Application-File: foobar.ext\r\n'
305
m.message += 'Application-FileSize: 31337\r\n\r\n'
306
self.client.checkMessage(m)
307
self.failUnless((self.client.state == 'INVITATION'), msg='Failed to detect file transfer invitation')
309
def testFileInvitationMissingGUID(self):
310
return self.testFileInvitation(True)
312
def testFileResponse(self):
314
d.addCallback(self.fileResponse)
315
self.client.cookies['iCookies'][1234] = (d, None)
317
m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
318
m.message += 'Invitation-Command: ACCEPT\r\n'
319
m.message += 'Invitation-Cookie: 1234\r\n\r\n'
320
self.client.checkMessage(m)
321
self.failUnless((self.client.state == 'RESPONSE'), msg='Failed to detect file transfer response')
323
def testFileInfo(self):
325
d.addCallback(self.fileInfo)
326
self.client.cookies['external'][1234] = (d, None)
328
m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
329
m.message += 'Invitation-Command: ACCEPT\r\n'
330
m.message += 'Invitation-Cookie: 1234\r\n'
331
m.message += 'IP-Address: 192.168.0.1\r\n'
332
m.message += 'Port: 6891\r\n'
333
m.message += 'AuthCookie: 4321\r\n\r\n'
334
self.client.checkMessage(m)
335
self.failUnless((self.client.state == 'INFO'), msg='Failed to detect file transfer info')
337
def fileResponse(self, (accept, cookie, info)):
338
if accept and cookie == 1234: self.client.state = 'RESPONSE'
340
def fileInfo(self, (accept, ip, port, aCookie, info)):
341
if accept and ip == '192.168.0.1' and port == 6891 and aCookie == 4321: self.client.state = 'INFO'
344
class FileTransferTestCase(unittest.TestCase):
345
""" test FileSend against FileReceive """
348
self.input = StringIOWithoutClosing()
349
self.input.writelines(['a'] * 7000)
351
self.output = StringIOWithoutClosing()
357
def testFileTransfer(self):
359
sender = msn.FileSend(self.input)
361
sender.fileSize = 7000
362
client = msn.FileReceive(auth, "foo@bar.com", self.output)
363
client.fileSize = 7000
365
self.failUnless((client.completed and sender.completed),
366
msg="send failed to complete")
367
self.failUnless((self.input.getvalue() == self.output.getvalue()),
368
msg="saved file does not match original")
369
d = loopback.loopbackAsync(sender, client)
375
for testClass in [PassportTests, NotificationTests,
376
MessageHandlingTests, FileTransferTestCase]:
378
"MSN requires an HTTP client but none is available, "