1
# refs.py -- For dealing with git refs
2
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@samba.org>
4
# This program is free software; you can redistribute it and/or
5
# modify it under the terms of the GNU General Public License
6
# as published by the Free Software Foundation; version 2
7
# of the License or (at your option) any later version of
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
# GNU General Public License for more details.
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
27
from dulwich.errors import (
31
from dulwich.objects import (
34
from dulwich.file import (
43
def check_ref_format(refname):
44
"""Check if a refname is correctly formatted.
46
Implements all the same rules as git-check-ref-format[1].
48
[1] http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
50
:param refname: The refname to check
51
:return: True if refname is valid, False otherwise
53
# These could be combined into one big expression, but are listed separately
55
if '/.' in refname or refname.startswith('.'):
57
if '/' not in refname:
62
if ord(c) < 040 or c in '\177 ~^:?*[':
64
if refname[-1] in '/.':
66
if refname.endswith('.lock'):
75
class RefsContainer(object):
76
"""A container for refs."""
78
def set_symbolic_ref(self, name, other):
    """Make a ref point at another ref.

    This base implementation only raises; concrete containers override it.

    :param name: Name of the ref to set
    :param other: Name of the ref to point at
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.set_symbolic_ref)
86
def get_packed_refs(self):
    """Get contents of the packed-refs file.

    This base implementation only raises; concrete containers override it.

    :return: Dictionary mapping ref names to SHA1s
    :note: Will return an empty dictionary when no packed-refs file is
        present.
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.get_packed_refs)
96
def get_peeled(self, name):
97
"""Return the cached peeled value of a ref, if available.
99
:param name: Name of the ref to peel
100
:return: The peeled value of the ref. If the ref is known not to point to a
101
tag, this will be the SHA the ref refers to. If the ref may point to
102
a tag, but no cached information is available, None is returned.
106
def import_refs(self, base, other):
    """Copy every ref of a mapping into this container under a prefix.

    Each entry is stored through item assignment under the name
    "<base>/<name>".

    :param base: Prefix prepended (with a "/") to every imported ref name
    :param other: Mapping of ref names to the values to store
    """
    # items() instead of the Python 2-only iteritems(), so any mapping
    # (and any interpreter version) is accepted.
    for name, value in other.items():
        self["%s/%s" % (base, name)] = value
111
"""All refs present in this container."""
112
raise NotImplementedError(self.allkeys)
114
def keys(self, base=None):
115
"""Refs present in this container.
117
:param base: An optional base to return refs under.
118
:return: An unsorted set of valid refs in this container, including
122
return self.subkeys(base)
124
return self.allkeys()
126
def subkeys(self, base):
127
"""Refs present in this container under a base.
129
:param base: The base to return refs under.
130
:return: A set of valid refs in this container under the base; the base
131
prefix is stripped from the ref names returned.
134
base_len = len(base) + 1
135
for refname in self.allkeys():
136
if refname.startswith(base):
137
keys.add(refname[base_len:])
140
def as_dict(self, base=None):
141
"""Return the contents of this container as a dictionary.
145
keys = self.keys(base)
150
ret[key] = self[("%s/%s" % (base, key)).strip("/")]
152
continue # Unable to resolve
156
def _check_refname(self, name):
157
"""Ensure a refname is valid and lives in refs or is HEAD.
159
HEAD is not a valid refname according to git-check-ref-format, but this
160
class needs to be able to touch HEAD. Also, check_ref_format expects
161
refnames without the leading 'refs/', but this class requires that
162
so it cannot touch anything outside the refs dir (or HEAD).
164
:param name: The name of the reference.
165
:raises KeyError: if a refname is not HEAD or is otherwise not valid.
167
if name in ('HEAD', 'refs/stash'):
169
if not name.startswith('refs/') or not check_ref_format(name[5:]):
170
raise RefFormatError(name)
172
def read_ref(self, refname):
173
"""Read a reference without following any references.
175
:param refname: The name of the reference
176
:return: The contents of the ref file, or None if it does
179
contents = self.read_loose_ref(refname)
181
contents = self.get_packed_refs().get(refname, None)
184
def read_loose_ref(self, name):
    """Read a loose reference and return its contents.

    This base implementation only raises; concrete containers override it.

    :param name: the refname to read
    :return: The contents of the ref file, or None if it does
        not exist.
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.read_loose_ref)
193
def _follow(self, name):
194
"""Follow a reference name.
196
:return: a tuple of (refname, sha), where refname is the name of the
197
last reference in the symbolic reference chain
199
contents = SYMREF + name
201
while contents.startswith(SYMREF):
202
refname = contents[len(SYMREF):]
203
contents = self.read_ref(refname)
209
return refname, contents
211
def __contains__(self, refname):
212
if self.read_ref(refname):
216
def __getitem__(self, name):
217
"""Get the SHA1 for a reference name.
219
This method follows all symbolic references.
221
_, sha = self._follow(name)
226
def set_if_equals(self, name, old_ref, new_ref):
    """Set a refname to new_ref only if it currently equals old_ref.

    This method follows all symbolic references if applicable for the
    subclass, and can be used to perform an atomic compare-and-swap
    operation.

    This base implementation only raises; concrete containers override it.

    :param name: The refname to set.
    :param old_ref: The old sha the refname must refer to, or None to set
        unconditionally.
    :param new_ref: The new sha the refname will refer to.
    :return: True if the set was successful, False otherwise.
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.set_if_equals)
241
def add_if_new(self, name, ref):
    """Add a new reference only if it does not already exist.

    This base implementation only raises; concrete containers override it.

    :param name: The refname to add.
    :param ref: The value the new refname will hold.
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.add_if_new)
245
def __setitem__(self, name, ref):
    """Set a reference name to point to the given SHA1.

    This method follows all symbolic references if applicable for the
    subclass.

    :note: This method unconditionally overwrites the contents of a
        reference. To update atomically only if the reference has not
        changed, use set_if_equals().
    :param name: The refname to set.
    :param ref: The new sha the refname will refer to.
    """
    # Passing None as old_ref means "no precondition": always write.
    self.set_if_equals(name, None, ref)
