4
This service acts as a cache / access point for a backend object store;
5
currently Amazon S3 is used as the backend store, but the architecture should
6
be flexible enough to allow other possibilities. The system is designed to
7
handle objects in an immutable fashion; once an object is created, it exists in
8
perpetuity, and the contents will never change.
10
The service's functionality is two-fold; firstly, it handles requests for
11
retrieval of objects, servicing them from the local cache, fetching them from a
12
neighbour cache, or retrieving them from the backend store. Secondly, it
13
handles requests for storage of a new object; the object is first cached
14
locally to ensure local view consistency, and then queued for backend storage
15
in a reliable fashion.
19
from zope.interface import implements
21
from epsilon.extime import Time
23
from axiom.item import Item, transacted
24
from axiom.attributes import text, path, timestamp, AND, inmemory
25
from axiom.dependency import dependsOn
27
from twisted.web import http
28
from twisted.python.components import registerAdapter
30
from nevow.inevow import IResource, IRequest
31
from nevow.static import File
32
from nevow.rend import NotFound
33
from nevow.url import URL
35
from entropy.ientropy import IContentStore, IContentObject
36
from entropy.errors import CorruptObject, NonexistentObject
37
from entropy.hash import getHash
38
from entropy.util import deferred
41
class ImmutableObject(Item):
45
Immutable objects are addressed by content hash, and consist of the object
46
data as a binary blob, and object key/value metadata pairs.
48
implements(IContentObject)
50
hash = text(allowNone=False)
51
contentDigest = text(allowNone=False)
52
content = path(allowNone=False)
53
contentType = text(allowNone=False)
54
created = timestamp(allowNone=False, defaultFactory=lambda: Time())
62
return u'%s:%s' % (self.hash, self.contentDigest)
65
fp = self.content.open()
67
h = getHash(self.hash)(fp.read())
68
return unicode(h.hexdigest(), 'ascii')
73
digest = self._getDigest()
74
if self.contentDigest != digest:
75
raise CorruptObject('expected: %r actual: %r' % (self.contentDigest, digest))
78
return self.content.path.getContent()
80
def objectResource(obj):
82
Adapt L{ImmutableObject} to L{IResource}.
84
# XXX: Not sure if we should do this on every single resource retrieval.
86
return File(obj.content.path, defaultType=obj.contentType.encode('ascii'))
88
registerAdapter(objectResource, ImmutableObject, IResource)
91
class ContentStore(Item):
93
Manager for stored objects.
95
implements(IContentStore)
97
hash = text(allowNone=False, default=u'sha256')
103
def storeObject(self, content, contentType, metadata={}, created=None):
105
raise NotImplementedError('metadata not yet supported')
107
contentDigest = unicode(getHash(self.hash)(content).hexdigest(), 'ascii')
109
obj = self.store.findUnique(ImmutableObject,
110
AND(ImmutableObject.hash == self.hash,
111
ImmutableObject.contentDigest == contentDigest),
114
contentFile = self.store.newFile('objects', 'immutable', '%s:%s' % (self.hash, contentDigest))
116
contentFile.write(content)
122
obj = ImmutableObject(store=self.store,
123
contentDigest=contentDigest,
125
content=contentFile.finalpath,
126
contentType=contentType)
128
obj.contentType = contentType
129
obj.created = created
133
def importObject(self, obj):
135
Import an object from elsewhere.
137
@param obj: the object to import.
138
@type obj: IContentObject
140
return self.storeObject(obj.getContent(),
147
def getObject(self, objectId):
148
hash, contentDigest = objectId.split(u':', 1)
149
obj = self.store.findUnique(ImmutableObject,
150
AND(ImmutableObject.hash == hash,
151
ImmutableObject.contentDigest == contentDigest),
154
raise NonexistentObject(objectId)
158
class ObjectCreator(object):
160
Resource for storing new objects.
162
@ivar contentStore: The L{IContentStore} provider to create objects in.
164
implements(IResource)
166
def __init__(self, contentStore):
167
self.contentStore = contentStore
170
def renderHTTP(self, ctx):
172
if req.method == 'GET':
173
return 'PUT data here to create an object.'
174
elif req.method == 'PUT':
175
return self.handlePUT(req)
177
req.setResponseCode(http.NOT_ALLOWED)
178
req.setHeader('Content-Type', 'text/plain')
179
return 'Method not allowed'
181
def handlePUT(self, req):
182
data = req.content.read()
183
contentType = unicode(req.getHeader('Content-Type') or 'application/octet-stream', 'ascii')
185
contentMD5 = req.getHeader('Content-MD5')
186
if contentMD5 is not None:
187
expectedHash = contentMD5.decode('base64')
188
actualHash = hashlib.md5(data).digest()
189
if expectedHash != actualHash:
190
raise ValueError('Expected hash %r does not match actual hash %r' % (expectedHash, actualHash))
193
req.setHeader('Content-Type', 'text/plain')
194
objectId = objectId.encode('ascii')
197
d = self.contentStore.storeObject(data, contentType)
198
return d.addCallback(_cb)
201
class ContentResource(Item):
203
Resource for accessing the content store.
205
implements(IResource)
206
powerupInterfaces = [IResource]
208
addSlash = inmemory()
210
contentStore = dependsOn(ContentStore)
212
def childFactory(self, name):
216
/ is the root, nothing to see here.
218
/new is how new objects are stored.
220
/<objectId> is where existing objects are retrieved.
225
return ObjectCreator(self.contentStore)
228
obj = self.contentStore.getObject(name)
229
except NonexistentObject:
236
def renderHTTP(self, ctx):
242
def locateChild(self, ctx, segments):
244
Dispatch to L{childFactory}.
246
if len(segments) >= 1:
247
res = self.childFactory(segments[0])
249
return IResource(res), segments[1:]