~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/web2/dav/test/test_copy.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
##
2
 
# Copyright (c) 2005 Apple Computer, Inc. All rights reserved.
3
 
#
4
 
# Permission is hereby granted, free of charge, to any person obtaining a copy
5
 
# of this software and associated documentation files (the "Software"), to deal
6
 
# in the Software without restriction, including without limitation the rights
7
 
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
 
# copies of the Software, and to permit persons to whom the Software is
9
 
# furnished to do so, subject to the following conditions:
10
 
11
 
# The above copyright notice and this permission notice shall be included in all
12
 
# copies or substantial portions of the Software.
13
 
14
 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
 
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
 
# SOFTWARE.
21
 
#
22
 
# DRI: Wilfredo Sanchez, wsanchez@apple.com
23
 
##
24
 
 
25
 
import os
26
 
import urllib
27
 
import md5
28
 
 
29
 
import twisted.web2.dav.test.util
30
 
from twisted.web2 import responsecode
31
 
from twisted.web2.test.test_server import SimpleRequest
32
 
from twisted.web2.dav.test.util import dircmp, serialize
33
 
from twisted.web2.dav.fileop import rmdir
34
 
 
35
 
class COPY(twisted.web2.dav.test.util.TestCase):
36
 
    """
37
 
    COPY request
38
 
    """
39
 
    # FIXME:
40
 
    # Check that properties are being copied
41
 
    def test_COPY_create(self):
42
 
        """
43
 
        COPY to new resource.
44
 
        """
45
 
        def test(response, path, isfile, sum, uri, depth, dst_path):
46
 
            if response.code != responsecode.CREATED:
47
 
                self.fail("Incorrect response code for COPY %s (depth=%r): %s != %s"
48
 
                          % (uri, depth, response.code, responsecode.CREATED))
49
 
 
50
 
            if response.headers.getHeader("location") is None:
51
 
                self.fail("Reponse to COPY %s (depth=%r) with CREATE status is missing location: header."
52
 
                          % (uri, depth))
53
 
 
54
 
            if os.path.isfile(path):
55
 
                if not os.path.isfile(dst_path):
56
 
                    self.fail("COPY %s (depth=%r) produced no output file" % (uri, depth))
57
 
                if not cmp(path, dst_path):
58
 
                    self.fail("COPY %s (depth=%r) produced different file" % (uri, depth))
59
 
                os.remove(dst_path)
60
 
 
61
 
            elif os.path.isdir(path):
62
 
                if not os.path.isdir(dst_path):
63
 
                    self.fail("COPY %s (depth=%r) produced no output directory" % (uri, depth))
64
 
 
65
 
                if depth in ("infinity", None):
66
 
                    if dircmp(path, dst_path):
67
 
                        self.fail("COPY %s (depth=%r) produced different directory" % (uri, depth))
68
 
 
69
 
                elif depth == "0":
70
 
                    for filename in os.listdir(dst_path):
71
 
                        self.fail("COPY %s (depth=%r) shouldn't copy directory contents (eg. %s)" % (uri, depth, filename))
72
 
 
73
 
                else: raise AssertionError("Unknown depth: %r" % (depth,))
74
 
 
75
 
                rmdir(dst_path)
76
 
 
77
 
            else:
78
 
                self.fail("Source %s is neither a file nor a directory"
79
 
                          % (path,))
80
 
 
81
 
        return serialize(self.send, work(self, test))
82
 
 
83
 
    def test_COPY_exists(self):
84
 
        """
85
 
        COPY to existing resource.
86
 
        """
87
 
        def test(response, path, isfile, sum, uri, depth, dst_path):
88
 
            if response.code != responsecode.PRECONDITION_FAILED:
89
 
                self.fail("Incorrect response code for COPY without overwrite %s: %s != %s"
90
 
                          % (uri, response.code, responsecode.PRECONDITION_FAILED))
91
 
            else:
92
 
                # FIXME: Check XML error code (2518bis)
93
 
                pass
