Package paramiko :: Module channel
[frames] | no frames]

Source Code for Module paramiko.channel

   1  # Copyright (C) 2003-2007  Robey Pointer <robeypointer@gmail.com> 
   2  # 
   3  # This file is part of paramiko. 
   4  # 
   5  # Paramiko is free software; you can redistribute it and/or modify it under the 
   6  # terms of the GNU Lesser General Public License as published by the Free 
   7  # Software Foundation; either version 2.1 of the License, or (at your option) 
   8  # any later version. 
   9  # 
  10  # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 
  11  # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 
  12  # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
  13  # details. 
  14  # 
  15  # You should have received a copy of the GNU Lesser General Public License 
  16  # along with Paramiko; if not, write to the Free Software Foundation, Inc., 
  17  # 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. 
  18   
  19  """ 
  20  Abstraction for an SSH2 channel. 
  21  """ 
  22   
  23  import binascii 
  24  import sys 
  25  import time 
  26  import threading 
  27  import socket 
  28  import os 
  29   
  30  from paramiko.common import * 
  31  from paramiko import util 
  32  from paramiko.message import Message 
  33  from paramiko.ssh_exception import SSHException 
  34  from paramiko.file import BufferedFile 
  35  from paramiko.buffered_pipe import BufferedPipe, PipeTimeout 
  36  from paramiko import pipe 
  37   
  38   
  39  # lower bound on the max packet size we'll accept from the remote host 
  40  MIN_PACKET_SIZE = 1024 
  41   
  42   
43 -class Channel (object):
44 """ 45 A secure tunnel across an SSH L{Transport}. A Channel is meant to behave 46 like a socket, and has an API that should be indistinguishable from the 47 python socket API. 48 49 Because SSH2 has a windowing kind of flow control, if you stop reading data 50 from a Channel and its buffer fills up, the server will be unable to send 51 you any more data until you read some of it. (This won't affect other 52 channels on the same transport -- all channels on a single transport are 53 flow-controlled independently.) Similarly, if the server isn't reading 54 data you send, calls to L{send} may block, unless you set a timeout. This 55 is exactly like a normal network socket, so it shouldn't be too surprising. 56 """ 57
def __init__(self, chanid):
    """
    Build a fresh, inactive channel.  Nothing ties the channel to a session
    or L{Transport} at this point; the Transport attaches it later.
    Normally only the constructor of a L{Channel} subclass calls this.

    @param chanid: the ID of this channel, as passed by an existing
        L{Transport}.
    @type chanid: int
    """
    # identity and peer bookkeeping
    self.chanid = chanid
    self.remote_chanid = 0
    self.transport = None
    self.origin_addr = None

    # lifecycle flags
    self.active = False
    self.closed = False
    self.eof_received = 0
    self.eof_sent = 0
    self.exit_status = -1

    # behaviour switches
    self.timeout = None
    self.ultra_debug = False
    self.combine_stderr = False
    self.event_ready = False
    self._pipe = None

    # flow-control counters, all starting from zero
    self.in_window_size = 0
    self.out_window_size = 0
    self.in_max_packet_size = 0
    self.out_max_packet_size = 0
    self.in_window_threshold = 0
    self.in_window_sofar = 0

    # inbound buffers for the stdout and stderr streams
    self.in_buffer = BufferedPipe()
    self.in_stderr_buffer = BufferedPipe()

    # the condition variable shares the channel lock
    self.lock = threading.Lock()
    self.out_buffer_cv = threading.Condition(self.lock)

    self.status_event = threading.Event()
    self._name = str(chanid)
    self.logger = util.get_logger('paramiko.transport')
    self.event = threading.Event()
97
98 - def __del__(self):
99 try: 100 self.close() 101 except: 102 pass
103
104 - def __repr__(self):
105 """ 106 Return a string representation of this object, for debugging. 107 108 @rtype: str 109 """ 110 out = '<paramiko.Channel %d' % self.chanid 111 if self.closed: 112 out += ' (closed)' 113 elif self.active: 114 if self.eof_received: 115 out += ' (EOF received)' 116 if self.eof_sent: 117 out += ' (EOF sent)' 118 out += ' (open) window=%d' % (self.out_window_size) 119 if len(self.in_buffer) > 0: 120 out += ' in-buffer=%d' % (len(self.in_buffer),) 121 out += ' -> ' + repr(self.transport) 122 out += '>' 123 return out
124
def get_pty(self, term='vt100', width=80, height=24):
    """
    Ask the server for a pseudo-terminal.  Typically this is done right
    after creating a client channel, so that a shell started with
    L{invoke_shell} gets basic terminal semantics.  It isn't necessary
    (or desirable) to call this method if you're going to execute a
    single command with L{exec_command}.

    @param term: the terminal type to emulate (for example, C{'vt100'})
    @type term: str
    @param width: width (in characters) of the terminal screen
    @type width: int
    @param height: height (in characters) of the terminal screen
    @type height: int

    @raise SSHException: if the request was rejected or the channel was
        closed
    """
    usable = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not usable:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('pty-req')
    request.add_boolean(True)
    request.add_string(term)
    request.add_int(width)
    request.add_int(height)
    # pixel height, width (usually useless)
    request.add_int(0).add_int(0)
    request.add_string('')
    # arm the event before sending, then wait for the outcome
    self._event_pending()
    self.transport._send_user_message(request)
    self._wait_for_event()
