16
16
test_source = 'coreutils'
18
18
required_fields = ['ProblemType', 'CoreDump', 'Date', 'ExecutablePath',
19
'ProcCmdline', 'ProcEnviron', 'ProcMaps', 'Signal', 'UserGroups']
19
'ProcCmdline', 'ProcEnviron', 'ProcMaps', 'Signal',
21
22
ifpath = os.path.expanduser(apport.report._ignore_file)
42
43
# expected report name for test executable report
43
self.test_report = os.path.join(apport.fileutils.report_dir,
44
'%s.%i.crash' % (test_executable.replace('/', '_'), os.getuid()))
44
self.test_report = os.path.join(
45
apport.fileutils.report_dir, '%s.%i.crash' %
46
(test_executable.replace('/', '_'), os.getuid()))
46
48
def tearDown(self):
47
49
shutil.rmtree(self.report_dir)
65
67
test_proc = self.create_test_process()
67
69
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
68
close_fds=True, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
70
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
70
72
assert app.wait() == 0, app.stderr.read()
82
84
# check crash report
83
85
self.assertEqual(apport.fileutils.get_all_reports(), [self.test_report])
84
86
st = os.stat(self.test_report)
85
self.assertEqual(stat.S_IMODE(st.st_mode), 0o640,
86
'report has correct permissions')
87
self.assertEqual(stat.S_IMODE(st.st_mode), 0o640, 'report has correct permissions')
88
89
# a subsequent crash does not alter unseen report
100
101
with open(self.test_report, 'rb') as f:
102
103
self.assertTrue(set(required_fields).issubset(set(pr.keys())),
103
'report has required fields')
104
'report has required fields')
104
105
self.assertEqual(pr['ExecutablePath'], test_executable)
105
106
self.assertEqual(pr['ProcCmdline'], test_executable)
106
107
self.assertEqual(pr['Signal'], '%i' % signal.SIGSEGV)
108
109
# check safe environment subset
109
110
allowed_vars = ['SHELL', 'PATH', 'LANGUAGE', 'LANG', 'LC_CTYPE',
110
'LC_COLLATE', 'LC_TIME', 'LC_NUMERIC', 'LC_MONETARY', 'LC_MESSAGES',
111
'LC_PAPER', 'LC_NAME', 'LC_ADDRESS', 'LC_TELEPHONE', 'LC_MEASUREMENT',
112
'LC_IDENTIFICATION', 'LOCPATH', 'TERM']
111
'LC_COLLATE', 'LC_TIME', 'LC_NUMERIC', 'LC_MONETARY',
112
'LC_MESSAGES', 'LC_PAPER', 'LC_NAME', 'LC_ADDRESS',
113
'LC_TELEPHONE', 'LC_MEASUREMENT', 'LC_IDENTIFICATION',
114
116
for l in pr['ProcEnviron'].splitlines():
115
117
(k, v) = l.split('=', 1)
116
118
self.assertTrue(k in allowed_vars,
117
'report contains sensitive environment variable %s' % k)
119
'report contains sensitive environment variable %s' % k)
119
121
# UserGroups only has system groups
120
122
for g in pr['UserGroups'].split():
121
123
self.assertLess(grp.getgrnam(g).gr_gid, 500)
123
125
self.assertFalse('root' in pr['UserGroups'],
124
'collected system groups are not those from root')
126
'collected system groups are not those from root')
126
128
def test_parallel_crash(self):
127
129
'''only one apport instance is ran at a time'''
130
132
test_proc2 = self.create_test_process(False, '/bin/dd')
132
134
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
133
close_fds=True, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
135
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
135
137
time.sleep(0.5) # give it some time to grab the lock
137
139
app2 = subprocess.Popen([apport_path, str(test_proc2), '42', '0'],
138
close_fds=True, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
140
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
140
142
# app should wait indefinitely for stdin, while app2 should terminate
141
143
# immediately (give it 5 seconds)
179
181
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
180
close_fds=True, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
182
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
181
183
app.stdin.write(b'boo')
182
184
app.stdin.close()
363
365
test_proc = self.create_test_process()
365
367
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
366
close_fds=True, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
368
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
367
369
# pipe an entire total memory size worth of spaces into it, which must be
368
370
# bigger than the 'usable' memory size. apport should digest that and the
369
371
# report should not have a core dump; NB that this should error out
441
443
os.utime(myexe, None)
443
445
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
444
close_fds=True, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
446
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
445
447
app.stdin.write(b'boo')
446
448
app.stdin.close()
447
449
err = app.stderr.read().decode()
463
465
env = os.environ.copy()
464
466
env['APPORT_LOG_FILE'] = log
465
467
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
466
close_fds=True, stdin=subprocess.PIPE, env=env,
468
stdin=subprocess.PIPE, env=env,
467
469
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
468
470
universal_newlines=True)
469
471
(out, err) = app.communicate(b'hel\x01lo')
500
502
env = os.environ.copy()
501
503
env['APPORT_LOG_FILE'] = '/not/existing/apport.log'
502
504
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
503
close_fds=True, stdin=subprocess.PIPE, env=env,
505
stdin=subprocess.PIPE, env=env,
504
506
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
505
507
universal_newlines=True)
506
508
(out, err) = app.communicate(b'hel\x01lo')
594
596
self.assertGreater(timeout, 0)
595
597
if check_running:
596
598
self.assertEqual(subprocess.call(['pidof', command]), 1,
597
'no running test executable processes')
599
'no running test executable processes')
599
601
if expect_corefile:
600
602
self.assertTrue(os.path.exists('/tmp/core'), 'leaves wanted core file')
602
604
# check that core file is valid
603
605
gdb = subprocess.Popen(['gdb', '--batch', '--ex', 'bt',
604
command, '/tmp/core'], stdout=subprocess.PIPE,
605
stderr=subprocess.PIPE)
606
command, '/tmp/core'],
607
stdout=subprocess.PIPE,
608
stderr=subprocess.PIPE)
606
609
(out, err) = gdb.communicate()
607
610
self.assertEqual(gdb.returncode, 0)
608
611
out = out.decode()
634
637
self.assertGreater(len(r['CoreDump']), 5000)
636
639
self.assertTrue('\n#2' in r.get('Stacktrace', ''),
637
r.get('Stacktrace', 'no Stacktrace field'))
640
r.get('Stacktrace', 'no Stacktrace field'))