~apport-hackers/apport/trunk

« back to all changes in this revision

Viewing changes to problem_report.py

  • Committer: Martin Pitt
  • Date: 2012-02-23 10:31:55 UTC
  • Revision ID: martin.pitt@canonical.com-20120223103155-m7p4is95x27vzm8n
Move all test suites out of the code modules into test/test_<module>.py. This avoids having to load it every time the program runs, and also allows running the tests against the installed version of Apport.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
# option) any later version.  See http://www.gnu.org/copyleft/gpl.html for
12
12
# the full text of the license.
13
13
 
14
 
import zlib, base64, time, sys, gzip, struct
 
14
import zlib, base64, time, sys, gzip, struct, os
15
15
from email.encoders import encode_base64
16
16
from email.mime.multipart import MIMEMultipart
17
17
from email.mime.base import MIMEBase
542
542
            offset += 2
543
543
 
544
544
        return line[offset:]
545
 
 
546
 
#
547
 
# Unit test
548
 
#
549
 
 
550
 
import unittest, tempfile, os, email
551
 
 
552
 
class _T(unittest.TestCase):
553
 
    def test_basic_operations(self):
554
 
        '''basic creation and operation.'''
555
 
 
556
 
        pr = ProblemReport()
557
 
        pr['foo'] = 'bar'
558
 
        pr['bar'] = ' foo   bar\nbaz\n   blip  '
559
 
        pr['dash-key'] = '1'
560
 
        pr['dot.key'] = '1'
561
 
        pr['underscore_key'] = '1'
562
 
        self.assertEqual(pr['foo'], 'bar')
563
 
        self.assertEqual(pr['bar'], ' foo   bar\nbaz\n   blip  ')
564
 
        self.assertEqual(pr['ProblemType'], 'Crash')
565
 
        self.assertTrue(time.strptime(pr['Date']))
566
 
        self.assertEqual(pr['dash-key'], '1')
567
 
        self.assertEqual(pr['dot.key'], '1')
568
 
        self.assertEqual(pr['underscore_key'], '1')
569
 
 
570
 
    def test_ctor_arguments(self):
571
 
        '''non-default constructor arguments.'''
572
 
 
573
 
        pr = ProblemReport('KernelCrash')
574
 
        self.assertEqual(pr['ProblemType'], 'KernelCrash')
575
 
        pr = ProblemReport(date = '19801224 12:34')
576
 
        self.assertEqual(pr['Date'], '19801224 12:34')
577
 
 
578
 
    def test_sanity_checks(self):
579
 
        '''various error conditions.'''
580
 
 
581
 
        pr = ProblemReport()
582
 
        self.assertRaises(AssertionError, pr.__setitem__, 'a b', '1')
583
 
        self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
584
 
        self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
585
 
        self.assertRaises(AssertionError, pr.__setitem__, 'a', (1,))
586
 
        self.assertRaises(AssertionError, pr.__setitem__, 'a', ('/tmp/nonexistant', ''))
587
 
        self.assertRaises(KeyError, pr.__getitem__, 'Nonexistant')
588
 
 
589
 
    def test_compressed_values(self):
590
 
        '''handling of CompressedValue values.'''
591
 
 
592
 
        large_val = 'A' * 5000000
593
 
 
594
 
        pr = ProblemReport()
595
 
        pr['Foo'] = CompressedValue('FooFoo!')
596
 
        pr['Bin'] = CompressedValue()
597
 
        pr['Bin'].set_value('AB' * 10 + '\0' * 10 + 'Z')
598
 
        pr['Large'] = CompressedValue(large_val)
599
 
 
600
 
        self.assertTrue(isinstance(pr['Foo'], CompressedValue))
601
 
        self.assertTrue(isinstance(pr['Bin'], CompressedValue))
602
 
        self.assertEqual(pr['Foo'].get_value(), 'FooFoo!')
603
 
        self.assertEqual(pr['Bin'].get_value(), 'AB' * 10 + '\0' * 10 + 'Z')
604
 
        self.assertEqual(pr['Large'].get_value(), large_val)
605
 
        self.assertEqual(len(pr['Foo']), 7)
606
 
        self.assertEqual(len(pr['Bin']), 31)
607
 
        self.assertEqual(len(pr['Large']), len(large_val))
608
 
 
609
 
        io = StringIO()
610
 
        pr['Bin'].write(io)
611
 
        self.assertEqual(io.getvalue(), 'AB' * 10 + '\0' * 10 + 'Z')
612
 
        io = StringIO()
613
 
        pr['Large'].write(io)
614
 
        self.assertEqual(io.getvalue(), large_val)
615
 
 
616
 
        pr['Multiline'] = CompressedValue('\1\1\1\n\2\2\n\3\3\3')
617
 
        self.assertEqual(pr['Multiline'].splitlines(), 
618
 
            ['\1\1\1', '\2\2', '\3\3\3'])
619
 
 
620
 
        # test writing of reports with CompressedValues
621
 
        io = StringIO()
622
 
        pr.write(io)
623
 
        io.seek(0)
624
 
        pr = ProblemReport()
625
 
        pr.load(io)
626
 
        self.assertEqual(pr['Foo'], 'FooFoo!')
627
 
        self.assertEqual(pr['Bin'], 'AB' * 10 + '\0' * 10 + 'Z')
628
 
        self.assertEqual(pr['Large'], large_val)
629
 
 
630
 
    def test_write(self):
631
 
        '''write() and proper formatting.'''
632
 
 
633
 
        pr = ProblemReport(date = 'now!')
634
 
        pr['Simple'] = 'bar'
