~fusion-developers/entropy-store/818855-limit-upload-scheduler

« back to all changes in this revision

Viewing changes to entropy/store.py

  • Committer: Tristan Seligmann
  • Date: 2009-05-30 23:07:17 UTC
  • Revision ID: mithrandi@mithrandi.za.net-20090530230717-y8ujqfw3rjj957i5
Mix interfaces up; add IContentObject interface and make IContentStore asynchronous.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
Object data store.
 
3
 
 
4
This service acts as a cache / access point for a backend object store;
 
5
currently Amazon S3 is used as the backend store, but the architecture should
 
6
be flexible enough to allow other possibilities. The system is designed to
 
7
handle objects in an immutable fashion; once an object is created, it exists in
 
8
perpetuity, and the contents will never change.
 
9
 
 
10
The service's functionality is two-fold; firstly, it handles requests for
 
11
retrieval of objects, servicing them from the local cache, fetching them from a
 
12
neighbour cache, or retrieving them from the backend store. Secondly, it
 
13
handles requests for storage of a new object; the object is first cached
 
14
locally to ensure local view consistency, and then queued for backend storage
 
15
in a reliable fashion.
 
16
"""
 
17
import hashlib
 
18
 
 
19
from zope.interface import implements
 
20
 
 
21
from epsilon.extime import Time
 
22
 
 
23
from axiom.item import Item, transacted
 
24
from axiom.attributes import text, path, timestamp, AND, inmemory
 
25
from axiom.dependency import dependsOn
 
26
 
 
27
from twisted.web import http
 
28
from twisted.python.components import registerAdapter
 
29
 
 
30
from nevow.inevow import IResource, IRequest
 
31
from nevow.static import File
 
32
from nevow.rend import NotFound
 
33
from nevow.url import URL
 
34
 
 
35
from entropy.ientropy import IContentStore, IContentObject
 
36
from entropy.errors import CorruptObject, NonexistentObject
 
37
from entropy.hash import getHash
 
38
from entropy.util import deferred
 
39
 
 
40
 
 
41
class ImmutableObject(Item):
 
42
    """
 
43
    An immutable object.
 
44
 
 
45
    Immutable objects are addressed by content hash, and consist of the object
 
46
    data as a binary blob, and object key/value metadata pairs.
 
47
    """
 
48
    implements(IContentObject)
 
49
 
 
50
    hash = text(allowNone=False)
 
51
    contentDigest = text(allowNone=False)
 
52
    content = path(allowNone=False)
 
53
    contentType = text(allowNone=False)
 
54
    created = timestamp(allowNone=False, defaultFactory=lambda: Time())
 
55
 
 
56
    @property
 
57
    def metadata(self):
 
58
        return {}
 
59
 
 
60
    @property
 
61
    def objectId(self):
 
62
        return u'%s:%s' % (self.hash, self.contentDigest)
 
63
 
 
64
    def _getDigest(self):
 
65
        fp = self.content.open()
 
66
        try:
 
67
            h = getHash(self.hash)(fp.read())
 
68
            return unicode(h.hexdigest(), 'ascii')
 
69
        finally:
 
70
            fp.close()
 
71
 
 
72
    def verify(self):
 
73
        digest = self._getDigest()
 
74
        if self.contentDigest != digest:
 
75
            raise CorruptObject('expected: %r actual: %r' % (self.contentDigest, digest))
 
76
 
 
77
    def getContent(self):
 
78
        return self.content.path.getContent()
 
79
 
 
80
def objectResource(obj):
 
81
    """
 
82
    Adapt L{ImmutableObject) to L{IResource}.
 
83
    """
 
84
    # XXX: Not sure if we should do this on every single resource retrieval.
 
85
    obj.verify()
 
86
    return File(obj.content.path, defaultType=obj.contentType.encode('ascii'))
 
87
 
 
88
registerAdapter(objectResource, ImmutableObject, IResource)
 
89
 
 
90
 
 
91
class ContentStore(Item):
 
92
    """
 
93
    Manager for stored objects.
 
94
    """
 
95
    implements(IContentStore)
 
96
 
 
97
    hash = text(allowNone=False, default=u'sha256')
 
98
 
 
99
    # IContentStore
 
100
 
 
101
    @deferred
 
102
    @transacted
 
103
    def storeObject(self, content, contentType, metadata={}, created=None):
 
104
        if metadata != {}:
 
105
            raise NotImplementedError('metadata not yet supported')
 
106
 
 
107
        contentDigest = unicode(getHash(self.hash)(content).hexdigest(), 'ascii')
 
108
 
 
109
        obj = self.store.findUnique(ImmutableObject,
 
110
                                    AND(ImmutableObject.hash == self.hash,
 
111
                                        ImmutableObject.contentDigest == contentDigest),
 
112
                                    default=None)
 
113
        if obj is None:
 
114
            contentFile = self.store.newFile('objects', 'immutable', '%s:%s' % (self.hash, contentDigest))
 
115
            try:
 
116
                contentFile.write(content)
 
117
                contentFile.close()
 
118
            except:
 
119
                contentFile.abort()
 
120
                raise
 
121
 
 
122
            obj = ImmutableObject(store=self.store,
 
123
                                  contentDigest=contentDigest,
 
124
                                  hash=self.hash,
 
125
                                  content=contentFile.finalpath,
 
126
                                  contentType=contentType)
 
127
        else:
 
128
            obj.contentType = contentType
 
129
            obj.created = created
 
130
 
 
131
        return obj.objectId
 
132
 
 
133
    def importObject(self, obj):
 
134
        """
 
