1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
|
# Copyright 2012-2014 Marco Ceppi, Canonical Ltd. This software is
# licensed under the GNU Affero General Public License version 3 (see
# the file LICENSE).
import argparse
from bzrlib.branch import Branch
from datetime import (
datetime,
)
from email.utils import parseaddr
import os
import logging
from requests.exceptions import HTTPError
import sys
import yaml
from charmworld.charmstore import (
CharmStore,
get_store_charm_info,
LocalCharmStore,
)
from charmworld.jobs.ingest import update_from_store
from charmworld.lp import (
BASKET_SERIES,
get_branch_tips,
)
from charmworld.models import (
getconnection,
getdb,
)
from charmworld.utils import (
configure_logging,
get_ini,
)
from config import (
BASKET_QUEUE,
CHARM_IMPORT_FILTER,
CHARM_QUEUE,
)
from utils import (
get_queue,
lock,
LockHeld,
parse_branch,
)
# Sentinel meaning "no value was supplied".  It is the default for the
# import_filter parameter of queue_from_branches() and for the --prefix
# command line option, so an omitted value can be told apart from None.
unspecified = object()
def getLogger():
    """Return the 'charm.launchpad' logger used by the functions here."""
    logger_name = 'charm.launchpad'
    return logging.getLogger(logger_name)
def local_branch_data(local_repo, branch_data=None):
    """Find branch data for local charms.

    When no branch data is supplied, the tree rooted at local_repo is
    searched for Bazaar branches, which must be charms.  Any directory with a
    '.bzr' subdirectory is assumed to be the Bazaar branch for a charm and it
    will be included; the name of its parent directory is used as the series.

    NOTE(review): the directory walk is assumed to run only when branch_data
    is None (mirroring all_branch_data) -- confirm against project history.

    :param local_repo: Path to root directory holding charms.
    :param branch_data: Optional list of (path, branch, series) tuples to
        use instead of searching local_repo.
    :returns: Tuple of (charms, baskets); the latter is guaranteed to be
        empty.
    """
    log = getLogger()
    local_repo = os.path.abspath(os.path.expanduser(local_repo))
    charms = []
    baskets = []
    if branch_data is None:
        branch_data = []
        for root, dirs, files in os.walk(local_repo):
            if '.bzr' not in dirs:
                continue
            # Prune in place so os.walk does not descend into the branch.
            del dirs[:]
            try:
                fullpath = os.path.join(local_repo, root)
                series = root.split(os.sep)[-2]
                branch = Branch.open(fullpath)
                branch_data.append((fullpath, branch, series))
            except Exception as e:
                # Log the exception but keep going.
                log.error(e)
    for repo, branch, series in branch_data:
        metadata_file = os.path.join(repo, 'metadata.yaml')
        # If no owner can be gleaned, just use 'local'.
        owner = 'local'
        metadata = None
        try:
            with open(metadata_file) as f:
                # NOTE(review): yaml.load can construct arbitrary Python
                # objects; consider yaml.safe_load if the charm metadata is
                # not fully trusted.
                metadata = yaml.load(f)
        except (IOError, yaml.YAMLError) as e:
            msg = 'Error parsing maintainers from {} for {}: {}'.format(
                metadata_file, repo, e)
            log.warning(msg)
        # An empty file loads as None and other documents may not be
        # mappings; in both cases there is no maintainer to read.
        if isinstance(metadata, dict):
            maintainers = (
                metadata.get('maintainer') or metadata.get('maintainers'))
            if maintainers:
                if isinstance(maintainers, basestring):
                    maintainer = maintainers
                else:
                    maintainer = maintainers[0]
                if maintainer:
                    # The owner is the local part of the maintainer's
                    # e-mail address.
                    owner = parseaddr(maintainer)[1].split('@')[0]
        name = repo.split(os.sep)[-1]
        log.info('{} owned by {}'.format(repo, owner))
        charms.append({
            'bname': u'trunk',
            'branch_deleted': False,
            'branch_spec': 'local:' + repo,
            'commit': branch.last_revision(),
            'distro_series': [],
            'name': name,
            'owner': owner,
            'promulgated': True,
            'series': series,
        })
    return charms, baskets
