~ttx/swift/release-1.4.2

« back to all changes in this revision

Viewing changes to test/unit/stats/test_log_processor.py

  • Committer: Tarmac
  • Author(s): gholt, FUJITA Tomonori, John Dickinson, David Goetz, John Dickinson, Joe Arnold, Scott Simpson, joe at cloudscaling, Thierry Carrez
  • Date: 2011-07-26 09:08:37 UTC
  • mfrom: (305.1.1 milestone-proposed)
  • Revision ID: tarmac-20110726090837-fwlvja8dnk7nkppw
Merge 1.4.2 development from trunk (rev331)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2010-2011 OpenStack, LLC.
2
 
#
3
 
# Licensed under the Apache License, Version 2.0 (the "License");
4
 
# you may not use this file except in compliance with the License.
5
 
# You may obtain a copy of the License at
6
 
#
7
 
#    http://www.apache.org/licenses/LICENSE-2.0
8
 
#
9
 
# Unless required by applicable law or agreed to in writing, software
10
 
# distributed under the License is distributed on an "AS IS" BASIS,
11
 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
 
# implied.
13
 
# See the License for the specific language governing permissions and
14
 
# limitations under the License.
15
 
 
16
 
import unittest
17
 
from test.unit import tmpfile
18
 
import Queue
19
 
import datetime
20
 
import hashlib
21
 
import pickle
22
 
import time
23
 
 
24
 
from swift.common import internal_proxy
25
 
from swift.stats import log_processor
26
 
from swift.common.exceptions import ChunkReadTimeout
27
 
 
28
 
 
29
 
class FakeUploadApp(object):
30
 
    def __init__(self, *args, **kwargs):
31
 
        pass
32
 
 
33
 
class DumbLogger(object):
34
 
    def __getattr__(self, n):
35
 
        return self.foo
36
 
 
37
 
    def foo(self, *a, **kw):
38
 
        pass
39
 
 
40
 
class DumbInternalProxy(object):
41
 
    def __init__(self, code=200, timeout=False, bad_compressed=False):
42
 
        self.code = code
43
 
        self.timeout = timeout
44
 
        self.bad_compressed = bad_compressed
45
 
 
46
 
    def get_container_list(self, account, container, marker=None,
47
 
                           end_marker=None):
48
 
        n = '2010/03/14/13/obj1'
49
 
        if marker is None or n > marker:
50
 
            if end_marker:
51
 
                if n <= end_marker:
52
 
                    return [{'name': n}]
53
 
                else:
54
 
                    return []
55
 
            return [{'name': n}]
56
 
        return []
57
 
 
58
 
    def get_object(self, account, container, object_name):
59
 
        if object_name.endswith('.gz'):
60
 
            if self.bad_compressed:
61
 
                # invalid compressed data
62
 
                def data():
63
 
                    yield '\xff\xff\xff\xff\xff\xff\xff'
64
 
            else:
65
 
                # 'obj\ndata', compressed with gzip -9
66
 
                def data():
67
 
                    yield '\x1f\x8b\x08'
68
 
                    yield '\x08"\xd79L'
69
 
                    yield '\x02\x03te'
70
 
                    yield 'st\x00\xcbO'
71
 
                    yield '\xca\xe2JI,I'
72
 
                    yield '\xe4\x02\x00O\xff'
73
 
                    yield '\xa3Y\t\x00\x00\x00'
74
 
        else:
75
 
            def data():
76
 
                yield 'obj\n'
77
 
                if self.timeout:
78
 
                    raise ChunkReadTimeout
79
 
                yield 'data'
80
 
        return self.code, data()
81
 
 
82
 
class TestLogProcessor(unittest.TestCase):
83
 
 
84
 
    access_test_line = 'Jul  9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\
85
 
                    '09/Jul/2010/04/14/30 GET '\
86
 
                    '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
87
 
                    'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\
88
 
                    '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
89
 
    stats_test_line = 'account,1,2,3'
90
 
    proxy_config = {'log-processor': {
91
 
 
92
 
                    }
93
 
                   }
94
 
 
95
 
    def test_lazy_load_internal_proxy(self):
96
 
        # stub out internal_proxy's upload_app
97
 
        internal_proxy.BaseApplication = FakeUploadApp
98
 
        dummy_proxy_config = """[app:proxy-server]
99
 
use = egg:swift#proxy
100
 
"""
101
 
        with tmpfile(dummy_proxy_config) as proxy_config_file:
102
 
            conf = {'log-processor': {
103
 
                    'proxy_server_conf': proxy_config_file,
104
 
                }
105
 
            }
106
 
            p = log_processor.LogProcessor(conf, DumbLogger())
