1
# Copyright (c) 2001-2005 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
from twisted.trial import unittest
6
from twisted.words.protocols import irc
7
from twisted.internet import protocol
12
class StringIOWithoutClosing(StringIO.StringIO):
17
"Hello, this is a nice string with no complications.",
18
"xargs%(NUL)smight%(NUL)slike%(NUL)sthis" % {'NUL': irc.NUL },
19
"embedded%(CR)snewline%(CR)s%(NL)sFUN%(NL)s" % {'CR': irc.CR,
21
"escape!%(X)s escape!%(M)s %(X)s%(X)sa %(M)s0" % {'X': irc.X_QUOTE,
26
class QuotingTest(unittest.TestCase):
27
def test_lowquoteSanity(self):
28
"""Testing client-server level quote/dequote"""
29
for s in stringSubjects:
30
self.failUnlessEqual(s, irc.lowDequote(irc.lowQuote(s)))
32
def test_ctcpquoteSanity(self):
33
"""Testing CTCP message level quote/dequote"""
34
for s in stringSubjects:
35
self.failUnlessEqual(s, irc.ctcpDequote(irc.ctcpQuote(s)))
38
class IRCClientWithoutLogin(irc.IRCClient):
42
class CTCPTest(unittest.TestCase):
44
self.file = StringIOWithoutClosing()
45
self.transport = protocol.FileWrapper(self.file)
46
self.client = IRCClientWithoutLogin()
47
self.client.makeConnection(self.transport)
49
def test_ERRMSG(self):
50
"""Testing CTCP query ERRMSG.
52
Not because this is this is an especially important case in the
53
field, but it does go through the entire dispatch/decode/encode
57
errQuery = (":nick!guy@over.there PRIVMSG #theChan :"
58
"%(X)cERRMSG t%(X)c%(EOL)s"
60
'EOL': irc.CR + irc.LF})
62
errReply = ("NOTICE nick :%(X)cERRMSG t :"
63
"No error has occoured.%(X)c%(EOL)s"
65
'EOL': irc.CR + irc.LF})
67
self.client.dataReceived(errQuery)
68
reply = self.file.getvalue()
70
self.failUnlessEqual(errReply, reply)
73
self.transport.loseConnection()
74
self.client.connectionLost()
78
class NoticingClient(object, IRCClientWithoutLogin):
81
'yourHost': ('info',),
82
'myInfo': ('servername', 'version', 'umodes', 'cmodes'),
83
'luserClient': ('info',),
85
'isupport': ('options',),
86
'luserChannels': ('channels',),
89
'receivedMOTD': ('motd',),
91
'privmsg': ('user', 'channel', 'message'),
92
'joined': ('channel',),
94
'noticed': ('user', 'channel', 'message'),
95
'modeChanged': ('user', 'channel', 'set', 'modes', 'args'),
96
'pong': ('user', 'secs'),
98
'kickedFrom': ('channel', 'kicker', 'message'),
99
'nickChanged': ('nick',),
101
'userJoined': ('user', 'channel'),
102
'userLeft': ('user', 'channel'),
103
'userKicked': ('user', 'channel', 'kicker', 'message'),
104
'action': ('user', 'channel', 'data'),
105
'topicUpdated': ('user', 'channel', 'newTopic'),
106
'userRenamed': ('oldname', 'newname')}
108
def __init__(self, *a, **kw):
109
object.__init__(self)
112
def __getattribute__(self, name):
113
if name.startswith('__') and name.endswith('__'):
114
return super(NoticingClient, self).__getattribute__(name)
116
args = super(NoticingClient, self).__getattribute__('methods')[name]
118
return super(NoticingClient, self).__getattribute__(name)
120
return self.makeMethod(name, args)
122
def makeMethod(self, fname, args):
123
def method(*a, **kw):
124
if len(a) > len(args):
125
raise TypeError("TypeError: %s() takes %d arguments "
126
"(%d given)" % (fname, len(args), len(a)))
127
for (name, value) in zip(args, a):
129
raise TypeError("TypeError: %s() got multiple values "
130
"for keyword argument '%s'" % (fname, name))
133
if len(kw) != len(args):
134
raise TypeError("TypeError: %s() takes %d arguments "
135
"(%d given)" % (fname, len(args), len(a)))
136
self.calls.append((fname, kw))
139
def pop(dict, key, default):
148
class ModeTestCase(unittest.TestCase):
150
self.file = StringIOWithoutClosing()
151
self.transport = protocol.FileWrapper(self.file)
152
self.client = NoticingClient()
153
self.client.makeConnection(self.transport)
156
self.transport.loseConnection()
157
self.client.connectionLost()
161
def testModeChange(self):
162
message = ":ChanServ!ChanServ@services. MODE #tanstaafl +o exarkun\r\n"
163
self.client.dataReceived(message)
166
[('modeChanged', {'user': "ChanServ!ChanServ@services.",
167
'channel': '#tanstaafl',
170
'args': ('exarkun',)})])
172
def _serverTestImpl(self, code, msg, func, **kw):
173
host = pop(kw, 'host', 'server.host')
174
nick = pop(kw, 'nick', 'nickname')
175
args = pop(kw, 'args', '')
184
self.client.dataReceived(message)
189
def testYourHost(self):
190
msg = "Your host is some.host[blah.blah/6667], running version server-version-3"
191
self._serverTestImpl("002", msg, "yourHost", info=msg)
193
def testCreated(self):
194
msg = "This server was cobbled together Fri Aug 13 18:00:25 UTC 2004"
195
self._serverTestImpl("003", msg, "created", when=msg)
197
def testMyInfo(self):
198
msg = "server.host server-version abcDEF bcdEHI"
199
self._serverTestImpl("004", msg, "myInfo",
200
servername="server.host",
201
version="server-version",
205
def testLuserClient(self):
206
msg = "There are 9227 victims and 9542 hiding on 24 servers"
207
self._serverTestImpl("251", msg, "luserClient",
210
def testISupport(self):
211
args = ("MODES=4 CHANLIMIT=#:20 NICKLEN=16 USERLEN=10 HOSTLEN=63 "
212
"TOPICLEN=450 KICKLEN=450 CHANNELLEN=30 KEYLEN=23 CHANTYPES=# "
213
"PREFIX=(ov)@+ CASEMAPPING=ascii CAPAB IRCD=dancer")
214
msg = "are available on this server"
215
self._serverTestImpl("005", msg, "isupport", args=args,
231
def testBounce(self):
232
msg = "Try server some.host, port 321"
233
self._serverTestImpl("005", msg, "bounce",
236
def testLuserChannels(self):
238
msg = "channels formed"
239
self._serverTestImpl("254", msg, "luserChannels", args=args,
242
def testLuserOp(self):
244
msg = "flagged staff members"
245
self._serverTestImpl("252", msg, "luserOp", args=args,
248
def testLuserMe(self):
249
msg = "I have 1937 clients and 0 servers"
250
self._serverTestImpl("255", msg, "luserMe",
255
":host.name 375 nickname :- host.name Message of the Day -",
256
":host.name 372 nickname :- Welcome to host.name",
257
":host.name 376 nickname :End of /MOTD command."]
259
self.assertEquals(self.client.calls, [])
260
self.client.dataReceived(L + '\r\n')
264
[("receivedMOTD", {"motd": ["host.name Message of the Day -", "Welcome to host.name"]})])
267
def _clientTestImpl(self, sender, group, type, msg, func, **kw):
268
ident = pop(kw, 'ident', 'ident')
269
host = pop(kw, 'host', 'host')
271
wholeUser = sender + '!' + ident + '@' + host
277
self.client.dataReceived(message)
281
self.client.calls = []
283
def testPrivmsg(self):
284
msg = "Tooty toot toot."
285
self._clientTestImpl("sender", "#group", "PRIVMSG", msg, "privmsg",
286
ident="ident", host="host",
287
# Expected results below
288
user="sender!ident@host",
292
self._clientTestImpl("sender", "recipient", "PRIVMSG", msg, "privmsg",
293
ident="ident", host="host",
294
# Expected results below
295
user="sender!ident@host",
299
class BasicServerFunctionalityTestCase(unittest.TestCase):
301
self.f = StringIOWithoutClosing()
302
self.t = protocol.FileWrapper(self.f)
304
self.p.makeConnection(self.t)
307
self.assertEquals(self.f.getvalue(), s)
309
def testPrivmsg(self):
310
self.p.privmsg("this-is-sender", "this-is-recip", "this is message")
311
self.check(":this-is-sender PRIVMSG this-is-recip :this is message\r\n")
313
def testNotice(self):
314
self.p.notice("this-is-sender", "this-is-recip", "this is notice")
315
self.check(":this-is-sender NOTICE this-is-recip :this is notice\r\n")
317
def testAction(self):
318
self.p.action("this-is-sender", "this-is-recip", "this is action")
319
self.check(":this-is-sender ACTION this-is-recip :this is action\r\n")
322
self.p.join("this-person", "#this-channel")
323
self.check(":this-person JOIN #this-channel\r\n")
326
self.p.part("this-person", "#that-channel")
327
self.check(":this-person PART #that-channel\r\n")
331
Verify that a whois by the client receives the right protocol actions
334
timestamp = int(time.time()-100)
335
hostname = self.p.hostname
336
req = 'requesting-nick'
338
self.p.whois(req, targ, 'target', 'host.com',
339
'Target User', 'irc.host.com', 'A fake server', False,
340
12, timestamp, ['#fakeusers', '#fakemisc'])
341
expected = '\r\n'.join([
342
':%(hostname)s 311 %(req)s %(targ)s target host.com * :Target User',
343
':%(hostname)s 312 %(req)s %(targ)s irc.host.com :A fake server',
344
':%(hostname)s 317 %(req)s %(targ)s 12 %(timestamp)s :seconds idle, signon time',
345
':%(hostname)s 319 %(req)s %(targ)s :#fakeusers #fakemisc',
346
':%(hostname)s 318 %(req)s %(targ)s :End of WHOIS list.',
347
'']) % dict(hostname=hostname, timestamp=timestamp, req=req, targ=targ)
351
class DummyClient(irc.IRCClient):
354
def sendLine(self, m):
358
class ClientMsgTests(unittest.TestCase):
360
self.client = DummyClient()
362
def testSingleLine(self):
363
self.client.msg('foo', 'bar')
364
self.assertEquals(self.client.lines, ['PRIVMSG foo :bar'])
366
def testDodgyMaxLength(self):
367
self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 0)
368
self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 3)
370
def testMultipleLine(self):
371
maxLen = len('PRIVMSG foo :') + 3 + 2 # 2 for line endings
372
self.client.msg('foo', 'barbazbo', maxLen)
373
self.assertEquals(self.client.lines, ['PRIVMSG foo :bar',
377
def testSufficientWidth(self):
379
maxLen = len('PRIVMSG foo :%s' % (msg,)) + 2
380
self.client.msg('foo', msg, maxLen)
381
self.assertEquals(self.client.lines, ['PRIVMSG foo :%s' % (msg,)])
382
self.client.lines = []
383
self.client.msg('foo', msg, maxLen-1)
384
self.assertEquals(2, len(self.client.lines))
385
self.client.lines = []
386
self.client.msg('foo', msg, maxLen+1)
387
self.assertEquals(1, len(self.client.lines))
389
def testSplitSanity(self):
391
self.assertRaises(ValueError, irc.split, 'foo', -1)
392
self.assertRaises(ValueError, irc.split, 'foo', 0)
393
self.assertEquals([], irc.split('', 1))
394
self.assertEquals([], irc.split(''))