~alecu/ubuntuone-client/proxy-tunnel-server

« back to all changes in this revision

Viewing changes to tests/syncdaemon/test_action_queue.py

  • Committer: Tarmac
  • Author(s): facundo at com
  • Date: 2012-02-07 12:50:26 UTC
  • mfrom: (1185.2.1 dont-send-fileobj)
  • Revision ID: tarmac-20120207125026-u6pr6944tnmwe1cm
Get the file object at the moment it is used in Upload and Download.

Show diffs side-by-side

added added

removed removed

Lines of Context:
858
858
            deferred.callback(True)
859
859
 
860
860
        self.zq._compress = fake_compress
861
 
        yield self.zq.zip(upload)
 
861
        yield self.zq.zip(upload, StringIO)
862
862
 
863
863
    @defer.inlineCallbacks
864
864
    def test_zip_calls_compress_with_file_object(self):
865
865
        """Test that _compress is called with the result of fileobj factory."""
866
866
        upload = FakeUpload()
 
867
        orig_fileobj = StringIO()
867
868
 
868
869
        def fake_compress(deferred, upload, fileobj):
869
870
            """Fake the _compress method."""
870
 
            self.assertEqual(fileobj, upload._fileobj)
 
871
            self.assertEqual(fileobj, orig_fileobj)
871
872
            deferred.callback(True)
872
873
 
873
874
        self.zq._compress = fake_compress
874
 
        yield self.zq.zip(upload)
 
875
        yield self.zq.zip(upload, lambda: orig_fileobj)
875
876
 
876
877
    @defer.inlineCallbacks
877
878
    def test_fileobj_factory_error_is_logged(self):
881
882
            raise ValueError("foo")
882
883
 
883
884
        upload = FakeUpload()
884
 
        self.patch(upload, 'fileobj_factory', crash)
885
885
 
886
886
        # set up the logger
887
887
        self.handler = MementoHandler()
888
888
        self.handler.setLevel(logging.DEBUG)
889
889
        upload.log.addHandler(self.handler)
890
890
 
891
 
        yield self.zq.zip(upload)
 
891
        yield self.zq.zip(upload, crash)
892
892
        self.assertTrue(self.handler.check_warning("Unable to build fileobj",
893
893
                                                   "ValueError", "foo"))
894
894
 
896
896
    def test_fileobj_factory_error_cancels_upload(self):
897
897
        """Cancel the upload when fileobj_factory fails."""
898
898
        upload = FakeUpload()
899
 
        self.patch(upload, 'fileobj_factory', None)
900
 
 
901
 
        yield self.zq.zip(upload)
 
899
        yield self.zq.zip(upload, 'willbreak')
902
900
        self.assertTrue(upload.cancelled)
903
901
 
904
902
    @defer.inlineCallbacks
905
903
    def test_fileobj_factory_error_dont_call_compress(self):
906
904
        """Stop the execution if fileobj_factory fails."""
907
905
        upload = FakeUpload()
908
 
        self.patch(upload, 'fileobj_factory', None)
909
 
 
910
906
        called = []
911
907
        self.zq._compress = lambda *a: called.append(True)
912
 
        yield self.zq.zip(upload)
 
908
        yield self.zq.zip(upload, 'willbreak')
913
909
        self.assertEqual(len(called), 0)
914
910
 
915
911
    @defer.inlineCallbacks
926
922
 
927
923
        self.zq.acquire = fake_acquire
928
924
        upload = FakeUpload()
929
 
        yield self.zq.zip(upload)
 
925
        yield self.zq.zip(upload, StringIO)
930
926
        self.assertTrue(called)
931
927
 
932
928
    @defer.inlineCallbacks
937
933
        self.zq.release = lambda: called.append(True)
938
934
 
939
935
        upload = FakeUpload()
940
 
        yield self.zq.zip(upload)
 
936
        yield self.zq.zip(upload, StringIO)
941
937
        self.assertTrue(called)
942
938
 
943
939
    @defer.inlineCallbacks
950
946
        upload = FakeUpload()
951
947
 
952
948
        try:
953
 
            yield self.zq.zip(upload)
 
949
            yield self.zq.zip(upload, StringIO)
