1
"""Provide access to the persistent data used by the L{MessageExchange}."""
7
from pysqlite2 import dbapi2 as sqlite3
9
from landscape.lib.store import with_cursor
12
class MessageContext(object):
    """Stores a context for incoming messages that require a response.

    The context consists of

      - the "operation-id" value
      - the secure ID that was in effect when the message was received
      - the message type
      - the time when the message was received

    This data will be used to detect secure ID changes between the time at
    which the request message came in and the completion of the request.
    If the secure ID did change the result message is obsolete.

    @param db: the sqlite database handle.
    @param operation_id: the "operation-id" value of the request message;
        it identifies this context's row in the C{message_context} table.
    @param secure_id: the secure ID that was in effect when the message was
        received.
    @param message_type: the type of the request message.
    @param timestamp: the time when the message was received.
    """

    def __init__(self, db, operation_id, secure_id, message_type, timestamp):
        # Keep the handle instead of silently discarding the argument.
        self._db = db
        self.operation_id = operation_id
        self.secure_id = secure_id
        self.message_type = message_type
        self.timestamp = timestamp

    # NOTE(review): C{with_cursor} is imported by this module, yet no
    # decorator is visible on this method -- confirm whether callers pass
    # the cursor themselves or whether it is meant to be injected.
    def remove(self, cursor):
        """Delete this context's row from the C{message_context} table.

        @param cursor: the database cursor used to run the statement.
        """
        cursor.execute(
            "DELETE FROM message_context WHERE operation_id=?",
            (self.operation_id,))
45
class ExchangeStore(object):
    """Message meta data required by the L{MessageExchange}.

    The implementation uses a SQLite database as backend, with a single table
    called "message_context", whose schema is defined in
    L{ensure_exchange_schema}.

    @param filename: The name of the file that contains the sqlite database.
    """

    # Default so that reading C{self._db} never raises C{AttributeError}.
    # NOTE(review): nothing visible in this class assigns C{_db}; presumably
    # the connection is established by whatever supplies the C{cursor}
    # arguments (the imported C{with_cursor}?) -- confirm.
    _db = None

    def __init__(self, filename):
        self._filename = filename

    def _ensure_schema(self):
        """Make sure the tables needed by this store exist in C{self._db}."""
        ensure_exchange_schema(self._db)

    def add_message_context(
            self, cursor, operation_id, secure_id, message_type):
        """Add a L{MessageContext} with the given data.

        The row is stamped with the current time.

        @param cursor: the database cursor used to run the statement.
        @param operation_id: the "operation-id" value of the message.
        @param secure_id: the secure ID in effect for the message.
        @param message_type: the type of the message.
        @return: a L{MessageContext} holding the values just inserted.
        """
        params = (operation_id, secure_id, message_type, time.time())
        cursor.execute(
            "INSERT INTO message_context "
            " (operation_id, secure_id, message_type, timestamp) "
            " VALUES (?,?,?,?)", params)
        return MessageContext(self._db, *params)

    def get_message_context(self, cursor, operation_id):
        """The L{MessageContext} for the given C{operation_id} or C{None}.

        @param cursor: the database cursor used to run the query.
        @param operation_id: the "operation-id" value to look up.
        """
        cursor.execute(
            "SELECT operation_id, secure_id, message_type, timestamp "
            "FROM message_context WHERE operation_id=?", (operation_id,))
        row = cursor.fetchone()
        if row is None:
            # Unknown operation id: honour the documented C{None} result
            # instead of failing while unpacking a missing row.
            return None
        return MessageContext(self._db, *row)

    def all_operation_ids(self, cursor):
        """Return all operation IDs currently stored in C{message_context}.

        @param cursor: the database cursor used to run the query.
        @return: a list with one operation ID per stored row.
        """
        cursor.execute("SELECT operation_id FROM message_context")
        return [row[0] for row in cursor.fetchall()]
93
def ensure_exchange_schema(db):
94
"""Create all tables needed by a L{ExchangeStore}.
96
@param db: A connection to a SQLite database.
101
"CREATE TABLE message_context"
102
" (id INTEGER PRIMARY KEY, timestamp TIMESTAMP, "
103
" secure_id TEXT NOT NULL, operation_id INTEGER NOT NULL, "
104
" message_type text NOT NULL)")
106
"CREATE UNIQUE INDEX msgctx_operationid_idx ON "
107
"message_context(operation_id)")
108
except (sqlite3.OperationalError, sqlite3.DatabaseError):