~divmod-dev/divmod.org/trunk

« back to all changes in this revision

Viewing changes to Epsilon/epsilon/scripts/benchmark.py

  • Committer: Jean-Paul Calderone
  • Date: 2014-06-29 20:33:04 UTC
  • mfrom: (2749.1.1 remove-epsilon-1325289)
  • Revision ID: exarkun@twistedmatrix.com-20140629203304-gdkmbwl1suei4m97
mergeĀ lp:~exarkun/divmod.org/remove-epsilon-1325289

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# -*- test-case-name: epsilon.test.test_benchmark -*-
2
 
 
3
 
"""
4
 
Functions for running a Python file in a child process and recording resource
5
 
usage information and other statistics about it.
6
 
"""
7
 
 
8
 
import os, time, sys, socket, StringIO, pprint, errno
9
 
 
10
 
import twisted
11
 
from twisted.python import log, filepath, failure, util
12
 
from twisted.internet import reactor, protocol, error, defer
13
 
from twisted.protocols import policies
14
 
 
15
 
import epsilon
16
 
from epsilon import structlike
17
 
 
18
 
from epsilon import juice
19
 
from epsilon.test import utils
20
 
 
21
 
 
22
 
class diskstat(structlike.record(
23
 
    'readCount mergedReadCount readSectorCount readMilliseconds '
24
 
    'writeCount mergedWriteCount writeSectorCount writeMilliseconds '
25
 
    'outstandingIOCount ioMilliseconds weightedIOMilliseconds')):
26
 
    """
27
 
    Represent the I/O stats of a single device, as reported by Linux's disk
28
 
    stats.
29
 
    """
30
 
 
31
 
 
32
 
 
33
 
class partitionstat(structlike.record(
34
 
    'readCount readSectorCount writeCount writeSectorCount')):
35
 
    """
36
 
    Like diskstat, but for a partition.  Less information is made available by
37
 
    Linux for partitions, so this has fewer attributes.
38
 
    """
39
 
 
40
 
 
41
 
 
42
 
def parseDiskStatLine(L):
43
 
    """
44
 
    Parse a single line from C{/proc/diskstats} into a two-tuple of the name of
45
 
    the device to which it corresponds (ie 'hda') and an instance of the
46
 
    appropriate record type (either L{partitionstat} or L{diskstat}).
47
 
    """
48
 
    parts = L.split()
49
 
    device = parts[2]
50
 
    if len(parts) == 7:
51
 
        factory = partitionstat
52
 
    else:
53
 
        factory = diskstat
54
 
    return device, factory(*map(int, parts[3:]))
55
 
 
56
 
 
57
 
 
58
 
def parseDiskStats(fObj):
59
 
    """
60
 
    Parse a file-like object containing lines formatted like those in
61
 
    C{/proc/diskstats}.  Yield two-tuples of information for each line.
62
 
    """
63
 
    for L in fObj:
64
 
        yield parseDiskStatLine(L)
65
 
 
66
 
 
67
 
 
68
 
def captureStats():
69
 
    """
70
 
    Parse the current contents of C{/proc/diskstats} into a dict mapping device
71
 
    names to instances of the appropriate stat record.
72
 
    """
73
 
    return dict(parseDiskStats(file('/proc/diskstats')))
74
 
 
75
 
 
76
 
 
77
 
class ResourceSnapshot(structlike.record('time disk partition size')):
78
 
    """
79
 
    Represents the state of some resources on the system at a particular moment
80
 
    in time.
81
 
 
82
 
    @ivar time: The time at which the stats associated with this instance were
83
 
    recorded.
84
 
 
85
 
    @ivar disk: A C{diskstat} instance created from stats available that the
86
 
    given time.
87
 
 
88
 
    @ivar partition: A C{diskstat} instance created from stats available that
89
 
    the given time.
90
 
 
91
 
    @ivar size: Total size of all files beneath a particular directory.
92
 
    """
93
 
 
94
 
 
95
 
 
96
 
class ProcessDied(Exception):
97
 
    """
98
 
    Encapsulates process state and failure mode.
99
 
    """
100
 
    def __init__(self, exitCode, signal, status, output):
101
 
        self.exitCode = exitCode
102
 
        self.signal = signal
103
 
        self.status = status
104
 
        self.output = output
105
 
        Exception.__init__(self)
106
 
 
107
 
 
108
 
 
109
 
class BasicProcess(protocol.ProcessProtocol, policies.TimeoutMixin):
110
 
    """
111
 
    The simplest possible process protocol.  It doesn't do anything except what
112
 
    is absolutely mandatory of any conceivable ProcessProtocol.
113
 
    """
114
 
    timedOut = False
115
 
 
116
 
    BACKCHANNEL_OUT = 3
117
 
    BACKCHANNEL_IN = 4
118
 
 
119
 
    def __init__(self, whenFinished, path):
120
 
        self.whenFinished = whenFinished
121
 
        self.path = path
122
 
        self.output = []
123
 
 
124
 
 
125
 
    def connectionMade(self):
126
 
        self.setTimeout(900.0)
127
 
 
128
 
 
129
 
    def timeoutConnection(self):
130
 
        self.timedOut = True
131
 
        self.transport.signalProcess('KILL')
132
 
 
133
 
 
134
 
    def childDataReceived(self, childFD, data):
135
 
        self.resetTimeout()
136
 
        self.output.append((childFD, data))
137
 
 
138
 
 
139
 
    def childConnectionLost(self, childFD):
140
 
        self.resetTimeout()
141
 
        self.output.append((childFD, None))
142
 
 
143
 
 
144
 
    def processEnded(self, reason):
145
 
        # XXX Okay, I'm a liar.  This doesn't do everything.  Strictly speaking
146
 
        # we shouldn't fire completion notification until the process has
147
 
        # terminated *and* the file descriptors have all been closed.  We're
148
 
        # not supporting passing file descriptors from the child to a
149
 
        # grandchild here, though.  Don't Do It.
150
 
        d, self.whenFinished = self.whenFinished, None
151
 
        o, self.output = self.output, None
152
 
        if reason.check(error.ProcessDone):
153
 
            d.callback((self, reason.value.status, o))
154
 
        elif self.timedOut:
155
 
            d.errback(error.TimeoutError())
156
 
        elif reason.check(error.ProcessTerminated):
157
 
            d.errback(failure.Failure(ProcessDied(
158
 
                reason.value.exitCode,
159
 
                reason.value.signal,
160
 
                reason.value.status, o)))
161
 
        else:
162
 
            d.errback(reason.value)
163
 
        self.setTimeout(None)
164
 
 
165
 
 
166
 
    def spawn(cls, executable, args, path, env, spawnProcess=None):
167
 
        """
168
 
        Run an executable with some arguments in the given working directory with
169
 
        the given environment variables.
170
 
 
171
 
        Returns a Deferred which fires with a two-tuple of (exit status, output
172
 
        list) if the process terminates without timing out or being killed by a
173
 
        signal.  Otherwise, the Deferred errbacks with either L{error.TimeoutError}
174
 
        if any 10 minute period passes with no events or L{ProcessDied} if it is
175
 
        killed by a signal.
176
 
 
177
 
        On success, the output list is of two-tuples of (file descriptor, bytes).
178
 
        """
179
 
        d = defer.Deferred()
180
 
        proto = cls(d, filepath.FilePath(path))
181
 
        if spawnProcess is None:
182
 
            spawnProcess = reactor.spawnProcess
183
 
        spawnProcess(
184
 
            proto,
185
 
            executable,
186
 
            [executable] + args,
187
 
            path=path,
188
 
            env=env,
189
 
            childFDs={0: 'w', 1: 'r', 2: 'r',
190
 
                      cls.BACKCHANNEL_OUT: 'r',
191
 
                      cls.BACKCHANNEL_IN: 'w'})