954
950
        except Exception, e:
955
951
            # need to silent the exception we're generating in the test
956
952
            self.assertEqual(e, exc)
964
960
        called = []
965
961
        self.zq.release = lambda: called.append(True)
966
962
        upload = FakeUpload()
967
 
        del upload.fileobj_factory
968
963
 
969
 
        yield self.zq.zip(upload)
 
964
        yield self.zq.zip(upload, 'willbreak')
970
965
        self.assertTrue(called)
971
966
 
972
967
    @defer.inlineCallbacks
974
969
        """Close the fileobj after compressing ok."""
975
970
        self.zq._compress = lambda deferred, upl, fobj: deferred.callback(True)
976
971
        upload = FakeUpload()
977
 
        yield self.zq.zip(upload)
 
972
        fileobj = StringIO()
 
973
        yield self.zq.zip(upload, lambda: fileobj)
978
974
 
979
 
        self.assertTrue(upload._fileobj.closed)
 
975
        self.assertTrue(fileobj.closed)
980
976
 
981
977
    @defer.inlineCallbacks
982
978
    def test_fileobj_closed_error(self):
985
981
        self.zq._compress = lambda deferred, upl, fobj: deferred.errback(exc)
986
982
 
987
983
        upload = FakeUpload()
988
 
        e = yield self.assertFailure(self.zq.zip(upload), Exception)
 
984
        s = StringIO()
 
985
        e = yield self.assertFailure(self.zq.zip(upload, lambda: s), Exception)
989
986
        self.assertEqual(e, exc)
 
987
        self.assertTrue(s.closed)
990
988
 
991
989
    @defer.inlineCallbacks
992
990
    def test_compress_gets_compressed_data(self):
993
991
        """Compressed data is generated by _compress."""
994
992
        upload = FakeUpload()
995
 
        data = upload.data
 
993
        data = "a lot of data to compress"
996
994
 
997
995
        # call and wait
998
996
        d = defer.Deferred()
999
 
        reactor.callInThread(self.zq._compress, d, upload,
1000
 
                             upload.fileobj_factory())
 
997
        reactor.callInThread(self.zq._compress, d, upload, StringIO(data))
1001
998
        yield d
1002
999
 
1003
1000
        with open_file(upload.tempfile.name) as f:
1008
1005
    def test_compress_gets_magic_hash(self):
1009
1006
        """The magic hash is generated by _compress."""
1010
1007
        upload = FakeUpload()
1011
 
        data = upload.data
 
1008
        data = "a lot of data to compress"
1012
1009
 
1013
1010
        # what the hash should give us
1014
1011
        mh = content_hash.magic_hash_factory()
1017
1014
 
1018
1015
        # call and wait
1019
1016
        d = defer.Deferred()
1020
 
        reactor.callInThread(self.zq._compress, d, upload, upload._fileobj)
 
1017
        reactor.callInThread(self.zq._compress, d, upload, StringIO(data))
1021
1018
        yield d
1022
1019
 
1023
1020
        hashed = upload.magic_hash._magic_hash
2790
2787
        self.rq = request_queue = RequestQueue(action_queue=self.action_queue)
2791
2788
        self.command = Download(request_queue, share_id='a_share_id',
2792
2789
                               node_id='a_node_id', server_hash='server_hash',
2793
 
                               path='path', fileobj_factory=lambda: None)
 
2790
                               path='path')
2794
2791
        self.command.make_logger()
2795
2792
 
2796
2793
    def test_progress_information_setup(self):
2797
2794
        """Test the setting up of the progress information in ._run()."""
 
2795
        self.patch(self.main.fs, 'get_partial_for_writing',
 
2796
                   lambda n, s: StringIO())
 
2797
 
2798
2798
        self.command.action_queue.connect_in_progress = False
2799
2799
        self.command.action_queue.client = FakeClient()
2800
2800
        self.command._run()
2805
2805
        self.assertEqual(len(self.action_queue.client.called), 2)
2806
2806
        meth, args, kwargs = self.action_queue.client.called[1]
2807
2807
        self.assertEqual(meth, 'get_content_request')
2808
 
        self.assertEqual(kwargs['offset'], 20)
 
