5
""" gozerbot tests framework. """
10
from config import config
11
from threads.thr import start_new_thread
12
from utils.locking import lockdec
13
from utils.log import rlog
14
from utils.trace import calledfrom, whichmodule
15
from utils.exception import exceptionmsg
16
from utils.dol import Dol
17
from eventbase import EventBase
20
import sys, re, thread, copy, time, threading, random
24
# used to copy attributes
30
# Module-level re-entrant lock, plus the object lockdec() builds from it.
# Presumably `locked` is a decorator that serialises calls on testlock --
# confirm against utils.locking.lockdec.
testlock = threading.RLock()
31
locked = lockdec(testlock)
37
""" a test object. """
39
# Construct a test record.
#   execstring -- command text that run() puts on the event and executes
#   expect     -- '|'-separated regex alternatives that run() matches against the response
#   descr      -- free-form description (used in the string summary of the test)
#   where      -- label for where the test was registered
#   fakein     -- text that run() hands to bot.fakein()
# Every argument is stored through cpy(); cpy is defined outside this excerpt
# (presumably copy.deepcopy -- TODO confirm).
def __init__(self, execstring="", expect="", descr="", where="", fakein=""):
40
# the registering plugin is taken from the caller's frame
self.plugin = calledfrom(sys._getframe(1))
42
# NOTE(review): this overrides the frame-1 lookup with the frame-2 one; the
# condition that guards it is not visible in this excerpt.
self.plugin = calledfrom(sys._getframe(2))
43
self.descr = cpy(descr)
44
self.execstring = cpy(execstring)
45
self.expect = cpy(expect)
49
self.where = cpy(where)
50
self.fakein = cpy(fakein)
57
# One-line summary: description, origin, command, response and error.
# NOTE(review): the def this return belongs to is not visible in this excerpt
# (looks like a __str__/__repr__ -- confirm). It reads self.response and
# self.error, which are not assigned in the visible part of __init__.
return "test %s (%s) %s ==> %s (%s)" % (self.descr, self.where, self.execstring, self.response, self.error)
64
# Execute this test against `bot` using a private copy of `event`, then check
# the response against the expected pattern(s).
# NOTE(review): several lines of this method (guards, try/except, return) are
# not visible in this excerpt; comments below describe only what is shown.
def run(self, bot, event):
66
""" run the test on bot with event. """
71
# when a loadlist is configured, tests of plugins outside it are treated
# specially -- the branch body is not visible here (presumably an early return)
if config['loadlist'] and self.plugin not in config['loadlist']:
74
# work on a deep copy so the caller's event is not modified
mevent = copy.deepcopy(event)
75
mevent.onlyqueues = False
76
mevent.channel = '#dunkbots'
77
# register the fake users and channels the tests rely on
bot.userhosts['bottest'] = 'bottest@test'
78
bot.userhosts['mtest'] = 'mekker@test'
79
bot.userhosts['exec'] = 'exec@gozerbot'
80
if 'exec@gozerbot' not in bot.state['joinedchannels']:
81
bot.state['joinedchannels'].append('exec@gozerbot')
82
bot.channels['exec@gozerbot'] = {}
83
if '#dunkbots' not in bot.state['joinedchannels']:
84
bot.state['joinedchannels'].append('#dunkbots')
85
bot.channels['#dunkbots'] = {'cc': '!'}
89
# remember the templates; they are put back at the end of the method
origexec = self.execstring
90
origexpect = self.expect
98
bot.fakein(self.fakein)
100
# fill %-placeholders in the command/expectation with the groups captured
# by the previous test (self.prev is not assigned in the visible code)
if self.prev and self.prev.groups:
102
execstring = self.execstring % self.prev.groups
103
self.execstring = execstring
107
# NOTE(review): only the local `expect` is assigned here; whether it is
# written back to self.expect is not visible in this excerpt.
expect = self.expect % self.prev.groups
112
# '{{ me }}' is a placeholder for the nick on the test event
self.execstring = self.execstring.replace('{{ me }}', mevent.nick)
113
mevent.txt = mevent.origtxt = str(self.execstring)
114
rlog(100, 'tests', 'launching %s' % mevent.txt)
116
# function-level import (presumably to avoid a circular import -- confirm)
from gozerbot.plugins import plugins
117
self.response = plugins.cmnd(bot, mevent)
119
if self.response and self.expect:
120
self.expect = self.expect.replace('{{ me }}', mevent.nick)
121
# '|' separates alternative expectations, so a literal '|' cannot be used
# inside a single expected regex
expects = self.expect.split('|')
123
for expect in expects:
124
regex = re.compile(expect)
125
result = regex.search(str(self.response))
132
self.error = 'invalid response'
134
# captured groups are kept for substitution into a following test
self.groups = result.groups()
136
# restore the unsubstituted templates
self.execstring = origexec
137
self.expect = origexpect
143
""" collection of all tests. """
152
# Register a new Test built from execstr/expect/descr/fakein and append it to
# self.tests.
def add(self, execstr, expect=None, descr="", fakein=""):
156
# `where` is looked up at stack depths 1, 2 and 3 in turn.
# NOTE(review): the conditions guarding the deeper lookups are not visible in
# this excerpt; as shown, the last assignment wins.
where = whichmodule(1)
158
where = whichmodule(2)
160
where = whichmodule(3)
161
test = Test(execstr, expect, descr, where, fakein)
162
self.tests.append(test)
166
# Register a Test whose fakein text is the same string as its execstring
# (execstr is passed for both), and append it to self.tests.
def fakein(self, execstr, expect=None, descr=""):
168
""" call bot.fakein(). """
170
# NOTE(review): guards between these lookups are not visible in this excerpt
where = whichmodule(1)
172
where = whichmodule(2)
174
where = whichmodule(3)
175
test = Test(execstr, expect, descr, where, execstr)
177
self.tests.append(test)
180
# Append a test whose execstring is the literal 'start'.
# NOTE(review): the line that creates `test` -- and any use of `func` -- is not
# visible in this excerpt.
def start(self, func):
182
""" optional start function. """
183
# NOTE(review): guards between these lookups are not visible in this excerpt
where = whichmodule(1)
185
where = whichmodule(2)
187
where = whichmodule(3)
191
test.execstring = 'start'
192
self.tests.append(test)
197
# Counterpart of start(): appends a test whose execstring is the literal 'end'.
# NOTE(review): the def line and the line that creates `test` are not visible
# in this excerpt.
""" optional end function. """
199
# NOTE(review): guards between these lookups are not visible in this excerpt
where = whichmodule(1)
201
where = whichmodule(2)
203
where = whichmodule(3)
207
test.execstring = 'end'
208
self.tests.append(test)
211
# Act on every registered test whose plugin equals plugname.
# NOTE(review): the statement executed for a match is not visible in this
# excerpt; the back-to-front index walk suggests entries are removed from
# self.tests in place -- confirm.
def unload(self, plugname):
213
""" unload tests. """
215
for i in range(len(self.tests)-1, -1, -1):
216
if self.tests[i].plugin == plugname:
221
# Set the `activate` flag to True on every registered test whose plugin equals
# plugname.
def activate(self, plugname):
223
""" activate tests. """
225
# indices are walked from last to first; nothing is removed here, so the
# direction does not affect the result
for i in range(len(self.tests)-1, -1, -1):
226
if self.tests[i].plugin == plugname:
227
self.tests[i].activate = True
231
# Set the `activate` flag to False on every registered test whose plugin equals
# plugname. The tests stay in self.tests.
def disable(self, plugname):
233
""" disable tests. """
235
for i in range(len(self.tests)-1, -1, -1):
236
if self.tests[i].plugin == plugname:
237
self.tests[i].activate = False
241
# Run a group of tests and report each outcome through event.reply().
# NOTE(review): the loop header over `tests`, the `try:` and the pass/fail
# branch conditions are not visible in this excerpt.
def dorun(self, bot, event, tests, where, plug=None):
247
# event.rest, when given, is matched as a substring of the test's plugin name
if event.rest and event.rest not in test.plugin:
255
starttime = time.time()
257
# each test gets its own deep copy of the event
e = copy.deepcopy(event)
258
result = test.run(bot, e)
259
finished = time.time()
260
# tests taking more than 10 seconds are recorded in self.toolate with their duration
if finished - starttime > 10:
261
self.toolate[test.execstring] = finished - starttime
265
# result.response is read, so run() presumably returns the test object -- confirm
event.reply("OK %s (%s) ==> %s" % (test.execstring, test.where, result.response))
267
# failed tests are recorded in self.err under their execstring
self.err[test.execstring] = test
268
event.reply('ERROR %s (%s): %s ==> %s (%s)' % (test.error, test.where, test.execstring, test.response, test.expect))
269
# Python 2 except syntax; `ex` is not used in the visible handler
except Exception, ex:
270
test.error = exceptionmsg()
271
self.err[test.execstring] = test
272
event.reply(test.error)
276
# Group the registered tests by their `where` label, shuffle the order of the
# groups and run each group through dorun().
# NOTE(review): the initialisation of groups/testlist/threads, the loop over
# testlist and the threaded/non-threaded branch are not visible in this excerpt.
def dotests(self, bot, event, threaded=False, plug=None):
278
""" fire all tests. """
280
#event = EventBase(eventin)
283
for test in self.tests:
284
groups.add(test.where, test)
289
for where, tests in groups.iteritems():
290
testlist.append((where, tests))
292
# the groups are run in random order
random.shuffle(testlist)
296
# `plug`, when given, is matched as a substring of the group label
if plug and plug not in where:
298
event.reply("running tests on %s" % where)
300
# NOTE(review): the local name `thread` shadows the `thread` module imported
# at the top of the file for the rest of this function.
thread = start_new_thread(self.dorun, (bot, event, tests, where, plug))
301
threads.append(thread)
303
self.dorun(bot, event, tests, where, plug)
305
# the body of this loop is not visible here (presumably joins the started threads)
for thread in threads:
310
# NOTE(review): only the signature and docstring are visible in this excerpt;
# the body is not shown, so what it does with `seconds` cannot be confirmed here.
def sleep(self, seconds):
312
""" sleep nr of seconds. """
319
# expect is for examples