192
 
        return d
193
 
    spawn = classmethod(spawn)
194
 
 
195
 
 
196
 
 
197
 
class Change(object):
198
 
    """
199
 
    Stores two ResourceSnapshots taken at two different times.
200
 
    """
201
 
    def start(self, path, disk, partition):
202
 
        # Do these three things as explicit, separate statments to make sure
203
 
        # gathering disk stats isn't accidentally included in the duration.
204
 
        startSize = getSize(path)
205
 
        beforeDiskStats = captureStats()
206
 
        startTime = time.time()
207
 
        self.before = ResourceSnapshot(
208
 
            time=startTime,
209
 
            disk=beforeDiskStats.get(disk, None),
210
 
            partition=beforeDiskStats.get(partition, None),
211
 
            size=startSize)
212
 
 
213
 
 
214
 
    def stop(self, path, disk, partition):
215
 
        # Do these three things as explicit, separate statments to make sure
216
 
        # gathering disk stats isn't accidentally included in the duration.
217
 
        endTime = time.time()
218
 
        afterDiskStats = captureStats()
219
 
        endSize = getSize(path)
220
 
        self.after = ResourceSnapshot(
221
 
            time=endTime,
222
 
            disk=afterDiskStats.get(disk, None),
223
 
            partition=afterDiskStats.get(partition, None),
224
 
            size=endSize)
225
 
 
226
 
 
227
 
 
228
 
class BenchmarkProcess(BasicProcess):
229
 
 
230
 
    START = '\0'
231
 
    STOP = '\1'
232
 
 
233
 
 
234
 
    def __init__(self, *a, **kw):
235
 
        BasicProcess.__init__(self, *a, **kw)
236
 
 
237
 
        # Figure out where the process is running.
238
 
        self.partition = discoverCurrentWorkingDevice().split('/')[-1]
239
 
        self.disk = self.partition.rstrip('0123456789')
240
 
 
241
 
        # Keep track of stats for the entire process run.
242
 
        self.overallChange = Change()
243
 
        self.overallChange.start(self.path, self.disk, self.partition)
244
 
 
245
 
        # Just keep track of stats between START and STOP events.
246
 
        self.benchmarkChange = Change()
247
 
 
248
 
 
249
 
    def connectionMade(self):
250
 
        return BasicProcess.connectionMade(self)
251
 
 
252
 
 
253
 
    def startTiming(self):
254
 
        self.benchmarkChange.start(self.path, self.disk, self.partition)
255
 
        self.transport.writeToChild(self.BACKCHANNEL_IN, self.START)
256
 
 
257
 
 
258
 
    def stopTiming(self):
259
 
        self.benchmarkChange.stop(self.path, self.disk, self.partition)
260
 
        self.transport.writeToChild(self.BACKCHANNEL_IN, self.STOP)
261
 
 
262
 
 
263
 
    def childDataReceived(self, childFD, data):
264
 
        if childFD == self.BACKCHANNEL_OUT:
265
 
            self.resetTimeout()
266
 
            for byte in data:
267
 
                if byte == self.START:
268
 
                    self.startTiming()
269
 
                elif byte == self.STOP:
270
 
                    self.stopTiming()
271
 
                else:
272
 
                    self.transport.signalProcess('QUIT')
273
 
        else:
274
 
            return BasicProcess.childDataReceived(self, childFD, data)
275
 
 
276
 
 
277
 
    def processEnded(self, reason):
278
 
        self.overallChange.stop(self.path, self.disk, self.partition)
279
 
        return BasicProcess.processEnded(self, reason)
280
 
 
281
 
 
282
 
 
283
 
STATS_VERSION = 0
284
 
class Results(juice.Command):
285
 
    commandName = 'Result'
