# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
17
# Standard-library imports used throughout the tests below
# (unittest.TestCase, Queue.Queue, datetime.datetime, hashlib.md5,
# pickle.dumps/loads, time.strftime) -- restored alongside the
# project-local imports.
import unittest
import Queue
import datetime
import hashlib
import pickle
import time

from test.unit import tmpfile
from swift.common import internal_proxy
from swift.stats import log_processor
from swift.common.exceptions import ChunkReadTimeout
29
class FakeUploadApp(object):
30
def __init__(self, *args, **kwargs):
33
class DumbLogger(object):
34
def __getattr__(self, n):
37
def foo(self, *a, **kw):
40
class DumbInternalProxy(object):
41
def __init__(self, code=200, timeout=False, bad_compressed=False):
43
self.timeout = timeout
44
self.bad_compressed = bad_compressed
46
def get_container_list(self, account, container, marker=None,
48
n = '2010/03/14/13/obj1'
49
if marker is None or n > marker:
58
def get_object(self, account, container, object_name):
59
if object_name.endswith('.gz'):
60
if self.bad_compressed:
61
# invalid compressed data
63
yield '\xff\xff\xff\xff\xff\xff\xff'
65
# 'obj\ndata', compressed with gzip -9
72
yield '\xe4\x02\x00O\xff'
73
yield '\xa3Y\t\x00\x00\x00'
78
raise ChunkReadTimeout
80
return self.code, data()
82
class TestLogProcessor(unittest.TestCase):
84
access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\
85
'09/Jul/2010/04/14/30 GET '\
86
'/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
87
'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\
88
'6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
89
stats_test_line = 'account,1,2,3'
90
proxy_config = {'log-processor': {
95
def test_lazy_load_internal_proxy(self):
96
# stub out internal_proxy's upload_app
97
internal_proxy.BaseApplication = FakeUploadApp
98
dummy_proxy_config = """[app:proxy-server]
101
with tmpfile(dummy_proxy_config) as proxy_config_file:
102
conf = {'log-processor': {
103
'proxy_server_conf': proxy_config_file,
106
p = log_processor.LogProcessor(conf, DumbLogger())
107
self.assert_(isinstance(p._internal_proxy,
109
self.assert_(isinstance(p.internal_proxy,
110
log_processor.InternalProxy))
111
self.assertEquals(p.internal_proxy, p._internal_proxy)
113
# reset FakeUploadApp
114
reload(internal_proxy)
116
def test_access_log_line_parser(self):
117
access_proxy_config = self.proxy_config.copy()
118
access_proxy_config.update({
119
'log-processor-access': {
120
'source_filename_format':'%Y%m%d%H*',
122
'swift.stats.access_processor.AccessLogProcessor'
124
p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
125
result = p.plugins['access']['instance'].log_line_parser(self.access_test_line)
126
self.assertEquals(result, {'code': 200,
127
'processing_time': '0.0262',
128
'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd',
132
'query': 'format=json&foo',
134
'http_version': 'HTTP/1.0',
135
'object_name': 'bar',
138
'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90',
139
'client_ip': '1.2.3.4',
142
'container_name': 'foo',
148
'request': '/v1/acct/foo/bar',
149
'user_agent': 'curl',
153
def test_process_one_access_file(self):
154
access_proxy_config = self.proxy_config.copy()
155
access_proxy_config.update({
156
'log-processor-access': {
157
'source_filename_format':'%Y%m%d%H*',
159
'swift.stats.access_processor.AccessLogProcessor'
161
p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
162
def get_object_data(*a, **kw):
163
return [self.access_test_line]
164
p.get_object_data = get_object_data
165
result = p.process_one_file('access', 'a', 'c', 'o')
166
expected = {('acct', '2010', '07', '09', '04'):
167
{('public', 'object', 'GET', '2xx'): 1,
168
('public', 'bytes_out'): 95,
171
'delimiter_query': 0,
173
('public', 'bytes_in'): 6,
175
self.assertEquals(result, expected)
177
def test_process_one_access_file_error(self):
178
access_proxy_config = self.proxy_config.copy()
179
access_proxy_config.update({
180
'log-processor-access': {
181
'source_filename_format':'%Y%m%d%H*',
183
'swift.stats.access_processor.AccessLogProcessor'
185
p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
186
p._internal_proxy = DumbInternalProxy(code=500)
187
self.assertRaises(log_processor.BadFileDownload, p.process_one_file,
188
'access', 'a', 'c', 'o')
190
def test_get_container_listing(self):
191
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
192
p._internal_proxy = DumbInternalProxy()
193
result = p.get_container_listing('a', 'foo')
194
expected = ['2010/03/14/13/obj1']
195
self.assertEquals(result, expected)
196
result = p.get_container_listing('a', 'foo', listing_filter=expected)
198
self.assertEquals(result, expected)
199
result = p.get_container_listing('a', 'foo', start_date='2010031412',
200
end_date='2010031414')
201
expected = ['2010/03/14/13/obj1']
202
self.assertEquals(result, expected)
203
result = p.get_container_listing('a', 'foo', start_date='2010031414')
205
self.assertEquals(result, expected)
206
result = p.get_container_listing('a', 'foo', start_date='2010031410',
207
end_date='2010031412')
209
self.assertEquals(result, expected)
210
result = p.get_container_listing('a', 'foo', start_date='2010031412',
211
end_date='2010031413')
212
expected = ['2010/03/14/13/obj1']
213
self.assertEquals(result, expected)
215
def test_get_object_data(self):
    # Both a plain object and a gzip-compressed object served by the
    # dumb proxy should decode to the same two lines: 'obj' and 'data'.
    processor = log_processor.LogProcessor(self.proxy_config, DumbLogger())
    processor._internal_proxy = DumbInternalProxy()
    expected = ['obj', 'data']
    plain = list(processor.get_object_data('a', 'c', 'o', False))
    self.assertEquals(plain, expected)
    compressed = list(processor.get_object_data('a', 'c', 'o.gz', True))
    self.assertEquals(compressed, expected)
224
def test_get_object_data_errors(self):
    # Every download failure mode -- HTTP 500 from the proxy, corrupt
    # gzip payload, and a chunk-read timeout -- must surface as
    # BadFileDownload when the returned generator is consumed.
    processor = log_processor.LogProcessor(self.proxy_config, DumbLogger())
    failure_modes = [
        ({'code': 500}, ('a', 'c', 'o')),
        ({'bad_compressed': True}, ('a', 'c', 'o.gz', True)),
        ({'timeout': True}, ('a', 'c', 'o')),
    ]
    for proxy_kwargs, call_args in failure_modes:
        processor._internal_proxy = DumbInternalProxy(**proxy_kwargs)
        data_iter = processor.get_object_data(*call_args)
        self.assertRaises(log_processor.BadFileDownload, list, data_iter)
236
def test_get_stat_totals(self):
237
stats_proxy_config = self.proxy_config.copy()
238
stats_proxy_config.update({
239
'log-processor-stats': {
241
'swift.stats.stats_processor.StatsLogProcessor'
243
p = log_processor.LogProcessor(stats_proxy_config, DumbLogger())
244
p._internal_proxy = DumbInternalProxy()
245
def get_object_data(*a,**kw):
246
return [self.stats_test_line]
247
p.get_object_data = get_object_data
248
result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o')
249
expected = {('account', 'y', 'm', 'd', 'h'):
252
'container_count': 1,
254
self.assertEquals(result, expected)
256
def test_generate_keylist_mapping(self):
257
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
258
result = p.generate_keylist_mapping()
260
self.assertEquals(result, expected)
262
def test_generate_keylist_mapping_with_dummy_plugins(self):
263
class Plugin1(object):
264
def keylist_mapping(self):
265
return {'a': 'b', 'c': 'd', 'e': ['f', 'g']}
266
class Plugin2(object):
267
def keylist_mapping(self):
268
return {'a': '1', 'e': '2', 'h': '3'}
269
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
270
p.plugins['plugin1'] = {'instance': Plugin1()}
271
p.plugins['plugin2'] = {'instance': Plugin2()}
272
result = p.generate_keylist_mapping()
273
expected = {'a': set(['b', '1']), 'c': 'd', 'e': set(['2', 'f', 'g']),
275
self.assertEquals(result, expected)
277
def test_access_keylist_mapping_format(self):
278
proxy_config = self.proxy_config.copy()
279
proxy_config.update({
280
'log-processor-access': {
281
'source_filename_format':'%Y%m%d%H*',
283
'swift.stats.access_processor.AccessLogProcessor'
285
p = log_processor.LogProcessor(proxy_config, DumbLogger())
286
mapping = p.generate_keylist_mapping()
287
for k, v in mapping.items():
288
# these only work for Py2.7+
289
#self.assertIsInstance(k, str)
290
self.assertTrue(isinstance(k, str), type(k))
292
def test_stats_keylist_mapping_format(self):
293
proxy_config = self.proxy_config.copy()
294
proxy_config.update({
295
'log-processor-stats': {
297
'swift.stats.stats_processor.StatsLogProcessor'
299
p = log_processor.LogProcessor(proxy_config, DumbLogger())
300
mapping = p.generate_keylist_mapping()
301
for k, v in mapping.items():
302
# these only work for Py2.7+
303
#self.assertIsInstance(k, str)
304
self.assertTrue(isinstance(k, str), type(k))
306
def test_collate_worker(self):
308
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
309
def get_object_data(*a,**kw):
310
return [self.access_test_line]
311
orig_get_object_data = log_processor.LogProcessor.get_object_data
312
log_processor.LogProcessor.get_object_data = get_object_data
313
proxy_config = self.proxy_config.copy()
314
proxy_config.update({
315
'log-processor-access': {
316
'source_filename_format':'%Y%m%d%H*',
318
'swift.stats.access_processor.AccessLogProcessor'
320
processor_args = (proxy_config, DumbLogger())
322
q_out = Queue.Queue()
323
work_request = ('access', 'a','c','o')
324
q_in.put(work_request)
326
log_processor.collate_worker(processor_args, q_in, q_out)
327
item, ret = q_out.get()
328
self.assertEquals(item, work_request)
329
expected = {('acct', '2010', '07', '09', '04'):
330
{('public', 'object', 'GET', '2xx'): 1,
331
('public', 'bytes_out'): 95,
334
'delimiter_query': 0,
336
('public', 'bytes_in'): 6,
338
self.assertEquals(ret, expected)
340
log_processor.LogProcessor._internal_proxy = None
341
log_processor.LogProcessor.get_object_data = orig_get_object_data
343
def test_collate_worker_error(self):
344
def get_object_data(*a,**kw):
346
orig_get_object_data = log_processor.LogProcessor.get_object_data
348
log_processor.LogProcessor.get_object_data = get_object_data
349
proxy_config = self.proxy_config.copy()
350
proxy_config.update({
351
'log-processor-access': {
352
'source_filename_format':'%Y%m%d%H*',
354
'swift.stats.access_processor.AccessLogProcessor'
356
processor_args = (proxy_config, DumbLogger())
358
q_out = Queue.Queue()
359
work_request = ('access', 'a','c','o')
360
q_in.put(work_request)
362
log_processor.collate_worker(processor_args, q_in, q_out)
363
item, ret = q_out.get()
364
self.assertEquals(item, work_request)
365
# these only work for Py2.7+
366
#self.assertIsInstance(ret, log_processor.BadFileDownload)
367
self.assertTrue(isinstance(ret, Exception))
369
log_processor.LogProcessor.get_object_data = orig_get_object_data
371
def test_multiprocess_collate(self):
373
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
374
def get_object_data(*a,**kw):
375
return [self.access_test_line]
376
orig_get_object_data = log_processor.LogProcessor.get_object_data
377
log_processor.LogProcessor.get_object_data = get_object_data
378
proxy_config = self.proxy_config.copy()
379
proxy_config.update({
380
'log-processor-access': {
381
'source_filename_format':'%Y%m%d%H*',
383
'swift.stats.access_processor.AccessLogProcessor'
385
processor_args = (proxy_config, DumbLogger())
386
item = ('access', 'a','c','o')
387
logs_to_process = [item]
388
results = log_processor.multiprocess_collate(processor_args,
391
results = list(results)
392
expected = [(item, {('acct', '2010', '07', '09', '04'):
393
{('public', 'object', 'GET', '2xx'): 1,
394
('public', 'bytes_out'): 95,
397
'delimiter_query': 0,
399
('public', 'bytes_in'): 6,
400
'prefix_query': 0}})]
401
self.assertEquals(results, expected)
403
log_processor.LogProcessor._internal_proxy = None
404
log_processor.LogProcessor.get_object_data = orig_get_object_data
406
def test_multiprocess_collate_errors(self):
407
def get_object_data(*a,**kw):
408
raise log_processor.BadFileDownload()
409
orig_get_object_data = log_processor.LogProcessor.get_object_data
411
log_processor.LogProcessor.get_object_data = get_object_data
412
proxy_config = self.proxy_config.copy()
413
proxy_config.update({
414
'log-processor-access': {
415
'source_filename_format':'%Y%m%d%H*',
417
'swift.stats.access_processor.AccessLogProcessor'
419
processor_args = (proxy_config, DumbLogger())
420
item = ('access', 'a','c','o')
421
logs_to_process = [item]
422
results = log_processor.multiprocess_collate(processor_args,
425
results = list(results)
427
self.assertEquals(results, expected)
429
log_processor.LogProcessor._internal_proxy = None
430
log_processor.LogProcessor.get_object_data = orig_get_object_data
432
class TestLogProcessorDaemon(unittest.TestCase):
434
def test_get_lookback_interval(self):
435
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
436
def __init__(self, lookback_hours, lookback_window):
437
self.lookback_hours = lookback_hours
438
self.lookback_window = lookback_window
441
d = datetime.datetime
444
[d(2011, 1, 1), 0, 0, None, None],
445
[d(2011, 1, 1), 120, 0, '2010122700', None],
446
[d(2011, 1, 1), 120, 24, '2010122700', '2010122800'],
447
[d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'],
448
[d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'],
449
[d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'],
452
log_processor.now = lambda: x[0]
454
d = MockLogProcessorDaemon(x[1], x[2])
455
self.assertEquals((x[3], x[4]), d.get_lookback_interval())
457
log_processor.now = datetime.datetime.now
459
def test_get_processed_files_list(self):
460
class MockLogProcessor():
461
def __init__(self, stream):
464
def get_object_data(self, *args, **kwargs):
467
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
468
def __init__(self, stream):
469
self.log_processor = MockLogProcessor(stream)
470
self.log_processor_account = 'account'
471
self.log_processor_container = 'container'
472
self.processed_files_filename = 'filename'
474
file_list = set(['a', 'b', 'c'])
476
for s, l in [['', None],
477
[pickle.dumps(set()).split('\n'), set()],
478
[pickle.dumps(file_list).split('\n'), file_list],
482
MockLogProcessorDaemon(s).get_processed_files_list())
484
def test_get_processed_files_list_bad_file_downloads(self):
485
class MockLogProcessor():
486
def __init__(self, status_code):
487
self.err = log_processor.BadFileDownload(status_code)
489
def get_object_data(self, *a, **k):
492
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
493
def __init__(self, status_code):
494
self.log_processor = MockLogProcessor(status_code)
495
self.log_processor_account = 'account'
496
self.log_processor_container = 'container'
497
self.processed_files_filename = 'filename'
499
for c, l in [[404, set()], [503, None], [None, None]]:
501
MockLogProcessorDaemon(c).get_processed_files_list())
503
def test_get_aggregate_data(self):
504
# when run "for real"
505
# the various keys/values in the input and output
506
# dictionaries are often not simple strings
507
# for testing we can use keys that are easier to work with
509
processed_files = set()
513
'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3},
514
'acct1_time2': {'field1': 4, 'field2': 5},
515
'acct2_time1': {'field1': 6, 'field2': 7},
516
'acct3_time3': {'field1': 8, 'field2': 9},
519
['file2', {'acct1_time1': {'field1': 10}}],
522
expected_data_out = {
523
'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3},
524
'acct1_time2': {'field1': 4, 'field2': 5},
525
'acct2_time1': {'field1': 6, 'field2': 7},
526
'acct3_time3': {'field1': 8, 'field2': 9},
529
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
533
d = MockLogProcessorDaemon()
534
data_out = d.get_aggregate_data(processed_files, data_in)
536
for k, v in expected_data_out.items():
537
self.assertEquals(v, data_out[k])
539
self.assertEquals(set(['file1', 'file2']), processed_files)
541
def test_get_final_info(self):
542
# when run "for real"
543
# the various keys/values in the input and output
544
# dictionaries are often not simple strings
545
# for testing we can use keys/values that are easier to work with
547
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
549
self._keylist_mapping = {
550
'out_field1':['field1', 'field2', 'field3'],
551
'out_field2':['field2', 'field3'],
552
'out_field3':['field3'],
553
'out_field4':'field4',
554
'out_field5':['field6', 'field7', 'field8'],
555
'out_field6':['field6'],
556
'out_field7':'field7',
560
'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3,
561
'field4': 8, 'field5': 11},
562
'acct1_time2': {'field1': 4, 'field2': 5},
563
'acct2_time1': {'field1': 6, 'field2': 7},
564
'acct3_time3': {'field1': 8, 'field2': 9},
567
expected_data_out = {
568
'acct1_time1': {'out_field1': 16, 'out_field2': 5,
569
'out_field3': 3, 'out_field4': 8, 'out_field5': 0,
570
'out_field6': 0, 'out_field7': 0,},
571
'acct1_time2': {'out_field1': 9, 'out_field2': 5,
572
'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
573
'out_field6': 0, 'out_field7': 0,},
574
'acct2_time1': {'out_field1': 13, 'out_field2': 7,
575
'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
576
'out_field6': 0, 'out_field7': 0,},
577
'acct3_time3': {'out_field1': 17, 'out_field2': 9,
578
'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
579
'out_field6': 0, 'out_field7': 0,},
582
self.assertEquals(expected_data_out,
583
MockLogProcessorDaemon().get_final_info(data_in))
585
def test_store_processed_files_list(self):
586
class MockInternalProxy:
587
def __init__(self, test, daemon, processed_files):
590
self.processed_files = processed_files
592
def upload_file(self, f, account, container, filename):
593
self.test.assertEquals(self.processed_files,
594
pickle.loads(f.getvalue()))
595
self.test.assertEquals(self.daemon.log_processor_account,
597
self.test.assertEquals(self.daemon.log_processor_container,
599
self.test.assertEquals(self.daemon.processed_files_filename,
602
class MockLogProcessor:
603
def __init__(self, test, daemon, processed_files):
604
self.internal_proxy = MockInternalProxy(test, daemon,
607
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
608
def __init__(self, test, processed_files):
609
self.log_processor = \
610
MockLogProcessor(test, self, processed_files)
611
self.log_processor_account = 'account'
612
self.log_processor_container = 'container'
613
self.processed_files_filename = 'filename'
615
processed_files = set(['a', 'b', 'c'])
616
MockLogProcessorDaemon(self, processed_files).\
617
store_processed_files_list(processed_files)
619
def test_get_output(self):
620
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
622
self._keylist_mapping = {'a':None, 'b':None, 'c':None}
625
('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3},
626
('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30},
627
('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12},
628
('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25},
631
expected_data_out = [
632
['data_ts', 'account', 'a', 'b', 'c'],
633
['2010/01/01 00:00:00', 'acct1', '1', '2', '3'],
634
['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
635
['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
636
['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
639
data_out = MockLogProcessorDaemon().get_output(data_in)
640
self.assertEquals(expected_data_out[0], data_out[0])
642
for row in data_out[1:]:
643
self.assert_(row in expected_data_out)
645
for row in expected_data_out[1:]:
646
self.assert_(row in data_out)
648
def test_store_output(self):
650
real_strftime = time.strftime
651
mock_strftime_return = '2010/03/02/01/'
652
def mock_strftime(format):
653
self.assertEquals('%Y/%m/%d/%H/', format)
654
return mock_strftime_return
655
log_processor.time.strftime = mock_strftime
658
['data_ts', 'account', 'a', 'b', 'c'],
659
['2010/10/10 10:00:00', 'acct1', '1', '2', '3'],
660
['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
661
['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
662
['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
665
expected_output = '\n'.join([','.join(row) for row in data_in])
666
h = hashlib.md5(expected_output).hexdigest()
667
expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h)
669
class MockInternalProxy:
670
def __init__(self, test, daemon, expected_filename,
674
self.expected_filename = expected_filename
675
self.expected_output = expected_output
677
def upload_file(self, f, account, container, filename):
678
self.test.assertEquals(self.daemon.log_processor_account,
680
self.test.assertEquals(self.daemon.log_processor_container,
682
self.test.assertEquals(self.expected_filename, filename)
683
self.test.assertEquals(self.expected_output, f.getvalue())
685
class MockLogProcessor:
686
def __init__(self, test, daemon, expected_filename,
688
self.internal_proxy = MockInternalProxy(test, daemon,
689
expected_filename, expected_output)
691
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
692
def __init__(self, test, expected_filename, expected_output):
693
self.log_processor = MockLogProcessor(test, self,
694
expected_filename, expected_output)
695
self.log_processor_account = 'account'
696
self.log_processor_container = 'container'
697
self.processed_files_filename = 'filename'
699
MockLogProcessorDaemon(self, expected_filename, expected_output).\
700
store_output(data_in)
702
log_processor.time.strftime = real_strftime
704
def test_keylist_mapping(self):
705
# Kind of lame test to see if the propery is both
706
# generated by a particular method and cached properly.
707
# The method that actually generates the mapping is
710
value_return = 'keylist_mapping'
711
class MockLogProcessor:
715
def generate_keylist_mapping(self):
719
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
721
self.log_processor = MockLogProcessor()
722
self._keylist_mapping = None
724
d = MockLogProcessorDaemon()
725
self.assertEquals(value_return, d.keylist_mapping)
726
self.assertEquals(value_return, d.keylist_mapping)
727
self.assertEquals(1, d.log_processor.call_count)
729
def test_process_logs(self):
731
mock_logs_to_process = 'logs_to_process'
732
mock_processed_files = 'processed_files'
734
real_multiprocess_collate = log_processor.multiprocess_collate
735
multiprocess_collate_return = 'multiprocess_collate_return'
737
get_aggregate_data_return = 'get_aggregate_data_return'
738
get_final_info_return = 'get_final_info_return'
739
get_output_return = 'get_output_return'
741
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
742
def __init__(self, test):
744
self.total_conf = 'total_conf'
745
self.logger = 'logger'
746
self.worker_count = 'worker_count'
748
def get_aggregate_data(self, processed_files, results):
749
self.test.assertEquals(mock_processed_files, processed_files)
750
self.test.assertEquals(multiprocess_collate_return, results)
751
return get_aggregate_data_return
753
def get_final_info(self, aggr_data):
754
self.test.assertEquals(get_aggregate_data_return, aggr_data)
755
return get_final_info_return
757
def get_output(self, final_info):
758
self.test.assertEquals(get_final_info_return, final_info)
759
return get_output_return
761
d = MockLogProcessorDaemon(self)
763
def mock_multiprocess_collate(processor_args, logs_to_process,
765
self.assertEquals(d.total_conf, processor_args[0])
766
self.assertEquals(d.logger, processor_args[1])
768
self.assertEquals(mock_logs_to_process, logs_to_process)
769
self.assertEquals(d.worker_count, worker_count)
771
return multiprocess_collate_return
773
log_processor.multiprocess_collate = mock_multiprocess_collate
775
output = d.process_logs(mock_logs_to_process, mock_processed_files)
776
self.assertEquals(get_output_return, output)
778
log_processor.multiprocess_collate = real_multiprocess_collate
780
def test_run_once_get_processed_files_list_returns_none(self):
781
class MockLogProcessor:
782
def get_data_list(self, lookback_start, lookback_end,
784
raise unittest.TestCase.failureException, \
785
'Method should not be called'
787
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
789
self.logger = DumbLogger()
790
self.log_processor = MockLogProcessor()
792
def get_lookback_interval(self):
795
def get_processed_files_list(self):
798
MockLogProcessorDaemon().run_once()
800
def test_run_once_no_logs_to_process(self):
801
class MockLogProcessor():
802
def __init__(self, daemon, test):
806
def get_data_list(self, lookback_start, lookback_end,
808
self.test.assertEquals(self.daemon.lookback_start,
810
self.test.assertEquals(self.daemon.lookback_end,
812
self.test.assertEquals(self.daemon.processed_files,
816
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
817
def __init__(self, test):
818
self.logger = DumbLogger()
819
self.log_processor = MockLogProcessor(self, test)
820
self.lookback_start = 'lookback_start'
821
self.lookback_end = 'lookback_end'
822
self.processed_files = ['a', 'b', 'c']
824
def get_lookback_interval(self):
825
return self.lookback_start, self.lookback_end
827
def get_processed_files_list(self):
828
return self.processed_files
830
def process_logs(logs_to_process, processed_files):
831
raise unittest.TestCase.failureException, \
832
'Method should not be called'
834
MockLogProcessorDaemon(self).run_once()