~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/web2/dav/test/test_copy.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
##
 
2
# Copyright (c) 2005 Apple Computer, Inc. All rights reserved.
 
3
#
 
4
# Permission is hereby granted, free of charge, to any person obtaining a copy
 
5
# of this software and associated documentation files (the "Software"), to deal
 
6
# in the Software without restriction, including without limitation the rights
 
7
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 
8
# copies of the Software, and to permit persons to whom the Software is
 
9
# furnished to do so, subject to the following conditions:
 
10
 
11
# The above copyright notice and this permission notice shall be included in all
 
12
# copies or substantial portions of the Software.
 
13
 
14
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 
15
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 
16
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 
17
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 
18
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 
19
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 
20
# SOFTWARE.
 
21
#
 
22
# DRI: Wilfredo Sanchez, wsanchez@apple.com
 
23
##
 
24
 
 
25
import os
 
26
import urllib
 
27
import md5
 
28
 
 
29
import twisted.web2.dav.test.util
 
30
from twisted.web2 import responsecode
 
31
from twisted.web2.test.test_server import SimpleRequest
 
32
from twisted.web2.dav.test.util import dircmp, serialize
 
33
from twisted.web2.dav.fileop import rmdir
 
34
 
 
35
class COPY(twisted.web2.dav.test.util.TestCase):
 
36
    """
 
37
    COPY request
 
38
    """
 
39
    # FIXME:
 
40
    # Check that properties are being copied
 
41
    def test_COPY_create(self):
 
42
        """
 
43
        COPY to new resource.
 
44
        """
 
45
        def test(response, path, isfile, sum, uri, depth, dst_path):
 
46
            if response.code != responsecode.CREATED:
 
47
                self.fail("Incorrect response code for COPY %s (depth=%r): %s != %s"
 
48
                          % (uri, depth, response.code, responsecode.CREATED))
 
49
 
 
50
            if response.headers.getHeader("location") is None:
 
51
                self.fail("Reponse to COPY %s (depth=%r) with CREATE status is missing location: header."
 
52
                          % (uri, depth))
 
53
 
 
54
            if os.path.isfile(path):
 
55
                if not os.path.isfile(dst_path):
 
56
                    self.fail("COPY %s (depth=%r) produced no output file" % (uri, depth))
 
57
                if not cmp(path, dst_path):
 
58
                    self.fail("COPY %s (depth=%r) produced different file" % (uri, depth))
 
59
                os.remove(dst_path)
 
60
 
 
61
            elif os.path.isdir(path):
 
62
                if not os.path.isdir(dst_path):
 
63
                    self.fail("COPY %s (depth=%r) produced no output directory" % (uri, depth))
 
64
 
 
65
                if depth in ("infinity", None):
 
66
                    if dircmp(path, dst_path):
 
67
                        self.fail("COPY %s (depth=%r) produced different directory" % (uri, depth))
 
68
 
 
69
                elif depth == "0":
 
70
                    for filename in os.listdir(dst_path):
 
71
                        self.fail("COPY %s (depth=%r) shouldn't copy directory contents (eg. %s)" % (uri, depth, filename))
 
72
 
 
73
                else: raise AssertionError("Unknown depth: %r" % (depth,))
 
74
 
 
75
                rmdir(dst_path)
 
76
 
 
77
            else:
 
78
                self.fail("Source %s is neither a file nor a directory"
 
79
                          % (path,))
 
80
 
 
81
        return serialize(self.send, work(self, test))
 
82
 
 
83
    def test_COPY_exists(self):
 
84
        """
 
85
        COPY to existing resource.
 
86
        """
 
87
        def test(response, path, isfile, sum, uri, depth, dst_path):
 
88
            if response.code != responsecode.PRECONDITION_FAILED:
 
89
                self.fail("Incorrect response code for COPY without overwrite %s: %s != %s"
 
90
                          % (uri, response.code, responsecode.PRECONDITION_FAILED))
 
91
            else:
 
92
                # FIXME: Check XML error code (2518bis)
 
