~jcsackett/charmworld/bac-tag-constraints

« back to all changes in this revision

Viewing changes to charmworld/jobs/worker.py

  • Committer: Aaron Bentley
  • Date: 2013-02-12 18:57:02 UTC
  • mfrom: (149 charmworld)
  • mto: This revision was merged to the branch mainline in revision 150.
  • Revision ID: aaron@canonical.com-20130212185702-9gnf40ao17a7uw7v
Merged trunk into mongo-urls.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import contextlib
 
2
import json
 
3
import logging
 
4
import os
 
5
import sys
 
6
import urllib
 
7
 
 
8
from bzrlib.errors import InvalidRevisionSpec
 
9
import xappy
 
10
 
 
11
import config
 
12
from config import CHARM_IMPORT_FILTER
 
13
from config import CHARM_IMPORT_LIMIT
 
14
from ingest import check_jenkins
 
15
from ingest import fetch_branch
 
16
from ingest import fetch_changes
 
17
from ingest import index_charm
 
18
from ingest import proof_charm
 
19
from ingest import scan_charm
 
20
from ingest import scan_repo
 
21
from ingest import check_store
 
22
from utils import parse_branch
 
23
 
 
24
 
 
25
BRANCH_TIPS = "https://api.launchpad.net/devel/charm?ws.op=getBranchTips"
 
26
CHARM_DIR = ""
 
27
 
 
28
 
 
29
def fetch_branches(root_dir, queue, scan_queue):
 
30
    log = logging.getLogger("charm.bzr")
 
31
    while 1:
 
32
        item = queue.next()
 
33
        if not item:
 
34
            return
 
35
        try:
 
36
            charm_data = item.payload
 
37
            fetch_branch(root_dir, charm_data)
 
38
        except Exception as e:
 
39
            log.exception("bzr error error on %s: %s", charm_data, str(e))
 
40
            # XXX j.c.sackett Jan 30, 2012 Bug:1110539 The charm here is being
 
41
            # annotated with error data, but since the charm falls out of
 
42
            # scope when this iteration of the loop ends, nothing is being
 
43
            # down with it--it's not stored, or put on the out queue.
 
44
            charm_data["error_stage"] = "bzr"
 
45
            if hasattr(e, 'output'):
 
46
                charm_data["error"] = str(e.output)
 
47
            continue
 
48
        else:
 
49
            scan_queue.put(charm_data)
 
50
        finally:
 
51
            # Remove the job from the input queue.
 
52
            item.complete()
 
53
 
 
54
 
 
55
def scan_changes(in_queue, out_queue):
 
56
    log = logging.getLogger("charm.changelog")
 
57
    while 1:
 
58
        item = in_queue.next()
 
59
        if not item:
 
60
            return
 
61
        try:
 
62
            charm_data = item.payload
 
63
            #log.info("Fetching changes for %s", charm_data["branch_spec"])
 
64
            fetch_changes(charm_data)
 
65
        except InvalidRevisionSpec, e:
 
66
            log.warning("bzr invalid rev on %s %s" % (
 
67
                charm_data['branch_spec'], charm_data['branch_dir']))
 
68
        except Exception, e:
 
69
            log.exception("bzr error on %s: %s", charm_data, str(e))
 
70
        else:
 
71
            out_queue.put(charm_data)
 
72
        finally:
 
73
            # Remove the job from the input queue.
 
74
            item.complete()
 
75
 
 
76
 
 
77
def setup_indexer(indexer):
 
78
    indexer.add_field_action('owner', xappy.FieldActions.INDEX_EXACT)
 
79
    indexer.add_field_action('series', xappy.FieldActions.INDEX_EXACT)
 
80
    indexer.add_field_action('subordinate', xappy.FieldActions.INDEX_EXACT)
 
81
    indexer.add_field_action('provides', xappy.FieldActions.INDEX_EXACT)
 
82
    indexer.add_field_action('requires', xappy.FieldActions.INDEX_EXACT)
 
83
 
 
84
    # Store content
 
85
    indexer.add_field_action('name', xappy.FieldActions.STORE_CONTENT)
 
86
    indexer.add_field_action('store_url', xappy.FieldActions.STORE_CONTENT)
 
87
    indexer.add_field_action('summary', xappy.FieldActions.STORE_CONTENT)
 
88
    indexer.add_field_action('short_url', xappy.FieldActions.STORE_CONTENT)
 
89
    indexer.add_field_action('label', xappy.FieldActions.STORE_CONTENT)
 
