~jamesodhunt/upstart/test-quiesce-cleanup

« back to all changes in this revision

Viewing changes to scripts/pyupstart.py

  • Committer: Dmitrijs Ledkovs
  • Date: 2013-08-21 13:27:50 UTC
  • mfrom: (1472.4.9 python-upstart-module)
  • Revision ID: dmitrijs.ledkovs@canonical.com-20130821132750-613sjyrlc4kjte4w
MergeĀ lp:~jamesodhunt/upstart/python-upstart-module

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python3
 
2
#---------------------------------------------------------------------
 
3
# = Limitations =
 
4
#
 
5
# - Override files are not currently supported.
 
6
#
 
7
#   Note that you can make use of Upstart.test_dir to determine
 
8
#   where to create the override but be aware that after creating a
 
9
#   '.override' file, you must wait until Upstart has re-parsed the job.
 
10
#
 
11
#---------------------------------------------------------------------
 
12
 
 
13
"""
 
14
Upstart test module.
 
15
"""
 
16
 
 
17
import os
 
18
import sys
 
19
import logging
 
20
import tempfile
 
21
import pyinotify
 
22
import subprocess
 
23
import shutil
 
24
import dbus
 
25
import dbus.service
 
26
import dbus.mainloop.glib
 
27
import time
 
28
import json
 
29
from datetime import datetime, timedelta
 
30
from gi.repository import GLib
 
31
 
 
32
VERSION = '0.1'
 
33
NAME = 'TestUpstart'
 
34
 
 
35
UPSTART = '/sbin/init'
 
36
INITCTL = '/sbin/initctl'
 
37
 
 
38
UPSTART_SESSION_ENV = 'UPSTART_SESSION'
 
39
 
 
40
UPSTART_STATE_FILE = 'upstart.state'
 
41
 
 
42
INIT_SOCKET = 'unix:abstract=/com/ubuntu/upstart'
 
43
 
 
44
SYSTEM_JOB_DIR = '/etc/init'
 
45
SYSTEM_LOG_DIR = '/var/log/upstart'
 
46
 
 
47
# used to log session init output
 
48
DEFAULT_LOGFILE = '/tmp/upstart.log'
 
49
 
 
50
DEFAULT_SESSION_INSTALL_PATH = '/usr/share/upstart/sessions'
 
51
 
 
52
SESSION_DIR_FMT = 'upstart/sessions'
 
53
 
 
54
BUS_NAME                 = 'com.ubuntu.Upstart'
 
55
INTERFACE_NAME           = 'com.ubuntu.Upstart0_6'
 
56
JOB_INTERFACE_NAME       = 'com.ubuntu.Upstart0_6.Job'
 
57
INSTANCE_INTERFACE_NAME  = 'com.ubuntu.Upstart0_6.Instance'
 
58
OBJECT_PATH              = '/com/ubuntu/Upstart'
 
59
FREEDESKTOP_PROPERTIES   = 'org.freedesktop.DBus.Properties'
 
60
 
 
61
# Maximum number of seconds to wait for Upstart to detect a new job
 
62
# has been created
 
63
JOB_WAIT_SECS = 5
 
64
 
 
65
# Maximum number of seconds to wait for session file to appear after
 
66
# startup of Session Init
 
67
SESSION_FILE_WAIT_SECS = 5
 
68
 
 
69
# Maximum number of seconds to wait for Upstart to complete a re-exec
 
70
REEXEC_WAIT_SECS = 5
 
71
 
 
72
# Maximum number of seconds to wait for Upstart to create a file
 
73
FILE_WAIT_SECS = 5
 
74
 
 
75
# Maximum number of seconds to wait for Upstart to create a logfile
 
76
LOGFILE_WAIT_SECS = 5
 
77
 
 
78
#---------------------------------------------------------------------
 
79
 
 
80
 
 
81
def dbus_encode(str):
 
82
    """
 
83
    Simulate nih_dbus_path() which Upstart uses to convert
 
84
    a job path into one suitable for use as a D-Bus object path.
 
85
 
 
86
    This entails converting all non-alpha-numeric bytes in the
 
87
    string into a 3 byte string comprising an underscore ('_'),
 
88
    followed by the 2 byte lower-case hex representation of the byte
 
89
    in question. Alpha-numeric bytes are left unmolested.
 
90
 
 
91
    Note that in the special case of the specified string being None
 
92
    or the nul string, it is encoded as '_'.
 
93
 
 
94
    Example: 'hello-world' would be encoded as 'hello_2dworld' since
 
95
    '-' is 2d in hex.
 
96
 
 
97
    """
 
98
    if not str:
 
99
        return '_'
 
100
 
 
101
    hex = []
 
102
    for ch in str:
 
103
        if ch.isalpha() or ch.isdigit():
 
104
            hex.append(ch)
 
105
        else:
 