2808
        self.assertEqual(kwargs['offset'], 0)  # resumable is not there yet
2809
2809
 
2810
2810
 
2811
2811
class DownloadTestCase(ConnectedBaseTestCase):
2825
2825
        self.command = MyDownload(self.rq, share_id='a_share_id',
2826
2826
                                  node_id='a_node_id',
2827
2827
                                  server_hash='server_hash',
2828
 
                                  path= os.path.join(os.path.sep, 'foo', 'bar'),
2829
 
                                  fileobj_factory=StringIO)
 
2828
                                  path= os.path.join(os.path.sep, 'foo', 'bar'))
2830
2829
        self.command.make_logger()
2831
2830
        self.rq.waiting.append(self.command)
2832
2831
 
2847
2846
                """Nothing!"""
2848
2847
                return ""
2849
2848
 
 
2849
        self.command.fileobj = StringIO()
2850
2850
        self.command._run()
2851
2851
        self.command.gunzip = FakeDecompressor()
2852
2852
        self.assertEqual(self.command.n_bytes_read, 0)
2942
2942
    def test_upload_download_uniqueness(self):
2943
2943
        """There should be only one upload/download for a specific node."""
2944
2944
        # totally fake, we don't care: the messages are only validated on run
2945
 
        self.action_queue.download('foo', 'bar', 0, 'path', 0)
 
2945
        self.action_queue.download('foo', 'bar', 0, 'path')
2946
2946
        first_cmd = self.action_queue.queue.waiting[0]
2947
 
        self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path', StringIO)
 
2947
        self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path')
2948
2948
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
2949
2949
        self.assertTrue(first_cmd.cancelled)
2950
2950
 
2953
2953
        # totally fake, we don't care: the messages are only validated on run
2954
2954
        self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path', StringIO)
2955
2955
        first_cmd = self.action_queue.queue.waiting[0]
2956
 
        self.action_queue.download('foo', 'bar', 0, 'path', 0)
 
2956
        self.action_queue.download('foo', 'bar', 0, 'path')
2957
2957
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
2958
2958
        self.assertTrue(first_cmd.cancelled)
