~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/package/store.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import time
 
2
import os
 
3
 
 
4
try:
 
5
    import sqlite3
 
6
except ImportError:
 
7
    from pysqlite2 import dbapi2 as sqlite3
 
8
 
 
9
from landscape.lib import bpickle
 
10
 
 
11
 
 
12
class UnknownHashIDRequest(Exception):
 
13
    """Raised for unknown hash id requests."""
 
14
 
 
15
 
 
16
def with_cursor(method):
 
17
    """Decorator that encloses the method in a database transaction.
 
18
 
 
19
    Even though SQLite is supposed to be useful in autocommit mode, we've
 
20
    found cases where the database continued to be locked for writing
 
21
    until the cursor was closed.  With this in mind, instead of using
 
22
    the autocommit mode, we explicitly terminate transactions and enforce
 
23
    cursor closing with this decorator.
 
24
    """
 
25
    def inner(self, *args, **kwargs):
 
26
        try:
 
27
            cursor = self._db.cursor()
 
28
            try:
 
29
                result = method(self, cursor, *args, **kwargs)
 
30
            finally:
 
31
                cursor.close()
 
32
            self._db.commit()
 
33
        except:
 
34
            self._db.rollback()
 
35
            raise
 
36
        return result
 
37
    return inner
 
38
 
 
39
 
 
40
class PackageStore(object):
 
41
 
 
42
    def __init__(self, filename):
 
43
        self._db = sqlite3.connect(filename)
 
44
        ensure_schema(self._db)
 
45
 
 
46
    @with_cursor
 
47
    def set_hash_ids(self, cursor, hash_ids):
 
48
        for hash, id in hash_ids.iteritems():
 
49
            cursor.execute("REPLACE INTO hash VALUES (?, ?)",
 
50
                           (id, buffer(hash)))
 
51
 
 
52
    @with_cursor
 
53
    def get_hash_id(self, cursor, hash):
 
54
        assert isinstance(hash, basestring)
 
55
        cursor.execute("SELECT id FROM hash WHERE hash=?", (buffer(hash),))
 
56
        value = cursor.fetchone()
 
57
        if value:
 
58
            return value[0]
 
59
        return None
 
60
 
 
61
    @with_cursor
 
62
    def get_id_hash(self, cursor, id):
 
63
        assert isinstance(id, (int, long))
 
64
        cursor.execute("SELECT hash FROM hash WHERE id=?", (id,))
 
65
        value = cursor.fetchone()
 
66
        if value:
 
67
            return str(value[0])
 
68
        return None
 
69
 
 
70
    @with_cursor
 
71
    def add_available(self, cursor, ids):
 
72
        for id in ids:
 
73
            cursor.execute("REPLACE INTO available VALUES (?)", (id,))
 
74
 
 
75
    @with_cursor
 
76
    def remove_available(self, cursor, ids):
 
77
        id_list = ",".join(str(int(id)) for id in ids)
 
78
        cursor.execute("DELETE FROM available WHERE id IN (%s)" % id_list)
 
79
 
 
80
    @with_cursor
 
81
    def clear_available(self, cursor):
 
82
        cursor.execute("DELETE FROM available")
 
83
 
 
84
    @with_cursor
 
85
    def get_available(self, cursor):
 
86
        cursor.execute("SELECT id FROM available")
 
87
        return [row[0] for row in cursor.fetchall()]
 
88
 
 
89
    @with_cursor
 
90
    def add_available_upgrades(self, cursor, ids):
 
91
        for id in ids:
 
92
            cursor.execute("REPLACE INTO available_upgrade VALUES (?)", (id,))
 
93
 
 
94
    @with_cursor
 
95
    def remove_available_upgrades(self, cursor, ids):
 
96
        id_list = ",".join(str(int(id)) for id in ids)
 
97
        cursor.execute("DELETE FROM available_upgrade WHERE id IN (%s)"
 
98
                       % id_list)
 
