~ibmcharmers/charms/xenial/ibm-cinder-storwize-svc/trunk

« back to all changes in this revision

Viewing changes to .tox/py35/lib/python3.5/site-packages/py/_io/capture.py

  • Committer: Ankammarao
  • Date: 2017-03-06 05:11:42 UTC
  • Revision ID: achittet@in.ibm.com-20170306051142-dpg27z4es1k56hfn
Marked tests folder executable

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import os
 
2
import sys
 
3
import py
 
4
import tempfile
 
5
 
 
6
try:
 
7
    from io import StringIO
 
8
except ImportError:
 
9
    from StringIO import StringIO
 
10
 
 
11
if sys.version_info < (3,0):
 
12
    class TextIO(StringIO):
 
13
        def write(self, data):
 
14
            if not isinstance(data, unicode):
 
15
                data = unicode(data, getattr(self, '_encoding', 'UTF-8'), 'replace')
 
16
            StringIO.write(self, data)
 
17
else:
 
18
    TextIO = StringIO
 
19
 
 
20
try:
 
21
    from io import BytesIO
 
22
except ImportError:
 
23
    class BytesIO(StringIO):
 
24
        def write(self, data):
 
25
            if isinstance(data, unicode):
 
26
                raise TypeError("not a byte value: %r" %(data,))
 
27
            StringIO.write(self, data)
 
28
 
 
29
patchsysdict = {0: 'stdin', 1: 'stdout', 2: 'stderr'}
 
30
 
 
31
class FDCapture:
 
32
    """ Capture IO to/from a given os-level filedescriptor. """
 
33
 
 
34
    def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False):
 
35
        """ save targetfd descriptor, and open a new
 
36
            temporary file there.  If no tmpfile is
 
37
            specified a tempfile.Tempfile() will be opened
 
38
            in text mode.
 
39
        """
 
40
        self.targetfd = targetfd
 
41
        if tmpfile is None and targetfd != 0:
 
42
            f = tempfile.TemporaryFile('wb+')
 
43
            tmpfile = dupfile(f, encoding="UTF-8")
 
44
            f.close()
 
45
        self.tmpfile = tmpfile
 
46
        self._savefd = os.dup(self.targetfd)
 
47
        if patchsys:
 
48
            self._oldsys = getattr(sys, patchsysdict[targetfd])
 
49
        if now:
 
50
            self.start()
 
51
 
 
52
    def start(self):
 
53
        try:
 
54
            os.fstat(self._savefd)
 
55
        except OSError:
 
56
            raise ValueError("saved filedescriptor not valid, "
 
57
                "did you call start() twice?")
 
58
        if self.targetfd == 0 and not self.tmpfile:
 
59
            fd = os.open(devnullpath, os.O_RDONLY)
 
60
            os.dup2(fd, 0)
 
61
            os.close(fd)
 
62
            if hasattr(self, '_oldsys'):
 
63
                setattr(sys, patchsysdict[self.targetfd], DontReadFromInput())
 
64
        else:
 
65
            os.dup2(self.tmpfile.fileno(), self.targetfd)
 
66
            if hasattr(self, '_oldsys'):
 
67
                setattr(sys, patchsysdict[self.targetfd], self.tmpfile)
 
68
 
 
69
    def done(self):
 
70
        """ unpatch and clean up, returns the self.tmpfile (file object)
 
71
        """
 
72
        os.dup2(self._savefd, self.targetfd)
 
73
        os.close(self._savefd)
 
74
        if self.targetfd != 0:
 
75
            self.tmpfile.seek(0)
 
76
        if hasattr(self, '_oldsys'):
 
77
            setattr(sys, patchsysdict[self.targetfd], self._oldsys)
 
78
        return self.tmpfile
 
79
 
 
80
    def writeorg(self, data):
 
81
        """ write a string to the original file descriptor
 
82
        """
 
83
        tempfp = tempfile.TemporaryFile()
 
84
        try:
 
85
            os.dup2(self._savefd, tempfp.fileno())
 
86
            tempfp.write(data)
 
87
        finally:
 
88
            tempfp.close()
 
89
 
 
90
 
 
91
def dupfile(f, mode=None, buffering=0, raising=False, encoding=None):
 
92
    """ return a new open file object that's a duplicate of f
 
93
 
 
94
        mode is duplicated if not given, 'buffering' controls
 
95
        buffer size (defaulting to no buffering) and 'raising'
 
96
        defines whether an exception is raised when an incompatible
 
97
        file object is passed in (if raising is False, the file
 
98
        object itself will be returned)
 
99
    """
 
100
    try:
 
101
        fd = f.fileno()
 
102
        mode = mode or f.mode
 
103
    except AttributeError:
 
104
        if raising:
 
105
            raise
 
106
        return f
 
107
    newfd = os.dup(fd)
 
108
    if sys.version_info >= (3,0):
 
109
        if encoding is not None:
 
110
            mode = mode.replace("b", "")
 
