~ttx/swift/release-1.4.2

« back to all changes in this revision

Viewing changes to swift/stats/log_uploader.py

  • Committer: Tarmac
  • Author(s): gholt, FUJITA Tomonori, John Dickinson, David Goetz, John Dickinson, Joe Arnold, Scott Simpson, joe at cloudscaling, Thierry Carrez
  • Date: 2011-07-26 09:08:37 UTC
  • mfrom: (305.1.1 milestone-proposed)
  • Revision ID: tarmac-20110726090837-fwlvja8dnk7nkppw
Merge 1.4.2 development from trunk (rev331)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2010-2011 OpenStack, LLC.
2
 
#
3
 
# Licensed under the Apache License, Version 2.0 (the "License");
4
 
# you may not use this file except in compliance with the License.
5
 
# You may obtain a copy of the License at
6
 
#
7
 
#    http://www.apache.org/licenses/LICENSE-2.0
8
 
#
9
 
# Unless required by applicable law or agreed to in writing, software
10
 
# distributed under the License is distributed on an "AS IS" BASIS,
11
 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
 
# implied.
13
 
# See the License for the specific language governing permissions and
14
 
# limitations under the License.
15
 
 
16
 
from __future__ import with_statement
17
 
import os
18
 
import hashlib
19
 
import time
20
 
import gzip
21
 
import re
22
 
import sys
23
 
from paste.deploy import appconfig
24
 
 
25
 
from swift.common.internal_proxy import InternalProxy
26
 
from swift.common.daemon import Daemon
27
 
from swift.common import utils
28
 
 
29
 
 
30
 
class LogUploader(Daemon):
31
 
    '''
32
 
    Given a local directory, a swift account, and a container name, LogParser
33
 
    will upload all files in the local directory to the given account/
34
 
    container.  All but the newest files will be uploaded, and the files' md5
35
 
    sum will be computed. The hash is used to prevent duplicate data from
36
 
    being uploaded multiple times in different files (ex: log lines). Since
37
 
    the hash is computed, it is also used as the uploaded object's etag to
38
 
    ensure data integrity.
39
 
 
40
 
    Note that after the file is successfully uploaded, it will be unlinked.
41
 
 
42
 
    The given proxy server config is used to instantiate a proxy server for
43
 
    the object uploads.
44
 
 
45
 
    The default log file format is: plugin_name-%Y%m%d%H* . Any other format
46
 
    of log file names must supply a regular expression that defines groups
47
 
    for year, month, day, and hour. The regular expression will be evaluated
48
 
    with re.VERBOSE. A common example may be:
49
 
    source_filename_pattern = ^cdn_logger-
50
 
        (?P<year>[0-9]{4})
51
 
        (?P<month>[0-1][0-9])
52
 
        (?P<day>[0-3][0-9])
53
 
        (?P<hour>[0-2][0-9])
54
 
        .*$
55
 
    '''
56
 
 
57
 
    def __init__(self, uploader_conf, plugin_name, regex=None, cutoff=None):
58
 
        super(LogUploader, self).__init__(uploader_conf)
59
 
        log_name = '%s-log-uploader' % plugin_name
60
 
        self.logger = utils.get_logger(uploader_conf, log_name,
61
 
                                       log_route=plugin_name)
62
 
        self.log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
63
 
        self.swift_account = uploader_conf['swift_account']
64
 
        self.container_name = uploader_conf['container_name']
65
 
        proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
66
 
                                            '/etc/swift/proxy-server.conf')
67
 
        proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
68
 
                                      name='proxy-server')
69
 
        self.internal_proxy = InternalProxy(proxy_server_conf)
70
 
        self.new_log_cutoff = int(cutoff or
71
 
                                  uploader_conf.get('new_log_cutoff', '7200'))
72
 
        self.unlink_log = uploader_conf.get('unlink_log', 'true').lower() in \
73
 
                utils.TRUE_VALUES
74
 
        self.filename_pattern = regex or \
75
 
            uploader_conf.get('source_filename_pattern',
76
 
                '''
77
 
                ^%s-
78
 
                (?P<year>[0-9]{4})
79
 
                (?P<month>[0-1][0-9])
80
 
                (?P<day>[0-3][0-9])
81
 
                (?P<hour>[0-2][0-9])
82
 
                .*$''' % plugin_name)
83
 
 
84
 
    def run_once(self, *args, **kwargs):
85
 
        self.logger.info(_("Uploading logs"))
86
 
        start = time.time()
87
 
        self.upload_all_logs()
88
 
        self.logger.info(_("Uploading logs complete (%0.2f minutes)") %
89
 
            ((time.time() - start) / 60))