635
 
        if sys.version.startswith('2'):
636
 
            pr['SimpleUTF8'] = '1äö2Φ3'
637
 
            pr['SimpleUnicode'] = '1äö2Φ3'.decode('UTF-8')
638
 
            pr['TwoLineUnicode'] = 'pi-π\nnu-η'.decode('UTF-8')
639
 
            pr['TwoLineUTF8'] = 'pi-π\nnu-η'.decode('UTF-8')
640
 
        else:
641
 
            pr['SimpleUTF8'] = '1äö2Φ3'.encode('UTF-8')
642
 
            pr['SimpleUnicode'] = '1äö2Φ3'
643
 
            pr['TwoLineUnicode'] = 'pi-π\nnu-η'
644
 
            pr['TwoLineUTF8'] = 'pi-π\nnu-η'
645
 
        pr['WhiteSpace'] = ' foo   bar\nbaz\n  blip  \n\nafteremptyline'
646
 
        io = StringIO()
647
 
        pr.write(io)
648
 
        self.assertEqual(io.getvalue(),
649
 
'''ProblemType: Crash
650
 
Date: now!
651
 
Simple: bar
652
 
SimpleUTF8: 1äö2Φ3
653
 
SimpleUnicode: 1äö2Φ3
654
 
TwoLineUTF8:
655
 
 pi-π
656
 
 nu-η
657
 
TwoLineUnicode:
658
 
 pi-π
659
 
 nu-η
660
 
WhiteSpace:
661
 
  foo   bar
662
 
 baz
663
 
   blip  
664
 
 
665
 
 afteremptyline
666
 
''')
667
 
 
668
 
    def test_write_append(self):
669
 
        '''write() with appending to an existing file.'''
670
 
 
671
 
        pr = ProblemReport(date = 'now!')
672
 
        pr['Simple'] = 'bar'
673
 
        pr['WhiteSpace'] = ' foo   bar\nbaz\n  blip  '
674
 
        io = StringIO()
675
 
        pr.write(io)
676
 
 
677
 
        pr.clear()
678
 
        pr['Extra'] = 'appended'
679
 
        pr.write(io)
680
 
 
681
 
        self.assertEqual(io.getvalue(), 
682
 
'''ProblemType: Crash
683
 
Date: now!
684
 
Simple: bar
685
 
WhiteSpace:
686
 
  foo   bar
687
 
 baz
688
 
   blip  
689
 
Extra: appended
690
 
''')
691
 
 
692
 
        temp = tempfile.NamedTemporaryFile()
693
 
        temp.write('AB' * 10 + '\0' * 10 + 'Z')
694
 
        temp.flush()
695
 
 
696
 
        pr = ProblemReport(date = 'now!')
697
 
        pr['File'] = (temp.name,)
698
 
        io = StringIO()
699
 
        pr.write(io)
700
 
        temp.close()
701
 
 
702
 
        pr.clear()
703
 
        pr['Extra'] = 'appended'
704
 
        pr.write(io)
705
 
 
706
 
        io.seek(0)
707
 
        pr = ProblemReport()
708
 
        pr.load(io)
709
 
 
710
 
        self.assertEqual(pr['Date'], 'now!')
711
 
        self.assertEqual(pr['File'], 'AB' * 10 + '\0' * 10 + 'Z')
712
 
        self.assertEqual(pr['Extra'], 'appended')
713
 
 
714
 
    def test_load(self):
715
 
        '''load() with various formatting.'''
716
 
 
717
 
        pr = ProblemReport()
718
 
        pr.load(StringIO(
719
 
'''ProblemType: Crash
720
 
Date: now!
721
 
Simple: bar
722
 
WhiteSpace:
723
 
  foo   bar
724
 
 baz
725
 
   blip  
726
 
'''))
727
 
        self.assertEqual(pr['ProblemType'], 'Crash')
728
 
        self.assertEqual(pr['Date'], 'now!')
729
 
        self.assertEqual(pr['Simple'], 'bar')
730
 
        self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  ')
731
 
 
732
 
        # test last field a bit more
733
 
        pr.load(StringIO(
734
 
'''ProblemType: Crash
735
 
Date: now!
736
 
Simple: bar
737
 
WhiteSpace:
738
 
  foo   bar
739
 
 baz
740
 
   blip  
741
 
 
742
 
'''))
743
 
        self.assertEqual(pr['ProblemType'], 'Crash')
744
 
        self.assertEqual(pr['Date'], 'now!')
745
 
        self.assertEqual(pr['Simple'], 'bar')
746
 
        self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  \n')
747
 
 
748
 
        # last field might not be \n terminated
749
 
        pr.load(StringIO(
750
 
'''ProblemType: Crash
751
 
Date: now!
752
 
Simple: bar
753
 
WhiteSpace:
754
 
 foo
755
 
 bar'''))
756
 
        self.assertEqual(pr['ProblemType'], 'Crash')
757
 
        self.assertEqual(pr['Date'], 'now!')
758
 
        self.assertEqual(pr['Simple'], 'bar')
759
 
        self.assertEqual(pr['WhiteSpace'], 'foo\nbar')
760
 
 
761
 
        pr = ProblemReport()
762
 
        pr.load(StringIO(
763
 
'''ProblemType: Crash
764
 
WhiteSpace:
765
 
  foo   bar
766
 
 baz
767
 
 
768
 
   blip  
769
 
Last: foo
770
 
'''))
771
 
        self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n\n  blip  ')
772
 
        self.assertEqual(pr['Last'], 'foo')