107
 
            self.assert_(isinstance(p._internal_proxy,
108
 
                                    None.__class__))
109
 
            self.assert_(isinstance(p.internal_proxy,
110
 
                                    log_processor.InternalProxy))
111
 
            self.assertEquals(p.internal_proxy, p._internal_proxy)
112
 
 
113
 
        # reset FakeUploadApp
114
 
        reload(internal_proxy)
115
 
 
116
 
    def test_access_log_line_parser(self):
117
 
        access_proxy_config = self.proxy_config.copy()
118
 
        access_proxy_config.update({
119
 
                        'log-processor-access': {
120
 
                            'source_filename_format':'%Y%m%d%H*',
121
 
                            'class_path':
122
 
                                'swift.stats.access_processor.AccessLogProcessor'
123
 
                        }})
124
 
        p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
125
 
        result = p.plugins['access']['instance'].log_line_parser(self.access_test_line)
126
 
        self.assertEquals(result, {'code': 200,
127
 
           'processing_time': '0.0262',
128
 
           'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd',
129
 
           'month': '07',
130
 
           'second': '30',
131
 
           'year': '2010',
132
 
           'query': 'format=json&foo',
133
 
           'tz': '+0000',
134
 
           'http_version': 'HTTP/1.0',
135
 
           'object_name': 'bar',
136
 
           'etag': '-',
137
 
           'method': 'GET',
138
 
           'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90',
139
 
           'client_ip': '1.2.3.4',
140
 
           'format': 1,
141
 
           'bytes_out': 95,
142
 
           'container_name': 'foo',
143
 
           'day': '09',
144
 
           'minute': '14',
145
 
           'account': 'acct',
146
 
           'hour': '04',
147
 
           'referrer': '-',
148
 
           'request': '/v1/acct/foo/bar',
149
 
           'user_agent': 'curl',
150
 
           'bytes_in': 6,
151
 
           'lb_ip': '4.5.6.7'})
152
 
 
153
 
    def test_process_one_access_file(self):
154
 
        access_proxy_config = self.proxy_config.copy()
155
 
        access_proxy_config.update({
156
 
                        'log-processor-access': {
157
 
                            'source_filename_format':'%Y%m%d%H*',
158
 
                            'class_path':
159
 
                                'swift.stats.access_processor.AccessLogProcessor'
160
 
                        }})
161
 
        p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
162
 
        def get_object_data(*a, **kw):
163
 
            return [self.access_test_line]
164
 
        p.get_object_data = get_object_data
165
 
        result = p.process_one_file('access', 'a', 'c', 'o')
166
 
        expected = {('acct', '2010', '07', '09', '04'):
167
 
                    {('public', 'object', 'GET', '2xx'): 1,
168
 
                    ('public', 'bytes_out'): 95,
169
 
                    'marker_query': 0,
170
 
                    'format_query': 1,
171
 
                    'delimiter_query': 0,
172
 
                    'path_query': 0,
173
 
                    ('public', 'bytes_in'): 6,
174
 
                    'prefix_query': 0}}
175
 
        self.assertEquals(result, expected)
176
 
 
177
 
    def test_process_one_access_file_error(self):
178
 
        access_proxy_config = self.proxy_config.copy()
179
 
        access_proxy_config.update({
180
 
                        'log-processor-access': {
181
 
                            'source_filename_format':'%Y%m%d%H*',
182
 
                            'class_path':
183
 
                                'swift.stats.access_processor.AccessLogProcessor'
184
 
                        }})
185
 
        p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
186
 
        p._internal_proxy = DumbInternalProxy(code=500)
187
 
        self.assertRaises(log_processor.BadFileDownload, p.process_one_file,
188
 
                          'access', 'a', 'c', 'o')
189
 
 
190
 
    def test_get_container_listing(self):
191
 
        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
192
 
        p._internal_proxy = DumbInternalProxy()
193
 
        result = p.get_container_listing('a', 'foo')
194
 
        expected = ['2010/03/14/13/obj1']
195
 
        self.assertEquals(result, expected)
196
 
        result = p.get_container_listing('a', 'foo', listing_filter=expected)
197
 
        expected = []
198
 
        self.assertEquals(result, expected)
199
 
        result = p.get_container_listing('a', 'foo', start_date='2010031412',
200
 
                                            end_date='2010031414')
201
 
        expected = ['2010/03/14/13/obj1']
202
 
        self.assertEquals(result, expected)
203
 
        result = p.get_container_listing('a', 'foo', start_date='2010031414')
204
 
        expected = []
205
 
        self.assertEquals(result, expected)
206
 
        result = p.get_container_listing('a', 'foo', start_date='2010031410',
207
 
                                            end_date='2010031412')
