~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/words/test/test_irc.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2001-2005 Twisted Matrix Laboratories.
2
 
# See LICENSE for details.
3
 
 
4
 
 
5
 
from twisted.trial import unittest
6
 
from twisted.words.protocols import irc
7
 
from twisted.internet import protocol
8
 
import StringIO
9
 
import time
10
 
 
11
 
 
12
 
class StringIOWithoutClosing(StringIO.StringIO):
13
 
    def close(self):
14
 
        pass
15
 
 
16
 
stringSubjects = [
17
 
    "Hello, this is a nice string with no complications.",
18
 
    "xargs%(NUL)smight%(NUL)slike%(NUL)sthis" % {'NUL': irc.NUL },
19
 
    "embedded%(CR)snewline%(CR)s%(NL)sFUN%(NL)s" % {'CR': irc.CR,
20
 
                                                    'NL': irc.NL},
21
 
    "escape!%(X)s escape!%(M)s %(X)s%(X)sa %(M)s0" % {'X': irc.X_QUOTE,
22
 
                                                      'M': irc.M_QUOTE}
23
 
    ]
24
 
 
25
 
 
26
 
class QuotingTest(unittest.TestCase):
27
 
    def test_lowquoteSanity(self):
28
 
        """Testing client-server level quote/dequote"""
29
 
        for s in stringSubjects:
30
 
            self.failUnlessEqual(s, irc.lowDequote(irc.lowQuote(s)))
31
 
 
32
 
    def test_ctcpquoteSanity(self):
33
 
        """Testing CTCP message level quote/dequote"""
34
 
        for s in stringSubjects:
35
 
            self.failUnlessEqual(s, irc.ctcpDequote(irc.ctcpQuote(s)))
36
 
 
37
 
 
38
 
class IRCClientWithoutLogin(irc.IRCClient):
39
 
    performLogin = 0
40
 
 
41
 
 
42
 
class CTCPTest(unittest.TestCase):
43
 
    def setUp(self):
44
 
        self.file = StringIOWithoutClosing()
45
 
        self.transport = protocol.FileWrapper(self.file)
46
 
        self.client = IRCClientWithoutLogin()
47
 
        self.client.makeConnection(self.transport)
48
 
 
49
 
    def test_ERRMSG(self):
50
 
        """Testing CTCP query ERRMSG.
51
 
 
52
 
        Not because this is this is an especially important case in the
53
 
        field, but it does go through the entire dispatch/decode/encode
54
 
        process.
55
 
        """
56
 
 
57
 
        errQuery = (":nick!guy@over.there PRIVMSG #theChan :"
58
 
                    "%(X)cERRMSG t%(X)c%(EOL)s"
59
 
                    % {'X': irc.X_DELIM,
60
 
                       'EOL': irc.CR + irc.LF})
61
 
 
62
 
        errReply = ("NOTICE nick :%(X)cERRMSG t :"
63
 
                    "No error has occoured.%(X)c%(EOL)s"
64
 
                    % {'X': irc.X_DELIM,
65
 
                       'EOL': irc.CR + irc.LF})
66
 
 
67
 
        self.client.dataReceived(errQuery)
68
 
        reply = self.file.getvalue()
69
 
 
70
 
        self.failUnlessEqual(errReply, reply)
71
 
 
72
 
    def tearDown(self):
73
 
        self.transport.loseConnection()
74
 
        self.client.connectionLost()
75
 
        del self.client
76
 
        del self.transport
77
 
 
78
 
class NoticingClient(object, IRCClientWithoutLogin):
79
 
    methods = {
80
 
        'created': ('when',),
81
 
        'yourHost': ('info',),
82
 
        'myInfo': ('servername', 'version', 'umodes', 'cmodes'),
83
 
        'luserClient': ('info',),
84
 
        'bounce': ('info',),
85
 
        'isupport': ('options',),
86
 
        'luserChannels': ('channels',),
87
 
        'luserOp': ('ops',),
88
 
        'luserMe': ('info',),
89
 
        'receivedMOTD': ('motd',),
90
 
 
91
 
        'privmsg': ('user', 'channel', 'message'),
92
 
        'joined': ('channel',),
93
 
        'left': ('channel',),
94
 
        'noticed': ('user', 'channel', 'message'),
95
 
        'modeChanged': ('user', 'channel', 'set', 'modes', 'args'),
96
 
        'pong': ('user', 'secs'),
97
 
        'signedOn': (),
98
 
        'kickedFrom': ('channel', 'kicker', 'message'),
99
 
        'nickChanged': ('nick',),
100
 
 
101
 
        'userJoined': ('user', 'channel'),
102
 
        'userLeft': ('user', 'channel'),
103
 
        'userKicked': ('user', 'channel', 'kicker', 'message'),
104
 
        'action': ('user', 'channel', 'data'),
105
 
        'topicUpdated': ('user', 'channel', 'newTopic'),
106
 
        'userRenamed': ('oldname', 'newname')}
