~eday/burrow/prototype-conversion

« back to all changes in this revision

Viewing changes to burrowd/backend/sqlite.py

  • Committer: Eric Day
  • Date: 2011-03-17 23:42:41 UTC
  • Revision ID: eday@oddments.org-20110317234241-ult80xn9d1lon867
First chunk of code from prototype. Beyond the prototype, configuration, module loading, and log handling was added.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2011 OpenStack LLC.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
#     http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
12
# See the License for the specific language governing permissions and
 
13
# limitations under the License.
 
14
 
 
15
'''Memory backend for the burrow server.'''
 
16
 
 
17
import sqlite3
 
18
import time
 
19
 
 
20
import burrowd.backend
 
21
 
 
22
# Default configuration values for this module.
 
23
DEFAULT_DATABASE = ':memory:'
 
24
 
 
25
 
 
26
class Backend(burrowd.backend.Backend):
 
27
 
 
28
    def __init__(self, config):
 
29
        super(Backend, self).__init__(config)
 
30
        database = self.config.get('database', DEFAULT_DATABASE)
 
31
        self.db = sqlite3.connect(database)
 
32
        queries = [
 
33
            'CREATE TABLE queues ('
 
34
                'account VARCHAR(255) NOT NULL,'
 
35
                'queue VARCHAR(255) NOT NULL,'
 
36
                'PRIMARY KEY (account, queue))',
 
37
            'CREATE TABLE messages ('
 
38
                'queue INT UNSIGNED NOT NULL,'
 
39
                'name VARCHAR(255) NOT NULL,'
 
40
                'ttl INT UNSIGNED NOT NULL,'
 
41
                'hide INT UNSIGNED NOT NULL,'
 
42
                'body BLOB NOT NULL,'
 
43
                'PRIMARY KEY (queue, name))']
 
44
        for query in queries:
 
45
            self.db.execute(query)
 
46
 
 
47
    def delete_accounts(self):
 
48
        self.db.execute("DELETE FROM queues")
 
49
        self.db.execute("DELETE FROM messages")
 
50
 
 
51
    def get_accounts(self):
 
52
        result = self.db.execute("SELECT account FROM queues").fetchall()
 
53
        return [row[0] for row in result]
 
54
 
 
55
    def delete_account(self, account):
 
56
        query = "SELECT rowid FROM queues WHERE account='%s'" % account
 
57
        result = self.db.execute(query).fetchall()
 
58
        if len(result) == 0:
 
59
            return
 
60
        queues = [str(queue[0]) for queue in result]
 
61
        query = "DELETE FROM messages WHERE queue IN (%s)" % (','.join(queues))
 
62
        self.db.execute(query)
 
63
        self.db.execute("DELETE FROM queues WHERE account='%s'" % account)
 
64
 
 
65
    def get_queues(self, account):
 
66
        query = "SELECT queue FROM queues WHERE account='%s'" % account
 
67
        result = self.db.execute(query).fetchall()
 
68
        return [row[0] for row in result]
 
69
 
 
70
    def queue_exists(self, account, queue):
 
71
        query = "SELECT COUNT(*) FROM queues " \
 
72
            "WHERE account='%s' AND queue='%s'" % \
 
73
            (account, queue)
 
74
        result = self.db.execute(query).fetchall()
 
75
        if len(result) == 0:
 
76
            return False
 
77
        self.rowid = result[0][0]
 
78
        return True
 
79
 
 
80
    def delete_messages(self, account, queue, limit, marker, match_hidden):
 
81
        messages = self.get_messages(account, queue, limit, marker,
 
82
            match_hidden)
 
83
        ids = [message['id'] for message in messages]
 
84
        query = "DELETE FROM messages WHERE queue=%d AND name IN (%s)" % \
 
85
            (self.rowid, ','.join(ids))
 
86
        self.db.execute(query)
 
87
        query = "SELECT rowid FROM messages WHERE queue=%d LIMIT 1" % \
 
88
            self.rowid
 
89
        if len(self.db.execute(query).fetchall()) == 0:
 
90
            query = "DELETE FROM queues WHERE rowid=%d" % self.rowid
 
91
            self.db.execute(query)
 
92
        return messages
 
93
 
 
94
    def get_messages(self, account, queue, limit, marker, match_hidden):
 
95
        if marker is not None:
 
96
            query = "SELECT rowid FROM messages " \
 
97
                "WHERE queue=%d AND name='%s'" % \
 
98
                (self.rowid, marker)
 
99
            result = self.db.execute(query).fetchall()
 
100
            if len(result) == 0:
 
101
                marker = None
 
102
            else:
 
103
                marker = result[0][0]
 
104
        query = "SELECT name,ttl,hide,body FROM messages WHERE queue=%d" % \
 
105
            self.rowid
 
106
        if match_hidden is False:
 
107
            query += " AND hide == 0"
 
108
        if marker is not None:
 
109
            query += " AND rowid > %d" % marker
 
110
        if limit is not None:
 
111
            query += " LIMIT %d" % limit
 
112
        result = self.db.execute(query).fetchall()
 
113
        messages = []
 
114
        for row in result:
 
115
            messages.append(dict(id=row[0], ttl=row[1], hide=row[2],
 
116
                body=row[3]))
 
117
        return messages
 
118
 
 
119
    def update_messages(self, account, queue, limit, marker, match_hidden, ttl,
 
120
                        hide):
 
121
        messages = self.get_messages(account, queue, limit, marker,
 
122
            match_hidden)
 
123
        query = "UPDATE messages SET"
 
124
        comma = ''
 
125
        if ttl is not None:
 
126
            query += "%s ttl=%d" % (comma, ttl)
 
127
            comma = ','
 
128
        if hide is not None:
 