106
            hex.append("_%02x" % ord(ch))
 
107
 
 
108
    # convert back into a string
 
109
    return ''.join(hex)
 
110
 
 
111
 
 
112
def secs_to_milli(secs):
 
113
    """
 
114
    Convert @secs seconds to milli-seconds.
 
115
    """
 
116
    return secs * 1000
 
117
 
 
118
 
 
119
def wait_for_file(path, timeout=FILE_WAIT_SECS):
 
120
    """
 
121
    Wait for a specified file to exist.
 
122
 
 
123
    @path: Full path to file to wait for.
 
124
 
 
125
    Returns: True if file was created within @timeout seconds, else
 
126
     False.
 
127
    """
 
128
    until = datetime.now() + timedelta(seconds=timeout)
 
129
 
 
130
    while datetime.now() < until:
 
131
        if os.path.exists(path):
 
132
            return True
 
133
        time.sleep(0.1)
 
134
 
 
135
    return False
 
136
 
 
137
 
 
138
class InotifyHandler(pyinotify.ProcessEvent):
 
139
 
 
140
    # We don't actually do anything here since all we care
 
141
    # about is whether we timed-out.
 
142
    def process_IN_CREATE(self, event):
 
143
        pass
 
144
 
 
145
 
 
146
class UpstartException(Exception):
 
147
    """
 
148
    An Upstart Exception.
 
149
    """
 
150
    pass
 
151
 
 
152
 
 
153
class Upstart:
 
154
    """
 
155
    Upstart Class.
 
156
 
 
157
    conf_dir: Full path to job configuration file directory.
 
158
    test_dir: Full path to directory below @conf_dir used to store
 
159
     test jobs.
 
160
    test_dir_name: Relative directory of @test_dir below @conf_dir
 
161
        (effectively '@conf_dir - @test_dir').
 
162
    log_dir: Full path to job log files directory.
 
163
    """
 
164
 
 
165
    def __init__(self):
 
166
        self.logger = logging.getLogger(self.__class__.__name__)
 
167
        self.jobs = []
 
168
 
 
169
        self.conf_dir = None
 
170
        self.test_dir = None
 
171
        self.test_dir_name = None
 
172
        self.log_dir = None
 
173
 
 
174
        self.socket = None
 
175
        self.connection = None
 
176
 
 
177
        # Set to True when a new job is created
 
178
        self.job_seen = False
 
179
 
 
180
        self.new_job = None
 
181
        self.mainloop = None
 
182
        self.timeout_source = None
 
183
 
 
184
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 
185
 
 
186
        self.create_dirs()
 
187
 
 
188
    def connect(self, force=False):
 
189
        """
 
190
        Connect to Upstart.
 
191
 
 
192
        @force: if True, connect regardless of whether already
 
193
        connected.
 
194
 
 
195
        Notes:
 
196
          - Raises an UpstartException() if already connected.
 
197
        """
 
198
        if self.connection and not force:
 
199
            raise UpstartException('Already connected')
 
200
 
 
201
        # Create appropriate D-Bus connection
 
202
        self.connection = dbus.connection.Connection(self.socket)
 
203
        self.remote_object = self.connection.get_object(
 
204
            object_path=OBJECT_PATH)
 
205
        self.proxy = dbus.Interface(self.remote_object, INTERFACE_NAME)
 
206
 
 
207
    def reconnect(self):
 
208
        """
 
209
        Forcibly reconnect to Upstart.
 
210
        """
 
211
        self.connect(force=True)
 
212
 
 
213
    def polling_connect(self, timeout=REEXEC_WAIT_SECS, force=False):
 
214
        """
 
215
        Attempt to connect to Upstart repeatedly for up to @timeout
 
216
        seconds.
 
217
 
 
218
        Useful after a re-exec since that operation although fast takes
 
219
        an indeterminate amount of time to complete.
 
220
 
 
221
        @timeout: seconds to wait for successful connection.
 
222
        @force: if True, force a reconnection.
 
223
        """
 
224
        for i in range(timeout):
 
225
            try:
 
226
                self.connect(force=force)
 
227
                self.version()
 
228
                return
 
229
            except dbus.exceptions.DBusException:
 
230
                time.sleep(1)
 
231
 
 
232
        raise UpstartException(
 
233
            'Failed to reconnect to Upstart after %d seconds' % timeout)
 
234
 
 
235
    def _timeout_cb(self):
 
236
        """
 
237
        Handle timeout if job not seen in a reasonable amount of time.
 
238
        """
 
239
        self.mainloop.quit()
 
240
 
 
241
    def _idle_create_job_cb(self, name, body, *args):
 
242
        """
 
243
        Handler to create a Job Configuration File as soon as
 
244
        the main loop starts.
 
245
 
 
246
        """
 
247
        self.new_job = Job(self, self.test_dir, self.test_dir_name, name, body=body)
 