286
 
    arguments = [
287
 
        # Stats version - change this whenever the meaning of something changes
288
 
        # or a field is added or removed.
289
 
        ('version', juice.Integer()),
290
 
 
291
 
        # If an error occurred while collecting these stats - this probably
292
 
        # means they're bogus.
293
 
        ('error', juice.Boolean()),
294
 
 
295
 
        # If a particular timeout (See BasicProcess.connectionMade) elapsed
296
 
        # with no events whatsoever from the benchmark process.
297
 
        ('timeout', juice.Boolean()),
298
 
 
299
 
        # A unique name identifying the benchmark for which these are stats.
300
 
        ('name', juice.Unicode()),
301
 
 
302
 
        # The name of the benchmark associated with these stats.
303
 
        ('host', juice.Unicode()),
304
 
 
305
 
        # The sector size of the disk on which these stats were collected
306
 
        # (sectors are a gross lie, this is really the block size, and
307
 
        # everything else that talks about sectors is really talking about
308
 
        # blocks).
309
 
        ('sector_size', juice.Integer()),
310
 
 
311
 
        # Hex version info for the Python which generated these stats.
312
 
        ('python_version', juice.Unicode()),
313
 
 
314
 
        # Twisted SVN revision number used to generate these stats.
315
 
        ('twisted_version', juice.Unicode()),
316
 
 
317
 
        # Divmod SVN revision number used to generate these stats.
318
 
        ('divmod_version', juice.Unicode()),
319
 
 
320
 
        # Number of seconds between process startup and termination.
321
 
        ('elapsed', juice.Float()),
322
 
 
323
 
        # Size, in bytes, of the directory in which the child process was run.
324
 
        ('filesystem_growth', juice.Integer()),
325
 
 
326
 
        # Number of reads issued on the partition over the lifetime of the
327
 
        # child process.  This may include reads from other processes, if any
328
 
        # were active on the same disk when the stats were collected.
329
 
        ('read_count', juice.Integer(optional=True)),
330
 
 
331
 
        # Number of sectors which were read from the partition over the
332
 
        # lifetime of the child process. Same caveat as above.
333
 
        ('read_sectors', juice.Integer(optional=True)),
334
 
 
335
 
        # Number of writes issued to the partition over the lifetime of the
336
 
        # child process.  Same caveat as above.
337
 
        ('write_count', juice.Integer(optional=True)),
338
 
 
339
 
        # Number of sectors which were written to the partition over the
340
 
        # lifetime of the child process.  Same caveat as above.
341
 
        ('write_sectors', juice.Integer(optional=True)),
342
 
 
343
 
        # Number of milliseconds spent blocked on reading from the disk over
344
 
        # the lifetime of the child process.  Same caveat as above.
345
 
        ('read_ms', juice.Integer(optional=True)),
346
 
 
347
 
        # Number of milliseconds spent blocked on writing to the disk over the
348
 
        # lifetime of the child process.  Same caveat as above.
349
 
        ('write_ms', juice.Integer(optional=True)),
350
 
        ]
351
 
 
352
 
 
353
 
hostname = socket.gethostname()
354
 
assert hostname != 'localhost', "Fix your computro."
355
 
 
356
 
def formatResults(name, sectorSize, before, after, error, timeout):
357
 
    output = StringIO.StringIO()
358
 
    jj = juice.Juice(issueGreeting=False)
359
 
    tt = utils.FileWrapper(output)
360
 
    jj.makeConnection(tt)
361
 
 
362
 
    if after.partition is not None:
363
 
        read_count = after.partition.readCount - before.partition.readCount
364
 
        read_sectors = after.partition.readSectorCount - before.partition.readSectorCount
365
 
        write_count = after.partition.writeCount - before.partition.writeCount
366
 
        write_sectors = after.partition.writeSectorCount - before.partition.writeSectorCount
367
 
    else:
368
 
        read_count = None
369
 
        read_sectors = None
370
 
        write_count = None
371
 
        write_sectors = None
372
 
 
373
 
    if after.disk is not None:
374
 
        read_ms = after.disk.readMilliseconds - before.disk.readMilliseconds
375
 
        write_ms = after.disk.writeMilliseconds - before.disk.writeMilliseconds
376
 
    else:
377
 
        read_ms = None
378
 
        write_ms = None
379
 
 
380
 
    twisted_version = twisted.version._getSVNVersion()
381
 
    if twisted_version is None:
382
 
        twisted_version = twisted.version.short()
383
 
    epsilon_version = epsilon.version._getSVNVersion()
384
 
    if epsilon_version is None:
385
 
        epsilon_version = epsilon.version.short()
386
 
 
387
 
    Results(
388
 
        version=STATS_VERSION,
389
 
        error=error,
390
 
        timeout=timeout,
391
 
        name=name,
392
 
        host=hostname,
393
 
        elapsed=after.time - before.time,
394
 
        sector_size=sectorSize,
395
 
        read_count=read_count,
396
 
        read_sectors=read_sectors,
397
 
        read_ms=read_ms,
398
 
        write_count=write_count,
399
 
        write_sectors=write_sectors,
400
 
        write_ms=write_ms,
401
 
        filesystem_growth=after.size - before.size,
402
 
        python_version=unicode(sys.hexversion),
403
 
        twisted_version=twisted_version,
404
 
        divmod_version=epsilon_version,
405
 
        ).do(jj, requiresAnswer=False)
406
 
    return output.getvalue()
407
 
 
408
 
 
409
 
 
410
 
def reportResults(results):
411
 
    print results
412
 
    print
413
 
    fObj = file('output', 'ab')
414
 
    fObj.write(results)
415
 
    fObj.close()
416
 
 
417
 
 
418
 
 
419
 
def discoverCurrentWorkingDevice():
420
 
    """
421
 
    Return a short string naming the device which backs the current working
422
 
    directory, ie C{/dev/hda1}.
423
 
    """
424
 
    possibilities = []
425
 
    cwd = os.getcwd()
426
 
    for L in file('/etc/mtab'):
427
 
        parts = L.split()
428
 
        if cwd.startswith(parts[1]):
429
 
            possibilities.append((len(parts[1]), parts[0]))
430
 
    possibilities.sort()
431
 
    return possibilities[-1][-1]
432
 
 
433
 
 
434
 
 
435
 
def getSize(p):
436
 
    """
437
 
    @type p: L{twisted.python.filepath.FilePath}
438
 
    @return: The size, in bytes, of the given path and all its children.
439
 
    """
440
 
    return sum(getOneSize(ch) for ch in p.walk())
441
 
 
442
 
 
443
 
def getOneSize(ch):
444
 
    """
445
 
    @type ch: L{twisted.python.filepath.FilePath}
446
 
    @return: The size, in bytes, of the given path only.
447
 
    """
448
 
    try:
449
 
        return ch.getsize()
450
 
    except OSError, e:
451
 
        if e.errno == errno.ENOENT:
452
 
            # XXX FilePath is broken
453
 
            if os.path.islink(ch.path):
454
 
                return len(os.readlink(ch.path))
455
 
            else:
456
 
                raise
457
 
        else:
458
 
            raise
459
 
 
460
 
 
461
 
 
462
 
def getSectorSize(p):
463
 
    return os.statvfs(p.path).f_bsize
464
 
 
465
 
 
466
 
def _bench(name, workingPath, function):
467
 
    d = function()
468
 
    def later(result):
469
 
        err = timeout = False
470
 
        if isinstance(result, failure.Failure):
471
 
            err = True
472
 
            if result.check(error.TimeoutError):
473
 
                log.msg("Failing because timeout!")
474
 
                timeout = True
475
 
            elif result.check(ProcessDied):
476
 
                log.msg("Failing because Failure!")
477
 
                pprint.pprint(result.value.output)
478
 
                print result.value.exitCode, result.value.signal
479
 
            else:
480
 
                log.err(result)
