1
# Copyright (C) 2006 Robey Pointer <robey@lag.net>
3
# This file is part of paramiko.
5
# Paramiko is free software; you can redistribute it and/or modify it under the
6
# terms of the GNU Lesser General Public License as published by the Free
7
# Software Foundation; either version 2.1 of the License, or (at your option)
10
# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
11
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15
# You should have received a copy of the GNU Lesser General Public License
16
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20
Attempt to generalize the "feeder" part of a Channel: an object which can be
21
read from and closed, but is reading from a buffer fed by another thread. The
22
read operations are blocking and can have a timeout set.
30
class PipeTimeout (IOError):
32
Indicates that a timeout was reached on a read from a L{BufferedPipe}.
37
class BufferedPipe (object):
39
A buffer that obeys normal read (with timeout) & close semantics for a
40
file or socket, but is fed data from another thread. This is used by
45
self._lock = threading.Lock()
46
self._cv = threading.Condition(self._lock)
48
self._buffer = array.array('B')
51
def set_event(self, event):
53
Set an event on this buffer. When data is ready to be read (or the
54
buffer has been closed), the event will be set. When no data is
55
ready, the event will be cleared.
57
@param event: the event to set/clear
61
if len(self._buffer) > 0:
68
Feed new data into this pipe. This method is assumed to be called
69
from a separate thread, so synchronization is done.
71
@param data: the data to add
76
if self._event is not None:
78
self._buffer.fromstring(data)
85
Returns true if data is buffered and ready to be read from this
86
feeder. A C{False} result does not mean that the feeder has closed;
87
it means you may need to wait before more data arrives.
89
@return: C{True} if a L{read} call would immediately return at least
90
one byte; C{False} otherwise.
95
if len(self._buffer) == 0:
101
def read(self, nbytes, timeout=None):
103
Read data from the pipe. The return value is a string representing
104
the data received. The maximum amount of data to be received at once
105
is specified by C{nbytes}. If a string of length zero is returned,
106
the pipe has been closed.
108
The optional C{timeout} argument can be a nonnegative float expressing
109
seconds, or C{None} for no timeout. If a float is given, a
110
C{PipeTimeout} will be raised if the timeout period value has
111
elapsed before any data arrives.
113
@param nbytes: maximum number of bytes to read
115
@param timeout: maximum seconds to wait (or C{None}, the default, to
121
@raise PipeTimeout: if a timeout was specified and no data was ready
127
if len(self._buffer) == 0:
133
# loop here in case we get woken up but a different thread has
134
# grabbed everything in the buffer.
135
while (len(self._buffer) == 0) and not self._closed:
137
self._cv.wait(timeout)
138
if timeout is not None:
139
timeout -= time.time() - then
143
# something's in the buffer and we have the lock!
144
if len(self._buffer) <= nbytes:
145
out = self._buffer.tostring()
147
if (self._event is not None) and not self._closed:
150
out = self._buffer[:nbytes].tostring()
151
del self._buffer[:nbytes]
159
Clear out the buffer and return all data that was in it.
161
@return: any data that was in the buffer prior to clearing it out
166
out = self._buffer.tostring()
168
if (self._event is not None) and not self._closed:
176
Close this pipe object. Future calls to L{read} after the buffer
177
has been emptied will return immediately with an empty string.
183
if self._event is not None:
190
Return the number of bytes buffered.
192
@return: number of bytes bufferes
197
return len(self._buffer)