Source code for testing.functional

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2012 Canonical Ltd
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

from future_builtins import map

import os
import pexpect
import platform
import sys
import time
import unittest

from duplicity import backend
from .. import DuplicityTestCase


[docs]class CmdError(Exception): """Indicates an error running an external command""" def __init__(self, code): Exception.__init__(self, code) self.exit_status = code
[docs]class FunctionalTestCase(DuplicityTestCase): _setsid_w = None @classmethod def _check_setsid(cls): if cls._setsid_w is not None: return if platform.platform().startswith('Linux'): # setsid behavior differs between distributions. # If setsid supports -w ("wait"), use it. import subprocess try: with open("/dev/null", "w") as sink: subprocess.check_call(["setsid", "-w", "ls"], stdout=sink, stderr=sink) except subprocess.CalledProcessError: cls._setsid_w = False else: cls._setsid_w = True
[docs] def setUp(self): super(FunctionalTestCase, self).setUp() self.unpack_testfiles() self.class_args = [] self.backend_url = "file://testfiles/output" self.last_backup = None self.set_environ('PASSPHRASE', self.sign_passphrase) self.set_environ("SIGN_PASSPHRASE", self.sign_passphrase) backend_inst = backend.get_backend(self.backend_url) bl = backend_inst.list() if bl: backend_inst.delete(backend_inst.list()) backend_inst.close() self._check_setsid()
[docs] def run_duplicity(self, options=[], current_time=None, fail=None, passphrase_input=[]): """ Run duplicity binary with given arguments and options """ # We run under setsid and take input from /dev/null (below) because # this way we force a failure if duplicity tries to read from the # console unexpectedly (like for gpg password or such). if platform.platform().startswith('Linux'): cmd_list = ['setsid'] if self._setsid_w: cmd_list.extend(["-w"]) else: cmd_list = [] cmd_list.extend(["duplicity"]) cmd_list.extend(options) cmd_list.extend(["-v0"]) cmd_list.extend(["--no-print-statistics"]) cmd_list.extend(["--allow-source-mismatch"]) cmd_list.extend(["--archive-dir=testfiles/cache"]) if current_time: cmd_list.extend(["--current-time", current_time]) cmd_list.extend(self.class_args) if fail: cmd_list.extend(["--fail", str(fail)]) cmdline = " ".join(map(lambda x: '"%s"' % x, cmd_list)) if not passphrase_input: cmdline += " < /dev/null" child = pexpect.spawn('/bin/sh', ['-c', cmdline], timeout=None) for passphrase in passphrase_input: child.expect('passphrase.*:') child.sendline(passphrase) # if the command fails, we need to clear its output # so it will terminate cleanly. lines = [] while child.isalive(): lines.append(child.readline()) return_val = child.exitstatus # print >>sys.stderr, "duplicity command:", cmdline # print >>sys.stderr, "current dir:", os.getcwd() # print >>sys.stderr, "....output:" # for line in lines: # print >>sys.stderr, line, # print >>sys.stderr, "....return_val:", return_val if fail: self.assertEqual(30, return_val) elif return_val: raise CmdError(return_val)
[docs] def backup(self, type, input_dir, options=[], **kwargs): """Run duplicity backup to default directory""" options = [type, input_dir, self.backend_url, "--volsize", "1"] + options before_files = self.get_backend_files() # If a chain ends with time X and the next full chain begins at time X, # we may trigger an assert in collections.py. If needed, sleep to # avoid such problems if self.last_backup == int(time.time()): time.sleep(1) result = self.run_duplicity(options=options, **kwargs) self.last_backup = int(time.time()) after_files = self.get_backend_files() return after_files - before_files
[docs] def restore(self, file_to_restore=None, time=None, options=[], **kwargs): assert not os.system("rm -rf testfiles/restore_out") options = [self.backend_url, "testfiles/restore_out"] + options if file_to_restore: options.extend(['--file-to-restore', file_to_restore]) if time: options.extend(['--restore-time', str(time)]) self.run_duplicity(options=options, **kwargs)
[docs] def verify(self, dirname, file_to_verify=None, time=None, options=[], **kwargs): options = ["verify", self.backend_url, dirname] + options if file_to_verify: options.extend(['--file-to-restore', file_to_verify]) if time: options.extend(['--restore-time', str(time)]) self.run_duplicity(options=options, **kwargs)
[docs] def cleanup(self, options=[]): """ Run duplicity cleanup to default directory """ options = ["cleanup", self.backend_url, "--force"] + options self.run_duplicity(options=options)
[docs] def get_backend_files(self): backend_inst = backend.get_backend(self.backend_url) bl = backend_inst.list() backend_inst.close() return set(bl)
[docs] def make_largefiles(self, count=3, size=2): """ Makes a number of large files in testfiles/largefiles that each are the specified number of megabytes. """ assert not os.system("mkdir testfiles/largefiles") for n in range(count): assert not os.system("dd if=/dev/urandom of=testfiles/largefiles/file%d bs=1024 count=%d > /dev/null 2>&1" % (n + 1, size * 1024))