8
from landscape.lib import bpickle
9
from landscape.lib.monitor import Monitor
10
from landscape import API
17
class MessageStore(object):
18
"""A message store which stores its messages in a file system hierarchy.
20
The sequencing system we use in the message store may be quite
21
confusing if you haven't looked at it in the last 10 minutes. For
22
that reason, let's review the terminology here.
24
Assume we have 10 messages in the store, which we label by
25
the following uppercase letters:
27
A, B, C, D, E, F, G, H, I, J
30
Let's say that the next message we should send to the server is D.
31
What we call "pending offset" is the displacement from the first
32
message, which in our example above would be 3. What we call
33
"sequence" is the number that the server expects us to label message
34
D as. It could be pretty much any natural number, depending on the
35
history of our exchanges with the server. What we call "server
36
sequence", is the next message number expected by the *client* itself,
37
and is entirely unrelated to the stored messages.
42
def __init__(self, persist, directory, directory_size=1000,
43
monitor_interval=60*60, get_time=time.time):
44
self._get_time = get_time
45
self._directory = directory
46
self._directory_size = directory_size
48
self._original_persist = persist
49
self._persist = persist.root_at("message-store")
50
message_dir = self._message_dir()
51
if not os.path.isdir(message_dir):
52
os.makedirs(message_dir)
55
"""Save metadata to disk."""
56
self._original_persist.save()
58
def set_accepted_types(self, types):
59
"""Specify the types of messages that the server will expect from us.
61
If messages are added to the store which are not currently
62
accepted, they will be saved but ignored until their type is
65
assert type(types) in (tuple, list, set)
66
self._persist.set("accepted-types", sorted(set(types)))
67
self._reprocess_holding()
69
def get_accepted_types(self):
70
return self._persist.get("accepted-types", ())
72
def accepts(self, type):
73
return type in self.get_accepted_types()
75
def get_sequence(self):
77
Get the sequence number of the message that the server expects us to
78
send on the next exchange.
80
return self._persist.get("sequence", 0)
82
def set_sequence(self, number):
84
Set the sequence number of the message that the server expects us to
85
send on the next exchange.
87
self._persist.set("sequence", number)
89
def get_server_sequence(self):
91
Get the sequence number of the message that we will ask the server to
92
send to us on the next exchange.
94
return self._persist.get("server_sequence", 0)
96
def set_server_sequence(self, number):
98
Set the sequence number of the message that we will ask the server to
99
send to us on the next exchange.
101
self._persist.set("server_sequence", number)
103
def get_pending_offset(self):
104
return self._persist.get("pending_offset", 0)
106
def set_pending_offset(self, val):
108
Set the offset into the message pool to consider assigned to the
109
current sequence number as returned by l{get_sequence}.
111
self._persist.set("pending_offset", val)
113
def add_pending_offset(self, val):
114
self.set_pending_offset(self.get_pending_offset() + val)
116
def count_pending_messages(self):
117
"""Return the number of pending messages."""
118
return sum(1 for x in self._walk_pending_messages())
120
def get_pending_messages(self, max=None):
121
"""Get any pending messages that aren't being held, up to max."""
122
accepted_types = self.get_accepted_types()
124
for filename in self._walk_pending_messages():
125
if max is not None and len(messages) >= max:
127
data = self._get_content(self._message_dir(filename))
129
message = bpickle.loads(data)
130
except ValueError, e:
132
self._add_flags(filename, BROKEN)
134
if message["type"] not in accepted_types:
135
self._add_flags(filename, HELD)
137
messages.append(message)
140
def delete_old_messages(self):
141
"""Delete messages which are unlikely to be needed in the future."""
142
filenames = self._get_sorted_filenames()
143
for fn in itertools.islice(self._walk_messages(exclude=HELD+BROKEN),
144
self.get_pending_offset()):
146
containing_dir = os.path.split(fn)[0]
147
if not os.listdir(containing_dir):
148
os.rmdir(containing_dir)
150
def delete_all_messages(self):
151
"""Remove ALL stored messages."""
152
self.set_pending_offset(0)
153
for filename in self._walk_messages():
156
def add_schema(self, schema):
157
"""Add a schema to be applied to messages of the given type.
159
The schema must be an instance of L{landscape.schema.Message}.
161
self._schemas[schema.type] = schema
163
def is_pending(self, message_id):
164
"""Return bool indicating if C{message_id} still hasn't been delivered.
166
@param message_id: Identifier returned by the L{add()} method.
169
pending_offset = self.get_pending_offset()
170
for filename in self._walk_messages(exclude=BROKEN):
171
flags = self._get_flags(filename)
172
if ((HELD in flags or i >= pending_offset) and
173
os.stat(filename).st_ino == message_id):
175
if BROKEN not in flags and HELD not in flags:
179
def add(self, message):
180
"""Queue a message for delivery.
182
@return: message_id, which is an identifier for the added message.
184
assert "type" in message
185
message = self._schemas[message["type"]].coerce(message)
187
if "api" not in message:
188
message["api"] = self.api
190
message_data = bpickle.dumps(message)
192
filename = self._get_next_message_filename()
194
file = open(filename + ".tmp", "w")
195
file.write(message_data)
197
os.rename(filename + ".tmp", filename)
199
if not self.accepts(message["type"]):
200
filename = self._set_flags(filename, HELD)
202
# For now we use the inode as the message id, as it will work
203
# correctly even faced with holding/unholding. It will break
204
# if the store is copied over for some reason, but this shouldn't
205
# present an issue given the current uses. In the future we
206
# should have a nice transactional storage (e.g. sqlite) which
207
# will offer a more strong primary key.
208
message_id = os.stat(filename).st_ino
212
def _get_next_message_filename(self):
213
message_dirs = self._get_sorted_filenames()
215
newest_dir = message_dirs[-1]
217
os.makedirs(self._message_dir("0"))
220
message_filenames = self._get_sorted_filenames(newest_dir)
221
if not message_filenames:
222
filename = self._message_dir(newest_dir, "0")
223
elif len(message_filenames) < self._directory_size:
224
filename = str(int(message_filenames[-1].split("_")[0]) + 1)
225
filename = self._message_dir(newest_dir, filename)
227
newest_dir = self._message_dir(str(int(newest_dir) + 1))
228
os.makedirs(newest_dir)
229
filename = os.path.join(newest_dir, "0")
233
def _walk_pending_messages(self):
234
"""Walk the files which are definitely pending."""
235
pending_offset = self.get_pending_offset()
236
for i, filename in enumerate(self._walk_messages(exclude=HELD+BROKEN)):
237
if i >= pending_offset:
240
def _walk_messages(self, exclude=None):
242
exclude = set(exclude)
243
message_dirs = self._get_sorted_filenames()
244
for message_dir in message_dirs:
245
for filename in self._get_sorted_filenames(message_dir):
246
flags = set(self._get_flags(filename))
247
if (not exclude or not exclude & flags):
248
yield self._message_dir(message_dir, filename)
250
def _get_sorted_filenames(self, dir=""):
251
message_files = [x for x in os.listdir(self._message_dir(dir))
252
if not x.endswith(".tmp")]
253
message_files.sort(key=lambda x: int(x.split("_")[0]))
256
def _message_dir(self, *args):
257
return os.path.join(self._directory, *args)
259
def _get_content(self, filename):
260
file = open(filename)
266
def _reprocess_holding(self):
268
Unhold accepted messages left behind, and hold unaccepted
272
pending_offset = self.get_pending_offset()
273
accepted_types = self.get_accepted_types()
274
for old_filename in self._walk_messages():
275
flags = self._get_flags(old_filename)
277
message = bpickle.loads(self._get_content(old_filename))
278
except ValueError, e:
280
if HELD not in flags:
283
accepted = message["type"] in accepted_types
286
new_filename = self._get_next_message_filename()
287
os.rename(old_filename, new_filename)
288
self._set_flags(new_filename, set(flags)-set(HELD))
290
if not accepted and offset >= pending_offset:
291
self._set_flags(old_filename, set(flags)|set(HELD))
294
def _get_flags(self, path):
295
basename = os.path.basename(path)
297
return basename.split("_")[1]
300
def _set_flags(self, path, flags):
301
dirname, basename = os.path.split(path)
302
new_path = os.path.join(dirname, basename.split("_")[0])
304
new_path += "_"+"".join(sorted(set(flags)))
305
os.rename(path, new_path)
308
def _add_flags(self, path, flags):
309
self._set_flags(path, self._get_flags(path)+flags)
312
def get_default_message_store(*args, **kwargs):
313
"""Get a L{MessageStore} object with all Landscape message schemas added."""
314
from landscape. message_schemas import message_schemas
315
store = MessageStore(*args, **kwargs)
316
for schema in message_schemas.values():
317
store.add_schema(schema)