first commit

This commit is contained in:
2018-07-24 15:40:59 +02:00
commit d0d03faed9
1884 changed files with 366061 additions and 0 deletions

View File

@@ -0,0 +1,471 @@
"""Handles all VCS (version control) support"""
from __future__ import absolute_import
import copy
import errno
import logging
import os
import shutil
import sys
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._internal.exceptions import BadCommand
from pip._internal.utils.misc import (
display_path, backup_dir, call_subprocess, rmtree, ask_path_exists,
)
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Dict, Optional, Tuple
from pip._internal.basecommand import Command
__all__ = ['vcs', 'get_src_requirement']
logger = logging.getLogger(__name__)
class RevOptions(object):
"""
Encapsulates a VCS-specific revision to install, along with any VCS
install options.
Instances of this class should be treated as if immutable.
"""
def __init__(self, vcs, rev=None, extra_args=None):
"""
Args:
vcs: a VersionControl object.
rev: the name of the revision to install.
extra_args: a list of extra options.
"""
if extra_args is None:
extra_args = []
self.extra_args = extra_args
self.rev = rev
self.vcs = vcs
def __repr__(self):
return '<RevOptions {}: rev={!r}>'.format(self.vcs.name, self.rev)
@property
def arg_rev(self):
if self.rev is None:
return self.vcs.default_arg_rev
return self.rev
def to_args(self):
"""
Return the VCS-specific command arguments.
"""
args = []
rev = self.arg_rev
if rev is not None:
args += self.vcs.get_base_rev_args(rev)
args += self.extra_args
return args
def to_display(self):
if not self.rev:
return ''
return ' (to revision {})'.format(self.rev)
def make_new(self, rev):
"""
Make a copy of the current instance, but with a new rev.
Args:
rev: the name of the revision for the new object.
"""
return self.vcs.make_rev_options(rev, extra_args=self.extra_args)
class VcsSupport(object):
_registry = {} # type: Dict[str, Command]
schemes = ['ssh', 'git', 'hg', 'bzr', 'sftp', 'svn']
def __init__(self):
# Register more schemes with urlparse for various version control
# systems
urllib_parse.uses_netloc.extend(self.schemes)
# Python >= 2.7.4, 3.3 doesn't have uses_fragment
if getattr(urllib_parse, 'uses_fragment', None):
urllib_parse.uses_fragment.extend(self.schemes)
super(VcsSupport, self).__init__()
def __iter__(self):
return self._registry.__iter__()
@property
def backends(self):
return list(self._registry.values())
@property
def dirnames(self):
return [backend.dirname for backend in self.backends]
@property
def all_schemes(self):
schemes = []
for backend in self.backends:
schemes.extend(backend.schemes)
return schemes
def register(self, cls):
if not hasattr(cls, 'name'):
logger.warning('Cannot register VCS %s', cls.__name__)
return
if cls.name not in self._registry:
self._registry[cls.name] = cls
logger.debug('Registered VCS backend: %s', cls.name)
def unregister(self, cls=None, name=None):
if name in self._registry:
del self._registry[name]
elif cls in self._registry.values():
del self._registry[cls.name]
else:
logger.warning('Cannot unregister because no class or name given')
def get_backend_name(self, location):
"""
Return the name of the version control backend if found at given
location, e.g. vcs.get_backend_name('/path/to/vcs/checkout')
"""
for vc_type in self._registry.values():
if vc_type.controls_location(location):
logger.debug('Determine that %s uses VCS: %s',
location, vc_type.name)
return vc_type.name
return None
def get_backend(self, name):
name = name.lower()
if name in self._registry:
return self._registry[name]
def get_backend_from_location(self, location):
vc_type = self.get_backend_name(location)
if vc_type:
return self.get_backend(vc_type)
return None
vcs = VcsSupport()
class VersionControl(object):
name = ''
dirname = ''
# List of supported schemes for this Version Control
schemes = () # type: Tuple[str, ...]
# Iterable of environment variable names to pass to call_subprocess().
unset_environ = () # type: Tuple[str, ...]
default_arg_rev = None # type: Optional[str]
def __init__(self, url=None, *args, **kwargs):
self.url = url
super(VersionControl, self).__init__(*args, **kwargs)
def get_base_rev_args(self, rev):
"""
Return the base revision arguments for a vcs command.
Args:
rev: the name of a revision to install. Cannot be None.
"""
raise NotImplementedError
def make_rev_options(self, rev=None, extra_args=None):
"""
Return a RevOptions object.
Args:
rev: the name of a revision to install.
extra_args: a list of extra options.
"""
return RevOptions(self, rev, extra_args=extra_args)
def _is_local_repository(self, repo):
"""
posix absolute paths start with os.path.sep,
win32 ones start with drive (like c:\\folder)
"""
drive, tail = os.path.splitdrive(repo)
return repo.startswith(os.path.sep) or drive
# See issue #1083 for why this method was introduced:
# https://github.com/pypa/pip/issues/1083
def translate_egg_surname(self, surname):
# For example, Django has branches of the form "stable/1.7.x".
return surname.replace('/', '_')
def export(self, location):
"""
Export the repository at the url to the destination location
i.e. only download the files, without vcs informations
"""
raise NotImplementedError
def get_url_rev(self):
"""
Returns the correct repository URL and revision by parsing the given
repository URL
"""
error_message = (
"Sorry, '%s' is a malformed VCS url. "
"The format is <vcs>+<protocol>://<url>, "
"e.g. svn+http://myrepo/svn/MyApp#egg=MyApp"
)
assert '+' in self.url, error_message % self.url
url = self.url.split('+', 1)[1]
scheme, netloc, path, query, frag = urllib_parse.urlsplit(url)
rev = None
if '@' in path:
path, rev = path.rsplit('@', 1)
url = urllib_parse.urlunsplit((scheme, netloc, path, query, ''))
return url, rev
def get_info(self, location):
"""
Returns (url, revision), where both are strings
"""
assert not location.rstrip('/').endswith(self.dirname), \
'Bad directory: %s' % location
return self.get_url(location), self.get_revision(location)
def normalize_url(self, url):
"""
Normalize a URL for comparison by unquoting it and removing any
trailing slash.
"""
return urllib_parse.unquote(url).rstrip('/')
def compare_urls(self, url1, url2):
"""
Compare two repo URLs for identity, ignoring incidental differences.
"""
return (self.normalize_url(url1) == self.normalize_url(url2))
def obtain(self, dest):
"""
Called when installing or updating an editable package, takes the
source path of the checkout.
"""
raise NotImplementedError
def switch(self, dest, url, rev_options):
"""
Switch the repo at ``dest`` to point to ``URL``.
Args:
rev_options: a RevOptions object.
"""
raise NotImplementedError
def update(self, dest, rev_options):
"""
Update an already-existing repo to the given ``rev_options``.
Args:
rev_options: a RevOptions object.
"""
raise NotImplementedError
def is_commit_id_equal(self, dest, name):
"""
Return whether the id of the current commit equals the given name.
Args:
dest: the repository directory.
name: a string name.
"""
raise NotImplementedError
def check_destination(self, dest, url, rev_options):
"""
Prepare a location to receive a checkout/clone.
Return True if the location is ready for (and requires) a
checkout/clone, False otherwise.
Args:
rev_options: a RevOptions object.
"""
checkout = True
prompt = False
rev_display = rev_options.to_display()
if os.path.exists(dest):
checkout = False
if os.path.exists(os.path.join(dest, self.dirname)):
existing_url = self.get_url(dest)
if self.compare_urls(existing_url, url):
logger.debug(
'%s in %s exists, and has correct URL (%s)',
self.repo_name.title(),
display_path(dest),
url,
)
if not self.is_commit_id_equal(dest, rev_options.rev):
logger.info(
'Updating %s %s%s',
display_path(dest),
self.repo_name,
rev_display,
)
self.update(dest, rev_options)
else:
logger.info(
'Skipping because already up-to-date.')
else:
logger.warning(
'%s %s in %s exists with URL %s',
self.name,
self.repo_name,
display_path(dest),
existing_url,
)
prompt = ('(s)witch, (i)gnore, (w)ipe, (b)ackup ',
('s', 'i', 'w', 'b'))
else:
logger.warning(
'Directory %s already exists, and is not a %s %s.',
dest,
self.name,
self.repo_name,
)
prompt = ('(i)gnore, (w)ipe, (b)ackup ', ('i', 'w', 'b'))
if prompt:
logger.warning(
'The plan is to install the %s repository %s',
self.name,
url,
)
response = ask_path_exists('What to do? %s' % prompt[0],
prompt[1])
if response == 's':
logger.info(
'Switching %s %s to %s%s',
self.repo_name,
display_path(dest),
url,
rev_display,
)
self.switch(dest, url, rev_options)
elif response == 'i':
# do nothing
pass
elif response == 'w':
logger.warning('Deleting %s', display_path(dest))
rmtree(dest)
checkout = True
elif response == 'b':
dest_dir = backup_dir(dest)
logger.warning(
'Backing up %s to %s', display_path(dest), dest_dir,
)
shutil.move(dest, dest_dir)
checkout = True
elif response == 'a':
sys.exit(-1)
return checkout
def unpack(self, location):
"""
Clean up current location and download the url repository
(and vcs infos) into location
"""
if os.path.exists(location):
rmtree(location)
self.obtain(location)
def get_src_requirement(self, dist, location):
"""
Return a string representing the requirement needed to
redownload the files currently present in location, something
like:
{repository_url}@{revision}#egg={project_name}-{version_identifier}
"""
raise NotImplementedError
def get_url(self, location):
"""
Return the url used at location
Used in get_info or check_destination
"""
raise NotImplementedError
def get_revision(self, location):
"""
Return the current commit id of the files at the given location.
"""
raise NotImplementedError
def run_command(self, cmd, show_stdout=True, cwd=None,
on_returncode='raise',
command_desc=None,
extra_environ=None, spinner=None):
"""
Run a VCS subcommand
This is simply a wrapper around call_subprocess that adds the VCS
command name, and checks that the VCS is available
"""
cmd = [self.name] + cmd
try:
return call_subprocess(cmd, show_stdout, cwd,
on_returncode,
command_desc, extra_environ,
unset_environ=self.unset_environ,
spinner=spinner)
except OSError as e:
# errno.ENOENT = no such file or directory
# In other words, the VCS executable isn't available
if e.errno == errno.ENOENT:
raise BadCommand(
'Cannot find command %r - do you have '
'%r installed and in your '
'PATH?' % (self.name, self.name))
else:
raise # re-raise exception if a different error occurred
@classmethod
def controls_location(cls, location):
"""
Check if a location is controlled by the vcs.
It is meant to be overridden to implement smarter detection
mechanisms for specific vcs.
"""
logger.debug('Checking in %s for %s (%s)...',
location, cls.dirname, cls.name)
path = os.path.join(location, cls.dirname)
return os.path.exists(path)
def get_src_requirement(dist, location):
version_control = vcs.get_backend_from_location(location)
if version_control:
try:
return version_control().get_src_requirement(dist,
location)
except BadCommand:
logger.warning(
'cannot determine version of editable source in %s '
'(%s command not found in path)',
location,
version_control.name,
)
return dist.as_requirement()
logger.warning(
'cannot determine version of editable source in %s (is not SVN '
'checkout, Git clone, Mercurial clone or Bazaar branch)',
location,
)
return dist.as_requirement()

