~ubuntu-branches/ubuntu/vivid/isrcsubmit/vivid

« back to all changes in this revision

Viewing changes to .pc/extra-import.patch/test_isrcsubmit.py

  • Committer: Package Import Robot
  • Author(s): Sebastian Ramacher
  • Date: 2014-05-05 17:25:39 UTC
  • mfrom: (1.1.3)
  • Revision ID: package-import@ubuntu.com-20140505172539-hti0oqzyxbb782zk
Tags: 2.0.0-1
* New upstream release.
* debian/patches/extra-import.patch: Removed, applied upstream.
* debian/copyright: Update copyright years.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/env python
# Copyright (C) 2013  Johannes Dewender
# This test is free. You can redistribute and/or modify it at will.

import os
import re
import sys
import math
import json
import pickle
import unittest
from io import TextIOWrapper, BytesIO
from subprocess import Popen

import musicbrainzngs
import isrcsubmit

# discid is deliberately imported only inside the try block: an additional
# unconditional "import discid" would raise ImportError before the
# libdiscid.compat fallback below gets a chance to run.
try:
    import discid
    from discid import DiscError
except ImportError:
    from libdiscid.compat import discid
    from libdiscid.compat.discid import DiscError

# first element of every argument list handed to isrcsubmit
SCRIPT_NAME = "isrcsubmit.py"
# prefix of all fixture files read or written by the mocks in this module
TEST_DATA = "test_data/"
# True: the mocks call the real functions and record their results under
# TEST_DATA; False: the mocks replay the recorded fixtures
SAVE_RUN = False
30
 
 
31
 
 
32
 
class TestInternal(unittest.TestCase):
    """Tests for isrcsubmit helper functions, run with stdout silenced."""

    def setUp(self):
        # suppress output: keep a duplicate of descriptor 1 for tearDown,
        # then point descriptor 1 at the null device.  Flushing first keeps
        # already buffered text from being written to the wrong target.
        sys.stdout.flush()
        self._old_stdout = os.dup(1)
        with open(os.devnull, 'w') as devnull:
            os.dup2(devnull.fileno(), 1)

    def test_encoding(self):
        self.assertTrue(type(isrcsubmit.encode("test")) is type(b"test"))
        self.assertEqual(isrcsubmit.encode("test"), b"test")
        self.assertTrue(type(isrcsubmit.decode(b"test"))
                        is type(b"test".decode()))
        self.assertEqual(isrcsubmit.decode(b"test"), "test")

        # encode and decode have to be inverse to each other
        string = "test"
        self.assertEqual(isrcsubmit.decode(isrcsubmit.encode(string)),
                         string)
        bytestring = b"test"
        self.assertEqual(isrcsubmit.encode(isrcsubmit.decode(bytestring)),
                         bytestring)

    def test_gather_options(self):
        # make sure most important options always work
        options = isrcsubmit.gather_options([SCRIPT_NAME])
        self.assertFalse(options.debug)
        self.assertTrue(options.backend)
        self.assertEqual(options.server, "musicbrainz.org")
        self.assertFalse(options.force_submit)
        self.assertTrue(options.release_id is None)

        # user and device given as positional arguments
        user = "JonnyJD"
        device = "/some/other/device"
        options = isrcsubmit.gather_options([SCRIPT_NAME, user, device])
        self.assertEqual(options.user, user)
        self.assertEqual(options.device, device)
        # short options
        options = isrcsubmit.gather_options([SCRIPT_NAME,
                                             "-d", device, "-u", user])
        self.assertEqual(options.user, user)
        self.assertEqual(options.device, device)
        # long options
        options = isrcsubmit.gather_options([SCRIPT_NAME, "--user", user,
                                             "--device", device])
        self.assertEqual(options.user, user)
        self.assertEqual(options.device, device)

    def tearDown(self):
        # restore output and release the duplicate made in setUp;
        # without the close every test would leak one file descriptor
        sys.stdout.flush()
        os.dup2(self._old_stdout, 1)
        os.close(self._old_stdout)
79
 
 
80
 
 
81
 
# mock musicbrainzngs queries
# - - - - - - - - - - - - - -

# save mbngs functions
_mbngs_get_releases_by_discid = musicbrainzngs.get_releases_by_discid
_mbngs_get_release_by_id = musicbrainzngs.get_release_by_id
_mbngs_submit_isrcs = musicbrainzngs.submit_isrcs

def _get_releases_by_discid(disc_id, includes=[]):
    """Mocked musicbrainzngs.get_releases_by_discid backed by JSON fixtures.

    Without SAVE_RUN the answer is loaded from
    "<TEST_DATA><disc_id>_releases.json" and no query is made.
    With SAVE_RUN the saved real function is called and its result is
    written to that file before it is returned.
    """
    fixture = "%s%s_releases.json" % (TEST_DATA, disc_id)
    if not SAVE_RUN:
        # replay the recorded answer
        with open(fixture, "r") as handle:
            return json.load(handle)
    # record the real answer for later runs
    fetched = _mbngs_get_releases_by_discid(disc_id, includes)
    with open(fixture, "w") as handle:
        json.dump(fetched, handle, indent=2)
    return fetched

musicbrainzngs.get_releases_by_discid = _get_releases_by_discid
101
 
 
102
 
def _get_release_by_id(release_id, includes=[]):
    """Mocked musicbrainzngs.get_release_by_id backed by JSON fixtures.

    With SAVE_RUN set, the real function (saved as
    _mbngs_get_release_by_id) is called and its result is written to
    "<TEST_DATA><release_id>.json" before it is returned.
    Otherwise the release is loaded from that file and no query is made.
    """
    file_name = "%s%s.json" % (TEST_DATA, release_id)
    if SAVE_RUN:
        release = _mbngs_get_release_by_id(release_id, includes)
        with open(file_name, "w") as release_file:
            # write to the handle opened right above
            json.dump(release, release_file, indent=2)
        # hand back exactly what was fetched and saved
        return release
    else:
        with open(file_name, "r") as release_file:
            return json.load(release_file)

musicbrainzngs.get_release_by_id = _get_release_by_id
114
 
 
115
 
def _submit_isrcs(tracks2isrcs):
    """Mocked musicbrainzngs.submit_isrcs: record the data, send nothing.

    The submitted mapping is stored under the key "tracks2isrcs" of the
    module-level dict data_sent, where a test can inspect it afterwards.
    data_sent is a dict, so the value is stored as an item; setting an
    attribute on a dict raises AttributeError.
    Always returns True.
    """
    # data_sent is only modified, never rebound -> no "global" needed
    data_sent["tracks2isrcs"] = tracks2isrcs
    return True

musicbrainzngs.submit_isrcs = _submit_isrcs
121
 
 
122
 
 
123
 
# mock discid reading
124
 
# - - - - - - - - - -
125
 
 
126
 
class MockedTrack(object):
    """Plain copy of the "number" and "isrc" attributes of a track."""

    def __init__(self, track):
        # copy only these two attributes, in this order
        for field in ("number", "isrc"):
            setattr(self, field, getattr(track, field))
130
 
 
131
 
class MockedDisc(object):
    """Plain copy of a disc: id, submission_url, mcn and its tracks.

    Every element of disc.tracks is wrapped in a MockedTrack.
    """

    def __init__(self, disc):
        for field in ("id", "submission_url", "mcn"):
            setattr(self, field, getattr(disc, field))
        self.tracks = [MockedTrack(entry) for entry in disc.tracks]
140
 
 
141
 
# save discid functions
_discid_read = discid.read

