1
# Copyright (C) 2013 Canonical Ltd.
2
# Author: Colin Watson <cjwatson@ubuntu.com>
4
# This program is free software: you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; version 3 of the License.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
"""Click package hooks.
18
See doc/hooks.rst for the draft specification.
21
from __future__ import print_function
26
"package_install_hooks",
31
from functools import partial
38
from string import Formatter
41
from debian.deb822 import Deb822
43
from click import osextras
44
from click.paths import hooks_dir
45
from click.user import ClickUser, ClickUsers
48
def _read_manifest_hooks(db, package, version):
52
manifest_path = os.path.join(
53
db.path(package, version), ".click", "info",
54
"%s.manifest" % package)
55
with io.open(manifest_path, encoding="UTF-8") as manifest:
56
return json.load(manifest).get("hooks", {})
57
except (KeyError, IOError):
61
class ClickPatternFormatter(Formatter):
    """A Formatter that handles simple $-expansions.

    `${key}` is replaced by the value of the `key` argument; `$$` is
    replaced by `$`.  Any `$` character not followed by `{...}` or `$` is
    preserved intact.
    """

    _expansion_re = re.compile(r"\$(?:\$|{(.*?)})")

    def parse(self, format_string):
        """Split format_string into Formatter-style parse tuples.

        Yields (literal_text, field_name, format_spec, conversion) tuples.
        field_name is None for pure literal text; for a `${key}` expansion
        it is `key`, with an empty format_spec and no conversion.
        """
        while True:
            match = self._expansion_re.search(format_string)
            if match is None:
                # No more expansions: whatever is left is literal text.
                if format_string:
                    yield format_string, None, None, None
                return
            start, end = match.span()
            if format_string[start:end] == "$$":
                # Keep the first `$` as literal text and drop the second.
                yield format_string[:start + 1], None, None, None
            else:
                yield format_string[:start], match.group(1), "", None
            format_string = format_string[end:]

    def get_field(self, field_name, args, kwargs):
        """Look up field_name in kwargs; missing or None becomes ""."""
        value = kwargs.get(field_name)
        if value is None:
            value = ""
        return value, field_name

    def possible_expansion(self, s, format_string, *args, **kwargs):
        """Check if s is a possible expansion.

        Any (keyword) arguments have the effect of binding some keys to
        fixed values; unspecified keys may take any value, and will bind
        greedily to the longest possible string.  A key bound to None is
        treated as bound to the empty string, matching what format() would
        have produced for it.

        If s is a possible expansion, then this method returns a (possibly
        empty) dictionary mapping all the unspecified keys to their bound
        values.  Otherwise, it returns None.
        """
        regex_pieces = []
        group_names = []
        for literal_text, field_name, format_spec, conversion in \
                self.parse(format_string):
            if literal_text:
                regex_pieces.append(re.escape(literal_text))
            if field_name is not None:
                if field_name in kwargs:
                    bound = kwargs[field_name]
                    if bound is None:
                        # get_field() formats None as ""; re.escape(None)
                        # would raise TypeError instead.
                        bound = ""
                    regex_pieces.append(re.escape(bound))
                else:
                    regex_pieces.append("(.*)")
                    group_names.append(field_name)
        match = re.match("^%s$" % "".join(regex_pieces), s)
        if match is None:
            return None
        # Only the "(.*)" pieces create groups, in the same order as
        # group_names; a repeated key keeps its last binding.
        return dict(zip(group_names, match.groups()))
122
class ClickHook(Deb822):
    """A single Click hook, parsed as a Deb822 paragraph.

    Hooks are loaded from `<name>.hook` files in hooks_dir.  The fields
    this class reads are "pattern", "exec", "user", "user-level",
    "single-version", "hook-name" and "trigger".
    """

    _formatter = ClickPatternFormatter()

    def __init__(self, db, name, sequence=None, fields=None, encoding="utf-8"):
        """Parse a hook from sequence and remember its database and name."""
        super(ClickHook, self).__init__(
            sequence=sequence, fields=fields, encoding=encoding)
        self.db = db
        self.name = name

    @classmethod
    def open(cls, db, name):
        """Load the hook called name from hooks_dir.

        Raises KeyError if the hook file cannot be opened.
        """
        try:
            with open(os.path.join(hooks_dir, "%s.hook" % name)) as f:
                return cls(db, name, f)
        except IOError:
            raise KeyError("No click hook '%s' installed" % name)

    @classmethod
    def open_all(cls, db, hook_name=None):
        """Yield every hook in hooks_dir, optionally filtered by hook name.

        Entries not ending in ".hook" are ignored, as are hook files that
        cannot be opened.  If hook_name is not None, only hooks whose
        hook_name property equals it are yielded.
        """
        suffix = ".hook"
        for entry in osextras.listdir_force(hooks_dir):
            if not entry.endswith(suffix):
                continue
            try:
                with open(os.path.join(hooks_dir, entry)) as f:
                    hook = cls(db, entry[:-len(suffix)], f)
                    if hook_name is None or hook.hook_name == hook_name:
                        yield hook
            except IOError:
                pass

    @property
    def user_level(self):
        """True if the hook declares "user-level: yes"."""
        return self.get("user-level", "no") == "yes"

    @property
    def single_version(self):
        """True for user-level hooks and for "single-version: yes"."""
        return self.user_level or self.get("single-version", "no") == "yes"

    @property
    def hook_name(self):
        """The "hook-name" field, defaulting to the hook file's name."""
        return self.get("hook-name", self.name)

    def short_app_id(self, package, app_name):
        """Return "<package>_<app_name>".

        Raises ValueError if app_name contains "_" or "/".
        """
        # TODO: perhaps this check belongs further up the stack somewhere?
        if "_" in app_name or "/" in app_name:
            raise ValueError(
                "Application name '%s' may not contain _ or / characters" %
                app_name)
        return "%s_%s" % (package, app_name)

    def app_id(self, package, version, app_name):
        """Return "<package>_<app_name>_<version>"."""
        return "%s_%s" % (self.short_app_id(package, app_name), version)

    def _user_home(self, user):
        """Return user's home directory from the password database."""
        # NOTE(review): the None guard was not visible in the reviewed
        # source; it is inferred from callers that pass user=None.
        if user is None:
            return None
        # TODO: make robust against removed users
        # TODO: caching
        return pwd.getpwnam(user).pw_dir

    def pattern(self, package, version, app_name, user=None):
        """Expand this hook's "pattern" field for one application.

        The keys "id", "user" and "home" are always bound; "short-id" is
        bound as well for single-version hooks.  Trailing path separators
        are stripped from the result.
        """
        app_id = self.app_id(package, version, app_name)
        kwargs = {
            "id": app_id,
            "user": user,
            "home": self._user_home(user),
            }
        if self.single_version:
            kwargs["short-id"] = self.short_app_id(package, app_name)
        return self._formatter.format(self["pattern"], **kwargs).rstrip(os.sep)

    def _drop_privileges(self, username):
        """Switch the current process to username's credentials.

        Does nothing unless the effective UID is 0.  Otherwise sets the
        supplementary groups, the real/effective/saved GID and UID, the
        HOME environment variable, and adds 0o002 to the umask.
        """
        if os.geteuid() != 0:
            return
        pw = pwd.getpwnam(username)
        os.setgroups(
            [g.gr_gid for g in grp.getgrall() if username in g.gr_mem])
        # Portability note: this assumes that we have [gs]etres[gu]id, which
        # is true on Linux but not necessarily elsewhere.  If you need to
        # support something else, there are reasonably standard alternatives
        # involving other similar calls; see e.g. gnulib/lib/idpriv-drop.c.
        os.setresgid(pw.pw_gid, pw.pw_gid, pw.pw_gid)
        os.setresuid(pw.pw_uid, pw.pw_uid, pw.pw_uid)
        # Double-check that all three IDs really changed.
        assert os.getresuid() == (pw.pw_uid, pw.pw_uid, pw.pw_uid)
        assert os.getresgid() == (pw.pw_gid, pw.pw_gid, pw.pw_gid)
        os.environ["HOME"] = pw.pw_dir
        os.umask(osextras.get_umask() | 0o002)

    def _run_commands_user(self, user=None):
        """Return the user name that hook commands should run as."""
        # NOTE(review): this body was not visible in the reviewed source;
        # reconstructed as "the given user for user-level hooks, otherwise
        # the hook's own 'user' field" -- confirm against upstream.
        if self.user_level:
            return user
        else:
            return self["user"]

    def _run_commands(self, user=None):
        """Run the hook's "exec" command, if any, with dropped privileges.

        Raises NotImplementedError for hooks declaring "trigger: yes".
        """
        # NOTE(review): the "exec" presence guard was not visible in the
        # reviewed source -- confirm against upstream.
        if "exec" in self:
            drop_privileges = partial(
                self._drop_privileges, self._run_commands_user(user=user))
            # The command is a shell string taken from the hook file.
            subprocess.check_call(
                self["exec"], preexec_fn=drop_privileges, shell=True)
        if self.get("trigger", "no") == "yes":
            raise NotImplementedError("'Trigger: yes' not yet implemented")

    def _previous_entries(self, user=None):
        """Find entries that match the structure of our links.

        Yields (path, package, version, app_name) for each entry in the
        link directory whose path is a possible expansion of the pattern
        and whose "id" splits into three "_"-separated parts.
        """
        link_dir = os.path.dirname(self.pattern("", "", "", user=user))
        # TODO: This only works if the app ID only appears, at most, in the
        # last component of the pattern path.
        for previous_entry in osextras.listdir_force(link_dir):
            previous_path = os.path.join(link_dir, previous_entry)
            previous_exp = self._formatter.possible_expansion(
                previous_path, self["pattern"], user=user,
                home=self._user_home(user))
            if previous_exp is None or "id" not in previous_exp:
                continue
            previous_id = previous_exp["id"]
            try:
                previous_package, previous_app_name, previous_version = (
                    previous_id.split("_", 2))
            except ValueError:
                # Not of the form package_app_version; not one of ours.
                continue
            yield (
                previous_path,
                previous_package, previous_version, previous_app_name)

    def _install_link(self, package, version, app_name, relative_path,
                      user=None, user_db=None):
        """Install a hook symlink.

        The link lives at the expanded pattern and points at relative_path
        inside the package: below user_db.path(package) for user-level
        hooks, below self.db.path(package, version) otherwise.  Nothing is
        touched if the link already points at the right target.

        This should be called with dropped privileges if necessary.
        """
        if self.user_level:
            target = os.path.join(user_db.path(package), relative_path)
        else:
            target = os.path.join(
                self.db.path(package, version), relative_path)
        link = self.pattern(package, version, app_name, user=user)
        if not os.path.islink(link) or os.readlink(link) != target:
            osextras.ensuredir(os.path.dirname(link))
            osextras.symlink_force(target, link)

    def install_package(self, package, version, app_name, relative_path,
                        user=None):
        """Install this hook's link for one application and run commands.

        For single-version hooks, links for other versions of the same
        application are removed first.  System-level hooks must be called
        with user=None.
        """
        if self.user_level:
            user_db = ClickUser(self.db, user=user)
        else:
            assert user is None

        # Remove previous versions if necessary.
        if self.single_version:
            for path, p_package, p_version, p_app_name in \
                    self._previous_entries(user=user):
                if (p_package == package and p_app_name == app_name and
                        p_version != version):
                    osextras.unlink_force(path)

        if self.user_level:
            with user_db._dropped_privileges():
                self._install_link(
                    package, version, app_name, relative_path,
                    user=user, user_db=user_db)
        else:
            self._install_link(package, version, app_name, relative_path)
        self._run_commands(user=user)

    def remove_package(self, package, version, app_name, user=None):
        """Remove this hook's link for one application and run commands."""
        osextras.unlink_force(
            self.pattern(package, version, app_name, user=user))
        self._run_commands(user=user)

    def _all_packages(self, user=None):
        """Return an iterable of all unpacked packages.

        If running a user-level hook, this returns (package, version, user)
        for the current version of each package registered for each user, or
        only for a single user if user is not None.

        If running a system-level hook, this returns (package, version,
        None) for each version of each unpacked package.
        """
        if self.user_level:
            if user is not None:
                user_db = ClickUser(self.db, user=user)
                for package, version in user_db.items():
                    yield package, version, user
            else:
                for user_name, user_db in ClickUsers(self.db).items():
                    # Names starting with "@" are skipped.
                    if user_name.startswith("@"):
                        continue
                    for package, version in user_db.items():
                        yield package, version, user_name
        else:
            for package, version, _, _ in self.db.packages():
                yield package, version, None

    def _relevant_apps(self, user=None):
        """Return an iterable of all applications relevant for this hook.

        Yields (package, version, app_name, user_name, relative_path) for
        each application whose manifest names this hook.
        """
        for package, version, user_name in self._all_packages(user=user):
            manifest = _read_manifest_hooks(self.db, package, version)
            for app_name, hooks in manifest.items():
                if self.hook_name in hooks:
                    yield (
                        package, version, app_name, user_name,
                        hooks[self.hook_name])

    def install(self, user=None):
        """Run install_package for every relevant application."""
        for package, version, app_name, user_name, relative_path in (
                self._relevant_apps(user=user)):
            self.install_package(
                package, version, app_name, relative_path, user=user_name)

    def remove(self, user=None):
        """Run remove_package for every relevant application."""
        for package, version, app_name, user_name, _ in (
                self._relevant_apps(user=user)):
            self.remove_package(package, version, app_name, user=user_name)

    def sync(self, user=None):
        """Bring this hook's links in line with the relevant applications.

        Installs a link for every relevant application, removes existing
        entries that do not correspond to one, and then runs the hook's
        commands once.  System-level hooks must be called with user=None.
        """
        if self.user_level:
            user_db = ClickUser(self.db, user=user)
        else:
            assert user is None

        seen = set()
        for package, version, app_name, user_name, relative_path in (
                self._relevant_apps(user=user)):
            seen.add((package, version, app_name))
            if self.user_level:
                with user_db._dropped_privileges():
                    self._install_link(
                        package, version, app_name, relative_path,
                        user=user_name, user_db=user_db)
            else:
                self._install_link(package, version, app_name, relative_path)
        for path, package, version, app_name in \
                self._previous_entries(user=user):
            if (package, version, app_name) not in seen:
                osextras.unlink_force(path)
        self._run_commands(user=user)