248
 
 
249
        # deregister
 
250
        return False
 
251
 
 
252
    def _job_added_cb(self, path):
 
253
        """
 
254
        Handle the 'JobAdded(Object path)' signal.
 
255
        """
 
256
 
 
257
        # ignore signals that don't match the job we care about.
 
258
        if path != self.job_object_path:
 
259
            return True
 
260
 
 
261
        self.job_seen = True
 
262
 
 
263
        assert self.timeout_source, 'Expected timeout source to be defined'
 
264
 
 
265
        # remove timeout handler
 
266
        GLib.source_remove(self.timeout_source)
 
267
 
 
268
        self.mainloop.quit()
 
269
 
 
270
        # deregister
 
271
        return False
 
272
 
 
273
    def set_test_dir(self):
 
274
        """
 
275
        Create a directory to hold the test jobs beneath the job
 
276
        configuration directory.
 
277
        """
 
278
        self.test_dir = tempfile.mkdtemp(prefix=NAME + '-', dir=self.conf_dir)
 
279
        self.test_dir_name = self.test_dir.replace("%s/" % self.conf_dir, '')
 
280
 
 
281
    def create_dirs(self):
 
282
        """
 
283
        Create the directories required to store job configuration files
 
284
        and log job output.
 
285
        """
 
286
        for dir in (self.conf_dir, self.test_dir, self.log_dir):
 
287
            if dir:
 
288
                try:
 
289
                    os.makedirs(dir)
 
290
                except FileExistsError:
 
291
                    pass
 
292
 
 
293
    def destroy(self):
 
294
        """
 
295
        Remove all jobs.
 
296
        """
 
297
        for job in self.jobs:
 
298
            job.destroy()
 
299
 
 
300
        if self.test_dir:
 
301
            shutil.rmtree(self.test_dir)
 
302
 
 
303
        # invalidate
 
304
        self.connection = None
 
305
        self.remote_object = None
 
306
        self.proxy = None
 
307
 
 
308
    def emit(self, event, env=None, wait=True):
 
309
        """
 
310
        @event: Name of event to emit.
 
311
        @env: optional environment for event.
 
312
        @wait: if True, wait for event to be fully emitted
 
313
         (synchronous), else async.
 
314
 
 
315
        Emit event @event with optional environment @env.
 
316
        """
 
317
        if env is None:
 
318
            env = []
 
319
        self.proxy.EmitEvent(event, dbus.Array(env, 's'), wait)
 
320
 
 
321
    def version(self, raw=False):
 
322
        """
 
323
        Determine version of running instance of Upstart.
 
324
 
 
325
        @raw: if True, return full version string, else just the
 
326
        version in the form 'x.y'.
 
327
 
 
328
        Returns: Version as a string.
 
329
 
 
330
        """
 
331
        properties = dbus.Interface(self.remote_object, FREEDESKTOP_PROPERTIES)
 
332
        version_string = properties.Get(INTERFACE_NAME, 'version')
 
333
        if raw:
 
334
            return version_string
 
335
 
 
336
        return version_string.split()[2].strip(')')
 
337
 
 
338
    def get_state_json(self):
 
339
        """
 
340
        Obtain Upstart internal state in JSON format.
 
341
        """
 
342
        return self.proxy.GetState()
 
343
 
 
344
    def get_state(self):
 
345
        """
 
346
        Obtain Upstart internal state (JSON) and convert to Python
 
347
        dictionary format before returning.
 
348
        """
 
349
        return json.loads(self.get_state_json())
 
350
 
 
351
    def get_sessions(self):
 
352
        """
 
353
        Returns dictionary of session details.
 
354
        """
 
355
        state = self.get_state()
 
356
        sessions = state['sessions']
 
357
        return sessions
 
358
 
 
359
    def session_count(self):
 
360
        """
 
361
        Returns number of chroot sessions.
 
362
        """
 
363
        return len(self.get_sessions())
 
364
 
 
365
    def sessions_exist(self):
 
366
        """
 
367
        Returns True if sessions exist, else False.
 
368
        """
 
369
        return self.session_count() > 0
 
370
 
 
371
    def reexec(self):
 
372
        """
 
373
        Request Upstart re-exec itself.
 
374
 
 
375
        Note that after a re-exec, it is necessary to reconnect to
 
376
        Upstart.
 
377
        """
 
378
        raise NotImplementedError('method must be implemented by subclass')
 
379
 
 
380
    def job_create(self, name, body):
 
