~kissiel/checkbox/fix-1420352

« back to all changes in this revision

Viewing changes to checkbox-support/checkbox_support/scripts/audio_settings.py

  • Committer: Sylvain Pineau
  • Date: 2014-01-07 13:39:38 UTC
  • mto: This revision was merged to the branch mainline in revision 2588.
  • Revision ID: sylvain.pineau@canonical.com-20140107133938-46v5ehofwa9whl1e
checkbox-support: Copy required modules from checkbox-old/checkbox

and their corresponding tests

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# This file is part of Checkbox.
 
3
#
 
4
# Copyright 2013 Canonical Ltd.
 
5
#
 
6
# Checkbox is free software: you can redistribute it and/or modify
 
7
# it under the terms of the GNU General Public License version 3,
 
8
# as published by the Free Software Foundation.
 
9
 
 
10
#
 
11
# Checkbox is distributed in the hope that it will be useful,
 
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
# GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License
 
17
# along with Checkbox.  If not, see <http://www.gnu.org/licenses/>.
 
18
#
 
19
 
 
20
import configparser
 
21
import logging
 
22
import os
 
23
import re
 
24
import sys
 
25
 
 
26
from argparse import ArgumentParser
 
27
from subprocess import check_output, check_call, CalledProcessError
 
28
 
 
29
from checkbox.parsers.pactl import parse_pactl_output
 
30
 
 
31
TYPES = ("source", "sink")
 
32
DIRECTIONS = {"source": "input", "sink": "output"}
 
33
 
 
34
default_pattern = "(?<=Default %s: ).*"
 
35
index_regex = re.compile("(?<=index: )[0-9]*")
 
36
muted_regex = re.compile("(?<=Mute: ).*")
 
37
volume_regex = re.compile("(?<=Volume: 0:)\s*[0-9]*")
 
38
name_regex = re.compile("(?<=Name:).*")
 
39
 
 
40
entry_pattern = "Name: %s.*?(?=Properties)"
 
41
 
 
42
 
 
43
def unlocalized_env(reset={"LANG": "POSIX.UTF-8"}):
 
44
    """
 
45
    Create un-localized environment.
 
46
 
 
47
    Produce an environment that is suitable for subprocess.Popen() and
 
48
    associated subprocess functions. The returned environment is equal to a
 
49
    copy the current environment, updated with the value of the reset argument.
 
50
    """
 
51
    env = dict(os.environ)
 
52
    env.update(reset)
 
53
    return env
 
54
 
 
55
 
 
56
def _guess_hdmi_profile(pactl_list):
 
57
    """
 
58
    Use the pactl parser to get the stereo profile of the available HDMI port
 
59
 
 
60
    :returns: (card, profile)
 
61
    """
 
62
    hdmi_ports = {}
 
63
    available_port = {}
 
64
    port_status_location = 'Sink'
 
65
 
 
66
    # First parse all cards for HDMI / DisplayPort ports
 
67
    for record in parse_pactl_output(pactl_list).record_list:
 
68
        if not 'Card' in record.name:
 
69
            continue
 
70
        card = re.sub('.*#', '', record.name)  # save the card id
 
71
        ports = [
 
72
            p for p in record.attribute_map['Ports'].value
 
73
            if 'HDMI / DisplayPort' in p.label]
 
74
        if not ports:
 
75
            continue
 
76
        if [p for p in ports if p.availability]:
 
77
            port_status_location = 'Card'
 
78
        hdmi_ports[card] = ports
 
79
 
 
80
    if not hdmi_ports:
 
81
        return (None, None)
 
82
 
 
83
    logging.info("[ HDMI / DisplayPort ports ]".center(80, '='))
 
84
    for card, ports in hdmi_ports.items():
 
85
        for card_port in ports:
 
86
            logging.info("Card #{} Port: {}".format(card, card_port))
 
87
 
 
88
    # Check the ports availability in the list of pulseaudio sinks
 
89
    # if the status is not already available in the cards section.
 
90
    def check_available_port():
 