362
def _app_hooks(hooks):
364
for app_name in hooks:
365
for hook_name in hooks[app_name]:
366
items.add((app_name, hook_name))
370
def package_install_hooks(db, package, old_version, new_version, user=None):
    """Run hooks following installation or upgrade of a Click package.

    If user is None, only run system-level hooks.  If user is not None, only
    run user-level hooks for that user.

    Links for (application, hook) pairs present in the old version's
    manifest but absent from the new one are removed for single-version
    hooks; every hook named by the new manifest is then installed.
    """
    old_manifest = _read_manifest_hooks(db, package, old_version)
    new_manifest = _read_manifest_hooks(db, package, new_version)
    want_user_level = user is not None

    # Remove any targets for single-version hooks that were in the old
    # manifest but not the new one.
    for app_name, hook_name in sorted(
            _app_hooks(old_manifest) - _app_hooks(new_manifest)):
        for hook in ClickHook.open_all(db, hook_name):
            if hook.user_level != want_user_level:
                continue
            if hook.single_version:
                hook.remove_package(package, old_version, app_name, user=user)

    for app_name, app_hooks in sorted(new_manifest.items()):
        for hook_name, relative_path in sorted(app_hooks.items()):
            for hook in ClickHook.open_all(db, hook_name):
                if hook.user_level != want_user_level:
                    continue
                hook.install_package(
                    package, new_version, app_name, relative_path, user=user)
