~ubuntu-branches/ubuntu/utopic/gozerbot/utopic

« back to all changes in this revision

Viewing changes to build/lib/gozerbot/tests.py

  • Committer: Package Import Robot
  • Author(s): Jeremy Malcolm
  • Date: 2012-04-03 21:58:28 UTC
  • mfrom: (3.1.11 sid)
  • Revision ID: package-import@ubuntu.com-20120403215828-6mik0tzug5na93la
Tags: 0.99.1-2
* Removes logfiles on purge (Closes: #668767)
* Reverted location of installed files back to /usr/lib/gozerbot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# gozerbot/tests.py
 
2
#
 
3
#
 
4
 
 
5
""" gozerbot tests framework. """
 
6
 
 
7
## IMPORT SECTION
 
8
 
 
9
# gozerbot imports
 
10
from config import config
 
11
from threads.thr import start_new_thread
 
12
from utils.locking import lockdec
 
13
from utils.log import rlog
 
14
from utils.trace import calledfrom, whichmodule
 
15
from utils.exception import exceptionmsg
 
16
from utils.dol import Dol
 
17
from eventbase import EventBase
 
18
 
 
19
# basic imports
 
20
import sys, re, thread, copy, time, threading, random
 
21
 
 
22
## END IMPORT
 
23
 
 
24
# used to copy attributes
 
25
cpy = copy.deepcopy
 
26
 
 
27
## LOCK SECTION
 
28
 
 
29
# locks
 
30
testlock = threading.RLock()
 
31
locked = lockdec(testlock)
 
32
 
 
33
## END LOCK
 
34
 
 
35
class Test(object):
 
36
 
 
37
    """ a test object. """
 
38
 
 
39
    def __init__(self, execstring="", expect="", descr="", where="", fakein=""):
 
40
        self.plugin = calledfrom(sys._getframe(1))
 
41
        if not self.plugin:
 
42
            self.plugin = calledfrom(sys._getframe(2))
 
43
        self.descr = cpy(descr)
 
44
        self.execstring = cpy(execstring)
 
45
        self.expect = cpy(expect)
 
46
        self.response = ""
 
47
        self.groups = []
 
48
        self.error = ""
 
49
        self.where = cpy(where)
 
50
        self.fakein = cpy(fakein)
 
51
        self.start = None
 
52
        self.end = None
 
53
        self.prev = None
 
54
        self.activate = False
 
55
        
 
56
    def __str__(self):
 
57
        return "test %s (%s) %s ==> %s (%s)" % (self.descr, self.where, self.execstring, self.response, self.error)
 
58
 
 
59
    def begin(self):
 
60
        if self.start:
 
61
            self.start()
 
62
        return self
 
63
 
 
64
    def run(self, bot, event):
 
65
 
 
66
        """ run the test on bot with event. """
 
67
 
 
68
        if not self.activate:
 
69
            return
 
70
 
 
71
        if config['loadlist'] and self.plugin not in config['loadlist']:
 
72
            return
 
73
 
 
74
        mevent = copy.deepcopy(event)
 
75
        mevent.onlyqueues = False
 
76
        mevent.channel = '#dunkbots'
 
77
        bot.userhosts['bottest'] = 'bottest@test'
 
78
        bot.userhosts['mtest'] = 'mekker@test'
 
79
        bot.userhosts['exec'] = 'exec@gozerbot'
 
80
        if 'exec@gozerbot' not in bot.state['joinedchannels']:
 
81
            bot.state['joinedchannels'].append('exec@gozerbot')
 
82
        bot.channels['exec@gozerbot'] = {}
 
83
        if '#dunkbots' not in bot.state['joinedchannels']:
 
84
            bot.state['joinedchannels'].append('#dunkbots')
 
85
        bot.channels['#dunkbots'] = {'cc': '!'}
 
86
        self.error = ""
 
87
        self.response = ""
 
88
        self.groups = []
 
89
        origexec = self.execstring
 
90
        origexpect = self.expect
 
91
        if self.start:
 
92
            self.start()
 
93
            return self
 
94
        if self.end:
 
95
            self.end()
 
96
            return self
 
97
        if self.fakein:
 
98
            bot.fakein(self.fakein)
 
99
            return self
 
100
        if self.prev and self.prev.groups:
 
101
            try:
 
102
                execstring = self.execstring % self.prev.groups
 
103
                self.execstring = execstring
 
104
            except TypeError:
 
105
                pass
 
106
            try:
 
107
                expect = self.expect % self.prev.groups
 
108
                self.expect = expect
 
109
            except TypeError:
 
110
                pass
 
111
 
 
112
        self.execstring = self.execstring.replace('{{ me }}', mevent.nick)
 
113
        mevent.txt = mevent.origtxt = str(self.execstring)
 
114
        rlog(100, 'tests', 'launching %s' % mevent.txt)
 
115
 
 
116
        from gozerbot.plugins import plugins
 
117
        self.response = plugins.cmnd(bot, mevent)
 
118
 
 
119
        if self.response and self.expect:
 
120
            self.expect = self.expect.replace('{{ me }}', mevent.nick)
 
121
            expects = self.expect.split('|')
 
122
            got = False
 
123
            for expect in expects:
 
124
                regex = re.compile(expect)
 
125
                result = regex.search(str(self.response))
 
126
 
 
127
                if result:
 
128
                    got = True
 
129
                    break
 
130
 
 
131
            if not got:
 
132
                self.error = 'invalid response'
 
133
            else:
 
134
                self.groups = result.groups() 
 
135
 
 
136
        self.execstring = origexec
 
137
        self.expect = origexpect
 
138
 
 
139
        return self
 
140
 
 
141
class Tests(object):
 
142
 
 
143
    """ collection of all tests. """
 
144
 
 
145
    def __init__(self):
 
146
        self.tests = []
 
147
        self.err = Dol()
 
148
        self.toolate = Dol()
 
149
        self.teller = 0
 
150
 
 
151
    #@locked
 
152
    def add(self, execstr, expect=None, descr="", fakein=""):
 
153
 
 
154
        """ add a test. """
 
155
 
 
156
        where = whichmodule(1)
 
157
        if not where:
 
158
            where = whichmodule(2)
 
159
        if not where:
 
160
            where = whichmodule(3)
 
161
        test = Test(execstr, expect, descr, where, fakein)
 
162
        self.tests.append(test)
 
163
        return self
 
164
 
 
165
    #@locked
 
166
    def fakein(self, execstr, expect=None, descr=""):
 
167
 
 
168
        """ call bot.fakein(). """ 
 
169
 
 
170
        where = whichmodule(1)
 
171
        if not where:
 
172
            where = whichmodule(2)
 
173
        if not where:
 
174
            where = whichmodule(3)
 
175
        test = Test(execstr, expect, descr, where, execstr)
 
176
        test.where = where
 
177
        self.tests.append(test)
 
178
        return self
 
179
 
 
180
    def start(self, func):
 
181
 
 
182
        """ optional start function. """
 
183
        where = whichmodule(1)
 
184
        if not where:
 
185
            where = whichmodule(2)
 
186
        if not where:
 
187
            where = whichmodule(3)
 
188
        test = Test()
 
189
        test.start = func
 
190
        test.where = where
 
191
        test.execstring = 'start'
 
192
        self.tests.append(test)
 
193
        return self
 
194
 
 
195
    def end(self, func):
 
196
 
 
197
        """ optional end function. """
 
198
 
 
199
        where = whichmodule(1)
 
200
        if not where:
 
201
            where = whichmodule(2)
 
202
        if not where:
 
203
            where = whichmodule(3)
 
204
        test = Test()
 
205
        test.end = func
 
206
        test.where = where
 
207
        test.execstring = 'end'
 
208
        self.tests.append(test)
 
209
        return self
 
210
 
 
211
    def unload(self, plugname):
 
212
 
 
213
        """ unload tests. """
 
214
 
 
215
        for i in range(len(self.tests)-1, -1, -1):
 
216
            if self.tests[i].plugin == plugname:
 
217
                del self.tests[i]
 
218
 
 
219
        return self
 
220
 
 
221
    def activate(self, plugname):
 
222
 
 
223
        """ activate tests. """
 
224
 
 
225
        for i in range(len(self.tests)-1, -1, -1):
 
226
            if self.tests[i].plugin == plugname:
 
227
                self.tests[i].activate = True
 
228
 
 
229
        return self
 
230
 
 
231
    def disable(self, plugname):
 
232
 
 
233
        """ unload tests. """
 
234
 
 
235
        for i in range(len(self.tests)-1, -1, -1):
 
236
            if self.tests[i].plugin == plugname:
 
237
                self.tests[i].activate = False
 
238
 
 
239
        return self
 
240
 
 
241
    def dorun(self, bot, event, tests, where, plug=None):
 
242
        teller = 0
 
243
        err = {}
 
244
        toolate = []
 
245
        prev = None
 
246
        for test in tests:
 
247
            if event.rest and event.rest not in test.plugin:
 
248
                continue
 
249
            if prev: 
 
250
                test.prev = prev
 
251
            prev = test
 
252
            if test.expect:
 
253
                teller += 1
 
254
            try:
 
255
                starttime = time.time()
 
256
                self.teller += 1
 
257
                e = copy.deepcopy(event)
 
258
                result = test.run(bot, e)
 
259
                finished = time.time()
 
260
                if finished - starttime > 10:
 
261
                    self.toolate[test.execstring] = finished - starttime
 
262
                if not result:
 
263
                    continue
 
264
                if not result.error:
 
265
                    event.reply("OK %s (%s) ==> %s" % (test.execstring, test.where, result.response))
 
266
                else:
 
267
                    self.err[test.execstring] = test
 
268
                    event.reply('ERROR %s (%s): %s ==> %s (%s)' % (test.error, test.where, test.execstring, test.response, test.expect))
 
269
            except Exception, ex:
 
270
                test.error = exceptionmsg()
 
271
                self.err[test.execstring] = test
 
272
                event.reply(test.error)
 
273
 
 
274
        return self
 
275
 
 
276
    def dotests(self, bot, event, threaded=False, plug=None):
 
277
 
 
278
        """ fire all tests. """
 
279
 
 
280
        #event = EventBase(eventin)
 
281
        groups = Dol()
 
282
 
 
283
        for test in self.tests:
 
284
            groups.add(test.where, test)
 
285
 
 
286
        threads = []
 
287
        testlist = []
 
288
 
 
289
        for where, tests in groups.iteritems():
 
290
            testlist.append((where, tests))
 
291
 
 
292
        random.shuffle(testlist)
 
293
 
 
294
        for t in testlist:
 
295
            where, tests = t
 
296
            if plug and plug not in where:
 
297
                continue
 
298
            event.reply("running tests on %s" % where)
 
299
            if threaded:
 
300
                thread = start_new_thread(self.dorun, (bot, event, tests, where, plug))
 
301
                threads.append(thread)
 
302
            else:
 
303
                self.dorun(bot, event, tests, where, plug)
 
304
 
 
305
        for thread in threads:
 
306
            thread.join()
 
307
 
 
308
        event.done()
 
309
 
 
310
    def sleep(self, seconds):
 
311
 
 
312
        """ sleep nr of seconds. """
 
313
 
 
314
        time.sleep(seconds)
 
315
        return self
 
316
 
 
317
## INIT SECTION
 
318
 
 
319
# expect is for examples
 
320
 
 
321
expect = {}
 
322
 
 
323
# the tests
 
324
tests = Tests()
 
325
 
 
326
## END INIT