91
        match_found = False
 
92
        for record in parse_pactl_output(pactl_list).record_list:
 
93
            if not port_status_location in record.name:
 
94
                continue
 
95
            for sink_port in record.attribute_map['Ports'].value:
 
96
                for card, ports in hdmi_ports.items():
 
97
                    for card_port in ports:
 
98
                        if sink_port.label == card_port.label:
 
99
                            match_found = True
 
100
                            if sink_port.availability != 'not available':
 
101
                                return {card: card_port}
 
102
        # If the availability cannot be determined then we keep the first
 
103
        # candidate
 
104
        if not match_found and hdmi_ports:
 
105
            card, ports = hdmi_ports.popitem()
 
106
            return {card: ports.pop()}
 
107
 
 
108
    available_port = check_available_port()
 
109
 
 
110
    if available_port:
 
111
        card, port = available_port.popitem()
 
112
        # Keep the shortest string in the profile_list including 'stereo'
 
113
        # it will avoid testing 'surround' profiles
 
114
        profile = min([p for p in port.profile_list if 'stereo' in p], key=len)
 
115
        logging.info("[ Selected profile ]".center(80, '='))
 
116
        logging.info("Card #{} Profile: {}".format(card, profile))
 
117
        return (card, profile)
 
118
    else:
 
119
        return (None, None)
 
120
 
 
121
 
 
122
def set_profile_hdmi():
 
123
    """Sets desired device as active profile. This is typically
 
124
    used as a fallback for setting HDMI / DisplayPort as the output device.
 
125
    """
 
126
    pactl_list = check_output(
 
127
        ['pactl', 'list'], universal_newlines=True, env=unlocalized_env())
 
128
 
 
129
    card, profile = _guess_hdmi_profile(pactl_list)
 
130
    if not profile:
 
131
        logging.error('No available port found')
 
132
        return 1
 
133
 
 
134
    # Try and set device as default audio output
 
135
    try:
 
136
        check_call(["pactl", "set-card-profile", card, profile])
 
137
    except CalledProcessError as error:
 
138
        logging.error("Failed setting audio output to:%s: %s" %
 
139
                      (profile, error))
 
140
 
 
141
 
 
142
def get_current_profiles_settings():
 
143
    """Captures and Writes current audio profiles settings"""
 
144
    pactl_list = check_output(
 
145
        ['pactl', 'list'], universal_newlines=True, env=unlocalized_env())
 
146
 
 
147
    config = configparser.ConfigParser()
 
148
 
 
149
    for match in re.finditer(
 
150
        "(?P<card_id>Card #\d+)\n\tName:\s+(?P<card_name>.*?)\n.*?"
 
151
        "Active\sProfile:\s+(?P<profile>.*?)\n", pactl_list, re.M | re.S
 
152
    ):
 
153
        config[match.group('card_id')] = {
 
154
            'name': match.group('card_name'),
 
155
            'profile': match.group('profile')
 
156
        }
 
157
 
 
158
    try:
 
159
        with open('active_profiles', 'w') as active_profiles:
 
160
            config.write(active_profiles)
 
161
    except IOError:
 
162
        logging.error("Failed to save active profiles information: %s" %
 
163
                      sys.exc_info()[1])
 
164
 
 
165
 
 
166
def restore_profiles_settings():
 
167
    config = configparser.ConfigParser()
 
168
    try:
 
169
        config.read('active_profiles')
 
170
    except IOError:
 
171
        logging.error("Failed to retrieve previous profiles information")
 
172
 
 
173
    for card in config.sections():
 
174
        try:
 
175
            check_call(["pactl", "set-card-profile", config[card]['name'],
 
176
                       config[card]['profile']])
 
177
        except CalledProcessError as error:
 
178
            logging.error("Failed setting card <%s> profile to <%s>: %s" %
 
179
                          (config[card]['name'],
 
180
                           config[card]['profile'], error))
 
181
 
 
182
 
 
183
def move_sinks(name):
 