View File

@@ -0,0 +1,113 @@
from __future__ import absolute_import
import logging
import os
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._internal.download import path_to_url
from pip._internal.utils.misc import display_path, rmtree
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.vcs import VersionControl, vcs
logger = logging.getLogger(__name__)
class Bazaar(VersionControl):
name = 'bzr'
dirname = '.bzr'
repo_name = 'branch'
schemes = (
'bzr', 'bzr+http', 'bzr+https', 'bzr+ssh', 'bzr+sftp', 'bzr+ftp',
'bzr+lp',
)
def __init__(self, url=None, *args, **kwargs):
super(Bazaar, self).__init__(url, *args, **kwargs)
# This is only needed for python <2.7.5
# Register lp but do not expose as a scheme to support bzr+lp.
if getattr(urllib_parse, 'uses_fragment', None):
urllib_parse.uses_fragment.extend(['lp'])
def get_base_rev_args(self, rev):
return ['-r', rev]
def export(self, location):
"""
Export the Bazaar repository at the url to the destination location
"""
# Remove the location to make sure Bazaar can export it correctly
if os.path.exists(location):
rmtree(location)
with TempDirectory(kind="export") as temp_dir:
self.unpack(temp_dir.path)
self.run_command(
['export', location],
cwd=temp_dir.path, show_stdout=False,
)
def switch(self, dest, url, rev_options):
self.run_command(['switch', url], cwd=dest)
def update(self, dest, rev_options):
cmd_args = ['pull', '-q'] + rev_options.to_args()
self.run_command(cmd_args, cwd=dest)
def obtain(self, dest):
url, rev = self.get_url_rev()
rev_options = self.make_rev_options(rev)
if self.check_destination(dest, url, rev_options):
rev_display = rev_options.to_display()
logger.info(
'Checking out %s%s to %s',
url,
rev_display,
display_path(dest),
)
cmd_args = ['branch', '-q'] + rev_options.to_args() + [url, dest]
self.run_command(cmd_args)
def get_url_rev(self):
# hotfix the URL scheme after removing bzr+ from bzr+ssh:// readd it
url, rev = super(Bazaar, self).get_url_rev()
if url.startswith('ssh://'):
url = 'bzr+' + url
return url, rev
def get_url(self, location):
urls = self.run_command(['info'], show_stdout=False, cwd=location)
for line in urls.splitlines():
line = line.strip()
for x in ('checkout of branch: ',
'parent branch: '):
if line.startswith(x):
repo = line.split(x)[1]
if self._is_local_repository(repo):
return path_to_url(repo)
return repo
return None
def get_revision(self, location):
revision = self.run_command(
['revno'], show_stdout=False, cwd=location,
)
return revision.splitlines()[-1]
def get_src_requirement(self, dist, location):
repo = self.get_url(location)
if not repo:
return None
if not repo.lower().startswith('bzr:'):
repo = 'bzr+' + repo
egg_project_name = dist.egg_name().split('-', 1)[0]
current_rev = self.get_revision(location)
return '%s@%s#egg=%s' % (repo, current_rev, egg_project_name)
def is_commit_id_equal(self, dest, name):
"""Always assume the versions don't match"""
return False
vcs.register(Bazaar)

