1
##############################################################################
3
# Copyright (c) 2004 Zope Foundation and Contributors.
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
13
##############################################################################
14
"""Convert an http tcpwatch session to a doctest
16
$Id: dochttp.py 110724 2010-04-11 00:04:11Z tseaver $
26
usage = """usage: %prog <options> directory
28
Convert an http tcpwatch recorded sesssion to a doctest file, which is
29
written to standard output.
33
parser = optparse.OptionParser(usage)
34
parser.add_option("-p", "--prefix", default="watch",
35
help="Prefix for recorded tcpwatch session files")
36
parser.add_option("-U", "--skip-url", action="append",
37
help="Regular expression for URLs to skip")
38
parser.add_option("-E", "--skip-extension", action="append",
39
help="URL file-extension to skip")
40
parser.add_option("-e", "--extension", action="append",
41
help="URL file-extension to include")
42
parser.add_option("-I", "--skip-request-header", action="append",
43
help="Request header to skip")
44
parser.add_option("-O", "--skip-response-header", action="append",
45
help="Response header to skip")
46
parser.add_option("-r", "--clean-redirects", action="store_true",
47
help="Strip content from redirect responses",
53
'-I', 'Accept-Charset', '-I', 'Accept-Encoding', '-I', 'Accept-Language',
54
'-I', 'Accept', '-I', 'Connection', '-I', 'Host', '-I', 'Keep-Alive',
57
'-O', 'Date', '-O', 'Server', '-O', 'X-Content-Type-Warning',
62
def dochttp(args=sys.argv[1:], default=None):
63
"""Convert a tcpwatch recorded sesssion to a doctest file"""
65
default = default_options
67
options, args = parser.parse_args(default+args)
74
skip_extensions = options.skip_extension or ()
75
extensions = [ext for ext in (options.extension or ())
76
if ext not in skip_extensions]
77
skip_urls = [re.compile(pattern) for pattern in (options.skip_url or ())]
79
names = [name[:-len(".request")]
80
for name in os.listdir(directory)
81
if name.startswith(options.prefix) and name.endswith('.request')
85
extre = re.compile("[.](\w+)$")
89
open(os.path.join(directory, name + ".request"), 'rb'),
90
options.skip_request_header,
92
responses = Responses(
93
open(os.path.join(directory, name + ".response"), 'rb'),
94
options.skip_response_header,
97
# We use map so as *not* to truncate at shortest input.
98
# We want an error if the number of requests and responses
100
for request, response in map(None, requests, responses):
101
assert (request and response) or not (request or response)
104
ext = extre.search(path)
108
if ext not in extensions:
111
if ext in skip_extensions:
114
for skip_url in skip_urls:
115
if skip_url.search(request.path):
119
output_test(request, response, options.clean_redirects)
121
if e.errno == errno.EPIPE:
126
def output_test(request, response, clean_redirects=False):
129
print ' >>> print http(r"""'
130
print ' ...', '\n ... '.join(request.lines())+'""")'
131
if response.code in (301, 302, 303) and clean_redirects:
132
content_length = None
134
for i in range(len(response.headers)):
135
h, v = response.headers[i]
136
if h == "Content-Length":
137
content_length = int(v)
138
response.headers[i] = (h, "...")
139
lines = response.header_lines()
140
if lines and content_length == 0:
143
lines = response.lines()
144
print ' ', '\n '.join([line.rstrip() and line or '<BLANKLINE>'
151
def __init__(self, file, skip_headers):
152
start = file.readline().rstrip()
155
if start.startswith("HTTP/"):
156
# This is a response; extract the response code:
157
self.code = int(start.split()[1])
158
headers = [split_header(header)
159
for header in rfc822.Message(file).headers
162
('-'.join([s.capitalize() for s in name.split('-')]),
165
for (name, v) in headers
166
if name.lower() not in skip_headers
168
self.headers = headers
169
content_length = int(dict(headers).get('Content-Length', '0'))
171
self.body = file.read(content_length).split('\n')
175
def __nonzero__(self):
176
return bool(self.start)
179
output = self.header_lines()
181
output.extend(self.body)
184
def header_lines(self):
186
output = [self.start]
187
headers = ["%s: %s" % (name, v) for (name, v) in self.headers]
189
output.extend(headers)
195
headerre = re.compile('(\S+): (.+)$')
196
def split_header(header):
197
return headerre.match(header).group(1, 2)
199
def messages(cls, file, skip_headers):
200
skip_headers = [name.lower() for name in (skip_headers or ())]
202
message = cls(file, skip_headers)
208
class Request(Message):
212
def __init__(self, file, skip_headers):
213
Message.__init__(self, file, skip_headers)
215
self.command, self.path, self.protocol = self.start.split()
217
def Requests(file, skip_headers):
218
return messages(Request, file, skip_headers)
220
def Responses(file, skip_headers):
221
return messages(Message, file, skip_headers)
225
if __name__ == '__main__':