~ubuntu-branches/ubuntu/oneiric/zope.app.testing/oneiric

« back to all changes in this revision

Viewing changes to src/zope/app/testing/dochttp.py

  • Committer: Bazaar Package Importer
  • Author(s): Gediminas Paulauskas
  • Date: 2010-12-02 19:58:25 UTC
  • Revision ID: james.westby@ubuntu.com-20101202195825-7y6wai68ygqv95kh
Tags: upstream-3.7.8
ImportĀ upstreamĀ versionĀ 3.7.8

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
##############################################################################
 
2
#
 
3
# Copyright (c) 2004 Zope Foundation and Contributors.
 
4
# All Rights Reserved.
 
5
#
 
6
# This software is subject to the provisions of the Zope Public License,
 
7
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 
11
# FOR A PARTICULAR PURPOSE.
 
12
#
 
13
##############################################################################
 
14
"""Convert an http tcpwatch session to a doctest
 
15
 
 
16
$Id: dochttp.py 110724 2010-04-11 00:04:11Z tseaver $
 
17
"""
 
18
 
 
19
import errno
 
20
import optparse
 
21
import os
 
22
import re
 
23
import rfc822
 
24
import sys
 
25
 
 
26
usage = """usage: %prog <options> directory
 
27
 
 
28
Convert an http tcpwatch recorded sesssion to a doctest file, which is
 
29
written to standard output.
 
30
 
 
31
"""
 
32
 
 
33
parser = optparse.OptionParser(usage)
 
34
parser.add_option("-p", "--prefix", default="watch",
 
35
                  help="Prefix for recorded tcpwatch session files")
 
36
parser.add_option("-U", "--skip-url", action="append",
 
37
                  help="Regular expression for URLs to skip")
 
38
parser.add_option("-E", "--skip-extension", action="append",
 
39
                  help="URL file-extension to skip")
 
40
parser.add_option("-e", "--extension", action="append",
 
41
                  help="URL file-extension to include")
 
42
parser.add_option("-I", "--skip-request-header", action="append",
 
43
                  help="Request header to skip")
 
44
parser.add_option("-O", "--skip-response-header", action="append",
 
45
                  help="Response header to skip")
 
46
parser.add_option("-r", "--clean-redirects", action="store_true",
 
47
                  help="Strip content from redirect responses",
 
48
                  default=False)
 
49
 
 
50
default_options = [
 
51
    '-e', 'html',
 
52
 
 
53
    '-I', 'Accept-Charset', '-I', 'Accept-Encoding', '-I', 'Accept-Language',
 
54
    '-I', 'Accept', '-I', 'Connection', '-I', 'Host', '-I', 'Keep-Alive',
 
55
    '-I', 'User-Agent',
 
56
 
 
57
    '-O', 'Date', '-O', 'Server', '-O', 'X-Content-Type-Warning',
 
58
    '-O', 'X-Powered-By',
 
59
 
 
60
    ]
 
61
 
 
62
def dochttp(args=sys.argv[1:], default=None):
 
63
    """Convert a tcpwatch recorded sesssion to a doctest file"""
 
64
    if default is None:
 
65
        default = default_options
 
66
 
 
67
    options, args = parser.parse_args(default+args)
 
68
    try:
 
69
        directory, = args
 
70
    except:
 
71
        parser.print_help()
 
72
        sys.exit(1)
 
73
 
 
74
    skip_extensions = options.skip_extension or ()
 
75
    extensions = [ext for ext in (options.extension or ())
 
76
                  if ext not in skip_extensions]
 
77
    skip_urls = [re.compile(pattern) for pattern in (options.skip_url or ())]
 
78
 
 
79
    names = [name[:-len(".request")]
 
80
             for name in os.listdir(directory)
 
81
             if name.startswith(options.prefix) and name.endswith('.request')
 
82
             ]
 
83
    names.sort()
 
84
 
 
85
    extre = re.compile("[.](\w+)$")
 
86
 
 
87
    for name in names:
 
88
        requests =  Requests(
 
89
                        open(os.path.join(directory, name + ".request"), 'rb'),
 
90
                        options.skip_request_header,
 
91
                        )
 
92
        responses = Responses(
 
93
                        open(os.path.join(directory, name + ".response"), 'rb'),
 
94
                        options.skip_response_header,
 
95
                        )
 
96
 
 
97
        # We use map so as *not* to truncate at shortest input.
 
98
        # We want an error if the number of requests and responses
 
