399
453
FILEPATH = settings.MEDIA_ROOT
402
super(FileStorageTest, self).setUp()
455
def make_upload_dir(self):
    """Create the upload directory, and arrange for eventual deletion.

    Neither `FILEPATH` nor the upload directory below it may exist
    beforehand.  If one does, `os.mkdir` raises and this method fails
    rather than arrange for deletion of a directory that may contain
    meaningful data.

    :return: Path to the `FileStorage` upload directory (located
        directly under `FILEPATH`).  This is the directory where the
        actual files are stored.
    """
    # This blows up if the directory already exists.  Which is brittle,
    # but probably for the best since we ruthlessly delete it on cleanup!
    os.mkdir(self.FILEPATH)
    # Register the cleanup only once we know that we created FILEPATH
    # ourselves, and before anything else can fail, so that a partial
    # setup never leaves the directory behind.
    self.addCleanup(shutil.rmtree, self.FILEPATH)
    upload_dir = os.path.join(self.FILEPATH, FileStorage.upload_dir)
    os.mkdir(upload_dir)
    return upload_dir
406
def test_creation(self):
    # A freshly made FileStorage exposes the filename and the data that
    # it was created with.
    storage = factory.make_file_storage(filename="myfile", data=b"mydata")
    stored_name = storage.filename
    stored_data = storage.data.read()
    self.assertEqual(["myfile", "mydata"], [stored_name, stored_data])
412
def test_creation_writes_a_file(self):
    # The development settings say to write files starting at
    # /var/tmp/maas, and the field itself is hard-coded to make a
    # directory called "storage" below that.  Check that a file is
    # actually written there.
    factory.make_file_storage(filename="myfile", data=b"mydata")

    expected_filename = os.path.join(self.FILEPATH, "storage", "myfile")

    # The payload was stored as bytes, so read it back in binary mode
    # and compare bytes with bytes.
    with open(expected_filename, 'rb') as f:
        self.assertEqual(b"mydata", f.read())
474
def get_media_path(self, filename):
    """Return the location of stored file `filename` below MEDIA_ROOT.

    The result is relative: it is `filename` joined onto
    `FileStorage.upload_dir`.
    """
    relative_dir = FileStorage.upload_dir
    return os.path.join(relative_dir, filename)
478
def make_data(self, including_text='data'):
    """Produce some arbitrary bytes for use as file contents.

    :param including_text: Text to embed at the start of the data, so
        that failure messages are easier to recognize.
    :type including_text: basestring
    :return: A string of bytes: `including_text`, a space, and a random
        string, encoded as ASCII.
    """
    # Nothing here inserts non-ASCII bytes on its own accord.  Handling
    # of real binary data has its own, separate test.
    random_part = factory.getRandomString()
    return ("%s %s" % (including_text, random_part)).encode('ascii')
492
def age_file(self, path, seconds=None):
    """Make the file at `path` look like it hasn't been touched recently.

    Moves the file's mtime back in time; its atime is left as it was.

    :param path: Path of the file whose mtime should be decremented.
    :param seconds: Number of seconds to subtract from the mtime.  If
        not given, defaults to one second more than the `FileStorage`
        garbage-collection grace time.
    """
    # Only fall back to the default when the caller did not ask for a
    # specific age; an explicit value (including zero) is honoured.
    if seconds is None:
        seconds = FileStorage.objects.grace_time + 1
    stat_result = os.stat(path)
    atime = stat_result.st_atime
    mtime = stat_result.st_mtime
    os.utime(path, (atime, mtime - seconds))
504
def test_get_existing_storage_returns_None_if_none_found(self):
    # Looking up a filename that was never stored yields None.
    nonexistent_file = factory.getRandomString()
    self.assertIsNone(
        FileStorage.objects.get_existing_storage(nonexistent_file))
509
def test_get_existing_storage_finds_FileStorage(self):
    # Looking up the filename of a stored FileStorage returns that
    # FileStorage.
    self.make_upload_dir()
    storage = factory.make_file_storage()
    self.assertEqual(
        storage,
        FileStorage.objects.get_existing_storage(storage.filename))
