1
# -*- test-case-name: epsilon.test.test_benchmark -*-
4
Functions for running a Python file in a child process and recording resource
5
usage information and other statistics about it.
8
import os, time, sys, socket, StringIO, pprint, errno
11
from twisted.python import log, filepath, failure, util
12
from twisted.internet import reactor, protocol, error, defer
13
from twisted.protocols import policies
16
from epsilon import structlike
18
from epsilon import juice
19
from epsilon.test import utils
22
class diskstat(structlike.record(
23
'readCount mergedReadCount readSectorCount readMilliseconds '
24
'writeCount mergedWriteCount writeSectorCount writeMilliseconds '
25
'outstandingIOCount ioMilliseconds weightedIOMilliseconds')):
27
Represent the I/O stats of a single device, as reported by Linux's disk
33
class partitionstat(structlike.record(
        'readCount readSectorCount writeCount writeSectorCount')):
    """
    Like diskstat, but for a partition.  Less information is made available by
    Linux for partitions, so this has fewer attributes.

    The record fields are, in order: C{readCount}, C{readSectorCount},
    C{writeCount} and C{writeSectorCount}.
    """
42
def parseDiskStatLine(L):
44
Parse a single line from C{/proc/diskstats} into a two-tuple of the name of
45
the device to which it corresponds (ie 'hda') and an instance of the
46
appropriate record type (either L{partitionstat} or L{diskstat}).
51
factory = partitionstat
54
return device, factory(*map(int, parts[3:]))
58
def parseDiskStats(fObj):
60
Parse a file-like object containing lines formatted like those in
61
C{/proc/diskstats}. Yield two-tuples of information for each line.
64
yield parseDiskStatLine(L)
70
Parse the current contents of C{/proc/diskstats} into a dict mapping device
71
names to instances of the appropriate stat record.
73
return dict(parseDiskStats(file('/proc/diskstats')))
77
class ResourceSnapshot(structlike.record('time disk partition size')):
79
Represents the state of some resources on the system at a particular moment
82
@ivar time: The time at which the stats associated with this instance were
85
@ivar disk: A C{diskstat} instance created from stats available that the
88
@ivar partition: A C{diskstat} instance created from stats available that
91
@ivar size: Total size of all files beneath a particular directory.
96
class ProcessDied(Exception):
98
Encapsulates process state and failure mode.
100
def __init__(self, exitCode, signal, status, output):
101
self.exitCode = exitCode
105
Exception.__init__(self)
109
class BasicProcess(protocol.ProcessProtocol, policies.TimeoutMixin):
111
The simplest possible process protocol. It doesn't do anything except what
112
is absolutely mandatory of any conceivable ProcessProtocol.
119
def __init__(self, whenFinished, path):
120
self.whenFinished = whenFinished
125
def connectionMade(self):
    """
    Arm the inactivity timeout once the child process is connected.

    The timeout is 900 seconds (15 minutes).  C{timeoutConnection} is the
    hook that reacts to it expiring (via L{policies.TimeoutMixin}).
    """
    self.setTimeout(900.0)