208
 
        expected = []
209
 
        self.assertEquals(result, expected)
210
 
        result = p.get_container_listing('a', 'foo', start_date='2010031412',
211
 
                                            end_date='2010031413')
212
 
        expected = ['2010/03/14/13/obj1']
213
 
        self.assertEquals(result, expected)
214
 
 
215
 
    def test_get_object_data(self):
216
 
        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
217
 
        p._internal_proxy = DumbInternalProxy()
218
 
        result = list(p.get_object_data('a', 'c', 'o', False))
219
 
        expected = ['obj','data']
220
 
        self.assertEquals(result, expected)
221
 
        result = list(p.get_object_data('a', 'c', 'o.gz', True))
222
 
        self.assertEquals(result, expected)
223
 
 
224
 
    def test_get_object_data_errors(self):
225
 
        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
226
 
        p._internal_proxy = DumbInternalProxy(code=500)
227
 
        result = p.get_object_data('a', 'c', 'o')
228
 
        self.assertRaises(log_processor.BadFileDownload, list, result)
229
 
        p._internal_proxy = DumbInternalProxy(bad_compressed=True)
230
 
        result = p.get_object_data('a', 'c', 'o.gz', True)
231
 
        self.assertRaises(log_processor.BadFileDownload, list, result)
232
 
        p._internal_proxy = DumbInternalProxy(timeout=True)
233
 
        result = p.get_object_data('a', 'c', 'o')
234
 
        self.assertRaises(log_processor.BadFileDownload, list, result)
235
 
 
236
 
    def test_get_stat_totals(self):
237
 
        stats_proxy_config = self.proxy_config.copy()
238
 
        stats_proxy_config.update({
239
 
                        'log-processor-stats': {
240
 
                            'class_path':
241
 
                                'swift.stats.stats_processor.StatsLogProcessor'
242
 
                        }})
243
 
        p = log_processor.LogProcessor(stats_proxy_config, DumbLogger())
244
 
        p._internal_proxy = DumbInternalProxy()
245
 
        def get_object_data(*a,**kw):
246
 
            return [self.stats_test_line]
247
 
        p.get_object_data = get_object_data
248
 
        result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o')
249
 
        expected = {('account', 'y', 'm', 'd', 'h'):
250
 
                    {'replica_count': 1,
251
 
                    'object_count': 2,
252
 
                    'container_count': 1,
253
 
                    'bytes_used': 3}}
254
 
        self.assertEquals(result, expected)
255
 
 
256
 
    def test_generate_keylist_mapping(self):
257
 
        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
258
 
        result = p.generate_keylist_mapping()
259
 
        expected = {}
260
 
        self.assertEquals(result, expected)
261
 
 
262
 
    def test_generate_keylist_mapping_with_dummy_plugins(self):
263
 
        class Plugin1(object):
264
 
            def keylist_mapping(self):
265
 
                return {'a': 'b', 'c': 'd', 'e': ['f', 'g']}
266
 
        class Plugin2(object):
267
 
            def keylist_mapping(self):
268
 
                return {'a': '1', 'e': '2', 'h': '3'}
269
 
        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
270
 
        p.plugins['plugin1'] = {'instance': Plugin1()}
271
 
        p.plugins['plugin2'] = {'instance': Plugin2()}
272
 
        result = p.generate_keylist_mapping()
273
 
        expected = {'a': set(['b', '1']), 'c': 'd', 'e': set(['2', 'f', 'g']),
274
 
                    'h': '3'}
275
 
        self.assertEquals(result, expected)
276
 
 
277
 
    def test_access_keylist_mapping_format(self):
278
 
        proxy_config = self.proxy_config.copy()
279
 
        proxy_config.update({
280
 
                        'log-processor-access': {
281
 
                            'source_filename_format':'%Y%m%d%H*',
282
 
                            'class_path':
283
 
                                'swift.stats.access_processor.AccessLogProcessor'
284
 
                        }})
285
 
        p = log_processor.LogProcessor(proxy_config, DumbLogger())
286
 
        mapping = p.generate_keylist_mapping()
287
 
        for k, v in mapping.items():
288
 
            # these only work for Py2.7+
289
 
            #self.assertIsInstance(k, str)
290
 
            self.assertTrue(isinstance(k, str), type(k))
291
 
 
292
 
    def test_stats_keylist_mapping_format(self):
293
 
        proxy_config = self.proxy_config.copy()
294
 
        proxy_config.update({
295
 
                        'log-processor-stats': {
296
 
                            'class_path':
297
 
                                'swift.stats.stats_processor.StatsLogProcessor'
298
 
                        }})
