3
import logging.handlers
12
from argparse import ArgumentParser, SUPPRESS
13
from calendar import timegm
14
from datetime import datetime, timedelta
15
from gi.repository import Gtk, GObject
16
from time import time, localtime
21
Run power management operation as many times as needed
23
args, extra_args = MyArgumentParser().parse()
25
# Verify that script is run as root
27
sys.stderr.write('This script needs superuser '
28
'permissions to run correctly\n')
31
#Obtain name of the invoking user.
32
uid = os.getenv('SUDO_UID') or os.getenv('PKEXEC_UID')
34
sys.stderr.write('Unable to determine invoking user\n')
36
username = pwd.getpwuid(int(uid)).pw_name
38
LoggingConfiguration.set(args.log_level, args.log_filename, args.append)
39
logging.debug('Invoking username: %s', username)
40
logging.debug('Arguments: {0!r}'.format(args))
41
logging.debug('Extra Arguments: {0!r}'.format(extra_args))
44
operation = PowerManagementOperation(args, extra_args, user=username)
47
except (TestCancelled, TestFailed) as exception:
48
if isinstance(exception, TestFailed):
49
logging.error(exception.args[0])
50
message = exception.MESSAGE.format(args.pm_operation.capitalize())
54
title = '{0} test'.format(args.pm_operation.capitalize())
55
MessageDialog(title, message, Gtk.MessageType.ERROR).run()
58
return exception.RETURN_CODE
63
class PowerManagementOperation(object):
66
def __init__(self, args, extra_args, user=None):
68
self.extra_args = extra_args
73
Enable configuration file
75
# Enable autologin and sudo on first cycle
76
if self.args.total == self.args.repetitions:
77
AutoLoginConfigurator(user=self.user).enable()
78
SudoersConfigurator(user=self.user).enable()
80
# Schedule this script to be automatically executed
81
# on startup to continue testing
82
autostart_file = AutoStartFile(self.args, user=self.user)
83
autostart_file.write()
87
Run a power management iteration
89
logging.info('{0} operations remaining: {1}'
90
.format(self.args.pm_operation, self.args.repetitions))
91
if self.args.pm_timestamp:
92
pm_timestamp = datetime.fromtimestamp(self.args.pm_timestamp)
94
pm_time = now - pm_timestamp
95
logging.info('{0} time: {1}'
96
.format(self.args.pm_operation.capitalize(), pm_time))
97
if self.args.repetitions > 0:
98
self.run_suspend_cycles(self.args.suspends_before_reboot,
100
self.run_pm_command()
104
def run_pm_command(self):
106
Run power management command and check result if needed
108
# Display information to user
109
# and make it possible to cancel the test
110
CountdownDialog(self.args.pm_operation,
112
self.args.hardware_delay,
113
self.args.total - self.args.repetitions,
114
self.args.total).run()
116
# A small sleep time is added to reboot and poweroff
117
# so that script has time to return a value
118
# (useful when running it as an automated test)
119
command_str = ('sleep {0}; {1}'
120
.format(self.SLEEP_TIME, self.args.pm_operation))
122
command_str += ' {0}'.format(' '.join(self.extra_args))
124
if self.args.pm_operation != 'reboot':
125
WakeUpAlarm.set(seconds=self.args.wakeup)
127
logging.info('Executing new {0!r} operation...'
128
.format(self.args.pm_operation))
129
logging.debug('Executing: {0!r}...'.format(command_str))
130
# The PM operation is performed asynchronously so let's just wait
131
# indefinitely until it happens and someone comes along to kill us.
132
# This addresses LP: #1413134
133
subprocess.check_call(command_str, shell=True)
136
def run_suspend_cycles(self, cycles_count, fwts):
137
"""Run suspend and resume cycles."""
141
script_name = 'fwts_test'
142
command_tpl = '{} -s s3 --s3-device-check --s3-sleep-delay=30 --s3-multiple={}'
144
script_name = 'sleep_test'
145
command_tpl = '{} -s mem -p -i {} -w 10'
146
script_path = os.path.join(
147
os.path.dirname(os.path.realpath(__file__)), script_name)
148
command_str = command_tpl.format(script_path, cycles_count)
149
logging.info('Running suspend/resume cycles')
150
logging.debug('Executing: {0!r}...'.format(command_str))
152
# We call sleep_test or fwts_test script and log its output as it
153
# contains average times we need to compute global average times later.
154
logging.info(subprocess.check_output(command_str,
155
universal_newlines=True, shell=True))
156
except subprocess.CalledProcessError as e:
157
logging.error('Error while running {0}:'.format(e.cmd))
158
logging.exception(e.output)
162
Gather hardware information for the last time,
163
log execution time and exit
165
# Just gather hardware information one more time and exit
166
CountdownDialog(self.args.pm_operation,
168
self.args.hardware_delay,
169
self.args.total - self.args.repetitions,
170
self.args.total).run()
174
# Log some time information
175
start = datetime.fromtimestamp(self.args.start)
177
if self.args.pm_operation == 'reboot':
178
sleep_time = timedelta(seconds=self.SLEEP_TIME)
180
sleep_time = timedelta(seconds=self.args.wakeup)
182
wait_time = timedelta(seconds=(self.args.pm_delay
183
+ (self.args.hardware_delay)
185
average = (end - start - wait_time) / self.args.total - sleep_time
186
time_message = ('Total elapsed time: {total}\n'
187
'Average recovery time: {average}'
188
.format(total=end - start,
190
logging.info(time_message)
192
message = ('{0} test complete'
193
.format(self.args.pm_operation.capitalize()))
195
logging.info(message)
197
title = '{0} test'.format(self.args.pm_operation.capitalize())
198
MessageDialog(title, message).run()
202
Restore configuration
204
# Don't execute this script again on next reboot
205
autostart_file = AutoStartFile(self.args, user=self.user)
206
autostart_file.remove()
208
# Restore previous configuration
209
SudoersConfigurator().disable()
210
AutoLoginConfigurator().disable()
213
class TestCancelled(Exception):
215
MESSAGE = '{0} test cancelled by user'
218
class TestFailed(Exception):
220
MESSAGE = '{0} test failed'
223
class WakeUpAlarm(object):
224
ALARM_FILENAME = '/sys/class/rtc/rtc0/wakealarm'
225
RTC_FILENAME = '/proc/driver/rtc'
228
def set(cls, minutes=0, seconds=0):
230
Calculate wakeup time and write it to BIOS
233
timeout = minutes * 60 + seconds
234
wakeup_time_utc = now + timeout
235
wakeup_time_local = timegm(localtime()) + timeout
237
subprocess.check_call('echo 0 > %s' % cls.ALARM_FILENAME, shell=True)
238
subprocess.check_call('echo %d > %s'
239
% (wakeup_time_utc, cls.ALARM_FILENAME),
242
with open(cls.ALARM_FILENAME) as alarm_file:
243
wakeup_time_stored_str = alarm_file.read()
245
if not re.match('\d+', wakeup_time_stored_str):
246
subprocess.check_call('echo "+%d" > %s'
247
% (timeout, cls.ALARM_FILENAME),
249
with open(cls.ALARM_FILENAME) as alarm_file2:
250
wakeup_time_stored_str = alarm_file2.read()
251
if not re.match('\d+', wakeup_time_stored_str):
252
logging.error('Invalid wakeup time format: {0!r}'
253
.format(wakeup_time_stored_str))
256
wakeup_time_stored = int(wakeup_time_stored_str)
258
logging.debug('Wakeup timestamp: {0} ({1})'
259
.format(wakeup_time_stored,
260
datetime.fromtimestamp(
261
wakeup_time_stored).strftime('%c')))
262
except ValueError as e:
266
if ((abs(wakeup_time_utc - wakeup_time_stored) > 1) and
267
(abs(wakeup_time_local - wakeup_time_stored) > 1)):
268
logging.error('Wakeup time not stored correctly')
271
with open(cls.RTC_FILENAME) as rtc_file:
272
separator_regex = re.compile('\s+:\s+')
273
rtc_data = dict([separator_regex.split(line.rstrip())
274
for line in rtc_file])
275
logging.debug('RTC data:\n{0}'
276
.format('\n'.join(['- {0}: {1}'.format(*pair)
277
for pair in rtc_data.items()])))
279
# Verify wakeup time has been set properly
280
# by looking into the alarm_IRQ and alrm_date fields
281
if rtc_data['alarm_IRQ'] != 'yes':
282
logging.error('alarm_IRQ not set properly: {0}'
283
.format(rtc_data['alarm_IRQ']))
286
if '*' in rtc_data['alrm_date']:
287
logging.error('alrm_date not set properly: {0}'
288
.format(rtc_data['alrm_date']))
292
class Command(object):
294
Simple subprocess.Popen wrapper to run shell commands
297
def __init__(self, command_str, verbose=True):
298
self.command_str = command_str
299
self.verbose = verbose
308
Execute shell command and return output and status
310
logging.debug('Executing: {0!r}...'.format(self.command_str))
312
self.process = subprocess.Popen(self.command_str,
314
stdout=subprocess.PIPE,
315
stderr=subprocess.PIPE)
316
start = datetime.now()
317
result = self.process.communicate()
319
self.time = end - start
322
stdout, stderr = result
323
message = ['Output:\n'
324
'- returncode:\n{0}'.format(self.process.returncode)]
326
if type(stdout) is bytes:
327
stdout = stdout.decode('utf-8')
328
message.append('- stdout:\n{0}'.format(stdout))
330
if type(stderr) is bytes:
331
stderr = stderr.decode('utf-8')
332
message.append('- stderr:\n{0}'.format(stderr))
333
logging.debug('\n'.join(message))
341
class CountdownDialog(Gtk.Dialog):
343
Dialog that shows the amount of progress in the reboot test
344
and lets the user cancel it if needed
346
def __init__(self, pm_operation, pm_delay, hardware_delay,
347
iterations, iterations_count):
348
self.pm_operation = pm_operation
349
title = '{0} test'.format(pm_operation.capitalize())
351
buttons = (Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,)
352
super(CountdownDialog, self).__init__(title=title,
354
self.set_default_response(Gtk.ResponseType.CANCEL)
355
self.set_resizable(False)
356
self.set_position(Gtk.WindowPosition.CENTER)
358
progress_bar = Gtk.ProgressBar()
359
progress_bar.set_fraction(iterations / float(iterations_count))
360
progress_bar.set_text('{0}/{1}'
361
.format(iterations, iterations_count))
362
progress_bar.set_show_text(True)
363
self.vbox.pack_start(progress_bar, True, True, 0)
365
operation_event = {'template': ('Next {0} in {{time}} seconds...'
366
.format(self.pm_operation)),
368
hardware_info_event = \
369
{'template': 'Gathering hardware information in {time} seconds...',
370
'timeout': hardware_delay,
371
'callback': self.on_hardware_info_timeout_cb}
374
# In first iteration, gather hardware information directly
375
# and perform pm-operation
376
self.on_hardware_info_timeout_cb()
377
self.events = [operation_event]
378
elif iterations < iterations_count:
379
# In intermediate iterations, wait before gathering hardware information
380
# and perform pm-operation
381
self.events = [operation_event,
384
# In last iteration, wait before gathering hardware information
385
# and finish the test
386
self.events = [hardware_info_event]
388
self.label = Gtk.Label()
389
self.vbox.pack_start(self.label, True, True, 0)
394
Set label text and run dialog
396
self.schedule_next_event()
397
response = super(CountdownDialog, self).run()
400
if response != Gtk.ResponseType.ACCEPT:
401
raise TestCancelled()
403
def schedule_next_event(self):
405
Schedule next timed event
408
self.event = self.events.pop()
409
self.timeout_counter = self.event.get('timeout', 0)
410
self.label.set_text(self.event['template']
411
.format(time=self.timeout_counter))
412
GObject.timeout_add_seconds(1, self.on_timeout_cb)
414
# Return Accept response
415
# if there are no other events scheduled
416
self.response(Gtk.ResponseType.ACCEPT)
418
def on_timeout_cb(self):
420
Set label properly and use callback method if needed
422
if self.timeout_counter > 0:
423
self.label.set_text(self.event['template']
424
.format(time=self.timeout_counter))
425
self.timeout_counter -= 1
428
# Call callback if defined
429
callback = self.event.get('callback')
433
# Schedule next event if needed
434
self.schedule_next_event()
438
def on_hardware_info_timeout_cb(self):
440
Gather hardware information and print it to logs
442
logging.info('Gathering hardware information...')
443
logging.debug('Networking:\n'
448
.format(network=(Command('lspci | grep Network')
450
ethernet=(Command('lspci | grep Ethernet')
452
ifconfig=(Command("ifconfig -a | grep -A1 '^\w'")
454
iwconfig=(Command("iwconfig | grep -A1 '^\w'")
456
logging.debug('Bluetooth Device:\n'
458
.format(hciconfig=(Command("hciconfig -a "
461
logging.debug('Video Card:\n'
463
.format(lspci=Command('lspci | grep VGA').run().stdout))
464
logging.debug('Touchpad and Keyboard:\n'
466
.format(xinput=Command(
467
'xinput list --name-only | sort').run().stdout))
468
logging.debug('Pulse Audio Sink:\n'
470
.format(pactl_sink=(Command('pactl list | grep Sink')
472
logging.debug('Pulse Audio Source:\n'
474
.format(pactl_source=(Command('pactl list | grep Source')
477
# Check kernel logs using firmware test suite
478
command = Command('fwts -r stdout klog oops').run()
479
if command.process.returncode != 0:
480
# Don't abort the test loop,
481
# errors can be retrieved by pm_log_check
482
logging.error('Problem found in logs by fwts')
485
class MessageDialog(object):
487
Simple wrapper around Gtk.MessageDialog
489
def __init__(self, title, message, type=Gtk.MessageType.INFO):
491
self.message = message
495
dialog = Gtk.MessageDialog(buttons=Gtk.ButtonsType.OK,
496
message_format=self.message,
498
logging.info(self.message)
499
dialog.set_title(self.title)
504
class AutoLoginConfigurator(object):
506
Enable/disable autologin configuration
507
to make sure that reboot test will work properly
509
CONFIG_FILENAME = '/etc/lightdm/lightdm.conf'
512
greeter-session=unity-greeter
514
autologin-user={username}
515
autologin-user-timeout=0
518
def __init__(self, user=None):
523
Make sure user will autologin in next reboot
525
logging.debug('Enabling autologin for this user...')
526
if os.path.exists(self.CONFIG_FILENAME):
527
for backup_filename in self.generate_backup_filename():
528
if not os.path.exists(backup_filename):
529
shutil.copyfile(self.CONFIG_FILENAME, backup_filename)
530
shutil.copystat(self.CONFIG_FILENAME, backup_filename)
533
with open(self.CONFIG_FILENAME, 'w') as f:
534
f.write(self.TEMPLATE.format(username=self.user))
538
Remove latest configuration file
539
and use the same configuration that was in place
540
before running the test
542
logging.debug('Restoring autologin configuration...')
543
backup_filename = None
544
for filename in self.generate_backup_filename():
545
if not os.path.exists(filename):
547
backup_filename = filename
550
shutil.copy(backup_filename, self.CONFIG_FILENAME)
551
os.remove(backup_filename)
553
os.remove(self.CONFIG_FILENAME)
555
def generate_backup_filename(self):
556
backup_filename = self.CONFIG_FILENAME + '.bak'
557
yield backup_filename
562
backup_filename = (self.CONFIG_FILENAME
563
+ '.bak.{0}'.format(index))
564
yield backup_filename
567
class SudoersConfigurator(object):
569
Enable/disable reboot test to be executed as root
570
to make sure that reboot test works properly
572
MARK = '# Automatically added by pm.py'
573
SUDOERS = '/etc/sudoers'
575
def __init__(self, user=None):
580
Make sure that user will be allowed to execute reboot test as root
582
logging.debug('Enabling user to execute test as root...')
583
command = ("sed -i -e '$a{mark}\\n"
584
"{user} ALL=NOPASSWD: /usr/bin/python3' "
585
"{filename}".format(mark=self.MARK,
587
script=os.path.realpath(__file__),
588
filename=self.SUDOERS))
590
Command(command, verbose=False).run()
594
Revert sudoers configuration changes
596
logging.debug('Restoring sudoers configuration...')
597
command = (("sed -i -e '/{mark}/,+1d' "
599
.format(mark=self.MARK,
600
filename=self.SUDOERS))
601
Command(command, verbose=False).run()
604
class AutoStartFile(object):
606
Generate autostart file contents and write it to proper location
610
Name={pm_operation} test
611
Comment=Verify {pm_operation} works properly
612
Exec=sudo /usr/bin/python3 {script} -r {repetitions} -w {wakeup} --hardware-delay {hardware_delay} --pm-delay {pm_delay} --min-pm-time {min_pm_time} --max-pm-time {max_pm_time} --append --total {total} --start {start} --pm-timestamp {pm_timestamp} {silent} --log-level={log_level} --log-dir={log_dir} --suspends-before-reboot={suspend_cycles} {fwts} {pm_operation}
614
X-GNOME-Autostart-enabled=true
618
def __init__(self, args, user=None):
622
# Generate desktop filename
623
# based on environment variables
625
default_config_directory = os.path.expanduser('~{0}/.config'
627
config_directory = os.getenv('XDG_CONFIG_HOME',
628
default_config_directory)
629
autostart_directory = os.path.join(config_directory, 'autostart')
630
if not os.path.exists(autostart_directory):
631
os.makedirs(autostart_directory)
632
user_id = os.getenv('PKEXEC_UID') or os.getenv('SUDO_UID')
633
group_id = os.getenv('PKEXEC_UID') or os.getenv('SUDO_GID')
635
os.chown(config_directory, int(user_id), int(group_id))
636
os.chown(autostart_directory, int(user_id), int(group_id))
638
basename = '{0}.desktop'.format(os.path.basename(__file__))
639
self.desktop_filename = os.path.join(autostart_directory,
644
Write autostart file to execute the script on startup
646
logging.debug('Writing desktop file ({0!r})...'
647
.format(self.desktop_filename))
649
contents = (self.TEMPLATE
650
.format(script=os.path.realpath(__file__),
651
repetitions=self.args.repetitions - 1,
652
wakeup=self.args.wakeup,
653
hardware_delay=self.args.hardware_delay,
654
pm_delay=self.args.pm_delay,
655
min_pm_time=self.args.min_pm_time,
656
max_pm_time=self.args.max_pm_time,
657
total=self.args.total,
658
start=self.args.start,
659
pm_timestamp=int(time()),
660
silent='--silent' if self.args.silent else '',
661
log_level=self.args.log_level_str,
662
log_dir=self.args.log_dir,
663
fwts='--fwts' if self.args.fwts else '',
664
suspend_cycles=self.args.suspends_before_reboot,
665
pm_operation=self.args.pm_operation))
666
logging.debug(contents)
668
with open(self.desktop_filename, 'w') as f:
673
Remove autostart file to avoid executing the script on startup
675
if os.path.exists(self.desktop_filename):
676
logging.debug('Removing desktop file ({0!r})...'
677
.format(self.desktop_filename))
678
os.remove(self.desktop_filename)
681
class LoggingConfiguration(object):
683
def set(cls, log_level, log_filename, append):
685
Configure a rotating file logger
687
logger = logging.getLogger()
688
logger.setLevel(logging.DEBUG)
690
# Log to sys.stderr using log level passed through command line
691
if log_level != logging.NOTSET:
692
log_handler = logging.StreamHandler()
693
formatter = logging.Formatter('%(levelname)-8s %(message)s')
694
log_handler.setFormatter(formatter)
695
log_handler.setLevel(log_level)
696
logger.addHandler(log_handler)
698
# Log to rotating file using DEBUG log level
699
log_handler = logging.handlers.RotatingFileHandler(log_filename,
702
formatter = logging.Formatter('%(asctime)s %(levelname)-8s '
704
log_handler.setFormatter(formatter)
705
log_handler.setLevel(logging.DEBUG)
706
logger.addHandler(log_handler)
709
# Create a new log file on every new
710
# (i.e. not scheduled) invocation
711
log_handler.doRollover()
714
class MyArgumentParser(object):
716
Command-line argument parser
722
pm_operations = ('poweroff', 'reboot')
723
description = 'Run power management operation as many times as needed'
724
epilog = ('Unknown arguments will be passed '
725
'to the underlying command: poweroff or reboot.')
726
parser = ArgumentParser(description=description, epilog=epilog)
727
parser.add_argument('-r', '--repetitions', type=int, default=1,
728
help=('Number of times that the power management '
729
'operation has to be repeated '
730
'(%(default)s by default)'))
731
parser.add_argument('-w', '--wakeup', type=int, default=60,
732
help=('Timeout in seconds for the wakeup alarm '
733
'(%(default)s by default). '
734
"Note: wakeup alarm won't be scheduled "
736
parser.add_argument('--min-pm-time', dest='min_pm_time',
738
help=('Minimum time in seconds that '
739
'it should take the power management '
740
'operation each cycle (0 for reboot and '
741
'wakeup time minus two seconds '
742
'for the other power management operations '
744
parser.add_argument('--max-pm-time', dest='max_pm_time',
745
type=int, default=300,
746
help=('Maximum time in seconds '
747
'that it should take '
748
'the power management operation each cycle '
749
'(%(default)s by default)'))
750
parser.add_argument('--pm-delay', dest='pm_delay',
752
help=('Delay in seconds '
753
'after hardware information '
754
'has been gathered and before executing '
755
'the power management operation '
756
'(%(default)s by default)'))
757
parser.add_argument('--hardware-delay', dest='hardware_delay',
758
type=int, default=30,
759
help=('Delay in seconds before gathering hardware '
760
'information (%(default)s by default)'))
761
parser.add_argument('--silent', action='store_true',
762
help=("Don't display any dialog "
763
'when test is complete '
764
'to let the script be used '
765
'in automated tests'))
766
log_levels = ['notset', 'debug', 'info',
767
'warning', 'error', 'critical']
768
parser.add_argument('--log-level', dest='log_level_str',
769
default='info', choices=log_levels,
771
'One of {0} or {1} (%(default)s by default)'
772
.format(', '.join(log_levels[:-1]),
774
parser.add_argument('--log-dir', dest='log_dir', default='/var/log',
775
help=('Path to the directory to store log files'))
776
parser.add_argument('pm_operation', choices=pm_operations,
777
help=('Power management operation to be performed '
778
'(one of {0} or {1!r})'
779
.format(', '.join(map(repr,
780
pm_operations[:-1])),
784
parser.add_argument('--start', type=int, default=0, help=SUPPRESS)
785
parser.add_argument('--pm-timestamp', dest='pm_timestamp',
786
type=int, default=0, help=SUPPRESS)
788
# Append to log on subsequent startups
789
parser.add_argument('--append', action='store_true',
790
default=False, help=SUPPRESS)
792
# Total number of iterations initially passed through the command line
793
parser.add_argument('--total', type=int, default=0, help=SUPPRESS)
795
# suspend cycles before reboot
796
parser.add_argument('--suspends-before-reboot', type=int, default=0,
797
help=('How many cycles of suspend/resume to run'
798
'before each reboot or poweroff'))
800
# use fwts for suspend tests
801
parser.add_argument('--fwts', action='store_true', help=('Use fwts '
802
'when doing the suspend tests'))
807
Parse command-line arguments
809
args, extra_args = self.parser.parse_known_args()
810
args.log_level = getattr(logging, args.log_level_str.upper())
812
# Total number of repetitions
813
# is the number of repetitions passed through the command line
814
# the first time the script is executed
816
args.total = args.repetitions
818
# Test start time automatically set on first iteration
820
args.start = int(time())
822
# Wakeup time set to 0 for 'reboot'
823
# since wakeup alarm won't be scheduled
824
if args.pm_operation == 'reboot':
828
# Minimum time for each power management operation
829
# defaults to the wakeup time minus two seconds
830
if not args.min_pm_time:
831
min_pm_time = args.wakeup - 2
834
args.min_pm_time = min_pm_time
836
# Log filename shows clearly the type of test (pm_operation)
837
# and the times it was repeated (repetitions)
838
args.log_filename = os.path.join(args.log_dir,
840
.format(os.path.basename(__file__),
843
return args, extra_args
846
if __name__ == '__main__':