129
def timeoutConnection(self):
131
self.transport.signalProcess('KILL')
134
def childDataReceived(self, childFD, data):
136
self.output.append((childFD, data))
139
def childConnectionLost(self, childFD):
141
self.output.append((childFD, None))
144
def processEnded(self, reason):
145
# XXX Okay, I'm a liar. This doesn't do everything. Strictly speaking
146
# we shouldn't fire completion notification until the process has
147
# terminated *and* the file descriptors have all been closed. We're
148
# not supporting passing file descriptors from the child to a
149
# grandchild here, though. Don't Do It.
150
d, self.whenFinished = self.whenFinished, None
151
o, self.output = self.output, None
152
if reason.check(error.ProcessDone):
153
d.callback((self, reason.value.status, o))
155
d.errback(error.TimeoutError())
156
elif reason.check(error.ProcessTerminated):
157
d.errback(failure.Failure(ProcessDied(
158
reason.value.exitCode,
160
reason.value.status, o)))
162
d.errback(reason.value)
163
self.setTimeout(None)
166
def spawn(cls, executable, args, path, env, spawnProcess=None):
168
Run an executable with some arguments in the given working directory with
169
the given environment variables.
171
Returns a Deferred which fires with a two-tuple of (exit status, output
172
list) if the process terminates without timing out or being killed by a
173
signal. Otherwise, the Deferred errbacks with either L{error.TimeoutError}
174
if any 15 minute period passes with no events or L{ProcessDied} if it is
177
On success, the output list is of two-tuples of (file descriptor, bytes).
180
proto = cls(d, filepath.FilePath(path))
181
if spawnProcess is None:
182
spawnProcess = reactor.spawnProcess
189
childFDs={0: 'w', 1: 'r', 2: 'r',
190
cls.BACKCHANNEL_OUT: 'r',
191
cls.BACKCHANNEL_IN: 'w'})
193
spawn = classmethod(spawn)
197
class Change(object):
199
Stores two ResourceSnapshots taken at two different times.
201
def start(self, path, disk, partition):
202
# Do these three things as explicit, separate statements to make sure
203
# gathering disk stats isn't accidentally included in the duration.
204
startSize = getSize(path)
205
beforeDiskStats = captureStats()
206
startTime = time.time()
207
self.before = ResourceSnapshot(
209
disk=beforeDiskStats.get(disk, None),
210
partition=beforeDiskStats.get(partition, None),
214
def stop(self, path, disk, partition):
215
# Do these three things as explicit, separate statements to make sure
216
# gathering disk stats isn't accidentally included in the duration.
217
endTime = time.time()
218
afterDiskStats = captureStats()
219
endSize = getSize(path)
220
self.after = ResourceSnapshot(
222
disk=afterDiskStats.get(disk, None),
223
partition=afterDiskStats.get(partition, None),
228
class BenchmarkProcess(BasicProcess):
234
def __init__(self, *a, **kw):
    """
    Initialize the base process protocol, work out which partition and disk
    back the current working directory, and start the overall measurement.

    All positional and keyword arguments are forwarded unchanged to
    L{BasicProcess.__init__}.
    """
    BasicProcess.__init__(self, *a, **kw)

    # Figure out where the process is running.  Only the last path segment
    # of the device name is kept (ie 'hda1' for '/dev/hda1').
    self.partition = discoverCurrentWorkingDevice().split('/')[-1]

    # The disk name is the partition name without its trailing partition
    # number.  When the disk name itself ends in a digit, Linux separates the
    # two with a 'p' (nvme0n1p1, mmcblk0p2), so that separator has to be
    # dropped as well or the result would not name any real disk.
    disk = self.partition.rstrip('0123456789')
    if (disk != self.partition and disk.endswith('p')
            and disk[-2:-1].isdigit()):
        disk = disk[:-1]
    self.disk = disk

    # Keep track of stats for the entire process run.
    # NOTE(review): self.path is presumably set by BasicProcess.__init__;
    # confirm against that method.
    self.overallChange = Change()
    self.overallChange.start(self.path, self.disk, self.partition)

    # Just keep track of stats between START and STOP events.  This one is
    # only started later, by startTiming.
    self.benchmarkChange = Change()
249
def connectionMade(self):
    """
    Delegate to L{BasicProcess.connectionMade} and return its result; no
    benchmark-specific work is done at connection time.
    """
    return BasicProcess.connectionMade(self)
253
def startTiming(self):
    """
    Open the benchmark measurement window and acknowledge it to the child.

    The "before" snapshot is taken first; only then is C{START} written to
    the child's C{BACKCHANNEL_IN} descriptor, so the child does not resume
    until the snapshot exists.
    """
    self.benchmarkChange.start(self.path, self.disk, self.partition)
    self.transport.writeToChild(self.BACKCHANNEL_IN, self.START)
258
def stopTiming(self):
    """
    Close the benchmark measurement window and acknowledge it to the child.

    The "after" snapshot is taken first; only then is C{STOP} written to the
    child's C{BACKCHANNEL_IN} descriptor.
    """
    self.benchmarkChange.stop(self.path, self.disk, self.partition)
    self.transport.writeToChild(self.BACKCHANNEL_IN, self.STOP)
263
def childDataReceived(self, childFD, data):
264
if childFD == self.BACKCHANNEL_OUT:
267
if byte == self.START:
269
elif byte == self.STOP:
272
self.transport.signalProcess('QUIT')
274
return BasicProcess.childDataReceived(self, childFD, data)
277
def processEnded(self, reason):
    """
    Take the final snapshot for the whole-process measurement, then let
    L{BasicProcess.processEnded} handle C{reason}.

    @param reason: passed through unchanged to L{BasicProcess.processEnded}.
    @return: whatever L{BasicProcess.processEnded} returns.
    """
    # Snapshot before delegating, so the overall measurement is closed
    # before the base class handles the exit.
    self.overallChange.stop(self.path, self.disk, self.partition)
    return BasicProcess.processEnded(self, reason)
284
class Results(juice.Command):
285
commandName = 'Result'
287
# Stats version - change this whenever the meaning of something changes
288
# or a field is added or removed.
289
('version', juice.Integer()),
291
# If an error occurred while collecting these stats - this probably
292
# means they're bogus.
293
('error', juice.Boolean()),
295
# If a particular timeout (See BasicProcess.connectionMade) elapsed
296
# with no events whatsoever from the benchmark process.
297
('timeout', juice.Boolean()),
299
# A unique name identifying the benchmark for which these are stats.
300
('name', juice.Unicode()),
302
# The name of the host on which these stats were collected.
303
('host', juice.Unicode()),
305
# The sector size of the disk on which these stats were collected
306
# (sectors are a gross lie, this is really the block size, and
307
# everything else that talks about sectors is really talking about
309
('sector_size', juice.Integer()),
311
# Hex version info for the Python which generated these stats.
312
('python_version', juice.Unicode()),
314
# Twisted SVN revision number used to generate these stats.
315
('twisted_version', juice.Unicode()),
317
# Divmod SVN revision number used to generate these stats.
318
('divmod_version', juice.Unicode()),
320
# Number of seconds between process startup and termination.
321
('elapsed', juice.Float()),
323
# Change in size, in bytes, of the directory in which the child process was
# run (size after minus size before).
324
('filesystem_growth', juice.Integer()),
326
# Number of reads issued on the partition over the lifetime of the
327
# child process. This may include reads from other processes, if any
328
# were active on the same disk when the stats were collected.
329
('read_count', juice.Integer(optional=True)),
331
# Number of sectors which were read from the partition over the
332
# lifetime of the child process. Same caveat as above.
333
('read_sectors', juice.Integer(optional=True)),
335
# Number of writes issued to the partition over the lifetime of the
336
# child process. Same caveat as above.
337
('write_count', juice.Integer(optional=True)),
339
# Number of sectors which were written to the partition over the
340
# lifetime of the child process. Same caveat as above.
341
('write_sectors', juice.Integer(optional=True)),
343
# Number of milliseconds spent blocked on reading from the disk over
344
# the lifetime of the child process. Same caveat as above.
345
('read_ms', juice.Integer(optional=True)),
347
# Number of milliseconds spent blocked on writing to the disk over the
348
# lifetime of the child process. Same caveat as above.
349
('write_ms', juice.Integer(optional=True)),
353
# Looked up once, at import time.  A host that only knows itself as
# 'localhost' is rejected outright.
hostname = socket.gethostname()
assert hostname != 'localhost', "Fix your computro."
356
def formatResults(name, sectorSize, before, after, error, timeout):
357
output = StringIO.StringIO()
358
jj = juice.Juice(issueGreeting=False)
359
tt = utils.FileWrapper(output)
360
jj.makeConnection(tt)
362
if after.partition is not None:
363
read_count = after.partition.readCount - before.partition.readCount
364
read_sectors = after.partition.readSectorCount - before.partition.readSectorCount
365
write_count = after.partition.writeCount - before.partition.writeCount
366
write_sectors = after.partition.writeSectorCount - before.partition.writeSectorCount
373
if after.disk is not None:
374
read_ms = after.disk.readMilliseconds - before.disk.readMilliseconds
375
write_ms = after.disk.writeMilliseconds - before.disk.writeMilliseconds
380
twisted_version = twisted.version._getSVNVersion()
381
if twisted_version is None:
382
twisted_version = twisted.version.short()
383
epsilon_version = epsilon.version._getSVNVersion()
384
if epsilon_version is None:
385
epsilon_version = epsilon.version.short()
388
version=STATS_VERSION,
393
elapsed=after.time - before.time,
394
sector_size=sectorSize,
395
read_count=read_count,
396
read_sectors=read_sectors,
398
write_count=write_count,
399
write_sectors=write_sectors,
401
filesystem_growth=after.size - before.size,
402
python_version=unicode(sys.hexversion),
403
twisted_version=twisted_version,
404
divmod_version=epsilon_version,
405
).do(jj, requiresAnswer=False)
406
return output.getvalue()
410
def reportResults(results):
413
fObj = file('output', 'ab')
419
def discoverCurrentWorkingDevice():
421
Return a short string naming the device which backs the current working
422
directory, ie C{/dev/hda1}.
426
for L in file('/etc/mtab'):
428
if cwd.startswith(parts[1]):
429
possibilities.append((len(parts[1]), parts[0]))
431
return possibilities[-1][-1]
437
@type p: L{twisted.python.filepath.FilePath}
438
@return: The size, in bytes, of the given path and all its children.
440
return sum(getOneSize(ch) for ch in p.walk())
445
@type ch: L{twisted.python.filepath.FilePath}
446
@return: The size, in bytes, of the given path only.
451
if e.errno == errno.ENOENT:
452
# XXX FilePath is broken
453
if os.path.islink(ch.path):
454
return len(os.readlink(ch.path))
462
def getSectorSize(p):
    """
    Return the block size of the file system that holds the given path.

    Despite the name, this is C{f_bsize} from C{statvfs}, i.e. the file
    system block size rather than a hardware sector size.

    @param p: an object whose C{path} attribute is a file system path, such
        as a L{twisted.python.filepath.FilePath}.
    @return: the C{f_bsize} value reported by C{os.statvfs} for that path.
    """
    return os.statvfs(p.path).f_bsize
466
def _bench(name, workingPath, function):
469
err = timeout = False
470
if isinstance(result, failure.Failure):
472
if result.check(error.TimeoutError):
473
log.msg("Failing because timeout!")
475
elif result.check(ProcessDied):
476
log.msg("Failing because Failure!")
477
pprint.pprint(result.value.output)
478
print result.value.exitCode, result.value.signal
482
proto, status, output = result
483
stderr = [bytes for (fd, bytes) in output if fd == 2]
484
if status or stderr != [None]:
486
log.msg("Failing because stderr or bad status")
487
pprint.pprint(result)
489
for n, change in [(name + '-overall', proto.overallChange),
490
(name + '-benchmark', proto.benchmarkChange)]:
491
reportResults(formatResults(
493
getSectorSize(workingPath),
499
return d.addBoth(later)
503
def bench(name, path, func):
504
log.startLogging(sys.stdout)
505
log.msg("Running " + name)
507
d = _bench(name, path, func)
508
d.addErrback(log.err)
509
d.addCallback(lambda ign: reactor.stop())
514
def makeBenchmarkRunner(path, args):
516
Make a function that will run two Python processes serially: first one
517
which calls the setup function from the given file, then one which calls
518
the execute function from the given file.
521
return BenchmarkProcess.spawn(
522
executable=sys.executable,
523
args=['-Wignore'] + args,
532
Start recording stats. Call this from a benchmark script when your setup
533
is done. Call this at most once.
535
@raise RuntimeError: Raised if the parent process responds with anything
536
other than an acknowledgement of this message.
538
os.write(BenchmarkProcess.BACKCHANNEL_OUT, BenchmarkProcess.START)
539
response = util.untilConcludes(os.read, BenchmarkProcess.BACKCHANNEL_IN, 1)
540
if response != BenchmarkProcess.START:
542
"Parent process responded with %r instead of START " % (response,))
548
Stop recording stats. Call this from a benchmark script when the code you
549
want benchmarked has finished. Call this exactly the same number of times
550
you call L{start} and only after calling it.
552
@raise RuntimeError: Raised if the parent process responds with anything
553
other than an acknowledgement of this message.
555
os.write(BenchmarkProcess.BACKCHANNEL_OUT, BenchmarkProcess.STOP)
556
response = util.untilConcludes(os.read, BenchmarkProcess.BACKCHANNEL_IN, 1)
557
if response != BenchmarkProcess.STOP:
559
"Parent process responded with %r instead of STOP" % (response,))
565
Run me with the filename of a benchmark script as an argument. I will time
566
it and append the results to a file named output in the current working
570
path = filepath.FilePath('.stat').temporarySibling()
572
func = makeBenchmarkRunner(path, sys.argv[1:])
574
bench(name, path, func)
580
if __name__ == '__main__':