107
 
 
108
 
    def __init__(self, *a, **kw):
109
 
        object.__init__(self)
110
 
        self.calls = []
111
 
 
112
 
    def __getattribute__(self, name):
113
 
        if name.startswith('__') and name.endswith('__'):
114
 
            return super(NoticingClient, self).__getattribute__(name)
115
 
        try:
116
 
            args = super(NoticingClient, self).__getattribute__('methods')[name]
117
 
        except KeyError:
118
 
            return super(NoticingClient, self).__getattribute__(name)
119
 
        else:
120
 
            return self.makeMethod(name, args)
121
 
 
122
 
    def makeMethod(self, fname, args):
123
 
        def method(*a, **kw):
124
 
            if len(a) > len(args):
125
 
                raise TypeError("TypeError: %s() takes %d arguments "
126
 
                                "(%d given)" % (fname, len(args), len(a)))
127
 
            for (name, value) in zip(args, a):
128
 
                if name in kw:
129
 
                    raise TypeError("TypeError: %s() got multiple values "
130
 
                                    "for keyword argument '%s'" % (fname, name))
131
 
                else:
132
 
                    kw[name] = value
133
 
            if len(kw) != len(args):
134
 
                raise TypeError("TypeError: %s() takes %d arguments "
135
 
                                "(%d given)" % (fname, len(args), len(a)))
136
 
            self.calls.append((fname, kw))
137
 
        return method
138
 
 
139
 
def pop(dict, key, default):
140
 
    try:
141
 
        value = dict[key]
142
 
    except KeyError:
143
 
        return default
144
 
    else:
145
 
        del dict[key]
146
 
        return value
147
 
 
148
 
class ModeTestCase(unittest.TestCase):
149
 
    def setUp(self):
150
 
        self.file = StringIOWithoutClosing()
151
 
        self.transport = protocol.FileWrapper(self.file)
152
 
        self.client = NoticingClient()
153
 
        self.client.makeConnection(self.transport)
154
 
 
155
 
    def tearDown(self):
156
 
        self.transport.loseConnection()
157
 
        self.client.connectionLost()
158
 
        del self.client
159
 
        del self.transport
160
 
 
161
 
    def testModeChange(self):
162
 
        message = ":ChanServ!ChanServ@services. MODE #tanstaafl +o exarkun\r\n"
163
 
        self.client.dataReceived(message)
164
 
        self.assertEquals(
165
 
            self.client.calls,
166
 
            [('modeChanged', {'user': "ChanServ!ChanServ@services.",
167
 
                              'channel': '#tanstaafl',
168
 
                              'set': True,
169
 
                              'modes': 'o',
170
 
                              'args': ('exarkun',)})])
171
 
 
172
 
    def _serverTestImpl(self, code, msg, func, **kw):
173
 
        host = pop(kw, 'host', 'server.host')
174
 
        nick = pop(kw, 'nick', 'nickname')
175
 
        args = pop(kw, 'args', '')
176
 
 
177
 
        message = (":" +
178
 
                   host + " " +
179
 
                   code + " " +
180
 
                   nick + " " +
181
 
                   args + " :" +
182
 
                   msg + "\r\n")
183
 
 
184
 
        self.client.dataReceived(message)
185
 
        self.assertEquals(
186
 
            self.client.calls,
187
 
            [(func, kw)])
188
 
 
189
 
    def testYourHost(self):
190
 
        msg = "Your host is some.host[blah.blah/6667], running version server-version-3"
191
 
        self._serverTestImpl("002", msg, "yourHost", info=msg)
192
 
 
193
 
    def testCreated(self):
194
 
        msg = "This server was cobbled together Fri Aug 13 18:00:25 UTC 2004"
195
 
        self._serverTestImpl("003", msg, "created", when=msg)
196
 
 
197
 
    def testMyInfo(self):
198
 
        msg = "server.host server-version abcDEF bcdEHI"
199
 
        self._serverTestImpl("004", msg, "myInfo",
200
 
                             servername="server.host",
201
 
                             version="server-version",
202
 
                             umodes="abcDEF",
203
 
                             cmodes="bcdEHI")
204
 
 
205
 
    def testLuserClient(self):
206
 
        msg = "There are 9227 victims and 9542 hiding on 24 servers"
