7
from pysqlite2 import dbapi2 as sqlite3
9
from landscape.lib import bpickle
12
class UnknownHashIDRequest(Exception):
13
"""Raised for unknown hash id requests."""
16
def with_cursor(method):
17
"""Decorator that encloses the method in a database transaction.
19
Even though SQLite is supposed to be useful in autocommit mode, we've
20
found cases where the database continued to be locked for writing
21
until the cursor was closed. With this in mind, instead of using
22
the autocommit mode, we explicitly terminate transactions and enforce
23
cursor closing with this decorator.
25
def inner(self, *args, **kwargs):
27
cursor = self._db.cursor()
29
result = method(self, cursor, *args, **kwargs)
40
class PackageStore(object):
42
def __init__(self, filename):
43
self._db = sqlite3.connect(filename)
44
ensure_schema(self._db)
47
def set_hash_ids(self, cursor, hash_ids):
48
for hash, id in hash_ids.iteritems():
49
cursor.execute("REPLACE INTO hash VALUES (?, ?)",
53
def get_hash_id(self, cursor, hash):
54
assert isinstance(hash, basestring)
55
cursor.execute("SELECT id FROM hash WHERE hash=?", (buffer(hash),))
56
value = cursor.fetchone()
62
def get_id_hash(self, cursor, id):
63
assert isinstance(id, (int, long))
64
cursor.execute("SELECT hash FROM hash WHERE id=?", (id,))
65
value = cursor.fetchone()
71
def add_available(self, cursor, ids):
73
cursor.execute("REPLACE INTO available VALUES (?)", (id,))
76
def remove_available(self, cursor, ids):
77
id_list = ",".join(str(int(id)) for id in ids)
78
cursor.execute("DELETE FROM available WHERE id IN (%s)" % id_list)
81
def clear_available(self, cursor):
82
cursor.execute("DELETE FROM available")
85
def get_available(self, cursor):
86
cursor.execute("SELECT id FROM available")
87
return [row[0] for row in cursor.fetchall()]
90
def add_available_upgrades(self, cursor, ids):
92
cursor.execute("REPLACE INTO available_upgrade VALUES (?)", (id,))
95
def remove_available_upgrades(self, cursor, ids):
96
id_list = ",".join(str(int(id)) for id in ids)
97
cursor.execute("DELETE FROM available_upgrade WHERE id IN (%s)"
101
def clear_available_upgrades(self, cursor):
102
cursor.execute("DELETE FROM available_upgrade")
105
def get_available_upgrades(self, cursor):
106
cursor.execute("SELECT id FROM available_upgrade")
107
return [row[0] for row in cursor.fetchall()]
110
def add_installed(self, cursor, ids):
112
cursor.execute("REPLACE INTO installed VALUES (?)", (id,))
115
def remove_installed(self, cursor, ids):
116
id_list = ",".join(str(int(id)) for id in ids)
117
cursor.execute("DELETE FROM installed WHERE id IN (%s)" % id_list)
120
def clear_installed(self, cursor):
121
cursor.execute("DELETE FROM installed")
124
def get_installed(self, cursor):
125
cursor.execute("SELECT id FROM installed")
126
return [row[0] for row in cursor.fetchall()]
129
def add_hash_id_request(self, cursor, hashes):
130
hashes = list(hashes)
131
cursor.execute("INSERT INTO hash_id_request (hashes, timestamp)"
133
(buffer(bpickle.dumps(hashes)), time.time()))
134
return HashIDRequest(self._db, cursor.lastrowid)
137
def get_hash_id_request(self, cursor, request_id):
138
cursor.execute("SELECT 1 FROM hash_id_request WHERE id=?",
140
if not cursor.fetchone():
141
raise UnknownHashIDRequest(request_id)
142
return HashIDRequest(self._db, request_id)
145
def iter_hash_id_requests(self, cursor):
146
cursor.execute("SELECT id FROM hash_id_request")
147
for row in cursor.fetchall():
148
yield HashIDRequest(self._db, row[0])
151
def clear_hash_id_requests(self, cursor):
152
cursor.execute("DELETE FROM hash_id_request")
155
def add_task(self, cursor, queue, data):
156
data = bpickle.dumps(data)
157
cursor.execute("INSERT INTO task (queue, timestamp, data) "
158
"VALUES (?,?,?)", (queue, time.time(), buffer(data)))
159
return PackageTask(self._db, cursor.lastrowid)
162
def get_next_task(self, cursor, queue):
163
cursor.execute("SELECT id FROM task WHERE queue=? ORDER BY timestamp",
165
row = cursor.fetchone()
167
return PackageTask(self._db, row[0])
171
def clear_tasks(self, cursor, except_tasks=()):
172
cursor.execute("DELETE FROM task WHERE id NOT IN (%s)" %
173
",".join([str(task.id) for task in except_tasks]))
176
class HashIDRequest(object):
178
def __init__(self, db, id):
184
def hashes(self, cursor):
185
cursor.execute("SELECT hashes FROM hash_id_request WHERE id=?",
187
return bpickle.loads(str(cursor.fetchone()[0]))
190
def _get_timestamp(self, cursor):
191
cursor.execute("SELECT timestamp FROM hash_id_request WHERE id=?",
193
return cursor.fetchone()[0]
196
def _set_timestamp(self, cursor, value):
197
cursor.execute("UPDATE hash_id_request SET timestamp=? WHERE id=?",
200
timestamp = property(_get_timestamp, _set_timestamp)
203
def _get_message_id(self, cursor):
204
cursor.execute("SELECT message_id FROM hash_id_request WHERE id=?",
206
return cursor.fetchone()[0]
209
def _set_message_id(self, cursor, value):
210
cursor.execute("UPDATE hash_id_request SET message_id=? WHERE id=?",
213
message_id = property(_get_message_id, _set_message_id)
216
def remove(self, cursor):
217
cursor.execute("DELETE FROM hash_id_request WHERE id=?", (self.id,))
220
class PackageTask(object):
222
def __init__(self, db, id):
228
cursor.execute("SELECT queue, timestamp, data FROM task "
230
row = cursor.fetchone()
235
self.timestamp = row[1]
236
self.data = bpickle.loads(str(row[2]))
239
def remove(self, cursor):
240
cursor.execute("DELETE FROM task WHERE id=?", (self.id,))
243
def ensure_schema(db):
244
# FIXME This needs a "patch" table with a "version" column which will
245
# help with upgrades. It should also be used to decide when to
246
# create the schema from the ground up, rather than that using
250
cursor.execute("CREATE TABLE hash"
251
" (id INTEGER PRIMARY KEY, hash BLOB UNIQUE)")
252
cursor.execute("CREATE TABLE available"
253
" (id INTEGER PRIMARY KEY)")
254
cursor.execute("CREATE TABLE available_upgrade"
255
" (id INTEGER PRIMARY KEY)")
256
cursor.execute("CREATE TABLE installed"
257
" (id INTEGER PRIMARY KEY)")
258
cursor.execute("CREATE TABLE hash_id_request"
259
" (id INTEGER PRIMARY KEY, timestamp TIMESTAMP,"
260
" message_id INTEGER, hashes BLOB)")
261
cursor.execute("CREATE TABLE task"
262
" (id INTEGER PRIMARY KEY, queue TEXT,"
263
" timestamp TIMESTAMP, data BLOB)")
264
except sqlite3.OperationalError: