~apport-hackers/apport/trunk

« back to all changes in this revision

Viewing changes to test/test_signal_crashes.py

  • Committer: Brian Murray
  • Date: 2017-11-15 20:07:23 UTC
  • Revision ID: brian@canonical.com-20171115200723-wi8z83t8ewf44xzg
SECURITY_UPDATEs for CVE-2017-14177 and CVE-2017-14180

Show diffs side-by-side

added added

removed removed

Lines of Context:
8
8
# the full text of the license.
9
9
 
10
10
import tempfile, shutil, os, subprocess, signal, time, stat, sys
11
 
import resource, errno, grp, unittest, socket, array
 
11
import resource, errno, grp, unittest, socket, array, pwd
12
12
import apport.fileutils
13
13
 
14
14
test_executable = '/usr/bin/yes'
89
89
 
90
90
        test_proc = self.create_test_process()
91
91
        try:
92
 
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
 
92
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
93
93
                                   stdin=subprocess.PIPE, stderr=subprocess.PIPE)
94
94
            app.stdin.close()
95
95
            assert app.wait() == 0, app.stderr.read()
156
156
        test_proc = self.create_test_process()
157
157
        test_proc2 = self.create_test_process(False, '/bin/dd')
158
158
        try:
159
 
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
 
159
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
160
160
                                   stdin=subprocess.PIPE, stderr=subprocess.PIPE)
161
161
 
162
162
            time.sleep(0.5)  # give it some time to grab the lock
163
163
 
164
 
            app2 = subprocess.Popen([apport_path, str(test_proc2), '42', '0'],
 
164
            app2 = subprocess.Popen([apport_path, str(test_proc2), '42', '0', '1'],
165
165
                                    stdin=subprocess.PIPE, stderr=subprocess.PIPE)
166
166
 
167
167
            # app should wait indefinitely for stdin, while app2 should terminate
205
205
        test_proc = self.create_test_process()
206
206
 
207
207
        try:
208
 
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
 
208
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
209
209
                                   stdin=subprocess.PIPE, stderr=subprocess.PIPE)
210
210
            app.stdin.write(b'boo')
211
211
            app.stdin.close()
446
446
 
447
447
        test_proc = self.create_test_process()
448
448
        try:
449
 
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
 
449
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
450
450
                                   stdin=subprocess.PIPE, stderr=subprocess.PIPE)
451
451
            # pipe an entire total memory size worth of spaces into it, which must be
452
452
            # bigger than the 'usable' memory size. apport should digest that and the
528
528
            time.sleep(1.1)
529
529
            os.utime(myexe, None)
530
530
 
531
 
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
 
531
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
532
532
                                   stdin=subprocess.PIPE, stderr=subprocess.PIPE)
533
533
            err = app.communicate(b'foo')[1]
534
534
            self.assertEqual(app.returncode, 0, err)
552
552
        try:
553
553
            env = os.environ.copy()
554
554
            env['APPORT_LOG_FILE'] = log
555
 
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
 
555
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
556
556
                                   stdin=subprocess.PIPE, env=env,
557
557
                                   stdout=subprocess.PIPE,
558
558
                                   stderr=subprocess.PIPE)
589
589
        try:
590
590
            env = os.environ.copy()
591
591
            env['APPORT_LOG_FILE'] = '/not/existing/apport.log'
592
 
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0'],
 
592
            app = subprocess.Popen([apport_path, str(test_proc), '42', '0', '1'],
593
593
                                   stdin=subprocess.PIPE, env=env,
594
594
                                   stdout=subprocess.PIPE,
595
595
                                   stderr=subprocess.PIPE)
678
678
                          uid=8)
679
679
            self.assertEqual(apport.fileutils.get_all_reports(), [])
680
680
 
 
681
    @unittest.skipUnless(os.path.exists('/bin/ping'), 'this test needs /bin/ping')
 
682
    @unittest.skipIf(os.geteuid() != 0, 'this test needs to be run as root')
 
683
    def test_crash_setuid_drop_and_kill(self):
 
684
        '''process started by root as another user, killed by that user no core'''
 
685
        # override expected report file name
 
686
        self.test_report = os.path.join(
 
687
            apport.fileutils.report_dir, '%s.%i.crash' %
 
688
            ('_usr_bin_crontab', os.getuid()))
 
689
        # edit crontab as user "mail"
 
690
        resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
 
691
 
 
692
        if suid_dumpable:
 
693
            user = pwd.getpwuid(8)
 
694
            # if a user can crash a suid root binary, it should not create core files
 
695
            orig_editor = os.getenv('EDITOR')
 
696
            os.environ['EDITOR'] = '/usr/bin/yes'
 
697
            self.do_crash(command='/usr/bin/crontab', args=['-e', '-u', user[0]],
 
698
                          expect_corefile=False, core_location='/var/spool/cron/',
 
699
                          killer_id=8)
 
700
            if orig_editor is not None:
 
701
                os.environ['EDITOR'] = orig_editor
 
702
 
 
703
            # check crash report
 
704
            reports = apport.fileutils.get_all_reports()
 
705
            self.assertEqual(len(reports), 1)
 
706
            report = reports[0]
 
707
            st = os.stat(report)
 
708
            os.unlink(report)
 
709
            self.assertEqual(stat.S_IMODE(st.st_mode), 0o640, 'report has correct permissions')
 
710
            # this must be owned by root as it is a setuid binary
 