299
 
        p = log_processor.LogProcessor(proxy_config, DumbLogger())
300
 
        mapping = p.generate_keylist_mapping()
301
 
        for k, v in mapping.items():
302
 
            # these only work for Py2.7+
303
 
            #self.assertIsInstance(k, str)
304
 
            self.assertTrue(isinstance(k, str), type(k))
305
 
 
306
 
    def test_collate_worker(self):
307
 
        try:
308
 
            log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
309
 
            def get_object_data(*a,**kw):
310
 
                return [self.access_test_line]
311
 
            orig_get_object_data = log_processor.LogProcessor.get_object_data
312
 
            log_processor.LogProcessor.get_object_data = get_object_data
313
 
            proxy_config = self.proxy_config.copy()
314
 
            proxy_config.update({
315
 
                    'log-processor-access': {
316
 
                        'source_filename_format':'%Y%m%d%H*',
317
 
                        'class_path':
318
 
                            'swift.stats.access_processor.AccessLogProcessor'
319
 
                    }})
320
 
            processor_args = (proxy_config, DumbLogger())
321
 
            q_in = Queue.Queue()
322
 
            q_out = Queue.Queue()
323
 
            work_request = ('access', 'a','c','o')
324
 
            q_in.put(work_request)
325
 
            q_in.put(None)
326
 
            log_processor.collate_worker(processor_args, q_in, q_out)
327
 
            item, ret = q_out.get()
328
 
            self.assertEquals(item, work_request)
329
 
            expected = {('acct', '2010', '07', '09', '04'):
330
 
                        {('public', 'object', 'GET', '2xx'): 1,
331
 
                        ('public', 'bytes_out'): 95,
332
 
                        'marker_query': 0,
333
 
                        'format_query': 1,
334
 
                        'delimiter_query': 0,
335
 
                        'path_query': 0,
336
 
                        ('public', 'bytes_in'): 6,
337
 
                        'prefix_query': 0}}
338
 
            self.assertEquals(ret, expected)
339
 
        finally:
340
 
            log_processor.LogProcessor._internal_proxy = None
341
 
            log_processor.LogProcessor.get_object_data = orig_get_object_data
342
 
 
343
 
    def test_collate_worker_error(self):
344
 
        def get_object_data(*a,**kw):
345
 
            raise Exception()
346
 
        orig_get_object_data = log_processor.LogProcessor.get_object_data
347
 
        try:
348
 
            log_processor.LogProcessor.get_object_data = get_object_data
349
 
            proxy_config = self.proxy_config.copy()
350
 
            proxy_config.update({
351
 
                    'log-processor-access': {
352
 
                        'source_filename_format':'%Y%m%d%H*',
353
 
                        'class_path':
354
 
                            'swift.stats.access_processor.AccessLogProcessor'
355
 
                    }})
356
 
            processor_args = (proxy_config, DumbLogger())
357
 
            q_in = Queue.Queue()
358
 
            q_out = Queue.Queue()
359
 
            work_request = ('access', 'a','c','o')
360
 
            q_in.put(work_request)
361
 
            q_in.put(None)
362
 
            log_processor.collate_worker(processor_args, q_in, q_out)
363
 
            item, ret = q_out.get()
364
 
            self.assertEquals(item, work_request)
365
 
            # these only work for Py2.7+
366
 
            #self.assertIsInstance(ret, log_processor.BadFileDownload)
367
 
            self.assertTrue(isinstance(ret, Exception))
368
 
        finally:
369
 
            log_processor.LogProcessor.get_object_data = orig_get_object_data
370
 
 
371
 
    def test_multiprocess_collate(self):
372
 
        try:
373
 
            log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
374
 
            def get_object_data(*a,**kw):
375
 
                return [self.access_test_line]
376
 
            orig_get_object_data = log_processor.LogProcessor.get_object_data
377
 
            log_processor.LogProcessor.get_object_data = get_object_data
378
 
            proxy_config = self.proxy_config.copy()
379
 
            proxy_config.update({
380
 
                    'log-processor-access': {
381
 
                        'source_filename_format':'%Y%m%d%H*',
382
 
                        'class_path':
383
 
                            'swift.stats.access_processor.AccessLogProcessor'
384
 
                    }})
385
 
            processor_args = (proxy_config, DumbLogger())
386
 
            item = ('access', 'a','c','o')
387
 
            logs_to_process = [item]
388
 
            results = log_processor.multiprocess_collate(processor_args,
389
 
                                                         logs_to_process,
390
 
                                                         1)
391
 
            results = list(results)
