~savoirfairelinux-openerp/lp-community-utils/checkout-pep8

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#
#    OpenERP, Open Source Management Solution
#    This module copyright (C) 2013 Therp BV (<http://therp.nl>).
#
#    Authors Holger Brunn, Stefan Rijnhart
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
import os
import os.path
import shutil
import tempfile
import argparse
import logging
import simplejson
import urllib2
import bzrlib
from bzrlib.branch import Branch
from bzrlib.errors import NotBranchError, ConflictsInTree
from bzrlib.builtins import cmd_branch, cmd_push
from bzrlib.plugin import load_plugins
from bzrlib.plugins.launchpad import lp_api
from bzrlib.plugins.launchpad.account import get_lp_login
from launchpadlib.launchpad import Launchpad
import replay_missing

load_plugins()

# mapping from official projects/series to community series
project_mappings = {
    'openerp-web': {
        '6.1': 'lp:ocb-web/6.1',
        '7.0': 'lp:ocb-web/7.0',
        'trunk': 'lp:ocb-web/7.0',
    },
    'openobject-server': {
        '6.1': 'lp:ocb-server/6.1',
        '7.0': 'lp:ocb-server/7.0',
        'trunk': 'lp:ocb-server/7.0',
    },
    'openobject-addons': {
        '6.1': 'lp:ocb-addons/6.1',
        '7.0': 'lp:ocb-addons/7.0',
        'trunk': 'lp:ocb-addons/7.0',
    },
}

api_base_url = 'https://api.launchpad.net/1.0/'


def get_lp_json(url):
    """
    Load a launchpad API request in JSON format.

    This is currently only tested with merge proposals, as Launchpadlib
    does not facilitate getting a merge proposal by its id or url.

    Note that the web links in the result point to the API base url.
    I.e. the 'target_branch_link' of a merge proposal is represented as
    'https://api.launchpad.net/1.0/~openerp/openobject-server/7.0'.

    @param url: either an API url or a regular web link to a launchpad
    resource.
    @return dictionary with JSON data.
    """
    lp_urls = ['https://code.launchpad.net/']
    for lp_url in lp_urls:
        if url.lower().startswith(lp_url):
            api_url = api_base_url + url[len(lp_url):]
            break
    else:
        if url.lower().startswith(api_base_url):
            api_url = url
        else:
            logging.error('Unknown Launchpad URL')
            return
    request = urllib2.urlopen(api_url)
    return simplejson.load(request)


def get_project_series(lp_url):
    """
    Split off the lp: part (or harmlessly 'htt' in the case of http 
    scheme urls). Return (project, series-or-branch)
    """
    parts = lp_url[3:].split('/')
    return (parts[-2], parts[-1])