2959
2959
        self.assertTrue(self.handler.check_debug("Previous command cancelled",
2962
2962
    def test_uniqueness_download(self):
2963
2963
        """There should be only one upload/download for a specific node."""
2964
2964
        # totally fake, we don't care: the messages are only validated on run
2965
 
        self.action_queue.download('foo', 'bar', 0, 'path0', 0)
 
2965
        self.action_queue.download('foo', 'bar', 0, 'path0')
2966
2966
        first_cmd = self.action_queue.queue.waiting[0]
2967
 
        self.action_queue.download('foo', 'bar', 1, 'path1', 1)
 
2967
        self.action_queue.download('foo', 'bar', 1, 'path1')
2968
2968
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
2969
2969
        self.assertTrue(first_cmd.cancelled)
2970
2970
        self.assertTrue(self.handler.check_debug("Previous command cancelled",
2973
2973
    def test_uniqueness_even_with_markers(self):
2974
2974
        """Only one upload/download per node, even using markers."""
2975
2975
        m = MDMarker('bar')
2976
 
        self.action_queue.download('share', m, 0, 'path', 0)
 
2976
        self.action_queue.download('share', m, 0, 'path')
2977
2977
        first_cmd = self.action_queue.queue.waiting[0]
2978
2978
        self.action_queue.uuid_map.set('bar', 'bah')
2979
 
        self.action_queue.download('share', 'bah', 0, 'path', 0)
 
2979
        self.action_queue.download('share', 'bah', 0, 'path')
2980
2980
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
2981
2981
        self.assertTrue(first_cmd.cancelled)
2982
2982
 
2984
2984
        """Previous command didn't cancel even if we tried it."""
2985
2985
        # the first command will refuse to cancel (patch the class because
2986
2986
        # the instance is not patchable)
2987
 
        self.action_queue.download('foo', 'bar', 0, 'path0', 0)
 
2987
        self.action_queue.download('foo', 'bar', 0, 'path0')
2988
2988
        self.action_queue.queue.waiting[0]
2989
2989
        self.patch(Download, 'cancel', lambda instance: False)
2990
2990
 
2991
 
        self.action_queue.download('foo', 'bar', 1, 'path1', 1)
 
2991
        self.action_queue.download('foo', 'bar', 1, 'path1')
2992
2992
        self.assertEqual(len(self.action_queue.queue.waiting), 2)
2993
2993
        self.assertTrue(self.handler.check_debug("Tried to cancel", "couldn't",
2994
2994
                                                 "Download", "foo", "bar"))
3057
3057
        obj.deferred
3058
3058
        self.action_queue.client.get_content_request = lambda *a, **k: obj
3059
3059
 
 
3060
        self.patch(self.main.fs, 'get_partial_for_writing',
 
3061
                   lambda n, s: StringIO())
3060
3062
        self.command._run()
3061
3063
        decompressor1 = self.command.gunzip
3062
3064
        self.command._run()
3068
3070
        # don't use the real protocol
3069
3071
        self.action_queue.client.get_content_request = FakeRequest
3070
3072
 
3071
 
        class FakeFileObjFactory(object):
 
3073
        class FakeFileObj(object):
3072
3074
            """Fake class to check behaviour."""
3073
3075
            def __init__(self):
3074
3076
                self.seek_count = 0
3082
3084
                """Fake truncate."""
3083
3085
                self.truncate_count += 1
3084
3086
 
 
3087
        self.patch(self.main.fs, 'get_partial_for_writing',
 
3088
                   lambda n, s: FakeFileObj())
3085
3089
        cmd = Download(self.rq, 'a_share_id','a_node_id', 'server_hash',
3086
 
                       os.path.join(os.path.sep, 'foo','bar'),
3087
 
                       FakeFileObjFactory)
 
3090
                       os.path.join(os.path.sep, 'foo','bar'))
3088
3091
 
3089
3092
        # first run, it is just instantiated
3090
3093
        cmd._run()
3091
 
        self.assertTrue(isinstance(cmd.fileobj, FakeFileObjFactory))
 
3094
        self.assertTrue(isinstance(cmd.fileobj, FakeFileObj))
3092
3095
        self.assertEqual(cmd.fileobj.seek_count, 0)
3093
3096
        self.assertEqual(cmd.fileobj.truncate_count, 0)
3094
3097
 
3113
3116
        self.rq = request_queue = RequestQueue(action_queue=self.action_queue)
3114
3117
        self.command = Upload(request_queue, share_id='a_share_id',
3115
3118
                              node_id='a_node_id', previous_hash='prev_hash',
3116
 
                              hash='yadda', crc32=0, size=0, path='path',
3117
 
                              fileobj_factory=lambda: None)
 
3119
                              hash='yadda', crc32=0, size=0, path='path')
3118
3120
        self.command.make_logger()
3119
3121
        self.command.magic_hash = FakeMagicHash()
3120
3122
        self.client = FakeClient()
3257
3259
        self.command = MyUpload(self.rq, share_id=self.share_id,
3258
3260
                                node_id='a_node_id', previous_hash='prev_hash',
3259
3261
                                hash='yadda', crc32=0, size=0,
3260
 
                                path=os.path.join(os.path.sep, 'foo', 'bar'),
3261
 
                                fileobj_factory=StringIO)
 
3262
                                path=os.path.join(os.path.sep, 'foo', 'bar'))
3262
3263
        self.command.make_logger()
3263
3264
 
3264
3265
    @defer.inlineCallbacks
3270
3271
        protocol_msg.error.type = protocol_pb2.Error.UPLOAD_IN_PROGRESS
3271
3272
        err = errors.UploadInProgressError("request", protocol_msg)
3272
3273
 
 
3274
        # mock fsm
 
3275
        mocker = Mocker()
 
3276
        mdobj = mocker.mock()
 
3277
        expect(mdobj.mdid).result('mdid')
 
3278
        fsm = mocker.mock()
 
3279
        expect(fsm.get_by_node_id(self.command.share_id, self.command.node_id)
 
3280
               ).result(mdobj)
 
3281
        expect(fsm.open_file('mdid')).result(StringIO())
 
3282
        self.patch(self.main, 'fs', fsm)
 
3283
 
3273
3284
        # first fails with UploadInProgress, then finishes ok
3274
3285
        called = []
3275
3286
        run_deferreds = [defer.fail(err), defer.succeed(True)]
3280
3291
        self.command.handle_success = lambda _: d.callback(True)
3281
3292
 
3282
3293
        # go and check
3283
 
        self.command.go()
 
3294
        with mocker:
 
3295
            self.command.go()
3284
3296
        yield d
3285
3297
        self.assertEqual(called, [':)', ':)'])
3286
3298
 
3511
3523
    def test_uniqueness_upload(self):
3512
3524
        """There should be only one upload/download for a specific node."""
3513
3525
        # totally fake, we don't care: the messages are only validated on run
3514
 
        self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path0', StringIO)
 
3526
        self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path0')
3515
3527
        first_cmd = self.action_queue.queue.waiting[0]
3516
 
        self.action_queue.upload('foo', 'bar', 1, 1, 1, 1, 'path1', StringIO)
 
3528
        self.action_queue.upload('foo', 'bar', 1, 1, 1, 1, 'path1')
3517
3529
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
3518
3530
        self.assertTrue(first_cmd.cancelled)
3519
3531
        self.assertTrue(self.handler.check_debug("Previous command cancelled",
3522
3534
    def test_uniqueness_download(self):
3523
3535
        """There should be only one upload/download for a specific node."""
3524
3536
        # totally fake, we don't care: the messages are only validated on run
3525
 
        self.action_queue.download('foo', 'bar', 0, 'path', StringIO)
 
3537
        self.action_queue.download('foo', 'bar', 0, 'path')
3526
3538
        first_cmd = self.action_queue.queue.waiting[0]
3527
 
        self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path', StringIO)
 
3539
        self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 'path')
3528
3540
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
3529
3541
        self.assertTrue(first_cmd.cancelled)
3530
3542
        self.assertTrue(self.handler.check_debug("Previous command cancelled",
3533
3545
    def test_uniqueness_even_with_markers(self):
3534
3546
        """Only one upload/download per node, even using markers."""
3535
3547
        m = MDMarker('bar')
3536
 
        self.action_queue.download('share', m, 0, 'path', StringIO)
 
3548
        self.action_queue.download('share', m, 0, 'path')
3537
3549
        first_cmd = self.action_queue.queue.waiting[0]
3538
3550
        self.action_queue.uuid_map.set('bar', 'bah')
3539
 
        self.action_queue.upload('share', 'bah', 0, 0, 0, 0, 'path', StringIO)
 
3551
        self.action_queue.upload('share', 'bah', 0, 0, 0, 0, 'path')
3540
3552
        self.assertEqual(len(self.action_queue.queue.waiting), 1)
3541
3553
        self.assertTrue(first_cmd.cancelled)
3542
3554
 
3556
3568
        """_start acquire the semaphore and locks."""
3557
3569
        lock = defer.Deferred()
3558
3570
        self.rq.transfers_semaphore.acquire = lambda: lock
3559
 
        self.action_queue.zip_queue.zip = lambda u: defer.succeed(True)
 
3571
        self.action_queue.zip_queue.zip = lambda u, f: defer.succeed(True)
 
3572
 
 
3573
        # mock fsm
 
3574
        mocker = Mocker()
 
3575
        mdobj = mocker.mock()
 
3576
        expect(mdobj.mdid).result('mdid')
 
3577
        fsm = mocker.mock()
 
3578
        expect(fsm.get_by_node_id(self.command.share_id, self.command.node_id)
 
3579
               ).result(mdobj)
 
3580
        expect(fsm.open_file('mdid')).result(StringIO())
 
3581
        self.patch(self.main, 'fs', fsm)
3560
3582
 
3561
3583
        # _start and check it locked
3562
3584
        started = self.command._start()
5494
5516
        """
5495
5517
        cmd = Upload(self.queue, share_id='a_share_id', node_id='a_node_id',
5496
5518
                     previous_hash='prev_hash', hash='yadda', crc32=0, size=0,
5497
 
                     path='path', fileobj_factory=lambda: None)
 
5519
                     path='path')
5498
5520
 
5499
5521
        # patch the command to simulate a request to an already full
5500
5522
        # transfer semaphore in _start