392
 
            expected = [(item, {('acct', '2010', '07', '09', '04'):
393
 
                        {('public', 'object', 'GET', '2xx'): 1,
394
 
                        ('public', 'bytes_out'): 95,
395
 
                        'marker_query': 0,
396
 
                        'format_query': 1,
397
 
                        'delimiter_query': 0,
398
 
                        'path_query': 0,
399
 
                        ('public', 'bytes_in'): 6,
400
 
                        'prefix_query': 0}})]
401
 
            self.assertEquals(results, expected)
402
 
        finally:
403
 
            log_processor.LogProcessor._internal_proxy = None
404
 
            log_processor.LogProcessor.get_object_data = orig_get_object_data
405
 
 
406
 
    def test_multiprocess_collate_errors(self):
407
 
        def get_object_data(*a,**kw):
408
 
            raise log_processor.BadFileDownload()
409
 
        orig_get_object_data = log_processor.LogProcessor.get_object_data
410
 
        try:
411
 
            log_processor.LogProcessor.get_object_data = get_object_data
412
 
            proxy_config = self.proxy_config.copy()
413
 
            proxy_config.update({
414
 
                    'log-processor-access': {
415
 
                        'source_filename_format':'%Y%m%d%H*',
416
 
                        'class_path':
417
 
                            'swift.stats.access_processor.AccessLogProcessor'
418
 
                    }})
419
 
            processor_args = (proxy_config, DumbLogger())
420
 
            item = ('access', 'a','c','o')
421
 
            logs_to_process = [item]
422
 
            results = log_processor.multiprocess_collate(processor_args,
423
 
                                                         logs_to_process,
424
 
                                                         1)
425
 
            results = list(results)
426
 
            expected = []
427
 
            self.assertEquals(results, expected)
428
 
        finally:
429
 
            log_processor.LogProcessor._internal_proxy = None
430
 
            log_processor.LogProcessor.get_object_data = orig_get_object_data
431
 
 
432
 
class TestLogProcessorDaemon(unittest.TestCase):
433
 
 
434
 
    def test_get_lookback_interval(self):
435
 
        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
436
 
            def __init__(self, lookback_hours, lookback_window):
437
 
                self.lookback_hours = lookback_hours
438
 
                self.lookback_window = lookback_window
439
 
 
440
 
        try:
441
 
            d = datetime.datetime