def _read(device=None, features=[]):
    """Mocked discid.read backed by pickled MockedDisc fixtures.

    Without SAVE_RUN the disc is unpickled from
    "<TEST_DATA><mocked_disc_id>.pickle"; device and features are ignored.
    With SAVE_RUN the real disc is read from the device, a MockedDisc copy
    is pickled to "<TEST_DATA><disc id>.pickle" and the real disc object
    is returned.  The features argument is not used in either case.
    """
    if not SAVE_RUN:
        fixture = "%s%s.pickle" % (TEST_DATA, mocked_disc_id)
        with open(fixture, "rb") as handle:
            return pickle.load(handle)

    # always read all features to save full libdiscid information
    real_disc = _discid_read(device, ["read", "mcn", "isrc"])
    fixture = "%s%s.pickle" % (TEST_DATA, real_disc.id)
    with open(fixture, "wb") as handle:
        pickle.dump(MockedDisc(real_disc), handle, 2)
    return real_disc

discid.read = _read
158
 
 
159
 
 
160
 
# mock cdrdao reading
161
 
# - - - - - - - - - -
162
 
 
163
 
class _Popen(Popen):
    """Replacement for the Popen name in isrcsubmit that intercepts cdrdao.

    Any command whose first argument is not "cdrdao" is started unchanged.
    For cdrdao:
    - with SAVE_RUN, the last argument is replaced by the fixture path
      "<TEST_DATA><mocked_disc_id>_cdrdao.toc", so the real cdrdao writes
      its output there
    - otherwise, cdrdao is not run at all; "echo mocked cdrdao" is started
      in its place

    __new__ returns a plain Popen object, not an instance of this class.
    """

    def __new__(cls, args, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr):
        if args[0] == "cdrdao":
            file_name = "%s%s_cdrdao.toc" % (TEST_DATA, mocked_disc_id)
            if SAVE_RUN:
                # save file to a different place
                args[-1] = file_name
                # delete file so cdrdao doesn't complain it's already there;
                # on a first recording there is nothing to delete yet
                if os.path.exists(file_name):
                    os.remove(file_name)
            else:
                # don't actually call cdrdao
                args = ["echo", "mocked cdrdao"]
        return Popen(args, stdin=stdin, stdout=stdout, stderr=stderr)

isrcsubmit.Popen = _Popen
178
 
 
179
 
def _open(name, mode="r"):
    """Replacement for open() inside isrcsubmit that redirects cdrdao files.

    A name matching "cdrdao-.*\\.toc" is replaced by the fixture path
    "<TEST_DATA><mocked_disc_id>_cdrdao.toc"; every other name is opened
    as given.  mode defaults to "r" like the builtin open, so calls
    without an explicit mode keep working.
    """
    # raw string: "\." is a regex escape, not a string escape
    if re.search(r"cdrdao-.*\.toc", name):
        name = "%s%s_cdrdao.toc" % (TEST_DATA, mocked_disc_id)
    return open(name, mode)

isrcsubmit.open = _open
185
 
 
186
 
# general mocking
187
 
# - - - - - - - -
188
 
 
189
 
_isrcsubmit_has_program = isrcsubmit.has_program
_isrcsubmit_get_prog_version = isrcsubmit.get_prog_version

def _has_program(program, strict=False):
    """Mocked isrcsubmit.has_program.

    "libdiscid" and "cdrdao" are always reported as available, since both
    are mocked in this module (libdiscid >= 0.2.2 is still needed to load
    discid).  Every other program is checked by the saved real function.
    """
    if program in ("libdiscid", "cdrdao"):
        return True
    return _isrcsubmit_has_program(program, strict)
202
 
 
203
 
def _get_prog_version(prog):
    """Mocked isrcsubmit.get_prog_version.

    Returns a fixed marker string, passed through isrcsubmit.decode, for
    the mocked programs "libdiscid" and "cdrdao"; every other program is
    handled by the saved real function.
    """
    if prog == "libdiscid":
        return isrcsubmit.decode("mocked libdiscid")
    if prog == "cdrdao":
        return isrcsubmit.decode("mocked cdrdao")
    return _isrcsubmit_get_prog_version(prog)