94
 
 
95
 
        return serialize(self.send, work(self, test, overwrite=False))
96
 
 
97
 
    def test_COPY_overwrite(self):
98
 
        """
99
 
        COPY to existing resource with overwrite header.
100
 
        """
101
 
        def test(response, path, isfile, sum, uri, depth, dst_path):
102
 
            if response.code != responsecode.NO_CONTENT:
103
 
                self.fail("Incorrect response code for COPY with overwrite %s: %s != %s"
104
 
                          % (uri, response.code, responsecode.NO_CONTENT))
105
 
            else:
106
 
                # FIXME: Check XML error code (2518bis)
107
 
                pass
108
 
 
109
 
            self.failUnless(os.path.exists(dst_path), "COPY didn't produce file: %s" % (dst_path,))
110
 
 
111
 
        return serialize(self.send, work(self, test, overwrite=True))
112
 
 
113
 
    def test_COPY_no_parent(self):
114
 
        """
115
 
        COPY to resource with no parent.
116
 
        """
117
 
        def test(response, path, isfile, sum, uri, depth, dst_path):
118
 
            if response.code != responsecode.CONFLICT:
119
 
                self.fail("Incorrect response code for COPY with no parent %s: %s != %s"
120
 
                          % (uri, response.code, responsecode.CONFLICT))
121
 
            else:
122
 
                # FIXME: Check XML error code (2518bis)
123
 
                pass
124
 
 
125
 
        return serialize(self.send, work(self, test, dst=os.path.join(self.docroot, "elvislives!")))
126
 
 
127
 
def work(self, test, overwrite=None, dst=None, depths=("0", "infinity", None)):
128
 
    if dst is None:
129
 
        dst = os.path.join(self.docroot, "dst")
130
 
        os.mkdir(dst)
131
 
 
132
 
    for basename in os.listdir(self.docroot):
133
 
        if basename == "dst": continue
134
 
 
135
 
        path = os.path.join(self.docroot, basename)
136
 
        uri = "/" + basename
137
 
        isfile = os.path.isfile(path)
138
 
        sum = sumFile(path)
139
 
        basename = os.path.basename(path)
140
 
        dst_path = os.path.join(dst, basename)
141
 
        dst_uri = urllib.quote("/dst/" + basename)
142
 
 
143
 
        if not isfile:
144
 
            uri     += "/"
145
 
            dst_uri += "/"
146
 
 
147
 
        if overwrite is not None:
148
 
            # Create a file at dst_path to create a conflict
149
 
            file(dst_path, "w").close()
150
 
 
151
 
        for depth in depths:
152
 
            def do_test(response, path=path, isfile=isfile, sum=sum, uri=uri, depth=depth, dst_path=dst_path):
153
 
                test(response, path, isfile, sum, uri, depth, dst_path)
154
 
 
155
 
            request = SimpleRequest(self.site, self.__class__.__name__, uri)
156
 
            request.headers.setHeader("destination", dst_uri)
157
 
            if depth is not None:
158
 
                request.headers.setHeader("depth", depth)
159
 
            if overwrite is not None:
160
 
                request.headers.setHeader("overwrite", overwrite)
161
 
 
162
 
            yield (request, do_test)
163
 
 
164
 
def sumFile(path):
165
 
    m = md5.new()
166
 
 
167
 
    if os.path.isfile(path):
168
 
        f = file(path)
169
 
        try:
170
 
            m.update(f.read())
171
 
        finally:
172
 
            f.close()
173
 
 
174
 
    elif os.path.isdir(path):
175
 
        for dir, subdirs, files in os.walk(path):
176
 
            for filename in files:
177
 
                m.update(filename)
178
 
                f = file(os.path.join(dir, filename))
179
 
                try:
180
 
                    m.update(f.read())
181
 
                finally:
182
 
                    f.close()
183
 
            for dirname in subdirs:
184
 
                m.update(dirname + "/")
185
 
 
186
 
    else:
187
 
        raise AssertionError()
188
 
 
189
 
    return m.digest()