111
            buffering = True
 
112
        return os.fdopen(newfd, mode, buffering, encoding, closefd=True)
 
113
    else:
 
114
        f = os.fdopen(newfd, mode, buffering)
 
115
        if encoding is not None:
 
116
            return EncodedFile(f, encoding)
 
117
        return f
 
118
 
 
119
class EncodedFile(object):
 
120
    def __init__(self, _stream, encoding):
 
121
        self._stream = _stream
 
122
        self.encoding = encoding
 
123
 
 
124
    def write(self, obj):
 
125
        if isinstance(obj, unicode):
 
126
            obj = obj.encode(self.encoding)
 
127
        elif isinstance(obj, str):
 
128
            pass
 
129
        else:
 
130
            obj = str(obj)
 
131
        self._stream.write(obj)
 
132
 
 
133
    def writelines(self, linelist):
 
134
        data = ''.join(linelist)
 
135
        self.write(data)
 
136
 
 
137
    def __getattr__(self, name):
 
138
        return getattr(self._stream, name)
 
139
 
 
140
class Capture(object):
 
141
    def call(cls, func, *args, **kwargs):
 
142
        """ return a (res, out, err) tuple where
 
143
            out and err represent the output/error output
 
144
            during function execution.
 
145
            call the given function with args/kwargs
 
146
            and capture output/error during its execution.
 
147
        """
 
148
        so = cls()
 
149
        try:
 
150
            res = func(*args, **kwargs)
 
151
        finally:
 
152
            out, err = so.reset()
 
153
        return res, out, err
 
154
    call = classmethod(call)
 
155
 
 
156
    def reset(self):
 
157
        """ reset sys.stdout/stderr and return captured output as strings. """
 
158
        if hasattr(self, '_reset'):
 
159
            raise ValueError("was already reset")
 
160
        self._reset = True
 
161
        outfile, errfile = self.done(save=False)
 
162
        out, err = "", ""
 
163
        if outfile and not outfile.closed:
 
164
            out = outfile.read()
 
165
            outfile.close()
 
166
        if errfile and errfile != outfile and not errfile.closed:
 
167
            err = errfile.read()
 
168
            errfile.close()
 
169
        return out, err
 
170
 
 
171
    def suspend(self):
 
172
        """ return current snapshot captures, memorize tempfiles. """
 
173
        outerr = self.readouterr()
 
174
        outfile, errfile = self.done()
 
175
        return outerr
 
176
 
 
177
 
 
178
class StdCaptureFD(Capture):
 
179
    """ This class allows to capture writes to FD1 and FD2
 
180
        and may connect a NULL file to FD0 (and prevent
 
181
        reads from sys.stdin).  If any of the 0,1,2 file descriptors
 
182
        is invalid it will not be captured.
 
183
    """
 
184
    def __init__(self, out=True, err=True, mixed=False,
 
185
        in_=True, patchsys=True, now=True):
 
186
        self._options = {
 
187
            "out": out,
 
188
            "err": err,
 
189
            "mixed": mixed,
 
190
            "in_": in_,
 
191
            "patchsys": patchsys,
 
192
            "now": now,
 
193
        }
 
194
        self._save()
 
195
        if now:
 
196
            self.startall()
 
197
 
 
198
    def _save(self):
 
199
        in_ = self._options['in_']
 
200
        out = self._options['out']
 
201
        err = self._options['err']
 
202
        mixed = self._options['mixed']
 
203
        patchsys = self._options['patchsys']
 
204
        if in_:
 
205
            try:
 
206
                self.in_ = FDCapture(0, tmpfile=None, now=False,
 
207
                    patchsys=patchsys)
 
208
            except OSError:
 
209
                pass
 
210
        if out:
 
211
            tmpfile = None
 
212
            if hasattr(out, 'write'):
 
213
                tmpfile = out
 
214
            try:
 
215
                self.out = FDCapture(1, tmpfile=tmpfile,
 
216
                           now=False, patchsys=patchsys)
 
217
                self._options['out'] = self.out.tmpfile
 
218
            except OSError:
 
219
                pass
 
220
        if err:
 
221
            if out and mixed:
 
222
                tmpfile = self.out.tmpfile
 
223
            elif hasattr(err, 'write'):
 
224
                tmpfile = err
 
225
            else:
 
226
                tmpfile = None
 
227
            try:
 
228
                self.err = FDCapture(2, tmpfile=tmpfile,
 
229
                           now=False, patchsys=patchsys)
 
230
                self._options['err'] = self.err.tmpfile
 
231
            except OSError:
 
232
                pass
 
233
 
 
234
    def startall(self):
 
235
        if hasattr(self, 'in_'):
 
236
            self.in_.start()
 
237
        if hasattr(self, 'out'):
 
238
            self.out.start()
 
239
        if hasattr(self, 'err'):
 
240
            self.err.start()
 
241
 
 
242
    def resume(self):
 
243
        """ resume capturing with original temp files. """
 
244
        self.startall()
 
245
 
 
246
    def done(self, save=True):
 
247
        """ return (outfile, errfile) and stop capturing. """
 
248
        outfile = errfile = None
 
249
        if hasattr(self, 'out') and not self.out.tmpfile.closed:
 
250
            outfile = self.out.done()
 
251
        if hasattr(self, 'err') and not self.err.tmpfile.closed:
 
252
            errfile = self.err.done()
 
253
        if hasattr(self, 'in_'):
 
254
            tmpfile = self.in_.done()
 
255
        if save:
 
256
            self._save()
 
257
        return outfile, errfile
 
258
 
 
259
    def readouterr(self):
 
260
        """ return snapshot value of stdout/stderr capturings. """
 
261
        if hasattr(self, "out"):
 
262
            out = self._readsnapshot(self.out.tmpfile)
 
263
        else:
 
264
            out = ""
 
265
        if hasattr(self, "err"):
 
266
            err = self._readsnapshot(self.err.tmpfile)
 
267
        else:
 
268
            err = ""
 
269
        return [out, err]
 
270
 
 
271
    def _readsnapshot(self, f):
 
272
        f.seek(0)
 
273
        res = f.read()
 
274
        enc = getattr(f, "encoding", None)
 
275
        if enc:
 
276
            res = py.builtin._totext(res, enc, "replace")
 
277
        f.truncate(0)
 
278
        f.seek(0)
 
279
        return res
 
280
 
 
281
 
 
282
class StdCapture(Capture):
 
283
    """ This class allows to capture writes to sys.stdout|stderr "in-memory"
 
284
        and will raise errors on tries to read from sys.stdin. It only
 
285
        modifies sys.stdout|stderr|stdin attributes and does not
 
286
        touch underlying File Descriptors (use StdCaptureFD for that).
 
287
    """
 
288
    def __init__(self, out=True, err=True, in_=True, mixed=False, now=True):
 
289
        self._oldout = sys.stdout
 
290
        self._olderr = sys.stderr
 
291
        self._oldin  = sys.stdin
 
292
        if out and not hasattr(out, 'file'):
 
293
            out = TextIO()
 
294
        self.out = out
 
295
        if err:
 
296
            if mixed:
 
297
                err = out
 
298
            elif not hasattr(err, 'write'):
 
299
                err = TextIO()
 
300
        self.err = err
 
301
        self.in_ = in_
 
302
        if now:
 
303
            self.startall()
 
304
 
 
305
    def startall(self):
 
306
        if self.out:
 
307
            sys.stdout = self.out
 
308
        if self.err:
 
309
            sys.stderr = self.err
 
310
        if self.in_:
 
311
            sys.stdin  = self.in_  = DontReadFromInput()
 
312
 
 
313
    def done(self, save=True):
 
314
        """ return (outfile, errfile) and stop capturing. """
 
315
        outfile = errfile = None
 
316
        if self.out and not self.out.closed:
 
317
            sys.stdout = self._oldout
 
318
            outfile = self.out
 
319
            outfile.seek(0)
 
320
        if self.err and not self.err.closed:
 
321
            sys.stderr = self._olderr
 
322
            errfile = self.err
 
323
            errfile.seek(0)
 
324
        if self.in_:
 
325
            sys.stdin = self._oldin
 
326
        return outfile, errfile
 
327
 
 
328
    def resume(self):
 
329
        """ resume capturing with original temp files. """
 
330
        self.startall()
 
331
 
 
332
    def readouterr(self):
 
333
        """ return snapshot value of stdout/stderr capturings. """
 
334
        out = err = ""
 
335
        if self.out:
 
336
            out = self.out.getvalue()
 
337
            self.out.truncate(0)
 
338
            self.out.seek(0)
 
339
        if self.err:
 
340
            err = self.err.getvalue()
 
341
            self.err.truncate(0)
 
342
            self.err.seek(0)
 
343
        return out, err
 
344
 
 
345
class DontReadFromInput:
 
346
    """Temporary stub class.  Ideally when stdin is accessed, the
 
347
    capturing should be turned off, with possibly all data captured
 
348
    so far sent to the screen.  This should be configurable, though,
 
349
    because in automated test runs it is better to crash than
 
350
    hang indefinitely.
 
351
    """
 
352
    def read(self, *args):
 
353
        raise IOError("reading from stdin while output is captured")
 
354
    readline = read
 
355
    readlines = read
 
356
    __iter__ = read
 
357
 
 
358
    def fileno(self):
 
359
        raise ValueError("redirected Stdin is pseudofile, has no fileno()")
 
360
    def isatty(self):
 
361
        return False
 
362
    def close(self):
 
363
        pass
 
364
 
 
365
try:
 
366
    devnullpath = os.devnull
 
367
except AttributeError:
 
368
    if os.name == 'nt':
 
369
        devnullpath = 'NUL'
 
370
    else:
 
371
        devnullpath = '/dev/null'