22
23
that reason, let's review the terminology here.
24
25
Assume we have 10 messages in the store, which we label by
25
the following uppercase letters:
26
the following uppercase letters::
27
28
A, B, C, D, E, F, G, H, I, J
42
43
def __init__(self, persist, directory, directory_size=1000,
43
44
monitor_interval=60*60, get_time=time.time):
46
@param persist: a L{Persist} used to save state parameters like
47
the accepted message types, sequence, server uuid etc.
48
@param directory: base of the file system hierarchy
44
50
self._get_time = get_time
45
51
self._directory = directory
46
52
self._directory_size = directory_size
52
58
os.makedirs(message_dir)
55
"""Save metadata to disk."""
61
"""Persist metadata to disk."""
56
62
self._original_persist.save()
58
64
def set_accepted_types(self, types):
67
73
self._reprocess_holding()
69
75
def get_accepted_types(self):
76
"""Get a list of all accepted message types."""
70
77
return self._persist.get("accepted-types", ())
72
79
def accepts(self, type):
80
"""Return bool indicating if C{type} is an accepted message type."""
73
81
return type in self.get_accepted_types()
75
83
def get_sequence(self):
77
Get the sequence number of the message that the server expects us to
78
send on the next exchange.
84
"""Get the current sequence.
86
@return: The sequence number of the message that the server expects us to
87
send on the next exchange.
80
89
return self._persist.get("sequence", 0)
82
91
def set_sequence(self, number):
92
"""Set the current sequence.
84
94
Set the sequence number of the message that the server expects us to
85
95
send on the next exchange.
87
97
self._persist.set("sequence", number)
89
99
def get_server_sequence(self):
91
Get the sequence number of the message that we will ask the server to
92
send to us on the next exchange.
100
"""Get the current server sequence.
102
@return: the sequence number of the message that we will ask the server to
103
send to us on the next exchange.
94
105
return self._persist.get("server_sequence", 0)
96
107
def set_server_sequence(self, number):
108
"""Set the current server sequence.
98
110
Set the sequence number of the message that we will ask the server to
99
111
send to us on the next exchange.
109
121
self._persist.set("server_uuid", uuid)
111
123
def get_pending_offset(self):
124
"""Get the current pending offset."""
112
125
return self._persist.get("pending_offset", 0)
114
127
def set_pending_offset(self, val):
128
"""Set the current pending offset.
116
130
Set the offset into the message pool to consider assigned to the
117
131
current sequence number as returned by l{get_sequence}.
119
133
self._persist.set("pending_offset", val)
121
135
def add_pending_offset(self, val):
136
"""Increment the current pending offset by C{val}."""
122
137
self.set_pending_offset(self.get_pending_offset() + val)
124
139
def count_pending_messages(self):
187
202
def add(self, message):
188
203
"""Queue a message for delivery.
205
@param message: a C{dict} with a C{type} key and other keys conforming
206
to the L{Message} schema for that specifc message type.
190
207
@return: message_id, which is an identifier for the added message.
192
209
assert "type" in message