~ubuntu-dev/ubuntu-sponsoring/trunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import os
import sys

from launchpadlib.credentials import RequestTokenAuthorizationEngine
from launchpadlib.errors import HTTPError
from launchpadlib.launchpad import Launchpad

class Authorization(RequestTokenAuthorizationEngine):
    """Console-driven authorization engine for the sponsoring scripts.

    Prints the authorization URL for a request token, waits for the user to
    press enter, and then tries to exchange the request token for an access
    token, repeating until the exchange succeeds.
    """

    def __init__(self, service_root, consumer_name, read_only=True):
        """Initialise the engine.

        service_root: forwarded to the base class and remembered so it can
            be passed as ``web_root`` during the token exchange.
        consumer_name: forwarded to the base class.
        read_only: when true, ("READ_PRIVATE", "READ_PUBLIC") is passed as
            ``allow_access_levels``; otherwise None is passed.
        """
        if read_only:
            levels = ("READ_PRIVATE", "READ_PUBLIC")
        else:
            levels = None
        super(Authorization, self).__init__(service_root,
                                            consumer_name=consumer_name,
                                            allow_access_levels=levels)
        self._service_root = service_root

    def make_end_user_authorize_token(self, credentials, request_token):
        """Authorize the given request token using the given credentials.

        Loops until ``credentials.access_token`` is set.  The process exits
        with status 1 when the exchange is answered with HTTP 403, with any
        other HTTP error except 401, or when standard input reaches
        end-of-file before access was granted.
        """
        request_token_info = self.authorization_url(request_token)
        # Single-argument print(...) parses and behaves the same as the
        # Python 2 print statement, so this class works on both dialects.
        print("Please authorize read access to Launchpad: " +
              request_token_info)

        while credentials.access_token is None:
            # Prompt without a trailing newline; flush explicitly so the
            # prompt is visible before blocking on stdin.
            sys.stdout.write("Press enter once you have granted access...")
            sys.stdout.flush()
            if not sys.stdin.readline():
                # readline() returns '' only at end-of-file.  Without this
                # guard a closed/non-interactive stdin would make the loop
                # spin forever, re-requesting the exchange on every 401.
                sys.stderr.write("\nNo input available on stdin; cannot "
                                 "wait for authorization.\n")
                sys.exit(1)
            try:
                credentials.exchange_request_token_for_access_token(
                    web_root=self._service_root)
            except HTTPError as error:
                status = error.response.status
                if status == 401:
                    # The user has not made a decision yet; ask again.
                    continue
                if status == 403:
                    # The user decided not to authorize this application.
                    sys.stderr.write("User declined authorization request.\n")
                else:
                    # There was an error accessing the server.
                    sys.stderr.write("Unexpected response from Launchpad:\n" +
                                     str(error) + "\n")
                sys.exit(1)
        print("Read access to Launchpad was authorized.")


def lp_login(lp_instance='production', anon=False):
    """Log in to Launchpad and return the resulting Launchpad object.

    lp_instance: passed as ``service_root`` to the login call and to the
        Authorization engine.
    anon: when true, use ``Launchpad.login_anonymously``; otherwise use
        ``Launchpad.login_with`` with the 'sponsoring.credentials' file.

    The credentials directory is taken from the CRED_DIR environment
    variable when it is set and non-empty, else 'lp_data/lp_credentials'
    (made absolute); it is created if it does not exist.  The launchpadlib
    cache directory is 'lp_data/cache' (made absolute).
    """
    cache_path = os.path.abspath('lp_data/cache')

    # An unset or empty CRED_DIR falls back to the default location.
    cred_path = (os.getenv("CRED_DIR") or
                 os.path.abspath('lp_data/lp_credentials'))
    if not os.path.isdir(cred_path):
        os.makedirs(cred_path)
    cred_file = os.path.join(cred_path, 'sponsoring.credentials')

    # Built unconditionally, exactly as before, even though only the
    # authenticated login below consumes it.
    engine = Authorization(lp_instance, 'sponsoring')

    if not anon:
        return Launchpad.login_with(service_root=lp_instance,
                                    launchpadlib_dir=cache_path,
                                    authorization_engine=engine,
                                    credentials_file=cred_file,
                                    version='devel')

    return Launchpad.login_anonymously(consumer_name='sponsoring',
                                       service_root=lp_instance,
                                       launchpadlib_dir=cache_path,
                                       version='devel')


def get_packagesets(lp, distribution):
    """
    Map package set names to the sources they include.

    Only package sets belonging to ``distribution.current_series`` are
    considered: a set qualifies when its ``distroseries`` compares equal to
    the current series or carries the same ``name``.  The value stored for
    each set is whatever its ``getSourcesIncluded()`` returns.
    """
    series = distribution.current_series

    def belongs_to_series(candidate):
        # Object equality first; fall back to comparing series names.
        return (candidate.distroseries == series or
                candidate.distroseries.name == series.name)

    # Select eagerly so every package set is examined before any
    # getSourcesIncluded() call is made.
    selected = [entry for entry in lp.packagesets if belongs_to_series(entry)]
    return dict((entry.name, entry.getSourcesIncluded())
                for entry in selected)

def person_can_upload(distribution, person, package_name):
    """Return True when ``person`` may upload ``package_name``.

    Checks, in order:
      1. The placeholder name 'Ubuntu' is never uploadable.
      2. ``isSourceUploadAllowed`` on the distribution's main archive for
         the current series.
      3. Otherwise the package's published source (Release pocket, status
         Published, exact name match) in the current series is looked up;
         no publication means False.
      4. Any 'Archive Upload Rights' permission of the person that names
         either the component of the first publication or the package
         itself grants upload.
    """
    if package_name == 'Ubuntu':
        return False

    archive = distribution.main_archive
    series = distribution.current_series

    if archive.isSourceUploadAllowed(distroseries=series, person=person,
                                     sourcepackagename=package_name):
        return True

    published = list(archive.getPublishedSources(
        distro_series=series, exact_match=True,
        pocket="Release", status="Published", source_name=package_name))
    if not published:
        return False
    component = published[0].component_name

    # Only upload-rights permissions count; match on component first, then
    # on the source package name.
    return any(
        perm.component_name == component or
        perm.source_package_name == package_name
        for perm in archive.getPermissionsForPerson(person=person)
        if perm.permission == 'Archive Upload Rights')