2
#---------------------------------------------------------------------
5
# - Override files are not currently supported.
7
# Note that you can make use of Upstart.test_dir to determine
8
# where to create the override but be aware that after creating a
9
# '.override' file, you must wait until Upstart has re-parsed the job.
11
#---------------------------------------------------------------------
26
import dbus.mainloop.glib
29
from datetime import datetime, timedelta
30
from gi.repository import GLib
35
UPSTART = '/sbin/init'
36
INITCTL = '/sbin/initctl'
38
UPSTART_SESSION_ENV = 'UPSTART_SESSION'
40
UPSTART_STATE_FILE = 'upstart.state'
42
INIT_SOCKET = 'unix:abstract=/com/ubuntu/upstart'
44
SYSTEM_JOB_DIR = '/etc/init'
45
SYSTEM_LOG_DIR = '/var/log/upstart'
47
# used to log session init output
48
DEFAULT_LOGFILE = '/tmp/upstart.log'
50
DEFAULT_SESSION_INSTALL_PATH = '/usr/share/upstart/sessions'
52
SESSION_DIR_FMT = 'upstart/sessions'
54
BUS_NAME = 'com.ubuntu.Upstart'
55
INTERFACE_NAME = 'com.ubuntu.Upstart0_6'
56
JOB_INTERFACE_NAME = 'com.ubuntu.Upstart0_6.Job'
57
INSTANCE_INTERFACE_NAME = 'com.ubuntu.Upstart0_6.Instance'
58
OBJECT_PATH = '/com/ubuntu/Upstart'
59
FREEDESKTOP_PROPERTIES = 'org.freedesktop.DBus.Properties'
61
# Maximum number of seconds to wait for Upstart to detect a new job
65
# Maximum number of seconds to wait for session file to appear after
66
# startup of Session Init
67
SESSION_FILE_WAIT_SECS = 5
69
# Maximum number of seconds to wait for Upstart to complete a re-exec
72
# Maximum number of seconds to wait for Upstart to create a file
75
# Maximum number of seconds to wait for Upstart to create a logfile
78
#---------------------------------------------------------------------
83
Simulate nih_dbus_path() which Upstart uses to convert
84
a job path into one suitable for use as a D-Bus object path.
86
This entails converting all non-alpha-numeric bytes in the
87
string into a 3 byte string comprising an underscore ('_'),
88
followed by the 2 byte lower-case hex representation of the byte
89
in question. Alpha-numeric bytes are left unmolested.
91
Note that in the special case of the specified string being None
92
or the nul string, it is encoded as '_'.
94
Example: 'hello-world' would be encoded as 'hello_2dworld' since
103
if ch.isalpha() or ch.isdigit():
106
hex.append("_%02x" % ord(ch))
108
# convert back into a string
112
def secs_to_milli(secs):
114
Convert @secs seconds to milliseconds.
119
def wait_for_file(path, timeout=FILE_WAIT_SECS):
121
Wait for a specified file to exist.
123
@path: Full path to file to wait for.
125
Returns: True if file was created within @timeout seconds, else
128
until = datetime.now() + timedelta(seconds=timeout)
130
while datetime.now() < until:
131
if os.path.exists(path):
138
class InotifyHandler(pyinotify.ProcessEvent):
140
# We don't actually do anything here since all we care
141
# about is whether we timed-out.
142
def process_IN_CREATE(self, event):
146
class UpstartException(Exception):
148
An Upstart Exception.
157
conf_dir: Full path to job configuration file directory.
158
test_dir: Full path to directory below @conf_dir used to store
160
test_dir_name: Relative directory of @test_dir below @conf_dir
161
(effectively '@conf_dir - @test_dir').
162
log_dir: Full path to job log files directory.
166
self.logger = logging.getLogger(self.__class__.__name__)
171
self.test_dir_name = None
175
self.connection = None
177
# Set to True when a new job is created
178
self.job_seen = False
182
self.timeout_source = None
184
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
188
def connect(self, force=False):
192
@force: if True, connect regardless of whether already
196
- Raises an UpstartException() if already connected.
198
if self.connection and not force:
199
raise UpstartException('Already connected')
201
# Create appropriate D-Bus connection
202
self.connection = dbus.connection.Connection(self.socket)
203
self.remote_object = self.connection.get_object(
204
object_path=OBJECT_PATH)
205
self.proxy = dbus.Interface(self.remote_object, INTERFACE_NAME)
209
Forcibly reconnect to Upstart.
211
self.connect(force=True)
213
def polling_connect(self, timeout=REEXEC_WAIT_SECS, force=False):
215
Attempt to connect to Upstart repeatedly for up to @timeout
218
Useful after a re-exec since that operation although fast takes
219
an indeterminate amount of time to complete.
221
@timeout: seconds to wait for successful connection.
222
@force: if True, force a reconnection.
224
for i in range(timeout):
226
self.connect(force=force)
229
except dbus.exceptions.DBusException:
232
raise UpstartException(
233
'Failed to reconnect to Upstart after %d seconds' % timeout)
235
def _timeout_cb(self):
237
Handle timeout if job not seen in a reasonable amount of time.
241
def _idle_create_job_cb(self, name, body, *args):
243
Handler to create a Job Configuration File as soon as
244
the main loop starts.
247
self.new_job = Job(self, self.test_dir, self.test_dir_name, name, body=body)
252
def _job_added_cb(self, path):
254
Handle the 'JobAdded(Object path)' signal.
257
# ignore signals that don't match the job we care about.
258
if path != self.job_object_path:
263
assert self.timeout_source, 'Expected timeout source to be defined'
265
# remove timeout handler
266
GLib.source_remove(self.timeout_source)
273
def set_test_dir(self):
275
Create a directory to hold the test jobs beneath the job
276
configuration directory.
278
self.test_dir = tempfile.mkdtemp(prefix=NAME + '-', dir=self.conf_dir)
279
self.test_dir_name = self.test_dir.replace("%s/" % self.conf_dir, '')
281
def create_dirs(self):
283
Create the directories required to store job configuration files
286
for dir in (self.conf_dir, self.test_dir, self.log_dir):
290
except FileExistsError:
297
for job in self.jobs:
301
shutil.rmtree(self.test_dir)
304
self.connection = None
305
self.remote_object = None
308
def emit(self, event, env=None, wait=True):
310
@event: Name of event to emit.
311
@env: optional environment for event.
312
@wait: if True, wait for event to be fully emitted
313
(synchronous), else async.
315
Emit event @event with optional environment @env.
319
self.proxy.EmitEvent(event, dbus.Array(env, 's'), wait)
321
def version(self, raw=False):
323
Determine version of running instance of Upstart.
325
@raw: if True, return full version string, else just the
326
version in the form 'x.y'.
328
Returns: Version as a string.
331
properties = dbus.Interface(self.remote_object, FREEDESKTOP_PROPERTIES)
332
version_string = properties.Get(INTERFACE_NAME, 'version')
334
return version_string
336
return version_string.split()[2].strip(')')
338
def get_state_json(self):
340
Obtain Upstart internal state in JSON format.
342
return self.proxy.GetState()
346
Obtain Upstart internal state (JSON) and convert to Python
347
dictionary format before returning.
349
return json.loads(self.get_state_json())
351
def get_sessions(self):
353
Returns dictionary of session details.
355
state = self.get_state()
356
sessions = state['sessions']
359
def session_count(self):
361
Returns number of chroot sessions.
363
return len(self.get_sessions())
365
def sessions_exist(self):
367
Returns True if sessions exist, else False.
369
return self.session_count() > 0
373
Request Upstart re-exec itself.
375
Note that after a re-exec, it is necessary to reconnect to
378
raise NotImplementedError('method must be implemented by subclass')
380
def job_create(self, name, body):
382
Create a Job Configuration File.
384
@name: Name to give the job.
385
@body: String representation of configuration file, or list of
390
Arranging for this method to detect that Upstart has registered
391
this job in an efficient manner is tricky. The approach adopted
394
- Create a glib main loop.
395
- Register a D-Bus signal handler which looks for the 'JobAdded'
396
signal emitted by Upstart when a new job is available (in
397
other words when Upstart has parsed its job configuration file
399
- Create a glib main loop idle handler that will be called as
400
soon as the loop starts and will actually create the job.
401
- Add a glib main loop timeout handler which will detect if the
402
job failed to be registered.
403
- Run the main loop, which performs the following steps:
404
- Calls the idle handler immediately. This creates the
405
job configuration file, then deregisters itself so it is
407
- If Upstart fails to parse the job, the timeout handler gets
408
called which causes the main loop to exit. job_seen will not
410
- If Upstart does parse the file, the _job_added_cb()
411
callback gets called as a result of the 'JobAdded' D-Bus
412
signal being emitted. This will set job_seen and request
414
- Check the job_seen variable and react accordingly.
417
# Create a new mainloop
418
self.mainloop = GLib.MainLoop()
421
self.job_seen = False
424
# construct the D-Bus path for the new job
425
job_path = '{}/{}'.format(self.test_dir_name, name)
427
self.job_object_path = '{}/{}/{}'.format(
428
OBJECT_PATH, 'jobs', dbus_encode(job_path)
431
self.connection.add_signal_receiver(
433
dbus_interface=INTERFACE_NAME,
435
signal_name='JobAdded')
437
GLib.idle_add(self._idle_create_job_cb, name, body)
438
self.timeout_source = GLib.timeout_add(
439
secs_to_milli(JOB_WAIT_SECS),
444
if not self.job_seen:
448
self.job_seen = False
451
self.jobs.append(self.new_job)
458
Representation of an Upstart Job.
460
This equates to the Job Configuration file and details of the
463
For single-instance jobs (those that do not specify the 'instance'
464
stanza), this object is sufficient to control the job.
466
For multi-instance jobs, manipulating this object will operate on
467
*all* the instances where it makes sense to do so. If this is not
468
desired behaviour, capture the JobInstance() returned from
469
start() and operate on that instead.
473
def __init__(self, upstart, dir_name, subdir_name, job_name, body=None):
475
@upstart: Upstart() parent object.
476
@dir_name: Full path to job configuration files directory.
477
@subdir_name: Relative directory of test_dir below job
478
configuration file directory.
479
@job_name: Name of job.
480
@body: Contents of job configuration file (either a string, or a
484
self.logger = logging.getLogger(self.__class__.__name__)
487
self.instance_names = []
489
self.upstart = upstart
490
self.subdir_name = subdir_name
492
self.job_dir = dir_name
495
self.instance_name = None
497
# proxy to job instance
500
self.properties = None
503
raise UpstartException('No body specified')
505
self.conffile = os.path.join(self.job_dir, self.name + '.conf')
507
if self.body and isinstance(self.body, str):
508
# Assume body cannot be a bytes object.
509
body = body.splitlines()
511
with open(self.conffile, 'w', encoding='utf-8') as fh:
513
print(line.strip(), file=fh)
518
subdir_object_path = dbus_encode(
519
"%s/%s" % (self.subdir_name, self.name)
521
self.object_path = "%s/%s/%s" % \
522
(OBJECT_PATH, 'jobs', subdir_object_path)
524
self.remote_object = \
525
self.upstart.connection.get_object(BUS_NAME, self.object_path)
526
self.interface = dbus.Interface(self.remote_object, JOB_INTERFACE_NAME)
530
Stop all instances and cleanup.
533
for instance in self.instances:
536
os.remove(self.conffile)
537
except FileNotFoundError:
542
def start(self, env=None, wait=True):
544
Start the job. For multi-instance jobs (those that specify the
545
'instance' stanza), you will need to use the returned
546
JobInstance object to manipulate the individual instance.
548
Returns: JobInstance.
553
instance_path = self.interface.Start(dbus.Array(env, 's'), wait)
554
instance_name = instance_path.replace("%s/" % self.object_path, '')
556
# store the D-Bus encoded instance name ('_' for single-instance jobs)
557
if instance_name not in self.instance_names:
558
self.instance_names.append(instance_name)
560
instance = JobInstance(self, instance_name, instance_path)
561
self.instances.append(instance)
564
def _get_instance(self, name):
566
Retrieve job instance and its properties.
568
@name: D-Bus encoded instance name.
572
assert name, 'Name must not be None'
574
object_path = '{}/{}'.format(self.object_path, name)
577
self.upstart.connection.get_object(BUS_NAME, object_path)
579
return dbus.Interface(remote_object, INSTANCE_INTERFACE_NAME)
581
def stop(self, wait=True):
583
Stop all running instances of the job.
585
@wait: if False, stop job instances asynchronously.
588
for name in self.instance_names:
589
instance = self._get_instance(name)
592
except dbus.exceptions.DBusException:
593
# job has already stopped
596
def restart(self, wait=True):
598
Restart all running instances of the job.
600
@wait: if False, restart job instances asynchronously.
602
for name in self.instance_names:
603
instance = self._get_instance(name)
604
instance.Restart(wait)
606
def instance_object_paths(self):
608
Returns a list of instance object paths.
610
return ["%s/%s" % (self.object_path, instance)
611
for instance in self.instance_names]
613
def pids(self, name=None):
615
@name: D-Bus encoded instance name.
617
Returns: Map of job processes:
618
name=job process name
621
Notes: If your job has multiple instances, call the method of
622
the same name on the individual instance objects.
625
name = ('_' if name is None else name)
627
if len(self.instance_names) > 1:
628
raise UpstartException('Cannot handle multiple instances')
630
assert(name in self.instance_names)
632
instance = self._get_instance(name)
634
properties = dbus.Interface(instance, FREEDESKTOP_PROPERTIES)
635
procs = properties.Get(INSTANCE_INTERFACE_NAME, 'processes')
640
# convert back to natural types
641
job_proc = str(proc[0])
642
job_pid = int(proc[1])
644
pid_map[job_proc] = job_pid
648
def running(self, name):
650
@name: D-Bus encoded name of job instance.
652
Determine if an instance is currently running.
654
Returns: True if @name is running, else False.
656
if len(self.instance_names) > 1:
657
raise UpstartException('Cannot handle multiple instances')
659
if name not in self.instance_names:
662
return len(self.pids(name)) > 0
664
def logfile_name(self, instance_name):
666
Determine full path to logfile for job instance.
668
@instance_name: D-Bus encoded job instance name.
670
Note: it is up to the caller to ensure the logfile exists.
672
Returns: full path to logfile.
675
if instance_name != '_':
676
filename = '{}_{}-{}.log'.format(
677
self.subdir_name, self.name, instance_name
680
# Note the underscore that Upstart auto-maps from the subdirectory
681
# slash (see init(5)).
682
filename = '{}_{}.log'.format(self.subdir_name, self.name)
684
logfile = os.path.join(self.upstart.log_dir, filename)
691
Representation of an Upstart job logfile.
694
def __init__(self, path):
696
@path: full path to logfile.
698
self.logger = logging.getLogger(self.__class__.__name__)
704
Clean up: *MUST* be called by caller!
714
Determine if logfile exists.
716
Returns: True or False.
718
return os.path.exists(self.path)
720
def _get_lines(self):
724
Notes: '\r' characters added by pty() are removed by
727
Returns: List of lines in logfile.
731
assert self.path, 'Path not set'
733
with open(self.path, 'r', encoding='utf-8') as fh:
734
return fh.readlines()
736
def readlines(self, timeout=LOGFILE_WAIT_SECS):
738
Read logfile. A timeout has to be used to avoid "hanging" since the job
739
associated with this logfile:
741
- may not create any output.
742
- may produce output only after a long period of time.
744
@timeout: seconds to wait for file to be created.
747
- Raises an UpstartException() on timeout.
748
- '\r' characters added by pty() will be removed.
750
Returns: Array of lines in logfile or None if logfile was not
751
created in @timeout seconds.
755
self.timeout = timeout
757
# Polling - ugh. However, arranging for an inotify watch
758
# at this level is tricky since the watch cannot be created here
759
# as by the time the watch is in place, the logfile may already
760
# have been created, which not only nullifies the reason for
761
# adding the watch in the first place, but also results in an
762
# impotent watch (unless a timeout on the watch is specified).
763
# The correct place to create the watch is in one of the Upstart
764
# classes. However, even then, timing issues result with respect to
765
# calling the appropriate inotify APIs.
767
# Hence, although polling is gross it has the advantage of
768
# simplicity and reliability in this instance.
769
until = datetime.now() + timedelta(seconds=self.timeout)
771
while datetime.now() < until:
773
return self._get_lines()
774
except FileNotFoundError:
782
Representation of a running Upstart Job Instance.
785
def __init__(self, job, instance_name, object_path):
787
@instance_name: D-Bus encoded instance name.
788
@object_path: D-Bus object path.
790
self.logger = logging.getLogger(self.__class__.__name__)
792
self.instance_name = instance_name
793
self.object_path = object_path
795
self.remote_object = \
796
self.job.upstart.connection.get_object(BUS_NAME, self.object_path)
799
dbus.Interface(self.remote_object, INSTANCE_INTERFACE_NAME)
801
self.properties = dbus.Interface(self.instance, FREEDESKTOP_PROPERTIES)
803
# all jobs are expected to be created in a subdirectory.
804
assert(self.job.subdir_name)
806
logfile = job.logfile_name(instance_name)
807
self.logfile = LogFile(logfile)
809
def stop(self, wait=True):
813
@wait: if True, wait for job, else perform operation
817
self.instance.Stop(wait)
818
except dbus.exceptions.DBusException:
819
# job has already stopped
822
def restart(self, wait=True):
826
@wait: if True, wait for job, else perform operation
829
self.instance.Restart(wait)
832
procs = self.properties.Get(INSTANCE_INTERFACE_NAME, 'processes')
837
# convert back to natural types
838
job_proc = str(proc[0])
839
job_pid = int(proc[1])
841
pid_map[job_proc] = job_pid
847
Stop the instance and cleanup.
850
self.logfile.destroy()
853
class SystemInit(Upstart):
857
self.logger = logging.getLogger(self.__class__.__name__)
858
self.socket = INIT_SOCKET
859
self.conf_dir = SYSTEM_JOB_DIR
860
self.log_dir = SYSTEM_LOG_DIR
866
raise UpstartException('Not yet connected')
868
# Use the official system interface
869
os.system('telinit u')
872
class SessionInit(Upstart):
874
Create a new Upstart Session or join an existing one.
877
timeout = SESSION_FILE_WAIT_SECS
879
def _get_sessions(self):
881
Obtain a list of running sessions.
883
Returns: Map of sessions:
890
args = [INITCTL, 'list-sessions']
892
for line in subprocess.check_output(args,
893
universal_newlines=True).splitlines():
894
pid, socket = line.split()
895
sessions[pid] = socket
899
def __init__(self, join=False, capture=None, extra=None):
901
@join: If False, start a new session. If True, join the existing
902
main (non-test) session.
903
@capture: Set to the name of a file to capture all stdout and
905
@extra: Array of extra arguments (only used when @join is False).
908
- If @join is True, an UpstartException is raised if either
909
no existing session is found, or multiple existing sessions are
911
- Joining implies joining the main Upstart session, not a
917
self.logger = logging.getLogger(self.__class__.__name__)
920
self.capture = capture
922
self.socket = os.environ.get(UPSTART_SESSION_ENV)
924
sessions = self._get_sessions()
925
if self.join and len(sessions) > 1:
926
raise UpstartException('Multiple existing sessions')
928
if self.join and not self.socket:
929
raise UpstartException('No existing session')
932
self.session = self.socket
934
# We are joining the main desktop session.
936
# Multiple conf file directories are supported, but we'll
937
# stick with the default.
938
config_home = os.environ.get('XDG_CONFIG_HOME', "%s/%s"
939
% (os.environ.get('HOME'), '.config'))
940
cache_home = os.environ.get('XDG_CACHE_HOME', "%s/%s"
941
% (os.environ.get('HOME'), '.cache'))
943
self.conf_dir = "%s/%s" % (config_home, 'upstart')
944
self.log_dir = "%s/%s" % (cache_home, 'upstart')
951
tempfile.mkdtemp(prefix="%s-confdir-%d-" % (NAME, pid))
954
tempfile.mkdtemp(prefix="%s-logdir-%d-" % (NAME, pid))
956
args.extend([UPSTART, '--user',
957
'--confdir', self.conf_dir,
958
'--logdir', self.log_dir])
964
'Starting Session Init with arguments: %s' % " ".join(args)
967
watch_manager = pyinotify.WatchManager()
968
mask = pyinotify.IN_CREATE
970
pyinotify.Notifier(watch_manager, InotifyHandler())
971
session = os.path.join(os.environ['XDG_RUNTIME_DIR'], SESSION_DIR_FMT)
972
watch_manager.add_watch(session, mask)
974
notifier.process_events()
977
self.out = open(self.capture, 'w')
979
self.out = subprocess.DEVNULL
982
subprocess.Popen(args=args, stdout=self.out, stderr=self.out)
984
self.pid = self.proc.pid
986
self.logger.debug('Session Init running with pid %d' % self.pid)
988
millisecs_timeout = secs_to_milli(self.timeout)
989
if not notifier.check_events(timeout=millisecs_timeout):
991
"Timed-out waiting for session file after %d seconds" \
993
raise UpstartException(msg)
996
notifier.read_events()
1002
# Activity has been seen in the session file directory, so
1003
# our Session Init should now have written the session file
1004
# (although this assumes no other Session Inits are being
1006
sessions = self._get_sessions()
1008
if not str(self.pid) in sessions:
1009
msg = "Session with pid %d not found" % self.pid
1010
raise UpstartException(msg)
1012
self.socket = sessions[str(self.pid)]
1013
self.logger.debug("Created Upstart Session '%s'" % self.socket)
1014
os.putenv(UPSTART_SESSION_ENV, self.socket)
1021
Stop the SessionInit (if session was not joined) and cleanup.
1028
'Stopping Session Init running with pid %d' % self.pid
1030
self.proc.terminate()
1032
os.rmdir(self.log_dir)
1033
shutil.rmtree(self.conf_dir)
1034
os.unsetenv(UPSTART_SESSION_ENV)
1038
raise UpstartException('Not yet connected')
1040
self.proxy.Restart()