8
8
# the full text of the license.
10
10
import tempfile, shutil, os, subprocess, signal, time, stat, sys
11
import resource, errno, grp, unittest, socket, array
11
import resource, errno, grp, unittest, socket, array, pwd
12
12
import apport.fileutils
14
14
test_executable = '/usr/bin/yes'
90
90
test_proc = self.create_test_process()
92
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
92
app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
93
93
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
95
95
assert app.wait() == 0, app.stderr.read()
156
156
test_proc = self.create_test_process()
157
157
test_proc2 = self.create_test_process(False, '/bin/dd')
159
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
159
app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
160
160
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
162
162
time.sleep(0.5) # give it some time to grab the lock
164
app2 = subprocess.Popen([apport_path, str(test_proc2), '42', '0'],
164
app2 = subprocess.Popen([apport_path, str(test_proc2), '42', '0', '1'],
165
165
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
167
167
# app should wait indefinitely for stdin, while app2 should terminate
205
205
test_proc = self.create_test_process()
208
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
208
app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
209
209
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
210
210
app.stdin.write(b'boo')
211
211
app.stdin.close()
447
447
test_proc = self.create_test_process()
449
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
449
app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
450
450
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
451
451
# pipe an entire total memory size worth of spaces into it, which must be
452
452
# bigger than the 'usable' memory size. apport should digest that and the
529
529
os.utime(myexe, None)
531
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
531
app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
532
532
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
533
533
err = app.communicate(b'foo')[1]
534
534
self.assertEqual(app.returncode, 0, err)
553
553
env = os.environ.copy()
554
554
env['APPORT_LOG_FILE'] = log
555
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
555
app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
556
556
stdin=subprocess.PIPE, env=env,
557
557
stdout=subprocess.PIPE,
558
558
stderr=subprocess.PIPE)
590
590
env = os.environ.copy()
591
591
env['APPORT_LOG_FILE'] = '/not/existing/apport.log'
592
app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
592
app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
593
593
stdin=subprocess.PIPE, env=env,
594
594
stdout=subprocess.PIPE,
595
595
stderr=subprocess.PIPE)
679
679
self.assertEqual(apport.fileutils.get_all_reports(), [])
681
@unittest.skipUnless(os.path.exists('/bin/ping'), 'this test needs /bin/ping')
682
@unittest.skipIf(os.geteuid() != 0, 'this test needs to be run as root')
683
def test_crash_setuid_drop_and_kill(self):
684
'''process started by root as another user, killed by that user no core'''
685
# override expected report file name
686
self.test_report = os.path.join(
687
apport.fileutils.report_dir, '%s.%i.crash' %
688
('_usr_bin_crontab', os.getuid()))
689
# edit crontab as user "mail"
690
resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
693
user = pwd.getpwuid(8)
694
# if a user can crash a suid root binary, it should not create core files
695
orig_editor = os.getenv('EDITOR')
696
os.environ['EDITOR'] = '/usr/bin/yes'
697
self.do_crash(command='/usr/bin/crontab', args=['-e', '-u', user[0]],
698
expect_corefile=False, core_location='/var/spool/cron/',
700
if orig_editor is not None:
701
os.environ['EDITOR'] = orig_editor
704
reports = apport.fileutils.get_all_reports()
705
self.assertEqual(len(reports), 1)
709
self.assertEqual(stat.S_IMODE(st.st_mode), 0o640, 'report has correct permissions')
710
# this must be owned by root as it is a setuid binary
711
self.assertEqual(st.st_uid, 0, 'report has correct owner')
713
# no cores/dump if suid_dumpable == 0
714
self.do_crash(False, command='/bin/ping', args=['127.0.0.1'],
716
self.assertEqual(apport.fileutils.get_all_reports(), [])
681
718
@unittest.skipIf(os.geteuid() != 0, 'this test needs to be run as root')
682
719
def test_crash_setuid_unpackaged(self):
683
720
'''report generation for unpackaged setuid program'''
762
799
fd.write(b'hel\x01lo')
765
args = '%s 11 0' % test_proc
802
args = '%s 11 0 1' % test_proc
766
803
fd_msg = (socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array('i', [fd.fileno()]))
767
804
client.sendmsg([args.encode()], [fd_msg])
839
876
def do_crash(self, expect_coredump=True, expect_corefile=False,
840
877
sig=signal.SIGSEGV, check_running=True, sleep=0,
841
878
command=test_executable, uid=None,
842
expect_corefile_owner=None, args=[]):
879
expect_corefile_owner=None,
881
killer_id=False, args=[]):
843
882
'''Generate a test crash.
845
884
This runs command (by default test_executable) in cwd, lets it crash,
854
893
pid = self.create_test_process(check_running, command, uid=uid, args=args)
856
895
time.sleep(sleep)
897
user = pwd.getpwuid(killer_id)
898
# testing different editors via VISUAL= didn't help
899
kill = subprocess.Popen(['sudo', '-s', '/bin/bash', '-c',
900
"/bin/kill -s %i %s" % (sig, pid),
901
'-u', user[0]]) # 'mail'])
903
# need to clean up system state
904
if command == '/usr/bin/crontab':
905
os.system('stty sane')
906
if kill.returncode != 0:
907
self.fail("Couldn't kill process %s as user %s." %
858
911
# wait max 5 seconds for the process to die
860
913
while timeout >= 0:
867
920
os.kill(pid, signal.SIGKILL)
868
921
os.waitpid(pid, 0)
869
922
self.fail('test process does not die on signal %i' % sig)
923
if command == '/usr/bin/crontab':
924
subprocess.Popen(['sudo', '-s', '/bin/bash', '-c',
925
"/usr/bin/pkill -9 -f crontab",
871
927
self.assertFalse(os.WIFEXITED(result), 'test process did not exit normally')
872
928
self.assertTrue(os.WIFSIGNALED(result), 'test process died due to signal')
873
929
self.assertEqual(os.WCOREDUMP(result), expect_coredump)
888
944
self.assertEqual(subprocess.call(['pidof', command]), 1,
889
945
'no running test executable processes')
947
core_path = '%s/' % os.getcwd()
949
core_path = '%s/' % core_location
891
951
if expect_corefile:
892
self.assertTrue(os.path.exists('core'), 'leaves wanted core file')
952
self.assertTrue(os.path.exists(core_path), 'leaves wanted core file')
894
954
# check core file permissions
955
st = os.stat(core_path)
896
956
self.assertEqual(stat.S_IMODE(st.st_mode), 0o600, 'core file has correct permissions')
897
957
if expect_corefile_owner is not None:
898
958
self.assertEqual(st.st_uid, expect_corefile_owner, 'core file has correct owner')
900
960
# check that core file is valid
901
961
gdb = subprocess.Popen(['gdb', '--batch', '--ex', 'bt',
903
963
stdout=subprocess.PIPE,
904
964
stderr=subprocess.PIPE)
905
965
(out, err) = gdb.communicate()
907
967
out = out.decode()
908
968
err = err.decode().strip()
912
if os.path.exists('core'):
972
if os.path.exists(core_path):
915
975
except OSError as e:
916
976
sys.stderr.write(
917
'WARNING: cannot clean up core file %s/core: %s\n' %
918
(os.getcwd(), str(e)))
977
'WARNING: cannot clean up core file %s: %s\n' %
920
980
self.fail('leaves unexpected core file behind')