259
def remove_if_equals(self, name, old_ref):
    """Remove a refname only if it currently equals old_ref.

    This method does not follow symbolic references, even if applicable for
    the subclass. It can be used to perform an atomic compare-and-delete
    operation.

    This base implementation only raises; concrete containers override it.

    :param name: The refname to delete.
    :param old_ref: The old sha the refname must refer to, or None to delete
        unconditionally.
    :return: True if the delete was successful, False otherwise.
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(self.remove_if_equals)
273
def __delitem__(self, name):
    """Remove a refname.

    This method does not follow symbolic references, even if applicable for
    the subclass.

    :note: This method unconditionally deletes the contents of a reference.
        To delete atomically only if the reference has not changed, use
        remove_if_equals().
    :param name: The refname to delete.
    """
    # Passing None as old_ref means "no precondition": always delete.
    self.remove_if_equals(name, None)
288
class DictRefsContainer(RefsContainer):
289
"""RefsContainer backed by a simple dict.
291
This container does not support symbolic or packed references and is not
295
def __init__(self, refs):
300
return self._refs.keys()
302
def read_loose_ref(self, name):
    """Return the value stored for a ref in the backing dict.

    :param name: the refname to look up
    :return: The stored value, or None if the name is not present
    """
    return self._refs.get(name, None)
305
def get_packed_refs(self):
308
def set_symbolic_ref(self, name, other):
    """Make a ref point at another ref.

    The entry for name in the backing dict is overwritten with the SYMREF
    marker followed by the target name; neither name is validated here.

    :param name: Name of the ref to set
    :param other: Name of the ref to point at
    """
    self._refs[name] = SYMREF + other