773
 
 
774
 
        pr.load(StringIO(
775
 
'''ProblemType: Crash
776
 
WhiteSpace:
777
 
  foo   bar
778
 
 baz
779
 
   blip  
780
 
Last: foo
781
 
 
782
 
'''))
783
 
        self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  ')
784
 
        self.assertEqual(pr['Last'], 'foo\n')
785
 
 
786
 
        # empty lines in values must have a leading space in coding
787
 
        invalid_spacing = StringIO('''WhiteSpace:
788
 
 first
789
 
 
790
 
 second
791
 
''')
792
 
        pr = ProblemReport()
793
 
        self.assertRaises(ValueError, pr.load, invalid_spacing)
794
 
 
795
 
        # test that load() cleans up properly
796
 
        pr.load(StringIO('ProblemType: Crash'))
797
 
        self.assertEqual(list(pr.keys()), ['ProblemType'])
798
 
 
799
 
    def test_write_file(self):
800
 
        '''writing a report with binary file data.'''
801
 
 
802
 
        temp = tempfile.NamedTemporaryFile()
803
 
        temp.write('AB' * 10 + '\0' * 10 + 'Z')
804
 
        temp.flush()
805
 
 
806
 
        pr = ProblemReport(date = 'now!')
807
 
        pr['File'] = (temp.name,)
808
 
        pr['Afile'] = (temp.name,)
809
 
        io = StringIO()
810
 
        pr.write(io)
811
 
        temp.close()
812
 
 
813
 
        self.assertEqual(io.getvalue(),
814
 
'''ProblemType: Crash
815
 
Date: now!
816
 
Afile: base64
817
 
 H4sICAAAAAAC/0FmaWxlAA==
818
 
 c3RyxIAMcBAFAK/2p9MfAAAA
819
 
File: base64
820
 
 H4sICAAAAAAC/0ZpbGUA
821
 
 c3RyxIAMcBAFAK/2p9MfAAAA
822
 
''')
823
 
 
824
 
        # force compression/encoding bool
825
 
        temp = tempfile.NamedTemporaryFile()
826
 
        temp.write('foo\0bar')
827
 
        temp.flush()
828
 
        pr = ProblemReport(date = 'now!')
829
 
        pr['File'] = (temp.name, False)
830
 
        io = StringIO()
831
 
        pr.write(io)
832
 
 
833
 
        self.assertEqual(io.getvalue(),
834
 
'''ProblemType: Crash
835
 
Date: now!
836
 
File: foo\0bar
837
 
''')
838
 
 
839
 
        pr['File'] = (temp.name, True)
840
 
        io = StringIO()
841
 
        pr.write(io)
842
 
 
843
 
        self.assertEqual(io.getvalue(),
844
 
'''ProblemType: Crash
845
 
Date: now!
846
 
File: base64
847
 
 H4sICAAAAAAC/0ZpbGUA
848
 
 S8vPZ0hKLAIACq50HgcAAAA=
849
 
''')
850
 
        temp.close()
851
 
 
852
 
    def test_write_fileobj(self):
853
 
        '''writing a report with a pointer to a file-like object.'''
854
 
 
855
 
        tempbin = StringIO('AB' * 10 + '\0' * 10 + 'Z')
856
 
        tempasc = StringIO('Hello World')
857
 
 
858
 
        pr = ProblemReport(date = 'now!')
859
 
        pr['BinFile'] = (tempbin,)
860
 
        pr['AscFile'] = (tempasc, False)
861
 
        io = StringIO()
862
 
        pr.write(io)
863
 
        io.seek(0)
864
 
 
865
 
        pr = ProblemReport()
866
 
        pr.load(io)
867
 
        self.assertEqual(pr['BinFile'], tempbin.getvalue())
868
 
        self.assertEqual(pr['AscFile'], tempasc.getvalue())
869
 
 
870
 
    def test_write_empty_fileobj(self):
871
 
        '''writing a report with a pointer to a file-like object with enforcing non-emptyness.'''
872
 
 
873
 
        tempbin = StringIO('')
874
 
        tempasc = StringIO('')
875
 
 
876
 
        pr = ProblemReport(date = 'now!')
877
 
        pr['BinFile'] = (tempbin, True, None, True)
878
 
        io = StringIO()
879
 
        self.assertRaises(IOError, pr.write, io)
880
 
 
881
 
        pr = ProblemReport(date = 'now!')
882
 
        pr['AscFile'] = (tempasc, False, None, True)
883
 
        io = StringIO()
884
 
        self.assertRaises(IOError, pr.write, io)
885
 
 
886
 
    def test_write_delayed_fileobj(self):
887
 
        '''writing a report with file pointers and delayed data.'''
888
 
 
889
 
        (fout, fin) = os.pipe()
890
 
 
891
 
        if os.fork() == 0:
892
 
            os.close(fout)
893
 
            time.sleep(0.3)
894
 
            os.write(fin, 'ab' * 512*1024)
895
 
            time.sleep(0.3)
896
 
            os.write(fin, 'hello')
897
 
            time.sleep(0.3)
898
 
            os.write(fin, ' world')
899
 
            os.close(fin)
900
 
            os._exit(0)
901
 
 
902
 
        os.close(fin)
903
 
 
904
 
        pr = ProblemReport(date = 'now!')
905
 
        pr['BinFile'] = (os.fdopen(fout),)
906
 
        io = StringIO()
907
 
        pr.write(io)
908
 
        assert os.wait()[1] == 0
909
 
 
910
 
        io.seek(0)
911
 
 
912
 
        pr2 = ProblemReport()