207
 
        self._serverTestImpl("251", msg, "luserClient",
208
 
                             info=msg)
209
 
 
210
 
    def testISupport(self):
211
 
        args = ("MODES=4 CHANLIMIT=#:20 NICKLEN=16 USERLEN=10 HOSTLEN=63 "
212
 
                "TOPICLEN=450 KICKLEN=450 CHANNELLEN=30 KEYLEN=23 CHANTYPES=# "
213
 
                "PREFIX=(ov)@+ CASEMAPPING=ascii CAPAB IRCD=dancer")
214
 
        msg = "are available on this server"
215
 
        self._serverTestImpl("005", msg, "isupport", args=args,
216
 
                             options=['MODES=4',
217
 
                                      'CHANLIMIT=#:20',
218
 
                                      'NICKLEN=16',
219
 
                                      'USERLEN=10',
220
 
                                      'HOSTLEN=63',
221
 
                                      'TOPICLEN=450',
222
 
                                      'KICKLEN=450',
223
 
                                      'CHANNELLEN=30',
224
 
                                      'KEYLEN=23',
225
 
                                      'CHANTYPES=#',
226
 
                                      'PREFIX=(ov)@+',
227
 
                                      'CASEMAPPING=ascii',
228
 
                                      'CAPAB',
229
 
                                      'IRCD=dancer'])
230
 
 
231
 
    def testBounce(self):
232
 
        msg = "Try server some.host, port 321"
233
 
        self._serverTestImpl("005", msg, "bounce",
234
 
                             info=msg)
235
 
 
236
 
    def testLuserChannels(self):
237
 
        args = "7116"
238
 
        msg = "channels formed"
239
 
        self._serverTestImpl("254", msg, "luserChannels", args=args,
240
 
                             channels=int(args))
241
 
 
242
 
    def testLuserOp(self):
243
 
        args = "34"
244
 
        msg = "flagged staff members"
245
 
        self._serverTestImpl("252", msg, "luserOp", args=args,
246
 
                             ops=int(args))
247
 
 
248
 
    def testLuserMe(self):
249
 
        msg = "I have 1937 clients and 0 servers"
250
 
        self._serverTestImpl("255", msg, "luserMe",
251
 
                             info=msg)
252
 
 
253
 
    def testMOTD(self):
254
 
        lines = [
255
 
            ":host.name 375 nickname :- host.name Message of the Day -",
256
 
            ":host.name 372 nickname :- Welcome to host.name",
257
 
            ":host.name 376 nickname :End of /MOTD command."]
258
 
        for L in lines:
259
 
            self.assertEquals(self.client.calls, [])
260
 
            self.client.dataReceived(L + '\r\n')
261
 
 
262
 
        self.assertEquals(
263
 
            self.client.calls,
264
 
            [("receivedMOTD", {"motd": ["host.name Message of the Day -", "Welcome to host.name"]})])
265
 
 
266
 
 
267
 
    def _clientTestImpl(self, sender, group, type, msg, func, **kw):
268
 
        ident = pop(kw, 'ident', 'ident')
269
 
        host = pop(kw, 'host', 'host')
270
 
 
271
 
        wholeUser = sender + '!' + ident + '@' + host
272
 
        message = (":" +
273
 
                   wholeUser + " " +
274
 
                   type + " " +
275
 
                   group + " :" +
276
 
                   msg + "\r\n")
277
 
        self.client.dataReceived(message)
278
 
        self.assertEquals(
279
 
            self.client.calls,
280
 
            [(func, kw)])
281
 
        self.client.calls = []
282
 
 
283
 
    def testPrivmsg(self):
284
 
        msg = "Tooty toot toot."
285
 
        self._clientTestImpl("sender", "#group", "PRIVMSG", msg, "privmsg",
286
 
                             ident="ident", host="host",
287
 
                             # Expected results below
288
 
                             user="sender!ident@host",
289
 
                             channel="#group",
290
 
                             message=msg)
291
 
 
292
 
        self._clientTestImpl("sender", "recipient", "PRIVMSG", msg, "privmsg",
293
 
                             ident="ident", host="host",
294
 
                             # Expected results below
295
 
                             user="sender!ident@host",
296
 
                             channel="recipient",
297
 
                             message=msg)
298
 
 
299
 
class BasicServerFunctionalityTestCase(unittest.TestCase):
300
 
    def setUp(self):
301
 
        self.f = StringIOWithoutClosing()
302
 
        self.t = protocol.FileWrapper(self.f)
303
 
        self.p = irc.IRC()
304
 
        self.p.makeConnection(self.t)
305
 
 
306
 
    def check(self, s):
