~spiv/ubuntu/lucid/paramiko/address-families-579530-lucid

« back to all changes in this revision

Viewing changes to paramiko/buffered_pipe.py

  • Committer: Bazaar Package Importer
  • Author(s): Jeremy T. Bouse
  • Date: 2006-12-26 15:48:42 UTC
  • mfrom: (1.1.3 upstream)
  • Revision ID: james.westby@ubuntu.com-20061226154842-a4h9l1h59t0osb7y
Tags: 1.6.4-1
New upstream release (Closes: #344734, #382348).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Robey Pointer <robey@lag.net>
 
2
#
 
3
# This file is part of paramiko.
 
4
#
 
5
# Paramiko is free software; you can redistribute it and/or modify it under the
 
6
# terms of the GNU Lesser General Public License as published by the Free
 
7
# Software Foundation; either version 2.1 of the License, or (at your option)
 
8
# any later version.
 
9
#
 
10
# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
 
11
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 
12
# A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 
13
# details.
 
14
#
 
15
# You should have received a copy of the GNU Lesser General Public License
 
16
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
 
17
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
 
18
 
 
19
"""
 
20
Attempt to generalize the "feeder" part of a Channel: an object which can be
 
21
read from and closed, but is reading from a buffer fed by another thread.  The
 
22
read operations are blocking and can have a timeout set.
 
23
"""
 
24
 
 
25
import array
 
26
import threading
 
27
import time
 
28
 
 
29
 
 
30
class PipeTimeout (IOError):
 
31
    """
 
32
    Indicates that a timeout was reached on a read from a L{BufferedPipe}.
 
33
    """
 
34
    pass
 
35
 
 
36
 
 
37
class BufferedPipe (object):
 
38
    """
 
39
    A buffer that obeys normal read (with timeout) & close semantics for a
 
40
    file or socket, but is fed data from another thread.  This is used by
 
41
    L{Channel}.
 
42
    """
 
43
    
 
44
    def __init__(self):
 
45
        self._lock = threading.Lock()
 
46
        self._cv = threading.Condition(self._lock)
 
47
        self._event = None
 
48
        self._buffer = array.array('B')
 
49
        self._closed = False
 
50
 
 
51
    def set_event(self, event):
 
52
        """
 
53
        Set an event on this buffer.  When data is ready to be read (or the
 
54
        buffer has been closed), the event will be set.  When no data is
 
55
        ready, the event will be cleared.
 
56
        
 
57
        @param event: the event to set/clear
 
58
        @type event: Event
 
59
        """
 
60
        self._event = event
 
61
        if len(self._buffer) > 0:
 
62
            event.set()
 
63
        else:
 
64
            event.clear()
 
65
        
 
66
    def feed(self, data):
 
67
        """
 
68
        Feed new data into this pipe.  This method is assumed to be called
 
69
        from a separate thread, so synchronization is done.
 
70
        
 
71
        @param data: the data to add
 
72
        @type data: str
 
73
        """
 
74
        self._lock.acquire()
 
75
        try:
 
76
            if self._event is not None:
 
77
                self._event.set()
 
78
            self._buffer.fromstring(data)
 
79
            self._cv.notifyAll()
 
80
        finally:
 
81
            self._lock.release()
 
82
 
 
83
    def read_ready(self):
 
84
        """
 
85
        Returns true if data is buffered and ready to be read from this
 
86
        feeder.  A C{False} result does not mean that the feeder has closed;
 
87
        it means you may need to wait before more data arrives.
 
88
        
 
89
        @return: C{True} if a L{read} call would immediately return at least
 
90
            one byte; C{False} otherwise.
 
91
        @rtype: bool
 
92
        """
 
93
        self._lock.acquire()
 
94
        try:
 
95
            if len(self._buffer) == 0:
 
96
                return False
 
97
            return True
 
98
        finally:
 
99
            self._lock.release()
 
100
 
 
101
    def read(self, nbytes, timeout=None):
 
102
        """
 
103
        Read data from the pipe.  The return value is a string representing
 
104
        the data received.  The maximum amount of data to be received at once
 
105
        is specified by C{nbytes}.  If a string of length zero is returned,
 
106
        the pipe has been closed.
 
107
 
 
108
        The optional C{timeout} argument can be a nonnegative float expressing
 
109
        seconds, or C{None} for no timeout.  If a float is given, a
 
110
        C{PipeTimeout} will be raised if the timeout period value has
 
111
        elapsed before any data arrives.
 
112
 
 
113
        @param nbytes: maximum number of bytes to read
 
114
        @type nbytes: int
 
115
        @param timeout: maximum seconds to wait (or C{None}, the default, to
 
116
            wait forever)
 
117
        @type timeout: float
 
118
        @return: data
 
119
        @rtype: str
 
120
        
 
121
        @raise PipeTimeout: if a timeout was specified and no data was ready
 
122
            before that timeout
 
123
        """
 
124
        out = ''
 
125
        self._lock.acquire()
 
126
        try:
 
127
            if len(self._buffer) == 0:
 
128
                if self._closed:
 
129
                    return out
 
130
                # should we block?
 
131
                if timeout == 0.0:
 
132
                    raise PipeTimeout()
 
133
                # loop here in case we get woken up but a different thread has
 
134
                # grabbed everything in the buffer.
 
135
                while (len(self._buffer) == 0) and not self._closed:
 
136
                    then = time.time()
 
137
                    self._cv.wait(timeout)
 
138
                    if timeout is not None:
 
139
                        timeout -= time.time() - then
 
140
                        if timeout <= 0.0:
 
141
                            raise PipeTimeout()
 
142
 
 
143
            # something's in the buffer and we have the lock!
 
144
            if len(self._buffer) <= nbytes:
 
145
                out = self._buffer.tostring()
 
146
                del self._buffer[:]
 
147
                if (self._event is not None) and not self._closed:
 
148
                    self._event.clear()
 
149
            else:
 
150
                out = self._buffer[:nbytes].tostring()
 
151
                del self._buffer[:nbytes]
 
152
        finally:
 
153
            self._lock.release()
 
154
 
 
155
        return out
 
156
    
 
157
    def empty(self):
 
158
        """
 
159
        Clear out the buffer and return all data that was in it.
 
160
        
 
161
        @return: any data that was in the buffer prior to clearing it out
 
162
        @rtype: str
 
163
        """
 
164
        self._lock.acquire()
 
165
        try:
 
166
            out = self._buffer.tostring()
 
167
            del self._buffer[:]
 
168
            if (self._event is not None) and not self._closed:
 
169
                self._event.clear()
 
170
            return out
 
171
        finally:
 
172
            self._lock.release()
 
173
    
 
174
    def close(self):
 
175
        """
 
176
        Close this pipe object.  Future calls to L{read} after the buffer
 
177
        has been emptied will return immediately with an empty string.
 
178
        """
 
179
        self._lock.acquire()
 
180
        try:
 
181
            self._closed = True
 
182
            self._cv.notifyAll()
 
183
            if self._event is not None:
 
184
                self._event.set()
 
185
        finally:
 
186
            self._lock.release()
 
187
 
 
188
    def __len__(self):
 
189
        """
 
190
        Return the number of bytes buffered.
 
191
        
 
192
        @return: number of bytes bufferes
 
193
        @rtype: int
 
194
        """
 
195
        self._lock.acquire()
 
196
        try:
 
197
            return len(self._buffer)
 
198
        finally:
 
199
            self._lock.release()
 
200