159
def invoke_shell(self):
    """
    Ask the server to start an interactive shell on this channel.  If the
    server allows it, the channel is then connected directly to the stdin,
    stdout, and stderr of the shell.

    Normally you would call L{get_pty} first, in which case the shell
    operates through the pty and the channel is connected to the stdin
    and stdout of the pty.

    When the shell exits, the channel will be closed and can't be reused.
    You must open a new channel if you wish to open another shell.

    @raise SSHException: if the request was rejected or the channel was
        closed
    """
    usable = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not usable:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('shell')
    request.add_boolean(1)
    # arm the event before sending, then wait for the outcome
    self._event_pending()
    self.transport._send_user_message(request)
    self._wait_for_event()
186
def exec_command(self, command):
    """
    Run a command on the server.  If the server allows it, the channel is
    then connected directly to the stdin, stdout, and stderr of the
    command being executed.

    When the command finishes executing, the channel will be closed and
    can't be reused.  You must open a new channel if you wish to execute
    another command.

    @param command: a shell command to execute.
    @type command: str

    @raise SSHException: if the request was rejected or the channel was
        closed
    """
    usable = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not usable:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('exec')
    request.add_boolean(True)
    request.add_string(command)
    # arm the event before sending, then wait for the outcome
    self._event_pending()
    self.transport._send_user_message(request)
    self._wait_for_event()
214
def invoke_subsystem(self, subsystem):
    """
    Ask the server to start a subsystem (for example, C{sftp}).  If the
    server allows it, the channel is then connected directly to the
    requested subsystem.

    When the subsystem finishes, the channel will be closed and can't be
    reused.

    @param subsystem: name of the subsystem being requested.
    @type subsystem: str

    @raise SSHException: if the request was rejected or the channel was
        closed
    """
    usable = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not usable:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('subsystem')
    request.add_boolean(True)
    request.add_string(subsystem)
    # arm the event before sending, then wait for the outcome
    self._event_pending()
    self.transport._send_user_message(request)
    self._wait_for_event()
241
def resize_pty(self, width=80, height=24):
    """
    Change the size of the pseudo-terminal, i.e. the width and height of
    the terminal emulation created by an earlier L{get_pty} call.

    @param width: new width (in characters) of the terminal screen
    @type width: int
    @param height: new height (in characters) of the terminal screen
    @type height: int

    @raise SSHException: if the request was rejected or the channel was
        closed
    """
    usable = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not usable:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('window-change')
    request.add_boolean(True)
    request.add_int(width)
    request.add_int(height)
    # the two trailing fields are always sent as zero
    request.add_int(0).add_int(0)
    # arm the event before sending, then wait for the outcome
    self._event_pending()
    self.transport._send_user_message(request)
    self._wait_for_event()
268
def exit_status_ready(self):
    """
    Tell whether the remote process has exited and returned an exit
    status.  Use this to poll the process status if you don't want to
    block in L{recv_exit_status}.  Note that the server may not return an
    exit status in some cases (like bad servers).

    @return: True if L{recv_exit_status} will return immediately
    @rtype: bool
    @since: 1.7.3
    """
    if self.closed:
        # a closed channel counts as ready
        return self.closed
    return self.status_event.isSet()
281
def recv_exit_status(self):
    """
    Fetch the exit status of the process on the server.  This is mostly
    useful for retrieving the results of an L{exec_command}.  If the
    command hasn't finished yet, this method waits until it does, or
    until the channel is closed.  If no exit status is provided by the
    server, -1 is returned.

    @return: the exit code of the process on the server.
    @rtype: int

    @since: 1.2
    """
    finished = self.status_event
    # no timeout: block until the status event is set
    finished.wait()
    assert finished.isSet()
    return self.exit_status
298
def send_exit_status(self, status):
    """
    Report the exit status of an executed command to the client.  (This
    really only makes sense in server mode.)  Many clients expect to get
    some sort of status code back from an executed command after it
    completes.

    @param status: the exit code of the process
    @type status: int

    @since: 1.2
    """
    # in many cases, the channel will not still be open here.
    # that's fine.
    notice = Message()
    notice.add_byte(chr(MSG_CHANNEL_REQUEST))
    notice.add_int(self.remote_chanid)
    notice.add_string('exit-status')
    notice.add_boolean(False)
    notice.add_int(status)
    self.transport._send_user_message(notice)
