1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
import os
import sys
from launchpadlib.credentials import RequestTokenAuthorizationEngine
from launchpadlib.errors import HTTPError
from launchpadlib.launchpad import Launchpad
class Authorization(RequestTokenAuthorizationEngine):
def __init__(self, service_root, consumer_name, read_only=True):
if read_only:
levels = ("READ_PRIVATE", "READ_PUBLIC")
else:
levels = None
super(Authorization, self).__init__(service_root,
consumer_name=consumer_name,
allow_access_levels=levels)
self._service_root = service_root
def make_end_user_authorize_token(self, credentials, request_token):
"""Authorize the given request token using the given credentials."""
request_token_info = self.authorization_url(request_token)
print "Please authorize read access to Launchpad: " + request_token_info
while credentials.access_token is None:
print "Press enter once you have granted access...",
sys.stdin.readline()
try:
credentials.exchange_request_token_for_access_token(
web_root=self._service_root)
except HTTPError, error:
if error.response.status == 401:
# The user has not made a decision yet.
pass
elif error.response.status == 403:
# The user decided not to authorize this application.
sys.stderr.write("User declined authorization request.\n")
sys.exit(1)
else:
# There was an error accessing the server.
sys.stderr.write("Unexpected response from Launchpad:\n" + \
str(error) + "\n")
sys.exit(1)
print "Read access to Launchpad was authorized."
def lp_login(lp_instance='production', anon=False):
cachedir = os.path.abspath('lp_data/cache')
creddir = os.getenv("CRED_DIR")
if not creddir:
creddir = os.path.abspath('lp_data/lp_credentials')
if not os.path.isdir(creddir):
os.makedirs(creddir)
cred = os.path.join(creddir, 'sponsoring.credentials')
authorization = Authorization(lp_instance, 'sponsoring')
if anon:
launchpad = Launchpad.login_anonymously(consumer_name='sponsoring',
service_root=lp_instance,
launchpadlib_dir=cachedir,
version='devel')
else:
launchpad = Launchpad.login_with(service_root=lp_instance,
launchpadlib_dir=cachedir,
authorization_engine=authorization,
credentials_file=cred,
version='devel')
return launchpad
def get_packagesets(lp, distribution):
"""
Get list of packagesets for current development release of Ubuntu and
the packages they include.
"""
current_series = distribution.current_series
packagesets = filter(lambda a: (a.distroseries == current_series) or \
(a.distroseries.name == current_series.name) , lp.packagesets)
packages = {}
for packageset in packagesets:
packages[packageset.name] = packageset.getSourcesIncluded()
return packages
def person_can_upload(distribution, person, package_name):
if package_name == 'Ubuntu':
return False
if distribution.main_archive.isSourceUploadAllowed(
distroseries=distribution.current_series, person=person,
sourcepackagename=package_name):
return True
pubs = distribution.main_archive.getPublishedSources(
distro_series=distribution.current_series, exact_match=True,
pocket="Release", status="Published", source_name=package_name)
pubs = [pub for pub in pubs]
if len(pubs) < 1:
return False
component = pubs[0].component_name
for perm in distribution.main_archive.getPermissionsForPerson(person=person):
if perm.permission != 'Archive Upload Rights':
continue
if perm.component_name == component:
return True
if perm.source_package_name == package_name:
return True
return False
|