View File

@@ -0,0 +1,311 @@
from __future__ import absolute_import
import logging
import os.path
import re
from pip._vendor.packaging.version import parse as parse_version
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._vendor.six.moves.urllib import request as urllib_request
from pip._internal.compat import samefile
from pip._internal.exceptions import BadCommand
from pip._internal.utils.misc import display_path
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.vcs import VersionControl, vcs
urlsplit = urllib_parse.urlsplit
urlunsplit = urllib_parse.urlunsplit
logger = logging.getLogger(__name__)
HASH_REGEX = re.compile('[a-fA-F0-9]{40}')
def looks_like_hash(sha):
return bool(HASH_REGEX.match(sha))
class Git(VersionControl):
name = 'git'
dirname = '.git'
repo_name = 'clone'
schemes = (
'git', 'git+http', 'git+https', 'git+ssh', 'git+git', 'git+file',
)
# Prevent the user's environment variables from interfering with pip:
# https://github.com/pypa/pip/issues/1130
unset_environ = ('GIT_DIR', 'GIT_WORK_TREE')
default_arg_rev = 'HEAD'
def __init__(self, url=None, *args, **kwargs):
# Works around an apparent Git bug
# (see http://article.gmane.org/gmane.comp.version-control.git/146500)
if url:
scheme, netloc, path, query, fragment = urlsplit(url)
if scheme.endswith('file'):
initial_slashes = path[:-len(path.lstrip('/'))]
newpath = (
initial_slashes +
urllib_request.url2pathname(path)
.replace('\\', '/').lstrip('/')
)
url = urlunsplit((scheme, netloc, newpath, query, fragment))
after_plus = scheme.find('+') + 1
url = scheme[:after_plus] + urlunsplit(
(scheme[after_plus:], netloc, newpath, query, fragment),
)
super(Git, self).__init__(url, *args, **kwargs)
def get_base_rev_args(self, rev):
return [rev]
def get_git_version(self):
VERSION_PFX = 'git version '
version = self.run_command(['version'], show_stdout=False)
if version.startswith(VERSION_PFX):
version = version[len(VERSION_PFX):].split()[0]
else:
version = ''
# get first 3 positions of the git version becasue
# on windows it is x.y.z.windows.t, and this parses as
# LegacyVersion which always smaller than a Version.
version = '.'.join(version.split('.')[:3])
return parse_version(version)
def export(self, location):
"""Export the Git repository at the url to the destination location"""
if not location.endswith('/'):
location = location + '/'
with TempDirectory(kind="export") as temp_dir:
self.unpack(temp_dir.path)
self.run_command(
['checkout-index', '-a', '-f', '--prefix', location],
show_stdout=False, cwd=temp_dir.path
)
def get_revision_sha(self, dest, rev):
"""
Return a commit hash for the given revision if it names a remote
branch or tag. Otherwise, return None.
Args:
dest: the repository directory.
rev: the revision name.
"""
# Pass rev to pre-filter the list.
output = self.run_command(['show-ref', rev], cwd=dest,
show_stdout=False, on_returncode='ignore')
refs = {}
for line in output.strip().splitlines():
try:
sha, ref = line.split()
except ValueError:
# Include the offending line to simplify troubleshooting if
# this error ever occurs.
raise ValueError('unexpected show-ref line: {!r}'.format(line))
refs[ref] = sha
branch_ref = 'refs/remotes/origin/{}'.format(rev)
tag_ref = 'refs/tags/{}'.format(rev)
return refs.get(branch_ref) or refs.get(tag_ref)
def check_rev_options(self, dest, rev_options):
"""Check the revision options before checkout.
Returns a new RevOptions object for the SHA1 of the branch or tag
if found.
Args:
rev_options: a RevOptions object.
"""
rev = rev_options.arg_rev
sha = self.get_revision_sha(dest, rev)
if sha is not None:
return rev_options.make_new(sha)
# Do not show a warning for the common case of something that has
# the form of a Git commit hash.
if not looks_like_hash(rev):
logger.warning(
"Did not find branch or tag '%s', assuming revision or ref.",
rev,
)
return rev_options
def is_commit_id_equal(self, dest, name):
"""
Return whether the current commit hash equals the given name.
Args:
dest: the repository directory.
name: a string name.
"""
if not name:
# Then avoid an unnecessary subprocess call.
return False
return self.get_revision(dest) == name
def switch(self, dest, url, rev_options):
self.run_command(['config', 'remote.origin.url', url], cwd=dest)
cmd_args = ['checkout', '-q'] + rev_options.to_args()
self.run_command(cmd_args, cwd=dest)
self.update_submodules(dest)
def update(self, dest, rev_options):
# First fetch changes from the default remote
if self.get_git_version() >= parse_version('1.9.0'):
# fetch tags in addition to everything else
self.run_command(['fetch', '-q', '--tags'], cwd=dest)
else:
self.run_command(['fetch', '-q'], cwd=dest)
# Then reset to wanted revision (maybe even origin/master)
rev_options = self.check_rev_options(dest, rev_options)
cmd_args = ['reset', '--hard', '-q'] + rev_options.to_args()
self.run_command(cmd_args, cwd=dest)
#: update submodules
self.update_submodules(dest)
def obtain(self, dest):
url, rev = self.get_url_rev()
rev_options = self.make_rev_options(rev)
if self.check_destination(dest, url, rev_options):
rev_display = rev_options.to_display()
logger.info(
'Cloning %s%s to %s', url, rev_display, display_path(dest),
)
self.run_command(['clone', '-q', url, dest])
if rev:
rev_options = self.check_rev_options(dest, rev_options)
# Only do a checkout if the current commit id doesn't match
# the requested revision.
if not self.is_commit_id_equal(dest, rev_options.rev):
rev = rev_options.rev
# Only fetch the revision if it's a ref
if rev.startswith('refs/'):
self.run_command(
['fetch', '-q', url] + rev_options.to_args(),
cwd=dest,
)
# Change the revision to the SHA of the ref we fetched
rev = 'FETCH_HEAD'
self.run_command(['checkout', '-q', rev], cwd=dest)
#: repo may contain submodules
self.update_submodules(dest)
def get_url(self, location):
"""Return URL of the first remote encountered."""
remotes = self.run_command(
['config', '--get-regexp', r'remote\..*\.url'],
show_stdout=False, cwd=location,
)
remotes = remotes.splitlines()
found_remote = remotes[0]
for remote in remotes:
if remote.startswith('remote.origin.url '):
found_remote = remote
break
url = found_remote.split(' ')[1]
return url.strip()
def get_revision(self, location):
current_rev = self.run_command(
['rev-parse', 'HEAD'], show_stdout=False, cwd=location,
)
return current_rev.strip()
def _get_subdirectory(self, location):
"""Return the relative path of setup.py to the git repo root."""
# find the repo root
git_dir = self.run_command(['rev-parse', '--git-dir'],
show_stdout=False, cwd=location).strip()
if not os.path.isabs(git_dir):
git_dir = os.path.join(location, git_dir)
root_dir = os.path.join(git_dir, '..')
# find setup.py
orig_location = location
while not os.path.exists(os.path.join(location, 'setup.py')):
last_location = location
location = os.path.dirname(location)
if location == last_location:
# We've traversed up to the root of the filesystem without
# finding setup.py
logger.warning(
"Could not find setup.py for directory %s (tried all "
"parent directories)",
orig_location,
)
return None
# relative path of setup.py to repo root
if samefile(root_dir, location):
return None
return os.path.relpath(location, root_dir)
def get_src_requirement(self, dist, location):
repo = self.get_url(location)
if not repo.lower().startswith('git:'):
repo = 'git+' + repo
egg_project_name = dist.egg_name().split('-', 1)[0]
if not repo:
return None
current_rev = self.get_revision(location)
req = '%s@%s#egg=%s' % (repo, current_rev, egg_project_name)
subdirectory = self._get_subdirectory(location)
if subdirectory:
req += '&subdirectory=' + subdirectory
return req
def get_url_rev(self):
"""
Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
That's required because although they use SSH they sometimes doesn't
work with a ssh:// scheme (e.g. Github). But we need a scheme for
parsing. Hence we remove it again afterwards and return it as a stub.
"""
if '://' not in self.url:
assert 'file:' not in self.url
self.url = self.url.replace('git+', 'git+ssh://')
url, rev = super(Git, self).get_url_rev()
url = url.replace('ssh://', '')
else:
url, rev = super(Git, self).get_url_rev()
return url, rev
def update_submodules(self, location):
if not os.path.exists(os.path.join(location, '.gitmodules')):
return
self.run_command(
['submodule', 'update', '--init', '--recursive', '-q'],
cwd=location,
)
@classmethod
def controls_location(cls, location):
if super(Git, cls).controls_location(location):
return True
try:
r = cls().run_command(['rev-parse'],
cwd=location,
show_stdout=False,
on_returncode='ignore')
return not r
except BadCommand:
logger.debug("could not determine if %s is under git control "
"because git is not available", location)
return False
vcs.register(Git)