711
            self.assertEqual(st.st_uid, 0, 'report has correct owner')
 
712
        else:
 
713
            # no cores/dump if suid_dumpable == 0
 
714
            self.do_crash(False, command='/bin/ping', args=['127.0.0.1'],
 
715
                          uid=8)
 
716
            self.assertEqual(apport.fileutils.get_all_reports(), [])
 
717
 
681
718
    @unittest.skipIf(os.geteuid() != 0, 'this test needs to be run as root')
682
719
    def test_crash_setuid_unpackaged(self):
683
720
        '''report generation for unpackaged setuid program'''
762
799
                    fd.write(b'hel\x01lo')
763
800
                    fd.flush()
764
801
                    fd.seek(0)
765
 
                    args = '%s 11 0' % test_proc
 
802
                    args = '%s 11 0 1' % test_proc
766
803
                    fd_msg = (socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array('i', [fd.fileno()]))
767
804
                    client.sendmsg([args.encode()], [fd_msg])
768
805
                os._exit(0)
839
876
    def do_crash(self, expect_coredump=True, expect_corefile=False,
840
877
                 sig=signal.SIGSEGV, check_running=True, sleep=0,
841
878
                 command=test_executable, uid=None,
842
 
                 expect_corefile_owner=None, args=[]):
 
879
                 expect_corefile_owner=None,
 
880
                 core_location=None,
 
881
                 killer_id=False, args=[]):
843
882
        '''Generate a test crash.
844
883
 
845
884
        This runs command (by default test_executable) in cwd, lets it crash,
854
893
        pid = self.create_test_process(check_running, command, uid=uid, args=args)
855
894
        if sleep > 0:
856
895
            time.sleep(sleep)
857
 
        os.kill(pid, sig)
 
896
        if killer_id:
 
897
            user = pwd.getpwuid(killer_id)
 
898
            # testing different editors via VISUAL= didn't help
 
899
            kill = subprocess.Popen(['sudo', '-s', '/bin/bash', '-c',
 
900
                                     "/bin/kill -s %i %s" % (sig, pid),
 
901
                                     '-u', user[0]])  # 'mail'])
 
902
            kill.communicate()
 
903
            # need to clean up system state
 
904
            if command == '/usr/bin/crontab':
 
905
                os.system('stty sane')
 
906
            if kill.returncode != 0:
 
907
                self.fail("Couldn't kill process %s as user %s." %
 
908
                          (pid, user[0]))
 
909
        else:
 
910
            os.kill(pid, sig)
858
911
        # wait max 5 seconds for the process to die
859
912
        timeout = 50
860
913
        while timeout >= 0:
867
920
            os.kill(pid, signal.SIGKILL)
868
921
            os.waitpid(pid, 0)
869
922
            self.fail('test process does not die on signal %i' % sig)
870
 
 
 
923
        if command == '/usr/bin/crontab':
 
924
            subprocess.Popen(['sudo', '-s', '/bin/bash', '-c',
 
925
                              "/usr/bin/pkill -9 -f crontab",
 
926
                              '-u', 'mail'])
871
927
        self.assertFalse(os.WIFEXITED(result), 'test process did not exit normally')
872
928
        self.assertTrue(os.WIFSIGNALED(result), 'test process died due to signal')
873
929
        self.assertEqual(os.WCOREDUMP(result), expect_coredump)
888
944
            self.assertEqual(subprocess.call(['pidof', command]), 1,
889
945
                             'no running test executable processes')
890
946
 
 
947
        core_path = '%s/' % os.getcwd()
 
948
        if core_location:
 
949
            core_path = '%s/' % core_location
 
950
        core_path += 'core'
891
951
        if expect_corefile:
892
 
            self.assertTrue(os.path.exists('core'), 'leaves wanted core file')
 
952
            self.assertTrue(os.path.exists(core_path), 'leaves wanted core file')
893
953
            try:
894
954
                # check core file permissions
895
 
                st = os.stat('core')
 
955
                st = os.stat(core_path)
896
956
                self.assertEqual(stat.S_IMODE(st.st_mode), 0o600, 'core file has correct permissions')
897
957
                if expect_corefile_owner is not None:
898
958
                    self.assertEqual(st.st_uid, expect_corefile_owner, 'core file has correct owner')
899
959
 
900
960
                # check that core file is valid
901
961
                gdb = subprocess.Popen(['gdb', '--batch', '--ex', 'bt',
902
 
                                        command, 'core'],
 
962
                                        command, core_path],
903
963
                                       stdout=subprocess.PIPE,
904
964
                                       stderr=subprocess.PIPE)
905
965
                (out, err) = gdb.communicate()
907
967
                out = out.decode()
908
968
                err = err.decode().strip()
909
969
            finally:
910
 
                os.unlink('core')
 
970
                os.unlink(core_path)
911
971
        else:
912
 
            if os.path.exists('core'):
 
972
            if os.path.exists(core_path):
913
973
                try:
914
 
                    os.unlink('core')
 
974
                    os.unlink(core_path)
915
975
                except OSError as e:
916
976
                    sys.stderr.write(
917
 
                        'WARNING: cannot clean up core file %s/core: %s\n' %
918
 
                        (os.getcwd(), str(e)))
 
977
                        'WARNING: cannot clean up core file %s: %s\n' %
 
978
                        (core_path, str(e)))
919
979
 
920
980
                self.fail('leaves unexpected core file behind')
921
981