1
#===============================================================================
2
# Copyright (C) 2011-2012 by Andrew Moffat
4
# Permission is hereby granted, free of charge, to any person obtaining a copy
5
# of this software and associated documentation files (the "Software"), to deal
6
# in the Software without restriction, including without limitation the rights
7
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
# copies of the Software, and to permit persons to whom the Software is
9
# furnished to do so, subject to the following conditions:
11
# The above copyright notice and this permission notice shall be included in
12
# all copies or substantial portions of the Software.
14
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21
#===============================================================================
25
__project_url__ = "https://github.com/amoffat/sh"
31
if "windows" in platform.system().lower():
32
raise ImportError("sh %s is currently only supported on linux and osx. \
33
please install pbs 0.110 (http://pypi.python.org/pypi/pbs) for windows \
34
support." % __version__)
39
IS_PY3 = sys.version_info[0] == 3
44
from glob import glob as original_glob
45
from types import ModuleType
46
from functools import partial
50
from locale import getpreferredencoding
51
DEFAULT_ENCODING = getpreferredencoding() or "utf-8"
55
from io import StringIO
56
from io import BytesIO as cStringIO
57
from queue import Queue, Empty
59
from StringIO import StringIO
60
from cStringIO import OutputType as cStringIO
61
from Queue import Queue, Empty
63
IS_OSX = platform.system() == "Darwin"
64
THIS_DIR = os.path.dirname(os.path.realpath(__file__))
81
from collections import deque
86
logging_enabled = False
95
def encode_to_py3bytes_or_py2str(s):
96
""" takes anything and attempts to return a py2 string or py3 bytes. this
97
is typically used when creating command + arguments to be executed via
100
fallback_encoding = "utf8"
105
s = bytes(s, DEFAULT_ENCODING)
106
except UnicodeEncodeError:
107
s = bytes(s, fallback_encoding)
109
# attempt to convert the thing to unicode from the system's encoding
111
s = unicode(s, DEFAULT_ENCODING)
112
# if the thing is already unicode, or it's a number, it can't be
113
# coerced to unicode with an encoding argument, but if we leave out
114
# the encoding argument, it will convert it to a string, then to unicode
118
# now that we have guaranteed unicode, encode to our system encoding,
119
# but attempt to fall back to something
121
s = s.encode(DEFAULT_ENCODING)
123
s = s.encode(fallback_encoding)
127
class ErrorReturnCode(Exception):
130
def __init__(self, full_cmd, stdout, stderr):
131
self.full_cmd = full_cmd
136
if self.stdout is None: exc_stdout = "<redirected>"
138
exc_stdout = self.stdout[:self.truncate_cap]
139
out_delta = len(self.stdout) - len(exc_stdout)
141
exc_stdout += ("... (%d more, please see e.stdout)" % out_delta).encode()
143
if self.stderr is None: exc_stderr = "<redirected>"
145
exc_stderr = self.stderr[:self.truncate_cap]
146
err_delta = len(self.stderr) - len(exc_stderr)
148
exc_stderr += ("... (%d more, please see e.stderr)" % err_delta).encode()
150
msg = "\n\n RAN: %r\n\n STDOUT:\n%s\n\n STDERR:\n%s" % \
151
(full_cmd, exc_stdout.decode(DEFAULT_ENCODING, "replace"),
152
exc_stderr.decode(DEFAULT_ENCODING, "replace"))
153
super(ErrorReturnCode, self).__init__(msg)
156
class SignalException(ErrorReturnCode):
    """An ErrorReturnCode variant raised when the child process was killed
    by a signal rather than exiting with a status code of its own."""
158
SIGNALS_THAT_SHOULD_THROW_EXCEPTION = (
167
# subclassing AttributeError (rather than a plain Exception) keeps tools
# that probe attributes, like ipython's tab completion, well-behaved:
# https://github.com/ipython/ipython/issues/2577
# https://github.com/amoffat/sh/issues/97#issuecomment-10610629
class CommandNotFound(AttributeError):
    """Raised when a program name cannot be resolved to an executable."""
172
# matches the names of the dynamically-created exception classes, e.g.
# "ErrorReturnCode_1" or "SignalException_9".  raw string so that \d is a
# genuine regex escape instead of relying on Python passing unknown string
# escapes through (a DeprecationWarning/SyntaxWarning in modern Python)
rc_exc_regex = re.compile(r"(ErrorReturnCode|SignalException)_(\d+)")
177
try: return rc_exc_cache[rc]
178
except KeyError: pass
181
name = "ErrorReturnCode_%d" % rc
182
exc = type(name, (ErrorReturnCode,), {"exit_code": rc})
184
name = "SignalException_%d" % abs(rc)
185
exc = type(name, (SignalException,), {"exit_code": rc})
187
rc_exc_cache[rc] = exc
195
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
197
fpath, fname = os.path.split(program)
199
if is_exe(program): return program
201
if "PATH" not in os.environ: return None
202
for path in os.environ["PATH"].split(os.pathsep):
203
exe_file = os.path.join(path, program)
209
def resolve_program(program):
210
path = which(program)
212
# our actual command might have a dash in it, but we can't call
213
# that from python (we have to use underscores), so we'll check
214
# if a dash version of our underscore command exists and use that
216
if "_" in program: path = which(program.replace("_", "-"))
217
if not path: return None
221
# we add this thin wrapper to glob.glob because of a specific edge case where
222
# glob does not expand to anything. for example, if you try to do
223
# glob.glob("*.py") and there are no *.py files in the directory, glob.glob
224
# returns an empty list. this empty list gets passed to the command, and
225
# then the command fails with a misleading error message. this thin wrapper
226
# ensures that if there is no expansion, we pass in the original argument,
227
# so that when the command fails, the error message is clearer
229
return original_glob(arg) or arg
233
class Logger(object):
234
def __init__(self, name, context=None):
237
if context: self.context = "%s: %%s" % context
238
self.log = logging.getLogger(name)
240
def info(self, msg, *args):
    """Emit an INFO record; a no-op unless module logging is enabled."""
    if logging_enabled:
        self.log.info(self.context, msg % args)
244
def debug(self, msg, *args):
    """Emit a DEBUG record; a no-op unless module logging is enabled."""
    if logging_enabled:
        self.log.debug(self.context, msg % args)
248
def error(self, msg, *args):
    """Emit an ERROR record; a no-op unless module logging is enabled."""
    if logging_enabled:
        self.log.error(self.context, msg % args)
252
def exception(self, msg, *args):
    """Emit an exception record (message + traceback); a no-op unless
    module logging is enabled."""
    if logging_enabled:
        self.log.exception(self.context, msg % args)
258
class RunningCommand(object):
259
def __init__(self, cmd, call_args, stdin, stdout, stderr):
261
if len(cmd) > truncate:
262
logger_str = "command %r...(%d more) call_args %r" % \
263
(cmd[:truncate], len(cmd) - truncate, call_args)
265
logger_str = "command %r call_args %r" % (cmd, call_args)
267
self.log = Logger("command", logger_str)
268
self.call_args = call_args
271
# self.ran is used for auditing what actually ran. for example, in
272
# exceptions, or if you just want to know what was ran after the
275
self.ran = " ".join([arg.decode(DEFAULT_ENCODING, "ignore") for arg in cmd])
277
self.ran = " ".join(cmd)
281
# this flag is for whether or not we've handled the exit code (like
282
# by raising an exception). this is necessary because .wait() is called
283
# from multiple places, and wait() triggers the exit code to be
284
# processed. but we don't want to raise multiple exceptions, only
285
# one (if any at all)
286
self._handled_exit_code = False
288
self.should_wait = True
292
# with contexts shouldn't run at all yet, they prepend
293
# to every command in the context
294
if call_args["with"]:
295
spawn_process = False
296
Command._prepend_stack.append(self)
299
if callable(call_args["out"]) or callable(call_args["err"]):
300
self.should_wait = False
302
if call_args["piped"] or call_args["iter"] or call_args["iter_noblock"]:
303
self.should_wait = False
305
# we're running in the background, return self and let us lazily
307
if call_args["bg"]: self.should_wait = False
310
if call_args["err_to_out"]: stderr = STDOUT
313
# set up which stream should write to the pipe
314
# TODO, make pipe None by default and limit the size of the Queue
317
if call_args["iter"] == "out" or call_args["iter"] is True: pipe = STDOUT
318
elif call_args["iter"] == "err": pipe = STDERR
320
if call_args["iter_noblock"] == "out" or call_args["iter_noblock"] is True: pipe = STDOUT
321
elif call_args["iter_noblock"] == "err": pipe = STDERR
325
self.log.debug("starting process")
326
self.process = OProc(cmd, stdin, stdout, stderr,
327
self.call_args, pipe=pipe)
334
self._handle_exit_code(self.process.wait())
337
# here we determine if we had an exception, or an error code that we weren't
338
# expecting to see. if we did, we create and raise an exception
339
def _handle_exit_code(self, code):
340
if self._handled_exit_code: return
341
self._handled_exit_code = True
343
if code not in self.call_args["ok_code"] and \
344
(code > 0 or -code in SIGNALS_THAT_SHOULD_THROW_EXCEPTION):
345
raise get_rc_exc(code)(
356
return self.process.stdout
361
return self.process.stderr
366
return self.process.exit_code
370
return self.process.pid
373
return len(str(self))
376
# we don't actually do anything here because anything that should
377
# have been done would have been done in the Command.__call__ call.
378
# essentially all that has to happen is the comand be pushed on
386
# we do this because if get blocks, we can't catch a KeyboardInterrupt
387
# so the slight timeout allows for that.
389
try: chunk = self.process._pipe_queue.get(True, 0.001)
391
if self.call_args["iter_noblock"]: return errno.EWOULDBLOCK
395
raise StopIteration()
396
try: return chunk.decode(self.call_args["encoding"],
397
self.call_args["decode_errors"])
398
except UnicodeDecodeError: return chunk
403
def __exit__(self, typ, value, traceback):
    """Leaving a 'with' context: remove this command from the prepend stack.

    Only commands launched with with=True ever pushed themselves onto the
    stack, so only those should pop.
    """
    should_pop = self.call_args["with"] and bool(Command._prepend_stack)
    if should_pop:
        Command._prepend_stack.pop()
408
if IS_PY3: return self.__unicode__()
409
else: return unicode(self).encode(self.call_args["encoding"])
411
def __unicode__(self):
412
if self.process and self.stdout:
413
return self.stdout.decode(self.call_args["encoding"],
414
self.call_args["decode_errors"])
417
def __eq__(self, other):
    """Equality is based on the unicode form of each side, not identity."""
    mine = unicode(self)
    theirs = unicode(other)
    return mine == theirs
420
def __contains__(self, item):
    """Support the 'in' operator by searching this command's string output."""
    output = str(self)
    return item in output
423
def __getattr__(self, p):
    """Delegate unknown attribute lookups.

    ``signal``, ``terminate`` and ``kill`` go straight to the underlying
    OProc object; anything else is resolved on the unicode representation
    of the command's output.
    """
    if p in ("signal", "terminate", "kill"):
        # these only make sense if a process was actually spawned
        if not self.process:
            raise AttributeError
        return getattr(self.process, p)
    return getattr(unicode(self), p)
431
try: return str(self)
432
except UnicodeDecodeError:
434
if self.stdout: return repr(self.stdout)
438
return long(str(self).strip())
441
return float(str(self).strip())
444
return int(str(self).strip())
450
class Command(object):
454
# currently unsupported
455
#"fg": False, # run command in foreground
457
"bg": False, # run command in background
458
"with": False, # prepend the command to every command after it
460
"out": None, # redirect STDOUT
461
"err": None, # redirect STDERR
462
"err_to_out": None, # redirect STDERR to STDOUT
465
# 1 for line, 0 for unbuffered, any other number for that amount
467
# stdout buffer size, same values as above
471
# this is how big the output buffers will be for stdout and stderr.
472
# this is essentially how much output they will store from the process.
473
# we use a deque, so if it overflows past this amount, the first items
474
# get pushed off as each new item gets added.
477
# this is not a *BYTE* size, this is a *CHUNK* size...meaning, that if
478
# you're buffering out/err at 1024 bytes, the internal buffer size will
479
# be "internal_bufsize" CHUNKS of 1024 bytes
480
"internal_bufsize": 3 * 1024 ** 2,
485
"iter_noblock": None,
489
# the separator delimiting between a long-argument's name and its value
490
# for example, --arg=derp, '=' is the long_sep
493
# this is for programs that expect their input to be from a terminal.
494
# ssh is one of those programs
498
"encoding": DEFAULT_ENCODING,
499
"decode_errors": "strict",
501
# how long the process should run before it is auto-killed
504
# these control whether or not stdout/err will get aggregated together
505
# as the process runs. this has memory usage implications, so sometimes
506
# with long-running processes with a lot of data, it makes sense to
512
# if any redirection is used for stdout or stderr, internal buffering
513
# of that data is not stored. this forces it to be stored, as if
514
# the output is being T'd to both the redirected destination and our
519
# these are arguments that cannot be called together, because they wouldn't
521
_incompatible_call_args = (
522
#("fg", "bg", "Command can't be run in the foreground and background"),
523
("err", "err_to_out", "Stderr is already being redirected"),
524
("piped", "iter", "You cannot iterate when this command is being piped"),
528
# this method exists because of the need to have some way of letting
529
# manual object instantiation not perform the underscore-to-dash command
530
# conversion that resolve_program uses.
532
# there are 2 ways to create a Command object. using sh.Command(<program>)
533
# or by using sh.<program>. the method fed into sh.Command must be taken
534
# literally, and so no underscore-dash conversion is performed. the one
535
# for sh.<program> must do the underscore-dash converesion, because we
536
# can't type dashes in method names
538
def _create(cls, program, **default_kwargs):
539
path = resolve_program(program)
540
if not path: raise CommandNotFound(program)
544
cmd = cmd.bake(**default_kwargs)
549
def __init__(self, path):
552
raise CommandNotFound(path)
555
self._partial = False
556
self._partial_baked_args = []
557
self._partial_call_args = {}
559
# bugfix for functools.wraps. issue #121
560
self.__name__ = repr(self)
563
def __getattribute__(self, name):
565
getattr = partial(object.__getattribute__, self)
567
if name.startswith("_"): return getattr(name)
568
if name == "bake": return getattr("bake")
569
if name.endswith("_"): name = name[:-1]
571
return getattr("bake")(name)
575
def _extract_call_args(kwargs, to_override={}):
576
kwargs = kwargs.copy()
578
for parg, default in Command._call_args.items():
582
call_args[parg] = kwargs[key]
584
elif parg in to_override:
585
call_args[parg] = to_override[parg]
587
# test for incompatible call args
588
s1 = set(call_args.keys())
589
for args in Command._incompatible_call_args:
593
if s1.issuperset(args):
594
raise TypeError("Invalid special arguments %r: %s" % (args, error))
596
return call_args, kwargs
599
def _aggregate_keywords(self, keywords, sep, raw=False):
601
for k, v in keywords.items():
602
# we're passing a short arg as a kwarg, example:
606
processed.append(encode_to_py3bytes_or_py2str("-" + k))
608
processed.append(encode_to_py3bytes_or_py2str(v))
610
# we're doing a long arg
613
k = k.replace("_", "-")
616
processed.append(encode_to_py3bytes_or_py2str("--" + k))
620
arg = encode_to_py3bytes_or_py2str("--%s%s%s" % (k, sep, v))
621
processed.append(arg)
625
def _compile_args(self, args, kwargs, sep):
628
# aggregate positional args
630
if isinstance(arg, (list, tuple)):
632
warnings.warn("Empty list passed as an argument to %r. \
633
If you're using glob.glob(), please use sh.glob() instead." % self._path, stacklevel=3)
635
processed_args.append(encode_to_py3bytes_or_py2str(sub_arg))
636
elif isinstance(arg, dict):
637
processed_args += self._aggregate_keywords(arg, sep, raw=True)
639
processed_args.append(encode_to_py3bytes_or_py2str(arg))
641
# aggregate the keyword arguments
642
processed_args += self._aggregate_keywords(kwargs, sep)
644
return processed_args
647
# TODO needs documentation
648
def bake(self, *args, **kwargs):
649
fn = Command(self._path)
652
call_args, kwargs = self._extract_call_args(kwargs)
654
pruned_call_args = call_args
655
for k, v in Command._call_args.items():
657
if pruned_call_args[k] == v:
658
del pruned_call_args[k]
659
except KeyError: continue
661
fn._partial_call_args.update(self._partial_call_args)
662
fn._partial_call_args.update(pruned_call_args)
663
fn._partial_baked_args.extend(self._partial_baked_args)
664
sep = pruned_call_args.get("long_sep", self._call_args["long_sep"])
665
fn._partial_baked_args.extend(self._compile_args(args, kwargs, sep))
670
return self.__unicode__()
672
return unicode(self).encode(DEFAULT_ENCODING)
675
def __eq__(self, other):
676
try: return str(self) == str(other)
681
return "<Command %r>" % str(self)
684
def __unicode__(self):
685
baked_args = " ".join(self._partial_baked_args)
687
baked_args = " " + baked_args
688
return self._path + baked_args
693
def __exit__(self, typ, value, traceback):
    """Leaving a 'with <command>:' context: drop this command from the
    class-wide prepend stack so later commands stop inheriting it."""
    stack = Command._prepend_stack
    stack.pop()
697
def __call__(self, *args, **kwargs):
698
kwargs = kwargs.copy()
703
# aggregate any 'with' contexts
704
call_args = Command._call_args.copy()
705
for prepend in self._prepend_stack:
706
# don't pass the 'with' call arg
707
pcall_args = prepend.call_args.copy()
708
try: del pcall_args["with"]
711
call_args.update(pcall_args)
712
cmd.extend(prepend.cmd)
715
cmd.append(bytes(self._path, call_args["encoding"]))
717
cmd.append(self._path)
719
# here we extract the special kwargs and override any
720
# special kwargs from the possibly baked command
721
tmp_call_args, kwargs = self._extract_call_args(kwargs, self._partial_call_args)
722
call_args.update(tmp_call_args)
724
if not isinstance(call_args["ok_code"], (tuple, list)):
725
call_args["ok_code"] = [call_args["ok_code"]]
728
# check if we're piping via composition
729
stdin = call_args["in"]
731
first_arg = args.pop(0)
732
if isinstance(first_arg, RunningCommand):
733
# it makes sense that if the input pipe of a command is running
734
# in the background, then this command should run in the
736
if first_arg.call_args["bg"]: call_args["bg"] = True
737
stdin = first_arg.process._pipe_queue
740
args.insert(0, first_arg)
742
processed_args = self._compile_args(args, kwargs, call_args["long_sep"])
744
# makes sure our arguments are broken up correctly
745
split_args = self._partial_baked_args + processed_args
747
final_args = split_args
749
cmd.extend(final_args)
753
stdout = call_args["out"]
755
and not callable(stdout) \
756
and not hasattr(stdout, "write") \
757
and not isinstance(stdout, (cStringIO, StringIO)):
759
stdout = open(str(stdout), "wb")
763
stderr = call_args["err"]
764
if stderr and not callable(stderr) and not hasattr(stderr, "write") \
765
and not isinstance(stderr, (cStringIO, StringIO)):
766
stderr = open(str(stderr), "wb")
769
return RunningCommand(cmd, call_args, stdin, stdout, stderr)
774
# used in redirecting
780
# Process open = Popen
781
# Open Process = OProc
783
_procs_to_cleanup = set()
784
_registered_cleanup = False
785
_default_window_size = (24, 80)
787
def __init__(self, cmd, stdin, stdout, stderr, call_args,
788
persist=True, pipe=STDOUT):
790
self.call_args = call_args
792
self._single_tty = self.call_args["tty_in"] and self.call_args["tty_out"]
794
# this logic is a little convoluted, but basically this top-level
795
# if/else is for consolidating input and output TTYs into a single
796
# TTY. this is the only way some secure programs like ssh will
797
# output correctly (is if stdout and stdin are both the same TTY)
799
self._stdin_fd, self._slave_stdin_fd = pty.openpty()
801
self._stdout_fd = self._stdin_fd
802
self._slave_stdout_fd = self._slave_stdin_fd
804
self._stderr_fd = self._stdin_fd
805
self._slave_stderr_fd = self._slave_stdin_fd
807
# do not consolidate stdin and stdout
809
if self.call_args["tty_in"]:
810
self._slave_stdin_fd, self._stdin_fd = pty.openpty()
812
self._slave_stdin_fd, self._stdin_fd = os.pipe()
814
# tty_out is usually the default
815
if self.call_args["tty_out"]:
816
self._stdout_fd, self._slave_stdout_fd = pty.openpty()
818
self._stdout_fd, self._slave_stdout_fd = os.pipe()
820
# unless STDERR is going to STDOUT, it ALWAYS needs to be a pipe,
821
# and never a PTY. the reason for this is not totally clear to me,
822
# but it has to do with the fact that if STDERR isn't set as the
823
# CTTY (because STDOUT is), the STDERR buffer won't always flush
824
# by the time the process exits, and the data will be lost.
825
# i've only seen this on OSX.
826
if stderr is not STDOUT:
827
self._stderr_fd, self._slave_stderr_fd = os.pipe()
829
gc_enabled = gc.isenabled()
830
if gc_enabled: gc.disable()
836
# ignoring SIGHUP lets us persist even after the parent process
838
signal.signal(signal.SIGHUP, signal.SIG_IGN)
840
# this piece of ugliness is due to a bug where we can lose output
841
# if we do os.close(self._slave_stdout_fd) in the parent after
842
# the child starts writing.
843
# see http://bugs.python.org/issue15898
849
if self.call_args["tty_out"]:
850
# set raw mode, so there isn't any weird translation of newlines
851
# to \r\n and other oddities. we're not outputting to a terminal
854
# we HAVE to do this here, and not in the parent thread, because
855
# we have to guarantee that this is set before the child process
856
# is run, and we can't do it twice.
857
tty.setraw(self._stdout_fd)
860
os.close(self._stdin_fd)
861
if not self._single_tty:
862
os.close(self._stdout_fd)
863
if stderr is not STDOUT: os.close(self._stderr_fd)
866
if self.call_args["cwd"]: os.chdir(self.call_args["cwd"])
867
os.dup2(self._slave_stdin_fd, 0)
868
os.dup2(self._slave_stdout_fd, 1)
870
# we're not directing stderr to stdout? then set self._slave_stderr_fd to
871
# fd 2, the common stderr fd
872
if stderr is STDOUT: os.dup2(self._slave_stdout_fd, 2)
873
else: os.dup2(self._slave_stderr_fd, 2)
875
# don't inherit file descriptors
876
max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
877
os.closerange(3, max_fd)
880
# set our controlling terminal
881
if self.call_args["tty_out"]:
882
tmp_fd = os.open(os.ttyname(1), os.O_RDWR)
886
if self.call_args["tty_out"]:
889
# actually execute the process
890
if self.call_args["env"] is None:
891
os.execv(cmd[0], cmd)
893
os.execve(cmd[0], cmd, self.call_args["env"])
899
if gc_enabled: gc.enable()
901
if not OProc._registered_cleanup:
902
atexit.register(OProc._cleanup_procs)
903
OProc._registered_cleanup = True
906
self.started = _time.time()
908
self.exit_code = None
910
self.stdin = stdin or Queue()
911
self._pipe_queue = Queue()
913
# this is used to prevent a race condition when we're waiting for
914
# a process to end, and the OProc's internal threads are also checking
915
# for the processes's end
916
self._wait_lock = threading.Lock()
918
# these are for aggregating the stdout and stderr. we use a deque
919
# because we don't want to overflow
920
self._stdout = deque(maxlen=self.call_args["internal_bufsize"])
921
self._stderr = deque(maxlen=self.call_args["internal_bufsize"])
923
if self.call_args["tty_in"]: self.setwinsize(self._stdin_fd)
926
self.log = Logger("process", repr(self))
928
os.close(self._slave_stdin_fd)
929
if not self._single_tty:
930
os.close(self._slave_stdout_fd)
931
if stderr is not STDOUT: os.close(self._slave_stderr_fd)
933
self.log.debug("started process")
935
OProc._procs_to_cleanup.add(self)
938
if self.call_args["tty_in"]:
939
attr = termios.tcgetattr(self._stdin_fd)
940
attr[3] &= ~termios.ECHO
941
termios.tcsetattr(self._stdin_fd, termios.TCSANOW, attr)
943
# this represents the connection from a Queue object (or whatever
944
# we're using to feed STDIN) to the process's STDIN fd
945
self._stdin_stream = StreamWriter("stdin", self, self._stdin_fd,
946
self.stdin, self.call_args["in_bufsize"])
950
if pipe is STDOUT and not self.call_args["no_pipe"]:
951
stdout_pipe = self._pipe_queue
953
# this represents the connection from a process's STDOUT fd to
954
# wherever it has to go, sometimes a pipe Queue (that we will use
955
# to pipe data to other processes), and also an internal deque
956
# that we use to aggregate all the output
957
save_stdout = not self.call_args["no_out"] and \
958
(self.call_args["tee"] in (True, "out") or stdout is None)
959
self._stdout_stream = StreamReader("stdout", self, self._stdout_fd, stdout,
960
self._stdout, self.call_args["out_bufsize"], stdout_pipe,
961
save_data=save_stdout)
964
if stderr is STDOUT or self._single_tty: self._stderr_stream = None
967
if pipe is STDERR and not self.call_args["no_pipe"]:
968
stderr_pipe = self._pipe_queue
970
save_stderr = not self.call_args["no_err"] and \
971
(self.call_args["tee"] in ("err",) or stderr is None)
972
self._stderr_stream = StreamReader("stderr", self, self._stderr_fd, stderr,
973
self._stderr, self.call_args["err_bufsize"], stderr_pipe,
974
save_data=save_stderr)
976
# start the main io threads
977
self._input_thread = self._start_thread(self.input_thread, self._stdin_stream)
978
self._output_thread = self._start_thread(self.output_thread, self._stdout_stream, self._stderr_stream)
982
return "<Process %d %r>" % (self.pid, self.cmd[:500])
985
# also borrowed from pexpect.py
988
rows, cols = OProc._default_window_size
989
TIOCSWINSZ = getattr(termios, 'TIOCSWINSZ', -2146929561)
990
if TIOCSWINSZ == 2148037735: # L is not required in Python >= 2.2.
991
TIOCSWINSZ = -2146929561 # Same bits, but with sign.
993
s = struct.pack('HHHH', rows, cols, 0, 0)
994
fcntl.ioctl(fd, TIOCSWINSZ, s)
998
def _start_thread(fn, *args):
999
thrd = threading.Thread(target=fn, args=args)
1004
def in_bufsize(self, buf):
1005
self._stdin_stream.stream_bufferer.change_buffering(buf)
1007
def out_bufsize(self, buf):
1008
self._stdout_stream.stream_bufferer.change_buffering(buf)
1010
def err_bufsize(self, buf):
1011
if self._stderr_stream:
1012
self._stderr_stream.stream_bufferer.change_buffering(buf)
1015
def input_thread(self, stdin):
1017
while not done and self.alive:
1018
self.log.debug("%r ready for more input", stdin)
1019
done = stdin.write()
1024
def output_thread(self, stdout, stderr):
1028
if stdout is not None:
1029
readers.append(stdout)
1030
errors.append(stdout)
1031
if stderr is not None:
1032
readers.append(stderr)
1033
errors.append(stderr)
1036
outputs, inputs, err = select.select(readers, [], errors, 0.1)
1039
for stream in outputs:
1040
self.log.debug("%r ready to be read from", stream)
1041
done = stream.read()
1042
if done: readers.remove(stream)
1047
# test if the process has been running too long
1048
if self.call_args["timeout"]:
1050
if now - self.started > self.call_args["timeout"]:
1051
self.log.debug("we've been running too long")
1055
# this is here because stdout may be the controlling TTY, and
1056
# we can't close it until the process has ended, otherwise the
1057
# child will get SIGHUP. typically, if we've broken out of
1058
# the above loop, and we're here, the process is just about to
1059
# end, so it's probably ok to aggressively poll self.alive
1061
# the other option to this would be to do the CTTY close from
1062
# the method that does the actual os.waitpid() call, but the
1063
# problem with that is that the above loop might still be
1064
# running, and closing the fd will cause some operation to
1065
# fail. this is less complex than wrapping all the ops
1066
# in the above loop with out-of-band fd-close exceptions
1079
return "".encode(self.call_args["encoding"]).join(self._stdout)
1083
return "".encode(self.call_args["encoding"]).join(self._stderr)
1086
def signal(self, sig):
    """Deliver *sig* to the child process.

    OSError is deliberately swallowed: the child may already have exited,
    in which case there is nothing useful to do.
    """
    self.log.debug("sending signal %d", sig)
    try:
        os.kill(self.pid, sig)
    except OSError:
        # process is already gone -- ignore the race
        pass
1092
self.log.debug("killing")
1093
self.signal(signal.SIGKILL)
1095
def terminate(self):
    """Politely ask the child process to exit by delivering SIGTERM."""
    self.log.debug("terminating")
    sig = signal.SIGTERM
    self.signal(sig)
1100
def _cleanup_procs():
1101
for proc in OProc._procs_to_cleanup:
1105
def _handle_exit_code(self, exit_code):
    """Convert a raw os.waitpid() status word into a conventional exit code.

    Signal deaths are reported as the negative signal number; normal exits
    are reported as the process's exit status.
    """
    if os.WIFSIGNALED(exit_code):
        return -os.WTERMSIG(exit_code)
    if os.WIFEXITED(exit_code):
        return os.WEXITSTATUS(exit_code)
    raise RuntimeError("Unknown child exit status!")
1114
if self.exit_code is not None: return False
1116
# what we're doing here essentially is making sure that the main thread
1117
# (or another thread), isn't calling .wait() on the process. because
1118
# .wait() calls os.waitpid(self.pid, 0), we can't do an os.waitpid
1119
# here...because if we did, and the process exited while in this
1120
# thread, the main thread's os.waitpid(self.pid, 0) would raise OSError
1121
# (because the process ended in another thread).
1123
# so essentially what we're doing is, using this lock, checking if
1124
# we're calling .wait(), and if we are, let .wait() get the exit code
1125
# and handle the status, otherwise let us do it.
1126
acquired = self._wait_lock.acquire(False)
1128
if self.exit_code is not None: return False
1132
# WNOHANG is just that...we're calling waitpid without hanging...
1133
# essentially polling the process
1134
pid, exit_code = os.waitpid(self.pid, os.WNOHANG)
1136
self.exit_code = self._handle_exit_code(exit_code)
1140
except OSError: return False
1142
finally: self._wait_lock.release()
1146
self.log.debug("acquiring wait lock to wait for completion")
1147
with self._wait_lock:
1148
self.log.debug("got wait lock")
1150
if self.exit_code is None:
1151
self.log.debug("exit code not set, waiting on pid")
1152
pid, exit_code = os.waitpid(self.pid, 0)
1153
self.exit_code = self._handle_exit_code(exit_code)
1155
self.log.debug("exit code already set (%d), no need to wait", self.exit_code)
1157
self._input_thread.join()
1158
self._output_thread.join()
1160
OProc._procs_to_cleanup.discard(self)
1162
return self.exit_code
1167
class DoneReadingStdin(Exception):
    """Raised internally when the stdin source is exhausted for good."""


class NoStdinData(Exception):
    """Raised internally when the stdin source has no data available yet."""
1172
# this guy is for reading from some input (the stream) and writing to our
1173
# opened process's stdin fd. the stream can be a Queue, a callable, something
1174
# with the "read" method, a string, or an iterable
1175
class StreamWriter(object):
1176
def __init__(self, name, process, stream, stdin, bufsize):
1178
self.process = weakref.ref(process)
1179
self.stream = stream
1182
self.log = Logger("streamwriter", repr(self))
1185
self.stream_bufferer = StreamBufferer(self.process().call_args["encoding"],
1188
# determine buffering for reading from the input we set for stdin
1189
if bufsize == 1: self.bufsize = 1024
1190
elif bufsize == 0: self.bufsize = 1
1191
else: self.bufsize = bufsize
1194
if isinstance(stdin, Queue):
1196
self.get_chunk = self.get_queue_chunk
1198
elif callable(stdin):
1199
log_msg = "callable"
1200
self.get_chunk = self.get_callable_chunk
1202
# also handles stringio
1203
elif hasattr(stdin, "read"):
1204
log_msg = "file descriptor"
1205
self.get_chunk = self.get_file_chunk
1207
elif isinstance(stdin, basestring):
1211
# TODO, make the split() be a generator
1212
self.stdin = iter((c + "\n" for c in stdin.split("\n")))
1214
self.stdin = iter(stdin[i:i + self.bufsize] for i in range(0, len(stdin), self.bufsize))
1215
self.get_chunk = self.get_iter_chunk
1218
log_msg = "general iterable"
1219
self.stdin = iter(stdin)
1220
self.get_chunk = self.get_iter_chunk
1222
self.log.debug("parsed stdin as a %s", log_msg)
1226
return "<StreamWriter %s for %r>" % (self.name, self.process())
1231
def get_queue_chunk(self):
1232
try: chunk = self.stdin.get(True, 0.01)
1233
except Empty: raise NoStdinData
1234
if chunk is None: raise DoneReadingStdin
1237
def get_callable_chunk(self):
    """Produce the next stdin chunk by invoking the user-supplied callable.

    Any exception raised by the callable is interpreted as "no more input"
    and converted to DoneReadingStdin.  We catch Exception rather than
    using a bare except so that KeyboardInterrupt and SystemExit still
    propagate instead of being silently treated as end-of-input.
    """
    try:
        return self.stdin()
    except Exception:
        raise DoneReadingStdin
1241
def get_iter_chunk(self):
1243
if IS_PY3: return self.stdin.__next__()
1244
else: return self.stdin.next()
1245
except StopIteration: raise DoneReadingStdin
1247
def get_file_chunk(self):
1248
if self.stream_bufferer.type == 1: chunk = self.stdin.readline()
1249
else: chunk = self.stdin.read(self.bufsize)
1250
if not chunk: raise DoneReadingStdin
1254
# the return value answers the questions "are we done writing forever?"
1256
# get_chunk may sometimes return bytes, and sometimes returns trings
1257
# because of the nature of the different types of STDIN objects we
1259
try: chunk = self.get_chunk()
1260
except DoneReadingStdin:
1261
self.log.debug("done reading")
1263
if self.process().call_args["tty_in"]:
1265
try: char = termios.tcgetattr(self.stream)[6][termios.VEOF]
1266
except: char = chr(4).encode()
1267
os.write(self.stream, char)
1272
self.log.debug("received no data")
1275
# if we're not bytes, make us bytes
1276
if IS_PY3 and hasattr(chunk, "encode"):
1277
chunk = chunk.encode(self.process().call_args["encoding"])
1279
for chunk in self.stream_bufferer.process(chunk):
1280
self.log.debug("got chunk size %d: %r", len(chunk), chunk[:30])
1282
self.log.debug("writing chunk to process")
1284
os.write(self.stream, chunk)
1286
self.log.debug("OSError writing stdin chunk")
1291
self.log.debug("closing, but flushing first")
1292
chunk = self.stream_bufferer.flush()
1293
self.log.debug("got chunk size %d to flush: %r", len(chunk), chunk[:30])
1295
if chunk: os.write(self.stream, chunk)
1296
if not self.process().call_args["tty_in"]:
1297
self.log.debug("we used a TTY, so closing the stream")
1298
os.close(self.stream)
1299
except OSError: pass
1303
# Reads a process's stdout/stderr fd and dispatches chunks to a handler
# (callback, StringIO, cStringIO, or file-like), an internal buffer, and an
# optional pipe queue.
# NOTE(review): damaged extraction -- stray numeric lines interleaved,
# indentation stripped, and several statements missing (e.g. "self.name",
# "def __repr__"/"def close"/"def read" headers, loop headers, "try:" lines
# for visible "except" clauses). Comments document intent only.
class StreamReader(object):
1304
def __init__(self, name, process, stream, handler, buffer, bufsize,
1305
pipe_queue=None, save_data=True):
1307
# weak reference avoids a cycle that would block garbage collection
self.process = weakref.ref(process)
1308
self.stream = stream
1309
self.buffer = buffer
1310
self.save_data = save_data
1311
self.encoding = process.call_args["encoding"]
1312
self.decode_errors = process.call_args["decode_errors"]
1314
# pipe_queue is also held weakly, for the same cycle-avoidance reason
self.pipe_queue = None
1315
if pipe_queue: self.pipe_queue = weakref.ref(pipe_queue)
1317
self.log = Logger("streamreader", repr(self))
1319
# NOTE(review): this call is truncated -- trailing args/paren missing
self.stream_bufferer = StreamBufferer(self.encoding, bufsize,
1322
# determine buffering
1323
if bufsize == 1: self.bufsize = 1024
1324
elif bufsize == 0: self.bufsize = 1
1325
else: self.bufsize = bufsize
1328
# here we're determining the handler type by doing some basic checks
1329
# on the handler object
1330
self.handler = handler
1331
if callable(handler): self.handler_type = "fn"
1332
elif isinstance(handler, StringIO): self.handler_type = "stringio"
1333
elif isinstance(handler, cStringIO):
1334
self.handler_type = "cstringio"
1335
elif hasattr(handler, "write"): self.handler_type = "fd"
1336
else: self.handler_type = None
1339
self.should_quit = False
1341
# here we choose how to call the callback, depending on how many
1342
# arguments it takes. the reason for this is to make it as easy as
1343
# possible for people to use, without limiting them. a new user will
1344
# assume the callback takes 1 argument (the data). as they get more
1345
# advanced, they may want to terminate the process, or pass some stdin
1346
# back, and will realize that they can pass a callback of more args
1347
if self.handler_type == "fn":
1349
# NOTE(review): "implied_arg" is assigned in lines missing from this
# chunk (it accounts for the bound "self" of a method)
if inspect.ismethod(handler):
1351
num_args = len(inspect.getargspec(handler).args)
1354
if inspect.isfunction(handler):
1355
num_args = len(inspect.getargspec(handler).args)
1357
# is an object instance with __call__ method
1360
num_args = len(inspect.getargspec(handler.__call__).args)
1363
# extra callback args: (stdin,) for 2-arg handlers, (stdin, process
# weakref) for 3-arg handlers; see the comment in write_chunk
self.handler_args = ()
1364
if num_args == implied_arg + 2:
1365
self.handler_args = (self.process().stdin,)
1366
elif num_args == implied_arg + 3:
1367
self.handler_args = (self.process().stdin, self.process)
1374
# __repr__ body; its "def __repr__(self):" line is missing
return "<StreamReader %s for %r>" % (self.name, self.process())
1377
# close() body: flush the bufferer, signal EOF, close the fd
chunk = self.stream_bufferer.flush()
1378
self.log.debug("got chunk size %d to flush: %r",
1379
len(chunk), chunk[:30])
1380
if chunk: self.write_chunk(chunk)
1382
if self.handler_type == "fd" and hasattr(self.handler, "close"):
1383
self.handler.flush()
1385
# None on the pipe queue is the consumer's EOF sentinel
if self.pipe_queue and self.save_data: self.pipe_queue().put(None)
1386
try: os.close(self.stream)
1387
except OSError: pass
1390
def write_chunk(self, chunk):
1391
# in PY3, the chunk coming in will be bytes, so keep that in mind
1393
if self.handler_type == "fn" and not self.should_quit:
1394
# try to use the encoding first, if that doesn't work, send
1395
# the bytes, because it might be binary
1397
# NOTE(review): the "try:" for this UnicodeDecodeError handler and
# its fallback body are missing from this chunk
to_handler = chunk.decode(self.encoding, self.decode_errors)
1398
except UnicodeDecodeError:
1401
# this is really ugly, but we can't store self.process as one of
1402
# the handler args in self.handler_args, the reason being is that
1403
# it would create cyclic references, and prevent objects from
1404
# being garbage collected. so we're determining if this handler
1405
# even requires self.process (by the argument count), and if it
1406
# does, resolving the weakref to a hard reference and passing
1407
# that into the handler
1408
handler_args = self.handler_args
1409
if len(self.handler_args) == 2:
1410
handler_args = (self.handler_args[0], self.process())
1411
# a truthy return from the callback means "stop sending me data"
self.should_quit = self.handler(to_handler, *handler_args)
1413
elif self.handler_type == "stringio":
1414
self.handler.write(chunk.decode(self.encoding, self.decode_errors))
1416
elif self.handler_type in ("cstringio", "fd"):
1417
self.handler.write(chunk)
1419
# we should flush on an fd. chunk is already the correctly-buffered
1420
# size, so we don't need the fd buffering as well
1421
self.handler.flush()
1424
# raw bytes are also retained for _out/_err aggregation
self.buffer.append(chunk)
1427
# NOTE(review): the guard (if self.pipe_queue and self.save_data:) for
# these two lines is missing from this chunk
self.log.debug("putting chunk onto pipe: %r", chunk[:30])
1428
self.pipe_queue().put(chunk)
1432
# if we're PY3, we're reading bytes, otherwise we're reading
1434
# read() body; its "def read(self):" line is missing from this chunk
try: chunk = os.read(self.stream, self.bufsize)
1435
except OSError as e:
1436
self.log.debug("got errno %d, done reading", e.errno)
1439
# an empty read means the fd hit EOF
self.log.debug("got no chunk, done reading")
1442
self.log.debug("got chunk size %d: %r", len(chunk), chunk[:30])
1443
for chunk in self.stream_bufferer.process(chunk):
1444
self.write_chunk(chunk)
1449
# this is used for feeding in chunks of stdout/stderr, and breaking it up into
1450
# chunks that will actually be put into the internal buffers. for example, if
1451
# you have two processes, one being piped to the other, and you want that,
1452
# first process to feed lines of data (instead of the chunks however they
1453
# come in), OProc will use an instance of this class to chop up the data and
1454
# feed it as lines to be sent down the pipe
1455
# NOTE(review): damaged extraction -- stray numeric lines interleaved,
# indentation stripped, and statements missing (e.g. "self.buffer = []",
# "total_to_write = []", "while True:" loop headers, "try/finally" lines,
# the type==0 branch, and flush()'s "def"/"self.buffer = []"/"return ret").
# Comments document intent only; restore code from a clean copy.
class StreamBufferer(object):
1456
def __init__(self, encoding=DEFAULT_ENCODING, buffer_type=1,
1457
decode_errors="strict"):
1458
# 0 for unbuffered, 1 for line, everything else for that amount
1459
self.type = buffer_type
1461
# number of bytes currently held in self.buffer (for N-size buffering)
self.n_buffer_count = 0
1462
self.encoding = encoding
1463
self.decode_errors = decode_errors
1465
# this is for if we change buffering types. if we change from line
1466
# buffered to unbuffered, its very possible that our self.buffer list
1467
# has data that was being saved up (while we searched for a newline).
1468
# we need to use that up, so we don't lose it
1469
self._use_up_buffer_first = False
1471
# the buffering lock is used because we might chance the buffering
1472
# types from a different thread. for example, if we have a stdout
1473
# callback, we might use it to change the way stdin buffers. so we
1475
self._buffering_lock = threading.RLock()
1476
self.log = Logger("stream_bufferer")
1479
def change_buffering(self, new_type):
1480
# TODO, when we stop supporting 2.6, make this a with context
1481
self.log.debug("acquiring buffering lock for changing buffering")
1482
self._buffering_lock.acquire()
1483
self.log.debug("got buffering lock for changing buffering")
1485
# switching to unbuffered must first drain whatever was saved up
if new_type == 0: self._use_up_buffer_first = True
1487
self.type = new_type
1489
# NOTE(review): the "try/finally" wrapping acquire/release is missing
self._buffering_lock.release()
1490
self.log.debug("released buffering lock for changing buffering")
1493
def process(self, chunk):
1494
# MAKE SURE THAT THE INPUT IS PY3 BYTES
1495
# THE OUTPUT IS ALWAYS PY3 BYTES
1497
# TODO, when we stop supporting 2.6, make this a with context
1498
self.log.debug("acquiring buffering lock to process chunk (buffering: %d)", self.type)
1499
self._buffering_lock.acquire()
1500
self.log.debug("got buffering lock to process chunk (buffering: %d)", self.type)
1502
# we've encountered binary, permanently switch to N size buffering
1503
# since matching on newline doesn't make sense anymore
1505
# NOTE(review): the "if self.type == 1:" guard and the "except
# UnicodeDecodeError:" handler lines around this probe are missing
try: chunk.decode(self.encoding, self.decode_errors)
1507
self.log.debug("detected binary data, changing buffering")
1508
self.change_buffering(1024)
1512
# unbuffered (type 0): emit saved buffer first, then the chunk as-is
if self._use_up_buffer_first:
1513
self._use_up_buffer_first = False
1514
to_write = self.buffer
1516
to_write.append(chunk)
1522
elif self.type == 1:
1524
# line buffering works on decoded text so find("\n") is meaningful
chunk = chunk.decode(self.encoding, self.decode_errors)
1526
# loop header ("while True:") missing; scan out complete lines
newline = chunk.find("\n")
1527
if newline == -1: break
1529
chunk_to_write = chunk[:newline + 1]
1531
# this is ugly, but it's designed to take the existing
1532
# bytes buffer, join it together, tack on our latest
1533
# chunk, then convert the whole thing to a string.
1534
# it's necessary, i'm sure. read the whole block to
1536
chunk_to_write = "".encode(self.encoding).join(self.buffer) \
1537
+ chunk_to_write.encode(self.encoding)
1538
chunk_to_write = chunk_to_write.decode(self.encoding)
1541
self.n_buffer_count = 0
1543
chunk = chunk[newline + 1:]
1544
total_to_write.append(chunk_to_write.encode(self.encoding))
1547
# remainder (no newline yet) is saved for the next call
self.buffer.append(chunk.encode(self.encoding))
1548
self.n_buffer_count += len(chunk)
1549
return total_to_write
1555
# N-size buffering: emit full self.type-sized pieces, save the rest
overage = self.n_buffer_count + len(chunk) - self.type
1557
ret = "".encode(self.encoding).join(self.buffer) + chunk
1558
chunk_to_write = ret[:self.type]
1559
chunk = ret[self.type:]
1560
total_to_write.append(chunk_to_write)
1562
self.n_buffer_count = 0
1564
self.buffer.append(chunk)
1565
self.n_buffer_count += len(chunk)
1567
return total_to_write
1569
# "finally:" releasing the lock is missing from this chunk
self._buffering_lock.release()
1570
self.log.debug("released buffering lock for processing chunk (buffering: %d)", self.type)
1574
# flush() body; its "def flush(self):" line is missing from this chunk
self.log.debug("acquiring buffering lock for flushing buffer")
1575
self._buffering_lock.acquire()
1576
self.log.debug("got buffering lock for flushing buffer")
1578
ret = "".encode(self.encoding).join(self.buffer)
1582
self._buffering_lock.release()
1583
self.log.debug("released buffering lock for flushing buffer")
1589
# this allows lookups to names that aren't found in the global scope to be
1590
# searched for as a program name. for example, if "ls" isn't found in this
1591
# module's scope, we consider it a system program and try to find it.
1593
# we use a dict instead of just a regular object as the base class because
1594
# the exec() statement used in this file requires the "globals" argument to
1596
# NOTE(review): damaged extraction -- stray numeric lines interleaved,
# indentation stripped, and statements missing (e.g. "self.globs = globs" in
# __init__, __setitem__'s body, several "try:"/"except" lines in
# __getitem__, and b_cd's body). Comments document intent only.
class Environment(dict):
1597
def __init__(self, globs, baked_args={}):
1599
# NOTE(review): mutable default arg for baked_args -- shared across
# calls if ever mutated; also "self.globs = globs" is missing here
self.baked_args = baked_args
1601
def __setitem__(self, k, v):
1604
def __getitem__(self, k):
1605
# the module's real globals win over everything else
try: return self.globs[k]
1606
except KeyError: pass
1608
# the only way we'd get to here is if we've tried to
1609
# import * from a repl. so, raise an exception, since
1610
# that's really the only sensible thing to do
1612
# NOTE(review): the guard (k == "*") for this branch is missing
raise ImportError("Cannot import * from sh. \
1613
Please import sh or import programs individually.")
1615
# if we end with "_" just go ahead and skip searching
1616
# our namespace for python stuff. this was mainly for the
1617
# command "id", which is a popular program for finding
1618
# if a user exists, but also a python function for getting
1619
# the address of an object. so can call the python
1620
# version by "id" and the program version with "id_"
1621
if not k.endswith("_"):
1622
# check if we're naming a dynamically generated ReturnCode exception
1623
try: return rc_exc_cache[k]
1625
# KeyError handler line is missing; parse names like
# "ErrorReturnCode_1" / "SignalException_9" into exit codes
m = rc_exc_regex.match(k)
1627
exit_code = int(m.group(2))
1628
if m.group(1) == "SignalException":
1629
exit_code = -exit_code
1630
return get_rc_exc(exit_code)
1634
# fall back to python builtins ("try:" line missing)
return getattr(self["__builtins__"], k)
1635
except AttributeError:
1637
elif not k.startswith("_"):
1641
# https://github.com/ipython/ipython/issues/2577
1642
# https://github.com/amoffat/sh/issues/97#issuecomment-10610629
1643
# dunder probes (from ipython etc.) must not resolve to commands
if k.startswith("__") and k.endswith("__"):
1644
raise AttributeError
1646
# how about an environment variable?
1648
# NOTE(review): the "try:"/"except KeyError:" around this is missing
return os.environ[k]
1652
# is it a custom builtin?
1653
builtin = getattr(self, "b_" + k, None)
1654
if builtin: return builtin
1656
# it must be a command then
1657
# we use _create instead of instantiating the class directly because
1658
# _create uses resolve_program, which will automatically do underscore-
1659
# to-dash conversions. instantiating directly does not use that
1660
return Command._create(k, **self.baked_args)
1663
# methods that begin with "b_" are custom builtins and will override any
1664
# program that exists in our path. this is useful for things like
1665
# common shell builtins that people are used to, but which aren't actually
1666
# full-fledged system binaries
1668
# NOTE(review): b_cd's body (presumably os.chdir(path)) is missing
def b_cd(self, path):
1671
def b_which(self, program):
1672
return which(program)
1678
# NOTE(review): this is the body of the interactive REPL function (its
# "def run_repl(env):" header and the "while True:" loop line are missing
# from this damaged chunk). It prints a banner, then reads lines with
# raw_input and exec()s each one against the Environment until EOF/exit.
banner = "\n>> sh v{version}\n>> https://github.com/amoffat/sh\n"
1680
print(banner.format(version=__version__))
1682
# EOF (ctrl-d) or a closed stdin ends the repl loop
try: line = raw_input("sh> ")
1683
except (ValueError, EOFError): break
1685
# NOTE(review): exec() of raw user input -- acceptable only because this
# is an explicitly interactive repl, never reachable from library use
try: exec(compile(line, "<dummy>", "single"), env, env)
1686
except SystemExit: break
1687
except: print(traceback.format_exc())
1689
# cleans up our last line
1695
# this is a thin wrapper around THIS module (we patch sys.modules[__name__]).
1696
# this is in the case that the user does a "from sh import whatever"
1697
# in other words, they only want to import certain programs, not the whole
1698
# system PATH worth of commands. in this case, we just proxy the
1699
# import lookup to our Environment class
1700
# NOTE(review): damaged extraction -- stray numeric lines interleaved and
# indentation stripped; at least the "self.__path__ = []" assignment
# referenced by the __path__ comment below appears to be missing.
class SelfWrapper(ModuleType):
1701
def __init__(self, self_module, baked_args={}):
1702
# this is super ugly to have to copy attributes like this,
1703
# but it seems to be the only way to make reload() behave
1704
# nicely. if i make these attributes dynamic lookups in
1705
# __getattr__, reload sometimes chokes in weird ways...
1706
for attr in ["__builtins__", "__doc__", "__name__", "__package__"]:
1707
setattr(self, attr, getattr(self_module, attr, None))
1709
# python 3.2 (2.7 and 3.3 work fine) breaks on osx (not ubuntu)
1710
# if we set this to None. and 3.3 needs a value for __path__
1712
self.self_module = self_module
1713
# all name lookups on the module get proxied through this Environment
self.env = Environment(globals(), baked_args)
1715
def __setattr__(self, name, value):
1716
# mirror attribute writes into the Environment (once it exists) so
# later lookups through __getattr__ see them too
if hasattr(self, "env"): self.env[name] = value
1717
ModuleType.__setattr__(self, name, value)
1719
def __getattr__(self, name):
1720
# guard against infinite recursion while self.env is being set up
if name == "env": raise AttributeError
1721
return self.env[name]
1723
# accept special keywords argument to define defaults for all operations
1724
# that will be processed with given by return SelfWrapper
1725
def __call__(self, **kwargs):
1726
# sh(**defaults) returns a new wrapper whose commands are pre-baked
# with those keyword arguments
return SelfWrapper(self.self_module, kwargs)
1731
# we're being run as a stand-alone script
1732
# NOTE(review): damaged extraction -- lines are missing here (e.g. the
# argument check guarding "arg = sys.argv.pop(1)", THIS_DIR's definition,
# run_test's try/except and p.wait()/return-code handling, and the final
# "else:" that pairs the import branch with this __main__ branch).
if __name__ == "__main__":
1734
arg = sys.argv.pop(1)
1741
# run the test suite under one python version / locale combination
def run_test(version, locale):
1742
py_version = "python%s" % version
1743
py_bin = which(py_version)
1746
# NOTE(review): the "if py_bin:" guard and the closing args of this
# print call are missing from this chunk
print("Testing %s, locale %r" % (py_version.capitalize(),
1749
env = os.environ.copy()
1750
env["LC_ALL"] = locale
1751
p = subprocess.Popen([py_bin, os.path.join(THIS_DIR, "test.py")]
1752
+ sys.argv[1:], env=env)
1755
# reached when the interpreter binary was not found on PATH
print("Couldn't find %s, skipping" % py_version.capitalize())
1757
versions = ("2.6", "2.7", "3.1", "3.2", "3.3")
1758
locales = ("en_US.UTF-8", "C")
1759
for locale in locales:
1760
for version in versions:
1761
run_test(version, locale)
1764
# repl path: build an Environment over this module's globals
env = Environment(globals())
1767
# we're being imported from somewhere
1769
# replace this module in sys.modules with the proxying SelfWrapper so
# "from sh import ls" resolves program names dynamically
self = sys.modules[__name__]
1770
sys.modules[__name__] = SelfWrapper(self)