320
def request_x11(self, screen_number=0, auth_protocol=None, auth_cookie=None,
                single_connection=False, handler=None):
    """
    Ask for an x11 session on this channel.  If the server allows it,
    further x11 requests can be made from the server to the client when
    an x11 application is run in a shell session.

    From RFC4254::

        It is RECOMMENDED that the 'x11 authentication cookie' that is
        sent be a fake, random cookie, and that the cookie be checked and
        replaced by the real cookie when a connection request is received.

    If you omit the auth_cookie, a new secure random 128-bit value will be
    generated, used, and returned.  You will need to use this value to
    verify incoming x11 requests and replace them with the actual local
    x11 cookie (which requires some knowledge of the x11 protocol).

    If a handler is passed in, the handler is called from another thread
    whenever a new x11 connection arrives.  The default handler queues up
    incoming x11 connections, which may be retrieved using
    L{Transport.accept}.  The handler's calling signature is::

        handler(channel: Channel, (address: str, port: int))

    @param screen_number: the x11 screen number (0, 10, etc)
    @type screen_number: int
    @param auth_protocol: the name of the X11 authentication method used;
        if none is given, C{"MIT-MAGIC-COOKIE-1"} is used
    @type auth_protocol: str
    @param auth_cookie: hexadecimal string containing the x11 auth cookie;
        if none is given, a secure random 128-bit value is generated
    @type auth_cookie: str
    @param single_connection: if True, only a single x11 connection will be
        forwarded (by default, any number of x11 connections can arrive
        over this session)
    @type single_connection: bool
    @param handler: an optional handler to use for incoming X11 connections
    @type handler: function
    @return: the auth_cookie used
    """
    usable = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not usable:
        raise SSHException('Channel is not open')

    # fill in defaults for anything the caller left out
    if auth_protocol is None:
        auth_protocol = 'MIT-MAGIC-COOKIE-1'
    if auth_cookie is None:
        random_bytes = self.transport.randpool.get_bytes(16)
        auth_cookie = binascii.hexlify(random_bytes)

    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('x11-req')
    request.add_boolean(True)
    request.add_boolean(single_connection)
    request.add_string(auth_protocol)
    request.add_string(auth_cookie)
    request.add_int(screen_number)
    self._event_pending()
    self.transport._send_user_message(request)
    self._wait_for_event()
    # the handler is registered only after the wait above has returned
    self.transport._set_x11_handler(handler)
    return auth_cookie
383
def get_transport(self):
    """
    Give back the L{Transport} this channel is associated with.

    @return: the L{Transport} that was used to create this channel.
    @rtype: L{Transport}
    """
    owner = self.transport
    return owner
392
def set_name(self, name):
    """
    Give this channel a name.  Currently the name is only used to label
    the channel in logfile entries.  Read it back with L{get_name}.

    @param name: new channel name
    @type name: str
    """
    self._name = name
403
def get_name(self):
    """
    Fetch the name of this channel, as last stored by L{set_name}.

    @return: the name of this channel.
    @rtype: str
    """
    current_name = self._name
    return current_name
412
def get_id(self):
    """
    Give back the ID # of this channel.  The channel ID is unique across
    a L{Transport} and usually a small number.  It's also the number
    passed to L{ServerInterface.check_channel_request} when determining
    whether to accept a channel request in server mode.

    @return: the ID of this channel.
    @rtype: int
    """
    channel_id = self.chanid
    return channel_id
424
def set_combine_stderr(self, combine):
    """
    Choose whether stderr is merged into stdout on this channel.  The
    default is C{False}, but in some cases it may be convenient to have
    both streams combined.

    If this is C{False}, and L{exec_command} is called (or C{invoke_shell}
    with no pty), output to stderr will not show up through the L{recv}
    and L{recv_ready} calls.  You will have to use L{recv_stderr} and
    L{recv_stderr_ready} to get stderr output.

    If this is C{True}, data will never show up via L{recv_stderr} or
    L{recv_stderr_ready}.

    @param combine: C{True} if stderr output should be combined into
        stdout on this channel.
    @type combine: bool
    @return: previous setting.
    @rtype: bool

    @since: 1.1
    """
    leftover = ''
    self.lock.acquire()
    try:
        previous = self.combine_stderr
        self.combine_stderr = combine
        switching_on = combine and not previous
        if switching_on:
            # copy old stderr buffer into primary buffer
            leftover = self.in_stderr_buffer.empty()
    finally:
        self.lock.release()
    # the drained data is fed in after the lock has been released
    if len(leftover) > 0:
        self._feed(leftover)
    return previous
460 461 462 ### socket API 463 464
def settimeout(self, timeout):
    """
    Store the timeout used by blocking read/write operations.  The
    C{timeout} argument can be a nonnegative float expressing seconds, or
    C{None}.  If a float is given, subsequent channel read/write
    operations will raise a timeout exception if the timeout period has
    elapsed before the operation has completed.  Setting a timeout of
    C{None} disables timeouts on socket operations.

    C{chan.settimeout(0.0)} is equivalent to C{chan.setblocking(0)};
    C{chan.settimeout(None)} is equivalent to C{chan.setblocking(1)}.

    @param timeout: seconds to wait for a pending read/write operation
        before raising C{socket.timeout}, or C{None} for no timeout.
    @type timeout: float
    """
    self.timeout = timeout
482
def gettimeout(self):
    """
    Report the timeout in seconds (as a float) associated with socket
    operations, or C{None} if no timeout is set.  This reflects the last
    call to L{setblocking} or L{settimeout}.

    @return: timeout in seconds, or C{None}.
    @rtype: float
    """
    current = self.timeout
    return current
493
def setblocking(self, blocking):
    """
    Switch the channel between blocking and non-blocking mode: if
    C{blocking} is 0, the channel is set to non-blocking mode; otherwise
    it's set to blocking mode.  Initially all channels are in blocking
    mode.

    In non-blocking mode, if a L{recv} call doesn't find any data, or if a
    L{send} call can't immediately dispose of the data, an error exception
    is raised.  In blocking mode, the calls block until they can proceed.
    An EOF condition is considered "immediate data" for L{recv}, so if the
    channel is closed in the read direction, it will never block.

    C{chan.setblocking(0)} is equivalent to C{chan.settimeout(0)};
    C{chan.setblocking(1)} is equivalent to C{chan.settimeout(None)}.

    @param blocking: 0 to set non-blocking mode; non-0 to set blocking
        mode.
    @type blocking: int
    """
    if not blocking:
        # non-blocking is expressed as a zero timeout
        self.settimeout(0.0)
        return
    self.settimeout(None)