307
 
        self.assertEquals(self.f.getvalue(), s)
308
 
 
309
 
    def testPrivmsg(self):
310
 
        self.p.privmsg("this-is-sender", "this-is-recip", "this is message")
311
 
        self.check(":this-is-sender PRIVMSG this-is-recip :this is message\r\n")
312
 
 
313
 
    def testNotice(self):
314
 
        self.p.notice("this-is-sender", "this-is-recip", "this is notice")
315
 
        self.check(":this-is-sender NOTICE this-is-recip :this is notice\r\n")
316
 
 
317
 
    def testAction(self):
318
 
        self.p.action("this-is-sender", "this-is-recip", "this is action")
319
 
        self.check(":this-is-sender ACTION this-is-recip :this is action\r\n")
320
 
 
321
 
    def testJoin(self):
322
 
        self.p.join("this-person", "#this-channel")
323
 
        self.check(":this-person JOIN #this-channel\r\n")
324
 
 
325
 
    def testPart(self):
326
 
        self.p.part("this-person", "#that-channel")
327
 
        self.check(":this-person PART #that-channel\r\n")
328
 
 
329
 
    def testWhois(self):
330
 
        """
331
 
        Verify that a whois by the client receives the right protocol actions
332
 
        from the server.
333
 
        """
334
 
        timestamp = int(time.time()-100)
335
 
        hostname = self.p.hostname
336
 
        req = 'requesting-nick'
337
 
        targ = 'target-nick'
338
 
        self.p.whois(req, targ, 'target', 'host.com', 
339
 
                'Target User', 'irc.host.com', 'A fake server', False, 
340
 
                12, timestamp, ['#fakeusers', '#fakemisc'])
341
 
        expected = '\r\n'.join([
342
 
':%(hostname)s 311 %(req)s %(targ)s target host.com * :Target User',
343
 
':%(hostname)s 312 %(req)s %(targ)s irc.host.com :A fake server',
344
 
':%(hostname)s 317 %(req)s %(targ)s 12 %(timestamp)s :seconds idle, signon time',
345
 
':%(hostname)s 319 %(req)s %(targ)s :#fakeusers #fakemisc',
346
 
':%(hostname)s 318 %(req)s %(targ)s :End of WHOIS list.',
347
 
'']) % dict(hostname=hostname, timestamp=timestamp, req=req, targ=targ)
348
 
        self.check(expected)
349
 
 
350
 
 
351
 
class DummyClient(irc.IRCClient):
352
 
    def __init__(self):
353
 
        self.lines = []
354
 
    def sendLine(self, m):
355
 
        self.lines.append(m)
356
 
 
357
 
 
358
 
class ClientMsgTests(unittest.TestCase):
359
 
    def setUp(self):
360
 
        self.client = DummyClient()
361
 
 
362
 
    def testSingleLine(self):
363
 
        self.client.msg('foo', 'bar')
364
 
        self.assertEquals(self.client.lines, ['PRIVMSG foo :bar'])
365
 
 
366
 
    def testDodgyMaxLength(self):
367
 
        self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 0)
368
 
        self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 3)
369
 
 
370
 
    def testMultipleLine(self):
371
 
        maxLen = len('PRIVMSG foo :') + 3 + 2 # 2 for line endings
372
 
        self.client.msg('foo', 'barbazbo', maxLen)
373
 
        self.assertEquals(self.client.lines, ['PRIVMSG foo :bar',
374
 
                                              'PRIVMSG foo :baz',
375
 
                                              'PRIVMSG foo :bo'])
376
 
 
377
 
    def testSufficientWidth(self):
378
 
        msg = 'barbazbo'
379
 
        maxLen = len('PRIVMSG foo :%s' % (msg,)) + 2
380
 
        self.client.msg('foo', msg, maxLen)
381
 
        self.assertEquals(self.client.lines, ['PRIVMSG foo :%s' % (msg,)])
382
 
        self.client.lines = []
383
 
        self.client.msg('foo', msg, maxLen-1)
384
 
        self.assertEquals(2, len(self.client.lines))
385
 
        self.client.lines = []
386
 
        self.client.msg('foo', msg, maxLen+1)
387
 
        self.assertEquals(1, len(self.client.lines))
388
 
 
389
 
    def testSplitSanity(self):
390
 
        # Whiteboxing
391
 
        self.assertRaises(ValueError, irc.split, 'foo', -1)
392
 
        self.assertRaises(ValueError, irc.split, 'foo', 0)
393
 
        self.assertEquals([], irc.split('', 1))
394
 
        self.assertEquals([], irc.split(''))