184
    sink_inputs = check_output(["pacmd", "list-sink-inputs"],
 
185
                               universal_newlines=True,
 
186
                               env=unlocalized_env())
 
187
    input_indexes = index_regex.findall(sink_inputs)
 
188
 
 
189
    for input_index in input_indexes:
 
190
        try:
 
191
            with open(os.devnull, 'wb') as DEVNULL:
 
192
                check_call(["pacmd", "move-sink-input", input_index, name],
 
193
                           stdout=DEVNULL)
 
194
        except CalledProcessError:
 
195
            logging.error("Failed to move input %d to sink %d" %
 
196
                          (input_index, name))
 
197
            sys.exit(1)
 
198
 
 
199
 
 
200
def store_audio_settings(file):
 
201
    logging.info("[ Saving audio settings ]".center(80, '='))
 
202
    try:
 
203
        settings_file = open(file, 'w')
 
204
    except IOError:
 
205
        logging.error("Failed to save settings: %s" % sys.exc_info()[1])
 
206
        sys.exit(1)
 
207
 
 
208
    for type in TYPES:
 
209
        pactl_status = check_output(["pactl", "stat"],
 
210
                                    universal_newlines=True,
 
211
                                    env=unlocalized_env())
 
212
        default_regex = re.compile(default_pattern % type.title())
 
213
        default = default_regex.search(pactl_status).group()
 
214
 
 
215
        print("default_%s: %s" % (type, default), file=settings_file)
 
216
 
 
217
        pactl_list = check_output(["pactl", "list", type + 's'],
 
218
                                  universal_newlines=True,
 
219
                                  env=unlocalized_env())
 
220
 
 
221
        entry_regex = re.compile(entry_pattern % default, re.DOTALL)
 
222
        entry = entry_regex.search(pactl_list).group()
 
223
 
 
224
        muted = muted_regex.search(entry)
 
225
        print("%s_muted: %s" % (type, muted.group().strip()),
 
226
              file=settings_file)
 
227
 
 
228
        volume = int(volume_regex.search(entry).group().strip())
 
229
 
 
230
        print("%s_volume: %s%%" % (type, str(volume)),
 
231
              file=settings_file)
 
232
    settings_file.close()
 
233
 
 
234
 
 
235
def set_audio_settings(device, mute, volume):
 
236
    for type in TYPES:
 
237
        pactl_entries = check_output(["pactl", "list", type + 's'],
 
238
                                     universal_newlines=True,
 
239
                                     env=unlocalized_env())
 
240
 
 
241
        # Find the name of the sink/source we want to set
 
242
        names = name_regex.findall(pactl_entries)
 
243
 
 
244
        for name in names:
 
245
            name = name.strip()
 
246
            if device in name and DIRECTIONS[type] in name:
 
247
                try:
 
248
                    logging.info("[ Fallback sink ]".center(80, '='))
 
249
                    logging.info("Name: {}".format(name))
 
250
                    with open(os.devnull, 'wb') as DEVNULL:
 
251
                        check_call(["pacmd", "set-default-%s" % type, name],
 
252
                                   stdout=DEVNULL)
 
253
                except CalledProcessError:
 
254
                    logging.error("Failed to set default %s" % type)
 
255
                    sys.exit(1)
 
256
 
 
257
                if type == "sink":
 
258
                    move_sinks(name)
 
259
 
 
260
                try:
 
261
                    check_call(["pactl",
 
262
                                "set-%s-mute" % type, name, str(int(mute))])
 
263
                except:
 
264
                    logging.error("Failed to set mute for %s" % name)
 
265
                    sys.exit(1)
 
266
 
 
267
                try:
 
268
                    check_call(["pactl", "set-%s-volume" % type,
 
269
                               name, str(volume) + '%'])
 
270
                except:
 
271
                    logging.error("Failed to set volume for %s" % name)
 
272
                    sys.exit(1)
 
273
 
 
274
 
 
275
def restore_audio_settings(file):
 
276
    logging.info("[ Restoring audio settings ]".center(80, '='))
 
277
    try:
 
278
        with open(file) as f:
 
279
            settings_file = f.read().split()
 
280
    except IOError:
 
281
        logging.error("Unable to open existing settings file: %s" %
 
282
                      sys.exc_info()[1])
 
283
        return 1
 
284
 
 
285
    for type in TYPES:
 
286
        # First try to get the three elements we need.
 
287
        # If we fail to get any of them, it means the file's format
 
288
        # is incorrect, so we just abort.
 
289
        try:
 
290
            name = settings_file[
 
291
                settings_file.index("default_%s:" % type) + 1]
 
292
            muted = settings_file[settings_file.index("%s_muted:" % type) + 1]
 
293
            volume = settings_file[
 
294
                settings_file.index("%s_volume:" % type) + 1]
 
295
        except ValueError:
 
296
            logging.error("Unable to restore settings because settings "
 
297
                          "file is invalid")
 
298
            return 1
 
299
 
 
300
        try:
 
301
            with open(os.devnull, 'wb') as DEVNULL:
 
302
                check_call(["pacmd", "set-default-%s" % type, name],
 
303
                           stdout=DEVNULL)
 
304
        except CalledProcessError:
 
305
            logging.error("Failed to restore default %s" % name)
 
306
            return 1
 
307
 
 
308
        if type == "sink":
 
309
            move_sinks(name)
 
310
 
 
311
        try:
 
312
            check_call(["pactl", "set-%s-mute" % type, name, muted])
 
313
        except:
 
314
            logging.error("Failed to restore mute for %s" % name)
 
315
            return 1
 
316
 
 
317
        try:
 
318
            check_call(["pactl", "set-%s-volume" % type, name, volume])
 
319
        except:
 
320
            logging.error("Failed to restore volume for %s" % name)
 
321
            return 1
 
322
 
 
323
 
 
324
def main():
 
325
    parser = ArgumentParser("Manipulate PulseAudio settings")
 
326
    parser.add_argument("action",
 
327
                        choices=['store', 'set', 'restore'],
 
328
                        help="Action to perform with the audio settings")
 
329
    parser.add_argument("-d", "--device",
 
330
                        help="The device to apply the new settings to.")
 
331
    parser.add_argument("-m", "--mute",
 
332
                        action="store_true",
 
333
                        help="""The new value for the mute setting
 
334
                                of the specified device.""")
 
335
    parser.add_argument("-v", "--volume",
 
336
                        type=int,
 
337
                        help="""The new value for the volume setting
 
338
                                of the specified device.""")
 
339
    parser.add_argument("-f", "--file",
 
340
                        help="""The file to store settings in or restore
 
341
                                settings from.""")
 
342
    parser.add_argument("--verbose",
 
343
                        action='store_true',
 
344
                        help="Turn on verbosity")
 
345
    args = parser.parse_args()
 
346
 
 
347
    if args.verbose:
 
348
        logging.basicConfig(format='%(levelname)s:%(message)s',
 
349
                            level=logging.INFO, stream=sys.stdout)
 
350
    if args.action == "store":
 
351
        if not args.file:
 
352
            logging.error("No file specified to store audio settings!")
 
353
            return 1
 
354
 
 
355
        store_audio_settings(args.file)
 
356
        get_current_profiles_settings()
 
357
    elif args.action == "set":
 
358
        if not args.device:
 
359
            logging.error("No device specified to change settings of!")
 
360
            return 1
 
361
        if not args.volume:
 
362
            logging.error("No volume level specified!")
 
363
            return 1
 
364
 
 
365
        if args.device == "hdmi":
 
366
            set_profile_hdmi()
 
367
        set_audio_settings(args.device, args.mute, args.volume)
 
368
    elif args.action == "restore":
 
369
        if restore_profiles_settings() or restore_audio_settings(args.file):
 
370
            return 1
 
371
    else:
 
372
        logging.error(args.action + "is not a valid action")
 
373
        return 1
 
374
 
 
375
    return 0