~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to nova/datastore.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright [2010] [Anso Labs, LLC]
 
4
#
 
5
#    Licensed under the Apache License, Version 2.0 (the "License");
 
6
#    you may not use this file except in compliance with the License.
 
7
#    You may obtain a copy of the License at
 
8
#
 
9
#        http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
#    Unless required by applicable law or agreed to in writing, software
 
12
#    distributed under the License is distributed on an "AS IS" BASIS,
 
13
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
14
#    See the License for the specific language governing permissions and
 
15
#    limitations under the License.
 
16
 
 
17
"""
 
18
Datastore:
 
19
 
 
20
Providers the Keeper class, a simple pseudo-dictionary that
 
21
persists on disk.
 
22
 
 
23
MAKE Sure that ReDIS is running, and your flags are set properly,
 
24
before trying to run this.
 
25
"""
 
26
 
 
27
import json
 
28
import logging
 
29
import os
 
30
import sqlite3
 
31
 
 
32
from nova import vendor
 
33
import redis
 
34
 
 
35
from nova import flags
 
36
from nova import utils
 
37
 
 
38
 
 
39
FLAGS = flags.FLAGS
 
40
flags.DEFINE_string('datastore_path', utils.abspath('../keeper'),
 
41
                    'where keys are stored on disk')
 
42
flags.DEFINE_string('redis_host', '127.0.0.1',
 
43
                    'Host that redis is running on.')
 
44
flags.DEFINE_integer('redis_port', 6379,
 
45
                    'Port that redis is running on.')
 
46
flags.DEFINE_integer('redis_db', 0, 'Multiple DB keeps tests away')
 
47
flags.DEFINE_string('keeper_backend', 'redis',
 
48
                    'which backend to use for keeper')
 
49
 
 
50
 
 
51
class Redis(object):
 
52
    def __init__(self):
 
53
        if hasattr(self.__class__, '_instance'):
 
54
            raise Exception('Attempted to instantiate singleton')
 
55
 
 
56
    @classmethod
 
57
    def instance(cls):
 
58
        if not hasattr(cls, '_instance'):
 
59
            inst = redis.Redis(host=FLAGS.redis_host, port=FLAGS.redis_port, db=FLAGS.redis_db)
 
60
            cls._instance = inst
 
61
        return cls._instance
 
62
 
 
63
 
 
64
class RedisModel(object):
 
65
    """ Wrapper around redis-backed properties """
 
66
    object_type = 'generic'
 
67
    def __init__(self, object_id):
 
68
        """ loads an object from the datastore if exists """
 
69
        self.object_id = object_id
 
70
        self.initial_state = {}
 
71
        self.state = Redis.instance().hgetall(self.__redis_key)
 
72
        if self.state:
 
73
            self.initial_state = self.state
 
74
        else:
 
75
            self.set_default_state()
 
76
 
 
77
    def set_default_state(self):
 
78
        self.state = {'state' : 'pending'}
 
79
        self.state[self.object_type+"_id"] = self.object_id
 
80
 
 
81
    @property
 
82
    def __redis_key(self):
 
83
        """ Magic string for instance keys """
 
84
        return '%s:%s' % (self.object_type, self.object_id)
 
85
 
 
86
    def __repr__(self):
 
87
        return "<%s:%s>" % (self.object_type, self.object_id)
 
88
 
 
89
    def __str__(self):
 
90
        return str(self.state)
 
91
 
 
92
    def keys(self):
 
93
        return self.state.keys()
 
94
 
 
95
    def copy(self):
 
96
        copyDict = {}
 
97
        for item in self.keys():
 
98
            copyDict[item] = self[item]
 
99
        return copyDict
 
100
 
 
101
    def get(self, item, default):
 
102
        return self.state.get(item, default)
 
103
 
 
104
    def __getitem__(self, item):
 
105
        return self.state[item]
 
106
 
 
107
    def __setitem__(self, item, val):
 
108
        self.state[item] = val
 
109
        return self.state[item]
 
110
 
 
111
    def __delitem__(self, item):
 
112
        """ We don't support this """
 
113
        raise Exception("Silly monkey, we NEED all our properties.")
 
114
 
 
115
    def save(self):
 
116
        """ update the directory with the state from this instance """
 
117
        # TODO(ja): implement hmset in redis-py and use it
 
118
        # instead of multiple calls to hset
 
119
        for key, val in self.state.iteritems():
 
120
            # if (not self.initial_state.has_key(key)
 
121
            # or self.initial_state[key] != val):
 
122
                Redis.instance().hset(self.__redis_key, key, val)
 