99
 
 
100
    @with_cursor
 
101
    def clear_available_upgrades(self, cursor):
 
102
        cursor.execute("DELETE FROM available_upgrade")
 
103
 
 
104
    @with_cursor
 
105
    def get_available_upgrades(self, cursor):
 
106
        cursor.execute("SELECT id FROM available_upgrade")
 
107
        return [row[0] for row in cursor.fetchall()]
 
108
 
 
109
    @with_cursor
 
110
    def add_installed(self, cursor, ids):
 
111
        for id in ids:
 
112
            cursor.execute("REPLACE INTO installed VALUES (?)", (id,))
 
113
 
 
114
    @with_cursor
 
115
    def remove_installed(self, cursor, ids):
 
116
        id_list = ",".join(str(int(id)) for id in ids)
 
117
        cursor.execute("DELETE FROM installed WHERE id IN (%s)" % id_list)
 
118
 
 
119
    @with_cursor
 
120
    def clear_installed(self, cursor):
 
121
        cursor.execute("DELETE FROM installed")
 
122
 
 
123
    @with_cursor
 
124
    def get_installed(self, cursor):
 
125
        cursor.execute("SELECT id FROM installed")
 
126
        return [row[0] for row in cursor.fetchall()]
 
127
 
 
128
    @with_cursor
 
129
    def add_hash_id_request(self, cursor, hashes):
 
130
        hashes = list(hashes)
 
131
        cursor.execute("INSERT INTO hash_id_request (hashes, timestamp)"
 
132
                       " VALUES (?,?)",
 
133
                       (buffer(bpickle.dumps(hashes)), time.time()))
 
134
        return HashIDRequest(self._db, cursor.lastrowid)
 
135
 
 
136
    @with_cursor
 
137
    def get_hash_id_request(self, cursor, request_id):
 
138
        cursor.execute("SELECT 1 FROM hash_id_request WHERE id=?",
 
139
                       (request_id,))
 
140
        if not cursor.fetchone():
 
141
            raise UnknownHashIDRequest(request_id)
 
142
        return HashIDRequest(self._db, request_id)
 
143
 
 
144
    @with_cursor
 
145
    def iter_hash_id_requests(self, cursor):
 
146
        cursor.execute("SELECT id FROM hash_id_request")
 
147
        for row in cursor.fetchall():
 
148
            yield HashIDRequest(self._db, row[0])
 
149
 
 
150
    @with_cursor
 
151
    def clear_hash_id_requests(self, cursor):
 
152
        cursor.execute("DELETE FROM hash_id_request")
 
153
 
 
154
    @with_cursor
 
155
    def add_task(self, cursor, queue, data):
 
156
        data = bpickle.dumps(data)
 
157
        cursor.execute("INSERT INTO task (queue, timestamp, data) "
 
158
                       "VALUES (?,?,?)", (queue, time.time(), buffer(data)))
 
159
        return PackageTask(self._db, cursor.lastrowid)
 
160
 
 
161
    @with_cursor
 
162
    def get_next_task(self, cursor, queue):
 
163
        cursor.execute("SELECT id FROM task WHERE queue=? ORDER BY timestamp",
 
164
                       (queue,))
 
165
        row = cursor.fetchone()
 
166
        if row:
 
167
            return PackageTask(self._db, row[0])
 
168
        return None
 
169
 
 
170
    @with_cursor
 
171
    def clear_tasks(self, cursor, except_tasks=()):
 
172
        cursor.execute("DELETE FROM task WHERE id NOT IN (%s)" %
 
173
                       ",".join([str(task.id) for task in except_tasks]))
 
174
 
 
175
 
 
176
class HashIDRequest(object):
 
177
 
 
178
    def __init__(self, db, id):
 
179
        self._db = db
 
180
        self.id = id
 
181
 
 
182
    @property
 
183
    @with_cursor
 
184
    def hashes(self, cursor):
 
