~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/broker/store.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""Message storage."""
 
2
 
 
3
import time
 
4
import itertools
 
5
import logging
 
6
import os
 
7
 
 
8
from landscape.lib import bpickle
 
9
from landscape.lib.monitor import Monitor
 
10
from landscape import API
 
11
 
 
12
 
 
13
HELD = "h"
 
14
BROKEN = "b"
 
15
 
 
16
 
 
17
class MessageStore(object):
 
18
    """A message store which stores its messages in a file system hierarchy.
 
19
 
 
20
    The sequencing system we use in the message store may be quite
 
21
    confusing if you haven't looked at it in the last 10 minutes.  For
 
22
    that reason, let's review the terminology here.
 
23
 
 
24
    Assume we have 10 messages in the store, which we label by
 
25
    the following uppercase letters:
 
26
 
 
27
        A, B, C, D, E, F, G, H, I, J
 
28
                 ^
 
29
 
 
30
    Let's say that the next message we should send to the server is D.
 
31
    What we call "pending offset" is the displacement from the first
 
32
    message, which in our example above would be 3.  What we call
 
33
    "sequence" is the number that the server expects us to label message
 
34
    D as.  It could be pretty much any natural number, depending on the
 
35
    history of our exchanges with the server.  What we call "server
 
36
    sequence", is the next message number expected by the *client* itself,
 
37
    and is entirely unrelated to the stored messages.
 
38
    """
 
39
 
 
40
    api = API
 
41
 
 
42
    def __init__(self, persist, directory, directory_size=1000,
 
43
                 monitor_interval=60*60, get_time=time.time):
 
44
        self._get_time = get_time
 
45
        self._directory = directory
 
46
        self._directory_size = directory_size
 
47
        self._schemas = {}
 
48
        self._original_persist = persist
 
49
        self._persist = persist.root_at("message-store")
 
50
        message_dir = self._message_dir()
 
51
        if not os.path.isdir(message_dir):
 
52
            os.makedirs(message_dir)
 
53
 
 
54
    def commit(self):
 
55
        """Save metadata to disk."""
 
56
        self._original_persist.save()
 
57
 
 
58
    def set_accepted_types(self, types):
 
59
        """Specify the types of messages that the server will expect from us.
 
60
 
 
61
        If messages are added to the store which are not currently
 
62
        accepted, they will be saved but ignored until their type is
 
63
        accepted.
 
64
        """
 
65
        assert type(types) in (tuple, list, set)
 
66
        self._persist.set("accepted-types", sorted(set(types)))
 
67
        self._reprocess_holding()
 
68
 
 
69
    def get_accepted_types(self):
 
70
        return self._persist.get("accepted-types", ())
 
71
 
 
72
    def accepts(self, type):
 
73
        return type in self.get_accepted_types()
 
74
 
 
75
    def get_sequence(self):
 
76
        """
 
77
        Get the sequence number of the message that the server expects us to
 
78
        send on the next exchange.
 
79
        """
 
80
        return self._persist.get("sequence", 0)
 
81
 
 
82
    def set_sequence(self, number):
 
83
        """
 
84
        Set the sequence number of the message that the server expects us to
 
85
        send on the next exchange.
 
86
        """
 
87
        self._persist.set("sequence", number)
 
88
 
 
89
    def get_server_sequence(self):
 
90
        """
 
91
        Get the sequence number of the message that we will ask the server to
 
92
        send to us on the next exchange.
 
93
        """
 
94
        return self._persist.get("server_sequence", 0)
 
95
 
 
96
    def set_server_sequence(self, number):
 
97
        """
 
98
        Set the sequence number of the message that we will ask the server to
 
99
        send to us on the next exchange.
 
100
        """
 
101
        self._persist.set("server_sequence", number)
 
102
 
 
103
    def get_pending_offset(self):
 
104
        return self._persist.get("pending_offset", 0)
 
105
 
 
106
    def set_pending_offset(self, val):
 
107
        """
 
108
        Set the offset into the message pool to consider assigned to the
 
109
        current sequence number as returned by l{get_sequence}.
 