381
        """
 
382
        Create a Job Configuration File.
 
383
 
 
384
        @name: Name to give the job.
 
385
        @body: String representation of configuration file, or list of
 
386
         strings.
 
387
 
 
388
        Strategy:
 
389
 
 
390
        Arranging for this method to detect that Upstart has registered
 
391
        this job in an efficient manner is tricky. The approach adopted
 
392
        is to:
 
393
 
 
394
        - Create a glib main loop.
 
395
        - Register a D-Bus signal handler which looks for the 'JobAdded'
 
396
          signal emitted by Upstart when a new job is available (in
 
397
          other words when Upstart has parsed its job configuration file
 
398
          successfully).
 
399
        - Create a glib main loop idle handler that will be called as
 
400
          soon as the loop starts and will actually create the job.
 
401
        - Add a glib main loop timeout handler which will detect if the
 
402
          job failed to be registered.
 
403
        - Run the main loop, which performs the following steps:
 
404
          - Calls the idle handler immediately. This creates the
 
405
            job configuration file, then deregisters itself so it is
 
406
            never called again.
 
407
            - If Upstart fails to parse the job, the timeout handler gets
 
408
              called which causes the main loop to exit. job_seen will not
 
409
              be set.
 
410
            - If Upstart does parse the file, the _job_added_cb()
 
411
              callback gets called as a result of the 'JobAdded' D-Bus
 
412
              signal being emitted. This will set job_seen and request
 
413
              the main loop exits.
 
414
        - Check the job_seen variable and react accordingly.
 
415
        """
 
416
 
 
417
        # Create a new mainloop
 
418
        self.mainloop = GLib.MainLoop()
 
419
 
 
420
        # reset
 
421
        self.job_seen = False
 
422
        self.new_job = None
 
423
 
 
424
        # construct the D-Bus path for the new job
 
425
        job_path = '{}/{}'.format(self.test_dir_name, name)
 
426
 
 
427
        self.job_object_path = '{}/{}/{}'.format(
 
428
            OBJECT_PATH, 'jobs', dbus_encode(job_path)
 
429
        )
 
430
 
 
431
        self.connection.add_signal_receiver(
 
432
            self._job_added_cb,
 
433
            dbus_interface=INTERFACE_NAME,
 
434
            path=OBJECT_PATH,
 
435
            signal_name='JobAdded')
 
436
 
 
437
        GLib.idle_add(self._idle_create_job_cb, name, body)
 
438
        self.timeout_source = GLib.timeout_add(
 
439
            secs_to_milli(JOB_WAIT_SECS),
 
440
            self._timeout_cb
 
441
        )
 
442
        self.mainloop.run()
 
443
 
 
444
        if not self.job_seen:
 
445
            return None
 
446
 
 
447
        # reset
 
448
        self.job_seen = False
 
449
        self.mainloop = None
 
450
 
 
451
        self.jobs.append(self.new_job)
 
452
 
 
453
        return self.new_job
 
454
 
 
455
 
 
456
class Job:
 
457
    """
 
458
    Representation of an Upstart Job.
 
459
 
 
460
    This equates to the Job Configuration file and details of the
 
461
    running instances.
 
462
 
 
463
    For single-instance jobs (those that do not specify the 'instance'
 
464
    stanza), this object is sufficient to control the job.
 
465
 
 
466
    For multi-instance jobs, manipulating this object will operate on
 
467
    *all* the instances where it makes sense to do so. If this is not
 
468
    desired behaviour, capture the JobInstance() returned from
 
469
    start() and operate on that instead.
 
470
 
 
471
    """
 
472
 
 
473
    def __init__(self, upstart, dir_name, subdir_name, job_name, body=None):
 
474
        """
 
475
        @upstart: Upstart() parent object.
 
476
        @dir_name: Full path to job configuration files directory.
 
477
        @subdir_name: Relative directory of test_dir below job
 
478
         configuration file directory.
 
479
        @job_name: Name of job.
 
480
        @body: Contents of job configuration file (either a string, or a
 
481
         list of strings).
 
482
        """
 
483
 
 
484
        self.logger = logging.getLogger(self.__class__.__name__)
 
485
 
 
486
        self.instances = []
 
487
        self.instance_names = []
 
488
 
 
489
        self.upstart = upstart
 
490
        self.subdir_name = subdir_name
 
491
        self.name = job_name
 
492
        self.job_dir = dir_name
 
493
        self.body = body
 
494
 
 
495
        self.instance_name = None
 
496
 
 
497
        # proxy to job instance
 
498
        self.instance = None
 
499
 
 
500
        self.properties = None
 
501
 
 
502
        if not self.body:
 
503
            raise UpstartException('No body specified')
 
504
 
 
505
        self.conffile = os.path.join(self.job_dir, self.name + '.conf')
 
506
 
 
507
        if self.body and isinstance(self.body, str):
 
508
            # Assume body cannot be a bytes object.
 
509
            body = body.splitlines()
 
510
 
 
511
        with open(self.conffile, 'w', encoding='utf-8') as fh:
 
512
            for line in body:
 
513
                print(line.strip(), file=fh)
 
514
            print(file=fh)
 