90
    indexer.add_field_action('series', xappy.FieldActions.STORE_CONTENT)
 
91
    indexer.add_field_action('owner', xappy.FieldActions.STORE_CONTENT)
 
92
    indexer.add_field_action('subordinate', xappy.FieldActions.STORE_CONTENT)
 
93
 
 
94
    # Full text search fields
 
95
    indexer.add_field_action(
 
96
        'name', xappy.FieldActions.INDEX_FREETEXT, weight=10,
 
97
        language='en')
 
98
 
 
99
    indexer.add_field_action(
 
100
        'summary', xappy.FieldActions.INDEX_FREETEXT, weight=5,
 
101
        language='en')
 
102
 
 
103
    indexer.add_field_action(
 
104
        'description', xappy.FieldActions.INDEX_FREETEXT, weight=3,
 
105
        language='en')
 
106
 
 
107
    indexer.add_field_action(
 
108
        'config', xappy.FieldActions.INDEX_FREETEXT, language='en')
 
109
 
 
110
    indexer.add_field_action(
 
111
        'relations', xappy.FieldActions.INDEX_FREETEXT, language='en')
 
112
 
 
113
    indexer.add_field_action(
 
114
        'changes', xappy.FieldActions.INDEX_FREETEXT, language='en')
 
115
 
 
116
 
 
117
def index_queue(db, indexer, index_queue):
 
118
    log = logging.getLogger("charm.index")
 
119
    count = 0
 
120
    while 1:
 
121
        count += 1
 
122
        item = index_queue.next()
 
123
        if not item:
 
124
            return
 
125
        try:
 
126
            charm_data = item.payload
 
127
            charm = db.find_one({"_id": charm_data["branch_spec"]})
 
128
 
 
129
            log.debug("Indexing charm %s", charm_data["branch_spec"])
 
130
            if charm:
 
131
                index_charm(indexer, charm)
 
132
            else:
 
133
                log.info(
 
134
                    "Skipping unknown charm %s", charm_data["branch_spec"])
 
135
        except KeyboardInterrupt:
 
136
            raise
 
137
        except:
 
138
            log.exception("Error indexing %s", charm_data)
 
139
            continue
 
140
        finally:
 
141
            # Remove the job from the input queue.
 
142
            item.complete()
 
143
    log.info("Indexed %d Charms" % count)
 
144
 
 
145
 
 
146
def reindex(db, indexer):
 
147
    log = logging.getLogger("charm.index")
 
148
    count = 0
 
149
    for charm in db.find():
 
150
        log.debug("Indexing %s", charm['branch_spec'])
 
151
        try:
 
152
            index_charm(indexer, charm)
 
153
        except:
 
154
            log.error("Indexing charm %s", charm['branch_spec'])
 
155
            raise
 
156
        count += 1
 
157
    log.info("Indexed %d Charms" % count)
 
158
 
 
159
 
 
160
def test_charms(db, fs, in_queue, out_queue):
 
161
    log = logging.getLogger("charm.jenkins")
 
162
    while 1:
 
163
        item = in_queue.next()
 
164
        if not item:
 
165
            return
 
166
        try:
 
167
            charm_data = item.payload
 
168
            log.debug("Checking tests for %s", charm_data["branch_spec"])
 
169
            check_jenkins(db, fs, charm_data)
 
170
        except Exception, e:
 
171
            log.exception("jenkins error on %s: %s", charm_data, str(e))
 
172
            continue
 
173
        else:
 
174
            out_queue.put(charm_data)
 
175
        finally:
 
176
            # Remove the job from the input queue.
 
177
            item.complete()
 
178
 
 
179
 
 
180
def available_charms(charm_data=None):
 
181
    log = logging.getLogger("charm.launchpad")
 
182
    if not charm_data:
 
183
        charm_data = urllib.urlopen(BRANCH_TIPS).read()
 
184
        charm_data = json.loads(charm_data)
 
185
 
 
186
    count = 0
 
187
 
 
188
    for repo, commit, series in charm_data:
 
189
        _, branch_name = repo.rsplit("/", 1)
 
190
 
 
191
        try:
 
192
            data = parse_branch(repo, series, commit)
 
193
        except ValueError:
 
194
            log.warning("Unable to parse repo %s", repo)
 
195
            continue
 
196
 
 
197
        if CHARM_IMPORT_FILTER and not data['branch_spec'].startswith(
 
198
                CHARM_IMPORT_FILTER):
 
199
            continue
 