311
def set_if_equals(self, name, old_ref, new_ref):
312
if old_ref is not None and self._refs.get(name, None) != old_ref:
314
realname, _ = self._follow(name)
315
self._check_refname(realname)
316
self._refs[realname] = new_ref
319
def add_if_new(self, name, ref):
320
if name in self._refs:
322
self._refs[name] = ref
325
def remove_if_equals(self, name, old_ref):
326
if old_ref is not None and self._refs.get(name, None) != old_ref:
331
def get_peeled(self, name):
    """Return the cached peeled value of a ref, if available.

    :param name: Name of the ref to peel
    :return: The value recorded for name in the peeled cache, or None when
        nothing has been recorded for it
    """
    return self._peeled.get(name)
334
def _update(self, refs):
    """Update multiple refs; intended only for testing.

    :param refs: Mapping of ref names to values, merged into the backing
        dict without any validation
    """
    # TODO(dborowitz): replace this with a public function that uses the
    # compare-and-set API (presumably set_if_equals; the original note is
    # truncated in this copy of the file).
    self._refs.update(refs)
340
def _update_peeled(self, peeled):
    """Update cached peeled refs; intended only for testing.

    :param peeled: Mapping of ref names to peeled values, merged into the
        peeled cache without any validation
    """
    self._peeled.update(peeled)
345
class InfoRefsContainer(RefsContainer):
346
"""Refs container that reads refs from an info/refs file."""
348
def __init__(self, f):
351
for l in f.readlines():
352
sha, name = l.rstrip("\n").split("\t")
353
if name.endswith("^{}"):
355
if not check_ref_format(name):
356
raise ValueError("invalid ref name '%s'" % name)
357
self._peeled[name] = sha
359
if not check_ref_format(name):
360
raise ValueError("invalid ref name '%s'" % name)
361
self._refs[name] = sha
364
return self._refs.keys()
366
def read_loose_ref(self, name):
    """Return the SHA recorded for a ref in the parsed info/refs data.

    :param name: the refname to look up
    :return: The stored value, or None if the name is not present
    """
    return self._refs.get(name, None)
369
def get_packed_refs(self):
372
def get_peeled(self, name):
374
return self._peeled[name]
376
return self._refs[name]
379
class DiskRefsContainer(RefsContainer):
380
"""Refs container that reads refs from disk."""
382
def __init__(self, path):
384
self._packed_refs = None
385
self._peeled_refs = None
388
return "%s(%r)" % (self.__class__.__name__, self.path)
390
def subkeys(self, base):
392
path = self.refpath(base)
393
for root, dirs, files in os.walk(path):
394
dir = root[len(path):].strip(os.path.sep).replace(os.path.sep, "/")
395
for filename in files:
396
refname = ("%s/%s" % (dir, filename)).strip("/")
397
# check_ref_format requires at least one /, so we prepend the
398
# base before calling it.
399
if check_ref_format("%s/%s" % (base, refname)):
401
for key in self.get_packed_refs():
402
if key.startswith(base):
403
keys.add(key[len(base):].strip("/"))
408
if os.path.exists(self.refpath("HEAD")):
410
path = self.refpath("")
411
for root, dirs, files in os.walk(self.refpath("refs")):
412
dir = root[len(path):].strip(os.path.sep).replace(os.path.sep, "/")
413
for filename in files:
414
refname = ("%s/%s" % (dir, filename)).strip("/")
415
if check_ref_format(refname):
417
keys.update(self.get_packed_refs())
420
def refpath(self, name):
    """Return the disk path of a ref.

    :param name: The refname, written with "/" separators
    :return: name joined onto self.path, using the platform's separator
    """
    # Ref names always use "/"; translate on platforms where the native
    # separator differs.
    if os.path.sep != "/":
        name = name.replace("/", os.path.sep)
    return os.path.join(self.path, name)