515
 
 
516
        self.valid = True
 
517
 
 
518
        subdir_object_path = dbus_encode(
 
519
            "%s/%s" % (self.subdir_name, self.name)
 
520
        )
 
521
        self.object_path = "%s/%s/%s" % \
 
522
            (OBJECT_PATH, 'jobs', subdir_object_path)
 
523
 
 
524
        self.remote_object = \
 
525
            self.upstart.connection.get_object(BUS_NAME, self.object_path)
 
526
        self.interface = dbus.Interface(self.remote_object, JOB_INTERFACE_NAME)
 
527
 
 
528
    def destroy(self):
 
529
        """
 
530
        Stop all instances and cleanup.
 
531
        """
 
532
        try:
 
533
            for instance in self.instances:
 
534
                instance.destroy()
 
535
 
 
536
            os.remove(self.conffile)
 
537
        except FileNotFoundError:
 
538
            pass
 
539
 
 
540
        self.valid = False
 
541
 
 
542
    def start(self, env=None, wait=True):
 
543
        """
 
544
        Start the job. For multi-instance jobs (those that specify the
 
545
        'instance' stanza), you will need to use the returned
 
546
        JobInstance object to manipulate the individual instance.
 
547
 
 
548
        Returns: JobInstance.
 
549
        """
 
550
 
 
551
        if env is None:
 
552
            env = []
 
553
        instance_path = self.interface.Start(dbus.Array(env, 's'), wait)
 
554
        instance_name = instance_path.replace("%s/" % self.object_path, '')
 
555
 
 
556
        # store the D-Bus encoded instance name ('_' for single-instance jobs)
 
557
        if instance_name not in self.instance_names:
 
558
            self.instance_names.append(instance_name)
 
559
 
 
560
        instance = JobInstance(self, instance_name, instance_path)
 
561
        self.instances.append(instance)
 
562
        return instance
 
563
 
 
564
    def _get_instance(self, name):
 
565
        """
 
566
        Retrieve job instance and its properties.
 
567
 
 
568
        @name: D-Bus encoded instance name.
 
569
 
 
570
        """
 
571
 
 
572
        assert name, 'Name must not be None'
 
573
 
 
574
        object_path = '{}/{}'.format(self.object_path, name)
 
575
 
 
576
        remote_object = \
 
577
            self.upstart.connection.get_object(BUS_NAME, object_path)
 
578
 
 
579
        return dbus.Interface(remote_object, INSTANCE_INTERFACE_NAME)
 
580
 
 
581
    def stop(self, wait=True):
 
582
        """
 
583
        Stop all running instance of the job.
 
584
 
 
585
        @wait: if False, stop job instances asynchronously.
 
586
        """
 
587
 
 
588
        for name in self.instance_names:
 
589
            instance = self._get_instance(name)
 
590
            try:
 
591
                instance.Stop(wait)
 
592
            except dbus.exceptions.DBusException:
 
593
                # job has already stopped
 
594
                pass
 
595
 
 
596
    def restart(self, wait=True):
 
597
        """
 
598
        Restart all running instance of the job.
 
599
 
 
600
        @wait: if False, stop job instances asynchronously.
 
601
        """
 
602
        for name in self.instance_names:
 
603
            instance = self._get_instance(name)
 
604
            instance.Restart(wait)
 
605
 
 
606
    def instance_object_paths(self):
 
607
        """
 
608
        Returns a list of instance object paths.
 
609
        """
 
610
        return ["%s/%s" % (self.object_path, instance)
 
611
                for instance in self.instance_names]
 
612
 
 
613
    def pids(self, name=None):
 
614
        """
 
615
        @name: D-Bus encoded instance name.
 
616
 
 
617
        Returns: Map of job processes:
 
618
            name=job process name
 
619
            value=pid
 
620
 
 
621
        Notes: If your job has multiple instances, call the method of
 
622
        the same name on the individual instance objects.
 
623
        """
 
624
 
 
625
        name = ('_' if name is None else name)
 
626
 
 
627
        if len(self.instance_names) > 1:
 
628
            raise UpstartException('Cannot handle multiple instances')
 
629
 
 
630
        assert(name in self.instance_names)
 
631
 
 
632
        instance = self._get_instance(name)
 
633
 
 
634
        properties = dbus.Interface(instance, FREEDESKTOP_PROPERTIES)
 
635
        procs = properties.Get(INSTANCE_INTERFACE_NAME, 'processes')
 
636
 
 
637
        pid_map = {}
 
638
 
 
639
        for proc in procs:
 
640
            # convert back to natural types
 
641
            job_proc = str(proc[0])
 
642
            job_pid = int(proc[1])
 
643
 
 
644
            pid_map[job_proc] = job_pid
 
645
 
 
646
        return pid_map
 
647
 
 
648
    def running(self, name):
 