517
def getpeername(self):
    """
    Report the address of the remote side of this Channel, if possible.
    This simply forwards to C{'getpeername'} on the Transport, and exists
    to provide enough of a socket-like interface to allow asyncore to
    work.  (asyncore likes to call C{'getpeername'}.)

    @return: the address of the remote host, if known
    @rtype: tuple(str, int)
    """
    peer = self.transport.getpeername()
    return peer
529
def close(self):
    """
    Close the channel.  All future read/write operations on the channel
    will fail.  The remote end will receive no more data (after queued
    data is flushed).  Channels are automatically closed when their
    L{Transport} is closed or when they are garbage collected.
    """
    self.lock.acquire()
    try:
        # only close the pipe when the user explicitly closes the channel.
        # otherwise they will get unpleasant surprises.  (and do it before
        # checking self.closed, since the remote host may have already
        # closed the connection.)
        fd_pipe = self._pipe
        if fd_pipe is not None:
            fd_pipe.close()
            self._pipe = None

        if self.closed or not self.active:
            return
        outgoing = self._close_internal()
    finally:
        self.lock.release()
    # the messages are sent after the lock has been released
    for msg in outgoing:
        if msg is None:
            continue
        self.transport._send_user_message(msg)
555
def recv_ready(self):
    """
    Tell whether data is buffered and ready to be read from this channel.
    A C{False} result does not mean that the channel has closed; it means
    you may need to wait before more data arrives.

    @return: C{True} if a L{recv} call on this channel would immediately
        return at least one byte; C{False} otherwise.
    @rtype: boolean
    """
    stdout_buffer = self.in_buffer
    return stdout_buffer.read_ready()
567
def recv(self, nbytes):
    """
    Receive data from the channel.  The return value is a string
    representing the data received.  The maximum amount of data to be
    received at once is specified by C{nbytes}.  If a string of length
    zero is returned, the channel stream has closed.

    @param nbytes: maximum number of bytes to read.
    @type nbytes: int
    @return: data.
    @rtype: str

    @raise socket.timeout: if no data is ready before the timeout set by
        L{settimeout}.
    """
    try:
        out = self.in_buffer.read(nbytes, self.timeout)
    except PipeTimeout:
        # translate the buffer's timeout into the socket-style exception;
        # the exception object itself is not needed, so it is not bound
        raise socket.timeout()

    ack = self._check_add_window(len(out))
    # no need to hold the channel lock when sending this
    if ack > 0:
        m = Message()
        m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST))
        m.add_int(self.remote_chanid)
        m.add_int(ack)
        self.transport._send_user_message(m)

    return out
598
def recv_stderr_ready(self):
    """
    Tell whether data is buffered and ready to be read from this channel's
    stderr stream.  Only channels using L{exec_command} or
    L{invoke_shell} without a pty will ever have data on the stderr
    stream.

    @return: C{True} if a L{recv_stderr} call on this channel would
        immediately return at least one byte; C{False} otherwise.
    @rtype: boolean

    @since: 1.1
    """
    stderr_buffer = self.in_stderr_buffer
    return stderr_buffer.read_ready()
613
def recv_stderr(self, nbytes):
    """
    Receive data from the channel's stderr stream.  Only channels using
    L{exec_command} or L{invoke_shell} without a pty will ever have data
    on the stderr stream.  The return value is a string representing the
    data received.  The maximum amount of data to be received at once is
    specified by C{nbytes}.  If a string of length zero is returned, the
    channel stream has closed.

    @param nbytes: maximum number of bytes to read.
    @type nbytes: int
    @return: data.
    @rtype: str

    @raise socket.timeout: if no data is ready before the timeout set by
        L{settimeout}.

    @since: 1.1
    """
    try:
        out = self.in_stderr_buffer.read(nbytes, self.timeout)
    except PipeTimeout:
        # translate the buffer's timeout into the socket-style exception;
        # the exception object itself is not needed, so it is not bound
        raise socket.timeout()

    ack = self._check_add_window(len(out))
    # no need to hold the channel lock when sending this
    if ack > 0:
        m = Message()
        m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST))
        m.add_int(self.remote_chanid)
        m.add_int(ack)
        self.transport._send_user_message(m)

    return out
648
def send_ready(self):
    """
    Tell whether data can be written to this channel without blocking.
    This means the channel is either closed (so any write attempt would
    return immediately) or there is at least one byte of space in the
    outbound buffer.  If there is at least one byte of space in the
    outbound buffer, a L{send} call will succeed immediately and return
    the number of bytes actually written.

    @return: C{True} if a L{send} call on this channel would immediately
        succeed or fail
    @rtype: boolean
    """
    self.lock.acquire()
    try:
        stream_finished = self.closed or self.eof_sent
        if not stream_finished:
            return self.out_window_size > 0
        return True
    finally:
        self.lock.release()
669
def send(self, s):
    """
    Send data to the channel.  Returns the number of bytes sent, or 0 if
    the channel stream is closed.  Applications are responsible for
    checking that all data has been sent: if only some of the data was
    transmitted, the application needs to attempt delivery of the
    remaining data.

    @param s: data to send
    @type s: str
    @return: number of bytes actually sent
    @rtype: int

    @raise socket.timeout: if no data could be sent before the timeout set
        by L{settimeout}.
    """
    wanted = len(s)
    self.lock.acquire()
    try:
        granted = self._wait_for_send_window(wanted)
        if granted == 0:
            # eof or similar
            return 0
        packet = Message()
        packet.add_byte(chr(MSG_CHANNEL_DATA))
        packet.add_int(self.remote_chanid)
        # only the granted prefix of the data goes into this packet
        packet.add_string(s[:granted])
    finally:
        self.lock.release()
    # Note: We release self.lock before calling _send_user_message.
    # Otherwise, we can deadlock during re-keying.
    self.transport._send_user_message(packet)
    return granted