129
            query += "%s hide=%d" % (comma, hide)
 
130
            comma = ','
 
131
        if comma == '':
 
132
            return (False, message)
 
133
        ids = []
 
134
        for message in messages:
 
135
            ids.append(message['id'])
 
136
            if ttl is not None:
 
137
                message['ttl'] = ttl
 
138
            if hide is not None:
 
139
                message['hide'] = hide
 
140
        query += " WHERE queue=%d AND name IN (%s)" % \
 
141
            (self.rowid, ','.join(ids))
 
142
        self.db.execute(query)
 
143
        self.notify(account, queue)
 
144
        return messages
 
145
 
 
146
    def delete_message(self, account, queue, message_id):
 
147
        message = self.get_message(account, queue, message_id)
 
148
        if message is None:
 
149
            return None
 
150
        query = "DELETE FROM messages WHERE queue=%d AND name='%s'" % \
 
151
            (self.rowid, message_id)
 
152
        self.db.execute(query)
 
153
        query = "SELECT rowid FROM messages WHERE queue=%d LIMIT 1" % \
 
154
            self.rowid
 
155
        if len(self.db.execute(query).fetchall()) == 0:
 
156
            query = "DELETE FROM queues WHERE rowid=%d" % self.rowid
 
157
            self.db.execute(query)
 
158
        return message
 
159
 
 
160
    def get_message(self, account, queue, message_id):
 
161
        query = "SELECT name,ttl,hide,body FROM messages " \
 
162
            "WHERE queue=%d AND name='%s'" % (self.rowid, message_id)
 
163
        result = self.db.execute(query).fetchall()
 
164
        if len(result) == 0:
 
165
            return None
 
166
        row = result[0]
 
167
        return dict(id=row[0], ttl=row[1], hide=row[2], body=row[3])
 
168
 
 
169
    def put_message(self, account, queue, message_id, ttl, hide, body):
 
170
        query = "SELECT rowid FROM queues " \
 
171
            "WHERE account='%s' AND queue='%s'" % (account, queue)
 
172
        result = self.db.execute(query).fetchall()
 
173
        if len(result) == 0:
 
174
            query = "INSERT INTO queues VALUES ('%s', '%s')" % (account, queue)
 
175
            rowid = self.db.execute(query).lastrowid
 
176
        else:
 
177
            rowid = result[0][0]
 
178
        query = "SELECT rowid FROM messages WHERE queue=%d AND name='%s'" % \
 
179
            (rowid, message_id)
 
180
        result = self.db.execute(query).fetchall()
 
181
        if len(result) == 0:
 
182
            query = "INSERT INTO messages VALUES (%d, '%s', %d, %d, '%s')" % \
 
183
                (rowid, message_id, ttl, hide, body)
 
184
            self.db.execute(query)
 
185
            self.notify(account, queue)
 
186
            return True
 
187
        query = "UPDATE messages SET ttl=%d, hide=%d, body='%s'" \
 
188
            "WHERE rowid=%d" % (ttl, hide, body, result[0][0])
 
189
        self.db.execute(query)
 
190
        if hide == 0:
 
191
            self.notify(account, queue)
 
192
        return False
 
193
 
 
194
    def update_message(self, account, queue, message_id, ttl, hide):
 
195
        message = self.get_message(account, queue, message_id)
 
196
        if message is None:
 
197
            return None
 
198
        query = "UPDATE messages SET"
 
199
        comma = ''
 
200
        if ttl is not None:
 
201
            query += "%s ttl=%d" % (comma, ttl)
 
202
            comma = ','
 
203
        if hide is not None:
 
204
            query += "%s hide=%d" % (comma, hide)
 
205
            comma = ','
 
206
        if comma == '':
 
207
            return message
 
208
        query += " WHERE queue=%d AND name='%s'" % (self.rowid, message_id)
 
209
        self.db.execute(query)
 
210
        if hide == 0:
 
211
            self.notify(account, queue)
 
212
        return message
 
213
 
 
214
    def clean(self):
 
215
        now = int(time.time())
 
216
        query = "SELECT rowid,queue FROM messages " \
 
217
            "WHERE ttl > 0 AND ttl <= %d" % now
 
218
        result = self.db.execute(query).fetchall()
 
219
        if len(result) > 0:
 
220
            messages = []
 
221
            queues = []
 
222
            for row in result:
 
223
                messages.append(str(row[0]))
 
224
                queues.append(row[1])
 
225
            query = 'DELETE FROM messages WHERE rowid in (%s)' % \
 
226
                ','.join(messages)
 
227
            self.db.execute(query)
 
228
            for queue in queues:
 
229
                query = "SELECT rowid FROM messages WHERE queue=%d LIMIT 1" % \
 
230
                    queue
 
231
                if len(self.db.execute(query).fetchall()) == 0:
 
232
                    query = "DELETE FROM queues WHERE rowid=%d" % queue
 
233
                    self.db.execute(query)
 
234
        query = "SELECT rowid,queue FROM messages WHERE " \
 
235
            "hide > 0 AND hide <= %d" % now
 
236
        result = self.db.execute(query).fetchall()
 
237
        if len(result) > 0:
 
238
            messages = []
 
239
            queues = []
 
240
            for row in result:
 
241
                messages.append(str(row[0]))
 
242
                queues.append(row[1])
 
243
            query = 'UPDATE messages SET hide=0 WHERE rowid in (%s)' % \
 
244
                ','.join(messages)
 
245
            self.db.execute(query)
 
246
            for queue in queues:
 
247
                query = "SELECT account,queue FROM queues WHERE rowid=%d" % \
 
248
                    queue
 
249
                result = self.db.execute(query).fetchall()[0]
 
250
                self.notify(result[0], result[1])