def all_branch_data(branch_data=None):
    """Return data needed for the ingest job for branches on Launchpad.

    :param branch_data: Optional iterable of (repo, commit, series) tuples.
        When None, the tips are fetched with get_branch_tips(cache=False);
        an HTTPError during the fetch is logged and treated as no branches.
    :returns: Tuple of (charms, baskets).  charms holds the parsed data of
        charm branches named 'trunk'; baskets holds the parsed data of
        branches in BASKET_SERIES named 'bundle'.  Branches that
        parse_branch rejects with ValueError are logged and skipped.
    """
    charms = []
    baskets = []
    log = getLogger()
    if branch_data is None:
        # Get all charm and bundle tip branches from Launchpad.
        try:
            branch_data = get_branch_tips(cache=False)
        except HTTPError as e:
            # Format the exception itself: its .message attribute is
            # deprecated and empty for multi-argument exceptions.
            log.error('Error retrieving branch tips: {}'.format(e))
            branch_data = []
    for repo, commit, series in branch_data:
        try:
            data = parse_branch(repo, series, commit)
        except ValueError as e:
            log.error('Parse error: {}'.format(e))
            continue
        if data['series'] == BASKET_SERIES:
            # The branch is a basket of bundles.
            data['promulgated'] = data['owner'] == u'charmers'
            if data['bname'] == 'bundle':
                baskets.append(data)
        else:
            # The branch is a charm.
            data['promulgated'] = data['series'] in data['distro_series']
            if data['bname'] != 'trunk':
                continue
            # The branch is a charm trunk branch. Use it.
            data['branch_deleted'] = False
            charms.append(data)
    return charms, baskets
def db_charms(db):
    """Yield ingest-job data for the charms stored in charmworld's database.

    Each record is fetched with only the owner, series, name, bname and
    branch_spec fields and then has its promulgation settings overridden.

    :param db: Database object exposing a `charms` collection with find().
    """
    wanted_fields = ('owner', 'series', 'name', 'bname', 'branch_spec')
    for stored in db.charms.find(fields=wanted_fields):
        # Data from this iterator is used by all_charms() only when the
        # charm no longer has a related Launchpad branch.  A charm is
        # promulgated if its LP branch is linked to a sourcepackage, hence a
        # charm without a branch cannot be promulgated.
        stored['promulgated'] = False
        stored['distro_series'] = []
        stored['branch_deleted'] = True
        yield stored
def _filtered_payloads(payloads, import_filter):
    """Return an iterator over payloads restricted by import_filter.

    With a falsy import_filter every payload is passed through; otherwise
    only payloads whose 'branch_spec' starts with import_filter are kept.
    """
    if import_filter:
        return (
            payload for payload in payloads
            if payload['branch_spec'].startswith(import_filter))
    return iter(payloads)
def all_charms(db, available, limit=None, import_filter=None):
    """Yield charms that have a Launchpad branch or are already in the db.

    Charms are keyed by 'branch_spec'; an entry from `available` replaces
    the database entry with the same key.  Results are yielded in
    'branch_spec' order, after applying import_filter, and stop once a
    truthy `limit` has been reached.
    """
    log = getLogger()
    by_spec = {}
    for stored in db_charms(db):
        by_spec[stored['branch_spec']] = stored
    # Entries from `available` are applied last so they take precedence.
    for current in available:
        by_spec[current['branch_spec']] = current
    ordered = sorted(
        by_spec.values(), key=lambda entry: entry['branch_spec'])
    selected = _filtered_payloads(ordered, import_filter)
    for position, charm in enumerate(selected, 1):
        yield charm
        if limit and position >= limit:
            log.info("Import limit reached (%d)... stopping", limit)
            return
def queue_from_branches(db, store, charm_queue, basket_queue, limit=None,
                        import_filter=unspecified, check_time=None,
                        local_repo=None):
    """Put charm and basket payloads on their ingest queues.

    Branch data comes from all_branch_data() when local_repo is None and
    from local_branch_data(local_repo) otherwise.

    NOTE(review): store.set_charm_data() is assumed to be called only in the
    local_repo case -- confirm against project history.

    :param db: Database handed to all_charms().
    :param store: Charm store passed to get_store_charm_info().
    :param charm_queue: Queue receiving one payload per store revision.
    :param basket_queue: Queue receiving the filtered baskets.
    :param limit: Passed to all_charms(); not applied with a local_repo.
    :param import_filter: Branch-spec prefix; CHARM_IMPORT_FILTER is used
        when left unspecified.
    :param check_time: Timestamp given to update_from_store(); defaults to
        the ctime() string of the current UTC time.
    :param local_repo: Optional path to a tree of local charms.
    """
    log = getLogger()
    use_launchpad = local_repo is None
    # Get charm and basket data from Launchpad (or from the local tree).
    if use_launchpad:
        charm_data, baskets = all_branch_data()
    else:
        charm_data, baskets = local_branch_data(local_repo)
        store.set_charm_data(charm_data)
    if import_filter is unspecified:
        import_filter = CHARM_IMPORT_FILTER
    # Now get all charms, those just found on Launchpad and those already in
    # the database.
    if use_launchpad:
        charms = list(
            all_charms(
                db, charm_data, limit=limit, import_filter=import_filter))
    else:
        charms = charm_data
    charms_info = get_store_charm_info(store, charms, num_revisions=1)
    if check_time is None:
        check_time = datetime.utcnow().ctime()
    for charm, revisions in zip(charms, charms_info):
        newest_revision = revisions[0][1]['revision']
        # Process the charms in reverse order to ensure the newest is last
        # and therefore the most current in the databases.
        for address, store_data in reversed(revisions):
            payload = dict(charm)
            payload['newest_revision'] = newest_revision
            update_from_store(payload, address, store_data, check_time, log)
            if charm_queue.put(payload):
                log.info('Queueing charm %s (%d)',
                         payload['branch_spec'],
                         payload['store_data']['revision'])
    for basket in _filtered_payloads(baskets, import_filter):
        basket['branch_deleted'] = False
        if basket_queue.put(basket):
            log.info('Queueing bundle %s', basket['branch_spec'])