703
def send_stderr(self, s):
    """
    Send data to the channel on the "stderr" stream.  This is normally
    only used by servers to send output from shell commands -- clients
    won't use this.  Returns the number of bytes sent, or 0 if the channel
    stream is closed.  Applications are responsible for checking that all
    data has been sent: if only some of the data was transmitted, the
    application needs to attempt delivery of the remaining data.

    @param s: data to send.
    @type s: str
    @return: number of bytes actually sent.
    @rtype: int

    @raise socket.timeout: if no data could be sent before the timeout set
        by L{settimeout}.

    @since: 1.1
    """
    wanted = len(s)
    self.lock.acquire()
    try:
        granted = self._wait_for_send_window(wanted)
        if granted == 0:
            # eof or similar
            return 0
        packet = Message()
        packet.add_byte(chr(MSG_CHANNEL_EXTENDED_DATA))
        packet.add_int(self.remote_chanid)
        # 1 is the extended-data type used for the stderr stream
        packet.add_int(1)
        packet.add_string(s[:granted])
    finally:
        self.lock.release()
    # Note: We release self.lock before calling _send_user_message.
    # Otherwise, we can deadlock during re-keying.
    self.transport._send_user_message(packet)
    return granted
741
def sendall(self, s):
    """
    Send data to the channel, without allowing partial results.  Unlike
    L{send}, this method continues to send data from the given string
    until either all data has been sent or an error occurs.  Nothing is
    returned.

    @param s: data to send.
    @type s: str

    @raise socket.timeout: if sending stalled for longer than the timeout
        set by L{settimeout}.
    @raise socket.error: if an error occurred before the entire string was
        sent.

    @note: If the channel is closed while only part of the data has been
        sent, there is no way to determine how much data (if any) was sent.
        This is irritating, but identically follows python's API.
    """
    while s:
        if self.closed:
            # this doesn't seem useful, but it is the documented behavior of Socket
            raise socket.error('Socket is closed')
        sent = self.send(s)
        if sent == 0:
            # send() reports 0 when the stream can no longer take data
            # ("eof or similar") even though self.closed may still be
            # false; retrying would never make progress, so fail instead
            # of spinning forever
            raise socket.error('Socket is closed')
        s = s[sent:]
    return None
767
def sendall_stderr(self, s):
    """
    Send data to the channel's "stderr" stream, without allowing partial
    results.  Unlike L{send_stderr}, this method continues to send data
    from the given string until all data has been sent or an error occurs.
    Nothing is returned.

    @param s: data to send to the client as "stderr" output.
    @type s: str

    @raise socket.timeout: if sending stalled for longer than the timeout
        set by L{settimeout}.
    @raise socket.error: if an error occurred before the entire string was
        sent.

    @since: 1.1
    """
    while s:
        if self.closed:
            raise socket.error('Socket is closed')
        sent = self.send_stderr(s)
        if sent == 0:
            # send_stderr() reports 0 when the stream can no longer take
            # data ("eof or similar") even though self.closed may still be
            # false; retrying would never make progress, so fail instead
            # of spinning forever
            raise socket.error('Socket is closed')
        s = s[sent:]
    return None
791
def makefile(self, *params):
    """
    Wrap this channel in a file-like object.  The optional C{mode} and
    C{bufsize} arguments are interpreted the same way as by the built-in
    C{file()} function in python.

    @return: object which can be used for python file I/O.
    @rtype: L{ChannelFile}
    """
    # the channel itself is always the first constructor argument
    return ChannelFile(self, *params)
802
def makefile_stderr(self, *params):
    """
    Wrap this channel's stderr stream in a file-like object.  Only
    channels using L{exec_command} or L{invoke_shell} without a pty will
    ever have data on the stderr stream.

    The optional C{mode} and C{bufsize} arguments are interpreted the
    same way as by the built-in C{file()} function in python.  For a
    client, it only makes sense to open this file for reading.  For a
    server, it only makes sense to open this file for writing.

    @return: object which can be used for python file I/O.
    @rtype: L{ChannelFile}

    @since: 1.1
    """
    # the channel itself is always the first constructor argument
    return ChannelStderrFile(self, *params)
820
def fileno(self):
    """
    Give back an OS-level file descriptor which can be used for polling,
    but I{not} for reading or writing.  This is primarily to allow
    python's C{select} module to work.

    The first time C{fileno} is called on a channel, a pipe is created to
    simulate real OS-level file descriptor (FD) behavior.  Because of this,
    two OS-level FDs are created, which will use up FDs faster than normal.
    (You won't notice this effect unless you have hundreds of channels
    open at the same time.)

    @return: an OS-level file descriptor
    @rtype: int

    @warning: This method causes channel reads to be slightly less
        efficient.
    """
    self.lock.acquire()
    try:
        if self._pipe is None:
            # create the pipe and feed in any existing data
            self._pipe = pipe.make_pipe()
            stdout_side, stderr_side = pipe.make_or_pipe(self._pipe)
            self.in_buffer.set_event(stdout_side)
            self.in_stderr_buffer.set_event(stderr_side)
        return self._pipe.fileno()
    finally:
        self.lock.release()
