~brad-marshall/charms/trusty/apache2-wsgi/fix-haproxy-relations

« back to all changes in this revision

Viewing changes to hooks/lib/sh.py

  • Committer: Robin Winslow
  • Date: 2014-05-27 14:00:44 UTC
  • Revision ID: robin.winslow@canonical.com-20140527140044-8rpmb3wx4djzwa83
Add all files

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#===============================================================================
 
2
# Copyright (C) 2011-2012 by Andrew Moffat
 
3
#
 
4
# Permission is hereby granted, free of charge, to any person obtaining a copy
 
5
# of this software and associated documentation files (the "Software"), to deal
 
6
# in the Software without restriction, including without limitation the rights
 
7
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 
8
# copies of the Software, and to permit persons to whom the Software is
 
9
# furnished to do so, subject to the following conditions:
 
10
#
 
11
# The above copyright notice and this permission notice shall be included in
 
12
# all copies or substantial portions of the Software.
 
13
#
 
14
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 
15
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 
16
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 
17
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 
18
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 
19
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 
20
# THE SOFTWARE.
 
21
#===============================================================================
 
22
 
 
23
 
 
24
__version__ = "1.09"
 
25
__project_url__ = "https://github.com/amoffat/sh"
 
26
 
 
27
 
 
28
 
 
29
import platform
 
30
 
 
31
if "windows" in platform.system().lower():
 