90
 
 
91
 
    def get_relpath_to_files_under_log_dir(self):
92
 
        """
93
 
        Look under log_dir recursively and return all filenames as relpaths
94
 
 
95
 
        :returns : list of strs, the relpath to all filenames under log_dir
96
 
        """
97
 
        all_files = []
98
 
        for path, dirs, files in os.walk(self.log_dir):
99
 
            all_files.extend(os.path.join(path, f) for f in files)
100
 
        return [os.path.relpath(f, start=self.log_dir) for f in all_files]
101
 
 
102
 
    def filter_files(self, all_files):
103
 
        """
104
 
        Filter files based on regex pattern
105
 
 
106
 
        :param all_files: list of strs, relpath of the filenames under log_dir
107
 
        :param pattern: regex pattern to match against filenames
108
 
 
109
 
        :returns : dict mapping full path of file to match group dict
110
 
        """
111
 
        filename2match = {}
112
 
        found_match = False
113
 
        for filename in all_files:
114
 
            match = re.match(self.filename_pattern, filename, re.VERBOSE)
115
 
            if match:
116
 
                found_match = True
117
 
                full_path = os.path.join(self.log_dir, filename)
118
 
                filename2match[full_path] = match.groupdict()
119
 
            else:
120
 
                self.logger.debug(_('%(filename)s does not match '
121
 
                           '%(pattern)s') % {'filename': filename,
122
 
                                             'pattern': self.filename_pattern})
123
 
        return filename2match
124
 
 
125
 
    def upload_all_logs(self):
126
 
        """
127
 
        Match files under log_dir to source_filename_pattern and upload to
128
 
        swift
129
 
        """
130
 
        all_files = self.get_relpath_to_files_under_log_dir()
131
 
        filename2match = self.filter_files(all_files)
132
 
        if not filename2match:
133
 
            self.logger.error(_('No files in %(log_dir)s match %(pattern)s') %
134
 
                     {'log_dir': self.log_dir,
135
 
                      'pattern': self.filename_pattern})
136
 
            sys.exit(1)
137
 
        if not self.internal_proxy.create_container(self.swift_account,
138
 
                                                    self.container_name):
139
 
            self.logger.error(_('Unable to create container for '
140
 
                                '%(account)s/%(container)s') % {
141
 
                                    'account': self.swift_account,
142
 
                                    'container': self.container_name})
143
 
            return
144
 
        for filename, match in filename2match.items():
145
 
            # don't process very new logs
146
 
            seconds_since_mtime = time.time() - os.stat(filename).st_mtime
147
 
            if seconds_since_mtime < self.new_log_cutoff:
148
 
                self.logger.debug(_("Skipping log: %(file)s "
149
 
                                    "(< %(cutoff)d seconds old)") % {
150
 
                                        'file': filename,
151
 
                                        'cutoff': self.new_log_cutoff})
152
 
                continue
153
 
            self.upload_one_log(filename, **match)
154
 
 
155
 
    def upload_one_log(self, filename, year, month, day, hour):
156
 
        """
157
 
        Upload one file to swift
158
 
        """
159
 
        if os.path.getsize(filename) == 0:
160
 
            self.logger.debug(_("Log %s is 0 length, skipping") % filename)
161
 
            return
162
 
        self.logger.debug(_("Processing log: %s") % filename)
163
 
        filehash = hashlib.md5()
164
 
        already_compressed = True if filename.endswith('.gz') else False
165
 
        opener = gzip.open if already_compressed else open
166
 
        f = opener(filename, 'rb')
167
 
        try:
168
 
            for line in f:
169
 
                # filter out bad lines here?
170
 
                filehash.update(line)
171
 
        finally:
172
 
            f.close()
173
 
        filehash = filehash.hexdigest()
174
 
        # By adding a hash to the filename, we ensure that uploaded files
175
 
        # have unique filenames and protect against uploading one file
176
 
        # more than one time. By using md5, we get an etag for free.
177
 
        target_filename = '/'.join([year, month, day, hour, filehash + '.gz'])
178
 
        if self.internal_proxy.upload_file(filename,
179
 
                                          self.swift_account,
180
 
                                          self.container_name,
181
 
                                          target_filename,
182
 
                                          compress=(not already_compressed)):
183
 
            self.logger.debug(_("Uploaded log %(file)s to %(target)s") %
184
 
                {'file': filename, 'target': target_filename})
185
 
            if self.unlink_log:
186
 
                os.unlink(filename)
187
 
        else:
188
 
            self.logger.error(_("ERROR: Upload of log %s failed!") % filename)