14
14
# You should have received a copy of the GNU General Public License
15
15
# along with this program. If not, see <http://www.gnu.org/licenses/>.
26
from xml.etree import ElementTree
28
_DEFAULT_SOURCES = '''deb http://${mirror}archive.ubuntu.com/ubuntu/ vivid main restricted
29
deb http://${mirror}archive.ubuntu.com/ubuntu/ vivid-updates main restricted
30
deb http://${mirror}archive.ubuntu.com/ubuntu/ vivid universe
31
deb http://${mirror}archive.ubuntu.com/ubuntu/ vivid-updates universe
32
deb http://${mirror}archive.ubuntu.com/ubuntu/ vivid multiverse
33
deb http://${mirror}archive.ubuntu.com/ubuntu/ vivid-updates multiverse
34
deb http://security.ubuntu.com/ubuntu vivid-security main restricted
35
deb http://security.ubuntu.com/ubuntu vivid-security universe
36
deb http://security.ubuntu.com/ubuntu vivid-security multiverse
38
_GEOIP_SERVER = "http://geoip.ubuntu.com/lookup"
24
41
class PackageNotFoundError(Exception):
46
def __init__(self, download_dir, recommends=False):
47
self.apt_cache = apt.Cache()
48
self.manifest_dep_names = self._manifest_dep_names()
63
def __init__(self, rootdir, recommends=False, sources=_DEFAULT_SOURCES):
64
sources = sources or _DEFAULT_SOURCES
65
self.downloaddir = os.path.join(rootdir, 'download')
66
self.rootdir = rootdir
67
self.apt_cache = _setup_apt_cache(rootdir, sources)
49
68
self.recommends = recommends
50
self.download_dir = download_dir
52
70
def get(self, package_names):
53
# TODO: clean up download_dir so that each get/unpack starts from a clean state
54
self.all_dep_names = set()
56
all_package_names = self._compute_deps(package_names)
58
for pkg in all_package_names:
59
self.apt_cache[pkg].candidate.fetch_binary(destdir=self.download_dir)
61
return all_package_names
63
def unpack(self, root_dir):
64
pkgs_abs_path = glob.glob(os.path.join(self.download_dir, '*.deb'))
71
os.makedirs(self.downloaddir, exist_ok=True)
73
manifest_dep_names = self._manifest_dep_names()
75
for name in package_names:
77
self.apt_cache[name].mark_install()
79
raise PackageNotFoundError(name)
81
for pkg in self.apt_cache:
82
# These should already be on every system; skipping them also prevents
83
# diving into downloading libc6
84
if (pkg.candidate.priority in 'essential' and
85
pkg.name not in package_names):
86
print('Skipping priority essential/imporant %s' % pkg.name)
88
if (pkg.name in manifest_dep_names and pkg.name not in package_names):
89
print('Skipping blacklisted from manifest package %s' % pkg.name)
91
if pkg.marked_install:
92
pkg.candidate.fetch_binary(destdir=self.downloaddir)
94
def unpack(self, rootdir):
95
pkgs_abs_path = glob.glob(os.path.join(self.downloaddir, '*.deb'))
65
96
for pkg in pkgs_abs_path:
66
97
# TODO: make this more elegant and add proper error handling
68
subprocess.check_call(['dpkg-deb', '--extract', pkg, root_dir])
99
subprocess.check_call(['dpkg-deb', '--extract', pkg, rootdir])
69
100
except subprocess.CalledProcessError:
70
101
raise UnpackError(pkg)
72
_fix_symlinks(root_dir)
103
_fix_symlinks(rootdir)
74
105
def _manifest_dep_names(self):
75
106
manifest_dep_names = set()
83
114
return manifest_dep_names
85
def _compute_deps(self, package_names):
86
self._add_deps(package_names)
88
for pkg in package_names:
89
if pkg not in self.all_dep_names:
90
raise PackageNotFoundError(pkg)
92
return sorted(self.all_dep_names)
94
def _add_deps(self, package_names):
95
def add_deps(packages):
97
# Strip a trailing ':any' (or any other ':suffix') from the package name
98
# TODO support multiarch
99
pkg = pkg.rsplit(':', 1)[0]
100
if pkg in self.all_dep_names:
102
if pkg in self.manifest_dep_names and pkg not in package_names:
106
candidate_pkg = self.apt_cache[pkg].candidate
108
raise PackageNotFoundError(pkg)
109
deps = candidate_pkg.dependencies
111
deps += candidate_pkg.recommends
112
self.all_dep_names.add(pkg)
113
add_deps([x[0].name for x in deps])
114
add_deps(package_names)
117
def get_geoip_country_code_prefix():
119
with urllib.request.urlopen(_GEOIP_SERVER) as f:
121
et = ElementTree.fromstring(xml_data)
122
cc = et.find("CountryCode")
125
return cc.text.lower() + "."
126
except (ElementTree.ParseError, urllib.error.URLError):
131
def _setup_apt_cache(rootdir, sources):
132
os.makedirs(os.path.join(rootdir, 'etc', 'apt'), exist_ok=True)
133
srcfile = os.path.join(rootdir, 'etc', 'apt', 'sources.list')
134
with open(srcfile, 'w') as f:
135
mirror_prefix = get_geoip_country_code_prefix()
136
sources_list = string.Template(sources).substitute(
137
{"mirror": mirror_prefix})
138
f.write(sources_list)
139
progress = apt.progress.text.AcquireProgress()
140
apt_cache = apt.Cache(rootdir=rootdir, memonly=True)
141
apt_cache.update(fetch_progress=progress, sources_list=srcfile)
117
147
def _fix_symlinks(debdir):