851
def shutdown(self, how):
    """
    Shut down one or both halves of the connection.  If C{how} is 0,
    further receives are disallowed.  If C{how} is 1, further sends
    are disallowed.  If C{how} is 2, further sends and receives are
    disallowed.  This closes the stream in one or both directions.

    @param how: 0 (stop receiving), 1 (stop sending), or 2 (stop
        receiving and sending).
    @type how: int
    """
    stop_reading = (how == 0) or (how == 2)
    stop_writing = (how == 1) or (how == 2)
    if stop_reading:
        # feign "read" shutdown
        self.eof_received = 1
    if not stop_writing:
        return
    self.lock.acquire()
    try:
        eof_msg = self._send_eof()
    finally:
        self.lock.release()
    # the message, if any, is sent after the lock has been released
    if eof_msg is not None:
        self.transport._send_user_message(eof_msg)
874
def shutdown_read(self):
    """
    Shut down the receiving side of this socket, closing the stream in
    the incoming direction.  After this call, future reads on this
    channel will fail instantly.  This is a convenience method, equivalent
    to C{shutdown(0)}, for people who don't make it a habit to
    memorize unix constants from the 1970s.

    @since: 1.2
    """
    stop_receiving = 0
    self.shutdown(stop_receiving)
886
def shutdown_write(self):
    """
    Shut down the sending side of this socket, closing the stream in
    the outgoing direction.  After this call, future writes on this
    channel will fail instantly.  This is a convenience method, equivalent
    to C{shutdown(1)}, for people who don't make it a habit to
    memorize unix constants from the 1970s.

    @since: 1.2
    """
    stop_sending = 1
    self.shutdown(stop_sending)
898 899 900 ### calls from Transport 901 902
903 - def _set_transport(self, transport):
906
def _set_window(self, window_size, max_packet_size):
    """
    Record the inbound window size and maximum packet size, and reset
    the inbound byte counter.

    @param window_size: inbound window size
    @type window_size: int
    @param max_packet_size: inbound maximum packet size
    @type max_packet_size: int
    """
    self.in_window_size, self.in_max_packet_size = window_size, max_packet_size
    self.in_window_sofar = 0
    # threshold of bytes we receive before we bother to send a window update
    self.in_window_threshold = window_size // 10
    self._log(DEBUG, 'Max packet in: %d bytes' % max_packet_size)
914
def _set_remote_channel(self, chanid, window_size, max_packet_size):
    """
    Record the remote channel id and outbound limits, and mark the
    channel active.

    @param chanid: the remote side's id for this channel
    @type chanid: int
    @param window_size: outbound window size
    @type window_size: int
    @param max_packet_size: outbound maximum packet size; values below
        C{MIN_PACKET_SIZE} are raised to C{MIN_PACKET_SIZE}
    @type max_packet_size: int
    """
    self.remote_chanid = chanid
    self.out_window_size = window_size
    # never go below the module-level lower bound
    effective_packet_size = max_packet_size
    if effective_packet_size < MIN_PACKET_SIZE:
        effective_packet_size = MIN_PACKET_SIZE
    self.out_max_packet_size = effective_packet_size
    self.active = 1
    # the log line reports the value as given, not the clamped one
    self._log(DEBUG, 'Max packet out: %d bytes' % max_packet_size)
921
def _request_success(self, m):
    """
    Log the success, set C{event_ready} and set the channel's event.

    @param m: unused by this method
    """
    log_line = 'Sesch channel %d request ok' % self.chanid
    self._log(DEBUG, log_line)
    self.event_ready = True
    self.event.set()
927
928 - def _request_failed(self, m):
929 self.lock.acquire() 930 try: 931 msgs = self._close_internal() 932 finally: 933 self.lock.release() 934 for m in msgs: 935 if m is not None: 936 self.transport._send_user_message(m)
937
938 - def _feed(self, m):
939 if type(m) is str: 940 # passed from _feed_extended 941 s = m 942 else: 943 s = m.get_string() 944 self.in_buffer.feed(s)
945
def _feed_extended(self, m):
    """
    Handle an extended-data message.  Data of type 1 is delivered either
    to the normal input buffer (when C{combine_stderr} is set) or to the
    stderr buffer; any other type is logged and dropped.

    @param m: message holding the data type code followed by the data.
    """
    code = m.get_int()
    payload = m.get_string()
    if code == 1:
        if self.combine_stderr:
            self._feed(payload)
        else:
            self.in_stderr_buffer.feed(payload)
        return
    self._log(ERROR, 'unknown extended_data type %d; discarding' % code)
956
def _window_adjust(self, m):
    """
    Handle a window-adjust message: enlarge the outbound window by the
    number of bytes given in the message and notify all threads waiting on
    C{out_buffer_cv}.

    @param m: message whose next field is the number of bytes to add.
    """
    grant = m.get_int()
    self.lock.acquire()
    try:
        if self.ultra_debug:
            self._log(DEBUG, 'window up %d' % grant)
        self.out_window_size += grant
        # wake up anyone waiting for window space
        self.out_buffer_cv.notifyAll()
    finally:
        self.lock.release()