516
def test_save_file_creates_storage(self):
    # save_file() returns a FileStorage carrying the given filename and
    # the data that was read from the given file object.
    self.make_upload_dir()
    filename = factory.getRandomString()
    data = self.make_data()
    storage = FileStorage.objects.save_file(filename, BytesIO(data))
    self.assertEqual(
        (filename, data),
        (storage.filename, storage.data.read()))
525
def test_storage_can_be_retrieved(self):
    # A FileStorage made through the factory can be fetched again by
    # filename, with its data intact.
    self.make_upload_dir()
    filename = factory.getRandomString()
    data = self.make_data()
    factory.make_file_storage(filename=filename, data=data)
    storage = FileStorage.objects.get(filename=filename)
    self.assertEqual(
        (filename, data),
        (storage.filename, storage.data.read()))
424
535
def test_stores_binary_data(self):
    self.make_upload_dir()

    # Build data that is not meant to be interpretable as text: two
    # byte-order marks of opposite endianness back to back, followed by
    # a byte with the high bit set sandwiched between two bytes that
    # have it cleared, which is not a valid UTF-8 sequence.
    invalid_tail = b'\x00\xff\x00'
    binary_data = codecs.BOM64_LE + codecs.BOM64_BE + invalid_tail

    # And yet, because FileStorage supports binary data, the very same
    # bytes should come back out when the stored data is read.
    storage = factory.make_file_storage(filename="x", data=binary_data)
    self.assertEqual(binary_data, storage.data.read())
554
def test_overwrites_file(self):
    # If a file of the same name has already been stored, the
    # reference to the old data gets overwritten with one to the new
    # data.  They are actually different files on the filesystem.
    self.make_upload_dir()
    filename = 'filename-%s' % factory.getRandomString()
    old_storage = factory.make_file_storage(
        filename=filename, data=self.make_data('old data'))
    new_data = self.make_data('new-data')
    new_storage = factory.make_file_storage(
        filename=filename, data=new_data)
    # The two storages point at differently named underlying files...
    self.assertNotEqual(old_storage.data.name, new_storage.data.name)
    # ...and looking the filename up again yields the new data.
    self.assertEqual(
        new_data, FileStorage.objects.get(filename=filename).data.read())
569
def test_list_stored_files_lists_files(self):
    # A file sitting in the upload directory shows up in
    # list_stored_files(), under its MEDIA_ROOT-relative path.
    upload_dir = self.make_upload_dir()
    filename = factory.getRandomString()
    # make_data() returns bytes, so write through a binary-mode handle.
    with open(os.path.join(upload_dir, filename), 'wb') as f:
        f.write(self.make_data())
    self.assertIn(
        self.get_media_path(filename),
        FileStorage.objects.list_stored_files())
578
def test_list_stored_files_includes_referenced_files(self):
    # The file behind a FileStorage is among the stored files.
    self.make_upload_dir()
    storage = factory.make_file_storage()
    self.assertIn(
        storage.data.name, FileStorage.objects.list_stored_files())
584
def test_list_referenced_files_lists_FileStorage_files(self):
    # The file behind a FileStorage counts as a referenced file.
    self.make_upload_dir()
    storage = factory.make_file_storage()
    self.assertIn(
        storage.data.name, FileStorage.objects.list_referenced_files())
590
def test_list_referenced_files_excludes_unreferenced_files(self):
    # A file that merely sits in the upload directory, without any
    # FileStorage pointing at it, is not a referenced file.
    upload_dir = self.make_upload_dir()
    filename = factory.getRandomString()
    # make_data() returns bytes, so write through a binary-mode handle.
    with open(os.path.join(upload_dir, filename), 'wb') as f:
        f.write(self.make_data())
    self.assertNotIn(
        self.get_media_path(filename),
        FileStorage.objects.list_referenced_files())
599
def test_list_referenced_files_uses_file_name_not_FileStorage_name(self):
    self.make_upload_dir()
    filename = factory.getRandomString()
    # The filename we're going to use is already taken.  The file
    # we'll be looking at will have to have a different name.
    factory.make_file_storage(filename=filename)
    storage = factory.make_file_storage(filename=filename)
    # It's the name of the file, not the FileStorage.filename, that
    # is in list_referenced_files.
    self.assertIn(
        storage.data.name, FileStorage.objects.list_referenced_files())
