~eday/burrow/prototype-conversion

« back to all changes in this revision

Viewing changes to burrowd/backend/memory.py

  • Committer: Eric Day
  • Date: 2011-03-17 23:42:41 UTC
  • Revision ID: eday@oddments.org-20110317234241-ult80xn9d1lon867
First chunk of code from prototype. Beyond the prototype, configuration, module loading, and log handling was added.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2011 OpenStack LLC.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
#     http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
12
# See the License for the specific language governing permissions and
 
13
# limitations under the License.
 
14
 
 
15
'''Memory backend for the burrow server.'''
 
16
 
 
17
import time
 
18
 
 
19
import burrowd.backend
 
20
 
 
21
 
 
22
class Backend(burrowd.backend.Backend):
 
23
 
 
24
    def __init__(self, config):
 
25
        super(Backend, self).__init__(config)
 
26
        self.accounts = {}
 
27
 
 
28
    def delete_accounts(self):
 
29
        self.accounts.clear()
 
30
 
 
31
    def get_accounts(self):
 
32
        return self.accounts.keys()
 
33
 
 
34
    def delete_account(self, account):
 
35
        del self.accounts[account]
 
36
 
 
37
    def get_queues(self, account):
 
38
        if account not in self.accounts:
 
39
            return []
 
40
        return self.accounts[account].keys()
 
41
 
 
42
    def queue_exists(self, account, queue):
 
43
        return account in self.accounts and queue in self.accounts[account]
 
44
 
 
45
    def delete_messages(self, account, queue, limit, marker, match_hidden):
 
46
        messages = self._scan_queue(account, queue, limit, marker,
 
47
            match_hidden, delete=True)
 
48
        if len(self.accounts[account][queue]) == 0:
 
49
            del self.accounts[account][queue]
 
50
        if len(self.accounts[account]) == 0:
 
51
            del self.accounts[account]
 
52
        return messages
 
53
 
 
54
    def get_messages(self, account, queue, limit, marker, match_hidden):
 
55
        return self._scan_queue(account, queue, limit, marker, match_hidden)
 
56
 
 
57
    def update_messages(self, account, queue, limit, marker, match_hidden, ttl,
 
58
                        hide):
 
59
        return self._scan_queue(account, queue, limit, marker, match_hidden,
 
60
                                ttl=ttl, hide=hide)
 
61
 
 
62
    def delete_message(self, account, queue, message_id):
 
63
        for index in range(0, len(self.accounts[account][queue])):
 
64
            message = self.accounts[account][queue][index]
 
65
            if message['id'] == message_id:
 
66
                del self.accounts[account][queue][index]
 
67
                if len(self.accounts[account][queue]) == 0:
 
68
                    del self.accounts[account][queue]
 
69
                if len(self.accounts[account]) == 0:
 
70
                    del self.accounts[account]
 
71
                return message
 
72
        return None
 
73
 
 
74
    def get_message(self, account, queue, message_id):
 
75
        for index in range(0, len(self.accounts[account][queue])):
 
76
            message = self.accounts[account][queue][index]
 
77
            if message['id'] == message_id:
 
78
                return message
 
79
        return None
 
80
 
 
81
    def put_message(self, account, queue, message_id, ttl, hide, body):
 
82
        if account not in self.accounts:
 
83
            self.accounts[account] = {}
 
84
        if queue not in self.accounts[account]:
 
85
            self.accounts[account][queue] = []
 
86
        for index in range(0, len(self.accounts[account][queue])):
 
87
            message = self.accounts[account][queue][index]
 
88
            if message['id'] == message_id:
 
89
                message['ttl'] = ttl
 
90
                message['hide'] = hide
 
91
                message['body'] = body
 
92
                if hide == 0:
 
93
                    self.notify(account, queue)
 
94
                return False
 
95
        message = dict(id=message_id, ttl=ttl, hide=hide, body=body)
 
96
        self.accounts[account][queue].append(message)
 
97
        self.notify(account, queue)
 
98
        return True
 
99
 
 
100
    def update_message(self, account, queue, message_id, ttl, hide):
 
101
        for index in range(0, len(self.accounts[account][queue])):
 
102
            message = self.accounts[account][queue][index]
 
103
            if message['id'] == message_id:
 
104
                if ttl is not None:
 
105
                    message['ttl'] = ttl
 
106
                if hide is not None:
 
107
                    message['hide'] = hide
 
108
                    if hide == 0:
 
109
                        self.notify(account, queue)
 
110
                return message
 
111
        return None
 
112
 
 
113
    def clean(self):
 
114
        now = int(time.time())
 
115
        for account in self.accounts.keys():
 
116
            for queue in self.accounts[account].keys():
 
117
                index = 0
 
118
                notify = False
 
119
                total = len(self.accounts[account][queue])
 
120
                while index < total:
 
121
                    message = self.accounts[account][queue][index]
 
122
                    if 0 < message['ttl'] <= now:
 
123
                        del self.accounts[account][queue][index]
 
124
                        total -= 1
 
125
                    else:
 
126
                        if 0 < message['hide'] <= now:
 
127
                            message['hide'] = 0
 
128
                            notify = True
 
129
                        index += 1
 
130
                if notify:
 
131
                    self.notify(account, queue)
 
132
                if len(self.accounts[account][queue]) == 0:
 
133
                    del self.accounts[account][queue]
 
134
            if len(self.accounts[account]) == 0:
 
135
                del self.accounts[account]
 
136
 
 
137
    def _scan_queue(self, account, queue, limit, marker, match_hidden,
 
138
                    ttl=None, hide=None, delete=False):
 
139
        index = 0
 
140
        notify = False
 
141
        if marker is not None:
 
142
            found = False
 
143
            for index in range(0, len(self.accounts[account][queue])):
 
144
                message = self.accounts[account][queue][index]
 
145
                if message['id'] == marker:
 
146
                    index += 1
 
147
                    found = True
 
148
                    break
 
149
            if not found:
 
150
                index = 0
 
151
        messages = []
 
152
        total = len(self.accounts[account][queue])
 
153
        while index < total:
 
154
            message = self.accounts[account][queue][index]
 
155
            if not match_hidden and message['hide'] != 0:
 
156
                index += 1
 
157
                continue
 
158
            if ttl is not None:
 
159
                message['ttl'] = ttl
 
160
            if hide is not None:
 
161
                message['hide'] = hide
 
162
                if hide == 0:
 
163
                    notify = True
 
164
            if delete:
 
165
                del self.accounts[account][queue][index]
 
166
                total -= 1
 
167
            else:
 
168
                index += 1
 
169
            messages.append(message)
 
170
            if limit:
 
171
                limit -= 1
 
172
                if limit == 0:
 
173
                    break
 
174
        if notify:
 
175
            self.notify(account, queue)
 
176
        return messages