649
        """
 
650
        @name: D-Bus encoded name of job instance.
 
651
 
 
652
        Determine if an instance is currently running.
 
653
 
 
654
        Returns: True if @name is running, else false.
 
655
        """
 
656
        if len(self.instance_names) > 1:
 
657
            raise UpstartException('Cannot handle multiple instances')
 
658
 
 
659
        if name not in self.instance_names:
 
660
            return False
 
661
 
 
662
        return len(self.pids(name)) > 0
 
663
 
 
664
    def logfile_name(self, instance_name):
 
665
        """
 
666
        Determine full path to logfile for job instance.
 
667
 
 
668
        @instance_name: D-Bus encoded job instance name.
 
669
 
 
670
        Note: it is up to the caller to ensure the logfile exists.
 
671
 
 
672
        Returns: full path to logfile.
 
673
        """
 
674
 
 
675
        if instance_name != '_':
 
676
            filename = '{}_{}-{}.log'.format(
 
677
                self.subdir_name, self.name, instance_name
 
678
            )
 
679
        else:
 
680
            # Note the underscore that Upstart auto-maps from the subdirectory
 
681
            # slash (see init(5)).
 
682
            filename = '{}_{}.log'.format(self.subdir_name, self.name)
 
683
 
 
684
        logfile = os.path.join(self.upstart.log_dir, filename)
 
685
 
 
686
        return logfile
 
687
 
 
688
 
 
689
class LogFile:
 
690
    """
 
691
    Representation of an Upstart job logfile.
 
692
    """
 
693
 
 
694
    def __init__(self, path):
 
695
        """
 
696
        @path: full path to logfile.
 
697
        """
 
698
        self.logger = logging.getLogger(self.__class__.__name__)
 
699
        self.path = path
 
700
        self.valid = True
 
701
 
 
702
    def destroy(self):
 
703
        """
 
704
        Clean up: *MUST* be called by caller!
 
705
        """
 
706
        try:
 
707
            os.remove(self.path)
 
708
            self.valid = False
 
709
        except OSError:
 
710
            pass
 
711
 
 
712
    def exists(self):
 
713
        """
 
714
        Determine if logfile exists.
 
715
 
 
716
        Returns: True or False.
 
717
        """
 
718
        return os.path.exists(self.path)
 
719
 
 
720
    def _get_lines(self):
 
721
        """
 
722
        Get contents of log.
 
723
 
 
724
        Notes: '\r' characters added by pty() are removed by
 
725
        readlines().
 
726
 
 
727
        Returns: List of lines in logfile.
 
728
 
 
729
        """
 
730
 
 
731
        assert self.path, 'Path not set'
 
732
 
 
733
        with open(self.path, 'r', encoding='utf-8') as fh:
 
734
            return fh.readlines()
 
735
 
 
736
    def readlines(self, timeout=LOGFILE_WAIT_SECS):
 
737
        """
 
738
        Read logfile. A timeout has to be used to avoid "hanging" since the job
 
739
        associated with this logfile:
 
740
 
 
741
        - may not create any output.
 
742
        - may produce output only after a long period of time.
 
743
 
 
744
        @timeout: seconds to wait file to be created.
 
745
 
 
746
        Notes:
 
747
          - Raises an UpstartException() on timeout.
 
748
          - '\r' characters added by pty() will be removed.
 
749
 
 
750
        Returns: Array of lines in logfile or None if logfile was not
 
751
        created in @timeout seconds.
 
752
 
 
753
        """
 
754
 
 
755
        self.timeout = timeout
 
756
 
 
757
        # Polling - ugh. However, arranging for an inotify watch
 
758
        # at this level is tricky since the watch cannot be created here
 
759
        # as by the time the watch is in place, the logfile may already
 
760
        # have been created, which not only nullifies the reason for
 
761
        # adding the watch in the first place, but also results in an
 
762
        # impotent watch (unless a timeout on the watch is specified).
 
763
        # The correct place to create the watch is in one of the Upstart
 
764
        # classes. However, even then, timing issues result wrt to
 
765
        # calling the appropriate inotify APIs.
 
766
        #
 
767
        # Hence, altough polling is gross it has the advantage of
 
768
        # simplicity and reliability in this instance.
 
769
        until = datetime.now() + timedelta(seconds=self.timeout)
 
770
 
 
771
        while datetime.now() < until:
 
772
            try:
 
773
                return self._get_lines()
 
774
            except FileNotFoundError:
 
775
                time.sleep(0.1)
 
776
 
 
777
        return None
 
778
 
 
779
 
 
780
class JobInstance:
 
781
    """
 
782
    Representation of a running Upstart Job Instance.
 
783
    """
 
784
 
 
785
    def __init__(self, job, instance_name, object_path):
 
786
        """
 
787
        @instance_name: D-Bus encoded instance name.
 
788
        @object_path: D-Bus object path.
 
789
        """
 