99
        # is different.
 
100
        for request, response in map(None, requests, responses):
 
101
            assert (request and response) or not (request or response)
 
102
 
 
103
            path = request.path
 
104
            ext = extre.search(path)
 
105
            if ext:
 
106
                ext = ext.group(1)
 
107
                if extensions:
 
108
                    if ext not in extensions:
 
109
                        continue
 
110
                else:
 
111
                    if ext in skip_extensions:
 
112
                        continue
 
113
            
 
114
            for skip_url in skip_urls:
 
115
                if skip_url.search(request.path):
 
116
                    break
 
117
            else:
 
118
                try:
 
119
                    output_test(request, response, options.clean_redirects)
 
120
                except IOError, e:
 
121
                    if e.errno == errno.EPIPE:
 
122
                        return
 
123
                    raise
 
124
    
 
125
 
 
126
def output_test(request, response, clean_redirects=False):
 
127
    print
 
128
    print
 
129
    print '  >>> print http(r"""'
 
130
    print '  ...', '\n  ... '.join(request.lines())+'""")'
 
131
    if response.code in (301, 302, 303) and clean_redirects:
 
132
        content_length = None
 
133
        if response.headers:
 
134
            for i in range(len(response.headers)):
 
135
                h, v = response.headers[i]
 
136
                if h == "Content-Length":
 
137
                    content_length = int(v)
 
138
                    response.headers[i] = (h, "...")
 
139
        lines = response.header_lines()
 
140
        if lines and content_length == 0:
 
141
            lines.append("...")
 
142
    else:
 
143
        lines = response.lines()
 
144
    print ' ', '\n  '.join([line.rstrip() and line or '<BLANKLINE>'
 
145
                             for line in lines])
 
146
 
 
147
class Message:
 
148
 
 
149
    start = ''
 
150
 
 
151
    def __init__(self, file, skip_headers):
 
152
        start = file.readline().rstrip()
 
153
        if start:
 
154
            self.start = start
 
155
            if start.startswith("HTTP/"):
 
156
                # This is a response; extract the response code:
 
157
                self.code = int(start.split()[1])
 
158
            headers = [split_header(header)
 
159
                       for header in rfc822.Message(file).headers
 
160
                       ]
 
161
            headers = [
 
162
                ('-'.join([s.capitalize() for s in name.split('-')]),
 
163
                 v.rstrip()
 
164
                 )
 
165
                for (name, v) in headers
 
166
                if name.lower() not in skip_headers
 
167
            ]
 
168
            self.headers = headers
 
169
            content_length = int(dict(headers).get('Content-Length', '0'))
 
170
            if content_length:
 
171
                self.body = file.read(content_length).split('\n')
 
172
            else:
 
173
                self.body = []
 
174
 
 
175
    def __nonzero__(self):
 
176
        return bool(self.start)
 
177
 
 
178
    def lines(self):
 
179
        output = self.header_lines()
 
180
        if output:
 
181
            output.extend(self.body)
 
182
        return output
 
183
 
 
184
    def header_lines(self):
 
185
        if self.start:
 
186
            output = [self.start]
 
187
            headers = ["%s: %s" % (name, v) for (name, v) in self.headers]
 
188
            headers.sort()
 
189
            output.extend(headers)
 
190
            output.append('')
 
191
        else:
 
192
            output = []
 
193
        return output
 
194
 
 
195
headerre = re.compile('(\S+): (.+)$')
 
196
def split_header(header):
 
197
    return headerre.match(header).group(1, 2)
 
198
 
 
199
def messages(cls, file, skip_headers):
 
200
    skip_headers = [name.lower() for name in (skip_headers or ())]
 
201
    while 1:
 
202
        message = cls(file, skip_headers)
 
203
        if message:
 
204
            yield message
 
205
        else:
 
206
            break
 
207
        
 
208
class Request(Message):
 
209
 
 
210
    path = ''
 
211
    
 
212
    def __init__(self, file, skip_headers):
 
213
        Message.__init__(self, file, skip_headers)
 
214
        if self.start:
 
215
            self.command, self.path, self.protocol = self.start.split()
 
216
    
 
217
def Requests(file, skip_headers):
 
218
    return messages(Request, file, skip_headers)
 
219
    
 
220
def Responses(file, skip_headers):
 
221
    return messages(Message, file, skip_headers)
 
222
 
 
223
main = dochttp
 
224
 
 
225
if __name__ == '__main__':
 
226
    main()