1
from StringIO import StringIO
3
from twisted.trial.unittest import TestCase
5
from axiom.store import Store
7
from nevow.testutil import FakeRequest
9
from entropy.store import ContentStore, ImmutableObject, ObjectCreator
11
class ContentStoreTests(TestCase):
    """
    Tests for L{ContentStore}.
    """
    def setUp(self):
        """
        Create a L{Store} at a fresh temporary path and a L{ContentStore}
        inside it configured with the C{sha256} hash.
        """
        self.store = Store(self.mktemp())
        self.contentStore = ContentStore(store=self.store, hash=u'sha256')


    def test_storeObject(self):
        """
        Test storing an object.

        The Deferred returned by C{storeObject} is expected to fire with an
        object ID of the form C{u'sha256:<hex digest>'}.
        """
        content = 'blahblah some data blahblah'
        contentType = u'application/octet-stream'
        expectedDigest = u'9aef0e119873bb0aab04e941d8f76daf21dedcd79e2024004766ee3b22ca9862'
        expectedId = u'sha256:%s' % expectedDigest

        d = self.contentStore.storeObject(content, contentType)
        # Returning the Deferred makes trial wait for the assertion to run.
        return d.addCallback(
            lambda objectId: self.assertEqual(objectId, expectedId))


    def test_getObject(self):
        """
        Test retrieving object.

        An L{ImmutableObject} inserted directly into the store should be the
        very same item that C{getObject} fires with.
        """
        obj = ImmutableObject(store=self.store,
                              contentDigest=u'quux',
                              content=self.store.newFilePath('foo'),
                              contentType=u'application/octet-stream')
        # NOTE(review): the ID's hash-name prefix (u'somehash') differs from
        # the store's configured hash; presumably only the digest after the
        # colon is used for the lookup -- confirm against
        # ContentStore.getObject.
        d = self.contentStore.getObject(u'somehash:quux')
        return d.addCallback(lambda obj2: self.assertIdentical(obj, obj2))
43
class ObjectCreatorTests(TestCase):
    """
    Tests for L{ObjectCreator}.
    """
    def setUp(self):
        """
        Create a L{Store} at a fresh temporary path, a C{sha256}
        L{ContentStore} inside it, and an L{ObjectCreator} wrapping that
        content store.
        """
        self.store = Store(self.mktemp())
        self.contentStore = ContentStore(store=self.store, hash=u'sha256')
        self.creator = ObjectCreator(self.contentStore)


    def test_correctContentMD5(self):
        """
        A PUT whose C{Content-MD5} header matches the request body is
        handled without error.
        """
        req = FakeRequest()
        # Base64-encoded MD5 digest of 'testdata'.
        req.received_headers['content-md5'] = '72VMQKtPF0f8aZkV1PcJAg=='
        req.content = StringIO('testdata')
        return self.creator.handlePUT(req)


    def test_incorrectContentMD5(self):
        """
        A PUT whose C{Content-MD5} header does not match the request body
        raises L{ValueError}.
        """
        req = FakeRequest()
        # The header is the digest of 'testdata', but the body differs.
        req.received_headers['content-md5'] = '72VMQKtPF0f8aZkV1PcJAg=='
        req.content = StringIO('wrongdata')
        # NOTE(review): assertRaises only catches a synchronous raise from
        # handlePUT; confirm the check does not fail via a Deferred instead.
        self.assertRaises(ValueError, self.creator.handlePUT, req)


    def test_missingContentMD5(self):
        """
        A PUT with no C{Content-MD5} header at all is handled without error.
        """
        req = FakeRequest()
        req.content = StringIO('wrongdata')
        return self.creator.handlePUT(req)