1
# Copyright 2016 Canonical Limited.
3
# This file is part of charm-helpers.
5
# charm-helpers is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Lesser General Public License version 3 as
7
# published by the Free Software Foundation.
9
# charm-helpers is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU Lesser General Public License for more details.
14
# You should have received a copy of the GNU Lesser General Public License
15
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
22
from subprocess import (
27
from traceback import format_exc
28
from six import string_types
34
from charmhelpers.core.hookenv import (
41
from charmhelpers.core import unitdata
42
from charmhelpers.core.host import file_hash
43
from charmhelpers.contrib.hardening.audits import BaseAudit
44
from charmhelpers.contrib.hardening.templating import (
48
from charmhelpers.contrib.hardening import utils
51
class BaseFileAudit(BaseAudit):
52
"""Base class for file audits.
54
Provides api stubs for compliance check flow that must be used by any class
55
that implemented this one.
58
def __init__(self, paths, always_comply=False, *args, **kwargs):
60
:param paths: string path of list of paths of files we want to apply
61
compliance checks are criteria to.
62
:param always_comply: if true compliance criteria is always applied
63
else compliance is skipped for non-existent
66
super(BaseFileAudit, self).__init__(*args, **kwargs)
67
self.always_comply = always_comply
68
if isinstance(paths, string_types) or not hasattr(paths, '__iter__'):
73
def ensure_compliance(self):
74
"""Ensure that the all registered files comply to registered criteria.
78
if self.is_compliant(p):
81
log('File %s is not in compliance.' % p, level=INFO)
83
if not self.always_comply:
84
log("Non-existent path '%s' - skipping compliance check"
88
if self._take_action():
89
log("Applying compliance criteria to '%s'" % (p), level=INFO)
92
def is_compliant(self, path):
93
"""Audits the path to see if it is compliance.
95
:param path: the path to the file that should be checked.
97
raise NotImplementedError
99
def comply(self, path):
100
"""Enforces the compliance of a path.
102
:param path: the path to the file that should be enforced.
104
raise NotImplementedError
107
def _get_stat(cls, path):
108
"""Returns the Posix st_stat information for the specified file path.
110
:param path: the path to get the st_stat information for.
111
:returns: an st_stat object for the path or None if the path doesn't
117
class FilePermissionAudit(BaseFileAudit):
118
"""Implements an audit for file permissions and ownership for a user.
120
This class implements functionality that ensures that a specific user/group
121
will own the file(s) specified and that the permissions specified are
122
applied properly to the file.
124
def __init__(self, paths, user, group=None, mode=0o600, **kwargs):
128
super(FilePermissionAudit, self).__init__(paths, user, group, mode,
136
def user(self, name):
138
user = pwd.getpwnam(name)
140
log('Unknown user %s' % name, level=ERROR)
149
def group(self, name):
153
group = grp.getgrnam(name)
155
group = grp.getgrgid(self.user.pw_gid)
157
log('Unknown group %s' % name, level=ERROR)
160
def is_compliant(self, path):
161
"""Checks if the path is in compliance.
163
Used to determine if the path specified meets the necessary
164
requirements to be in compliance with the check itself.
166
:param path: the file path to check
167
:returns: True if the path is compliant, False otherwise.
169
stat = self._get_stat(path)
174
if stat.st_uid != user.pw_uid or stat.st_gid != group.gr_gid:
175
log('File %s is not owned by %s:%s.' % (path, user.pw_name,
180
# POSIX refers to the st_mode bits as corresponding to both the
181
# file type and file permission bits, where the least significant 12
182
# bits (o7777) are the suid (11), sgid (10), sticky bits (9), and the
183
# file permission bits (8-0)
184
perms = stat.st_mode & 0o7777
185
if perms != self.mode:
186
log('File %s has incorrect permissions, currently set to %s' %
187
(path, oct(stat.st_mode & 0o7777)), level=INFO)
192
def comply(self, path):
193
"""Issues a chown and chmod to the file paths specified."""
194
utils.ensure_permissions(path, self.user.pw_name, self.group.gr_name,
198
class DirectoryPermissionAudit(FilePermissionAudit):
199
"""Performs a permission check for the specified directory path."""
201
def __init__(self, paths, user, group=None, mode=0o600,
202
recursive=True, **kwargs):
203
super(DirectoryPermissionAudit, self).__init__(paths, user, group,
205
self.recursive = recursive
207
def is_compliant(self, path):
208
"""Checks if the directory is compliant.
210
Used to determine if the path specified and all of its children
211
directories are in compliance with the check itself.
213
:param path: the directory path to check
214
:returns: True if the directory tree is compliant, otherwise False.
216
if not os.path.isdir(path):
217
log('Path specified %s is not a directory.' % path, level=ERROR)
218
raise ValueError("%s is not a directory." % path)
220
if not self.recursive:
221
return super(DirectoryPermissionAudit, self).is_compliant(path)
224
for root, dirs, _ in os.walk(path):
228
if not super(DirectoryPermissionAudit, self).is_compliant(root):
234
def comply(self, path):
235
for root, dirs, _ in os.walk(path):
237
super(DirectoryPermissionAudit, self).comply(root)
240
class ReadOnly(BaseFileAudit):
241
"""Audits that files and folders are read only."""
242
def __init__(self, paths, *args, **kwargs):
243
super(ReadOnly, self).__init__(paths=paths, *args, **kwargs)
245
def is_compliant(self, path):
247
output = check_output(['find', path, '-perm', '-go+w',
248
'-type', 'f']).strip()
250
# The find above will find any files which have permission sets
251
# which allow too broad of write access. As such, the path is
252
# compliant if there is no output.
257
except CalledProcessError as e:
258
log('Error occurred checking finding writable files for %s. '
259
'Error information is: command %s failed with returncode '
260
'%d and output %s.\n%s' % (path, e.cmd, e.returncode, e.output,
261
format_exc(e)), level=ERROR)
264
def comply(self, path):
266
check_output(['chmod', 'go-w', '-R', path])
267
except CalledProcessError as e:
268
log('Error occurred removing writeable permissions for %s. '
269
'Error information is: command %s failed with returncode '
270
'%d and output %s.\n%s' % (path, e.cmd, e.returncode, e.output,
271
format_exc(e)), level=ERROR)
274
class NoReadWriteForOther(BaseFileAudit):
275
"""Ensures that the files found under the base path are readable or
276
writable by anyone other than the owner or the group.
278
def __init__(self, paths):
279
super(NoReadWriteForOther, self).__init__(paths)
281
def is_compliant(self, path):
283
cmd = ['find', path, '-perm', '-o+r', '-type', 'f', '-o',
284
'-perm', '-o+w', '-type', 'f']
285
output = check_output(cmd).strip()
287
# The find above here will find any files which have read or
288
# write permissions for other, meaning there is too broad of access
289
# to read/write the file. As such, the path is compliant if there's
295
except CalledProcessError as e:
296
log('Error occurred while finding files which are readable or '
297
'writable to the world in %s. '
298
'Command output is: %s.' % (path, e.output), level=ERROR)
300
def comply(self, path):
302
check_output(['chmod', '-R', 'o-rw', path])
303
except CalledProcessError as e:
304
log('Error occurred attempting to change modes of files under '
305
'path %s. Output of command is: %s' % (path, e.output))
308
class NoSUIDSGIDAudit(BaseFileAudit):
309
"""Audits that specified files do not have SUID/SGID bits set."""
310
def __init__(self, paths, *args, **kwargs):
311
super(NoSUIDSGIDAudit, self).__init__(paths=paths, *args, **kwargs)
313
def is_compliant(self, path):
314
stat = self._get_stat(path)
315
if (stat.st_mode & (S_ISGID | S_ISUID)) != 0:
320
def comply(self, path):
322
log('Removing suid/sgid from %s.' % path, level=DEBUG)
323
check_output(['chmod', '-s', path])
324
except CalledProcessError as e:
325
log('Error occurred removing suid/sgid from %s.'
326
'Error information is: command %s failed with returncode '
327
'%d and output %s.\n%s' % (path, e.cmd, e.returncode, e.output,
328
format_exc(e)), level=ERROR)
331
class TemplatedFile(BaseFileAudit):
332
"""The TemplatedFileAudit audits the contents of a templated file.
334
This audit renders a file from a template, sets the appropriate file
335
permissions, then generates a hashsum with which to check the content
338
def __init__(self, path, context, template_dir, mode, user='root',
339
group='root', service_actions=None, **kwargs):
340
self.context = context
344
self.template_dir = template_dir
345
self.service_actions = service_actions
346
super(TemplatedFile, self).__init__(paths=path, always_comply=True,
349
def is_compliant(self, path):
350
"""Determines if the templated file is compliant.
352
A templated file is only compliant if it has not changed (as
353
determined by its sha256 hashsum) AND its file permissions are set
356
:param path: the path to check compliance.
358
same_templates = self.templates_match(path)
359
same_content = self.contents_match(path)
360
same_permissions = self.permissions_match(path)
362
if same_content and same_permissions and same_templates:
367
def run_service_actions(self):
368
"""Run any actions on services requested."""
369
if not self.service_actions:
372
for svc_action in self.service_actions:
373
name = svc_action['service']
374
actions = svc_action['actions']
375
log("Running service '%s' actions '%s'" % (name, actions),
377
for action in actions:
378
cmd = ['service', name, action]
381
except CalledProcessError as exc:
382
log("Service name='%s' action='%s' failed - %s" %
383
(name, action, exc), level=WARNING)
385
def comply(self, path):
386
"""Ensures the contents and the permissions of the file.
388
:param path: the path to correct
390
dirname = os.path.dirname(path)
391
if not os.path.exists(dirname):
395
render_and_write(self.template_dir, path, self.context())
396
utils.ensure_permissions(path, self.user, self.group, self.mode)
397
self.run_service_actions()
398
self.save_checksum(path)
402
"""Invoked prior to writing the template."""
405
def post_write(self):
406
"""Invoked after writing the template."""
409
def templates_match(self, path):
410
"""Determines if the template files are the same.
412
The template file equality is determined by the hashsum of the
413
template files themselves. If there is no hashsum, then the content
414
cannot be sure to be the same so treat it as if they changed.
415
Otherwise, return whether or not the hashsums are the same.
417
:param path: the path to check
420
template_path = get_template_path(self.template_dir, path)
421
key = 'hardening:template:%s' % template_path
422
template_checksum = file_hash(template_path)
424
stored_tmplt_checksum = kv.get(key)
425
if not stored_tmplt_checksum:
426
kv.set(key, template_checksum)
428
log('Saved template checksum for %s.' % template_path,
430
# Since we don't have a template checksum, then assume it doesn't
431
# match and return that the template is different.
433
elif stored_tmplt_checksum != template_checksum:
434
kv.set(key, template_checksum)
436
log('Updated template checksum for %s.' % template_path,
440
# Here the template hasn't changed based upon the calculated
441
# checksum of the template and what was previously stored.
444
def contents_match(self, path):
445
"""Determines if the file content is the same.
447
This is determined by comparing hashsum of the file contents and
448
the saved hashsum. If there is no hashsum, then the content cannot
449
be sure to be the same so treat them as if they are not the same.
450
Otherwise, return True if the hashsums are the same, False if they
453
:param path: the file to check.
455
checksum = file_hash(path)
458
stored_checksum = kv.get('hardening:%s' % path)
459
if not stored_checksum:
460
# If the checksum hasn't been generated, return False to ensure
461
# the file is written and the checksum stored.
462
log('Checksum for %s has not been calculated.' % path, level=DEBUG)
464
elif stored_checksum != checksum:
465
log('Checksum mismatch for %s.' % path, level=DEBUG)
470
def permissions_match(self, path):
471
"""Determines if the file owner and permissions match.
473
:param path: the path to check.
475
audit = FilePermissionAudit(path, self.user, self.group, self.mode)
476
return audit.is_compliant(path)
478
def save_checksum(self, path):
479
"""Calculates and saves the checksum for the path specified.
481
:param path: the path of the file to save the checksum.
483
checksum = file_hash(path)
485
kv.set('hardening:%s' % path, checksum)
489
class DeletedFile(BaseFileAudit):
490
"""Audit to ensure that a file is deleted."""
491
def __init__(self, paths):
492
super(DeletedFile, self).__init__(paths)
494
def is_compliant(self, path):
495
return not os.path.exists(path)
497
def comply(self, path):
501
class FileContentAudit(BaseFileAudit):
502
"""Audit the contents of a file."""
503
def __init__(self, paths, cases, **kwargs):
504
# Cases we expect to pass
505
self.pass_cases = cases.get('pass', [])
506
# Cases we expect to fail
507
self.fail_cases = cases.get('fail', [])
508
super(FileContentAudit, self).__init__(paths, **kwargs)
510
def is_compliant(self, path):
512
Given a set of content matching cases i.e. tuple(regex, bool) where
513
bool value denotes whether or not regex is expected to match, check that
514
all cases match as expected with the contents of the file. Cases can be
515
expected to pass of fail.
517
:param path: Path of file to check.
518
:returns: Boolean value representing whether or not all cases are
519
found to be compliant.
521
log("Auditing contents of file '%s'" % (path), level=DEBUG)
522
with open(path, 'r') as fd:
526
for pattern in self.pass_cases:
527
key = re.compile(pattern, flags=re.MULTILINE)
528
results = re.search(key, contents)
532
log("Pattern '%s' was expected to pass but instead it failed"
533
% (pattern), level=WARNING)
535
for pattern in self.fail_cases:
536
key = re.compile(pattern, flags=re.MULTILINE)
537
results = re.search(key, contents)
541
log("Pattern '%s' was expected to fail but instead it passed"
542
% (pattern), level=WARNING)
544
total = len(self.pass_cases) + len(self.fail_cases)
545
log("Checked %s cases and %s passed" % (total, matches), level=DEBUG)
546
return matches == total
548
def comply(self, *args, **kwargs):
549
"""NOOP since we just issue warnings. This is to avoid the
552
log("Not applying any compliance criteria, only checks.", level=INFO)