398
def package_remove_hooks(db, package, old_version, user=None):
    """Run hooks following removal of a Click package.

    If user is None, only run system-level hooks.  If user is not None, only
    run user-level hooks for that user.

    Every hook named by the removed version's manifest has its link for
    the corresponding application removed.
    """
    old_manifest = _read_manifest_hooks(db, package, old_version)
    want_user_level = user is not None

    for app_name, app_hooks in sorted(old_manifest.items()):
        for hook_name in sorted(app_hooks):
            for hook in ClickHook.open_all(db, hook_name):
                if hook.user_level != want_user_level:
                    continue
                hook.remove_package(package, old_version, app_name, user=user)
414
def run_system_hooks(db):
    """Run system-level hooks for all installed packages.

    This is useful when starting up from images with preinstalled packages
    which may not have had their system-level hooks run properly when
    building the image.  It is suitable for running at system startup.
    """
    db.ensure_ownership()
    for hook in ClickHook.open_all(db):
        if not hook.user_level:
            # NOTE(review): the statement under this condition was not
            # visible in the reviewed source; sync() is assumed here --
            # confirm against upstream before merging.
            hook.sync()
427
def run_user_hooks(db, user=None):
428
"""Run user-level hooks for all packages registered for a user.
430
This is useful to catch up with packages that may have been preinstalled
431
and registered for all users. It is suitable for running at session
435
user = pwd.getpwuid(os.getuid()).pw_name
436
for hook in ClickHook.open_all(db):