View File

@@ -0,0 +1,105 @@
from __future__ import absolute_import
import logging
import os
from pip._vendor.six.moves import configparser
from pip._internal.download import path_to_url
from pip._internal.utils.misc import display_path
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.vcs import VersionControl, vcs
logger = logging.getLogger(__name__)
class Mercurial(VersionControl):
name = 'hg'
dirname = '.hg'
repo_name = 'clone'
schemes = ('hg', 'hg+http', 'hg+https', 'hg+ssh', 'hg+static-http')
def get_base_rev_args(self, rev):
return [rev]
def export(self, location):
"""Export the Hg repository at the url to the destination location"""
with TempDirectory(kind="export") as temp_dir:
self.unpack(temp_dir.path)
self.run_command(
['archive', location], show_stdout=False, cwd=temp_dir.path
)
def switch(self, dest, url, rev_options):
repo_config = os.path.join(dest, self.dirname, 'hgrc')
config = configparser.SafeConfigParser()
try:
config.read(repo_config)
config.set('paths', 'default', url)
with open(repo_config, 'w') as config_file:
config.write(config_file)
except (OSError, configparser.NoSectionError) as exc:
logger.warning(
'Could not switch Mercurial repository to %s: %s', url, exc,
)
else:
cmd_args = ['update', '-q'] + rev_options.to_args()
self.run_command(cmd_args, cwd=dest)
def update(self, dest, rev_options):
self.run_command(['pull', '-q'], cwd=dest)
cmd_args = ['update', '-q'] + rev_options.to_args()
self.run_command(cmd_args, cwd=dest)
def obtain(self, dest):
url, rev = self.get_url_rev()
rev_options = self.make_rev_options(rev)
if self.check_destination(dest, url, rev_options):
rev_display = rev_options.to_display()
logger.info(
'Cloning hg %s%s to %s',
url,
rev_display,
display_path(dest),
)
self.run_command(['clone', '--noupdate', '-q', url, dest])
cmd_args = ['update', '-q'] + rev_options.to_args()
self.run_command(cmd_args, cwd=dest)
def get_url(self, location):
url = self.run_command(
['showconfig', 'paths.default'],
show_stdout=False, cwd=location).strip()
if self._is_local_repository(url):
url = path_to_url(url)
return url.strip()
def get_revision(self, location):
current_revision = self.run_command(
['parents', '--template={rev}'],
show_stdout=False, cwd=location).strip()
return current_revision
def get_revision_hash(self, location):
current_rev_hash = self.run_command(
['parents', '--template={node}'],
show_stdout=False, cwd=location).strip()
return current_rev_hash
def get_src_requirement(self, dist, location):
repo = self.get_url(location)
if not repo.lower().startswith('hg:'):
repo = 'hg+' + repo
egg_project_name = dist.egg_name().split('-', 1)[0]
if not repo:
return None
current_rev_hash = self.get_revision_hash(location)
return '%s@%s#egg=%s' % (repo, current_rev_hash, egg_project_name)
def is_commit_id_equal(self, dest, name):
"""Always assume the versions don't match"""
return False
vcs.register(Mercurial)