110
        """
 
111
        self._persist.set("pending_offset", val)
 
112
 
 
113
    def add_pending_offset(self, val):
 
114
        self.set_pending_offset(self.get_pending_offset() + val)
 
115
 
 
116
    def count_pending_messages(self):
 
117
        """Return the number of pending messages."""
 
118
        return sum(1 for x in self._walk_pending_messages())
 
119
 
 
120
    def get_pending_messages(self, max=None):
 
121
        """Get any pending messages that aren't being held, up to max."""
 
122
        accepted_types = self.get_accepted_types()
 
123
        messages = []
 
124
        for filename in self._walk_pending_messages():
 
125
            if max is not None and len(messages) >= max:
 
126
                break
 
127
            data = self._get_content(self._message_dir(filename))
 
128
            try:
 
129
                message = bpickle.loads(data)
 
130
            except ValueError, e:
 
131
                logging.exception(e)
 
132
                self._add_flags(filename, BROKEN)
 
133
            else:
 
134
                if message["type"] not in accepted_types:
 
135
                    self._add_flags(filename, HELD)
 
136
                else:
 
137
                    messages.append(message)
 
138
        return messages
 
139
 
 
140
    def delete_old_messages(self):
 
141
        """Delete messages which are unlikely to be needed in the future."""
 
142
        filenames = self._get_sorted_filenames()
 
143
        for fn in itertools.islice(self._walk_messages(exclude=HELD+BROKEN),
 
144
                                   self.get_pending_offset()):
 
145
            os.unlink(fn)
 
146
            containing_dir = os.path.split(fn)[0]
 
147
            if not os.listdir(containing_dir):
 
148
                os.rmdir(containing_dir)
 
149
 
 
150
    def delete_all_messages(self):
 
151
        """Remove ALL stored messages."""
 
152
        self.set_pending_offset(0)
 
153
        for filename in self._walk_messages():
 
154
            os.unlink(filename)
 
155
 
 
156
    def add_schema(self, schema):
 
157
        """Add a schema to be applied to messages of the given type.
 
158
 
 
159
        The schema must be an instance of L{landscape.schema.Message}.
 
160
        """
 
161
        self._schemas[schema.type] = schema
 
162
 
 
163
    def is_pending(self, message_id):
 
164
        """Return bool indicating if C{message_id} still hasn't been delivered.
 
165
 
 
166
        @param message_id: Identifier returned by the L{add()} method.
 
167
        """
 
168
        i = 0
 
169
        pending_offset = self.get_pending_offset()
 
170
        for filename in self._walk_messages(exclude=BROKEN):
 
171
            flags = self._get_flags(filename)
 
172
            if ((HELD in flags or i >= pending_offset) and
 
173
                os.stat(filename).st_ino == message_id):
 
174
                return True
 
175
            if BROKEN not in flags and HELD not in flags:
 
176
                i += 1
 
177
        return False
 
178
 
 
179
    def add(self, message):
 
180
        """Queue a message for delivery.
 
181
        
 
182
        @return: message_id, which is an identifier for the added message.
 
183
        """
 
184
        assert "type" in message
 
185
        message = self._schemas[message["type"]].coerce(message)
 
186
 
 
187
        if "api" not in message:
 
188
            message["api"] = self.api
 
189
 
 
190
        message_data = bpickle.dumps(message)
 
191
 
 
192
        filename = self._get_next_message_filename()
 
193
 
 
194
        file = open(filename + ".tmp", "w")
 
195
        file.write(message_data)
 
196
        file.close()
 
197
        os.rename(filename + ".tmp", filename)
 
198
 
 
199
        if not self.accepts(message["type"]):
 
200
            filename = self._set_flags(filename, HELD)
 
201
 
 
202
        # For now we use the inode as the message id, as it will work
 
203
        # correctly even faced with holding/unholding.  It will break
 
204
        # if the store is copied over for some reason, but this shouldn't
 
205
        # present an issue given the current uses.  In the future we
 
206
        # should have a nice transactional storage (e.g. sqlite) which
 
207
        # will offer a more strong primary key.
 
208
        message_id = os.stat(filename).st_ino
 
209
 
 
210
        return message_id
 
211
 
 
212
    def _get_next_message_filename(self):
 
213
        message_dirs = self._get_sorted_filenames()
 
214
        if message_dirs:
 
215
            newest_dir = message_dirs[-1]
 
216
        else:
 
217
            os.makedirs(self._message_dir("0"))
 
218
            newest_dir = "0"
 