def dequeue():
    """Dequeue all the charm data in the CHARM_QUEUE.

    This is a script entry point to dequeue all charms. This might be done
    when a running instance will get a code update that is incompatible with
    charm data in the queue.

    The queue is cleared while holding the 'ingest-queue' lock.  If the lock
    is already held, a warning is logged and nothing is cleared.
    """
    configure_logging()
    log = getLogger()
    settings = get_ini()
    connection = getconnection(settings)
    db = getdb(connection, settings.get('mongo.database'))
    # presumably script_lease_time is in minutes and lock() wants seconds --
    # TODO confirm.
    lease_time = int(settings['script_lease_time']) * 60
    try:
        with lock('ingest-queue', lease_time, db, log):
            queue = get_queue(db, CHARM_QUEUE)
            # Read the size first so the log reports what was removed.
            queue_size = queue.size()
            queue.clear()
            log.info("Dequeued %s charms", queue_size)
    except LockHeld as error:
        log.warning(str(error))
def configure_parser(parser):
    """Register the ingest-queue command line options on `parser`.

    Adds --limit, --prefix, --clear, --debug and --local-repo.
    """
    options = (
        ('--limit', dict(
            help='Maximum number of charms to queue.',
            type=int)),
        ('--prefix', dict(
            help='Only branches matching this prefix are queued.',
            type=str, default=unspecified)),
        ('--clear', dict(
            help='Clear the queues before beginning.',
            action='store_true', dest='clear', default=False)),
        ('--debug', dict(
            help='Turn on debugging',
            action='store_true', dest='debug',
            default=False)),
        ('--local-repo', dict(
            help='Path to local repo.',
            default=None)),
    )
    for flag, settings in options:
        parser.add_argument(flag, **settings)
def clear_queues(queues):
    """Call clear() on every queue in `queues`, in order."""
    for pending in queues:
        pending.clear()
def run(args):
    """Queue charms and baskets according to the parsed arguments.

    :param args: Parsed options; the attributes read are debug, limit,
        clear, local_repo and prefix (see configure_parser).

    The work is done while holding the 'ingest-queue' lock.  If the lock is
    already held, a warning is logged and nothing is queued.
    """
    log = getLogger()
    if args.debug:
        log.setLevel(logging.DEBUG)
    settings = get_ini()
    if args.limit is None:
        # Fall back to the configured limit; an unset or empty setting means
        # no limit.
        charm_import_limit = settings.get('charm_import_limit')
        charm_import_limit = (
            int(charm_import_limit) if charm_import_limit else None)
    else:
        charm_import_limit = args.limit
    log.debug('Using import limit {}'.format(charm_import_limit))
    connection = getconnection(settings)
    db = getdb(connection, settings.get('mongo.database'))
    try:
        with lock('ingest-queue', int(settings['script_lease_time']) * 60,
                  db, log):
            charm_queue = get_queue(db, CHARM_QUEUE)
            basket_queue = get_queue(db, BASKET_QUEUE)
            # Release any entries that already exist.
            if args.clear:
                clear_queues((charm_queue, basket_queue))
            if args.local_repo is None:
                store = CharmStore()
            else:
                store = LocalCharmStore()
            queue_from_branches(db, store, charm_queue, basket_queue,
                                charm_import_limit, args.prefix,
                                local_repo=args.local_repo)
    except LockHeld as error:
        log.warning(str(error))
def main(arglist=None):
    """Parse the command line and run the queueing job.

    :param arglist: Argument strings to parse; defaults to sys.argv[1:].
    """
    argv = sys.argv[1:] if arglist is None else arglist
    configure_logging()
    parser = argparse.ArgumentParser()
    configure_parser(parser)
    run(parser.parse_args(argv))
# When executed as a script, hand the command line arguments to main().
if __name__ == '__main__':
    main(sys.argv[1:])
|