611
def test_is_old_returns_False_for_recent_file(self):
    # A file whose age is still a minute short of the grace time does
    # not count as old.
    upload_dir = self.make_upload_dir()
    filename = factory.getRandomString()
    path = os.path.join(upload_dir, filename)
    # make_data() returns bytes, so write through a binary-mode handle.
    with open(path, 'wb') as f:
        f.write(self.make_data())
    self.age_file(path, FileStorage.objects.grace_time - 60)
    self.assertFalse(
        FileStorage.objects.is_old(self.get_media_path(filename)))
621
def test_is_old_returns_True_for_old_file(self):
    # A file that is one second past the grace time counts as old.
    upload_dir = self.make_upload_dir()
    filename = factory.getRandomString()
    path = os.path.join(upload_dir, filename)
    # make_data() returns bytes, so write through a binary-mode handle.
    with open(path, 'wb') as f:
        f.write(self.make_data())
    self.age_file(path, FileStorage.objects.grace_time + 1)
    self.assertTrue(
        FileStorage.objects.is_old(self.get_media_path(filename)))
631
def test_collect_garbage_deletes_garbage(self):
    # An unreferenced file that has aged past the grace time is removed
    # by garbage collection.
    upload_dir = self.make_upload_dir()
    filename = factory.getRandomString()
    path = os.path.join(upload_dir, filename)
    # make_data() returns bytes, so write through a binary-mode handle.
    with open(path, 'wb') as f:
        f.write(self.make_data())
    # Without aging, this would be a recent file, which is the setup of
    # the test that expects garbage collection to leave the file alone.
    self.age_file(path)
    FileStorage.objects.collect_garbage()
    self.assertFalse(
        FileStorage.storage.exists(self.get_media_path(filename)))
642
def test_grace_time_is_generous_but_not_unlimited(self):
    # The garbage-collection grace time must comfortably outlast the
    # request that wrote a file, yet must not keep files around for
    # ages.  Concretely: more than 20 seconds, but less than a day.
    lower_bound = GreaterThan(20)
    upper_bound = LessThan(24 * 60 * 60)
    self.assertThat(FileStorage.objects.grace_time, lower_bound)
    self.assertThat(FileStorage.objects.grace_time, upper_bound)
650
def test_collect_garbage_leaves_recent_files_alone(self):
    # A file that was only just written is not aged here, so garbage
    # collection must not remove it even though nothing references it.
    upload_dir = self.make_upload_dir()
    filename = factory.getRandomString()
    # make_data() returns bytes, so write through a binary-mode handle.
    with open(os.path.join(upload_dir, filename), 'wb') as f:
        f.write(self.make_data())
    FileStorage.objects.collect_garbage()
    self.assertTrue(
        FileStorage.storage.exists(self.get_media_path(filename)))
659
def test_collect_garbage_leaves_referenced_files_alone(self):
    # Even after its file has been aged, a file that a FileStorage
    # still points at must survive garbage collection.
    self.make_upload_dir()
    storage = factory.make_file_storage()
    self.age_file(storage.data.path)
    FileStorage.objects.collect_garbage()
    still_there = FileStorage.storage.exists(storage.data.name)
    self.assertTrue(still_there)
666
def test_collect_garbage_tolerates_missing_upload_dir(self):
    # When MaaS is freshly installed, the upload directory is still
    # missing; this test deliberately never creates it.  The test
    # passes if garbage collection gets through without breakage,
    # i.e. without raising.
    FileStorage.objects.collect_garbage()
674
class ConfigDefaultTest(TestCase, TestWithFixtures):
    """Test config default values."""

    def test_default_config_maas_name(self):
        # Pin LOGNAME to a known random value for the duration of the
        # test; the expected default 'maas_name' is that name,
        # capitalized, with "'s" appended.
        name = factory.getRandomString()
        fixture = EnvironmentVariableFixture('LOGNAME', name)
        self.useFixture(fixture)
        default_config = get_default_config()
        self.assertEqual(
            "%s's" % name.capitalize(), default_config['maas_name'])
437
686
class ConfigTest(TestCase):
438
687
"""Testing of the :class:`Config` model."""