~statik/hydrazine/packaging

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#! /usr/bin/python

import collections
import os.path
import subprocess
import sys

import launchpadlib
from launchpadlib.credentials import Credentials

from launchpadlib.launchpad import (
    Launchpad, STAGING_SERVICE_ROOT,
    EDGE_SERVICE_ROOT,
    )

import hydrazine

# Web-service root handed to Launchpad by create_session() below.
service_root = EDGE_SERVICE_ROOT

# Name of the Launchpad project looked up by get_project() and embedded
# in the bug-search URLs built by the BugCategory subclasses.
project_name = 'bzr'



def run_external(args):
    """Run an external command, echoing it to stderr first.

    :param args: the command, in any form accepted by subprocess.call
        (typically a list or tuple of argument strings).
    :raises AssertionError: if the command exits with a non-zero status.
    """
    # Wrap in a 1-tuple so that a tuple of arguments is shown as a whole
    # instead of being unpacked as several format arguments.
    sys.stderr.write(">> %s\n" % (args,))
    rc = subprocess.call(args)
    if rc != 0:
        sys.stderr.write("failed %s!\n" % rc)
        raise AssertionError(
            "command %r exited with status %s" % (args, rc))


def trace(s):
    """Emit the string *s* on stderr as one newline-terminated line."""
    line = s + '\n'
    sys.stderr.write(line)


# Per-user cache locations.  lplib_cachedir is passed to Launchpad as its
# cache directory and hydrazine_cachedir holds the saved credentials file.
# rrd_dir is only created here; nothing else in this file uses it
# (presumably reserved for RRD data -- confirm before relying on it).
lplib_cachedir = os.path.expanduser("~/.cache/launchpadlib/")
hydrazine_cachedir = os.path.expanduser("~/.cache/hydrazine/")
rrd_dir = os.path.expanduser("~/.cache/hydrazine/rrd")
# Create whichever directories are missing, accessible to the owner only.
# 0o700 is the octal spelling accepted by both Python 2.6+ and Python 3;
# the legacy 0700 form is a syntax error on Python 3.
for d in [lplib_cachedir, hydrazine_cachedir, rrd_dir]:
    if not os.path.isdir(d):
        os.makedirs(d, mode=0o700)


def create_session():
    """Return a Launchpad session, reusing saved credentials if present.

    When a ``credentials`` file exists in the hydrazine cache directory it
    is loaded and a Launchpad object is built from it.  Otherwise
    Launchpad.get_token_and_login() is used to obtain a new login, and the
    resulting credentials are written to that file for later runs.

    :return: a Launchpad object for ``service_root``.
    """
    hydrazine_credentials_filename = os.path.join(hydrazine_cachedir,
        'credentials')
    if os.path.exists(hydrazine_credentials_filename):
        # TODO: handle the case of having credentials that have expired etc
        credentials = Credentials()
        # Read the very path that was just tested, and close the file
        # deterministically instead of leaving it to garbage collection.
        with open(hydrazine_credentials_filename, "r") as credentials_file:
            credentials.load(credentials_file)
        trace('loaded existing credentials')
        return Launchpad(credentials, service_root,
            lplib_cachedir)

    launchpad = Launchpad.get_token_and_login(
        'Hydrazine',
        service_root,
        lplib_cachedir)
    trace('saving credentials...')
    # Closing the file here guarantees the credentials are flushed to disk
    # before the session is handed back to the caller.
    with open(hydrazine_credentials_filename, "w") as credentials_file:
        launchpad.credentials.save(credentials_file)
    return launchpad

def get_project():
    """Look up ``project_name`` on the module-level ``launchpad`` session.

    Progress is reported on stderr: a prompt before the lookup, then the
    project's name and title once it has been fetched.

    :return: the project object obtained from ``launchpad.projects``.
    """
    sys.stderr.write('getting project... ')
    found = launchpad.projects[project_name]
    summary = '%s (%s)\n' % (found.name, found.title)
    sys.stderr.write(summary)
    return found


class BugCategory(object):
    """Holds a set of logically-related bugs.

    Subclasses must provide get_name() and may override get_link_url().
    """

    def __init__(self):
        # Bug tasks in this category; a set, so adding one twice is harmless.
        self.bugs = set()

    def add(self, bt):
        """Add the bug task *bt* to this category."""
        self.bugs.add(bt)

    def count_bugs(self):
        """Return the number of distinct bug tasks added so far."""
        return len(self.bugs)

    def get_name(self):
        """Return the display name of this category.

        :raises NotImplementedError: always; subclasses must override.
        """
        # Declared here so that a category lacking a name fails with a
        # clear error instead of an AttributeError at print time.
        raise NotImplementedError(
            '%s must implement get_name()' % type(self).__name__)

    def get_link_url(self):
        """Return a URL listing these bugs, or None when there is none."""
        return None