219
 
 
220
        message_filenames = self._get_sorted_filenames(newest_dir)
 
221
        if not message_filenames:
 
222
            filename = self._message_dir(newest_dir, "0")
 
223
        elif len(message_filenames) < self._directory_size:
 
224
            filename = str(int(message_filenames[-1].split("_")[0]) + 1)
 
225
            filename = self._message_dir(newest_dir, filename)
 
226
        else:
 
227
            newest_dir = self._message_dir(str(int(newest_dir) + 1))
 
228
            os.makedirs(newest_dir)
 
229
            filename = os.path.join(newest_dir, "0")
 
230
 
 
231
        return filename
 
232
 
 
233
    def _walk_pending_messages(self):
 
234
        """Walk the files which are definitely pending."""
 
235
        pending_offset = self.get_pending_offset()
 
236
        for i, filename in enumerate(self._walk_messages(exclude=HELD+BROKEN)):
 
237
            if i >= pending_offset:
 
238
                yield filename
 
239
 
 
240
    def _walk_messages(self, exclude=None):
 
241
        if exclude:
 
242
            exclude = set(exclude)
 
243
        message_dirs = self._get_sorted_filenames()
 
244
        for message_dir in message_dirs:
 
245
            for filename in self._get_sorted_filenames(message_dir):
 
246
                flags = set(self._get_flags(filename))
 
247
                if (not exclude or not exclude & flags):
 
248
                    yield self._message_dir(message_dir, filename)
 
249
 
 
250
    def _get_sorted_filenames(self, dir=""):
 
251
        message_files = [x for x in os.listdir(self._message_dir(dir))
 
252
                         if not x.endswith(".tmp")]
 
253
        message_files.sort(key=lambda x: int(x.split("_")[0]))
 
254
        return message_files
 
255
 
 
256
    def _message_dir(self, *args):
 
257
        return os.path.join(self._directory, *args)
 
258
 
 
259
    def _get_content(self, filename):
 
260
        file = open(filename)
 
261
        try:
 
262
            return file.read()
 
263
        finally:
 
264
            file.close()
 
265
 
 
266
    def _reprocess_holding(self):
 
267
        """
 
268
        Unhold accepted messages left behind, and hold unaccepted
 
269
        pending messages.
 
270
        """
 
271
        offset = 0
 
272
        pending_offset = self.get_pending_offset()
 
273
        accepted_types = self.get_accepted_types()
 
274
        for old_filename in self._walk_messages():
 
275
            flags = self._get_flags(old_filename)
 
276
            try:
 
277
                message = bpickle.loads(self._get_content(old_filename))
 
278
            except ValueError, e:
 
279
                logging.exception(e)
 
280
                if HELD not in flags:
 
281
                    offset += 1
 
282
            else:
 
283
                accepted = message["type"] in accepted_types
 
284
                if HELD in flags:
 
285
                    if accepted:
 
286
                        new_filename = self._get_next_message_filename()
 
287
                        os.rename(old_filename, new_filename)
 
288
                        self._set_flags(new_filename, set(flags)-set(HELD))
 
289
                else:
 
290
                    if not accepted and offset >= pending_offset:
 
291
                        self._set_flags(old_filename, set(flags)|set(HELD))
 
292
                    offset += 1
 
293
 
 
294
    def _get_flags(self, path):
 
295
        basename = os.path.basename(path)
 
296
        if "_" in basename:
 
297
            return basename.split("_")[1]
 
298
        return ""
 
299
 
 
300
    def _set_flags(self, path, flags):
 
301
        dirname, basename = os.path.split(path)
 
302
        new_path = os.path.join(dirname, basename.split("_")[0])
 
303
        if flags:
 
304
            new_path += "_"+"".join(sorted(set(flags)))
 
305
        os.rename(path, new_path)
 
306
        return new_path
 
307
 
 
308
    def _add_flags(self, path, flags):
 
309
        self._set_flags(path, self._get_flags(path)+flags)
 
310
 
 
311
 
 
312
def get_default_message_store(*args, **kwargs):
 
313
    """Get a L{MessageStore} object with all Landscape message schemas added."""
 
314
    from landscape. message_schemas import message_schemas
 
315
    store = MessageStore(*args, **kwargs)
 
316
    for schema in message_schemas.values():
 
317
        store.add_schema(schema)
 
318
    return store