967
def _handle_request(self, m):
    """
    Process a channel request sent by the remote side and, when the
    request asks for it, answer with a success or failure message.

    C{exit-status} and C{xon-xoff} are handled locally.  The other known
    request types are passed to the transport's server object and fail
    when there is none.  Unknown request types are logged and fail.

    @param m: the request message, positioned at the request name.
    """
    key = m.get_string()
    want_reply = m.get_boolean()
    server = self.transport.server_object
    # stays False unless a branch below accepts the request
    ok = False
    if key == 'exit-status':
        self.exit_status = m.get_int()
        self.status_event.set()
        ok = True
    elif key == 'xon-xoff':
        # ignore
        ok = True
    elif key == 'pty-req':
        term = m.get_string()
        width = m.get_int()
        height = m.get_int()
        pixelwidth = m.get_int()
        pixelheight = m.get_int()
        modes = m.get_string()
        if server is not None:
            ok = server.check_channel_pty_request(self, term, width, height,
                                                  pixelwidth, pixelheight,
                                                  modes)
    elif key == 'shell':
        if server is not None:
            ok = server.check_channel_shell_request(self)
    elif key == 'exec':
        cmd = m.get_string()
        if server is not None:
            ok = server.check_channel_exec_request(self, cmd)
    elif key == 'subsystem':
        name = m.get_string()
        if server is not None:
            ok = server.check_channel_subsystem_request(self, name)
    elif key == 'window-change':
        width = m.get_int()
        height = m.get_int()
        pixelwidth = m.get_int()
        pixelheight = m.get_int()
        if server is not None:
            ok = server.check_channel_window_change_request(self, width,
                                                            height,
                                                            pixelwidth,
                                                            pixelheight)
    elif key == 'x11-req':
        single_connection = m.get_boolean()
        auth_proto = m.get_string()
        auth_cookie = m.get_string()
        screen_number = m.get_int()
        if server is not None:
            ok = server.check_channel_x11_request(self, single_connection,
                                                  auth_proto, auth_cookie,
                                                  screen_number)
    else:
        self._log(DEBUG, 'Unhandled channel request "%s"' % key)
    if want_reply:
        if ok:
            code = MSG_CHANNEL_SUCCESS
        else:
            code = MSG_CHANNEL_FAILURE
        reply = Message()
        reply.add_byte(chr(code))
        reply.add_int(self.remote_chanid)
        self.transport._send_user_message(reply)
1040
def _handle_eof(self, m):
    """
    Handle an EOF message from the remote side.  The first time one
    arrives, both input buffers are closed and, if a pipe is attached, it
    is set permanently.  Every EOF is logged, including repeats.

    @param m: the EOF message (not inspected).
    """
    self.lock.acquire()
    try:
        first_eof = not self.eof_received
        if first_eof:
            self.eof_received = True
            self.in_buffer.close()
            self.in_stderr_buffer.close()
            if self._pipe is not None:
                self._pipe.set_forever()
    finally:
        self.lock.release()
    self._log(DEBUG, 'EOF received (%s)', self._name)
1053
def _handle_close(self, m):
    """
    Handle a close message from the remote side: close this channel,
    unlink it from the transport, and send whatever messages the close
    produced.

    @param m: the close message (not inspected).
    """
    self.lock.acquire()
    try:
        pending = self._close_internal()
        self.transport._unlink_channel(self.chanid)
    finally:
        self.lock.release()
    # the messages are sent only after the lock has been released
    for msg in pending:
        if msg is None:
            continue
        self.transport._send_user_message(msg)
1064 1065 1066 ### internals... 1067 1068
def _log(self, level, msg, *args):
    """
    Log a message through this channel's logger, prefixed with the
    channel's name.

    @param level: logging level to log at.
    @param msg: message text, or a format string when C{args} are given.
    @param args: values for the logger to substitute into C{msg}.
    """
    prefix = "[chan " + self._name + "] "
    self.logger.log(level, prefix + msg, *args)
1071
def _event_pending(self):
    """
    Reset the request event so that a later L{_wait_for_event} call
    blocks until the event is set again.
    """
    self.event.clear()
    # no successful reply has been seen yet
    self.event_ready = False
1075
def _wait_for_event(self):
    """
    Block until the request event is set.  Returns normally if the event
    was marked ready; otherwise raises.

    @raise SSHException: if the event was set without being marked ready
        and the transport has no exception to report.  If the transport
        does have one, that exception is raised instead.
    """
    self.event.wait()
    assert self.event.isSet()
    if not self.event_ready:
        err = self.transport.get_exception()
        if err is None:
            err = SSHException('Channel closed.')
        raise err
1085
def _set_closed(self):
    """
    (You are already holding the lock.)
    Mark the channel closed, close both input buffers, and wake every
    kind of waiter: senders waiting on C{out_buffer_cv}, threads waiting
    on the request event or the exit-status event, and the pipe if one is
    attached.
    """
    # you are holding the lock.
    self.closed = True
    self.in_buffer.close()
    self.in_stderr_buffer.close()
    self.out_buffer_cv.notifyAll()
    # Notify any waiters that we are closed
    self.event.set()
    self.status_event.set()
    pipe = self._pipe
    if pipe is not None:
        pipe.set_forever()
1097
def _send_eof(self):
    """
    (You are already holding the lock.)
    Build the EOF message for this channel and record that EOF has been
    sent.  The message is only built here; sending it is left to the
    caller.

    @return: the EOF message, or C{None} if EOF was already sent.
    """
    # you are holding the lock.
    if self.eof_sent:
        return None
    eof_msg = Message()
    eof_msg.add_byte(chr(MSG_CHANNEL_EOF))
    eof_msg.add_int(self.remote_chanid)
    self.eof_sent = True
    self._log(DEBUG, 'EOF sent (%s)', self._name)
    return eof_msg