123
        if self.initial_state == {}:
 
124
            self.first_save()
 
125
        self.initial_state = self.state
 
126
        return True
 
127
 
 
128
    def first_save(self):
 
129
        pass
 
130
 
 
131
    def destroy(self):
 
132
        """ deletes all related records from datastore.
 
133
         does NOT do anything to running state.
 
134
        """
 
135
        Redis.instance().delete(self.__redis_key)
 
136
        return True
 
137
 
 
138
 
 
139
def slugify(key, prefix=None):
 
140
    """
 
141
    Key has to be a valid filename. Slugify solves that.
 
142
    """
 
143
    return "%s%s" % (prefix, key)
 
144
 
 
145
 
 
146
class SqliteKeeper(object):
 
147
    """ Keeper implementation in SQLite, mostly for in-memory testing """
 
148
    _conn = {} # class variable
 
149
 
 
150
    def __init__(self, prefix):
 
151
        self.prefix = prefix
 
152
 
 
153
    @property
 
154
    def conn(self):
 
155
        if self.prefix not in self.__class__._conn:
 
156
            logging.debug('no sqlite connection (%s), making new', self.prefix)
 
157
            if FLAGS.datastore_path != ':memory:':
 
158
                try:
 
159
                    os.mkdir(FLAGS.datastore_path)
 
160
                except Exception:
 
161
                    pass
 
162
                conn = sqlite3.connect(os.path.join(
 
163
                    FLAGS.datastore_path, '%s.sqlite' % self.prefix))
 
164
            else:
 
165
                conn = sqlite3.connect(':memory:')
 
166
 
 
167
            c = conn.cursor()
 
168
            try:
 
169
                c.execute('''CREATE TABLE data (item text, value text)''')
 
170
                conn.commit()
 
171
            except Exception:
 
172
                logging.exception('create table failed')
 
173
            finally:
 
174
                c.close()
 
175
 
 
176
            self.__class__._conn[self.prefix] = conn
 
177
 
 
178
        return self.__class__._conn[self.prefix]
 
179
 
 
180
    def __delitem__(self, item):
 
181
        #logging.debug('sqlite deleting %s', item)
 
182
        c = self.conn.cursor()
 
183
        try:
 
184
            c.execute('DELETE FROM data WHERE item = ?', (item, ))
 
185
            self.conn.commit()
 
186
        except Exception:
 
187
            logging.exception('delete failed: %s', item)
 
188
        finally:
 
189
            c.close()
 
190
 
 
191
    def __getitem__(self, item):
 
192
        #logging.debug('sqlite getting %s', item)
 
193
        result = None
 
194
        c = self.conn.cursor()
 
195
        try:
 
196
            c.execute('SELECT value FROM data WHERE item = ?', (item, ))
 
197
            row = c.fetchone()
 
198
            if row:
 
199
                result = json.loads(row[0])
 
200
            else:
 
201
                result = None
 
202
        except Exception:
 
203
            logging.exception('select failed: %s', item)
 
204
        finally:
 
205
            c.close()
 
206
        #logging.debug('sqlite got %s: %s', item, result)
 
207
        return result
 
208
 
 
209
    def __setitem__(self, item, value):
 
210
        serialized_value = json.dumps(value)
 
211
        insert = True
 
212
        if self[item] is not None:
 
213
            insert = False
 
214
        #logging.debug('sqlite insert %s: %s', item, value)
 
215
        c = self.conn.cursor()
 
216
        try:
 
217
            if insert:
 
218
                c.execute('INSERT INTO data VALUES (?, ?)',
 
219
                         (item, serialized_value))
 
220
            else:
 
221
                c.execute('UPDATE data SET item=?, value=? WHERE item = ?',
 
222
                          (item, serialized_value, item))
 
223
 
 
224
            self.conn.commit()
 
225
        except Exception:
 
226
            logging.exception('select failed: %s', item)
 
227
        finally:
 
228
            c.close()
 
229
 
 
230
    def clear(self):
 
231
        if self.prefix not in self.__class__._conn:
 
232
            return
 
233
        self.conn.close()
 
234
        if FLAGS.datastore_path != ':memory:':
 
235
            os.unlink(os.path.join(FLAGS.datastore_path, '%s.sqlite' % self.prefix))
 
236
        del self.__class__._conn[self.prefix]
 
237
 
 
238
    def clear_all(self):
 
239
        for k, conn in self.__class__._conn.iteritems():
 
240
            conn.close()
 
241
            if FLAGS.datastore_path != ':memory:':
 
242
                os.unlink(os.path.join(FLAGS.datastore_path,
 
243
                                       '%s.sqlite' % self.prefix))
 
244
        self.__class__._conn = {}
 
245
 
 
246
 
 
247
    def set_add(self, item, value):
 
248
        group = self[item]
 
249
        if not group:
 
250
            group = []
 
251
        group.append(value)
 
252
        self[item] = group
 
253
 
 
254
    def set_is_member(self, item, value):
 
255
        group = self[item]
 
256
        if not group:
 
257
            return False
 
258
        return value in group
 
259
 
 
260
    def set_remove(self, item, value):
 
261
        group = self[item]
 
262
        if not group:
 
263
            group = []
 
264
        group.remove(value)
 
265
        self[item] = group
 
266
 
 
267
    def set_fetch(self, item):
 
268
        # TODO(termie): I don't really know what set_fetch is supposed to do
 
269
        group = self[item]
 
270
        if not group:
 
271
            group = []
 
272
        return iter(group)
 
273
 
 
274
class JsonKeeper(object):
 
275
    """
 
276
    Simple dictionary class that persists using
 
277
    JSON in files saved to disk.
 
278
    """
 
279
    def __init__(self, prefix):
 
280
        self.prefix = prefix
 
281
 
 
282
    def __delitem__(self, item):
 
283
        """
 
284
        Removing a key means deleting a file from disk.
 
285
        """
 
286
        item = slugify(item, self.prefix)
 
287
        path = "%s/%s" % (FLAGS.datastore_path, item)
 
288
        if os.path.isfile(path):
 
289
            os.remove(path)
 
290
 
 
291
    def __getitem__(self, item):
 
292
        """
 
293
        Fetch file contents and dejsonify them.
 
294
        """
 
295
        item = slugify(item, self.prefix)
 
296
        path = "%s/%s" % (FLAGS.datastore_path, item)
 
297
        if os.path.isfile(path):
 
298
            return json.load(open(path, 'r'))
 
299
        return None
 
300
 
 
301
    def __setitem__(self, item, value):
 
302
        """
 
303
        JSON encode value and save to file.
 
304
        """
 
305
        item = slugify(item, self.prefix)
 
306
        path = "%s/%s" % (FLAGS.datastore_path, item)
 
307
        with open(path, "w") as blobfile:
 
308
            blobfile.write(json.dumps(value))
 
309
        return value
 
310
 
 
311
 
 
312
class RedisKeeper(object):
 
313
    """
 
314
    Simple dictionary class that persists using
 
315
    ReDIS.
 
316
    """
 
317
    def __init__(self, prefix="redis-"):
 
318
        self.prefix = prefix
 
319
        Redis.instance().ping()
 
320
 
 
321
    def __setitem__(self, item, value):
 
322
        """
 
323
        JSON encode value and save to file.
 
324
        """
 
325
        item = slugify(item, self.prefix)
 
326
        Redis.instance().set(item, json.dumps(value))
 
327
        return value
 
328
 
 
329
    def __getitem__(self, item):
 
330
        item = slugify(item, self.prefix)
 
331
        value = Redis.instance().get(item)
 
332
        if value:
 
333
            return json.loads(value)
 
334
 
 
335
    def __delitem__(self, item):
 
336
        item = slugify(item, self.prefix)
 
337
        return Redis.instance().delete(item)
 
338
 
 
339
    def clear(self):
 
340
        raise NotImplementedError()
 
341
 
 
342
    def clear_all(self):
 
343
        raise NotImplementedError()
 
344
 
 
345
    def set_add(self, item, value):
 
346
        item = slugify(item, self.prefix)
 
347
        return Redis.instance().sadd(item, json.dumps(value))
 
348
 
 
349
    def set_is_member(self, item, value):
 
350
        item = slugify(item, self.prefix)
 
351
        return Redis.instance().sismember(item, json.dumps(value))
 
352
 
 
353
    def set_remove(self, item, value):
 
354
        item = slugify(item, self.prefix)
 
355
        return Redis.instance().srem(item, json.dumps(value))
 
356
 
 
357
    def set_fetch(self, item):
 
358
        item = slugify(item, self.prefix)
 
359
        for obj in  Redis.instance().sinter([item]):
 
360
            yield json.loads(obj)
 
361
 
 
362
 
 
363
def Keeper(prefix=''):
 
364
    KEEPERS = {'redis': RedisKeeper,
 
365
               'sqlite': SqliteKeeper}
 
366
    return KEEPERS[FLAGS.keeper_backend](prefix)
 
367