7
from io import StringIO
9
from StringIO import StringIO
11
if sys.version_info < (3,0):
12
class TextIO(StringIO):
13
def write(self, data):
14
if not isinstance(data, unicode):
15
data = unicode(data, getattr(self, '_encoding', 'UTF-8'), 'replace')
16
StringIO.write(self, data)
21
from io import BytesIO
23
class BytesIO(StringIO):
24
def write(self, data):
25
if isinstance(data, unicode):
26
raise TypeError("not a byte value: %r" %(data,))
27
StringIO.write(self, data)
29
patchsysdict = {0: 'stdin', 1: 'stdout', 2: 'stderr'}
32
""" Capture IO to/from a given os-level filedescriptor. """
34
def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False):
35
""" save targetfd descriptor, and open a new
36
temporary file there. If no tmpfile is
37
specified a tempfile.Tempfile() will be opened
40
self.targetfd = targetfd
41
if tmpfile is None and targetfd != 0:
42
f = tempfile.TemporaryFile('wb+')
43
tmpfile = dupfile(f, encoding="UTF-8")
45
self.tmpfile = tmpfile
46
self._savefd = os.dup(self.targetfd)
48
self._oldsys = getattr(sys, patchsysdict[targetfd])
54
os.fstat(self._savefd)
56
raise ValueError("saved filedescriptor not valid, "
57
"did you call start() twice?")
58
if self.targetfd == 0 and not self.tmpfile:
59
fd = os.open(devnullpath, os.O_RDONLY)
62
if hasattr(self, '_oldsys'):
63
setattr(sys, patchsysdict[self.targetfd], DontReadFromInput())
65
os.dup2(self.tmpfile.fileno(), self.targetfd)
66
if hasattr(self, '_oldsys'):
67
setattr(sys, patchsysdict[self.targetfd], self.tmpfile)
70
""" unpatch and clean up, returns the self.tmpfile (file object)
72
os.dup2(self._savefd, self.targetfd)
73
os.close(self._savefd)
74
if self.targetfd != 0:
76
if hasattr(self, '_oldsys'):
77
setattr(sys, patchsysdict[self.targetfd], self._oldsys)
80
def writeorg(self, data):
81
""" write a string to the original file descriptor
83
tempfp = tempfile.TemporaryFile()
85
os.dup2(self._savefd, tempfp.fileno())
91
def dupfile(f, mode=None, buffering=0, raising=False, encoding=None):
92
""" return a new open file object that's a duplicate of f
94
mode is duplicated if not given, 'buffering' controls
95
buffer size (defaulting to no buffering) and 'raising'
96
defines whether an exception is raised when an incompatible
97
file object is passed in (if raising is False, the file
98
object itself will be returned)
102
mode = mode or f.mode
103
except AttributeError:
108
if sys.version_info >= (3,0):
109
if encoding is not None:
110
mode = mode.replace("b", "")
112
return os.fdopen(newfd, mode, buffering, encoding, closefd=True)
114
f = os.fdopen(newfd, mode, buffering)
115
if encoding is not None:
116
return EncodedFile(f, encoding)
119
class EncodedFile(object):
120
def __init__(self, _stream, encoding):
121
self._stream = _stream
122
self.encoding = encoding
124
def write(self, obj):
125
if isinstance(obj, unicode):
126
obj = obj.encode(self.encoding)
127
elif isinstance(obj, str):
131
self._stream.write(obj)
133
def writelines(self, linelist):
134
data = ''.join(linelist)
137
def __getattr__(self, name):
138
return getattr(self._stream, name)
140
class Capture(object):
141
def call(cls, func, *args, **kwargs):
142
""" return a (res, out, err) tuple where
143
out and err represent the output/error output
144
during function execution.
145
call the given function with args/kwargs
146
and capture output/error during its execution.
150
res = func(*args, **kwargs)
152
out, err = so.reset()
154
call = classmethod(call)
157
""" reset sys.stdout/stderr and return captured output as strings. """
158
if hasattr(self, '_reset'):
159
raise ValueError("was already reset")
161
outfile, errfile = self.done(save=False)
163
if outfile and not outfile.closed:
166
if errfile and errfile != outfile and not errfile.closed:
172
""" return current snapshot captures, memorize tempfiles. """
173
outerr = self.readouterr()
174
outfile, errfile = self.done()
178
class StdCaptureFD(Capture):
179
""" This class allows to capture writes to FD1 and FD2
180
and may connect a NULL file to FD0 (and prevent
181
reads from sys.stdin). If any of the 0,1,2 file descriptors
182
is invalid it will not be captured.
184
def __init__(self, out=True, err=True, mixed=False,
185
in_=True, patchsys=True, now=True):
191
"patchsys": patchsys,
199
in_ = self._options['in_']
200
out = self._options['out']
201
err = self._options['err']
202
mixed = self._options['mixed']
203
patchsys = self._options['patchsys']
206
self.in_ = FDCapture(0, tmpfile=None, now=False,
212
if hasattr(out, 'write'):
215
self.out = FDCapture(1, tmpfile=tmpfile,
216
now=False, patchsys=patchsys)
217
self._options['out'] = self.out.tmpfile
222
tmpfile = self.out.tmpfile
223
elif hasattr(err, 'write'):
228
self.err = FDCapture(2, tmpfile=tmpfile,
229
now=False, patchsys=patchsys)
230
self._options['err'] = self.err.tmpfile
235
if hasattr(self, 'in_'):
237
if hasattr(self, 'out'):
239
if hasattr(self, 'err'):
243
""" resume capturing with original temp files. """
246
def done(self, save=True):
247
""" return (outfile, errfile) and stop capturing. """
248
outfile = errfile = None
249
if hasattr(self, 'out') and not self.out.tmpfile.closed:
250
outfile = self.out.done()
251
if hasattr(self, 'err') and not self.err.tmpfile.closed:
252
errfile = self.err.done()
253
if hasattr(self, 'in_'):
254
tmpfile = self.in_.done()
257
return outfile, errfile
259
def readouterr(self):
260
""" return snapshot value of stdout/stderr capturings. """
261
if hasattr(self, "out"):
262
out = self._readsnapshot(self.out.tmpfile)
265
if hasattr(self, "err"):
266
err = self._readsnapshot(self.err.tmpfile)
271
def _readsnapshot(self, f):
274
enc = getattr(f, "encoding", None)
276
res = py.builtin._totext(res, enc, "replace")
282
class StdCapture(Capture):
283
""" This class allows to capture writes to sys.stdout|stderr "in-memory"
284
and will raise errors on tries to read from sys.stdin. It only
285
modifies sys.stdout|stderr|stdin attributes and does not
286
touch underlying File Descriptors (use StdCaptureFD for that).
288
def __init__(self, out=True, err=True, in_=True, mixed=False, now=True):
289
self._oldout = sys.stdout
290
self._olderr = sys.stderr
291
self._oldin = sys.stdin
292
if out and not hasattr(out, 'file'):
298
elif not hasattr(err, 'write'):
307
sys.stdout = self.out
309
sys.stderr = self.err
311
sys.stdin = self.in_ = DontReadFromInput()
313
def done(self, save=True):
314
""" return (outfile, errfile) and stop capturing. """
315
outfile = errfile = None
316
if self.out and not self.out.closed:
317
sys.stdout = self._oldout
320
if self.err and not self.err.closed:
321
sys.stderr = self._olderr
325
sys.stdin = self._oldin
326
return outfile, errfile
329
""" resume capturing with original temp files. """
332
def readouterr(self):
333
""" return snapshot value of stdout/stderr capturings. """
336
out = self.out.getvalue()
340
err = self.err.getvalue()
345
class DontReadFromInput:
346
"""Temporary stub class. Ideally when stdin is accessed, the
347
capturing should be turned off, with possibly all data captured
348
so far sent to the screen. This should be configurable, though,
349
because in automated test runs it is better to crash than
352
def read(self, *args):
353
raise IOError("reading from stdin while output is captured")
359
raise ValueError("redirected Stdin is pseudofile, has no fileno()")
366
devnullpath = os.devnull
367
except AttributeError:
371
devnullpath = '/dev/null'