790
        self.logger = logging.getLogger(self.__class__.__name__)
 
791
        self.job = job
 
792
        self.instance_name = instance_name
 
793
        self.object_path = object_path
 
794
 
 
795
        self.remote_object = \
 
796
            self.job.upstart.connection.get_object(BUS_NAME, self.object_path)
 
797
 
 
798
        self.instance = \
 
799
            dbus.Interface(self.remote_object, INSTANCE_INTERFACE_NAME)
 
800
 
 
801
        self.properties = dbus.Interface(self.instance, FREEDESKTOP_PROPERTIES)
 
802
 
 
803
        # all jobs are expected to be created in a subdirectory.
 
804
        assert(self.job.subdir_name)
 
805
 
 
806
        logfile = job.logfile_name(instance_name)
 
807
        self.logfile = LogFile(logfile)
 
808
 
 
809
    def stop(self, wait=True):
 
810
        """
 
811
        Stop instance.
 
812
 
 
813
        @wait: if True, wait for job, else perform operation
 
814
         asynchronously.
 
815
        """
 
816
        try:
 
817
            self.instance.Stop(wait)
 
818
        except dbus.exceptions.DBusException:
 
819
            # job has already stopped
 
820
            pass
 
821
 
 
822
    def restart(self, wait=True):
 
823
        """
 
824
        Restart instance.
 
825
 
 
826
        @wait: if True, wait for job, else perform operation
 
827
         asynchronously.
 
828
        """
 
829
        self.instance.Restart(wait)
 
830
 
 
831
    def pids(self):
 
832
        procs = self.properties.Get(INSTANCE_INTERFACE_NAME, 'processes')
 
833
 
 
834
        pid_map = {}
 
835
 
 
836
        for proc in procs:
 
837
            # convert back to natural types
 
838
            job_proc = str(proc[0])
 
839
            job_pid = int(proc[1])
 
840
 
 
841
            pid_map[job_proc] = job_pid
 
842
 
 
843
        return pid_map
 
844
 
 
845
    def destroy(self):
 
846
        """
 
847
        Stop the instance and cleanup.
 
848
        """
 
849
        self.stop()
 
850
        self.logfile.destroy()
 
851
 
 
852
 
 
853
class SystemInit(Upstart):
 
854
 
 
855
    def __init__(self):
 
856
        super().__init__()
 
857
        self.logger = logging.getLogger(self.__class__.__name__)
 
858
        self.socket = INIT_SOCKET
 
859
        self.conf_dir = SYSTEM_JOB_DIR
 
860
        self.log_dir = SYSTEM_LOG_DIR
 
861
        self.set_test_dir()
 
862
        self.connect()
 
863
 
 
864
    def reexec(self):
 
865
        if not self.proxy:
 
866
            raise UpstartException('Not yet connected')
 
867
 
 
868
        # Use the official system interface
 
869
        os.system('telinit u')
 
870
 
 
871
 
 
872
class SessionInit(Upstart):
 
873
    """
 
874
    Create a new Upstart Session or join an existing one.
 
875
    """
 
876
 
 
877
    timeout = SESSION_FILE_WAIT_SECS
 
878
 
 
879
    def _get_sessions(self):
 
880
        """
 
881
        Obtain a list of running sessions.
 
882
 
 
883
        Returns: Map of sessions:
 
884
            name=pid
 
885
            value=socket address
 
886
        """
 
887
 
 
888
        sessions = {}
 
889
 
 
890
        args = [INITCTL, 'list-sessions']
 
891
 
 
892
        for line in subprocess.check_output(args,
 
893
                        universal_newlines=True).splitlines():
 
894
            pid, socket = line.split()
 
895
            sessions[pid] = socket
 
896
 
 
897
        return sessions
 
898
 
 
899
    def __init__(self, join=False, capture=None, extra=None):
 
900
        """
 
901
        @join: If False, start a new session. If TRUE join the existing
 
902
         main (non-test) session.
 
903
        @capture: Set to the name of a file to capture all stdout and
 
904
         stderr output.
 
905
        @extra: Array of extra arguments (only used when @join is False).
 
906
 
 
907
        Notes:
 
908
        - If @join is True, an UpstartException is raised if either
 
909
        no existing session is found, or multiple existing sessions are
 
910
        found.
 
911
        - Joining implies joining the main Upstart session, not a
 
912
        test session.
 
913
 
 
914
        """
 
915
        super().__init__()
 
916
 
 
917
        self.logger = logging.getLogger(self.__class__.__name__)
 
918
 
 
919
        self.join = join
 
920
        self.capture = capture
 
921
 
 
922
        self.socket = os.environ.get(UPSTART_SESSION_ENV)
 
923
 
 
924
        sessions = self._get_sessions()
 
925
        if self.join and len(sessions) > 1:
 