442
 
 
443
 
            for x in [
444
 
                    [d(2011, 1, 1), 0, 0, None, None],
445
 
                    [d(2011, 1, 1), 120, 0, '2010122700', None],
446
 
                    [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'],
447
 
                    [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'],
448
 
                    [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'],
449
 
                    [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'],
450
 
                ]:
451
 
 
452
 
                log_processor.now = lambda: x[0]
453
 
 
454
 
                d = MockLogProcessorDaemon(x[1], x[2])
455
 
                self.assertEquals((x[3], x[4]), d.get_lookback_interval())
456
 
        finally:
457
 
            log_processor.now = datetime.datetime.now
458
 
 
459
 
    def test_get_processed_files_list(self):
460
 
        class MockLogProcessor():
461
 
            def __init__(self, stream):
462
 
                self.stream = stream
463
 
 
464
 
            def get_object_data(self, *args, **kwargs):
465
 
                return self.stream
466
 
 
467
 
        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
468
 
            def __init__(self, stream):
469
 
                self.log_processor = MockLogProcessor(stream)
470
 
                self.log_processor_account = 'account'
471
 
                self.log_processor_container = 'container'
472
 
                self.processed_files_filename = 'filename'
473
 
 
474
 
        file_list = set(['a', 'b', 'c'])
475
 
 
476
 
        for s, l in [['', None],
477
 
                [pickle.dumps(set()).split('\n'), set()],
478
 
                [pickle.dumps(file_list).split('\n'), file_list],
479
 
            ]:
480
 
 
481
 
            self.assertEquals(l,
482
 
                MockLogProcessorDaemon(s).get_processed_files_list())
483
 
 
484
 
    def test_get_processed_files_list_bad_file_downloads(self):
485
 
        class MockLogProcessor():
486
 
            def __init__(self, status_code):
487
 
                self.err = log_processor.BadFileDownload(status_code)
488
 
 
489
 
            def get_object_data(self, *a, **k):
490
 
                raise self.err
491
 
 
492
 
        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
493
 
            def __init__(self, status_code):
494
 
                self.log_processor = MockLogProcessor(status_code)
495
 
                self.log_processor_account = 'account'
496
 
                self.log_processor_container = 'container'
497
 
                self.processed_files_filename = 'filename'
498
 
 
499
 
        for c, l in [[404, set()], [503, None], [None, None]]:
500
 
            self.assertEquals(l,
501
 
                MockLogProcessorDaemon(c).get_processed_files_list())
502
 
 
503
 
    def test_get_aggregate_data(self):
504
 
        # when run "for real"
505
 
        # the various keys/values in the input and output
506
 
        # dictionaries are often not simple strings
507
 
        # for testing we can use keys that are easier to work with
508
 
 
509
 
        processed_files = set()
510
 
 
511
 
        data_in = [
512
 
            ['file1', {
513
 
                'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3},
514
 
                'acct1_time2': {'field1': 4, 'field2': 5},
515
 
                'acct2_time1': {'field1': 6, 'field2': 7},
516
 
                'acct3_time3': {'field1': 8, 'field2': 9},
517
 
                }
518
 
            ],
519
 
            ['file2', {'acct1_time1': {'field1': 10}}],
520
 
        ]
521
 
 
522
 
        expected_data_out = {
523
 
            'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3},
524
 
            'acct1_time2': {'field1': 4, 'field2': 5},
525
 
            'acct2_time1': {'field1': 6, 'field2': 7},
526
 
            'acct3_time3': {'field1': 8, 'field2': 9},
527
 
        }
528
 
 
529
 
        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
530
 
            def __init__(self):
531
 
                pass
532
 
 
533
 
        d = MockLogProcessorDaemon()
534
 
        data_out = d.get_aggregate_data(processed_files, data_in)
535
 
 
536
 
        for k, v in expected_data_out.items():
537
 
            self.assertEquals(v, data_out[k])
538
 
 
539
 
        self.assertEquals(set(['file1', 'file2']), processed_files)
540
 
 
541
 
    def test_get_final_info(self):
542
 
        # when run "for real"
543
 
        # the various keys/values in the input and output
544
 
        # dictionaries are often not simple strings
545
 
        # for testing we can use keys/values that are easier to work with
546
 
 
547
 
        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
548
 
            def __init__(self):
549
 
                self._keylist_mapping = {
550
 
                    'out_field1':['field1', 'field2', 'field3'],
551
 
                    'out_field2':['field2', 'field3'],
552
 
                    'out_field3':['field3'],
553
 
                    'out_field4':'field4',
554
 
                    'out_field5':['field6', 'field7', 'field8'],
555
 
                    'out_field6':['field6'],
556
 
                    'out_field7':'field7',
557
 
                }
558
 
 
559
 
        data_in = {
560
 
            'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3,
561
 
                'field4': 8, 'field5': 11},
562
 
            'acct1_time2': {'field1': 4, 'field2': 5},
563
 
            'acct2_time1': {'field1': 6, 'field2': 7},
564
 
            'acct3_time3': {'field1': 8, 'field2': 9},
565
 
        }
566
 
 
567
 
        expected_data_out = {
568
 
            'acct1_time1': {'out_field1': 16, 'out_field2': 5,
569
 
                'out_field3': 3, 'out_field4': 8, 'out_field5': 0,
570
 
                'out_field6': 0, 'out_field7': 0,},
571
 
            'acct1_time2': {'out_field1': 9, 'out_field2': 5,
572
 
                'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
573
 
                'out_field6': 0, 'out_field7': 0,},
574
 
            'acct2_time1': {'out_field1': 13, 'out_field2': 7,
575
 
                'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
576
 
                'out_field6': 0, 'out_field7': 0,},
577
 
            'acct3_time3': {'out_field1': 17, 'out_field2': 9,
578
 
                'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
579
 
                'out_field6': 0, 'out_field7': 0,},
580
 
        }
581
 
 
582
 
        self.assertEquals(expected_data_out,
583
 
            MockLogProcessorDaemon().get_final_info(data_in))
584
 
 
585
 
    def test_store_processed_files_list(self):
586
 
        class MockInternalProxy:
587
 
            def __init__(self, test, daemon, processed_files):
588
 
                self.test = test
589
 
                self.daemon = daemon
590
 
                self.processed_files = processed_files
591
 
 
592
 
            def upload_file(self, f, account, container, filename):
593
 
                self.test.assertEquals(self.processed_files,
594
 
                    pickle.loads(f.getvalue()))
595
 
                self.test.assertEquals(self.daemon.log_processor_account,
596
 
                    account)
597
 
                self.test.assertEquals(self.daemon.log_processor_container,
598
 
                    container)
599
 
                self.test.assertEquals(self.daemon.processed_files_filename,
600
 
                    filename)
601
 
 
602
 
        class MockLogProcessor:
603
 
            def __init__(self, test, daemon, processed_files):
604
 
                self.internal_proxy = MockInternalProxy(test, daemon,
605
 
                    processed_files)
606
 
 
607
 
        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
608
 
            def __init__(self, test, processed_files):
609
 
                self.log_processor = \
610
 
                    MockLogProcessor(test, self, processed_files)
611
 
                self.log_processor_account = 'account'
612
 
                self.log_processor_container = 'container'
613
 
                self.processed_files_filename = 'filename'
614
 
 
615
 
        processed_files = set(['a', 'b', 'c'])
616
 
        MockLogProcessorDaemon(self, processed_files).\
617
 
            store_processed_files_list(processed_files)
618
 
 
619
 
    def test_get_output(self):
620
 
        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
621
 
            def __init__(self):
622
 
                self._keylist_mapping = {'a':None, 'b':None, 'c':None}
623
 
 
624
 
        data_in = {
625
 
            ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3},
626
 
            ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30},
627
 
            ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12},
628
 
            ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25},
