"""Helpers for reliable persistent message queues."""
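
# Returned by got_next_expected() when the peer asks for a message older
# than any still stored. The concrete value is an assumption.
ANCIENT = 1

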
def got_next_expected(store, next_expected):
    """Our peer has told us what it expects our next message's sequence to be.

    Call this with the message store and sequence number that the peer
    wants next; this will do various things based on what *this* side
    has in its outbound queue store.

    1. The peer expects a sequence greater than what we last
       sent. This is the common case and generally it should be
       expecting last_sent_sequence+len(messages_sent)+1.

    2. The peer expects a sequence number our side has already sent,
       and we no longer have that message. In this case, just send
       *all* messages we have, including the previous generation,
       starting at the sequence number the peer expects (meaning that
       messages have probably been lost).

    3. The peer expects a sequence number we already sent, and we
       still have that message cached. In this case, we send starting
       from that message.

    If the next expected sequence from the server refers to a message
    older than we have, then L{ANCIENT} will be returned.
    """
    ret = None
    old_sequence = store.get_sequence()
    if next_expected > old_sequence:
        # The peer acknowledged everything before next_expected, so the
        # previous generation of messages is no longer needed.
        store.delete_old_messages()
        pending_offset = next_expected - old_sequence
    elif next_expected < (old_sequence - store.get_pending_offset()):
        # "Ancient": The other side wants messages we don't have,
        # so let's just reset our counter to what it expects.
        pending_offset = 0
        ret = ANCIENT
    else:
        # No messages transferred, or
        # "Old": We'll try to send these old messages that the
        # other side still wants.
        pending_offset = (store.get_pending_offset() + next_expected
                          - old_sequence)

    store.set_pending_offset(pending_offset)
    store.set_sequence(next_expected)
    return ret
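

# The three cases handled by got_next_expected() can be sketched as a pure
# function over plain integers. This is an illustrative restatement, not part
# of the module: the name classify_next_expected and the case labels are
# hypothetical, and the "ancient" offset of 0 assumes the counter is simply
# reset so that sending restarts from the first stored message.

```python
def classify_next_expected(sequence, pending_offset, next_expected):
    """Return (case, new_pending_offset) without touching any store.

    sequence and pending_offset stand in for store.get_sequence() and
    store.get_pending_offset(); both are plain integers here.
    """
    if next_expected > sequence:
        # The peer acknowledged messages: the offset is counted from the
        # message that used to be at `sequence`.
        return "acknowledged", next_expected - sequence
    if next_expected < sequence - pending_offset:
        # The requested message is older than anything retained.
        # Assumption: the offset restarts at 0.
        return "ancient", 0
    # Nothing transferred, or the peer wants messages that are still
    # retained: rewind the offset by the number of messages to resend.
    return "old", pending_offset + next_expected - sequence


if __name__ == "__main__":
    # With sequence=10 and pending_offset=3, anything below 7 is "ancient".
    for wanted in (12, 10, 8, 5):
        print(wanted, classify_next_expected(10, 3, wanted))
```

# Keeping the arithmetic free of store side effects makes the boundary
# between "old" and "ancient" (sequence - pending_offset) easy to check.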