200
 
 
201
        if data["bname"] not in ("trunk", "trunk-1"):
 
202
            log.debug("Skipped branch %s", repo)
 
203
            continue
 
204
 
 
205
        log.info("Queueing %s", data['branch_spec'])
 
206
        yield data
 
207
 
 
208
        count += 1
 
209
 
 
210
        if CHARM_IMPORT_LIMIT and count >= CHARM_IMPORT_LIMIT:
 
211
            log.info("Import limit reached (%d)... stopping",
 
212
                     CHARM_IMPORT_LIMIT)
 
213
            break
 
214
 
 
215
 
 
216
def queue_charms(out_queue):
 
217
    log = logging.getLogger("charm.launchpad")
 
218
    for charm in available_charms():
 
219
        added = out_queue.put(charm)
 
220
        if added and 0:
 
221
            log.info("Queued %s", charm)
 
222
 
 
223
 
 
224
@contextlib.contextmanager
 
225
def get_proof_lib(new_path):
 
226
    if new_path not in sys.path:
 
227
        sys.path.append(new_path)
 
228
    try:
 
229
        import lib.proof as prooflib
 
230
        yield prooflib
 
231
    except ImportError:
 
232
        yield None
 
233
    finally:
 
234
        if new_path in sys.path:
 
235
            sys.path.remove(new_path)
 
236
 
 
237
 
 
238
def proof_charms(in_queue, out_queue):
 
239
    log = logging.getLogger("charm.proof")
 
240
    if not os.path.isdir(config.CHARM_PROOF_PATH):
 
241
        err_msg = ("proof error before processing began: could not find "
 
242
                   "charm proof path.")
 
243
        log.exception(err_msg)
 
244
        log.exception("CHARM_PROOF_PATH: %s", config.CHARM_PROOF_PATH)
 
245
        log.exception("proof aborted.")
 
246
        return
 
247
    with get_proof_lib(config.CHARM_PROOF_PATH) as prooflib:
 
248
        if not prooflib:
 
249
            err_msg = ("proof error before processing began: could not "
 
250
                       "import charm proof lib.")
 
251
            log.exception(err_msg)
 
252
            log.exception("CHARM_PROOF_PATH: %s", config.CHARM_PROOF_PATH)
 
253
            log.exception("proof aborted.")
 
254
            return
 
255
        while 1:
 
256
            item = in_queue.next()
 
257
            if not item:
 
258
                return
 
259
            try:
 
260
                charm_data = item.payload
 
261
                log.info("Proofing charm %s", charm_data["branch_spec"])
 
262
                proof_charm(charm_data, prooflib)
 
263
            except Exception as e:
 
264
                err = "proof error on %s: %s" % (
 
265
                      charm_data['branch_spec'], str(e))
 
266
                charm_data['proof'] = {'e': [err]}
 
267
                log.exception(err)
 
268
            finally:
 
269
                # Remove the job from the input queue.
 
270
                item.complete()
 
271
            out_queue.put(charm_data)
 
272
 
 
273
 
 
274
def scan_charms(db, scan_queue, index_queue):
 
275
    log = logging.getLogger("charm.scan")
 
276
    while 1:
 
277
        item = scan_queue.next()
 
278
        if not item:
 
279
            return
 
280
        try:
 
281
            charm_data = item.payload
 
282
            scan_charm(db, charm_data)
 
283
            # XXX j.c.sackett Jan 31 2013 Bug:1111708 scan_repo is swapped for
 
284
            # scan_charm to reindex what's on disk; we should probably just
 
285
            # have a different job for this that's not part of ingest.
 
286
            #scan_repo(db, CHARM_DIR)
 
287
        except Exception, e:
 
288
            log.exception("scan error %s: %s", charm_data, e)
 
289
            break
 
290
        else:
 
291
            index_queue.put(charm_data)
 
292
 
 
293
 
 
294
def check_charms_store(in_queue, out_queue):
 
295
    log = logging.getLogger("charm.store")
 
296
    while 1:
 
297
        item = in_queue.next()
 
298
        if not item:
 
299
            return
 
300
        try:
 
301
            charm_data = item.payload
 
302
            log.debug("Checking store for %s", charm_data["branch_spec"])
 
303
            check_store(charm_data)
 
304
        except Exception, e:
 
305
            log.exception("store error on %s: %s", charm_data, str(e))
 
306
            continue
 
307
        else:
 
308
            out_queue.put(charm_data)
 
309
        finally:
 
310
            # Remove the job from the input queue.
 
311
            item.complete()