629
 
        }
630
 
 
631
 
        expected_data_out = [
632
 
            ['data_ts', 'account', 'a', 'b', 'c'],
633
 
            ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'],
634
 
            ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
635
 
            ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
636
 
            ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
637
 
        ]
638
 
 
639
 
        data_out = MockLogProcessorDaemon().get_output(data_in)
640
 
        self.assertEquals(expected_data_out[0], data_out[0])
641
 
 
642
 
        for row in data_out[1:]:
643
 
            self.assert_(row in expected_data_out)
644
 
 
645
 
        for row in expected_data_out[1:]:
646
 
            self.assert_(row in data_out)
647
 
 
648
 
    def test_store_output(self):
649
 
        try:
650
 
            real_strftime = time.strftime
651
 
            mock_strftime_return = '2010/03/02/01/'
652
 
            def mock_strftime(format):
653
 
                self.assertEquals('%Y/%m/%d/%H/', format)
654
 
                return mock_strftime_return
655
 
            log_processor.time.strftime = mock_strftime
656
 
 
657
 
            data_in = [
658
 
                ['data_ts', 'account', 'a', 'b', 'c'],
659
 
                ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'],
660
 
                ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
661
 
                ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
662
 
                ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
663
 
            ]
664
 
 
665
 
            expected_output = '\n'.join([','.join(row) for row in data_in])
666
 
            h = hashlib.md5(expected_output).hexdigest()
667
 
            expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h)
668
 
 
669
 
            class MockInternalProxy:
670
 
                def __init__(self, test, daemon, expected_filename,
671
 
                    expected_output):
672
 
                    self.test = test
673
 
                    self.daemon = daemon
674
 
                    self.expected_filename = expected_filename
675
 
                    self.expected_output = expected_output
676
 
 
677
 
                def upload_file(self, f, account, container, filename):
678
 
                    self.test.assertEquals(self.daemon.log_processor_account,
679
 
                        account)
680
 
                    self.test.assertEquals(self.daemon.log_processor_container,
681
 
                        container)
682
 
                    self.test.assertEquals(self.expected_filename, filename)
683
 
                    self.test.assertEquals(self.expected_output, f.getvalue())
684
 
 
685
 
            class MockLogProcessor:
686
 
                def __init__(self, test, daemon, expected_filename,
687
 
                    expected_output):
688
 
                    self.internal_proxy = MockInternalProxy(test, daemon,
689
 
                        expected_filename, expected_output)
690
 
 
691
 
            class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
692
 
                def __init__(self, test, expected_filename, expected_output):
693
 
                    self.log_processor = MockLogProcessor(test, self,
694
 
                        expected_filename, expected_output)
695
 
                    self.log_processor_account = 'account'
696
 
                    self.log_processor_container = 'container'
697
 
                    self.processed_files_filename = 'filename'
698
 
 
699
 
            MockLogProcessorDaemon(self, expected_filename, expected_output).\
700
 
                store_output(data_in)
701
 
        finally:
702
 
            log_processor.time.strftime = real_strftime
703
 
 
704
 
    def test_keylist_mapping(self):
705
 
        # Kind of lame test to see if the propery is both
706
 
        # generated by a particular method and cached properly.
707
 
        # The method that actually generates the mapping is
708
 
        # tested elsewhere.
709
 
 
710
 
        value_return = 'keylist_mapping'
711
 
        class MockLogProcessor:
712
 
            def __init__(self):
713
 
                self.call_count = 0
714
 
 
715
 
            def generate_keylist_mapping(self):
716
 
                self.call_count += 1
717
 
                return value_return
718
 
 
719
 
        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
720
 
            def __init__(self):
721
 
                self.log_processor = MockLogProcessor()
722
 
                self._keylist_mapping = None
723
 
 
724
 
        d = MockLogProcessorDaemon()
725
 
        self.assertEquals(value_return, d.keylist_mapping)
