~cr3/checkbox/core

« back to all changes in this revision

Viewing changes to checkbox/io/pipe.py

  • Committer: Marc Tardif
  • Date: 2012-02-24 15:53:59 UTC
  • Revision ID: marc.tardif@canonical.com-20120224155359-dpbkbh24yfd2bh21
Added connector component.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# Copyright (c) 2012 Canonical
 
3
#
 
4
# This file is part of Checkbox.
 
5
#
 
6
# Storm is free software; you can redistribute it and/or modify
 
7
# it under the terms of the GNU Lesser General Public License as
 
8
# published by the Free Software Foundation; either version 2.1 of
 
9
# the License, or (at your option) any later version.
 
10
#
 
11
# Storm is distributed in the hope that it will be useful,
 
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
# GNU Lesser General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU Lesser General Public License
 
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
#
 
19
__metaclass__ = type
 
20
 
 
21
__all__ = [
 
22
    "Pipe",
 
23
    ]
 
24
 
 
25
import logging
 
26
 
 
27
from checkbox.lib.file import File
 
28
 
 
29
from checkbox.io.selector import (
 
30
    Selector,
 
31
    SelectorIO,
 
32
    )
 
33
from checkbox.io.stream import Stream
 
34
 
 
35
 
 
36
# Number of bytes in atomic write to a pipe.
 
37
PIPE_BUF = 2 ** 12
 
38
 
 
39
 
 
40
class Pipe(Stream):
 
41
 
 
42
    __slots__ = (
 
43
        "_fd",
 
44
        "_bytes",
 
45
        "_reader",
 
46
        "_writer",
 
47
        "_watchdog_client",
 
48
        "_watchdog_server",
 
49
        )
 
50
 
 
51
    _buffer_size = PIPE_BUF
 
52
 
 
53
    def __init__(self):
 
54
        super(Pipe, self).__init__()
 
55
        self._fd = None
 
56
        self._bytes = ""
 
57
        self._reader = File()
 
58
        self._writer = File()
 
59
        self._watchdog_client = None
 
60
        self._watchdog_server = None
 
61
 
 
62
    def fileno(self):
 
63
        return self._fd
 
64
 
 
65
    def get_description(self):
 
66
        if self._fd is None:
 
67
            return None
 
68
 
 
69
        return "fd:%d" % self._fd
 
70
 
 
71
    def connect_to_fd(self, fd):
 
72
        self._fd = fd
 
73
        return self._writer.initialize_from_fd(fd, "w")
 
74
 
 
75
    def listen_on_fd(self, fd):
 
76
        self._fd = fd
 
77
        return self._reader.initialize_from_fd(fd, "r")
 
78
 
 
79
    def handle_incoming_message(self):
 
80
        """Receive messages from the reader and buffer them.
 
81
 
 
82
        :return: Whether a message becomes ready.
 
83
        """
 
84
        bytes = self.get_bytes_raw(PIPE_BUF)
 
85
        if bytes is None:
 
86
            return False
 
87
 
 
88
        self._bytes += bytes
 
89
 
 
90
        return True
 
91
 
 
92
    def get_bytes(self, size=-1):
 
93
        """See Stream.get_bytes"""
 
94
        if size < 0:
 
95
            size = self._buffer_size
 
96
        else:
 
97
            size = min(size, self._buffer_size)
 
98
 
 
99
        while size > len(self._bytes):
 
100
            if not self.handle_incoming_message():
 
101
                break
 
102
 
 
103
        size = min(size, len(self._bytes))
 
104
        bytes = self._bytes[:size]
 
105
        self._bytes = self._bytes[size:]
 
106
 
 
107
        return bytes
 
108
 
 
109
    def get_bytes_raw(self, size):
 
110
        """Get bytes directly from the source.
 
111
 
 
112
        :param size: Number of bytes to get.
 
113
        :return: Bytes gotten.
 
114
        """
 
115
        read_fd = self._reader.fileno()
 
116
 
 
117
        selector = Selector()
 
118
        selector.set_timeout(self._timeout)
 
119
        selector.add_fd(read_fd, SelectorIO.READ)
 
120
 
 
121
        # Select with both the real and watchdog descriptors
 
122
        if self._watchdog_client:
 
123
            selector.add_fd(self._watchdog_client.fileno(), SelectorIO.READ)
 
124
 
 
125
        selector.execute()
 
126
 
 
127
        if selector.has_timed_out:
 
128
            return None
 
129
        elif not selector.has_ready:
 
130
            return None
 
131
 
 
132
        if (self._watchdog_client and
 
133
            selector.check_fd(
 
134
                self._watchdog_client.fileno(), SelectorIO.READ) and
 
135
            not selector.check_fd(read_fd, SelectorIO.READ)):
 
136
            logging.info("Error reading from source, watchdog side closed")
 
137
            return None
 
138
 
 
139
        bytes = self._reader.read(size)
 
140
        if bytes is None:
 
141
            logging.info(
 
142
                "Error reading from source, errno %d", self._reader.errno)
 
143
 
 
144
        return bytes
 
145
 
 
146
    def get_end(self):
 
147
        return True
 
148
 
 
149
    def peek(self):
 
150
        """See Stream.peek"""
 
151
        while not self._bytes:
 
152
            if not self.handle_incoming_message():
 
153
                return None
 
154
 
 
155
        return self._bytes[0]
 
156
 
 
157
    def search(self, sub):
 
158
        """See Stream.search"""
 
159
        bytes = ""
 
160
        position = 0
 
161
        while True:
 
162
            index = self._bytes.find(sub, position)
 
163
            if index >= 0:
 
164
                bytes = self._bytes[:index]
 
165
                self._bytes = self._bytes[index + len(sub):]
 
166
                break
 
167
 
 
168
            position = len(self._bytes)
 
169
 
 
170
            if not self.handle_incoming_message():
 
171
                return None
 
172
 
 
173
        return bytes
 
174
 
 
175
    def put_bytes(self, bytes):
 
176
        """See Stream.put_bytes"""
 
177
        if len(bytes) > self._buffer_size:
 
178
            bytes = bytes[:self._buffer_size]
 
179
 
 
180
        return self.put_bytes_raw(bytes)
 
181
 
 
182
    def put_bytes_raw(self, bytes):
 
183
        """Put bytes directly onto the destination.
 
184
 
 
185
        :param bytes: Bytes to put.
 
186
        :return: Number of bytes put.
 
187
        """
 
188
        # Select with both the real and watchdog descriptors
 
189
        if self._watchdog_client:
 
190
            writer_fd = self._writer.fileno()
 
191
            watchdog_fd = self._watchdog_client.fileno()
 
192
 
 
193
            selector = Selector()
 
194
            selector.add_fd(writer_fd, SelectorIO.WRITE)
 
195
            selector.add_fd(watchdog_fd, SelectorIO.READ)
 
196
 
 
197
            selector.execute()
 
198
 
 
199
            if not selector.has_ready:
 
200
                logging.info("Put bytes failed")
 
201
                return -1
 
202
 
 
203
            if selector.check_fd(watchdog_fd, SelectorIO.READ):
 
204
                logging.info("Error writing to fifo, watchdog side closed")
 
205
                return -1
 
206
 
 
207
        size = self._writer.write(bytes)
 
208
        if size < 0:
 
209
            logging.info(
 
210
                "Error writing to fifo, errno %d", self._writer.errno)
 
211
 
 
212
        return size
 
213
 
 
214
    def put_end(self):
 
215
        return True