135
        Import an object from elsewhere.
 
136
 
 
137
        @param obj: the object to import.
 
138
        @type obj: IContentObject
 
139
        """
 
140
        return self.storeObject(obj.getContent(),
 
141
                                obj.contentType,
 
142
                                obj.metadata,
 
143
                                obj.created)
 
144
 
 
145
    @deferred
 
146
    @transacted
 
147
    def getObject(self, objectId):
 
148
        hash, contentDigest = objectId.split(u':', 1)
 
149
        obj = self.store.findUnique(ImmutableObject,
 
150
                                    AND(ImmutableObject.hash == hash,
 
151
                                        ImmutableObject.contentDigest == contentDigest),
 
152
                                    default=None)
 
153
        if obj is None:
 
154
            raise NonexistentObject(objectId)
 
155
        return obj
 
156
 
 
157
 
 
158
class ObjectCreator(object):
 
159
    """
 
160
    Resource for storing new objects.
 
161
 
 
162
    @ivar contentStore: The {IContentStore} provider to create objects in.
 
163
    """
 
164
    implements(IResource)
 
165
 
 
166
    def __init__(self, contentStore):
 
167
        self.contentStore = contentStore
 
168
 
 
169
    # IResource
 
170
    def renderHTTP(self, ctx):
 
171
        req = IRequest(ctx)
 
172
        if req.method == 'GET':
 
173
            return 'PUT data here to create an object.'
 
174
        elif req.method == 'PUT':
 
175
            return self.handlePUT(req)
 
176
        else:
 
177
            req.setResponseCode(http.NOT_ALLOWED)
 
178
            req.setHeader('Content-Type', 'text/plain')
 
179
            return 'Method not allowed'
 
180
 
 
181
    def handlePUT(self, req):
 
182
        data = req.content.read()
 
183
        contentType = unicode(req.getHeader('Content-Type') or 'application/octet-stream', 'ascii')
 
184
 
 
185
        contentMD5 = req.getHeader('Content-MD5')
 
186
        if contentMD5 is not None:
 
187
            expectedHash = contentMD5.decode('base64')
 
188
            actualHash = hashlib.md5(data).digest()
 
189
            if expectedHash != actualHash:
 
190
                raise ValueError('Expected hash %r does not match actual hash %r' % (expectedHash, actualHash))
 
191
 
 
192
        def _cb(objectId):
 
193
            req.setHeader('Content-Type', 'text/plain')
 
194
            objectId = objectId.encode('ascii')
 
195
            return objectId
 
196
 
 
197
        d = self.contentStore.storeObject(data, contentType)
 
198
        return d.addCallback(_cb)
 
199
 
 
200
 
 
201
class ContentResource(Item):
 
202
    """
 
203
    Resource for accessing the content store.
 
204
    """
 
205
    implements(IResource)
 
206
    powerupInterfaces = [IResource]
 
207
 
 
208
    addSlash = inmemory()
 
209
 
 
210
    contentStore = dependsOn(ContentStore)
 
211
 
 
212
    def childFactory(self, name):
 
213
        """
 
214
        Hook up children.
 
215
 
 
216
        / is the root, nothing to see her.
 
217
 
 
218
        /new is how new objects are stored.
 
219
 
 
220
        /<objectId> is where existing objects are retrieved.
 
221
        """
 
222
        if name == '':
 
223
            return self
 
224
        elif name == 'new':
 
225
            return ObjectCreator(self.contentStore)
 
226
        else:
 
227
            try:
 
228
                obj = self.contentStore.getObject(name)
 
229
            except NonexistentObject:
 
230
                pass
 
231
            else:
 
232
                return obj
 
233
        return None
 
234
 
 
235
    # IResource
 
236
    def renderHTTP(self, ctx):
 
237
        """
 
238
        Nothing to see here.
 
239
        """
 
240
        return 'Entropy'
 
241
 
 
242
    def locateChild(self, ctx, segments):
 
243
        """
 
244
        Dispatch to L{childFactory}.
 
245
        """
 
246
        if len(segments) >= 1:
 
247
            res = self.childFactory(segments[0])
 
248
            if res is not None:
 
249
                return IResource(res), segments[1:]
 
250
        return NotFound