481
 
        else:
482
 
            proto, status, output = result
483
 
            stderr = [bytes for (fd, bytes) in output if fd == 2]
484
 
            if status or stderr != [None]:
485
 
                err = True
486
 
                log.msg("Failing because stderr or bad status")
487
 
                pprint.pprint(result)
488
 
 
489
 
            for n, change in [(name + '-overall', proto.overallChange),
490
 
                              (name + '-benchmark', proto.benchmarkChange)]:
491
 
                reportResults(formatResults(
492
 
                    n,
493
 
                    getSectorSize(workingPath),
494
 
                    change.before,
495
 
                    change.after,
496
 
                    err,
497
 
                    timeout))
498
 
 
499
 
    return d.addBoth(later)
500
 
 
501
 
 
502
 
 
503
 
def bench(name, path, func):
504
 
    log.startLogging(sys.stdout)
505
 
    log.msg("Running " + name)
506
 
 
507
 
    d = _bench(name, path, func)
508
 
    d.addErrback(log.err)
509
 
    d.addCallback(lambda ign: reactor.stop())
510
 
 
511
 
    reactor.run()
512
 
 
513
 
 
514
 
def makeBenchmarkRunner(path, args):
515
 
    """
516
 
    Make a function that will run two Python processes serially: first one
517
 
    which calls the setup function from the given file, then one which calls
518
 
    the execute function from the given file.
519
 
    """
520
 
    def runner():
521
 
        return BenchmarkProcess.spawn(
522
 
            executable=sys.executable,
523
 
            args=['-Wignore'] + args,
524
 
            path=path.path,
525
 
            env=os.environ)
526
 
    return runner
527
 
 
528
 
 
529
 
 
530
 
def start():
531
 
    """
532
 
    Start recording stats.  Call this from a benchmark script when your setup
533
 
    is done.  Call this at most once.
534
 
 
535
 
    @raise RuntimeError: Raised if the parent process responds with anything
536
 
    other than an acknowledgement of this message.
537
 
    """
538
 
    os.write(BenchmarkProcess.BACKCHANNEL_OUT, BenchmarkProcess.START)
539
 
    response = util.untilConcludes(os.read, BenchmarkProcess.BACKCHANNEL_IN, 1)
540
 
    if response != BenchmarkProcess.START:
541
 
        raise RuntimeError(
542
 
            "Parent process responded with %r instead of START " % (response,))
543
 
 
544
 
 
545
 
 
546
 
def stop():
547
 
    """
548
 
    Stop recording stats.  Call this from a benchmark script when the code you
549
 
    want benchmarked has finished.  Call this exactly the same number of times
550
 
    you call L{start} and only after calling it.
551
 
 
552
 
    @raise RuntimeError: Raised if the parent process responds with anything
553
 
    other than an acknowledgement of this message.
554
 
    """
555
 
    os.write(BenchmarkProcess.BACKCHANNEL_OUT, BenchmarkProcess.STOP)
556
 
    response = util.untilConcludes(os.read, BenchmarkProcess.BACKCHANNEL_IN, 1)
557
 
    if response != BenchmarkProcess.STOP:
558
 
        raise RuntimeError(
559
 
            "Parent process responded with %r instead of STOP" % (response,))
560
 
 
561
 
 
562
 
 
563
 
def main():
564
 
    """
565
 
    Run me with the filename of a benchmark script as an argument.  I will time
566
 
    it and append the results to a file named output in the current working
567
 
    directory.
568
 
    """
569
 
    name = sys.argv[1]
570
 
    path = filepath.FilePath('.stat').temporarySibling()
571
 
    path.makedirs()
572
 
    func = makeBenchmarkRunner(path, sys.argv[1:])
573
 
    try:
574
 
        bench(name, path, func)
575
 
    finally:
576
 
        path.remove()
577
 
 
578
 
 
579
 
 
580
 
if __name__ == '__main__':
581
 
    main()