185
        cursor.execute("SELECT hashes FROM hash_id_request WHERE id=?",
 
186
                       (self.id,))
 
187
        return bpickle.loads(str(cursor.fetchone()[0]))
 
188
 
 
189
    @with_cursor
 
190
    def _get_timestamp(self, cursor):
 
191
        cursor.execute("SELECT timestamp FROM hash_id_request WHERE id=?",
 
192
                       (self.id,))
 
193
        return cursor.fetchone()[0]
 
194
 
 
195
    @with_cursor
 
196
    def _set_timestamp(self, cursor, value):
 
197
        cursor.execute("UPDATE hash_id_request SET timestamp=? WHERE id=?",
 
198
                       (value, self.id))
 
199
 
 
200
    timestamp = property(_get_timestamp, _set_timestamp)
 
201
 
 
202
    @with_cursor
 
203
    def _get_message_id(self, cursor):
 
204
        cursor.execute("SELECT message_id FROM hash_id_request WHERE id=?",
 
205
                       (self.id,))
 
206
        return cursor.fetchone()[0]
 
207
 
 
208
    @with_cursor
 
209
    def _set_message_id(self, cursor, value):
 
210
        cursor.execute("UPDATE hash_id_request SET message_id=? WHERE id=?",
 
211
                       (value, self.id))
 
212
 
 
213
    message_id = property(_get_message_id, _set_message_id)
 
214
 
 
215
    @with_cursor
 
216
    def remove(self, cursor):
 
217
        cursor.execute("DELETE FROM hash_id_request WHERE id=?", (self.id,))
 
218
 
 
219
 
 
220
class PackageTask(object):
 
221
 
 
222
    def __init__(self, db, id):
 
223
        self._db = db
 
224
        self.id = id
 
225
 
 
226
        cursor = db.cursor()
 
227
        try:
 
228
            cursor.execute("SELECT queue, timestamp, data FROM task "
 
229
                           "WHERE id=?", (id,))
 
230
            row = cursor.fetchone()
 
231
        finally:
 
232
            cursor.close()
 
233
 
 
234
        self.queue = row[0]
 
235
        self.timestamp = row[1]
 
236
        self.data = bpickle.loads(str(row[2]))
 
237
 
 
238
    @with_cursor
 
239
    def remove(self, cursor):
 
240
        cursor.execute("DELETE FROM task WHERE id=?", (self.id,))
 
241
 
 
242
 
 
243
def ensure_schema(db):
 
244
    # FIXME This needs a "patch" table with a "version" column which will
 
245
    #       help with upgrades.  It should also be used to decide when to
 
246
    #       create the schema from the ground up, rather than that using
 
247
    #       try block.
 
248
    cursor = db.cursor()
 
249
    try:
 
250
        cursor.execute("CREATE TABLE hash"
 
251
                       " (id INTEGER PRIMARY KEY, hash BLOB UNIQUE)")
 
252
        cursor.execute("CREATE TABLE available"
 
253
                       " (id INTEGER PRIMARY KEY)")
 
254
        cursor.execute("CREATE TABLE available_upgrade"
 
255
                       " (id INTEGER PRIMARY KEY)")
 
256
        cursor.execute("CREATE TABLE installed"
 
257
                       " (id INTEGER PRIMARY KEY)")
 
258
        cursor.execute("CREATE TABLE hash_id_request"
 
259
                       " (id INTEGER PRIMARY KEY, timestamp TIMESTAMP,"
 
260
                       " message_id INTEGER, hashes BLOB)")
 
261
        cursor.execute("CREATE TABLE task"
 
262
                       " (id INTEGER PRIMARY KEY, queue TEXT,"
 
263
                       " timestamp TIMESTAMP, data BLOB)")
 
264
    except sqlite3.OperationalError:
 
265
        cursor.close()
 
266
        db.rollback()
 
267
    else:
 
268
        cursor.close()
 
269
        db.commit()