913
 
        pr2.load(io)
914
 
        self.assertTrue(pr2['BinFile'].endswith('abhello world'))
915
 
        self.assertEqual(len(pr2['BinFile']), 1048576 + len('hello world'))
916
 
 
917
 
    def test_read_file(self):
918
 
        '''reading a report with binary data.'''
919
 
 
920
 
        bin_report = '''ProblemType: Crash
921
 
Date: now!
922
 
File: base64
923
 
 H4sICAAAAAAC/0ZpbGUA
924
 
 c3RyxIAMcBAFAK/2p9MfAAAA
925
 
Foo: Bar
926
 
'''
927
 
 
928
 
        # test with reading everything
929
 
        pr = ProblemReport()
930
 
        pr.load(StringIO(bin_report))
931
 
        self.assertEqual(pr['File'], 'AB' * 10 + '\0' * 10 + 'Z')
932
 
        self.assertEqual(pr.has_removed_fields(), False)
933
 
 
934
 
        # test with skipping binary data
935
 
        pr.load(StringIO(bin_report), binary=False)
936
 
        self.assertEqual(pr['File'], '')
937
 
        self.assertEqual(pr.has_removed_fields(), True)
938
 
 
939
 
        # test with keeping compressed binary data
940
 
        pr.load(StringIO(bin_report), binary='compressed')
941
 
        self.assertEqual(pr['Foo'], 'Bar')
942
 
        self.assertEqual(pr.has_removed_fields(), False)
943
 
        self.assertTrue(isinstance(pr['File'], CompressedValue))
944
 
 
945
 
        self.assertEqual(pr['File'].get_value(), 'AB' * 10 + '\0' * 10 + 'Z')
946
 
 
947
 
    def test_read_file_legacy(self):
948
 
        '''reading a report with binary data in legacy format without gzip
949
 
        header.'''
950
 
 
951
 
        bin_report = '''ProblemType: Crash
952
 
Date: now!
953
 
File: base64
954
 
 eJw=
955
 
 c3RyxIAMcBAFAG55BXk=
956
 
Foo: Bar
957
 
'''
958
 
 
959
 
        data = 'AB' * 10 + '\0' * 10 + 'Z'
960
 
 
961
 
        # test with reading everything
962
 
        pr = ProblemReport()
963
 
        pr.load(StringIO(bin_report))
964
 
        self.assertEqual(pr['File'], data)
965
 
        self.assertEqual(pr.has_removed_fields(), False)
966
 
 
967
 
        # test with skipping binary data
968
 
        pr.load(StringIO(bin_report), binary=False)
969
 
        self.assertEqual(pr['File'], '')
970
 
        self.assertEqual(pr.has_removed_fields(), True)
971
 
 
972
 
        # test with keeping CompressedValues
973
 
        pr.load(StringIO(bin_report), binary='compressed')
974
 
        self.assertEqual(pr.has_removed_fields(), False)
975
 
        self.assertEqual(len(pr['File']), len(data))
976
 
        self.assertEqual(pr['File'].get_value(), data)
977
 
        io = StringIO()
978
 
        pr['File'].write(io)
979
 
        io.seek(0)
980
 
        self.assertEqual(io.read(), data)
981
 
 
982
 
    def test_big_file(self):
983
 
        '''writing and re-decoding a big random file.'''
984
 
 
985
 
        # create 1 MB random file
986
 
        temp = tempfile.NamedTemporaryFile()
987
 
        data = os.urandom(1048576)
988
 
        temp.write(data)
989
 
        temp.flush()
990
 
 
991
 
        # write it into problem report
992
 
        pr = ProblemReport()
993
 
        pr['File'] = (temp.name,)
994
 
        pr['Before'] = 'xtestx'
995
 
        pr['ZAfter'] = 'ytesty'
996
 
        io = StringIO()
997
 
        pr.write(io)
998
 
        temp.close()
999
 
 
1000
 
        # read it again
1001
 
        io.seek(0)
1002
 
        pr = ProblemReport()
1003
 
        pr.load(io)
1004
 
 
1005
 
        self.assertTrue(pr['File'] == data)
1006
 
        self.assertEqual(pr['Before'], 'xtestx')
1007
 
        self.assertEqual(pr['ZAfter'], 'ytesty')
1008
 
 
1009
 
        # write it again
1010
 
        io2 = StringIO()
1011
 
        pr.write(io2)
1012
 
        self.assertTrue(io.getvalue() == io2.getvalue())
1013
 
 
1014
 
        # check gzip compatibility
1015
 
        io.seek(0)
1016
 
        pr = ProblemReport()
1017
 
        pr.load(io, binary='compressed')
1018
 
        self.assertEqual(pr['File'].get_value(), data)
1019
 
 
1020
 
    def test_size_limit(self):
1021
 
        '''writing and a big random file with a size limit key.'''
1022
 
 
1023
 
        # create 1 MB random file
1024
 
        temp = tempfile.NamedTemporaryFile()
1025
 
        data = os.urandom(1048576)
1026
 
        temp.write(data)
1027
 
        temp.flush()
1028
 
 
1029
 
        # write it into problem report
1030
 
        pr = ProblemReport()
1031
 
        pr['FileSmallLimit'] = (temp.name, True, 100)
1032
 
        pr['FileLimitMinus1'] = (temp.name, True, 1048575)
1033
 
        pr['FileExactLimit'] = (temp.name, True, 1048576)
1034
 
        pr['FileLimitPlus1'] = (temp.name, True, 1048577)
1035
 
        pr['FileLimitNone'] = (temp.name, True, None)
1036
 
        pr['Before'] = 'xtestx'
1037
 
        pr['ZAfter'] = 'ytesty'
1038
 
        io = StringIO()
1039
 
        pr.write(io)
1040
 
        temp.close()
1041
 
 
1042
 
        # read it again
1043
 
        io.seek(0)
1044
 
        pr = ProblemReport()
1045
 
        pr.load(io)
1046
 
 
1047
 
        self.assertFalse('FileSmallLimit' in pr)
1048
 
        self.assertFalse('FileLimitMinus1' in pr)
1049
 
        self.assertTrue(pr['FileExactLimit'] == data)
1050
 
        self.assertTrue(pr['FileLimitPlus1'] == data)
1051
 
        self.assertTrue(pr['FileLimitNone'] == data)
1052
 
        self.assertEqual(pr['Before'], 'xtestx')
1053
 
        self.assertEqual(pr['ZAfter'], 'ytesty')
1054
 
 
1055
 
    def test_iter(self):
1056
 
        '''ProblemReport iteration.'''
1057
 
 
1058
 
        pr = ProblemReport()
1059
 
        pr['foo'] = 'bar'
1060
 
 
1061
 
        keys = []
1062
 
        for k in pr:
1063
 
            keys.append(k)
1064
 
        keys.sort()
1065
 
        self.assertEqual(' '.join(keys), 'Date ProblemType foo')
1066
 
 
1067
 
        self.assertEqual(len([k for k in pr if k != 'foo']), 2)
1068
 
 
1069
 
    def test_modify(self):
1070
 
        '''reading, modifying fields, and writing back.'''
1071
 
 
1072
 
        report = '''ProblemType: Crash
1073
 
Date: now!
1074
 
Long:
1075
 
 xxx
1076
 
 .
1077
 
 yyy
1078
 
Short: Bar
1079
 
File: base64
1080
 
 H4sICAAAAAAC/0ZpbGUA
1081
 
 c3RyxIAMcBAFAK/2p9MfAAAA
1082
 
'''
1083
 
 
1084
 
        pr = ProblemReport()
1085
 
        pr.load(StringIO(report))
1086
 
 
1087
 
        self.assertEqual(pr['Long'], 'xxx\n.\nyyy')
1088
 
 
1089
 
        # write back unmodified
1090
 
        io = StringIO()
1091
 
        pr.write(io)
1092
 
        self.assertEqual(io.getvalue(), report)
1093
 
 
1094
 
        pr['Short'] = 'aaa\nbbb'
1095
 
        pr['Long'] = '123'
1096
 
        io = StringIO()
1097
 
        pr.write(io)
1098
 
        self.assertEqual(io.getvalue(),
1099
 
'''ProblemType: Crash
1100
 
Date: now!
1101
 
Long: 123
1102
 
Short:
1103
 
 aaa
1104
 
 bbb
1105
 
File: base64
1106
 
 H4sICAAAAAAC/0ZpbGUA
1107
 
 c3RyxIAMcBAFAK/2p9MfAAAA
1108
 
''')
1109
 
 
1110
 
    def test_add_to_existing(self):
1111
 
        '''adding information to an existing report.'''
1112
 
 
1113
 
        # original report
1114
 
        pr = ProblemReport()
1115
 
        pr['old1'] = '11'
1116
 
        pr['old2'] = '22'
1117
 
 
1118
 
        (fd, rep) = tempfile.mkstemp()
1119
 
        os.close(fd)
1120
 
        pr.write(open(rep, 'w'))
1121
 
 
1122
 
        origstat = os.stat(rep)
1123
 
 
1124
 
        # create a new one and add it
1125
 
        pr = ProblemReport()
1126
 
        pr.clear()
1127
 
        pr['new1'] = '33'
1128
 
 
1129
 
        pr.add_to_existing(rep, keep_times=True)
1130
 
 
1131
 
        # check keep_times
1132
 
        newstat = os.stat(rep)
1133
 
        self.assertEqual(origstat.st_mode, newstat.st_mode)
1134
 
        self.assertAlmostEqual(origstat.st_atime, newstat.st_atime, 1)
1135
 
        self.assertAlmostEqual(origstat.st_mtime, newstat.st_mtime, 1)
1136
 
 
1137
 
        # check report contents
1138
 
        newpr = ProblemReport()
1139
 
        newpr.load(open(rep))
1140
 
        self.assertEqual(newpr['old1'], '11')
1141
 
        self.assertEqual(newpr['old2'], '22')
1142
 
        self.assertEqual(newpr['new1'], '33')
1143
 
 
1144
 
        # create a another new one and add it, but make sure mtime must be
1145
 
        # different
1146
 
        time.sleep(1)
1147
 
        open(rep).read() # bump atime
1148
 
        time.sleep(1)
1149
 
 
1150
 
        pr = ProblemReport()
1151
 
        pr.clear()
1152
 
        pr['new2'] = '44'
1153
 
 
1154
 
        pr.add_to_existing(rep)
1155
 
 
1156
 
        # check that timestamps have been updates
1157
 
        newstat = os.stat(rep)
1158
 
        self.assertEqual(origstat.st_mode, newstat.st_mode)
1159
 
        self.assertNotEqual(origstat.st_mtime, newstat.st_mtime)
1160
 
        # skip atime check if filesystem is mounted noatime
1161
 
        skip_atime = False
1162
 
        dir = rep
1163
 
        while len(dir)>1:
1164
 
            dir, filename = os.path.split(dir)
1165
 
            if os.path.ismount(dir):
1166
 
                for line in open('/proc/mounts'):