428
def get_packed_refs(self):
429
"""Get contents of the packed-refs file.
431
:return: Dictionary mapping ref names to SHA1s
433
:note: Will return an empty dictionary when no packed-refs file is
436
# TODO: invalidate the cache on repacking
437
if self._packed_refs is None:
438
# set both to empty because we want _peeled_refs to be
439
# None if and only if _packed_refs is also None.
440
self._packed_refs = {}
441
self._peeled_refs = {}
442
path = os.path.join(self.path, 'packed-refs')
444
f = GitFile(path, 'rb')
446
if e.errno == errno.ENOENT:
450
first_line = iter(f).next().rstrip()
451
if (first_line.startswith("# pack-refs") and " peeled" in
453
for sha, name, peeled in read_packed_refs_with_peeled(f):
454
self._packed_refs[name] = sha
456
self._peeled_refs[name] = peeled
459
for sha, name in read_packed_refs(f):
460
self._packed_refs[name] = sha
463
return self._packed_refs
465
def get_peeled(self, name):
466
"""Return the cached peeled value of a ref, if available.
468
:param name: Name of the ref to peel
469
:return: The peeled value of the ref. If the ref is known not to point to a
470
tag, this will be the SHA the ref refers to. If the ref may point to
471
a tag, but no cached information is available, None is returned.
473
self.get_packed_refs()
474
if self._peeled_refs is None or name not in self._packed_refs:
475
# No cache: no peeled refs were read, or this ref is loose
477
if name in self._peeled_refs:
478
return self._peeled_refs[name]
483
def read_loose_ref(self, name):
484
"""Read a reference file and return its contents.
486
If the reference file is a symbolic reference, only read the first line of
487
the file. Otherwise, only read the first 40 bytes.
489
:param name: the refname to read, relative to refpath
490
:return: The contents of the ref file, or None if the file does not
492
:raises IOError: if any other error occurs
494
filename = self.refpath(name)
496
f = GitFile(filename, 'rb')
498
header = f.read(len(SYMREF))
500
# Read only the first line
501
return header + iter(f).next().rstrip("\r\n")
503
# Read only the first 40 bytes
504
return header + f.read(40 - len(SYMREF))
508
if e.errno == errno.ENOENT:
512
def _remove_packed_ref(self, name):
513
if self._packed_refs is None:
515
filename = os.path.join(self.path, 'packed-refs')
516
# reread cached refs from disk, while holding the lock
517
f = GitFile(filename, 'wb')
519
self._packed_refs = None
520
self.get_packed_refs()
522
if name not in self._packed_refs:
525
del self._packed_refs[name]
526
if name in self._peeled_refs:
527
del self._peeled_refs[name]
528
write_packed_refs(f, self._packed_refs, self._peeled_refs)
533
def set_symbolic_ref(self, name, other):
534
"""Make a ref point at another ref.
536
:param name: Name of the ref to set
537
:param other: Name of the ref to point at
539
self._check_refname(name)
540
self._check_refname(other)
541
filename = self.refpath(name)
543
f = GitFile(filename, 'wb')
545
f.write(SYMREF + other + '\n')
546
except (IOError, OSError):
552
def set_if_equals(self, name, old_ref, new_ref):
553
"""Set a refname to new_ref only if it currently equals old_ref.
555
This method follows all symbolic references, and can be used to perform
556
an atomic compare-and-swap operation.
558
:param name: The refname to set.
559
:param old_ref: The old sha the refname must refer to, or None to set
561
:param new_ref: The new sha the refname will refer to.
562
:return: True if the set was successful, False otherwise.
564
self._check_refname(name)
566
realname, _ = self._follow(name)
569
filename = self.refpath(realname)
570
ensure_dir_exists(os.path.dirname(filename))
571
f = GitFile(filename, 'wb')
573
if old_ref is not None:
575
# read again while holding the lock
576
orig_ref = self.read_loose_ref(realname)
578
orig_ref = self.get_packed_refs().get(realname, None)
579
if orig_ref != old_ref:
582
except (OSError, IOError):
586
f.write(new_ref + "\n")
587
except (OSError, IOError):
594
def add_if_new(self, name, ref):
595
"""Add a new reference only if it does not already exist.
597
This method follows symrefs, and only ensures that the last ref in the
598
chain does not exist.
600
:param name: The refname to set.
601
:param ref: The new sha the refname will refer to.
602
:return: True if the add was successful, False otherwise.
605
realname, contents = self._follow(name)
606
if contents is not None:
610
self._check_refname(realname)
611
filename = self.refpath(realname)
612
ensure_dir_exists(os.path.dirname(filename))
613
f = GitFile(filename, 'wb')
615
if os.path.exists(filename) or name in self.get_packed_refs():
620
except (OSError, IOError):
627
def remove_if_equals(self, name, old_ref):
628
"""Remove a refname only if it currently equals old_ref.
630
This method does not follow symbolic references. It can be used to
631
perform an atomic compare-and-delete operation.
633
:param name: The refname to delete.
634
:param old_ref: The old sha the refname must refer to, or None to delete
636
:return: True if the delete was successful, False otherwise.
638
self._check_refname(name)
639
filename = self.refpath(name)
640
ensure_dir_exists(os.path.dirname(filename))
641
f = GitFile(filename, 'wb')
643
if old_ref is not None:
644
orig_ref = self.read_loose_ref(name)
646
orig_ref = self.get_packed_refs().get(name, None)
647
if orig_ref != old_ref:
653
if e.errno != errno.ENOENT:
655
self._remove_packed_ref(name)
657
# never write, we just wanted the lock
662
def _split_ref_line(line):
663
"""Split a single ref line into a tuple of SHA1 and name."""
664
fields = line.rstrip("\n").split(" ")
666
raise PackedRefsException("invalid ref line '%s'" % line)
670
except (AssertionError, TypeError), e:
671
raise PackedRefsException(e)
672
if not check_ref_format(name):
673
raise PackedRefsException("invalid ref name '%s'" % name)
677
def read_packed_refs(f):
678
"""Read a packed refs file.
680
:param f: file-like object to read from
681
:return: Iterator over tuples with SHA1s and ref names.
688
raise PackedRefsException(
689
"found peeled ref in packed-refs without peeled")
690
yield _split_ref_line(l)
693
def read_packed_refs_with_peeled(f):
694
"""Read a packed refs file including peeled refs.
696
Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
697
with ref names, SHA1s, and peeled SHA1s (or None).
699
:param f: file-like object to read from, seek'ed to the second line
708
raise PackedRefsException("unexpected peeled ref line")
711
except (AssertionError, TypeError), e:
712
raise PackedRefsException(e)
713
sha, name = _split_ref_line(last)
715
yield (sha, name, l[1:])
718
sha, name = _split_ref_line(last)
719
yield (sha, name, None)
722
sha, name = _split_ref_line(last)
723
yield (sha, name, None)
726
def write_packed_refs(f, packed_refs, peeled_refs=None):
727
"""Write a packed refs file.
729
:param f: empty file-like object to write to
730
:param packed_refs: dict of refname to sha of packed refs to write
731
:param peeled_refs: dict of refname to peeled value of sha
733
if peeled_refs is None:
736
f.write('# pack-refs with: peeled\n')
737
for refname in sorted(packed_refs.iterkeys()):
738
f.write('%s %s\n' % (packed_refs[refname], refname))
739
if refname in peeled_refs:
740
f.write('^%s\n' % peeled_refs[refname])
743
def read_info_refs(f):
745
for l in f.readlines():
746
(sha, name) = l.rstrip("\r\n").split("\t", 1)
751
def write_info_refs(refs, store):
752
"""Generate info refs."""
753
for name, sha in sorted(refs.items()):
754
# get_refs() includes HEAD as a special case, but we don't want to
762
peeled = store.peel_sha(sha)
763
yield '%s\t%s\n' % (o.id, name)
764
if o.id != peeled.id:
765
yield '%s\t%s^{}\n' % (peeled.id, name)