726
 
        self.assertEquals(value_return, d.keylist_mapping)
727
 
        self.assertEquals(1, d.log_processor.call_count)
728
 
 
729
 
    def test_process_logs(self):
730
 
        try:
731
 
            mock_logs_to_process = 'logs_to_process'
732
 
            mock_processed_files = 'processed_files'
733
 
 
734
 
            real_multiprocess_collate = log_processor.multiprocess_collate
735
 
            multiprocess_collate_return = 'multiprocess_collate_return'
736
 
 
737
 
            get_aggregate_data_return = 'get_aggregate_data_return'
738
 
            get_final_info_return = 'get_final_info_return'
739
 
            get_output_return = 'get_output_return'
740
 
 
741
 
            class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
742
 
                def __init__(self, test):
743
 
                    self.test = test
744
 
                    self.total_conf = 'total_conf'
745
 
                    self.logger = 'logger'
746
 
                    self.worker_count = 'worker_count'
747
 
 
748
 
                def get_aggregate_data(self, processed_files, results):
749
 
                    self.test.assertEquals(mock_processed_files, processed_files)
750
 
                    self.test.assertEquals(multiprocess_collate_return, results)
751
 
                    return get_aggregate_data_return
752
 
 
753
 
                def get_final_info(self, aggr_data):
754
 
                    self.test.assertEquals(get_aggregate_data_return, aggr_data)
755
 
                    return get_final_info_return
756
 
 
757
 
                def get_output(self, final_info):
758
 
                    self.test.assertEquals(get_final_info_return, final_info)
759
 
                    return get_output_return
760
 
 
761
 
            d = MockLogProcessorDaemon(self)
762
 
 
763
 
            def mock_multiprocess_collate(processor_args, logs_to_process,
764
 
                worker_count):
765
 
                self.assertEquals(d.total_conf, processor_args[0])
766
 
                self.assertEquals(d.logger, processor_args[1])
767
 
 
768
 
                self.assertEquals(mock_logs_to_process, logs_to_process)
769
 
                self.assertEquals(d.worker_count, worker_count)
770
 
 
771
 
                return multiprocess_collate_return
772
 
 
773
 
            log_processor.multiprocess_collate = mock_multiprocess_collate
774
 
 
775
 
            output = d.process_logs(mock_logs_to_process, mock_processed_files)
776
 
            self.assertEquals(get_output_return, output)
777
 
        finally:
778
 
            log_processor.multiprocess_collate = real_multiprocess_collate
779
 
 
780
 
    def test_run_once_get_processed_files_list_returns_none(self):
781
 
        class MockLogProcessor:
782
 
            def get_data_list(self, lookback_start, lookback_end,
783
 
                processed_files):
784
 
                raise unittest.TestCase.failureException, \
785
 
                    'Method should not be called'
786
 
 
787
 
        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
788
 
            def __init__(self):
789
 
                self.logger = DumbLogger()
790
 
                self.log_processor = MockLogProcessor()
791
 
 
792
 
            def get_lookback_interval(self):
793
 
                return None, None
794
 
 
795
 
            def get_processed_files_list(self):
796
 
                return None
797
 
 
798
 
        MockLogProcessorDaemon().run_once()
799
 
 
800
 
    def test_run_once_no_logs_to_process(self):
801
 
        class MockLogProcessor():
802
 
            def __init__(self, daemon, test):
803
 
                self.daemon = daemon
804
 
                self.test = test
805
 
 
806
 
            def get_data_list(self, lookback_start, lookback_end,
807
 
                processed_files):
808
 
                self.test.assertEquals(self.daemon.lookback_start,
809
 
                    lookback_start)
810
 
                self.test.assertEquals(self.daemon.lookback_end,
811
 
                    lookback_end)
812
 
                self.test.assertEquals(self.daemon.processed_files,
813
 
                    processed_files)
814
 
                return []
815
 
 
816
 
        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
817
 
            def __init__(self, test):
818
 
                self.logger = DumbLogger()
819
 
                self.log_processor = MockLogProcessor(self, test)
820
 
                self.lookback_start = 'lookback_start'
821
 
                self.lookback_end = 'lookback_end'
822
 
                self.processed_files = ['a', 'b', 'c']
823
 
 
824
 
            def get_lookback_interval(self):
825
 
                return self.lookback_start, self.lookback_end
826
 
 
827
 
            def get_processed_files_list(self):
828
 
                return self.processed_files
829
 
 
830
 
            def process_logs(logs_to_process, processed_files):
831
 
                raise unittest.TestCase.failureException, \
832
 
                    'Method should not be called'
833
 
 
834
 
        MockLogProcessorDaemon(self).run_once()