1167
 
                    mount, fs, options = line.split(' ')[1:4]
1168
 
                    if mount == dir and 'noatime' in options.split(','):
1169
 
                        skip_atime = True
1170
 
                        break
1171
 
                break
1172
 
        if not skip_atime:
1173
 
            self.assertNotEqual(origstat.st_atime, newstat.st_atime)
1174
 
 
1175
 
        # check report contents
1176
 
        newpr = ProblemReport()
1177
 
        newpr.load(open(rep))
1178
 
        self.assertEqual(newpr['old1'], '11')
1179
 
        self.assertEqual(newpr['old2'], '22')
1180
 
        self.assertEqual(newpr['new1'], '33')
1181
 
        self.assertEqual(newpr['new2'], '44')
1182
 
 
1183
 
        os.unlink(rep)
1184
 
 
1185
 
    def test_write_mime_text(self):
1186
 
        '''write_mime() for text values.'''
1187
 
 
1188
 
        pr = ProblemReport(date = 'now!')
1189
 
        pr['Simple'] = 'bar'
1190
 
        if sys.version.startswith('2'):
1191
 
            pr['SimpleUTF8'] = '1äö2Φ3'
1192
 
            pr['SimpleUnicode'] = '1äö2Φ3'.decode('UTF-8')
1193
 
            pr['TwoLineUnicode'] = 'pi-π\nnu-η\n'.decode('UTF-8')
1194
 
            pr['TwoLineUTF8'] = 'pi-π\nnu-η\n'.decode('UTF-8')
1195
 
        else:
1196
 
            pr['SimpleUTF8'] = '1äö2Φ3'.encode('UTF-8')
1197
 
            pr['SimpleUnicode'] = '1äö2Φ3'
1198
 
            pr['TwoLineUnicode'] = 'pi-π\nnu-η\n'
1199
 
            pr['TwoLineUTF8'] = 'pi-π\nnu-η\n'
1200
 
        pr['SimpleLineEnd'] = 'bar\n'
1201
 
        pr['TwoLine'] = 'first\nsecond\n'
1202
 
        pr['InlineMargin'] = 'first\nsecond\nthird\nfourth\nfifth\n'
1203
 
        pr['Multiline'] = ' foo   bar\nbaz\n  blip  \nline4\nline♥5!!\nłıµ€ ⅝\n'
1204
 
        io = StringIO()
1205
 
        pr.write_mime(io)
1206
 
        io.seek(0)
1207
 
 
1208
 
        msg = email.message_from_file(io)
1209
 
        parts = [p for p in msg.walk()]
1210
 
        self.assertEqual(len(parts), 3)
1211
 
 
1212
 
        # first part is the multipart container
1213
 
        self.assertTrue(parts[0].is_multipart())
1214
 
 
1215
 
        # second part should be an inline text/plain attachments with all short
1216
 
        # fields
1217
 
        self.assertTrue(not parts[1].is_multipart())
1218
 
        self.assertEqual(parts[1].get_content_type(), 'text/plain')
1219
 
        self.assertEqual(parts[1].get_content_charset(), 'utf-8')
1220
 
        self.assertEqual(parts[1].get_filename(), None)
1221
 
        self.assertEqual(parts[1].get_payload(decode=True), '''ProblemType: Crash
1222
 
Date: now!
1223
 
InlineMargin:
1224
 
 first
1225
 
 second
1226
 
 third
1227
 
 fourth
1228
 
 fifth
1229
 
Simple: bar
1230
 
SimpleLineEnd: bar
1231
 
SimpleUTF8: 1äö2Φ3
1232
 
SimpleUnicode: 1äö2Φ3
1233
 
TwoLine:
1234
 
 first
1235
 
 second
1236
 
TwoLineUTF8:
1237
 
 pi-π
1238
 
 nu-η
1239
 
TwoLineUnicode:
1240
 
 pi-π
1241
 
 nu-η
1242
 
''')
1243
 
 
1244
 
        # third part should be the Multiline: field as attachment
1245
 
        self.assertTrue(not parts[2].is_multipart())
1246
 
        self.assertEqual(parts[2].get_content_type(), 'text/plain')
1247
 
        self.assertEqual(parts[2].get_content_charset(), 'utf-8')
1248
 
        self.assertEqual(parts[2].get_filename(), 'Multiline.txt')
1249
 
        self.assertEqual(parts[2].get_payload(decode=True), ''' foo   bar
1250
 
baz
1251
 
  blip  
1252
 
line4
1253
 
line♥5!!
1254
 
łıµ€ ⅝
1255
 
''')
1256
 
 
1257
 
    def test_write_mime_binary(self):
1258
 
        '''write_mime() for binary values and file references.'''
1259
 
 
1260
 
        bin_value = 'AB' * 10 + '\0' * 10 + 'Z'
1261
 
 
1262
 
        temp = tempfile.NamedTemporaryFile()
1263
 
        temp.write(bin_value)
1264
 
        temp.flush()
1265
 
 
1266
 
        tempgz = tempfile.NamedTemporaryFile()
1267
 
        gz = gzip.GzipFile('File1', 'w', fileobj=tempgz)
1268
 
        gz.write(bin_value)
1269
 
        gz.close()
1270
 
        tempgz.flush()
1271
 
 
1272
 
        pr = ProblemReport(date = 'now!')
1273
 
        pr['Context'] = 'Test suite'
1274
 
        pr['File1'] = (temp.name,)
1275
 
        pr['File1.gz'] = (tempgz.name,)
1276
 
        pr['Value1'] = bin_value