isrcsubmit.has_program = _has_program
isrcsubmit.get_prog_version = _get_prog_version
214
 
 
215
 
 
216
 
# mock answers given by user
217
 
# - - - - - - - - - - - - -
218
 
 
219
 
def append_to_stdin(msg):
    """Write msg to sys.stdin and move back to where the write started.

    The stream position is restored afterwards, so the next read from
    sys.stdin starts at the text that was just written.
    """
    stream = sys.stdin
    resume_at = stream.tell()
    stream.write(msg)
    stream.seek(resume_at)
223
 
 
224
 
def answer_on_stdin(answer, default=True):
    """Put the reply to a yes/no question on stdin.

    An answer equal to the default is given as an empty line, any other
    true answer as "y" and any other false answer as "n".
    """
    if answer == default:
        reply = "\n"
    elif answer:
        reply = "y\n"
    else:
        reply = "n\n"
    append_to_stdin(reply)
231
 
 
232
 
def handle_question(string):
    """Inspect one chunk of script output and answer prompts found in it.

    Replies are placed on stdin through append_to_stdin/answer_on_stdin.
    The module-level dict "answers" supplies the replies: key "choice"
    for the release selection (1 when missing) and the keys
    "submit_disc", "clean" and "open_isrc" for the yes/no questions
    (the prompt's own default when missing).
    """
    global last_question

    # these questions can be handled right now
    if "username" in string:
        reply = "invalid_username\n"    # doesn't matter, mocked
    elif "password" in string:
        reply = "invalid_password\n"    # doesn't matter, mocked
    elif "Which one do you want?" in string:
        reply = "%d\n" % answers.get("choice", 1)
    elif "press <return>" in string:
        reply = "\n"
    else:
        reply = None
    if reply is not None:
        append_to_stdin(reply)

    # these questions use multiple writes: remember the topic, because
    # question and prompt can be on different writes
    topics = (
        ("submit_disc", ("submit", "disc")),
        ("clean", ("help clean",)),
        ("open_isrc", ("ISRC in browser",)),
    )
    for topic, markers in topics:
        if all(marker in string for marker in markers):
            last_question = topic
            break

    # a yes/no prompt carries its default as the capital letter;
    # "[Y/n]" takes precedence when both markers are present
    if "[Y/n]" in string:
        default = True
    elif "[y/N]" in string:
        default = False
    else:
        return

    answer_on_stdin(answers.get(last_question, default), default)
267
 
 
268
 
 
269
 
class SmartStdin(TextIOWrapper):
    """Text stream standing in for stdin that also accepts bytes."""

    def write(self, string):
        """Write string, decoding it first when it is a bytes object.

        Returns whatever TextIOWrapper.write returns.
        """
        if isinstance(string, bytes):
            string = string.decode()
        # name the class explicitly: super(type(self), self) would recurse
        # endlessly as soon as this class is subclassed
        return super(SmartStdin, self).write(string)
274
 
 
275
 
class SmartStdout(TextIOWrapper):
    """Text stream standing in for stdout that answers prompts it sees.

    Everything written is first handed to handle_question, which may put
    a reply on stdin, and is then stored in the stream.
    """

    def write(self, string):
        """Record string (bytes are decoded) after scanning it for prompts.

        Returns whatever TextIOWrapper.write returns.
        """
        # using "except TypeError" and the buffer would be nice
        # but exceptions don't seem to work here in Python 2.6
        # additionally TextIOWrapper doesn't have "buffer" in 2.6
        if isinstance(string, bytes):
            string = string.decode()
        # decode before scanning: handle_question searches for text
        # literals, which raises TypeError on a bytes object in Python 3
        handle_question(string)
        # name the class explicitly: super(type(self), self) would recurse
        # endlessly as soon as this class is subclassed
        return super(SmartStdout, self).write(string)
284
 
 
285
 
 
286
 
# the actual tests of the overall script
287
 
