~gagern/bzr-svn/bug242321

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@samba.org>

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
"""Handles branch-specific operations."""

from bzrlib import ui
from bzrlib.branch import Branch, BranchFormat, BranchCheckResult, PullResult
from bzrlib.bzrdir import BzrDir
from bzrlib.errors import (NoSuchFile, DivergedBranches, NoSuchRevision, 
                           NotBranchError)
from bzrlib.inventory import (Inventory)
from bzrlib.revision import ensure_null, NULL_REVISION
from bzrlib.workingtree import WorkingTree

import svn.client, svn.core
from svn.core import SubversionException, Pool

from commit import push
from config import BranchConfig
from errors import NotSvnBranchPath
from format import get_rich_root_format
from repository import SvnRepository
from transport import bzr_to_svn_url, create_svn_client


class FakeControlFiles(object):
    """Dummy implementation of ControlFiles.
    
    This is required as some code relies on controlfiles being 
    available."""
    def get_utf8(self, name):
        raise NoSuchFile(name)

    def get(self, name):
        raise NoSuchFile(name)

    def break_lock(self):
        pass


class SvnBranch(Branch):
    """Maps to a Branch in a Subversion repository """
    def __init__(self, base, repository, branch_path):
        """Instantiate a new SvnBranch.

        :param repos: SvnRepository this branch is part of.
        :param branch_path: Relative path inside the repository this
            branch is located at.
        :param revnum: Subversion revision number of the branch to 
            look at; none for latest.
        """
        super(SvnBranch, self).__init__()
        self.repository = repository
        assert isinstance(self.repository, SvnRepository)
        self.control_files = FakeControlFiles()
        self.base = base.rstrip("/")
        self._format = SvnBranchFormat()
        self._lock_mode = None
        self._lock_count = 0
        self._cached_revnum = None
        self.mapping = self.repository.get_mapping()
        self._branch_path = branch_path.strip("/")
        assert isinstance(self._branch_path, str)
        try:
            revnum = self.get_revnum()
            if revnum is None:
                raise NotBranchError(self.base)
            if self.repository.transport.check_path(branch_path.strip("/"), 
                revnum) != svn.core.svn_node_dir:
                raise NotBranchError(self.base)
        except SubversionException, (_, num):
            if num == svn.core.SVN_ERR_FS_NO_SUCH_REVISION:
                raise NotBranchError(self.base)
            raise
        if not self.mapping.is_branch(branch_path):
            raise NotSvnBranchPath(branch_path, scheme=self.mapping.scheme)

    def set_branch_path(self, branch_path):
        """Change the branch path for this branch.

        :param branch_path: New branch path.
        """
        self._branch_path = branch_path.strip("/")

    def _get_append_revisions_only(self):
        value = self.get_config().get_user_option('append_revisions_only')
        return value == 'True'

    def unprefix(self, relpath):
        """Remove the branch path from a relpath.

        :param relpath: path from the repository root.
        """
        assert relpath.startswith(self.get_branch_path())
        return relpath[len(self.get_branch_path()):].strip("/")

    def get_branch_path(self, revnum=None):
        """Find the branch path of this branch in the specified revnum.

        :param revnum: Revnum to look for.
        """
        if revnum is None:
            return self._branch_path

        # TODO: Use revnum - this branch may have been moved in the past 
        return self._branch_path

    def get_revnum(self):
        """Obtain the Subversion revision number this branch was 
        last changed in.

        :return: Revision number
        """
        if self._lock_mode == 'r' and self._cached_revnum:
            return self._cached_revnum
        latest_revnum = self.repository.transport.get_latest_revnum()
        self._cached_revnum = self.repository._log.find_latest_change(self.get_branch_path(), latest_revnum, include_children=True)
        return self._cached_revnum

    def check(self):
        """See Branch.Check.

        Doesn't do anything for Subversion repositories at the moment (yet).
        """
        return BranchCheckResult(self)

    def _create_heavyweight_checkout(self, to_location, revision_id=None, hardlink=False):
        """Create a new heavyweight checkout of this branch.

        :param to_location: URL of location to create the new checkout in.
        :param revision_id: Revision that should be the tip of the checkout.
        :param hardlink: Whether to hardlink
        :return: WorkingTree object of checkout.
        """
        checkout_branch = BzrDir.create_branch_convenience(
            to_location, force_new_tree=False, format=get_rich_root_format())
        checkout = checkout_branch.bzrdir
        checkout_branch.bind(self)
        # pull up to the specified revision_id to set the initial 
        # branch tip correctly, and seed it with history.
        checkout_branch.pull(self, stop_revision=revision_id)
        return checkout.create_workingtree(revision_id, hardlink=hardlink)

    def lookup_revision_id(self, revid):
        """Look up the matching Subversion revision number on the mainline of 
        the branch.

        :param revid: Revision id to look up.
        :return: Revision number on the branch. 
        :raises NoSuchRevision: If the revision id was not found.
        """
        (bp, revnum, mapping) = self.repository.lookup_revision_id(revid, 
                                                             scheme=self.mapping.scheme)
        assert bp.strip("/") == self.get_branch_path(revnum).strip("/"), \
                "Got %r, expected %r" % (bp, self.get_branch_path(revnum))
        return revnum

    def _create_lightweight_checkout(self, to_location, revision_id=None):
        """Create a new lightweight checkout of this branch.

        :param to_location: URL of location to create the checkout in.
        :param revision_id: Tip of the checkout.
        :return: WorkingTree object of the checkout.
        """
        peg_rev = svn.core.svn_opt_revision_t()
        peg_rev.kind = svn.core.svn_opt_revision_head

        rev = svn.core.svn_opt_revision_t()
        if revision_id is None:
            rev.kind = svn.core.svn_opt_revision_head
        else:
            revnum = self.lookup_revision_id(revision_id)
            rev.kind = svn.core.svn_opt_revision_number
            rev.value.number = revnum

        client_ctx = create_svn_client(Pool())
        svn.client.checkout(bzr_to_svn_url(self.base), to_location, rev, 
                            True, client_ctx)

        return WorkingTree.open(to_location)

    def create_checkout(self, to_location, revision_id=None, lightweight=False,
                        accelerator_tree=None, hardlink=False):
        """See Branch.create_checkout()."""
        if lightweight:
            return self._create_lightweight_checkout(to_location, revision_id)
        else:
            return self._create_heavyweight_checkout(to_location, revision_id, hardlink=hardlink)

    def generate_revision_id(self, revnum):
        """Generate a new revision id for a revision on this branch."""
        assert isinstance(revnum, int)
        return self.repository.generate_revision_id(
                revnum, self.get_branch_path(revnum), self.mapping)

    def get_config(self):
        return BranchConfig(self)
       
    def _get_nick(self):
        """Find the nick name for this branch.

        :return: Branch nick
        """
        bp = self._branch_path.strip("/")
        if self._branch_path == "":
            return None
        return bp

    nick = property(_get_nick)

    def set_revision_history(self, rev_history):
        """See Branch.set_revision_history()."""
        raise NotImplementedError(self.set_revision_history)

    def set_last_revision_info(self, revno, revid):
        """See Branch.set_last_revision_info()."""

    def last_revision_info(self):
        """See Branch.last_revision_info()."""
        last_revid = self.last_revision()
        return self.revision_id_to_revno(last_revid), last_revid

    def get_root_id(self, revnum=None):
        if revnum is None:
            tree = self.basis_tree()
        else:
            tree = self.repository.revision_tree(self.get_rev_id(revnum))
        return tree.get_root_id()

    def set_push_location(self, location):
        """See Branch.set_push_location()."""
        raise NotImplementedError(self.set_push_location)

    def get_push_location(self):
        """See Branch.get_push_location()."""
        # get_push_location not supported on Subversion
        return None

    def _gen_revision_history(self):
        """Generate the revision history from last revision
        """
        history = list(self.repository.iter_reverse_revision_history(
            self.last_revision()))
        history.reverse()
        return history

    def last_revision(self):
        """See Branch.last_revision()."""
        # Shortcut for finding the tip. This avoids expensive generation time
        # on large branches.
        last_revnum = self.get_revnum()
        return self.repository.generate_revision_id(last_revnum, self.get_branch_path(last_revnum), self.mapping)

    def pull(self, source, overwrite=False, stop_revision=None, 
             _hook_master=None, run_hooks=True):
        """See Branch.pull()."""
        result = PullResult()
        result.source_branch = source
        result.master_branch = None
        result.target_branch = self
        source.lock_read()
        try:
            (result.old_revno, result.old_revid) = self.last_revision_info()
            try:
                self.update_revisions(source, stop_revision)
            except DivergedBranches:
                if overwrite:
                    raise NotImplementedError('overwrite not supported for '
                                              'Subversion branches')
                raise
            (result.new_revno, result.new_revid) = self.last_revision_info()
            return result
        finally:
            source.unlock()

    def generate_revision_history(self, revision_id, last_rev=None, 
        other_branch=None):
        """Create a new revision history that will finish with revision_id.
        
        :param revision_id: the new tip to use.
        :param last_rev: The previous last_revision. If not None, then this
            must be a ancestory of revision_id, or DivergedBranches is raised.
        :param other_branch: The other branch that DivergedBranches should
            raise with respect to.
        """
        # stop_revision must be a descendant of last_revision
        # make a new revision history from the graph

    def _synchronize_history(self, destination, revision_id):
        """Synchronize last revision and revision history between branches.

        This version is most efficient when the destination is also a
        BzrBranch6, but works for BzrBranch5, as long as the destination's
        repository contains all the lefthand ancestors of the intended
        last_revision.  If not, set_last_revision_info will fail.

        :param destination: The branch to copy the history into
        :param revision_id: The revision-id to truncate history at.  May
          be None to copy complete history.
        """
        if revision_id is None:
            revno, revision_id = self.last_revision_info()
        else:
            revno = self.revision_id_to_revno(revision_id)
        destination.set_last_revision_info(revno, revision_id)

    def update_revisions(self, other, stop_revision=None):
        """See Branch.update_revisions()."""
        if stop_revision is None:
            stop_revision = ensure_null(other.last_revision())
        if (self.last_revision() == stop_revision or
            self.last_revision() == other.last_revision()):
            return
        if not other.repository.get_graph().is_ancestor(self.last_revision(), 
                                                        stop_revision):
            if self.repository.get_graph().is_ancestor(stop_revision, 
                                                       self.last_revision()):
                return
            raise DivergedBranches(self, other)
        todo = self.repository.lhs_missing_revisions(other.revision_history(), 
                                                     stop_revision)
        pb = ui.ui_factory.nested_progress_bar()
        try:
            for revid in todo:
                pb.update("pushing revisions", todo.index(revid), 
                          len(todo))
                push(self, other, revid)
                self._clear_cached_state()
        finally:
            pb.finished()

    def lock_write(self):
        """See Branch.lock_write()."""
        # TODO: Obtain lock on the remote server?
        if self._lock_mode:
            assert self._lock_mode == 'w'
            self._lock_count += 1
        else:
            self._lock_mode = 'w'
            self._lock_count = 1
        
    def lock_read(self):
        """See Branch.lock_read()."""
        if self._lock_mode:
            assert self._lock_mode in ('r', 'w')
            self._lock_count += 1
        else:
            self._lock_mode = 'r'
            self._lock_count = 1

    def unlock(self):
        """See Branch.unlock()."""
        self._lock_count -= 1
        if self._lock_count == 0:
            self._lock_mode = None
            self._cached_revnum = None
            self._clear_cached_state()

    def get_parent(self):
        """See Branch.get_parent()."""
        return None

    def set_parent(self, url):
        """See Branch.set_parent()."""

    def append_revision(self, *revision_ids):
        """See Branch.append_revision()."""
        self._clear_cached_state()
        #raise NotImplementedError(self.append_revision)
        #FIXME: Make sure the appended revision is already 
        # part of the revision history

    def get_physical_lock_status(self):
        """See Branch.get_physical_lock_status()."""
        return False

    def sprout(self, to_bzrdir, revision_id=None):
        """See Branch.sprout()."""
        result = to_bzrdir.create_branch()
        self.copy_content_into(result, revision_id=revision_id)
        return result

    def __str__(self):
        return '%s(%r)' % (self.__class__.__name__, self.base)

    __repr__ = __str__


class SvnBranchFormat(BranchFormat):
    """Branch format for Subversion Branches."""
    def __init__(self):
        BranchFormat.__init__(self)

    def __get_matchingbzrdir(self):
        """See BranchFormat.__get_matchingbzrdir()."""
        from remote import SvnRemoteFormat
        return SvnRemoteFormat()

    _matchingbzrdir = property(__get_matchingbzrdir)

    def get_format_description(self):
        """See BranchFormat.get_format_description."""
        return 'Subversion Smart Server'

    def get_format_string(self):
        """See BranchFormat.get_format_string()."""
        return 'Subversion Smart Server'

    def initialize(self, to_bzrdir):
        """See BranchFormat.initialize()."""
        raise NotImplementedError(self.initialize)