View File

@@ -0,0 +1,271 @@
from __future__ import absolute_import
import logging
import os
import re
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._internal.index import Link
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import display_path, rmtree
from pip._internal.vcs import VersionControl, vcs
_svn_xml_url_re = re.compile('url="([^"]+)"')
_svn_rev_re = re.compile(r'committed-rev="(\d+)"')
_svn_url_re = re.compile(r'URL: (.+)')
_svn_revision_re = re.compile(r'Revision: (.+)')
_svn_info_xml_rev_re = re.compile(r'\s*revision="(\d+)"')
_svn_info_xml_url_re = re.compile(r'<url>(.*)</url>')
logger = logging.getLogger(__name__)
class Subversion(VersionControl):
name = 'svn'
dirname = '.svn'
repo_name = 'checkout'
schemes = ('svn', 'svn+ssh', 'svn+http', 'svn+https', 'svn+svn')
def get_base_rev_args(self, rev):
return ['-r', rev]
def get_info(self, location):
"""Returns (url, revision), where both are strings"""
assert not location.rstrip('/').endswith(self.dirname), \
'Bad directory: %s' % location
output = self.run_command(
['info', location],
show_stdout=False,
extra_environ={'LANG': 'C'},
)
match = _svn_url_re.search(output)
if not match:
logger.warning(
'Cannot determine URL of svn checkout %s',
display_path(location),
)
logger.debug('Output that cannot be parsed: \n%s', output)
return None, None
url = match.group(1).strip()
match = _svn_revision_re.search(output)
if not match:
logger.warning(
'Cannot determine revision of svn checkout %s',
display_path(location),
)
logger.debug('Output that cannot be parsed: \n%s', output)
return url, None
return url, match.group(1)
def export(self, location):
"""Export the svn repository at the url to the destination location"""
url, rev = self.get_url_rev()
rev_options = get_rev_options(self, url, rev)
url = self.remove_auth_from_url(url)
logger.info('Exporting svn repository %s to %s', url, location)
with indent_log():
if os.path.exists(location):
# Subversion doesn't like to check out over an existing
# directory --force fixes this, but was only added in svn 1.5
rmtree(location)
cmd_args = ['export'] + rev_options.to_args() + [url, location]
self.run_command(cmd_args, show_stdout=False)
def switch(self, dest, url, rev_options):
cmd_args = ['switch'] + rev_options.to_args() + [url, dest]
self.run_command(cmd_args)
def update(self, dest, rev_options):
cmd_args = ['update'] + rev_options.to_args() + [dest]
self.run_command(cmd_args)
def obtain(self, dest):
url, rev = self.get_url_rev()
rev_options = get_rev_options(self, url, rev)
url = self.remove_auth_from_url(url)
if self.check_destination(dest, url, rev_options):
rev_display = rev_options.to_display()
logger.info(
'Checking out %s%s to %s',
url,
rev_display,
display_path(dest),
)
cmd_args = ['checkout', '-q'] + rev_options.to_args() + [url, dest]
self.run_command(cmd_args)
def get_location(self, dist, dependency_links):
for url in dependency_links:
egg_fragment = Link(url).egg_fragment
if not egg_fragment:
continue
if '-' in egg_fragment:
# FIXME: will this work when a package has - in the name?
key = '-'.join(egg_fragment.split('-')[:-1]).lower()
else:
key = egg_fragment
if key == dist.key:
return url.split('#', 1)[0]
return None
def get_revision(self, location):
"""
Return the maximum revision for all files under a given location
"""
# Note: taken from setuptools.command.egg_info
revision = 0
for base, dirs, files in os.walk(location):
if self.dirname not in dirs:
dirs[:] = []
continue # no sense walking uncontrolled subdirs
dirs.remove(self.dirname)
entries_fn = os.path.join(base, self.dirname, 'entries')
if not os.path.exists(entries_fn):
# FIXME: should we warn?
continue
dirurl, localrev = self._get_svn_url_rev(base)
if base == location:
base = dirurl + '/' # save the root url
elif not dirurl or not dirurl.startswith(base):
dirs[:] = []
continue # not part of the same svn tree, skip it
revision = max(revision, localrev)
return revision
def get_url_rev(self):
# hotfix the URL scheme after removing svn+ from svn+ssh:// readd it
url, rev = super(Subversion, self).get_url_rev()
if url.startswith('ssh://'):
url = 'svn+' + url
return url, rev
def get_url(self, location):
# In cases where the source is in a subdirectory, not alongside
# setup.py we have to look up in the location until we find a real
# setup.py
orig_location = location
while not os.path.exists(os.path.join(location, 'setup.py')):
last_location = location
location = os.path.dirname(location)
if location == last_location:
# We've traversed up to the root of the filesystem without
# finding setup.py
logger.warning(
"Could not find setup.py for directory %s (tried all "
"parent directories)",
orig_location,
)
return None
return self._get_svn_url_rev(location)[0]
def _get_svn_url_rev(self, location):
from pip._internal.exceptions import InstallationError
entries_path = os.path.join(location, self.dirname, 'entries')
if os.path.exists(entries_path):
with open(entries_path) as f:
data = f.read()
else: # subversion >= 1.7 does not have the 'entries' file
data = ''
if (data.startswith('8') or
data.startswith('9') or
data.startswith('10')):
data = list(map(str.splitlines, data.split('\n\x0c\n')))
del data[0][0] # get rid of the '8'
url = data[0][3]
revs = [int(d[9]) for d in data if len(d) > 9 and d[9]] + [0]
elif data.startswith('<?xml'):
match = _svn_xml_url_re.search(data)
if not match:
raise ValueError('Badly formatted data: %r' % data)
url = match.group(1) # get repository URL
revs = [int(m.group(1)) for m in _svn_rev_re.finditer(data)] + [0]
else:
try:
# subversion >= 1.7
xml = self.run_command(
['info', '--xml', location],
show_stdout=False,
)
url = _svn_info_xml_url_re.search(xml).group(1)
revs = [
int(m.group(1)) for m in _svn_info_xml_rev_re.finditer(xml)
]
except InstallationError:
url, revs = None, []
if revs:
rev = max(revs)
else:
rev = 0
return url, rev
def get_src_requirement(self, dist, location):
repo = self.get_url(location)
if repo is None:
return None
# FIXME: why not project name?
egg_project_name = dist.egg_name().split('-', 1)[0]
rev = self.get_revision(location)
return 'svn+%s@%s#egg=%s' % (repo, rev, egg_project_name)
def is_commit_id_equal(self, dest, name):
"""Always assume the versions don't match"""
return False
@staticmethod
def remove_auth_from_url(url):
# Return a copy of url with 'username:password@' removed.
# username/pass params are passed to subversion through flags
# and are not recognized in the url.
# parsed url
purl = urllib_parse.urlsplit(url)
stripped_netloc = \
purl.netloc.split('@')[-1]
# stripped url
url_pieces = (
purl.scheme, stripped_netloc, purl.path, purl.query, purl.fragment
)
surl = urllib_parse.urlunsplit(url_pieces)
return surl
def get_rev_options(vcs, url, rev):
"""
Return a RevOptions object.
"""
r = urllib_parse.urlsplit(url)
if hasattr(r, 'username'):
# >= Python-2.5
username, password = r.username, r.password
else:
netloc = r[1]
if '@' in netloc:
auth = netloc.split('@')[0]
if ':' in auth:
username, password = auth.split(':', 1)
else:
username, password = auth, None
else:
username, password = None, None
extra_args = []
if username:
extra_args += ['--username', username]
if password:
extra_args += ['--password', password]
return vcs.make_rev_options(rev, extra_args=extra_args)
vcs.register(Subversion)