926
            raise UpstartException('Multiple existing sessions')
 
927
 
 
928
        if self.join and not self.socket:
 
929
            raise UpstartException('No existing session')
 
930
 
 
931
        if self.join:
 
932
            self.session = self.socket
 
933
 
 
934
            # We are joining the main desktop session.
 
935
            #
 
936
            # Multiple conf file directories are supported, but we'll
 
937
            # stick with the default.
 
938
            config_home = os.environ.get('XDG_CONFIG_HOME', "%s/%s"
 
939
                                         % (os.environ.get('HOME'), '.config'))
 
940
            cache_home = os.environ.get('XDG_CACHE_HOME', "%s/%s"
 
941
                                        % (os.environ.get('HOME'), '.cache'))
 
942
 
 
943
            self.conf_dir = "%s/%s" % (config_home, 'upstart')
 
944
            self.log_dir = "%s/%s" % (cache_home, 'upstart')
 
945
        else:
 
946
            args = []
 
947
 
 
948
            pid = os.getpid()
 
949
 
 
950
            self.conf_dir = \
 
951
                tempfile.mkdtemp(prefix="%s-confdir-%d-" % (NAME, pid))
 
952
 
 
953
            self.log_dir = \
 
954
                tempfile.mkdtemp(prefix="%s-logdir-%d-" % (NAME, pid))
 
955
 
 
956
            args.extend([UPSTART, '--user',
 
957
                           '--confdir', self.conf_dir,
 
958
                           '--logdir', self.log_dir])
 
959
 
 
960
            if extra:
 
961
                args.extend(extra)
 
962
 
 
963
            self.logger.debug(
 
964
                'Starting Session Init with arguments: %s' % " ".join(args)
 
965
            )
 
966
 
 
967
            watch_manager = pyinotify.WatchManager()
 
968
            mask = pyinotify.IN_CREATE
 
969
            notifier = \
 
970
                pyinotify.Notifier(watch_manager, InotifyHandler())
 
971
            session = os.path.join(os.environ['XDG_RUNTIME_DIR'], SESSION_DIR_FMT)
 
972
            watch_manager.add_watch(session, mask)
 
973
 
 
974
            notifier.process_events()
 
975
 
 
976
            if self.capture:
 
977
                self.out = open(self.capture, 'w')
 
978
            else:
 
979
                self.out = subprocess.DEVNULL
 
980
 
 
981
            self.proc = \
 
982
                subprocess.Popen(args=args, stdout=self.out, stderr=self.out)
 
983
 
 
984
            self.pid = self.proc.pid
 
985
 
 
986
            self.logger.debug('Session Init running with pid %d' % self.pid)
 
987
 
 
988
            millisecs_timeout = secs_to_milli(self.timeout)
 
989
            if not notifier.check_events(timeout=millisecs_timeout):
 
990
                msg = \
 
991
                    "Timed-out waiting for session file after %d seconds" \
 
992
                    % self.timeout
 
993
                raise UpstartException(msg)
 
994
 
 
995
            # consume
 
996
            notifier.read_events()
 
997
            notifier.stop()
 
998
 
 
999
            if self.capture:
 
1000
                self.out.flush()
 
1001
 
 
1002
            # Activity has been seen in the session file directory, so
 
1003
            # our Session Init should now have written the session file
 
1004
            # (although this assumes no other Session Inits are being
 
1005
            # started).
 
1006
            sessions = self._get_sessions()
 
1007
 
 
1008
            if not str(self.pid) in sessions:
 
1009
                msg = "Session with pid %d not found" % self.pid
 
1010
                raise UpstartException(msg)
 
1011
 
 
1012
            self.socket = sessions[str(self.pid)]
 
1013
            self.logger.debug("Created Upstart Session '%s'" % self.socket)
 
1014
            os.putenv(UPSTART_SESSION_ENV, self.socket)
 
1015
 
 
1016
        self.set_test_dir()
 
1017
        self.connect()
 
1018
 
 
1019
    def destroy(self):
 
1020
        """
 
1021
        Stop the SessionInit (if session was not joined) and cleanup.
 
1022
        """
 
1023
        super().destroy()
 
1024
        if self.capture:
 
1025
            self.out.close()
 
1026
        if not self.join:
 
1027
            self.logger.debug(
 
1028
                'Stopping Session Init running with pid %d' % self.pid
 
1029
            )
 
1030
            self.proc.terminate()
 
1031
            self.proc.wait()
 
1032
            os.rmdir(self.log_dir)
 
1033
            shutil.rmtree(self.conf_dir)
 
1034
            os.unsetenv(UPSTART_SESSION_ENV)
 
1035
 
 
1036
    def reexec(self):
 
1037
        if not self.proxy:
 
1038
            raise UpstartException('Not yet connected')
 
1039
 
 
1040
        self.proxy.Restart()