def clone_merge_proposals(lp, source_branch_address,
                          target_branch_address=None,
                          reference_branch_address=None,
                          target_copy=None):
    """Get merge proposals for branch and clone then in the appropriate 
    community branch
    
    :param lp: launchpad connection
    :param source_branch_address: the Launchpad location of the
    branch whose merge proposals are to be cloned
    :param target_branch_address: force a branch to propose merge to
    :param reference_branch_address: use this branch to determine
    which revisions of the source branch need to be replayed
    on the target branch
    :param target_copy: use this local branch instead of branching the
    target first.    
    """
    original_target_branch_address = None
    lp_source_branch = lp.branches.getByUrl(url=source_branch_address)

    # Also assign the last of the landing targets to 'mp'
    # even if the target branch is already known
    mp = None
    for mp in lp_source_branch.landing_targets:
        if not target_branch_address:
            logging.debug('Looking at possible target %s',
                          mp.target_branch.web_link)
            project, series = get_project_series(mp.target_branch.web_link)
            if (project in project_mappings and
                    series in project_mappings[project]):
                target_branch_address = project_mappings[project][series]
                original_target_branch_address = mp.target_branch.bzr_identity
                break
            else:
                logging.warning('Found a proposal on an unknown project '
                                '\'%s\' or series \'%s\'', project, series)

    if not mp:  # i.e. there were no landing targets
        logging.error('No merge proposal found for %s',
                      source_branch_address)
        return 1

    if target_branch_address:
        target_branch_dir = tempfile.mkdtemp()
        if target_copy:
            logging.debug('Using %s as a local copy of %s',
                          target_copy, target_branch_address)
            shutil.rmtree(target_branch_dir)
            shutil.copytree(target_copy, target_branch_dir)
            copied_branch = Branch.open(target_branch_dir)
            copied_branch.pull(Branch.open(copied_branch.get_parent()))
        else:
            logging.debug('getting %s to create merge proposal to %s',
                          target_branch_address, target_branch_dir)
            cmd_branch().run(target_branch_address,
                             to_location=target_branch_dir, stacked=True,
                             use_existing_dir=True)

        target_branch = lp_api.LaunchpadBranch.from_bzr(lp,
                                                        Branch.open(target_branch_dir))

        replay_args = [target_branch_dir, source_branch_address]
        if reference_branch_address:
            replay_args += ['-r', reference_branch_address]
        elif original_target_branch_address:
            replay_args += ['-r', original_target_branch_address]
        logging.debug('start replay %s', replay_args)

        try:
            replay_missing.main(replay_args)
        except ConflictsInTree:
            logging.error('There were conflicts. Go to %s, solve the conflicts '
                          'and push manually. You\'ll also have to create the merge '
                          'proposal manually', target_branch_dir)
            return 1

        push_location = 'lp:~%s/%s/%s' % (
            get_lp_login(),
            get_project_series(target_branch_address)[0],
            os.path.basename(mp.source_branch.web_link))
        try:
            Branch.open(push_location)
            logging.error(
                'A branch already exists at %s, aborting', push_location)
            return 1
        except NotBranchError:
            pass

        logging.debug('pushing to %s', push_location)
        push = cmd_push()
        # TODO: because of this, bzr's output doesn't end up in our logger
        push._setup_outf()
        push.run(
            location=push_location, directory=target_branch_dir, stacked=True)

        initial_comment = 'Automatically derived from %s for %s.' % (
            mp.source_branch.web_link, mp.target_branch.web_link)
        if mp.description:
            initial_comment += (' Below is a copy of the original '
                                'description.\n\n%s' % mp.description)
        lp.branches.getByUrl(url=push_location).createMergeProposal(
            commit_message=mp.commit_message,
            initial_comment=initial_comment,
            needs_review=True,
            target_branch=target_branch.lp)

        shutil.rmtree(target_branch_dir)
    else:
        logging.error(
            'No target branch specified and no proposal found for %s '
            'on any mapped project and series', source_branch_address)


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--mp-branch-dir', dest='mp_branch_dir',
                        help="the local directory of the merge proposal\'s branch")
    parser.add_argument('--mp-url', dest='mp_url',
                        help="the merge proposal's web address on launchpad")
    parser.add_argument('--reference-branch', '-r', dest='reference_branch',
                        help=("Only replay commits that are not present in this branch. This "
                              "would usually be a local copy of the original target branch "
                              "of the merge proposal"))
    parser.add_argument('--target-branch', dest='target_branch',
                        help="override automagicly chosen target branch")
    parser.add_argument('--target-copy', '-c',
                        help="Directory to use as a local copy of the target branch. This "
                        "prevents downloading the branch from Launchpad.")
    parser.add_argument('-d', '--debug', dest='loglevel', action='store_const',
                        const='DEBUG', help='debug output')
    parser.add_argument('-q', '--quiet', dest='loglevel', action='store_const',
                        const='ERROR', help='be quiet')
    arguments = parser.parse_args(argv)

    if arguments.loglevel:
        logging.getLogger().setLevel(getattr(logging, arguments.loglevel))

    lp = Launchpad.login_with('clone_mp_to_community', 'production')

    source_branch_location = None
    original_target_branch_location = None
    if arguments.mp_url:
        mp = get_lp_json(arguments.mp_url)
        if not mp:
            logging.error('No merge proposal found with url %s',
                          arguments.mp_url)
            return 1
        source_branch_link = mp['source_branch_link'].replace(
            api_base_url, 'https://code.launchpad.net/')
        source_branch_location = Branch.open(source_branch_link).base
        original_target_branch_link = mp['target_branch_link'].replace(
            api_base_url, 'https://code.launchpad.net/')
        original_target_branch_location = Branch.open(
            original_target_branch_link).base

    elif arguments.mp_branch_dir and os.path.isdir(arguments.mp_branch_dir):
        logging.debug('got branch directory %s', arguments.mp_branch_dir)
        source_branch_location = lp_api.LaunchpadBranch.candidate_urls(
            Branch.open(arguments.mp_branch_dir)).next()

    else:
        logging.debug('trying current directory')
        try:
            source_branch_location = lp_api.LaunchpadBranch.candidate_urls(
                Branch.open('.')).next()
        except bzrlib.errors.NotBranchError:
            if not source_branch_location:
                logging.error('current directory is not a branch and I got no '
                              'merge proposal - giving up')
                return 1

    try:
        clone_merge_proposals(lp, source_branch_location,
                              target_branch_address=arguments.target_branch,
                              reference_branch_address=(arguments.reference_branch
                                                        or original_target_branch_location),
                              target_copy=arguments.target_copy)

    except bzrlib.errors.NotBranchError as e:
        logging.error('found no branch to work on - branch dir is %s',
                      e.path)
        return 1

if __name__ == "__main__":
    import sys
    logging.basicConfig(
        format='%(levelname)s: %(message)s', level=logging.INFO)
    sys.exit(main(sys.argv[1:]))