class HasPatchBugCategory(BugCategory):
    """Category whose link is the project's has_patch bug search."""

    def get_name(self):
        """Return the fixed display name of this category."""
        return 'HasPatch'

    def get_link_url(self):
        """Return the bug-search URL for ``project_name`` with has_patch on."""
        bugs_page = 'https://bugs.edge.launchpad.net/%s/+bugs' % project_name
        return bugs_page + '?search=Search&field.has_patch=on'


class StatusBugCategory(BugCategory):
    """Category of bugs that all share one status string."""

    def __init__(self, status):
        """Create an empty category for the given *status* string."""
        BugCategory.__init__(self)
        self.status = status

    def get_name(self):
        """Return the status string, which doubles as the display name."""
        return self.status

    def get_link_url(self):
        """Return the bug-search URL for ``project_name`` and this status.

        The status is percent-encoded so that values containing spaces
        or other reserved characters still produce a well-formed URL.
        """
        # Imported locally so the block is self-contained; the fallback
        # covers the function's Python 3 location.
        try:
            from urllib import quote
        except ImportError:
            from urllib.parse import quote
        return 'https://bugs.edge.launchpad.net/%s/+bugs?search=Search&field.status=%s' \
            % (project_name, quote(self.status))


class CannedQuery(object):
    """Base class for a predefined bug query over one project.

    Subclasses provide get_name() and query_categories(); show_text()
    prints whatever query_categories() returns.
    """

    def __init__(self, project):
        """Remember the *project* object the query will run against."""
        self.project = project

    def get_name(self):
        """Return the query's display name.

        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError(
            '%s must implement get_name()' % type(self).__name__)

    def query_categories(self):
        """Return an iterable of category objects holding the results.

        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError(
            '%s must implement query_categories()' % type(self).__name__)

    def _run_query(self, from_collection):
        """Yield each item of *from_collection* with progress on stderr.

        The query name is written before the first item, a dot is written
        each time the consumer comes back after an item, and a newline is
        written once the collection is exhausted.
        """
        sys.stderr.write(self.get_name())
        for bt in from_collection:
            yield bt
            sys.stderr.write('.')
        sys.stderr.write('\n')

    def show_text(self):
        """Print one line per category (count, name, link), then a blank line."""
        # Single-argument print(...) behaves the same as the print
        # statement on Python 2 and is also valid Python 3 syntax.
        for category in self.query_categories():
            print('%6d %s %s' % (category.count_bugs(),
                category.get_name(),
                category.get_link_url() or ''))
        print('')


class PatchCannedQuery(CannedQuery):
    """Canned query over the project's bug tasks found with has_patch=True."""

    def get_collection(self):
        """Return the result of ``project.searchTasks(has_patch=True)``."""
        return self.project.searchTasks(has_patch=True)

    def get_name(self):
        """Return the display name of this query."""
        return 'Bugs with patches'

    def query_categories(self):
        """Run the search and return a single-element list of categories.

        :return: a list holding one HasPatchBugCategory filled with every
            bug task yielded by get_collection().
        """
        has_patches = HasPatchBugCategory()
        # Go through get_collection() so the search is defined in exactly
        # one place instead of being repeated here.
        for bt in self._run_query(self.get_collection()):
            has_patches.add(bt)
        return [has_patches]


class StatusCannedQuery(CannedQuery):
    """Canned query that groups the project's bug tasks by status."""

    def get_name(self):
        """Return the display name of this query."""
        return 'By Status'

    def query_categories(self):
        """Search all bug tasks and bucket them by their ``status``.

        :return: the values of a dict mapping each status seen to a
            StatusBugCategory holding the bug tasks with that status.
        """
        categories = {}
        for task in self._run_query(self.project.searchTasks()):
            status = task.status
            bucket = categories.get(status)
            if bucket is None:
                # First task seen with this status: start a new bucket.
                bucket = StatusBugCategory(status)
                categories[status] = bucket
            bucket.add(task)
        return categories.values()


def show_bug_report(project):
    """Print the by-status report, then the has-patch report, for *project*."""
    for make_query in (StatusCannedQuery, PatchCannedQuery):
        report = make_query(project)
        report.show_text()


# Script body: obtain a session, look up the project, print the report.
# get_project() reads the module-level `launchpad` bound on the next line,
# so the order of these statements matters.
# NOTE(review): the session comes from hydrazine.create_session(), not the
# create_session() defined in this file -- confirm which one is intended.
launchpad = hydrazine.create_session()
project = get_project()
show_bug_report(project)