1277
 
        pr['Value1.gz'] = open(tempgz.name).read()
1278
 
        pr['ZValue'] = CompressedValue(bin_value)
1279
 
        io = StringIO()
1280
 
        pr.write_mime(io)
1281
 
        io.seek(0)
1282
 
 
1283
 
        msg = email.message_from_file(io)
1284
 
        parts = [p for p in msg.walk()]
1285
 
        self.assertEqual(len(parts), 7)
1286
 
 
1287
 
        # first part is the multipart container
1288
 
        self.assertTrue(parts[0].is_multipart())
1289
 
 
1290
 
        # second part should be an inline text/plain attachments with all short
1291
 
        # fields
1292
 
        self.assertTrue(not parts[1].is_multipart())
1293
 
        self.assertEqual(parts[1].get_content_type(), 'text/plain')
1294
 
        self.assertEqual(parts[1].get_content_charset(), 'utf-8')
1295
 
        self.assertEqual(parts[1].get_filename(), None)
1296
 
        self.assertEqual(parts[1].get_payload(decode=True),
1297
 
            'ProblemType: Crash\nContext: Test suite\nDate: now!\n')
1298
 
 
1299
 
        # third part should be the File1: file contents as gzip'ed attachment
1300
 
        self.assertTrue(not parts[2].is_multipart())
1301
 
        self.assertEqual(parts[2].get_content_type(), 'application/x-gzip')
1302
 
        self.assertEqual(parts[2].get_filename(), 'File1.gz')
1303
 
        f = tempfile.TemporaryFile()
1304
 
        f.write(parts[2].get_payload(decode=True))
1305
 
        f.seek(0)
1306
 
        self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_value)
1307
 
 
1308
 
        # fourth part should be the File1.gz: file contents as gzip'ed
1309
 
        # attachment; write_mime() should not compress it again
1310
 
        self.assertTrue(not parts[3].is_multipart())
1311
 
        self.assertEqual(parts[3].get_content_type(), 'application/x-gzip')
1312
 
        self.assertEqual(parts[3].get_filename(), 'File1.gz')
1313
 
        f = tempfile.TemporaryFile()
1314
 
        f.write(parts[3].get_payload(decode=True))
1315
 
        f.seek(0)
1316
 
        self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_value)
1317
 
 
1318
 
        # fifth part should be the Value1: value as gzip'ed attachment
1319
 
        self.assertTrue(not parts[4].is_multipart())
1320
 
        self.assertEqual(parts[4].get_content_type(), 'application/x-gzip')
1321
 
        self.assertEqual(parts[4].get_filename(), 'Value1.gz')
1322
 
        f = tempfile.TemporaryFile()
1323
 
        f.write(parts[4].get_payload(decode=True))
1324
 
        f.seek(0)
1325
 
        self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_value)
1326
 
 
1327
 
        # sixth part should be the Value1: value as gzip'ed attachment;
1328
 
        # write_mime should not compress it again
1329
 
        self.assertTrue(not parts[5].is_multipart())
1330
 
        self.assertEqual(parts[5].get_content_type(), 'application/x-gzip')
1331
 
        self.assertEqual(parts[5].get_filename(), 'Value1.gz')
1332
 
        f = tempfile.TemporaryFile()
1333
 
        f.write(parts[5].get_payload(decode=True))
1334
 
        f.seek(0)
1335
 
        self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_value)
1336
 
 
1337
 
        # seventh part should be the ZValue: value as gzip'ed attachment;
1338
 
        # write_mime should not compress it again
1339
 
        self.assertTrue(not parts[6].is_multipart())
1340
 
        self.assertEqual(parts[6].get_content_type(), 'application/x-gzip')
1341
 
        self.assertEqual(parts[6].get_filename(), 'ZValue.gz')
1342
 
        f = tempfile.TemporaryFile()
1343
 
        f.write(parts[6].get_payload(decode=True))
1344
 
        f.seek(0)
1345
 
        self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_value)
1346
 
 
1347
 
    def test_write_mime_extra_headers(self):
1348
 
        '''write_mime() with extra headers.'''
1349
 
 
1350
 
        pr = ProblemReport(date = 'now!')
1351
 
        pr['Simple'] = 'bar'
1352
 
        pr['TwoLine'] = 'first\nsecond\n'
1353
 
        io = StringIO()
1354
 
        pr.write_mime(io, extra_headers={'Greeting': 'hello world', 
1355
 
            'Foo': 'Bar'})
1356
 
        io.seek(0)
1357
 
 
1358
 
        msg = email.message_from_file(io)
1359
 
        self.assertEqual(msg['Greeting'], 'hello world')
1360
 
        self.assertEqual(msg['Foo'], 'Bar')
1361
 
        parts = [p for p in msg.walk()]
1362
 
        self.assertEqual(len(parts), 2)
1363
 
 
1364
 
        # first part is the multipart container
1365
 
        self.assertTrue(parts[0].is_multipart())
1366
 
 
1367
 
        # second part should be an inline text/plain attachments with all short
1368
 
        # fields
1369
 
        self.assertTrue(not parts[1].is_multipart())
1370
 
        self.assertEqual(parts[1].get_content_type(), 'text/plain')
1371
 
        self.assertTrue('Simple: bar' in parts[1].get_payload(decode=True))
1372
 
 
1373
 
    def test_write_mime_filter(self):
1374
 
        '''write_mime() with key filters.'''
1375
 
 
1376
 
        bin_value = 'AB' * 10 + '\0' * 10 + 'Z'
1377
 
 
1378
 
        pr = ProblemReport(date = 'now!')
1379
 
        pr['GoodText'] = 'Hi'
1380
 
        pr['BadText'] = 'YouDontSeeMe'
1381
 
        pr['GoodBin'] = bin_value
1382
 
        pr['BadBin'] = 'Y' + '\x05' * 10 + '-'
1383
 
        io = StringIO()
1384
 
        pr.write_mime(io, skip_keys=['BadText', 'BadBin'])
1385
 
        io.seek(0)
1386
 
 
1387
 
        msg = email.message_from_file(io)
1388
 
        parts = [p for p in msg.walk()]
1389
 
        self.assertEqual(len(parts), 3)
1390
 
 
1391
 
        # first part is the multipart container
1392
 
        self.assertTrue(parts[0].is_multipart())
1393
 
 
1394
 
        # second part should be an inline text/plain attachments with all short
1395
 
        # fields
1396
 
        self.assertTrue(not parts[1].is_multipart())
1397
 
        self.assertEqual(parts[1].get_content_type(), 'text/plain')
1398
 
        self.assertEqual(parts[1].get_content_charset(), 'utf-8')
1399
 
        self.assertEqual(parts[1].get_filename(), None)
1400
 
        self.assertEqual(parts[1].get_payload(decode=True), '''ProblemType: Crash
1401
 
Date: now!
1402
 
GoodText: Hi
1403
 
''')
1404
 
 
1405
 
        # third part should be the GoodBin: field as attachment
1406
 
        self.assertTrue(not parts[2].is_multipart())
1407
 
        f = tempfile.TemporaryFile()
1408
 
        f.write(parts[2].get_payload(decode=True))
1409
 
        f.seek(0)
1410
 
        self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_value)
1411
 
 
1412
 
    def test_write_mime_order(self):
1413
 
        '''write_mime() with keys ordered.'''
1414
 
 
1415
 
        pr = ProblemReport(date = 'now!')
1416
 
        pr['SecondText'] = 'What'
1417
 
        pr['FirstText'] = 'Who'
1418
 
        pr['FourthText'] = 'Today'
1419
 
        pr['ThirdText'] = "I Don't Know"
1420
 
        io = StringIO()
1421
 
        pr.write_mime(io, priority_fields=['FirstText', 'SecondText',
1422
 
            'ThirdText', 'Unknown', 'FourthText'])
1423
 
        io.seek(0)
1424
 
 
1425
 
        msg = email.message_from_file(io)
1426
 
        parts = [p for p in msg.walk()]
1427
 
        self.assertEqual(len(parts), 2)
1428
 
 
1429
 
        # first part is the multipart container
1430
 
        self.assertTrue(parts[0].is_multipart())
1431
 
 
1432
 
        # second part should be an inline text/plain attachments with all short
1433
 
        # fields
1434
 
        self.assertTrue(not parts[1].is_multipart())
1435
 
        self.assertEqual(parts[1].get_content_type(), 'text/plain')
1436
 
        self.assertEqual(parts[1].get_content_charset(), 'utf-8')
1437
 
        self.assertEqual(parts[1].get_filename(), None)
1438
 
        self.assertEqual(parts[1].get_payload(decode=True), '''FirstText: Who
1439
 
SecondText: What
1440
 
ThirdText: I Don't Know
1441
 
FourthText: Today
1442
 
ProblemType: Crash
1443
 
Date: now!
1444
 
''')
1445
 
 
1446
 
    def test_updating(self):
1447
 
        '''new_keys() and write() with only_new=True.'''
1448
 
 
1449
 
        pr = ProblemReport()
1450
 
        self.assertEqual(pr.new_keys(), set(['ProblemType', 'Date']))
1451
 
        pr.load(StringIO(
1452
 
'''ProblemType: Crash
1453
 
Date: now!
1454
 
Foo: bar
1455
 
Baz: blob
1456
 
'''))
1457
 
 
1458
 
        self.assertEqual(pr.new_keys(), set())
1459
 
 
1460
 
        pr['Foo'] = 'changed'
1461
 
        pr['NewKey'] = 'new new'
1462
 
        self.assertEqual(pr.new_keys(), set(['NewKey']))
1463
 
 
1464
 
        out = StringIO()
1465
 
        pr.write(out, only_new=True)
1466
 
        self.assertEqual(out.getvalue(), 'NewKey: new new\n')
1467
 
 
1468
 
    def test_import_dict(self):
1469
 
        '''importing a dictionary with update().'''
1470
 
 
1471
 
        pr = ProblemReport()
1472
 
        pr['oldtext'] = 'Hello world'
1473
 
        pr['oldbin'] = 'AB' * 10 + '\0' * 10 + 'Z'
1474
 
        pr['overwrite'] = 'I am crap'
1475
 
 
1476
 
        d = {}
1477
 
        d['newtext'] = 'Goodbye world'
1478
 
        d['newbin'] = '11\000\001\002\xFFZZ'
1479
 
        d['overwrite'] = 'I am good' 
1480
 
 
1481
 
        pr.update(d)
1482
 
        self.assertEqual(pr['oldtext'], 'Hello world')
1483
 
        self.assertEqual(pr['oldbin'], 'AB' * 10 + '\0' * 10 + 'Z')
1484
 
        self.assertEqual(pr['newtext'], 'Goodbye world')
1485
 
        self.assertEqual(pr['newbin'], '11\000\001\002\xFFZZ')
1486
 
        self.assertEqual(pr['overwrite'], 'I am good')
1487
 
 
1488
 
if __name__ == '__main__':
1489
 
    unittest.main()