1
# unit tester for the Outlook addin.
3
# Note we are only attempting to test Outlook specific
4
# functionality, such as filters, etc.
6
# General process is to create test messages known to contain ham/spam
7
# keywords, and tracking their progress through the filters. We also
8
# move this test message back around, and watch the incremental retrain
9
# in action. Also checks that the message correctly remains classified
10
# after a message move.
11
from __future__ import generators
13
from win32com.client import constants
15
from time import sleep
21
from spambayes.storage import STATE_KEY
25
from win32com.mapi import mapi, mapiutil
32
TEST_SUBJECT = "SpamBayes addin auto-generated test message"
34
class TestFailure(Exception):
38
raise TestFailure(msg)
40
def AssertRaises(exception, func, *args):
43
raise TestFailed("Function '%s' should have raised '%r', but it worked!" % \
46
exc_type = sys.exc_info()[0]
47
if exc_type == exception or issubclass(exc_type, exception):
51
filter_event = threading.Event()
54
# Must wait longer than normal, so when run with a timer we still work.
57
pythoncom.PumpWaitingMessages()
58
if filter_event.isSet():
62
def DictExtractor(bayes):
63
for k, v in bayes.wordinfo.items():
66
def DBExtractor(bayes):
67
# We use bsddb3 now if we can
69
import bsddb3 as bsddb
70
bsddb_error = bsddb.db.DBNotFoundError
73
bsddb_error = bsddb.error
74
key = bayes.dbm.first()[0]
76
yield key, bayes._wordinfoget(key)
79
key = bayes.dbm.next()[0]
85
yield key, bayes._wordinfoget(key)
87
# Find the top 'n' words in the Spam database that are clearly
88
# marked as either ham or spam. Simply enumerates the
89
# bayes word list looking for any word with zero count in the
90
# non-requested category.
93
def FindTopWords(bayes, num, get_spam):
94
global _top_spam, _top_ham
95
if get_spam and _top_spam: return _top_spam
96
if not get_spam and _top_ham: return _top_ham
99
bayes.db # bsddb style
100
extractor = DBExtractor
101
except AttributeError:
102
extractor = DictExtractor
104
for word, info in extractor(bayes):
111
items.append((info.spamcount, word, info))
113
if info.spamcount==0:
114
items.append((info.hamcount, word, info))
117
# Throw an error if we don't have enough tokens - otherwise
118
# the test itself may fail, which will be more confusing than this.
120
TestFailed("Error: could not find %d words with Spam=%s - only found %d" % (num, get_spam, len(items)))
122
for n, word, info in items[:num]:
123
ret[word]=copy.copy(info)
130
# A little driver/manager for our tests
132
def __init__(self, mgr):
135
mgr = manager.GetManager()
137
# Remember the "spam" folder.
138
folder = mgr.message_store.GetFolder(mgr.config.filter.spam_folder_id)
139
self.folder_spam = folder.GetOutlookItem()
140
# Remember the "unsure" folder.
141
folder = mgr.message_store.GetFolder(mgr.config.filter.unsure_folder_id)
142
self.folder_unsure = folder.GetOutlookItem()
143
# And the drafts folder where new messages are created.
144
self.folder_drafts = mgr.outlook.Session.GetDefaultFolder(constants.olFolderDrafts)
146
def GetWatchFolderGenerator(self):
148
gen = mgr.message_store.GetFolderGenerator(
149
mgr.config.filter.watch_folder_ids,
150
mgr.config.filter.watch_include_sub)
152
yield f, f.GetOutlookItem()
154
def FindTestMessage(self, folder):
155
subject = TEST_SUBJECT
157
return items.Find("[Subject] = '%s'" % (subject,))
159
def CheckMessageFilteredFrom(self, folder):
160
# For hotmail accounts, the message may take a little time to actually
161
# be removed from the original folder (ie, it appears in the "dest"
162
# folder before it vanishes from the original).
164
if self.FindTestMessage(folder) is None:
169
ms_folder = self.manager.message_store.GetFolder(folder)
170
TestFailed("The test message remained in folder '%s'" % ms_folder.GetFQName())
172
def _CleanTestMessageFromFolder(self, folder):
173
subject = TEST_SUBJECT
175
# imap/hotmail etc only soft delete, and I see no way to differentiate
176
# those messages from live ones - so we force the user to purge them manually
178
msg = self.FindTestMessage(folder)
183
raise TestFailed("Old test messages appear to still exist. These may" \
184
"be 'soft-deleted' - you will need to purge them manually")
186
print "Cleaned %d test messages from folder '%s'" % (num, folder.Name)
188
def CleanAllTestMessages(self):
    """Remove leftover test messages from every folder the tests use.

    The spam, unsure and drafts folders are cleaned first, in that order,
    followed by each Outlook folder produced by GetWatchFolderGenerator().
    All the work is delegated to _CleanTestMessageFromFolder().
    """
    self._CleanTestMessageFromFolder(self.folder_spam)
    self._CleanTestMessageFromFolder(self.folder_unsure)
    self._CleanTestMessageFromFolder(self.folder_drafts)
    # The generator yields (store folder, Outlook folder) pairs; only the
    # Outlook folder object is needed for cleaning.
    for msf, of in self.GetWatchFolderGenerator():
        self._CleanTestMessageFromFolder(of)
195
def CreateTestMessageInFolder(self, spam_status, folder):
196
msg, words = self.CreateTestMessage(spam_status)
197
msg.Save() # Put into "Drafts".
198
assert self.FindTestMessage(self.folder_drafts) is not None
199
# Move it to the specified folder
201
# And now find it in the specified folder
202
return self.FindTestMessage(folder), words
204
def CreateTestMessage(self, spam_status):
206
bayes = self.manager.classifier_data.bayes
207
if spam_status != SPAM:
208
words.update(FindTopWords(bayes, 50, False))
209
if spam_status != HAM:
210
words.update(FindTopWords(bayes, 50, True))
211
# Create a new blank message with our words
212
msg = self.manager.outlook.CreateItem(0)
213
msg.Body = "\n".join(words.keys())
214
msg.Subject = TEST_SUBJECT
217
def check_words(words, bayes, spam_offset, ham_offset):
    """Check that every word's counts moved by exactly the given offsets.

    words       -- mapping of word -> previously captured info object; each
                   info object is read for 'spamcount' and 'hamcount'.
    bayes       -- object whose _wordinfoget(word) returns the current info
                   for a word.
    spam_offset -- expected difference between current and captured spamcount.
    ham_offset  -- expected difference between current and captured hamcount.

    TestFailed() is called with a descriptive message when a word's current
    counts differ from the captured counts plus the offsets.
    """
    for word, existing_info in words.items():
        new_info = bayes._wordinfoget(word)
        if existing_info.spamcount+spam_offset != new_info.spamcount or \
           existing_info.hamcount+ham_offset != new_info.hamcount:
            TestFailed("Word check for '%s' failed. "
                       "old spam/ham=%d/%d, new spam/ham=%d/%d, "
                       "spam_offset=%d, ham_offset=%d" % \
                       (word,
                        existing_info.spamcount, existing_info.hamcount,
                        new_info.spamcount, new_info.hamcount,
                        spam_offset, ham_offset))
230
# The tests themselves.
231
# The "spam" test is huge - we do standard filter tests, but
232
# also do incremental retrain tests.
233
def TestSpamFilter(driver):
234
bayes = driver.manager.classifier_data.bayes
237
original_bayes = copy.copy(driver.manager.classifier_data.bayes)
238
# for each watch folder, create a spam message, and do the training thang
239
for msf_watch, folder_watch in driver.GetWatchFolderGenerator():
240
print "Performing Spam test on watch folder '%s'..." % msf_watch.GetFQName()
241
# Create a spam message in the Inbox - it should get immediately filtered
242
msg, words = driver.CreateTestMessageInFolder(SPAM, folder_watch)
243
# sleep to ensure filtering.
245
# It should no longer be in the Inbox.
246
driver.CheckMessageFilteredFrom(folder_watch)
247
# It should be in the "sure is spam" folder.
248
spam_msg = driver.FindTestMessage(driver.folder_spam)
250
TestFailed("The test message vanished from the Inbox, but didn't appear in Spam")
251
# Check that none of the above caused training.
252
if nspam != bayes.nspam:
253
TestFailed("Something caused a new spam message to appear")
254
if nham != bayes.nham:
255
TestFailed("Something caused a new ham message to appear")
256
check_words(words, bayes, 0, 0)
258
# Now move the message back to the inbox - it should get trained.
259
store_msg = driver.manager.message_store.GetMessage(spam_msg)
261
if train.been_trained_as_ham(store_msg, driver.manager.classifier_data):
262
TestFailed("This new spam message should not have been trained as ham yet")
263
if train.been_trained_as_spam(store_msg, driver.manager.classifier_data):
264
TestFailed("This new spam message should not have been trained as spam yet")
265
spam_msg.Move(folder_watch)
267
spam_msg = driver.FindTestMessage(folder_watch)
269
TestFailed("The message appears to have been filtered out of the watch folder")
270
store_msg = driver.manager.message_store.GetMessage(spam_msg)
273
if nspam != bayes.nspam:
274
TestFailed("There were not the same number of spam messages after a re-train")
275
if nham+1 != bayes.nham:
276
TestFailed("There was not one more ham messages after a re-train")
277
if train.been_trained_as_spam(store_msg, driver.manager.classifier_data):
278
TestFailed("This new spam message should not have been trained as spam yet")
279
if not train.been_trained_as_ham(store_msg, driver.manager.classifier_data):
280
TestFailed("This new spam message should have been trained as ham now")
281
# word infos should have one extra ham
282
check_words(words, bayes, 0, 1)
283
# Now move it back to the Spam folder.
284
# This should see the message un-trained as ham, and re-trained as Spam
285
spam_msg.Move(driver.folder_spam)
287
spam_msg = driver.FindTestMessage(driver.folder_spam)
289
TestFailed("Could not find the message in the Spam folder")
290
store_msg = driver.manager.message_store.GetMessage(spam_msg)
291
if nspam +1 != bayes.nspam:
292
TestFailed("There should be one more spam now")
293
if nham != bayes.nham:
294
TestFailed("There should be the same number of hams again")
295
if not train.been_trained_as_spam(store_msg, driver.manager.classifier_data):
296
TestFailed("This new spam message should have been trained as spam by now")
297
if train.been_trained_as_ham(store_msg, driver.manager.classifier_data):
298
TestFailed("This new spam message should have been un-trained as ham")
299
# word infos should have one extra spam, no extra ham
300
check_words(words, bayes, 1, 0)
301
# Move the message to another folder, and make sure we still
302
# identify it correctly as having been trained.
303
# Move to the "unsure" folder, just cos we know about it, and
304
# we know that no special watching of this folder exists.
305
spam_msg.Move(driver.folder_unsure)
306
spam_msg = driver.FindTestMessage(driver.folder_unsure)
308
TestFailed("Could not find the message in the Unsure folder")
309
store_msg = driver.manager.message_store.GetMessage(spam_msg)
310
if not train.been_trained_as_spam(store_msg, driver.manager.classifier_data):
311
TestFailed("Message was not identified as Spam after moving")
313
# word infos should still be 'spam'
314
check_words(words, bayes, 1, 0)
316
# Now undo the damage we did.
317
was_spam = train.untrain_message(store_msg, driver.manager.classifier_data)
319
TestFailed("Untraining this message did not indicate it was spam")
320
if train.been_trained_as_spam(store_msg, driver.manager.classifier_data) or \
321
train.been_trained_as_ham(store_msg, driver.manager.classifier_data):
322
TestFailed("Untraining this message kept it has ham/spam")
326
train.untrain_message(store_msg, driver.manager.classifier_data)
328
# Check all the counts are back where we started.
329
if nspam != bayes.nspam:
330
TestFailed("Spam count didn't get back to the same")
331
if nham != bayes.nham:
332
TestFailed("Ham count didn't get back to the same")
333
check_words(words, bayes, 0, 0)
335
if bayes.wordinfo != original_bayes.wordinfo:
336
TestFailed("The bayes object's 'wordinfo' did not compare the same at the end of all this!")
337
if bayes.probcache != original_bayes.probcache:
338
TestFailed("The bayes object's 'probcache' did not compare the same at the end of all this!")
341
print "Created a Spam message, and saw it get filtered and trained."
343
def _DoTestHamTrain(driver, folder1, folder2):
344
# [ 780612 ] Outlook incorrectly trains on moved messages
345
# Should not train when a previously classified message is moved by the user
346
# from one watch folder to another.
347
bayes = driver.manager.classifier_data.bayes
351
# Create a ham message in the Inbox - it won't get filtered if the other
352
# tests pass, but we do need to wait for it to be scored.
353
msg, words = driver.CreateTestMessageInFolder(HAM, folder1)
354
# sleep to ensure filtering.
356
# It should still be in the Inbox.
357
if driver.FindTestMessage(folder1) is None:
358
TestFailed("The test ham message appeared to have been filtered!")
360
# Manually move it to folder2
362
# sleep to allow any processing in this folder.
364
# re-find it in folder2
365
msg = driver.FindTestMessage(folder2)
366
if driver.FindTestMessage(folder2) is None:
367
TestFailed("Couldn't find the ham message we just moved")
369
if nspam != bayes.nspam or nham != bayes.nham:
370
TestFailed("Move of existing ham caused a train")
373
def _DoTestHamFilter(driver, folder):
374
# Create a ham message in the Inbox - it should not get filtered
375
msg, words = driver.CreateTestMessageInFolder(HAM, folder)
376
# sleep to ensure filtering.
378
# It should still be in the Inbox.
379
if driver.FindTestMessage(folder) is None:
380
TestFailed("The test ham message appeared to have been filtered!")
383
def TestHamFilter(driver):
384
# Execute the 'ham' test in every folder we watch.
386
gen = mgr.message_store.GetFolderGenerator(
387
mgr.config.filter.watch_folder_ids,
388
mgr.config.filter.watch_include_sub)
392
print "Running ham filter tests on folder '%s'" % f.GetFQName()
393
f = f.GetOutlookItem()
394
_DoTestHamFilter(driver, f)
397
# Now test incremental train logic, between all these folders.
399
print "NOTE: Can't do incremental training tests as only 1 watch folder is in place"
402
# 'targets' is a list of all folders except this
406
_DoTestHamTrain(driver, f, t)
407
print "Created a Ham message, and saw it remain in place (in %d watch folders.)" % num
409
def TestUnsureFilter(driver):
410
# Create an unsure message in each watch folder - it should get immediately filtered
411
for msf_watch, folder_watch in driver.GetWatchFolderGenerator():
412
print "Performing Spam test on watch folder '%s'..." % msf_watch.GetFQName()
413
msg, words = driver.CreateTestMessageInFolder(UNSURE, folder_watch)
414
# sleep to ensure filtering.
416
# It should no longer be in the Inbox.
417
driver.CheckMessageFilteredFrom(folder_watch)
418
# It should be in the "unsure" folder.
419
spam_msg = driver.FindTestMessage(driver.folder_unsure)
421
TestFailed("The test message vanished from the Inbox, but didn't appear in Unsure")
423
print "Created an unsure message, and saw it get filtered"
425
def run_tests(manager):
    "Filtering tests"
    # The docstring is deliberately a single short line:
    # apply_with_new_config() prints a test function's __doc__ as its name.
    #
    # Runs the spam, unsure and ham filter tests against one Driver built
    # from 'manager'.  Test messages are cleaned out of all known folders
    # both before and after the run.  Filtering and incremental training
    # must already be enabled in the manager's configuration.
    driver = Driver(manager)
    manager.Save() # necessary after a full retrain
    assert driver.manager.config.filter.enabled, "Filtering must be enabled for these tests"
    assert driver.manager.config.training.train_recovered_spam and \
           driver.manager.config.training.train_manual_spam, "Incremental training must be enabled for these tests"
    driver.CleanAllTestMessages()
    TestSpamFilter(driver)
    TestUnsureFilter(driver)
    TestHamFilter(driver)
    driver.CleanAllTestMessages()
438
def run_filter_tests(manager):
439
# setup config to save info with the message, and test
440
apply_with_new_config(manager,
441
{"Filter.timer_enabled": False,
442
"Filter.save_spam_info" : True,
446
apply_with_new_config(manager,
447
{"Filter.timer_enabled": True,
448
"Filter.save_spam_info" : True,
451
apply_with_new_config(manager,
452
{"Filter.timer_enabled": False,
453
"Filter.save_spam_info" : False,
457
apply_with_new_config(manager,
458
{"Filter.timer_enabled": True,
459
"Filter.save_spam_info" : False,
463
def apply_with_new_config(manager, new_config_dict, func, *args):
466
for name, val in new_config_dict.items():
467
sect_name, opt_name = name.split(".")
468
old_config[sect_name, opt_name] = manager.options.get(sect_name, opt_name)
469
manager.options.set(sect_name, opt_name, val)
470
friendly_opts.append("%s=%s" % (name, val))
471
manager.addin.FiltersChanged() # to ensure correct filtler in place
473
test_name = getattr(func, "__doc__", None)
474
if not test_name: test_name = func.__name__
475
print "*" * 10, "Running '%s' with %s" % (test_name, ", ".join(friendly_opts))
478
for (sect_name, opt_name), val in old_config.items():
479
manager.options.set(sect_name, opt_name, val)
481
###############################################################################
482
# "Non-filter" tests are those that don't require us to create messages and
483
# see them get filtered.
484
def run_nonfilter_tests(manager):
485
# And now some other 'sanity' checks.
486
# Check messages we are unable to score.
487
# Must enable the filtering code for this test
488
msgstore.test_suite_running = False
490
print "Scanning all your good mail and spam for some sanity checks..."
491
num_found = num_looked = 0
492
num_without_headers = num_without_body = num_without_html_body = 0
493
for folder_ids, include_sub in [
494
(manager.config.filter.watch_folder_ids, manager.config.filter.watch_include_sub),
495
([manager.config.filter.spam_folder_id], False),
497
for folder in manager.message_store.GetFolderGenerator(folder_ids, include_sub):
498
for message in folder.GetMessageGenerator(False):
499
# If not ipm.note, then no point reporting - but any
500
# ipm.note messages we don't want to filter should be
503
if num_looked % 500 == 0: print " scanned", num_looked, "messages..."
504
if not message.IsFilterCandidate() and \
505
message.msgclass.lower().startswith("ipm.note"):
508
print "WARNING: We found the following messages in your folders that would not be filtered by the addin"
509
print "If any of these messages should be filtered, we have a bug!"
511
print " %s/%s" % (folder.name, message.subject)
512
headers, body, html_body = message._GetMessageTextParts()
513
if not headers: num_without_headers += 1
514
if not body: num_without_body += 1
515
# for HTML, we only check multi-part
516
temp_obj = rfc822.Message(cStringIO.StringIO(headers+"\n\n"))
517
content_type = temp_obj.get("content-type", '')
518
if content_type.lower().startswith("multipart"):
519
if not html_body: num_without_html_body += 1
521
print "Checked %d items, %d non-filterable items found" % (num_looked, num_found)
522
print "of these items, %d had no headers, %d had no text body and %d had no HTML" % \
523
(num_without_headers, num_without_body, num_without_html_body)
525
msgstore.test_suite_running = True
527
def run_invalid_id_tests(manager):
528
# Do some tests with invalid message and folder IDs.
529
print "Doing some 'invalid ID' tests - you should see a couple of warning, but no errors or tracebacks"
530
id_no_item = ('0000','0000') # this ID is 'valid' - but there will be no such item
531
id_invalid = ('xxxx','xxxx') # this ID is 'invalid' in that the hex-bin conversion fails
534
bad_ids = id_no_item, id_invalid, id_empty1, id_empty2
536
AssertRaises(msgstore.MsgStoreException, manager.message_store.GetMessage, id)
537
# Test 'GetFolderGenerator' works with invalid ids.
539
AssertRaises(msgstore.MsgStoreException, manager.message_store.GetFolder, id)
540
ids = manager.config.filter.watch_folder_ids[:]
543
for f in manager.message_store.GetFolderGenerator(ids, False):
545
if found > len(manager.config.filter.watch_folder_ids):
546
raise TestFailed("Seemed to find the extra folder")
547
names = manager.FormatFolderNames(ids, False)
548
if names.find("<unknown") < 0:
549
raise TestFailed("Couldn't find unknown folder in names '%s'" % names)
550
print "Finished 'invalid ID' tests"
552
###############################################################################
553
# "Failure" tests - execute some tests while provoking the msgstore to simulate
554
# various MAPI errors. Although not complete, it does help exercise our code
555
# paths through the code.
556
def _restore_mapi_failure():
    """Clear the forced-failure settings previously stored on msgstore.

    Resets both 'test_suite_failure' and 'test_suite_failure_request' to
    None, which is the state _setup_for_mapi_failure() asserts before it
    installs a new failure.
    """
    msgstore.test_suite_failure = None
    msgstore.test_suite_failure_request = None
560
def _setup_for_mapi_failure(checkpoint, hr, fail_count = None):
    """Store a simulated COM failure request on the msgstore module.

    checkpoint -- stored as msgstore.test_suite_failure_request; callers in
                  this file pass names such as "MAPIMsgStoreMsg.Save".
    hr         -- error code placed first in the com_error argument tuple.
    fail_count -- stored as msgstore.test_suite_failure_count (default None).
                  NOTE(review): how msgstore interprets the count is not
                  visible here - confirm against msgstore.

    The test suite must already be flagged as running, and any previous
    failure must have been cleared (see _restore_mapi_failure()).
    """
    assert msgstore.test_suite_running, "msgstore should already know its running"
    assert not msgstore.test_suite_failure, "should already have torn down previous failure"
    # A (exception class, constructor args) pair describing the error to raise.
    msgstore.test_suite_failure = pythoncom.com_error, \
                     (hr, "testsuite generated error", None, -1)
    msgstore.test_suite_failure_request = checkpoint
    msgstore.test_suite_failure_count = fail_count
568
def _setup_mapi_notfound_failure(checkpoint):
    """Install a simulated mapi.MAPI_E_NOT_FOUND failure for 'checkpoint'.

    Convenience wrapper around _setup_for_mapi_failure() using the default
    fail_count.
    """
    _setup_for_mapi_failure(checkpoint, mapi.MAPI_E_NOT_FOUND)
571
def _do_single_failure_ham_test(driver, checkpoint, hr, fail_count = None):
    """Run _do_single_failure_test() with is_ham=True.

    driver, checkpoint, hr and fail_count are passed through unchanged.
    """
    _do_single_failure_test(driver, True, checkpoint, hr, fail_count)
574
def _do_single_failure_spam_test(driver, checkpoint, hr, fail_count = None):
    """Run _do_single_failure_test() with is_ham=False.

    driver, checkpoint, hr and fail_count are passed through unchanged.
    """
    _do_single_failure_test(driver, False, checkpoint, hr, fail_count)
577
def _do_single_failure_test(driver, is_ham, checkpoint, hr, fail_count):
578
print "-> Testing MAPI error '%s' in %s" % (mapiutil.GetScodeString(hr),
580
# message moved after we have ID, but before opening.
581
for msf, folder in driver.GetWatchFolderGenerator():
582
print "Testing in folder '%s'" % msf.GetFQName()
584
msg, words = driver.CreateTestMessageInFolder(HAM, folder)
586
msg, words = driver.CreateTestMessageInFolder(SPAM, folder)
588
_setup_for_mapi_failure(checkpoint, hr, fail_count)
590
# sleep to ensure filtering.
593
_restore_mapi_failure()
594
if driver.FindTestMessage(folder) is None:
595
TestFailed("We appear to have filtered a message even though we forced 'not found' failure")
599
print "<- Finished MAPI error '%s' in %s" % (mapiutil.GetScodeString(hr),
602
def do_failure_tests(manager):
603
# We setup msgstore to fail for us, then try a few tests. The idea is to
604
# ensure we gracefully degrade in these failures.
605
# We set verbosity to min of 1, as this helps us see how the filters handle
607
driver = Driver(manager)
608
driver.CleanAllTestMessages()
609
old_verbose = manager.verbose
610
manager.verbose = max(1, old_verbose)
612
_do_single_failure_ham_test(driver, "MAPIMsgStoreMsg._EnsureObject", mapi.MAPI_E_NOT_FOUND)
613
_do_single_failure_ham_test(driver, "MAPIMsgStoreMsg.SetField", -2146644781)
614
_do_single_failure_ham_test(driver, "MAPIMsgStoreMsg.Save", -2146644781)
615
_do_single_failure_ham_test(driver, "MAPIMsgStoreMsg.Save",
616
mapi.MAPI_E_OBJECT_CHANGED, fail_count=1)
618
_do_single_failure_spam_test(driver, "MAPIMsgStoreMsg._DoCopyMove", mapi.MAPI_E_TABLE_TOO_BIG)
621
manager.verbose = old_verbose
623
def run_failure_tests(manager):
624
"Forced MAPI failure tests"
625
apply_with_new_config(manager,
626
{"Filter.timer_enabled": True,
628
do_failure_tests, manager)
629
apply_with_new_config(manager,
630
{"Filter.timer_enabled": False,
632
do_failure_tests, manager)
634
def filter_message_with_event(msg, mgr, all_actions=True):
636
ret = filter._original_filter_message(msg, mgr, all_actions)
638
filter_event.set() # only set if it works
642
from dialogs import SetWaitCursor
646
if "_original_filter_message" not in filter.__dict__:
647
filter._original_filter_message = filter.filter_message
648
filter.filter_message = filter_message_with_event
650
try: # restore the plugin config at exit.
651
assert not msgstore.test_suite_running, "already running??"
652
msgstore.test_suite_running = True
653
assert not manager.test_suite_running, "already running??"
654
manager.test_suite_running = True
655
run_filter_tests(manager)
656
run_failure_tests(manager)
657
run_invalid_id_tests(manager)
658
# non-filter tests take a lot of time - ask if you want to do them
659
if manager.AskQuestion("Do you want to run the non-filter tests?" \
660
"\r\n\r\nThese may take some time"):
661
run_nonfilter_tests(manager)
663
print "Test suite finished without error!"
666
print "Restoring standard configuration..."
667
# Always restore configuration to how we started.
668
msgstore.test_suite_running = False
669
manager.test_suite_running = False
671
manager.addin.FiltersChanged() # restore original filters.
672
manager.addin.ProcessMissedMessages()
675
if __name__=='__main__':
676
print "NOTE: This will NOT work from the command line"
677
print "(it nearly will, and is useful for debugging the tests"
678
print "themselves, so we will run them anyway!)"