1108
def _close_internal(self):
    """
    (You are already holding the lock.)
    Close the channel and build the messages that tell the remote side
    about it.  The messages are only built here; sending them is left to
    the caller.

    @return: a pair C{(eof_message, close_message)}.  The first element is
        C{None} if EOF was already sent; both are C{None} if the channel
        is inactive or already closed.
    @rtype: tuple
    """
    # you are holding the lock.
    if self.closed or not self.active:
        return None, None
    eof_msg = self._send_eof()
    close_msg = Message()
    close_msg.add_byte(chr(MSG_CHANNEL_CLOSE))
    close_msg.add_int(self.remote_chanid)
    self._set_closed()
    # can't unlink from the Transport yet -- the remote side may still
    # try to send meta-data (exit-status, etc)
    return eof_msg, close_msg
1121 1132
def _check_add_window(self, n):
    """
    Add C{n} to the count of bytes received since the last window update
    and decide whether an update is due.

    @param n: number of bytes to add to the count.
    @type n: int
    @return: the accumulated byte count (which is then reset) once it
        exceeds the threshold; otherwise 0.  Also 0, with nothing counted,
        if the channel is closed, inactive, or has received EOF.
    @rtype: int
    """
    self.lock.acquire()
    try:
        if not (self.active and not self.closed and not self.eof_received):
            return 0
        if self.ultra_debug:
            self._log(DEBUG, 'addwindow %d' % n)
        self.in_window_sofar += n
        if self.in_window_sofar > self.in_window_threshold:
            if self.ultra_debug:
                self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar)
            pending = self.in_window_sofar
            self.in_window_sofar = 0
            return pending
        return 0
    finally:
        self.lock.release()
1150
def _wait_for_send_window(self, size):
    """
    (You are already holding the lock.)
    Wait for the send window to open up, and allocate up to C{size} bytes
    for transmission.  If no space opens up before the timeout, a timeout
    exception is raised.  Returns the number of bytes available to send
    (may be less than requested).

    @param size: number of bytes the caller would like to send.
    @type size: int
    @return: number of bytes reserved from the send window; 0 if the
        channel is closed or EOF has already been sent.
    @rtype: int
    @raise socket.timeout: if the window is still empty when the channel's
        timeout runs out (at once when the timeout is 0.0).
    """
    # you are already holding the lock
    if self.closed or self.eof_sent:
        return 0
    if self.out_window_size == 0:
        # should we block?
        if self.timeout == 0.0:
            raise socket.timeout()
        # loop here in case we get woken up but a different thread has
        # filled the buffer
        remaining = self.timeout
        while self.out_window_size == 0:
            if self.closed or self.eof_sent:
                return 0
            # The deadline is checked only after the window and the closed
            # state have been re-examined, so a wake-up that arrives right
            # at the deadline is still honoured rather than reported as a
            # timeout.
            if remaining is not None and remaining <= 0.0:
                raise socket.timeout()
            then = time.time()
            self.out_buffer_cv.wait(remaining)
            if remaining is not None:
                remaining -= time.time() - then
    # we have some window to squeeze into
    if self.closed or self.eof_sent:
        return 0
    if self.out_window_size < size:
        size = self.out_window_size
    # never hand out more than one packet's worth, less 64 bytes
    if self.out_max_packet_size - 64 < size:
        size = self.out_max_packet_size - 64
    self.out_window_size -= size
    if self.ultra_debug:
        self._log(DEBUG, 'window down to %d' % self.out_window_size)
    return size
1188 1189
class ChannelFile (BufferedFile):
    """
    A file-like wrapper around L{Channel}.  A ChannelFile is created by
    calling L{Channel.makefile}.

    @bug: To correctly emulate the file object created from a socket's
        C{makefile} method, a L{Channel} and its C{ChannelFile} should be
        able to be closed or garbage-collected independently.  Currently,
        closing the C{ChannelFile} does nothing but flush the buffer.
    """

    def __init__(self, channel, mode='r', bufsize=-1):
        """
        Wrap C{channel} in a buffered file object.

        @param channel: the channel to read from and write to.
        @type channel: L{Channel}
        @param mode: file mode, passed on to C{_set_mode}.
        @type mode: str
        @param bufsize: buffer size, passed on to C{_set_mode}.
        @type bufsize: int
        """
        self.channel = channel
        BufferedFile.__init__(self)
        self._set_mode(mode, bufsize)

    def __repr__(self):
        """
        Returns a string representation of this object, for debugging.

        @rtype: str
        """
        return '<paramiko.ChannelFile from %r>' % (self.channel,)

    def _read(self, size):
        # reads are served by the channel's normal data stream
        return self.channel.recv(size)

    def _write(self, data):
        # sendall() either sends everything or raises, so the whole of
        # data is reported as written
        self.channel.sendall(data)
        written = len(data)
        return written
1220 1221
class ChannelStderrFile (ChannelFile):
    """
    A L{ChannelFile} variant that reads with the channel's C{recv_stderr}
    and writes with its C{sendall_stderr} instead of the normal data
    calls.
    """

    def __init__(self, channel, mode='r', bufsize=-1):
        ChannelFile.__init__(self, channel, mode, bufsize)

    def _read(self, size):
        return self.channel.recv_stderr(size)

    def _write(self, data):
        self.channel.sendall_stderr(data)
        written = len(data)
        return written
1232 1233 1234 # vim: set shiftwidth=4 expandtab : 1235