2
# Copyright (c) 2012 Canonical
4
# This file is part of Checkbox.
6
# Storm is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU Lesser General Public License as
8
# published by the Free Software Foundation; either version 2.1 of
9
# the License, or (at your option) any later version.
11
# Storm is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# GNU Lesser General Public License for more details.
16
# You should have received a copy of the GNU Lesser General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
27
from checkbox.lib.file import File
29
from checkbox.io.selector import (
33
from checkbox.io.stream import Stream
36
# Number of bytes in atomic write to a pipe.
51
_buffer_size = PIPE_BUF
54
super(Pipe, self).__init__()
59
self._watchdog_client = None
60
self._watchdog_server = None
65
def get_description(self):
69
return "fd:%d" % self._fd
71
def connect_to_fd(self, fd):
73
return self._writer.initialize_from_fd(fd, "w")
75
def listen_on_fd(self, fd):
77
return self._reader.initialize_from_fd(fd, "r")
79
def handle_incoming_message(self):
80
"""Receive messages from the reader and buffer them.
82
:return: Whether a message becomes ready.
84
bytes = self.get_bytes_raw(PIPE_BUF)
92
def get_bytes(self, size=-1):
93
"""See Stream.get_bytes"""
95
size = self._buffer_size
97
size = min(size, self._buffer_size)
99
while size > len(self._bytes):
100
if not self.handle_incoming_message():
103
size = min(size, len(self._bytes))
104
bytes = self._bytes[:size]
105
self._bytes = self._bytes[size:]
109
def get_bytes_raw(self, size):
110
"""Get bytes directly from the source.
112
:param size: Number of bytes to get.
113
:return: Bytes gotten.
115
read_fd = self._reader.fileno()
117
selector = Selector()
118
selector.set_timeout(self._timeout)
119
selector.add_fd(read_fd, SelectorIO.READ)
121
# Select with both the real and watchdog descriptors
122
if self._watchdog_client:
123
selector.add_fd(self._watchdog_client.fileno(), SelectorIO.READ)
127
if selector.has_timed_out:
129
elif not selector.has_ready:
132
if (self._watchdog_client and
134
self._watchdog_client.fileno(), SelectorIO.READ) and
135
not selector.check_fd(read_fd, SelectorIO.READ)):
136
logging.info("Error reading from source, watchdog side closed")
139
bytes = self._reader.read(size)
142
"Error reading from source, errno %d", self._reader.errno)
150
"""See Stream.peek"""
151
while not self._bytes:
152
if not self.handle_incoming_message():
155
return self._bytes[0]
157
def search(self, sub):
158
"""See Stream.search"""
162
index = self._bytes.find(sub, position)
164
bytes = self._bytes[:index]
165
self._bytes = self._bytes[index + len(sub):]
168
position = len(self._bytes)
170
if not self.handle_incoming_message():
175
def put_bytes(self, bytes):
176
"""See Stream.put_bytes"""
177
if len(bytes) > self._buffer_size:
178
bytes = bytes[:self._buffer_size]
180
return self.put_bytes_raw(bytes)
182
def put_bytes_raw(self, bytes):
183
"""Put bytes directly onto the destination.
185
:param bytes: Bytes to put.
186
:return: Number of bytes put.
188
# Select with both the real and watchdog descriptors
189
if self._watchdog_client:
190
writer_fd = self._writer.fileno()
191
watchdog_fd = self._watchdog_client.fileno()
193
selector = Selector()
194
selector.add_fd(writer_fd, SelectorIO.WRITE)
195
selector.add_fd(watchdog_fd, SelectorIO.READ)
199
if not selector.has_ready:
200
logging.info("Put bytes failed")
203
if selector.check_fd(watchdog_fd, SelectorIO.READ):
204
logging.info("Error writing to fifo, watchdog side closed")
207
size = self._writer.write(bytes)
210
"Error writing to fifo, errno %d", self._writer.errno)