Package paramiko :: Module buffered_pipe
[frames] | no frames]

Source Code for Module paramiko.buffered_pipe

  1  # Copyright (C) 2006-2007  Robey Pointer <robeypointer@gmail.com> 
  2  # 
  3  # This file is part of paramiko. 
  4  # 
  5  # Paramiko is free software; you can redistribute it and/or modify it under the 
  6  # terms of the GNU Lesser General Public License as published by the Free 
  7  # Software Foundation; either version 2.1 of the License, or (at your option) 
  8  # any later version. 
  9  # 
 10  # Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY 
 11  # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 
 12  # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 13  # details. 
 14  # 
 15  # You should have received a copy of the GNU Lesser General Public License 
 16  # along with Paramiko; if not, write to the Free Software Foundation, Inc., 
 17  # 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. 
 18   
 19  """ 
 20  Attempt to generalize the "feeder" part of a Channel: an object which can be 
 21  read from and closed, but is reading from a buffer fed by another thread.  The 
 22  read operations are blocking and can have a timeout set. 
 23  """ 
 24   
 25  import array 
 26  import threading 
 27  import time 
 28   
 29   
30 -class PipeTimeout (IOError):
31 """ 32 Indicates that a timeout was reached on a read from a L{BufferedPipe}. 33 """ 34 pass
35 36
37 -class BufferedPipe (object):
38 """ 39 A buffer that obeys normal read (with timeout) & close semantics for a 40 file or socket, but is fed data from another thread. This is used by 41 L{Channel}. 42 """ 43
44 - def __init__(self):
45 self._lock = threading.Lock() 46 self._cv = threading.Condition(self._lock) 47 self._event = None 48 self._buffer = array.array('B') 49 self._closed = False
50
51 - def set_event(self, event):
52 """ 53 Set an event on this buffer. When data is ready to be read (or the 54 buffer has been closed), the event will be set. When no data is 55 ready, the event will be cleared. 56 57 @param event: the event to set/clear 58 @type event: Event 59 """ 60 self._event = event 61 if len(self._buffer) > 0: 62 event.set() 63 else: 64 event.clear()
65
66 - def feed(self, data):
67 """ 68 Feed new data into this pipe. This method is assumed to be called 69 from a separate thread, so synchronization is done. 70 71 @param data: the data to add 72 @type data: str 73 """ 74 self._lock.acquire() 75 try: 76 if self._event is not None: 77 self._event.set() 78 self._buffer.fromstring(data) 79 self._cv.notifyAll() 80 finally: 81 self._lock.release()
82
83 - def read_ready(self):
84 """ 85 Returns true if data is buffered and ready to be read from this 86 feeder. A C{False} result does not mean that the feeder has closed; 87 it means you may need to wait before more data arrives. 88 89 @return: C{True} if a L{read} call would immediately return at least 90 one byte; C{False} otherwise. 91 @rtype: bool 92 """ 93 self._lock.acquire() 94 try: 95 if len(self._buffer) == 0: 96 return False 97 return True 98 finally: 99 self._lock.release()
100
101 - def read(self, nbytes, timeout=None):
102 """ 103 Read data from the pipe. The return value is a string representing 104 the data received. The maximum amount of data to be received at once 105 is specified by C{nbytes}. If a string of length zero is returned, 106 the pipe has been closed. 107 108 The optional C{timeout} argument can be a nonnegative float expressing 109 seconds, or C{None} for no timeout. If a float is given, a 110 C{PipeTimeout} will be raised if the timeout period value has 111 elapsed before any data arrives. 112 113 @param nbytes: maximum number of bytes to read 114 @type nbytes: int 115 @param timeout: maximum seconds to wait (or C{None}, the default, to 116 wait forever) 117 @type timeout: float 118 @return: data 119 @rtype: str 120 121 @raise PipeTimeout: if a timeout was specified and no data was ready 122 before that timeout 123 """ 124 out = '' 125 self._lock.acquire() 126 try: 127 if len(self._buffer) == 0: 128 if self._closed: 129 return out 130 # should we block? 131 if timeout == 0.0: 132 raise PipeTimeout() 133 # loop here in case we get woken up but a different thread has 134 # grabbed everything in the buffer. 135 while (len(self._buffer) == 0) and not self._closed: 136 then = time.time() 137 self._cv.wait(timeout) 138 if timeout is not None: 139 timeout -= time.time() - then 140 if timeout <= 0.0: 141 raise PipeTimeout() 142 143 # something's in the buffer and we have the lock! 144 if len(self._buffer) <= nbytes: 145 out = self._buffer.tostring() 146 del self._buffer[:] 147 if (self._event is not None) and not self._closed: 148 self._event.clear() 149 else: 150 out = self._buffer[:nbytes].tostring() 151 del self._buffer[:nbytes] 152 finally: 153 self._lock.release() 154 155 return out
156
157 - def empty(self):
158 """ 159 Clear out the buffer and return all data that was in it. 160 161 @return: any data that was in the buffer prior to clearing it out 162 @rtype: str 163 """ 164 self._lock.acquire() 165 try: 166 out = self._buffer.tostring() 167 del self._buffer[:] 168 if (self._event is not None) and not self._closed: 169 self._event.clear() 170 return out 171 finally: 172 self._lock.release()
173
174 - def close(self):
175 """ 176 Close this pipe object. Future calls to L{read} after the buffer 177 has been emptied will return immediately with an empty string. 178 """ 179 self._lock.acquire() 180 try: 181 self._closed = True 182 self._cv.notifyAll() 183 if self._event is not None: 184 self._event.set() 185 finally: 186 self._lock.release()
187
188 - def __len__(self):
189 """ 190 Return the number of bytes buffered. 191 192 @return: number of bytes bufferes 193 @rtype: int 194 """ 195 self._lock.acquire() 196 try: 197 return len(self._buffer) 198 finally: 199 self._lock.release()
200