2
# Copyright (C) 2013 Johannes Dewender
3
# This test is free. You can redistribute and/or modify it at will.
12
from io import TextIOWrapper, BytesIO
13
from subprocess import Popen
22
from discid import DiscError
24
from libdiscid.compat import discid
25
from libdiscid.compat.discid import DiscError
27
SCRIPT_NAME = "isrcsubmit.py"
28
TEST_DATA = "test_data/"
32
class TestInternal(unittest.TestCase):
35
with open(os.devnull, 'w') as devnull:
36
self._old_stdout = os.dup(sys.stdout.fileno())
37
os.dup2(devnull.fileno(), 1)
39
def test_encoding(self):
40
self.assertTrue(type(isrcsubmit.encode("test")) is type(b"test"))
41
self.assertEqual(isrcsubmit.encode("test"), b"test")
42
self.assertTrue(type(isrcsubmit.decode(b"test"))
43
is type(b"test".decode()))
44
self.assertEqual(isrcsubmit.decode(b"test"), "test")
47
self.assertEqual(isrcsubmit.decode(isrcsubmit.encode(string)),
50
self.assertEqual(isrcsubmit.encode(isrcsubmit.decode(bytestring)),
53
def test_gather_options(self):
54
# make sure most important options always work
55
options = isrcsubmit.gather_options([SCRIPT_NAME])
56
self.assertFalse(options.debug)
57
self.assertTrue(options.backend)
58
self.assertEqual(options.server, "musicbrainz.org")
59
self.assertFalse(options.force_submit)
60
self.assertTrue(options.release_id is None)
63
device = "/some/other/device"
64
options = isrcsubmit.gather_options([SCRIPT_NAME, user, device])
65
self.assertEqual(options.user, user)
66
self.assertEqual(options.device, device)
67
options = isrcsubmit.gather_options([SCRIPT_NAME,
68
"-d", device, "-u", user])
69
self.assertEqual(options.user, user)
70
self.assertEqual(options.device, device)
71
options = isrcsubmit.gather_options([SCRIPT_NAME, "--user", user,
73
self.assertEqual(options.user, user)
74
self.assertEqual(options.device, device)
78
os.dup2(self._old_stdout, 1)
81
# mock musicbrainzngs queries
82
# - - - - - - - - - - - - - -
84
# save mbngs functions
85
_mbngs_get_releases_by_discid = musicbrainzngs.get_releases_by_discid
86
_mbngs_get_release_by_id = musicbrainzngs.get_release_by_id
87
_mbngs_submit_isrcs = musicbrainzngs.submit_isrcs
89
# Stand-in for musicbrainzngs.get_releases_by_discid, backed by JSON files
# named "<TEST_DATA><disc_id>_releases.json".
# Two paths are visible below:
#   * save: call the real function (kept as _mbngs_get_releases_by_discid)
#     and dump its result to the JSON file with indent=2
#   * replay: load the JSON file and return its content
# NOTE(review): the statements that choose between the two paths are not
# visible in this excerpt -- confirm against the complete file.
# NOTE(review): `includes=[]` is a mutable default argument; it is only
# passed through here, but a None default would be safer.
def _get_releases_by_discid(disc_id, includes=[]):
90
file_name = "%s%s_releases.json" % (TEST_DATA, disc_id)
92
# save path: query the real web service and store the answer
releases = _mbngs_get_releases_by_discid(disc_id, includes)
93
with open(file_name, "w") as releases_file:
94
json.dump(releases, releases_file, indent=2)
97
# replay path: answer from the stored file
with open(file_name, "r") as releases_file:
98
return json.load(releases_file)
100
musicbrainzngs.get_releases_by_discid = _get_releases_by_discid
102
# Stand-in for musicbrainzngs.get_release_by_id, backed by JSON files
# named "<TEST_DATA><release_id>.json".
# Two paths are visible below:
#   * save: call the real function (kept as _mbngs_get_release_by_id) and
#     dump its result to the JSON file with indent=2
#   * replay: load the JSON file and return its content
# NOTE(review): the statements that choose between the two paths are not
# visible in this excerpt -- confirm against the complete file.
# NOTE(review): `includes=[]` is a mutable default argument; it is only
# passed through here, but a None default would be safer.
def _get_release_by_id(release_id, includes=[]):
103
file_name = "%s%s.json" % (TEST_DATA, release_id)
105
# save path: query the real web service and store the answer
release = _mbngs_get_release_by_id(release_id, includes)
106
with open(file_name, "w") as release_file:
107
# NOTE(review): the file opened above is bound to `release_file`, but this
# call writes to `releases_file`, a name not defined in this function --
# looks like a NameError on the save path; should probably be `release_file`.
json.dump(release, releases_file, indent=2)
110
# replay path: answer from the stored file
with open(file_name, "r") as release_file:
111
return json.load(release_file)
113
musicbrainzngs.get_release_by_id = _get_release_by_id
115
def _submit_isrcs(tracks2isrcs):
117
data_sent.tracks2isrcs = tracks2isrcs
120
musicbrainzngs.submit_isrcs = _submit_isrcs
123
# mock discid reading
124
# - - - - - - - - - -
126
class MockedTrack(object):
    """Lightweight copy of a track that keeps only its number and ISRC."""

    def __init__(self, track):
        """Copy ``number`` and ``isrc`` from *track*.

        *track* is any object exposing these two attributes.
        """
        # same read/assign order as a plain attribute-by-attribute copy
        for attribute in ("number", "isrc"):
            setattr(self, attribute, getattr(track, attribute))
131
class MockedDisc(object):
132
def __init__(self, disc):
134
self.submission_url = disc.submission_url
137
for track in disc.tracks:
138
tracks.append(MockedTrack(track))
141
# save discid functions
142
_discid_read = discid.read
144
# Stand-in for discid.read (the real function is kept as _discid_read),
# backed by pickle files named "<TEST_DATA><disc id>.pickle".
# Two paths are visible below:
#   * save: read the real disc, wrap it in MockedDisc and pickle it
#     (protocol 2) under the id reported by the disc
#   * replay: unpickle and return the file for the module-level
#     `mocked_disc_id`
# NOTE(review): the statements that choose between the two paths are not
# visible in this excerpt -- confirm against the complete file.
# NOTE(review): `features` is not used by any visible line (the save path
# always requests a fixed feature list), and `features=[]` is a mutable
# default argument.
# NOTE(review): pickle.load must only ever see trusted, locally generated
# test data files.
def _read(device=None, features=[]):
146
# always read all features to save full libdiscid information
147
disc = _discid_read(device, ["read", "mcn", "isrc"])
148
file_name = "%s%s.pickle" % (TEST_DATA, disc.id)
149
with open(file_name, "wb") as disc_file:
150
pickle.dump(MockedDisc(disc), disc_file, 2)
153
file_name = "%s%s.pickle" % (TEST_DATA, mocked_disc_id)
154
with open(file_name, "rb") as disc_file:
155
return pickle.load(disc_file)
160
# mock cdrdao reading
161
# - - - - - - - - - -
164
def __new__(cls, args, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr):
165
if args[0] == "cdrdao":
166
file_name = "%s%s_cdrdao.toc" % (TEST_DATA, mocked_disc_id)
168
# save file to a different place
170
# delete file so cdrdao doesn't complain it's already there
173
# don't actually call cdrdao
174
args = ["echo", "mocked cdrdao"]
175
return Popen(args, stdin=stdin, stdout=stdout, stderr=stderr)
177
isrcsubmit.Popen = _Popen
179
def _open(name, mode):
180
if re.search("cdrdao-.*\.toc", name):
181
name = "%s%s_cdrdao.toc" % (TEST_DATA, mocked_disc_id)
182
return open(name, mode)
184
isrcsubmit.open = _open
189
_isrcsubmit_has_program = isrcsubmit.has_program
190
_isrcsubmit_get_prog_version = isrcsubmit.get_prog_version
192
def _has_program(program, strict=False):
193
if program == "libdiscid":
195
# libdiscid >= 0.2.2 still needed to load discid
197
elif program == "cdrdao":
201
return _isrcsubmit_has_program(program, strict)
203
# Stand-in for isrcsubmit.get_prog_version (the real function is kept as
# _isrcsubmit_get_prog_version).
# For "libdiscid" and "cdrdao" a fixed "mocked ..." version string is
# chosen and returned through isrcsubmit.decode(); the visible code also
# delegates to the real function.
# NOTE(review): the line introducing the delegating branch is not visible
# in this excerpt (presumably an `else:`) -- confirm against the complete
# file.
def _get_prog_version(prog):
204
if prog == "libdiscid":
205
version = "mocked libdiscid"
206
elif prog == "cdrdao":
207
version = "mocked cdrdao"
209
return _isrcsubmit_get_prog_version(prog)
210
return isrcsubmit.decode(version)
212
isrcsubmit.has_program = _has_program
213
isrcsubmit.get_prog_version = _get_prog_version
216
# mock answers given by user
217
# - - - - - - - - - - - - -
219
def append_to_stdin(msg):
220
position = sys.stdin.tell()
222
sys.stdin.seek(position)
224
def answer_on_stdin(answer, default=True):
225
if answer == default:
226
append_to_stdin("\n")
228
append_to_stdin("y\n")
230
append_to_stdin("n\n")
232
def handle_question(string):
237
# these questions can be handled right now
238
if "username" in string:
239
append_to_stdin("invalid_username\n") # doesn't matter, mocked
240
elif "password" in string:
241
append_to_stdin("invalid_password\n") # doesn't matter, mocked
242
elif "Which one do you want?" in string:
244
append_to_stdin("%d\n" % answers["choice"])
246
append_to_stdin("1\n")
247
elif "press <return>" in string:
248
append_to_stdin("\n")
250
# these questions use multiple writes
251
if "submit" in string and "disc" in string:
252
last_question = "submit_disc"
253
elif "help clean" in string:
254
last_question = "clean"
255
elif "ISRC in browser" in string:
256
last_question = "open_isrc"
257
# question and prompt can be on different writes
258
if "[y/N]" in string: question = True; default = False
259
if "[Y/n]" in string: question = True; default = True
263
answer = answers[last_question]
266
answer_on_stdin(answer, default)
269
class SmartStdin(TextIOWrapper):
    """Text stream used in place of ``sys.stdin`` that also accepts bytes."""

    def write(self, string):
        """Write *string* to the stream, decoding ``bytes`` first.

        Returns the result of ``TextIOWrapper.write``.
        """
        if isinstance(string, bytes):
            string = string.decode()
        # Name the class explicitly: super(type(self), self) resolves to
        # this very method again when called on a subclass instance and
        # recurses until the stack overflows.
        return super(SmartStdin, self).write(string)
275
class SmartStdout(TextIOWrapper):
    """Text stream used in place of ``sys.stdout`` that reacts to questions.

    Every write is first passed to ``handle_question`` and then stored in
    the underlying stream.
    """

    def write(self, string):
        """Pass *string* to ``handle_question``, then write it as text.

        ``bytes`` are decoded before writing.  Returns the result of
        ``TextIOWrapper.write``.
        """
        handle_question(string)
        # using "except TypeError" and the buffer would be nice
        # but exceptions don't seem to work here in Python 2.6
        # additionally TextIOWrapper doesn't have "buffer" in 2.6
        if isinstance(string, bytes):
            string = string.decode()
        # Name the class explicitly: super(type(self), self) recurses
        # forever as soon as this class is subclassed.
        return super(SmartStdout, self).write(string)
286
# the actual tests of the overall script
287
# - - - - - - - - - - - - - - - - - - -
289
class TestScript(unittest.TestCase):
291
global answers, data_sent, mocked_disc_id
294
# make sure globals are unset
295
answers = data_sent = {}
296
mocked_disc_id = last_question = None
299
self._old_stdout = sys.stdout
300
self._stdout = SmartStdout(BytesIO(), sys.stdout.encoding)
301
sys.stdout = self._stdout
302
self._old_stdin = sys.stdin
303
self._stdin = SmartStdin(BytesIO(), sys.stdin.encoding)
304
sys.stdin = self._stdin
308
return sys.stdout.read()
310
def assert_output(self, string):
    """Assert that *string* occurs in the stripped result of ``_output()``.

    On failure the message names the missing text and shows the output
    that was searched, instead of a bare "False is not true".
    """
    output = self._output().strip()
    # assertTrue with a message keeps this usable where assertIn is not
    # available (the file still cares about Python 2.6)
    self.assertTrue(string in output,
                    "%r not found in output:\n%s" % (string, output))
314
return sys.stderr.write(self._output())
316
def test_version(self):
318
isrcsubmit.main([SCRIPT_NAME, "--version"])
322
self.assertTrue(isrcsubmit.__version__ in self._output().strip())
326
isrcsubmit.main([SCRIPT_NAME, "-h"])
330
self.assertTrue(self._output().strip())
332
def test_libdiscid(self):
333
global mocked_disc_id
334
mocked_disc_id = "TqvKjMu7dMliSfmVEBtrL7sBSno-"
335
# we use defaults to questions -> no settings here
337
isrcsubmit.main([SCRIPT_NAME, "--backend", "libdiscid"])
341
self.assertTrue(isrcsubmit.__version__ in self._output().strip())
342
self.assert_output("mocked libdiscid")
343
self.assert_output("TqvKjMu7dMliSfmVEBtrL7sBSno-")
344
self.assert_output("07090529-0fbf-4bd3-adc4-fe627343976d")
345
self.assert_output("submit the disc?")
346
self.assert_output("DEC680000220 is already attached to track 4")
347
self.assert_output("No new ISRCs")
349
def test_cdrdao(self):
350
global mocked_disc_id
351
mocked_disc_id = "hSI7B4G4AkB5.DEBcW.3KCn.D_E-"
352
answers["choice"] = 1
354
isrcsubmit.main([SCRIPT_NAME, "--backend", "cdrdao", "--device", "/dev/cdrw"])
358
self.assertTrue(isrcsubmit.__version__ in self._output().strip())
359
self.assert_output("mocked cdrdao")
360
self.assert_output("hSI7B4G4AkB5.DEBcW.3KCn.D_E-")
361
self.assert_output("none of these")
362
self.assert_output("174a5513-73d1-3c9d-a316-3c1c179e35f8")
363
self.assert_output("GBBBN7902023 is already attached to track 7")
364
self.assert_output("No new ISRCs")
368
sys.stdout = self._old_stdout
370
sys.stdin = self._old_stdin
374
class TestDisc(unittest.TestCase):
375
"""Test reading the disc currently in the drive
380
if __name__ == "__main__":
384
# vim:set shiftwidth=4 smarttab expandtab: