# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import gzip
import hashlib
import os
import re
import time

from paste.deploy import appconfig

from swift.common.internal_proxy import InternalProxy
from swift.common.daemon import Daemon
from swift.common import utils
class LogUploader(Daemon):
    '''
    Given a local directory, a swift account, and a container name, LogParser
    will upload all files in the local directory to the given account/
    container. All but the newest files will be uploaded, and the files' md5
    sum will be computed. The hash is used to prevent duplicate data from
    being uploaded multiple times in different files (ex: log lines). Since
    the hash is computed, it is also used as the uploaded object's etag to
    ensure data integrity.

    Note that after the file is successfully uploaded, it will be unlinked.

    The given proxy server config is used to instantiate a proxy server for
    the object uploads.

    The default log file format is: plugin_name-%Y%m%d%H* . Any other format
    of log file names must supply a regular expression that defines groups
    for year, month, day, and hour. The regular expression will be evaluated
    with re.VERBOSE. A common example may be:
        source_filename_pattern = ^cdn_logger-
            (?P<year>[0-9]{4})
            (?P<month>[0-1][0-9])
            (?P<day>[0-3][0-9])
            (?P<hour>[0-2][0-9])
            .*$
    '''

    def __init__(self, uploader_conf, plugin_name, regex=None, cutoff=None):
        '''
        :param uploader_conf: dict of config options for this uploader
        :param plugin_name: str, name used for the logger and the default
                            filename pattern
        :param regex: optional override for source_filename_pattern; must
                      define year/month/day/hour groups
        :param cutoff: optional override (seconds) for new_log_cutoff
        '''
        super(LogUploader, self).__init__(uploader_conf)
        log_name = '%s-log-uploader' % plugin_name
        self.logger = utils.get_logger(uploader_conf, log_name,
                                       log_route=plugin_name)
        self.log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
        self.swift_account = uploader_conf['swift_account']
        self.container_name = uploader_conf['container_name']
        proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
                                                  '/etc/swift/proxy-server.conf')
        # NOTE(review): loading the proxy-server app config so uploads go
        # through an in-process proxy rather than over the network
        proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
                                      name='proxy-server')
        self.internal_proxy = InternalProxy(proxy_server_conf)
        # files younger than this many seconds are skipped (may still be
        # written to)
        self.new_log_cutoff = int(cutoff or
                                  uploader_conf.get('new_log_cutoff', '7200'))
        self.unlink_log = uploader_conf.get('unlink_log', 'true').lower() in \
            utils.TRUE_VALUES
        self.filename_pattern = regex or \
            uploader_conf.get('source_filename_pattern',
                              '''
                              ^%s-
                              (?P<year>[0-9]{4})
                              (?P<month>[0-1][0-9])
                              (?P<day>[0-3][0-9])
                              (?P<hour>[0-2][0-9])
                              .*$''' % plugin_name)

    def run_once(self, *args, **kwargs):
        '''Upload all eligible logs once and log the elapsed time.'''
        self.logger.info(_("Uploading logs"))
        start = time.time()
        self.upload_all_logs()
        self.logger.info(_("Uploading logs complete (%0.2f minutes)") %
                         ((time.time() - start) / 60))

    def get_relpath_to_files_under_log_dir(self):
        '''
        Look under log_dir recursively and return all filenames as relpaths

        :returns : list of strs, the relpath to all filenames under log_dir
        '''
        all_files = []
        for path, dirs, files in os.walk(self.log_dir):
            all_files.extend(os.path.join(path, f) for f in files)
        return [os.path.relpath(f, start=self.log_dir) for f in all_files]

    def filter_files(self, all_files):
        '''
        Filter files based on regex pattern

        :param all_files: list of strs, relpath of the filenames under log_dir

        :returns : dict mapping full path of file to match group dict
                   (year/month/day/hour) for every filename that matches
                   self.filename_pattern
        '''
        filename2match = {}
        for filename in all_files:
            match = re.match(self.filename_pattern, filename, re.VERBOSE)
            if match:
                full_path = os.path.join(self.log_dir, filename)
                filename2match[full_path] = match.groupdict()
            else:
                self.logger.debug(_('%(filename)s does not match '
                                    '%(pattern)s') % {'filename': filename,
                                    'pattern': self.filename_pattern})
        return filename2match

    def upload_all_logs(self):
        '''
        Match files under log_dir to source_filename_pattern and upload to
        swift
        '''
        all_files = self.get_relpath_to_files_under_log_dir()
        filename2match = self.filter_files(all_files)
        if not filename2match:
            self.logger.error(_('No files in %(log_dir)s match %(pattern)s') %
                              {'log_dir': self.log_dir,
                               'pattern': self.filename_pattern})
            return
        if not self.internal_proxy.create_container(self.swift_account,
                                                    self.container_name):
            self.logger.error(_('Unable to create container for '
                                '%(account)s/%(container)s') % {
                                    'account': self.swift_account,
                                    'container': self.container_name})
            return
        for filename, match in filename2match.items():
            # don't process very new logs
            seconds_since_mtime = time.time() - os.stat(filename).st_mtime
            if seconds_since_mtime < self.new_log_cutoff:
                self.logger.debug(_("Skipping log: %(file)s "
                                    "(< %(cutoff)d seconds old)") % {
                                        'file': filename,
                                        'cutoff': self.new_log_cutoff})
                continue
            self.upload_one_log(filename, **match)

    def upload_one_log(self, filename, year, month, day, hour):
        '''
        Upload one file to swift

        :param filename: str, full path of the log file to upload
        :param year: str, 4-digit year extracted from the filename
        :param month: str, 2-digit month extracted from the filename
        :param day: str, 2-digit day extracted from the filename
        :param hour: str, 2-digit hour extracted from the filename
        '''
        if os.path.getsize(filename) == 0:
            self.logger.debug(_("Log %s is 0 length, skipping") % filename)
            return
        self.logger.debug(_("Processing log: %s") % filename)
        filehash = hashlib.md5()
        already_compressed = filename.endswith('.gz')
        opener = gzip.open if already_compressed else open
        f = opener(filename, 'rb')
        try:
            for line in f:
                # filter out bad lines here?
                filehash.update(line)
        finally:
            f.close()
        filehash = filehash.hexdigest()
        # By adding a hash to the filename, we ensure that uploaded files
        # have unique filenames and protect against uploading one file
        # more than one time. By using md5, we get an etag for free.
        target_filename = '/'.join([year, month, day, hour, filehash + '.gz'])
        if self.internal_proxy.upload_file(filename,
                                           self.swift_account,
                                           self.container_name,
                                           target_filename,
                                           compress=(not already_compressed)):
            self.logger.debug(_("Uploaded log %(file)s to %(target)s") %
                              {'file': filename, 'target': target_filename})
            if self.unlink_log:
                os.unlink(filename)
        else:
            self.logger.error(_("ERROR: Upload of log %s failed!") % filename)