1
"""Handles all VCS (version control) support"""
8
from pip.exceptions import BadCommand
9
from pip.log import logger
10
from pip.util import display_path, backup_dir, find_command, ask
13
__all__ = ['vcs', 'get_src_requirement', 'import_vcs_support']
16
class VcsSupport(object):
18
schemes = ['ssh', 'git', 'hg', 'bzr', 'sftp']
21
# Register more schemes with urlparse for various version control systems
22
urlparse.uses_netloc.extend(self.schemes)
23
urlparse.uses_fragment.extend(self.schemes)
24
super(VcsSupport, self).__init__()
27
return self._registry.__iter__()
31
return self._registry.values()
35
return [backend.dirname for backend in self.backends]
38
def all_schemes(self):
40
for backend in self.backends:
41
schemes.extend(backend.schemes)
44
def register(self, cls):
45
if not hasattr(cls, 'name'):
46
logger.warn('Cannot register VCS %s' % cls.__name__)
48
if cls.name not in self._registry:
49
self._registry[cls.name] = cls
51
def unregister(self, cls=None, name=None):
52
if name in self._registry:
53
del self._registry[name]
54
elif cls in self._registry.values():
55
del self._registry[cls.name]
57
logger.warn('Cannot unregister because no class or name given')
59
def get_backend_name(self, location):
61
Return the name of the version control backend if found at given
62
location, e.g. vcs.get_backend_name('/path/to/vcs/checkout')
64
for vc_type in self._registry.values():
65
path = os.path.join(location, vc_type.dirname)
66
if os.path.exists(path):
70
def get_backend(self, name):
72
if name in self._registry:
73
return self._registry[name]
75
def get_backend_from_location(self, location):
76
vc_type = self.get_backend_name(location)
78
return self.get_backend(vc_type)
85
class VersionControl(object):
89
def __init__(self, url=None, *args, **kwargs):
92
super(VersionControl, self).__init__(*args, **kwargs)
94
def _filter(self, line):
95
return (logger.INFO, line)
97
def _is_local_repository(self, repo):
99
posix absolute paths start with os.path.sep,
100
win32 ones ones start with drive (like c:\\folder)
102
drive, tail = os.path.splitdrive(repo)
103
return repo.startswith(os.path.sep) or drive
107
if self._cmd is not None:
109
command = find_command(self.name)
111
raise BadCommand('Cannot find command %r' % self.name)
112
logger.info('Found command %r at %r' % (self.name, command))
116
def get_url_rev(self):
118
Returns the correct repository URL and revision by parsing the given
121
url = self.url.split('+', 1)[1]
122
scheme, netloc, path, query, frag = urlparse.urlsplit(url)
125
path, rev = path.rsplit('@', 1)
126
url = urlparse.urlunsplit((scheme, netloc, path, query, ''))
129
def get_info(self, location):
131
Returns (url, revision), where both are strings
133
assert not location.rstrip('/').endswith(self.dirname), 'Bad directory: %s' % location
134
return self.get_url(location), self.get_revision(location)
136
def normalize_url(self, url):
138
Normalize a URL for comparison by unquoting it and removing any trailing slash.
140
return urllib.unquote(url).rstrip('/')
142
def compare_urls(self, url1, url2):
144
Compare two repo URLs for identity, ignoring incidental differences.
146
return (self.normalize_url(url1) == self.normalize_url(url2))
148
def parse_vcs_bundle_file(self, content):
150
Takes the contents of the bundled text file that explains how to revert
151
the stripped off version control data of the given package and returns
152
the URL and revision of it.
154
raise NotImplementedError
156
def obtain(self, dest):
158
Called when installing or updating an editable package, takes the
159
source path of the checkout.
161
raise NotImplementedError
163
def switch(self, dest, url, rev_options):
165
Switch the repo at ``dest`` to point to ``URL``.
169
def update(self, dest, rev_options):
171
Update an already-existing repo to the given ``rev_options``.
173
raise NotImplementedError
175
def check_destination(self, dest, url, rev_options, rev_display):
177
Prepare a location to receive a checkout/clone.
179
Return True if the location is ready for (and requires) a
180
checkout/clone, False otherwise.
184
if os.path.exists(dest):
186
if os.path.exists(os.path.join(dest, self.dirname)):
187
existing_url = self.get_url(dest)
188
if self.compare_urls(existing_url, url):
189
logger.info('%s in %s exists, and has correct URL (%s)'
190
% (self.repo_name.title(), display_path(dest), url))
191
logger.notify('Updating %s %s%s'
192
% (display_path(dest), self.repo_name, rev_display))
193
self.update(dest, rev_options)
195
logger.warn('%s %s in %s exists with URL %s'
196
% (self.name, self.repo_name, display_path(dest), existing_url))
197
prompt = ('(s)witch, (i)gnore, (w)ipe, (b)ackup ', ('s', 'i', 'w', 'b'))
199
logger.warn('Directory %s already exists, and is not a %s %s.'
200
% (dest, self.name, self.repo_name))
201
prompt = ('(i)gnore, (w)ipe, (b)ackup ', ('i', 'w', 'b'))
203
logger.warn('The plan is to install the %s repository %s'
205
response = ask('What to do? %s' % prompt[0], prompt[1])
208
logger.notify('Switching %s %s to %s%s'
209
% (self.repo_name, display_path(dest), url, rev_display))
210
self.switch(dest, url, rev_options)
211
elif response == 'i':
214
elif response == 'w':
215
logger.warn('Deleting %s' % display_path(dest))
218
elif response == 'b':
219
dest_dir = backup_dir(dest)
220
logger.warn('Backing up %s to %s'
221
% (display_path(dest), dest_dir))
222
shutil.move(dest, dest_dir)
226
def unpack(self, location):
227
raise NotImplementedError
229
def get_src_requirement(self, dist, location, find_tags=False):
230
raise NotImplementedError
233
def get_src_requirement(dist, location, find_tags):
234
version_control = vcs.get_backend_from_location(location)
236
return version_control().get_src_requirement(dist, location, find_tags)
237
logger.warn('cannot determine version of editable source in %s (is not SVN checkout, Git clone, Mercurial clone or Bazaar branch)' % location)
238
return dist.as_requirement()