32
    raise ImportError("sh %s is currently only supported on linux and osx. \
 
33
please install pbs 0.110 (http://pypi.python.org/pypi/pbs) for windows \
 
34
support." % __version__)
 
35
 
 
36
 
 
37
 
 
38
import sys
 
39
IS_PY3 = sys.version_info[0] == 3
 
40
 
 
41
import traceback
 
42
import os
 
43
import re
 
44
from glob import glob as original_glob
 
45
from types import ModuleType
 
46
from functools import partial
 
47
import inspect
 
48
import time as _time
 
49
 
 
50
from locale import getpreferredencoding
 
51
DEFAULT_ENCODING = getpreferredencoding() or "utf-8"
 
52
 
 
53
 
 
54
if IS_PY3:
 
55
    from io import StringIO
 
56
    from io import BytesIO as cStringIO
 
57
    from queue import Queue, Empty
 
58
else:
 
59
    from StringIO import StringIO
 
60
    from cStringIO import OutputType as cStringIO
 
61
    from Queue import Queue, Empty
 
62
 
 
63
IS_OSX = platform.system() == "Darwin"
 
64
THIS_DIR = os.path.dirname(os.path.realpath(__file__))
 
65
 
 
66
 
 
67
import errno
 
68
import warnings
 
69
 
 
70
import pty
 
71
import termios
 
72
import signal
 
73
import gc
 
74
import select
 
75
import atexit
 
76
import threading
 
77
import tty
 
78
import fcntl
 
79
import struct
 
80
import resource
 
81
from collections import deque
 
82
import logging
 
83
import weakref
 
84
 
 
85
 
 
86
logging_enabled = False
 
87
 
 
88
 
 
89
if IS_PY3:
 
90
    raw_input = input
 
91
    unicode = str
 
92
    basestring = str
 
93
 
 
94
 
 
95
def encode_to_py3bytes_or_py2str(s):
 
96
    """ takes anything and attempts to return a py2 string or py3 bytes.  this
 
97
    is typically used when creating command + arguments to be executed via
 
98
    os.exec* """
 
99
 
 
100
    fallback_encoding = "utf8"
 
101
 
 
102
    if IS_PY3:
 
103
        s = str(s)
 
104
        try:
 
105
            s = bytes(s, DEFAULT_ENCODING)
 
106
        except UnicodeEncodeError:
 
107
            s = bytes(s, fallback_encoding)
 
108
    else:
 
109
        # attempt to convert the thing to unicode from the system's encoding
 
110
        try:
 
111
            s = unicode(s, DEFAULT_ENCODING)
 
112
        # if the thing is already unicode, or it's a number, it can't be
 
113
        # coerced to unicode with an encoding argument, but if we leave out
 
114
        # the encoding argument, it will convert it to a string, then to unicode
 
115
        except TypeError:
 
116
            s = unicode(s)
 
117
 
 
118
        # now that we have guaranteed unicode, encode to our system encoding,
 
119
        # but attempt to fall back to something
 
120
        try:
 
121
            s = s.encode(DEFAULT_ENCODING)
 
122
        except:
 
123
            s = s.encode(fallback_encoding)
 
124
    return s
 
125
 
 
126
 
 
127
class ErrorReturnCode(Exception):
 
128
    truncate_cap = 750
 
129
 
 
130
    def __init__(self, full_cmd, stdout, stderr):
 
131
        self.full_cmd = full_cmd
 
132
        self.stdout = stdout
 
133
        self.stderr = stderr
 
134
 
 
135
 
 
136
        if self.stdout is None: exc_stdout = "<redirected>"
 
137
        else:
 
138
            exc_stdout = self.stdout[:self.truncate_cap]
 
139
            out_delta = len(self.stdout) - len(exc_stdout)
 
140
            if out_delta:
 
141
                exc_stdout += ("... (%d more, please see e.stdout)" % out_delta).encode()
 
142
 
 
143
        if self.stderr is None: exc_stderr = "<redirected>"
 
144
        else:
 
145
            exc_stderr = self.stderr[:self.truncate_cap]
 
146
            err_delta = len(self.stderr) - len(exc_stderr)
 
147
            if err_delta:
 
148
                exc_stderr += ("... (%d more, please see e.stderr)" % err_delta).encode()
 
149
 
 
150
        msg = "\n\n  RAN: %r\n\n  STDOUT:\n%s\n\n  STDERR:\n%s" % \
 
151
            (full_cmd, exc_stdout.decode(DEFAULT_ENCODING, "replace"),
 
152
             exc_stderr.decode(DEFAULT_ENCODING, "replace"))
 
153
        super(ErrorReturnCode, self).__init__(msg)
 
154
 
 
155
 
 
156
class SignalException(ErrorReturnCode): pass
 
157
 
 
158
SIGNALS_THAT_SHOULD_THROW_EXCEPTION = (
 
159
    signal.SIGKILL,
 
160
    signal.SIGSEGV,
 
161
    signal.SIGTERM,
 
162
    signal.SIGINT,
 
163
    signal.SIGQUIT
 
164
)
 
165
 
 
166
 
 
167
# we subclass AttributeError because:
 
168
# https://github.com/ipython/ipython/issues/2577
 
169
# https://github.com/amoffat/sh/issues/97#issuecomment-10610629
 
170
class CommandNotFound(AttributeError): pass
 
171
 
 
172
rc_exc_regex = re.compile("(ErrorReturnCode|SignalException)_(\d+)")
 
173
rc_exc_cache = {}
 
174
 
 
175
def get_rc_exc(rc):
 
176
    rc = int(rc)
 
177
    try: return rc_exc_cache[rc]
 
178
    except KeyError: pass
 
179
 
 
180
    if rc > 0:
 
181
        name = "ErrorReturnCode_%d" % rc
 
182
        exc = type(name, (ErrorReturnCode,), {"exit_code": rc})
 
183
    else:
 
184
        name = "SignalException_%d" % abs(rc)
 
185
        exc = type(name, (SignalException,), {"exit_code": rc})
 
186
 
 
187
    rc_exc_cache[rc] = exc
 
188
    return exc
 
189
 
 
190
 
 
191
 
 
192
 
 
193
def which(program):
 
194
    def is_exe(fpath):
 
195
        return os.path.exists(fpath) and os.access(fpath, os.X_OK)
 
196
 
 
197
    fpath, fname = os.path.split(program)
 
198
    if fpath:
 
199
        if is_exe(program): return program
 
200
    else:
 
201
        if "PATH" not in os.environ: return None
 
202
        for path in os.environ["PATH"].split(os.pathsep):
 
203
            exe_file = os.path.join(path, program)
 
204
            if is_exe(exe_file):
 
205
                return exe_file
 
206
 
 
207
    return None
 
208
 
 
209
def resolve_program(program):
 
210
    path = which(program)
 
211
    if not path:
 
212
        # our actual command might have a dash in it, but we can't call
 
213
        # that from python (we have to use underscores), so we'll check
 
214
        # if a dash version of our underscore command exists and use that
 
215
        # if it does
 
216
        if "_" in program: path = which(program.replace("_", "-"))
 
217
        if not path: return None
 
218
    return path
 
219
 
 
220
 
 
221
# we add this thin wrapper to glob.glob because of a specific edge case where
 
222
# glob does not expand to anything.  for example, if you try to do
 
223
# glob.glob("*.py") and there are no *.py files in the directory, glob.glob
 
224
# returns an empty list.  this empty list gets passed to the command, and
 
225
# then the command fails with a misleading error message.  this thin wrapper
 
226
# ensures that if there is no expansion, we pass in the original argument,
 
227
# so that when the command fails, the error message is clearer
 
228
def glob(arg):
 
229
    return original_glob(arg) or arg
 
230
 
 
231
 
 
232
 
 
233
class Logger(object):
 
234
    def __init__(self, name, context=None):
 
235
        self.name = name
 
236
        self.context = "%s"
 
237
        if context: self.context = "%s: %%s" % context
 
238
        self.log = logging.getLogger(name)
 
239
 
 
240
    def info(self, msg, *args):
 
241
        if not logging_enabled: return
 
242
        self.log.info(self.context, msg % args)
 
243
 
 
244
    def debug(self, msg, *args):
 
245
        if not logging_enabled: return
 
246
        self.log.debug(self.context, msg % args)
 
247
 
 
248
    def error(self, msg, *args):
 
249
        if not logging_enabled: return
 
250
        self.log.error(self.context, msg % args)
 
251
 
 
252
    def exception(self, msg, *args):
 
253
        if not logging_enabled: return
 
254
        self.log.exception(self.context, msg % args)
 
255
 
 
256
 
 
257
 
 
258
class RunningCommand(object):
 
259
    def __init__(self, cmd, call_args, stdin, stdout, stderr):
 
260
        truncate = 20
 
261
        if len(cmd) > truncate:
 
262
            logger_str = "command %r...(%d more) call_args %r" % \
 
263
                (cmd[:truncate], len(cmd) - truncate, call_args)
 
264
        else:
 
265
            logger_str = "command %r call_args %r" % (cmd, call_args)
 
266
 
 
267
        self.log = Logger("command", logger_str)
 
268
        self.call_args = call_args
 
269
        self.cmd = cmd
 
270
 
 
271
        # self.ran is used for auditing what actually ran.  for example, in
 
272
        # exceptions, or if you just want to know what was ran after the
 
273
        # command ran
 
274
        if IS_PY3:
 
275
            self.ran = " ".join([arg.decode(DEFAULT_ENCODING, "ignore") for arg in cmd])
 
276
        else:
 
277
            self.ran = " ".join(cmd)
 
278
 
 
279
        self.process = None
 
280
 
 
281
        # this flag is for whether or not we've handled the exit code (like
 
282
        # by raising an exception).  this is necessary because .wait() is called
 
283
        # from multiple places, and wait() triggers the exit code to be
 
284
        # processed.  but we don't want to raise multiple exceptions, only
 
285
        # one (if any at all)
 
286
        self._handled_exit_code = False
 
287
 
 
288
        self.should_wait = True
 
289
        spawn_process = True
 
290
 
 
291
 
 
292
        # with contexts shouldn't run at all yet, they prepend
 
293
        # to every command in the context
 
294
        if call_args["with"]:
 
295
            spawn_process = False
 
296
            Command._prepend_stack.append(self)
 
297
 
 
298
 
 
299
        if callable(call_args["out"]) or callable(call_args["err"]):
 
300
            self.should_wait = False
 
301
 
 
302
        if call_args["piped"] or call_args["iter"] or call_args["iter_noblock"]:
 
303
            self.should_wait = False
 
304
 
 
305
        # we're running in the background, return self and let us lazily
 
306
        # evaluate
 
307
        if call_args["bg"]: self.should_wait = False
 
308
 
 
309
        # redirection
 
310
        if call_args["err_to_out"]: stderr = STDOUT
 
311
 
 
312
 
 
313
        # set up which stream should write to the pipe
 
314
        # TODO, make pipe None by default and limit the size of the Queue
 
315
        # in oproc.OProc
 
316
        pipe = STDOUT
 
317
        if call_args["iter"] == "out" or call_args["iter"] is True: pipe = STDOUT
 
318
        elif call_args["iter"] == "err": pipe = STDERR
 
319
 
 
320
        if call_args["iter_noblock"] == "out" or call_args["iter_noblock"] is True: pipe = STDOUT
 
321
        elif call_args["iter_noblock"] == "err": pipe = STDERR
 
322
 
 
323
 
 
324
        if spawn_process:
 
325
            self.log.debug("starting process")
 
326
            self.process = OProc(cmd, stdin, stdout, stderr,
 
327
                self.call_args, pipe=pipe)
 
328
 
 
329
            if self.should_wait:
 
330
                self.wait()
 
331
 
 
332
 
 
333
    def wait(self):
 
334
        self._handle_exit_code(self.process.wait())
 
335
        return self
 
336
 
 
337
    # here we determine if we had an exception, or an error code that we weren't
 
338
    # expecting to see.  if we did, we create and raise an exception
 
339
    def _handle_exit_code(self, code):
 
340
        if self._handled_exit_code: return
 
341
        self._handled_exit_code = True
 
342
 
 
343
        if code not in self.call_args["ok_code"] and \
 
344
        (code > 0 or -code in SIGNALS_THAT_SHOULD_THROW_EXCEPTION):
 
345
            raise get_rc_exc(code)(
 
346
                self.ran,
 
347
                self.process.stdout,
 
348
                self.process.stderr
 
349
            )
 
350
 
 
351
 
 
352
 
 
353
    @property
 
354
    def stdout(self):
 
355
        self.wait()
 
356
        return self.process.stdout
 
357
 
 
358
    @property
 
359
    def stderr(self):
 
360
        self.wait()
 
361
        return self.process.stderr
 
362
 
 
363
    @property
 
364
    def exit_code(self):
 
365
        self.wait()
 
366
        return self.process.exit_code
 
367
 
 
368
    @property
 
369
    def pid(self):
 
370
        return self.process.pid
 
371
 
 
372
    def __len__(self):
 
373
        return len(str(self))
 
374
 
 
375
    def __enter__(self):
 
376
        # we don't actually do anything here because anything that should
 
377
        # have been done would have been done in the Command.__call__ call.
 
378
        # essentially all that has to happen is the comand be pushed on
 
379
        # the prepend stack.
 
380
        pass
 
381
 
 
382
    def __iter__(self):
 
383
        return self
 
384
 
 
385
    def next(self):
 
386
        # we do this because if get blocks, we can't catch a KeyboardInterrupt
 
387
        # so the slight timeout allows for that.
 
388
        while True:
 
389
            try: chunk = self.process._pipe_queue.get(True, 0.001)
 
390
            except Empty:
 
391
                if self.call_args["iter_noblock"]: return errno.EWOULDBLOCK
 
392
            else:
 
393
                if chunk is None:
 
394
                    self.wait()
 
395
                    raise StopIteration()
 
396
                try: return chunk.decode(self.call_args["encoding"],
 
397
                    self.call_args["decode_errors"])
 
398
                except UnicodeDecodeError: return chunk
 
399
 
 
400
    # python 3
 
401
    __next__ = next
 
402
 
 
403
    def __exit__(self, typ, value, traceback):
 
404
        if self.call_args["with"] and Command._prepend_stack:
 
405
            Command._prepend_stack.pop()
 
406
 
 
407
    def __str__(self):
 
408
        if IS_PY3: return self.__unicode__()
 
409
        else: return unicode(self).encode(self.call_args["encoding"])
 
410
 
 
411
    def __unicode__(self):
 
412
        if self.process and self.stdout:
 
413
            return self.stdout.decode(self.call_args["encoding"],
 
414
                self.call_args["decode_errors"])
 
415
        return ""
 
416
 
 
417
    def __eq__(self, other):
 
418
        return unicode(self) == unicode(other)
 
419
 
 
420
    def __contains__(self, item):
 
421
        return item in str(self)
 
422
 
 
423
    def __getattr__(self, p):
 
424
        # let these three attributes pass through to the OProc object
 
425
        if p in ("signal", "terminate", "kill"):
 
426
            if self.process: return getattr(self.process, p)
 
427
            else: raise AttributeError
 
428
        return getattr(unicode(self), p)
 
429
 
 
430
    def __repr__(self):
 
431
        try: return str(self)
 
432
        except UnicodeDecodeError:
 
433
            if self.process:
 
434
                if self.stdout: return repr(self.stdout)
 
435
            return repr("")
 
436
 
 
437
    def __long__(self):
 
438
        return long(str(self).strip())
 
439
 
 
440
    def __float__(self):
 
441
        return float(str(self).strip())
 
442
 
 
443
    def __int__(self):
 
444
        return int(str(self).strip())
 
445
 
 
446
 
 
447
 
 
448
 
 
449
 
 
450
class Command(object):
 
451
    _prepend_stack = []
 
452
 
 
453
    _call_args = {
 
454
        # currently unsupported
 
455
        #"fg": False, # run command in foreground
 
456
 
 
457
        "bg": False, # run command in background
 
458
        "with": False, # prepend the command to every command after it
 
459
        "in": None,
 
460
        "out": None, # redirect STDOUT
 
461
        "err": None, # redirect STDERR
 
462
        "err_to_out": None, # redirect STDERR to STDOUT
 
463
 
 
464
        # stdin buffer size
 
465
        # 1 for line, 0 for unbuffered, any other number for that amount
 
466
        "in_bufsize": 0,
 
467
        # stdout buffer size, same values as above
 
468
        "out_bufsize": 1,
 
469
        "err_bufsize": 1,
 
470
 
 
471
        # this is how big the output buffers will be for stdout and stderr.
 
472
        # this is essentially how much output they will store from the process.
 
473
        # we use a deque, so if it overflows past this amount, the first items
 
474
        # get pushed off as each new item gets added.
 
475
        #
 
476
        # NOTICE
 
477
        # this is not a *BYTE* size, this is a *CHUNK* size...meaning, that if
 
478
        # you're buffering out/err at 1024 bytes, the internal buffer size will
 
479
        # be "internal_bufsize" CHUNKS of 1024 bytes
 
480
        "internal_bufsize": 3 * 1024 ** 2,
 
481
 
 
482
        "env": None,
 
483
        "piped": None,
 
484
        "iter": None,
 
485
        "iter_noblock": None,
 
486
        "ok_code": 0,
 
487
        "cwd": None,
 
488
 
 
489
        # the separator delimiting between a long-argument's name and its value
 
490
        # for example, --arg=derp, '=' is the long_sep
 
491
        "long_sep": "=",
 
492
 
 
493
        # this is for programs that expect their input to be from a terminal.
 
494
        # ssh is one of those programs
 
495
        "tty_in": False,
 
496
        "tty_out": True,
 
497
 
 
498
        "encoding": DEFAULT_ENCODING,
 
499
        "decode_errors": "strict",
 
500
 
 
501
        # how long the process should run before it is auto-killed
 
502
        "timeout": 0,
 
503
 
 
504
        # these control whether or not stdout/err will get aggregated together
 
505
        # as the process runs.  this has memory usage implications, so sometimes
 
506
        # with long-running processes with a lot of data, it makes sense to
 
507
        # set these to true
 
508
        "no_out": False,
 
509
        "no_err": False,
 
510
        "no_pipe": False,
 
511
 
 
512
        # if any redirection is used for stdout or stderr, internal buffering
 
513
        # of that data is not stored.  this forces it to be stored, as if
 
514
        # the output is being T'd to both the redirected destination and our
 
515
        # internal buffers
 
516
        "tee": None,
 
517
    }
 
518
 
 
519
    # these are arguments that cannot be called together, because they wouldn't
 
520
    # make any sense
 
521
    _incompatible_call_args = (
 
522
        #("fg", "bg", "Command can't be run in the foreground and background"),
 
523
        ("err", "err_to_out", "Stderr is already being redirected"),
 
524
        ("piped", "iter", "You cannot iterate when this command is being piped"),
 
525
    )
 
526
 
 
527
 
 
528
    # this method exists because of the need to have some way of letting
 
529
    # manual object instantiation not perform the underscore-to-dash command
 
530
    # conversion that resolve_program uses.
 
531
    #
 
532
    # there are 2 ways to create a Command object.  using sh.Command(<program>)
 
533
    # or by using sh.<program>.  the method fed into sh.Command must be taken
 
534
    # literally, and so no underscore-dash conversion is performed.  the one
 
535
    # for sh.<program> must do the underscore-dash converesion, because we
 
536
    # can't type dashes in method names
 
537
    @classmethod
 
538
    def _create(cls, program, **default_kwargs):
 
539
        path = resolve_program(program)
 
540
        if not path: raise CommandNotFound(program)
 
541
 
 
542
        cmd = cls(path)
 
543
        if default_kwargs:
 
544
            cmd = cmd.bake(**default_kwargs)
 
545
 
 
546
        return cmd
 
547
 
 
548
 
 
549
    def __init__(self, path):
 
550
        path = which(path)
 
551
        if not path:
 
552
            raise CommandNotFound(path)
 
553
        self._path = path
 
554
 
 
555
        self._partial = False
 
556
        self._partial_baked_args = []
 
557
        self._partial_call_args = {}
 
558
 
 
559
        # bugfix for functools.wraps.  issue #121
 
560
        self.__name__ = repr(self)
 
561
 
 
562
 
 
563
    def __getattribute__(self, name):
 
564
        # convenience
 
565
        getattr = partial(object.__getattribute__, self)
 
566
 
 
567
        if name.startswith("_"): return getattr(name)
 
568
        if name == "bake": return getattr("bake")
 
569
        if name.endswith("_"): name = name[:-1]
 
570
 
 
571
        return getattr("bake")(name)
 
572
 
 
573
 
 
574
    @staticmethod
 
575
    def _extract_call_args(kwargs, to_override={}):
 
576
        kwargs = kwargs.copy()
 
577
        call_args = {}
 
578
        for parg, default in Command._call_args.items():
 
579
            key = "_" + parg
 
580
 
 
581
            if key in kwargs:
 
582
                call_args[parg] = kwargs[key]
 
583
                del kwargs[key]
 
584
            elif parg in to_override:
 
585
                call_args[parg] = to_override[parg]
 
586
 
 
587
        # test for incompatible call args
 
588
        s1 = set(call_args.keys())
 
589
        for args in Command._incompatible_call_args:
 
590
            args = list(args)
 
591
            error = args.pop()
 
592
 
 
593
            if s1.issuperset(args):
 
594
                raise TypeError("Invalid special arguments %r: %s" % (args, error))
 
595
 
 
596
        return call_args, kwargs
 
597
 
 
598
 
 
599
    def _aggregate_keywords(self, keywords, sep, raw=False):
 
600
        processed = []
 
601
        for k, v in keywords.items():
 
602
            # we're passing a short arg as a kwarg, example:
 
603
            # cut(d="\t")
 
604
            if len(k) == 1:
 
605
                if v is not False:
 
606
                    processed.append(encode_to_py3bytes_or_py2str("-" + k))
 
607
                    if v is not True:
 
608
                        processed.append(encode_to_py3bytes_or_py2str(v))
 
609
 
 
610
            # we're doing a long arg
 
611
            else:
 
612
                if not raw:
 
613
                    k = k.replace("_", "-")
 
614
 
 
615
                if v is True:
 
616
                    processed.append(encode_to_py3bytes_or_py2str("--" + k))
 
617
                elif v is False:
 
618
                    pass
 
619
                else:
 
620
                    arg = encode_to_py3bytes_or_py2str("--%s%s%s" % (k, sep, v))
 
621
                    processed.append(arg)
 
622
        return processed
 
623
 
 
624
 
 
625
    def _compile_args(self, args, kwargs, sep):
 
626
        processed_args = []
 
627
 
 
628
        # aggregate positional args
 
629
        for arg in args:
 
630
            if isinstance(arg, (list, tuple)):
 
631
                if not arg:
 
632
                    warnings.warn("Empty list passed as an argument to %r. \
 
633
If you're using glob.glob(), please use sh.glob() instead." % self._path, stacklevel=3)
 
634
                for sub_arg in arg:
 
635
                    processed_args.append(encode_to_py3bytes_or_py2str(sub_arg))
 
636
            elif isinstance(arg, dict):
 
637
                processed_args += self._aggregate_keywords(arg, sep, raw=True)
 
638
            else:
 
639
                processed_args.append(encode_to_py3bytes_or_py2str(arg))
 
640
 
 
641
        # aggregate the keyword arguments
 
642
        processed_args += self._aggregate_keywords(kwargs, sep)
 
643
 
 
644
        return processed_args
 
645
 
 
646
 
 
647
    # TODO needs documentation
 
648
    def bake(self, *args, **kwargs):
 
649
        fn = Command(self._path)
 
650
        fn._partial = True
 
651
 
 
652
        call_args, kwargs = self._extract_call_args(kwargs)
 
653
 
 
654
        pruned_call_args = call_args
 
655
        for k, v in Command._call_args.items():
 
656
            try:
 
657
                if pruned_call_args[k] == v:
 
658
                    del pruned_call_args[k]
 
659
            except KeyError: continue
 
660
 
 
661
        fn._partial_call_args.update(self._partial_call_args)
 
662
        fn._partial_call_args.update(pruned_call_args)
 
663
        fn._partial_baked_args.extend(self._partial_baked_args)
 
664
        sep = pruned_call_args.get("long_sep", self._call_args["long_sep"])
 
665
        fn._partial_baked_args.extend(self._compile_args(args, kwargs, sep))
 
666
        return fn
 
667
 
 
668
    def __str__(self):
 
669
        if IS_PY3:
 
670
            return self.__unicode__()
 
671
        else:
 
672
            return unicode(self).encode(DEFAULT_ENCODING)
 
673
 
 
674
 
 
675
    def __eq__(self, other):
 
676
        try: return str(self) == str(other)
 
677
        except: return False
 
678
 
 
679
 
 
680
    def __repr__(self):
 
681
        return "<Command %r>" % str(self)
 
682
 
 
683
 
 
684
    def __unicode__(self):
 
685
        baked_args = " ".join(self._partial_baked_args)
 
686
        if baked_args:
 
687
            baked_args = " " + baked_args
 
688
        return self._path + baked_args
 
689
 
 
690
    def __enter__(self):
 
691
        self(_with=True)
 
692
 
 
693
    def __exit__(self, typ, value, traceback):
 
694
        Command._prepend_stack.pop()
 
695
 
 
696
 
 
697
    def __call__(self, *args, **kwargs):
 
698
        kwargs = kwargs.copy()
 
699
        args = list(args)
 
700
 
 
701
        cmd = []
 
702
 
 
703
        # aggregate any 'with' contexts
 
704
        call_args = Command._call_args.copy()
 
705
        for prepend in self._prepend_stack:
 
706
            # don't pass the 'with' call arg
 
707
            pcall_args = prepend.call_args.copy()
 
708
            try: del pcall_args["with"]
 
709
            except: pass
 
710
 
 
711
            call_args.update(pcall_args)
 
712
            cmd.extend(prepend.cmd)
 
713
 
 
714
        if IS_PY3:
 
715
            cmd.append(bytes(self._path, call_args["encoding"]))
 
716
        else:
 
717
            cmd.append(self._path)
 
718
 
 
719
        # here we extract the special kwargs and override any
 
720
        # special kwargs from the possibly baked command
 
721
        tmp_call_args, kwargs = self._extract_call_args(kwargs, self._partial_call_args)
 
722
        call_args.update(tmp_call_args)
 
723
 
 
724
        if not isinstance(call_args["ok_code"], (tuple, list)):
 
725
            call_args["ok_code"] = [call_args["ok_code"]]
 
726
 
 
727
 
 
728
        # check if we're piping via composition
 
729
        stdin = call_args["in"]
 
730
        if args:
 
731
            first_arg = args.pop(0)
 
732
            if isinstance(first_arg, RunningCommand):
 
733
                # it makes sense that if the input pipe of a command is running
 
734
                # in the background, then this command should run in the
 
735
                # background as well
 
736
                if first_arg.call_args["bg"]: call_args["bg"] = True
 
737
                stdin = first_arg.process._pipe_queue
 
738
 
 
739
            else:
 
740
                args.insert(0, first_arg)
 
741
 
 
742
        processed_args = self._compile_args(args, kwargs, call_args["long_sep"])
 
743
 
 
744
        # makes sure our arguments are broken up correctly
 
745
        split_args = self._partial_baked_args + processed_args
 
746
 
 
747
        final_args = split_args
 
748
 
 
749
        cmd.extend(final_args)
 
750
 
 
751
 
 
752
        # stdout redirection
 
753
        stdout = call_args["out"]
 
754
        if stdout \
 
755
            and not callable(stdout) \
 
756
            and not hasattr(stdout, "write") \
 
757
            and not isinstance(stdout, (cStringIO, StringIO)):
 
758
 
 
759
            stdout = open(str(stdout), "wb")
 
760
 
 
761
 
 
762
        # stderr redirection
 
763
        stderr = call_args["err"]
 
764
        if stderr and not callable(stderr) and not hasattr(stderr, "write") \
 
765
            and not isinstance(stderr, (cStringIO, StringIO)):
 
766
            stderr = open(str(stderr), "wb")
 
767
 
 
768
 
 
769
        return RunningCommand(cmd, call_args, stdin, stdout, stderr)
 
770
 
 
771
 
 
772
 
 
773
 
 
774
# used in redirecting
 
775
STDOUT = -1
 
776
STDERR = -2
 
777
 
 
778
 
 
779
 
 
780
# Process open = Popen
 
781
# Open Process = OProc
 
782
class OProc(object):
 
783
    _procs_to_cleanup = set()
 
784
    _registered_cleanup = False
 
785
    _default_window_size = (24, 80)
 
786
 
 
787
    def __init__(self, cmd, stdin, stdout, stderr, call_args,
 
788
            persist=True, pipe=STDOUT):
 
789
 
 
790
        self.call_args = call_args
 
791
 
 
792
        self._single_tty = self.call_args["tty_in"] and self.call_args["tty_out"]
 
793
 
 
794
        # this logic is a little convoluted, but basically this top-level
 
795
        # if/else is for consolidating input and output TTYs into a single
 
796
        # TTY.  this is the only way some secure programs like ssh will
 
797
        # output correctly (is if stdout and stdin are both the same TTY)
 
798
        if self._single_tty:
 
799
            self._stdin_fd, self._slave_stdin_fd = pty.openpty()
 
800
 
 
801
            self._stdout_fd = self._stdin_fd
 
802
            self._slave_stdout_fd = self._slave_stdin_fd
 
803
 
 
804
            self._stderr_fd = self._stdin_fd
 
805
            self._slave_stderr_fd = self._slave_stdin_fd
 
806
 
 
807
        # do not consolidate stdin and stdout
 
808
        else:
 
809
            if self.call_args["tty_in"]:
 
810
                self._slave_stdin_fd, self._stdin_fd = pty.openpty()
 
811
            else:
 
812
                self._slave_stdin_fd, self._stdin_fd = os.pipe()
 
813
 
 
814
            # tty_out is usually the default
 
815
            if self.call_args["tty_out"]:
 
816
                self._stdout_fd, self._slave_stdout_fd = pty.openpty()
 
817
            else:
 
818
                self._stdout_fd, self._slave_stdout_fd = os.pipe()
 
819
 
 
820
            # unless STDERR is going to STDOUT, it ALWAYS needs to be a pipe,
 
821
            # and never a PTY.  the reason for this is not totally clear to me,
 
822
            # but it has to do with the fact that if STDERR isn't set as the
 
823
            # CTTY (because STDOUT is), the STDERR buffer won't always flush
 
824
            # by the time the process exits, and the data will be lost.
 
825
            # i've only seen this on OSX.
 
826
            if stderr is not STDOUT:
 
827
                self._stderr_fd, self._slave_stderr_fd = os.pipe()
 
828
 
 
829
        gc_enabled = gc.isenabled()
 
830
        if gc_enabled: gc.disable()
 
831
        self.pid = os.fork()
 
832
 
 
833
 
 
834
        # child
 
835
        if self.pid == 0:
 
836
            # ignoring SIGHUP lets us persist even after the parent process
 
837
            # exits
 
838
            signal.signal(signal.SIGHUP, signal.SIG_IGN)
 
839
 
 
840
            # this piece of ugliness is due to a bug where we can lose output
 
841
            # if we do os.close(self._slave_stdout_fd) in the parent after
 
842
            # the child starts writing.
 
843
            # see http://bugs.python.org/issue15898
 
844
            if IS_OSX:
 
845
                _time.sleep(0.01)
 
846
 
 
847
            os.setsid()
 
848
 
 
849
            if self.call_args["tty_out"]:
 
850
                # set raw mode, so there isn't any weird translation of newlines
 
851
                # to \r\n and other oddities.  we're not outputting to a terminal
 
852
                # anyways
 
853
                #
 
854
                # we HAVE to do this here, and not in the parent thread, because
 
855
                # we have to guarantee that this is set before the child process
 
856
                # is run, and we can't do it twice.
 
857
                tty.setraw(self._stdout_fd)
 
858
 
 
859
 
 
860
            os.close(self._stdin_fd)
 
861
            if not self._single_tty:
 
862
                os.close(self._stdout_fd)
 
863
                if stderr is not STDOUT: os.close(self._stderr_fd)
 
864
 
 
865
 
 
866
            if self.call_args["cwd"]: os.chdir(self.call_args["cwd"])
 
867
            os.dup2(self._slave_stdin_fd, 0)
 
868
            os.dup2(self._slave_stdout_fd, 1)
 
869
 
 
870
            # we're not directing stderr to stdout?  then set self._slave_stderr_fd to
 
871
            # fd 2, the common stderr fd
 
872
            if stderr is STDOUT: os.dup2(self._slave_stdout_fd, 2)
 
873
            else: os.dup2(self._slave_stderr_fd, 2)
 
874
 
 
875
            # don't inherit file descriptors
 
876
            max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
 
877
            os.closerange(3, max_fd)
 
878
 
 
879
 
 
880
            # set our controlling terminal
 
881
            if self.call_args["tty_out"]:
 
882
                tmp_fd = os.open(os.ttyname(1), os.O_RDWR)
 
883
                os.close(tmp_fd)
 
884
 
 
885
 
 
886
            if self.call_args["tty_out"]:
 
887
                self.setwinsize(1)
 
888
 
 
889
            # actually execute the process
 
890
            if self.call_args["env"] is None:
 
891
                os.execv(cmd[0], cmd)
 
892
            else:
 
893
                os.execve(cmd[0], cmd, self.call_args["env"])
 
894
 
 
895
            os._exit(255)
 
896
 
 
897
        # parent
 
898
        else:
 
899
            if gc_enabled: gc.enable()
 
900
 
 
901
            if not OProc._registered_cleanup:
 
902
                atexit.register(OProc._cleanup_procs)
 
903
                OProc._registered_cleanup = True
 
904
 
 
905
 
 
906
            self.started = _time.time()
 
907
            self.cmd = cmd
 
908
            self.exit_code = None
 
909
 
 
910
            self.stdin = stdin or Queue()
 
911
            self._pipe_queue = Queue()
 
912
 
 
913
            # this is used to prevent a race condition when we're waiting for
 
914
            # a process to end, and the OProc's internal threads are also checking
 
915
            # for the processes's end
 
916
            self._wait_lock = threading.Lock()
 
917
 
 
918
            # these are for aggregating the stdout and stderr.  we use a deque
 
919
            # because we don't want to overflow
 
920
            self._stdout = deque(maxlen=self.call_args["internal_bufsize"])
 
921
            self._stderr = deque(maxlen=self.call_args["internal_bufsize"])
 
922
 
 
923
            if self.call_args["tty_in"]: self.setwinsize(self._stdin_fd)
 
924
 
 
925
 
 
926
            self.log = Logger("process", repr(self))
 
927
 
 
928
            os.close(self._slave_stdin_fd)
 
929
            if not self._single_tty:
 
930
                os.close(self._slave_stdout_fd)
 
931
                if stderr is not STDOUT: os.close(self._slave_stderr_fd)
 
932
 
 
933
            self.log.debug("started process")
 
934
            if not persist:
 
935
                OProc._procs_to_cleanup.add(self)
 
936
 
 
937
 
 
938
            if self.call_args["tty_in"]:
 
939
                attr = termios.tcgetattr(self._stdin_fd)
 
940
                attr[3] &= ~termios.ECHO
 
941
                termios.tcsetattr(self._stdin_fd, termios.TCSANOW, attr)
 
942
 
 
943
            # this represents the connection from a Queue object (or whatever
 
944
            # we're using to feed STDIN) to the process's STDIN fd
 
945
            self._stdin_stream = StreamWriter("stdin", self, self._stdin_fd,
 
946
                self.stdin, self.call_args["in_bufsize"])
 
947
 
 
948
 
 
949
            stdout_pipe = None
 
950
            if pipe is STDOUT and not self.call_args["no_pipe"]:
 
951
                stdout_pipe = self._pipe_queue
 
952
 
 
953
            # this represents the connection from a process's STDOUT fd to
 
954
            # wherever it has to go, sometimes a pipe Queue (that we will use
 
955
            # to pipe data to other processes), and also an internal deque
 
956
            # that we use to aggregate all the output
 
957
            save_stdout = not self.call_args["no_out"] and \
 
958
                (self.call_args["tee"] in (True, "out") or stdout is None)
 
959
            self._stdout_stream = StreamReader("stdout", self, self._stdout_fd, stdout,
 
960
                self._stdout, self.call_args["out_bufsize"], stdout_pipe,
 
961
                save_data=save_stdout)
 
962
 
 
963
 
 
964
            if stderr is STDOUT or self._single_tty: self._stderr_stream = None
 
965
            else:
 
966
                stderr_pipe = None
 
967
                if pipe is STDERR and not self.call_args["no_pipe"]:
 
968
                    stderr_pipe = self._pipe_queue
 
969
 
 
970
                save_stderr = not self.call_args["no_err"] and \
 
971
                    (self.call_args["tee"] in ("err",) or stderr is None)
 
972
                self._stderr_stream = StreamReader("stderr", self, self._stderr_fd, stderr,
 
973
                    self._stderr, self.call_args["err_bufsize"], stderr_pipe,
 
974
                    save_data=save_stderr)
 
975
 
 
976
            # start the main io threads
 
977
            self._input_thread = self._start_thread(self.input_thread, self._stdin_stream)
 
978
            self._output_thread = self._start_thread(self.output_thread, self._stdout_stream, self._stderr_stream)
 
979
 
 
980
 
 
981
    def __repr__(self):
 
982
        return "<Process %d %r>" % (self.pid, self.cmd[:500])
 
983
 
 
984
 
 
985
    # also borrowed from pexpect.py
 
986
    @staticmethod
 
987
    def setwinsize(fd):
 
988
        rows, cols = OProc._default_window_size
 
989
        TIOCSWINSZ = getattr(termios, 'TIOCSWINSZ', -2146929561)
 
990
        if TIOCSWINSZ == 2148037735: # L is not required in Python >= 2.2.
 
991
            TIOCSWINSZ = -2146929561 # Same bits, but with sign.
 
992
 
 
993
        s = struct.pack('HHHH', rows, cols, 0, 0)
 
994
        fcntl.ioctl(fd, TIOCSWINSZ, s)
 
995
 
 
996
 
 
997
    @staticmethod
 
998
    def _start_thread(fn, *args):
 
999
        thrd = threading.Thread(target=fn, args=args)
 
1000
        thrd.daemon = True
 
1001
        thrd.start()
 
1002
        return thrd
 
1003
 
 
1004
    def in_bufsize(self, buf):
 
1005
        self._stdin_stream.stream_bufferer.change_buffering(buf)
 
1006
 
 
1007
    def out_bufsize(self, buf):
 
1008
        self._stdout_stream.stream_bufferer.change_buffering(buf)
 
1009
 
 
1010
    def err_bufsize(self, buf):
 
1011
        if self._stderr_stream:
 
1012
            self._stderr_stream.stream_bufferer.change_buffering(buf)
 
1013
 
 
1014
 
 
1015
    def input_thread(self, stdin):
 
1016
        done = False
 
1017
        while not done and self.alive:
 
1018
            self.log.debug("%r ready for more input", stdin)
 
1019
            done = stdin.write()
 
1020
 
 
1021
        stdin.close()
 
1022
 
 
1023
 
 
1024
    def output_thread(self, stdout, stderr):
 
1025
        readers = []
 
1026
        errors = []
 
1027
 
 
1028
        if stdout is not None:
 
1029
            readers.append(stdout)
 
1030
            errors.append(stdout)
 
1031
        if stderr is not None:
 
1032
            readers.append(stderr)
 
1033
            errors.append(stderr)
 
1034
 
 
1035
        while readers:
 
1036
            outputs, inputs, err = select.select(readers, [], errors, 0.1)
 
1037
 
 
1038
            # stdout and stderr
 
1039
            for stream in outputs:
 
1040
                self.log.debug("%r ready to be read from", stream)
 
1041
                done = stream.read()
 
1042
                if done: readers.remove(stream)
 
1043
 
 
1044
            for stream in err:
 
1045
                pass
 
1046
 
 
1047
            # test if the process has been running too long
 
1048
            if self.call_args["timeout"]:
 
1049
                now = _time.time()
 
1050
                if now - self.started > self.call_args["timeout"]:
 
1051
                    self.log.debug("we've been running too long")
 
1052
                    self.kill()
 
1053
 
 
1054
 
 
1055
        # this is here because stdout may be the controlling TTY, and
 
1056
        # we can't close it until the process has ended, otherwise the
 
1057
        # child will get SIGHUP.  typically, if we've broken out of
 
1058
        # the above loop, and we're here, the process is just about to
 
1059
        # end, so it's probably ok to aggressively poll self.alive
 
1060
        #
 
1061
        # the other option to this would be to do the CTTY close from
 
1062
        # the method that does the actual os.waitpid() call, but the
 
1063
        # problem with that is that the above loop might still be
 
1064
        # running, and closing the fd will cause some operation to
 
1065
        # fail.  this is less complex than wrapping all the ops
 
1066
        # in the above loop with out-of-band fd-close exceptions
 
1067
        while self.alive:
 
1068
            _time.sleep(0.001)
 
1069
 
 
1070
        if stdout:
 
1071
            stdout.close()
 
1072
 
 
1073
        if stderr:
 
1074
            stderr.close()
 
1075
 
 
1076
 
 
1077
    @property
 
1078
    def stdout(self):
 
1079
        return "".encode(self.call_args["encoding"]).join(self._stdout)
 
1080
 
 
1081
    @property
 
1082
    def stderr(self):
 
1083
        return "".encode(self.call_args["encoding"]).join(self._stderr)
 
1084
 
 
1085
 
 
1086
    def signal(self, sig):
 
1087
        self.log.debug("sending signal %d", sig)
 
1088
        try: os.kill(self.pid, sig)
 
1089
        except OSError: pass
 
1090
 
 
1091
    def kill(self):
 
1092
        self.log.debug("killing")
 
1093
        self.signal(signal.SIGKILL)
 
1094
 
 
1095
    def terminate(self):
 
1096
        self.log.debug("terminating")
 
1097
        self.signal(signal.SIGTERM)
 
1098
 
 
1099
    @staticmethod
 
1100
    def _cleanup_procs():
 
1101
        for proc in OProc._procs_to_cleanup:
 
1102
            proc.kill()
 
1103
 
 
1104
 
 
1105
    def _handle_exit_code(self, exit_code):
 
1106
        # if we exited from a signal, let our exit code reflect that
 
1107
        if os.WIFSIGNALED(exit_code): return -os.WTERMSIG(exit_code)
 
1108
        # otherwise just give us a normal exit code
 
1109
        elif os.WIFEXITED(exit_code): return os.WEXITSTATUS(exit_code)
 
1110
        else: raise RuntimeError("Unknown child exit status!")
 
1111
 
 
1112
    @property
 
1113
    def alive(self):
 
1114
        if self.exit_code is not None: return False
 
1115
 
 
1116
        # what we're doing here essentially is making sure that the main thread
 
1117
        # (or another thread), isn't calling .wait() on the process.  because
 
1118
        # .wait() calls os.waitpid(self.pid, 0), we can't do an os.waitpid
 
1119
        # here...because if we did, and the process exited while in this
 
1120
        # thread, the main thread's os.waitpid(self.pid, 0) would raise OSError
 
1121
        # (because the process ended in another thread).
 
1122
        #
 
1123
        # so essentially what we're doing is, using this lock, checking if
 
1124
        # we're calling .wait(), and if we are, let .wait() get the exit code
 
1125
        # and handle the status, otherwise let us do it.
 
1126
        acquired = self._wait_lock.acquire(False)
 
1127
        if not acquired:
 
1128
            if self.exit_code is not None: return False
 
1129
            return True
 
1130
 
 
1131
        try:
 
1132
            # WNOHANG is just that...we're calling waitpid without hanging...
 
1133
            # essentially polling the process
 
1134
            pid, exit_code = os.waitpid(self.pid, os.WNOHANG)
 
1135
            if pid == self.pid:
 
1136
                self.exit_code = self._handle_exit_code(exit_code)
 
1137
                return False
 
1138
 
 
1139
        # no child process
 
1140
        except OSError: return False
 
1141
        else: return True
 
1142
        finally: self._wait_lock.release()
 
1143
 
 
1144
 
 
1145
    def wait(self):
 
1146
        self.log.debug("acquiring wait lock to wait for completion")
 
1147
        with self._wait_lock:
 
1148
            self.log.debug("got wait lock")
 
1149
 
 
1150
            if self.exit_code is None:
 
1151
                self.log.debug("exit code not set, waiting on pid")
 
1152
                pid, exit_code = os.waitpid(self.pid, 0)
 
1153
                self.exit_code = self._handle_exit_code(exit_code)
 
1154
            else:
 
1155
                self.log.debug("exit code already set (%d), no need to wait", self.exit_code)
 
1156
 
 
1157
            self._input_thread.join()
 
1158
            self._output_thread.join()
 
1159
 
 
1160
            OProc._procs_to_cleanup.discard(self)
 
1161
 
 
1162
            return self.exit_code
 
1163
 
 
1164
 
 
1165
 
 
1166
 
 
1167
class DoneReadingStdin(Exception): pass
 
1168
class NoStdinData(Exception): pass
 
1169
 
 
1170
 
 
1171
 
 
1172
# this guy is for reading from some input (the stream) and writing to our
 
1173
# opened process's stdin fd.  the stream can be a Queue, a callable, something
 
1174
# with the "read" method, a string, or an iterable
 
1175
class StreamWriter(object):
 
1176
    def __init__(self, name, process, stream, stdin, bufsize):
 
1177
        self.name = name
 
1178
        self.process = weakref.ref(process)
 
1179
        self.stream = stream
 
1180
        self.stdin = stdin
 
1181
 
 
1182
        self.log = Logger("streamwriter", repr(self))
 
1183
 
 
1184
 
 
1185
        self.stream_bufferer = StreamBufferer(self.process().call_args["encoding"],
 
1186
            bufsize)
 
1187
 
 
1188
        # determine buffering for reading from the input we set for stdin
 
1189
        if bufsize == 1: self.bufsize = 1024
 
1190
        elif bufsize == 0: self.bufsize = 1
 
1191
        else: self.bufsize = bufsize
 
1192
 
 
1193
 
 
1194
        if isinstance(stdin, Queue):
 
1195
            log_msg = "queue"
 
1196
            self.get_chunk = self.get_queue_chunk
 
1197
 
 
1198
        elif callable(stdin):
 
1199
            log_msg = "callable"
 
1200
            self.get_chunk = self.get_callable_chunk
 
1201
 
 
1202
        # also handles stringio
 
1203
        elif hasattr(stdin, "read"):
 
1204
            log_msg = "file descriptor"
 
1205
            self.get_chunk = self.get_file_chunk
 
1206
 
 
1207
        elif isinstance(stdin, basestring):
 
1208
            log_msg = "string"
 
1209
 
 
1210
            if bufsize == 1:
 
1211
                # TODO, make the split() be a generator
 
1212
                self.stdin = iter((c + "\n" for c in stdin.split("\n")))
 
1213
            else:
 
1214
                self.stdin = iter(stdin[i:i + self.bufsize] for i in range(0, len(stdin), self.bufsize))
 
1215
            self.get_chunk = self.get_iter_chunk
 
1216
 
 
1217
        else:
 
1218
            log_msg = "general iterable"
 
1219
            self.stdin = iter(stdin)
 
1220
            self.get_chunk = self.get_iter_chunk
 
1221
 
 
1222
        self.log.debug("parsed stdin as a %s", log_msg)
 
1223
 
 
1224
 
 
1225
    def __repr__(self):
 
1226
        return "<StreamWriter %s for %r>" % (self.name, self.process())
 
1227
 
 
1228
    def fileno(self):
 
1229
        return self.stream
 
1230
 
 
1231
    def get_queue_chunk(self):
 
1232
        try: chunk = self.stdin.get(True, 0.01)
 
1233
        except Empty: raise NoStdinData
 
1234
        if chunk is None: raise DoneReadingStdin
 
1235
        return chunk
 
1236
 
 
1237
    def get_callable_chunk(self):
 
1238
        try: return self.stdin()
 
1239
        except: raise DoneReadingStdin
 
1240
 
 
1241
    def get_iter_chunk(self):
 
1242
        try:
 
1243
            if IS_PY3: return self.stdin.__next__()
 
1244
            else: return self.stdin.next()
 
1245
        except StopIteration: raise DoneReadingStdin
 
1246
 
 
1247
    def get_file_chunk(self):
 
1248
        if self.stream_bufferer.type == 1: chunk = self.stdin.readline()
 
1249
        else: chunk = self.stdin.read(self.bufsize)
 
1250
        if not chunk: raise DoneReadingStdin
 
1251
        else: return chunk
 
1252
 
 
1253
 
 
1254
    # the return value answers the questions "are we done writing forever?"
 
1255
    def write(self):
 
1256
        # get_chunk may sometimes return bytes, and sometimes returns trings
 
1257
        # because of the nature of the different types of STDIN objects we
 
1258
        # support
 
1259
        try: chunk = self.get_chunk()
 
1260
        except DoneReadingStdin:
 
1261
            self.log.debug("done reading")
 
1262
 
 
1263
            if self.process().call_args["tty_in"]:
 
1264
                # EOF time
 
1265
                try: char = termios.tcgetattr(self.stream)[6][termios.VEOF]
 
1266
                except: char = chr(4).encode()
 
1267
                os.write(self.stream, char)
 
1268
 
 
1269
            return True
 
1270
 
 
1271
        except NoStdinData:
 
1272
            self.log.debug("received no data")
 
1273
            return False
 
1274
 
 
1275
        # if we're not bytes, make us bytes
 
1276
        if IS_PY3 and hasattr(chunk, "encode"):
 
1277
            chunk = chunk.encode(self.process().call_args["encoding"])
 
1278
 
 
1279
        for chunk in self.stream_bufferer.process(chunk):
 
1280
            self.log.debug("got chunk size %d: %r", len(chunk), chunk[:30])
 
1281
 
 
1282
            self.log.debug("writing chunk to process")
 
1283
            try:
 
1284
                os.write(self.stream, chunk)
 
1285
            except OSError:
 
1286
                self.log.debug("OSError writing stdin chunk")
 
1287
                return True
 
1288
 
 
1289
 
 
1290
    def close(self):
 
1291
        self.log.debug("closing, but flushing first")
 
1292
        chunk = self.stream_bufferer.flush()
 
1293
        self.log.debug("got chunk size %d to flush: %r", len(chunk), chunk[:30])
 
1294
        try:
 
1295
            if chunk: os.write(self.stream, chunk)
 
1296
            if not self.process().call_args["tty_in"]:
 
1297
                self.log.debug("we used a TTY, so closing the stream")
 
1298
                os.close(self.stream)
 
1299
        except OSError: pass
 
1300
 
 
1301
 
 
1302
 
 
1303
class StreamReader(object):
 
1304
    def __init__(self, name, process, stream, handler, buffer, bufsize,
 
1305
            pipe_queue=None, save_data=True):
 
1306
        self.name = name
 
1307
        self.process = weakref.ref(process)
 
1308
        self.stream = stream
 
1309
        self.buffer = buffer
 
1310
        self.save_data = save_data
 
1311
        self.encoding = process.call_args["encoding"]
 
1312
        self.decode_errors = process.call_args["decode_errors"]
 
1313
 
 
1314
        self.pipe_queue = None
 
1315
        if pipe_queue: self.pipe_queue = weakref.ref(pipe_queue)
 
1316
 
 
1317
        self.log = Logger("streamreader", repr(self))
 
1318
 
 
1319
        self.stream_bufferer = StreamBufferer(self.encoding, bufsize,
 
1320
            self.decode_errors)
 
1321
 
 
1322
        # determine buffering
 
1323
        if bufsize == 1: self.bufsize = 1024
 
1324
        elif bufsize == 0: self.bufsize = 1
 
1325
        else: self.bufsize = bufsize
 
1326
 
 
1327
 
 
1328
        # here we're determining the handler type by doing some basic checks
 
1329
        # on the handler object
 
1330
        self.handler = handler
 
1331
        if callable(handler): self.handler_type = "fn"
 
1332
        elif isinstance(handler, StringIO): self.handler_type = "stringio"
 
1333
        elif isinstance(handler, cStringIO):
 
1334
            self.handler_type = "cstringio"
 
1335
        elif hasattr(handler, "write"): self.handler_type = "fd"
 
1336
        else: self.handler_type = None
 
1337
 
 
1338
 
 
1339
        self.should_quit = False
 
1340
 
 
1341
        # here we choose how to call the callback, depending on how many
 
1342
        # arguments it takes.  the reason for this is to make it as easy as
 
1343
        # possible for people to use, without limiting them.  a new user will
 
1344
        # assume the callback takes 1 argument (the data).  as they get more
 
1345
        # advanced, they may want to terminate the process, or pass some stdin
 
1346
        # back, and will realize that they can pass a callback of more args
 
1347
        if self.handler_type == "fn":
 
1348
            implied_arg = 0
 
1349
            if inspect.ismethod(handler):
 
1350
                implied_arg = 1
 
1351
                num_args = len(inspect.getargspec(handler).args)
 
1352
 
 
1353
            else:
 
1354
                if inspect.isfunction(handler):
 
1355
                    num_args = len(inspect.getargspec(handler).args)
 
1356
 
 
1357
                # is an object instance with __call__ method
 
1358
                else:
 
1359
                    implied_arg = 1
 
1360
                    num_args = len(inspect.getargspec(handler.__call__).args)
 
1361
 
 
1362
 
 
1363
            self.handler_args = ()
 
1364
            if num_args == implied_arg + 2:
 
1365
                self.handler_args = (self.process().stdin,)
 
1366
            elif num_args == implied_arg + 3:
 
1367
                self.handler_args = (self.process().stdin, self.process)
 
1368
 
 
1369
 
 
1370
    def fileno(self):
 
1371
        return self.stream
 
1372
 
 
1373
    def __repr__(self):
 
1374
        return "<StreamReader %s for %r>" % (self.name, self.process())
 
1375
 
 
1376
    def close(self):
 
1377
        chunk = self.stream_bufferer.flush()
 
1378
        self.log.debug("got chunk size %d to flush: %r",
 
1379
            len(chunk), chunk[:30])
 
1380
        if chunk: self.write_chunk(chunk)
 
1381
 
 
1382
        if self.handler_type == "fd" and hasattr(self.handler, "close"):
 
1383
            self.handler.flush()
 
1384
 
 
1385
        if self.pipe_queue and self.save_data: self.pipe_queue().put(None)
 
1386
        try: os.close(self.stream)
 
1387
        except OSError: pass
 
1388
 
 
1389
 
 
1390
    def write_chunk(self, chunk):
 
1391
        # in PY3, the chunk coming in will be bytes, so keep that in mind
 
1392
 
 
1393
        if self.handler_type == "fn" and not self.should_quit:
 
1394
            # try to use the encoding first, if that doesn't work, send
 
1395
            # the bytes, because it might be binary
 
1396
            try:
 
1397
                to_handler = chunk.decode(self.encoding, self.decode_errors)
 
1398
            except UnicodeDecodeError:
 
1399
                to_handler = chunk
 
1400
 
 
1401
            # this is really ugly, but we can't store self.process as one of
 
1402
            # the handler args in self.handler_args, the reason being is that
 
1403
            # it would create cyclic references, and prevent objects from
 
1404
            # being garbage collected.  so we're determining if this handler
 
1405
            # even requires self.process (by the argument count), and if it
 
1406
            # does, resolving the weakref to a hard reference and passing
 
1407
            # that into the handler
 
1408
            handler_args = self.handler_args
 
1409
            if len(self.handler_args) == 2:
 
1410
                handler_args = (self.handler_args[0], self.process())
 
1411
            self.should_quit = self.handler(to_handler, *handler_args)
 
1412
 
 
1413
        elif self.handler_type == "stringio":
 
1414
            self.handler.write(chunk.decode(self.encoding, self.decode_errors))
 
1415
 
 
1416
        elif self.handler_type in ("cstringio", "fd"):
 
1417
            self.handler.write(chunk)
 
1418
 
 
1419
            # we should flush on an fd.  chunk is already the correctly-buffered
 
1420
            # size, so we don't need the fd buffering as well
 
1421
            self.handler.flush()
 
1422
 
 
1423
        if self.save_data:
 
1424
            self.buffer.append(chunk)
 
1425
 
 
1426
            if self.pipe_queue:
 
1427
                self.log.debug("putting chunk onto pipe: %r", chunk[:30])
 
1428
                self.pipe_queue().put(chunk)
 
1429
 
 
1430
 
 
1431
    def read(self):
 
1432
        # if we're PY3, we're reading bytes, otherwise we're reading
 
1433
        # str
 
1434
        try: chunk = os.read(self.stream, self.bufsize)
 
1435
        except OSError as e:
 
1436
            self.log.debug("got errno %d, done reading", e.errno)
 
1437
            return True
 
1438
        if not chunk:
 
1439
            self.log.debug("got no chunk, done reading")
 
1440
            return True
 
1441
 
 
1442
        self.log.debug("got chunk size %d: %r", len(chunk), chunk[:30])
 
1443
        for chunk in self.stream_bufferer.process(chunk):
 
1444
            self.write_chunk(chunk)
 
1445
 
 
1446
 
 
1447
 
 
1448
 
 
1449
# this is used for feeding in chunks of stdout/stderr, and breaking it up into
 
1450
# chunks that will actually be put into the internal buffers.  for example, if
 
1451
# you have two processes, one being piped to the other, and you want that,
 
1452
# first process to feed lines of data (instead of the chunks however they
 
1453
# come in), OProc will use an instance of this class to chop up the data and
 
1454
# feed it as lines to be sent down the pipe
 
1455
class StreamBufferer(object):
 
1456
    def __init__(self, encoding=DEFAULT_ENCODING, buffer_type=1,
 
1457
            decode_errors="strict"):
 
1458
        # 0 for unbuffered, 1 for line, everything else for that amount
 
1459
        self.type = buffer_type
 
1460
        self.buffer = []
 
1461
        self.n_buffer_count = 0
 
1462
        self.encoding = encoding
 
1463
        self.decode_errors = decode_errors
 
1464
 
 
1465
        # this is for if we change buffering types.  if we change from line
 
1466
        # buffered to unbuffered, its very possible that our self.buffer list
 
1467
        # has data that was being saved up (while we searched for a newline).
 
1468
        # we need to use that up, so we don't lose it
 
1469
        self._use_up_buffer_first = False
 
1470
 
 
1471
        # the buffering lock is used because we might chance the buffering
 
1472
        # types from a different thread.  for example, if we have a stdout
 
1473
        # callback, we might use it to change the way stdin buffers.  so we
 
1474
        # lock
 
1475
        self._buffering_lock = threading.RLock()
 
1476
        self.log = Logger("stream_bufferer")
 
1477
 
 
1478
 
 
1479
    def change_buffering(self, new_type):
 
1480
        # TODO, when we stop supporting 2.6, make this a with context
 
1481
        self.log.debug("acquiring buffering lock for changing buffering")
 
1482
        self._buffering_lock.acquire()
 
1483
        self.log.debug("got buffering lock for changing buffering")
 
1484
        try:
 
1485
            if new_type == 0: self._use_up_buffer_first = True
 
1486
 
 
1487
            self.type = new_type
 
1488
        finally:
 
1489
            self._buffering_lock.release()
 
1490
            self.log.debug("released buffering lock for changing buffering")
 
1491
 
 
1492
 
 
1493
    def process(self, chunk):
 
1494
        # MAKE SURE THAT THE INPUT IS PY3 BYTES
 
1495
        # THE OUTPUT IS ALWAYS PY3 BYTES
 
1496
 
 
1497
        # TODO, when we stop supporting 2.6, make this a with context
 
1498
        self.log.debug("acquiring buffering lock to process chunk (buffering: %d)", self.type)
 
1499
        self._buffering_lock.acquire()
 
1500
        self.log.debug("got buffering lock to process chunk (buffering: %d)", self.type)
 
1501
        try:
 
1502
            # we've encountered binary, permanently switch to N size buffering
 
1503
            # since matching on newline doesn't make sense anymore
 
1504
            if self.type == 1:
 
1505
                try: chunk.decode(self.encoding, self.decode_errors)
 
1506
                except:
 
1507
                    self.log.debug("detected binary data, changing buffering")
 
1508
                    self.change_buffering(1024)
 
1509
 
 
1510
            # unbuffered
 
1511
            if self.type == 0:
 
1512
                if self._use_up_buffer_first:
 
1513
                    self._use_up_buffer_first = False
 
1514
                    to_write = self.buffer
 
1515
                    self.buffer = []
 
1516
                    to_write.append(chunk)
 
1517
                    return to_write
 
1518
 
 
1519
                return [chunk]
 
1520
 
 
1521
            # line buffered
 
1522
            elif self.type == 1:
 
1523
                total_to_write = []
 
1524
                chunk = chunk.decode(self.encoding, self.decode_errors)
 
1525
                while True:
 
1526
                    newline = chunk.find("\n")
 
1527
                    if newline == -1: break
 
1528
 
 
1529
                    chunk_to_write = chunk[:newline + 1]
 
1530
                    if self.buffer:
 
1531
                        # this is ugly, but it's designed to take the existing
 
1532
                        # bytes buffer, join it together, tack on our latest
 
1533
                        # chunk, then convert the whole thing to a string.
 
1534
                        # it's necessary, i'm sure.  read the whole block to
 
1535
                        # see why.
 
1536
                        chunk_to_write = "".encode(self.encoding).join(self.buffer) \
 
1537
                            + chunk_to_write.encode(self.encoding)
 
1538
                        chunk_to_write = chunk_to_write.decode(self.encoding)
 
1539
 
 
1540
                        self.buffer = []
 
1541
                        self.n_buffer_count = 0
 
1542
 
 
1543
                    chunk = chunk[newline + 1:]
 
1544
                    total_to_write.append(chunk_to_write.encode(self.encoding))
 
1545
 
 
1546
                if chunk:
 
1547
                    self.buffer.append(chunk.encode(self.encoding))
 
1548
                    self.n_buffer_count += len(chunk)
 
1549
                return total_to_write
 
1550
 
 
1551
            # N size buffered
 
1552
            else:
 
1553
                total_to_write = []
 
1554
                while True:
 
1555
                    overage = self.n_buffer_count + len(chunk) - self.type
 
1556
                    if overage >= 0:
 
1557
                        ret = "".encode(self.encoding).join(self.buffer) + chunk
 
1558
                        chunk_to_write = ret[:self.type]
 
1559
                        chunk = ret[self.type:]
 
1560
                        total_to_write.append(chunk_to_write)
 
1561
                        self.buffer = []
 
1562
                        self.n_buffer_count = 0
 
1563
                    else:
 
1564
                        self.buffer.append(chunk)
 
1565
                        self.n_buffer_count += len(chunk)
 
1566
                        break
 
1567
                return total_to_write
 
1568
        finally:
 
1569
            self._buffering_lock.release()
 
1570
            self.log.debug("released buffering lock for processing chunk (buffering: %d)", self.type)
 
1571
 
 
1572
 
 
1573
    def flush(self):
 
1574
        self.log.debug("acquiring buffering lock for flushing buffer")
 
1575
        self._buffering_lock.acquire()
 
1576
        self.log.debug("got buffering lock for flushing buffer")
 
1577
        try:
 
1578
            ret = "".encode(self.encoding).join(self.buffer)
 
1579
            self.buffer = []
 
1580
            return ret
 
1581
        finally:
 
1582
            self._buffering_lock.release()
 
1583
            self.log.debug("released buffering lock for flushing buffer")
 
1584
 
 
1585
 
 
1586
 
 
1587
 
 
1588
 
 
1589
# this allows lookups to names that aren't found in the global scope to be
 
1590
# searched for as a program name.  for example, if "ls" isn't found in this
 
1591
# module's scope, we consider it a system program and try to find it.
 
1592
#
 
1593
# we use a dict instead of just a regular object as the base class because
 
1594
# the exec() statement used in this file requires the "globals" argument to
 
1595
# be a dictionary
 
1596
class Environment(dict):
 
1597
    def __init__(self, globs, baked_args={}):
 
1598
        self.globs = globs
 
1599
        self.baked_args = baked_args
 
1600
 
 
1601
    def __setitem__(self, k, v):
 
1602
        self.globs[k] = v
 
1603
 
 
1604
    def __getitem__(self, k):
 
1605
        try: return self.globs[k]
 
1606
        except KeyError: pass
 
1607
 
 
1608
        # the only way we'd get to here is if we've tried to
 
1609
        # import * from a repl.  so, raise an exception, since
 
1610
        # that's really the only sensible thing to do
 
1611
        if k == "__all__":
 
1612
            raise ImportError("Cannot import * from sh. \
 
1613
Please import sh or import programs individually.")
 
1614
 
 
1615
        # if we end with "_" just go ahead and skip searching
 
1616
        # our namespace for python stuff.  this was mainly for the
 
1617
        # command "id", which is a popular program for finding
 
1618
        # if a user exists, but also a python function for getting
 
1619
        # the address of an object.  so can call the python
 
1620
        # version by "id" and the program version with "id_"
 
1621
        if not k.endswith("_"):
 
1622
            # check if we're naming a dynamically generated ReturnCode exception
 
1623
            try: return rc_exc_cache[k]
 
1624
            except KeyError:
 
1625
                m = rc_exc_regex.match(k)
 
1626
                if m:
 
1627
                    exit_code = int(m.group(2))
 
1628
                    if m.group(1) == "SignalException":
 
1629
                        exit_code = -exit_code
 
1630
                    return get_rc_exc(exit_code)
 
1631
 
 
1632
            # is it a builtin?
 
1633
            try:
 
1634
                return getattr(self["__builtins__"], k)
 
1635
            except AttributeError:
 
1636
                pass
 
1637
        elif not k.startswith("_"):
 
1638
            k = k.rstrip("_")
 
1639
 
 
1640
 
 
1641
        # https://github.com/ipython/ipython/issues/2577
 
1642
        # https://github.com/amoffat/sh/issues/97#issuecomment-10610629
 
1643
        if k.startswith("__") and k.endswith("__"):
 
1644
            raise AttributeError
 
1645
 
 
1646
        # how about an environment variable?
 
1647
        try:
 
1648
            return os.environ[k]
 
1649
        except KeyError:
 
1650
            pass
 
1651
 
 
1652
        # is it a custom builtin?
 
1653
        builtin = getattr(self, "b_" + k, None)
 
1654
        if builtin: return builtin
 
1655
 
 
1656
        # it must be a command then
 
1657
        # we use _create instead of instantiating the class directly because
 
1658
        # _create uses resolve_program, which will automatically do underscore-
 
1659
        # to-dash conversions.  instantiating directly does not use that
 
1660
        return Command._create(k, **self.baked_args)
 
1661
 
 
1662
 
 
1663
    # methods that begin with "b_" are custom builtins and will override any
 
1664
    # program that exists in our path.  this is useful for things like
 
1665
    # common shell builtins that people are used to, but which aren't actually
 
1666
    # full-fledged system binaries
 
1667
 
 
1668
    def b_cd(self, path):
 
1669
        os.chdir(path)
 
1670
 
 
1671
    def b_which(self, program):
 
1672
        return which(program)
 
1673
 
 
1674
 
 
1675
 
 
1676
 
 
1677
def run_repl(env):
 
1678
    banner = "\n>> sh v{version}\n>> https://github.com/amoffat/sh\n"
 
1679
 
 
1680
    print(banner.format(version=__version__))
 
1681
    while True:
 
1682
        try: line = raw_input("sh> ")
 
1683
        except (ValueError, EOFError): break
 
1684
 
 
1685
        try: exec(compile(line, "<dummy>", "single"), env, env)
 
1686
        except SystemExit: break
 
1687
        except: print(traceback.format_exc())
 
1688
 
 
1689
    # cleans up our last line
 
1690
    print("")
 
1691
 
 
1692
 
 
1693
 
 
1694
 
 
1695
# this is a thin wrapper around THIS module (we patch sys.modules[__name__]).
 
1696
# this is in the case that the user does a "from sh import whatever"
 
1697
# in other words, they only want to import certain programs, not the whole
 
1698
# system PATH worth of commands.  in this case, we just proxy the
 
1699
# import lookup to our Environment class
 
1700
class SelfWrapper(ModuleType):
 
1701
    def __init__(self, self_module, baked_args={}):
 
1702
        # this is super ugly to have to copy attributes like this,
 
1703
        # but it seems to be the only way to make reload() behave
 
1704
        # nicely.  if i make these attributes dynamic lookups in
 
1705
        # __getattr__, reload sometimes chokes in weird ways...
 
1706
        for attr in ["__builtins__", "__doc__", "__name__", "__package__"]:
 
1707
            setattr(self, attr, getattr(self_module, attr, None))
 
1708
 
 
1709
        # python 3.2 (2.7 and 3.3 work fine) breaks on osx (not ubuntu)
 
1710
        # if we set this to None.  and 3.3 needs a value for __path__
 
1711
        self.__path__ = []
 
1712
        self.self_module = self_module
 
1713
        self.env = Environment(globals(), baked_args)
 
1714
 
 
1715
    def __setattr__(self, name, value):
 
1716
        if hasattr(self, "env"): self.env[name] = value
 
1717
        ModuleType.__setattr__(self, name, value)
 
1718
 
 
1719
    def __getattr__(self, name):
 
1720
        if name == "env": raise AttributeError
 
1721
        return self.env[name]
 
1722
 
 
1723
    # accept special keywords argument to define defaults for all operations
 
1724
    # that will be processed with given by return SelfWrapper
 
1725
    def __call__(self, **kwargs):
 
1726
        return SelfWrapper(self.self_module, kwargs)
 
1727
 
 
1728
 
 
1729
 
 
1730
 
 
1731
# we're being run as a stand-alone script
 
1732
if __name__ == "__main__":
 
1733
    try:
 
1734
        arg = sys.argv.pop(1)
 
1735
    except:
 
1736
        arg = None
 
1737
 
 
1738
    if arg == "test":
 
1739
        import subprocess
 
1740
 
 
1741
        def run_test(version, locale):
 
1742
            py_version = "python%s" % version
 
1743
            py_bin = which(py_version)
 
1744
 
 
1745
            if py_bin:
 
1746
                print("Testing %s, locale %r" % (py_version.capitalize(),
 
1747
                    locale))
 
1748
 
 
1749
                env = os.environ.copy()
 
1750
                env["LC_ALL"] = locale
 
1751
                p = subprocess.Popen([py_bin, os.path.join(THIS_DIR, "test.py")]
 
1752
                    + sys.argv[1:], env=env)
 
1753
                p.wait()
 
1754
            else:
 
1755
                print("Couldn't find %s, skipping" % py_version.capitalize())
 
1756
 
 
1757
        versions = ("2.6", "2.7", "3.1", "3.2", "3.3")
 
1758
        locales = ("en_US.UTF-8", "C")
 
1759
        for locale in locales:
 
1760
            for version in versions:
 
1761
                run_test(version, locale)
 
1762
 
 
1763
    else:
 
1764
        env = Environment(globals())
 
1765
        run_repl(env)
 
1766
 
 
1767
# we're being imported from somewhere
 
1768
else:
 
1769
    self = sys.modules[__name__]
 
1770
    sys.modules[__name__] = SelfWrapper(self)