93
                pass
 
94
 
 
95
        return serialize(self.send, work(self, test, overwrite=False))
 
96
 
 
97
    def test_COPY_overwrite(self):
 
98
        """
 
99
        COPY to existing resource with overwrite header.
 
100
        """
 
101
        def test(response, path, isfile, sum, uri, depth, dst_path):
 
102
            if response.code != responsecode.NO_CONTENT:
 
103
                self.fail("Incorrect response code for COPY with overwrite %s: %s != %s"
 
104
                          % (uri, response.code, responsecode.NO_CONTENT))
 
105
            else:
 
106
                # FIXME: Check XML error code (2518bis)
 
107
                pass
 
108
 
 
109
            self.failUnless(os.path.exists(dst_path), "COPY didn't produce file: %s" % (dst_path,))
 
110
 
 
111
        return serialize(self.send, work(self, test, overwrite=True))
 
112
 
 
113
    def test_COPY_no_parent(self):
 
114
        """
 
115
        COPY to resource with no parent.
 
116
        """
 
117
        def test(response, path, isfile, sum, uri, depth, dst_path):
 
118
            if response.code != responsecode.CONFLICT:
 
119
                self.fail("Incorrect response code for COPY with no parent %s: %s != %s"
 
120
                          % (uri, response.code, responsecode.CONFLICT))
 
121
            else:
 
122
                # FIXME: Check XML error code (2518bis)
 
123
                pass
 
124
 
 
125
        return serialize(self.send, work(self, test, dst=os.path.join(self.docroot, "elvislives!")))
 
126
 
 
127
def work(self, test, overwrite=None, dst=None, depths=("0", "infinity", None)):
 
128
    if dst is None:
 
129
        dst = os.path.join(self.docroot, "dst")
 
130
        os.mkdir(dst)
 
131
 
 
132
    for basename in os.listdir(self.docroot):
 
133
        if basename == "dst": continue
 
134
 
 
135
        path = os.path.join(self.docroot, basename)
 
136
        uri = "/" + basename
 
137
        isfile = os.path.isfile(path)
 
138
        sum = sumFile(path)
 
139
        basename = os.path.basename(path)
 
140
        dst_path = os.path.join(dst, basename)
 
141
        dst_uri = urllib.quote("/dst/" + basename)
 
142
 
 
143
        if not isfile:
 
144
            uri     += "/"
 
145
            dst_uri += "/"
 
146
 
 
147
        if overwrite is not None:
 
148
            # Create a file at dst_path to create a conflict
 
149
            file(dst_path, "w").close()
 
150
 
 
151
        for depth in depths:
 
152
            def do_test(response, path=path, isfile=isfile, sum=sum, uri=uri, depth=depth, dst_path=dst_path):
 
153
                test(response, path, isfile, sum, uri, depth, dst_path)
 
154
 
 
155
            request = SimpleRequest(self.site, self.__class__.__name__, uri)
 
156
            request.headers.setHeader("destination", dst_uri)
 
157
            if depth is not None:
 
158
                request.headers.setHeader("depth", depth)
 
159
            if overwrite is not None:
 
160
                request.headers.setHeader("overwrite", overwrite)
 
161
 
 
162
            yield (request, do_test)
 
163
 
 
164
def sumFile(path):
 
165
    m = md5.new()
 
166
 
 
167
    if os.path.isfile(path):
 
168
        f = file(path)
 
169
        try:
 
170
            m.update(f.read())
 
171
        finally:
 
172
            f.close()
 
173
 
 
174
    elif os.path.isdir(path):
 
175
        for dir, subdirs, files in os.walk(path):
 
176
            for filename in files:
 
177
                m.update(filename)
 
178
                f = file(os.path.join(dir, filename))
 
179
                try:
 
180
                    m.update(f.read())
 
181
                finally:
 
182
                    f.close()
 
183
            for dirname in subdirs:
 
184
                m.update(dirname + "/")
 
185
 
 
186
    else:
 
187
        raise AssertionError()
 
188
 
 
189
    return m.digest()