~fginther/jenkins-launchpad-plugin/indirect-team-membership

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import os
import hashlib
from settings import LAUNCHPADLOCKS_DIR, logger


class MergeProposalReview():
    _mp = None

    def __init__(self, mp):
        self._mp = mp

    @property
    def mp(self):
        """Merge proposal."""

        return self._mp

    def _get_hash(self):
        return hashlib.sha224(self.mp.web_link).hexdigest()

    def get_lockfile(self):
        return LAUNCHPADLOCKS_DIR + '/' + self._get_hash()

    def lock(self, job_name):
        if not os.path.exists(LAUNCHPADLOCKS_DIR):
            os.makedirs(LAUNCHPADLOCKS_DIR)
        lockfile = open(self.get_lockfile(), 'w')
        lock_info = "{jenkins_job}\n{merge_proposal}\n"
        lock_info = lock_info.format(jenkins_job=job_name,
                                     merge_proposal=self.mp.web_link)
        lockfile.write(lock_info)
        lockfile.close()

    def unlock(self):
        try:
            os.remove(self.get_lockfile())
            return True
        except OSError as error:
            if error.errno == 2:
                logger.debug("mp lock doesn't exists. Ignoring this unlock.")
            return False

    def get_jenkins_job(self):
        if self.is_locked():
            lockfile = open(self.get_lockfile(), 'r')
            job_name = lockfile.readline().rstrip()
            return job_name
        else:
            return None

    def is_locked(self):
        return os.path.exists(self.get_lockfile())