# - - - - - - - - - - - - - - - - - - -
288
 
 
289
 
class TestScript(unittest.TestCase):
    """Run isrcsubmit.main() as a whole against the mocks of this module.

    For every test sys.stdout and sys.stdin are replaced by in-memory
    streams (SmartStdout / SmartStdin), so the output of the script can
    be inspected and its prompts can be answered.
    """

    def setUp(self):
        global answers, data_sent, mocked_disc_id
        global last_question

        # make sure globals are unset; answers and data_sent are two
        # separate dicts, so entries of one never show up in the other
        answers = {}
        data_sent = {}
        mocked_disc_id = last_question = None

        # gather output
        self._old_stdout = sys.stdout
        self._stdout = SmartStdout(BytesIO(), sys.stdout.encoding)
        sys.stdout = self._stdout
        self._old_stdin = sys.stdin
        self._stdin = SmartStdin(BytesIO(), sys.stdin.encoding)
        sys.stdin = self._stdin

    def _output(self):
        """Return everything written to the replaced stdout so far."""
        sys.stdout.seek(0)
        return sys.stdout.read()

    def assert_output(self, string):
        """Fail unless string occurs in the gathered output."""
        self.assertTrue(string in self._output().strip())

    def _debug(self):
        """Copy the gathered output to stderr."""
        return sys.stderr.write(self._output())

    def _run_main(self, arguments):
        """Run isrcsubmit.main with SCRIPT_NAME plus the given arguments.

        A SystemExit raised by the script is swallowed.  Any other
        exception propagates, so the real error is reported and not an
        assertion failure on incomplete output.
        """
        try:
            isrcsubmit.main([SCRIPT_NAME] + arguments)
        except SystemExit:
            pass

    def test_version(self):
        self._run_main(["--version"])
        self.assert_output(isrcsubmit.__version__)

    def test_help(self):
        self._run_main(["-h"])
        self.assertTrue(self._output().strip())

    def test_libdiscid(self):
        global mocked_disc_id
        mocked_disc_id = "TqvKjMu7dMliSfmVEBtrL7sBSno-"
        # we use defaults to questions -> no settings here
        self._run_main(["--backend", "libdiscid"])
        self.assert_output(isrcsubmit.__version__)
        self.assert_output("mocked libdiscid")
        self.assert_output("TqvKjMu7dMliSfmVEBtrL7sBSno-")
        self.assert_output("07090529-0fbf-4bd3-adc4-fe627343976d")
        self.assert_output("submit the disc?")
        self.assert_output("DEC680000220 is already attached to track 4")
        self.assert_output("No new ISRCs")

    def test_cdrdao(self):
        global mocked_disc_id
        mocked_disc_id = "hSI7B4G4AkB5.DEBcW.3KCn.D_E-"
        answers["choice"] = 1
        self._run_main(["--backend", "cdrdao", "--device", "/dev/cdrw"])
        self.assert_output(isrcsubmit.__version__)
        self.assert_output("mocked cdrdao")
        self.assert_output("hSI7B4G4AkB5.DEBcW.3KCn.D_E-")
        self.assert_output("none of these")
        self.assert_output("174a5513-73d1-3c9d-a316-3c1c179e35f8")
        self.assert_output("GBBBN7902023 is already attached to track 7")
        self.assert_output("No new ISRCs")

    def tearDown(self):
        # restore output and input, closing both replacement streams
        sys.stdout = self._old_stdout
        self._stdout.close()
        sys.stdin = self._old_stdin
        self._stdin.close()
371
 
 
372
 
 
373
 
 
374
 
class TestDisc(unittest.TestCase):
    """Test reading the disc currently in the drive.

    Placeholder: no test methods are defined yet.
    """
378
 
 
379
 
 
380
 
# run all tests of this module when the file is executed as a script
if __name__ == "__main__":
    unittest.main()
382
 
 
383
 
 
384
 
# vim:set shiftwidth=4 smarttab expandtab: