~ubuntu-branches/ubuntu/vivid/frescobaldi/vivid

« back to all changes in this revision

Viewing changes to frescobaldi_app/portmidi/ctypes_pypm.py

  • Committer: Package Import Robot
  • Author(s): Ryan Kavanagh
  • Date: 2012-01-03 16:20:11 UTC
  • mfrom: (1.4.1)
  • Revision ID: package-import@ubuntu.com-20120103162011-tsjkwl4sntwmprea
Tags: 2.0.0-1
* New upstream release 
* Drop the following uneeded patches:
  + 01_checkmodules_no_python-kde4_build-dep.diff
  + 02_no_pyc.diff
  + 04_no_binary_lilypond_upgrades.diff
* Needs new dependency python-poppler-qt4
* Update debian/watch for new download path
* Update copyright file with new holders and years

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#! python
 
2
# This module provides the same api via ctypes as John Harrison's pyrex-based
 
3
# PortMIDI binding. 
 
4
# Don't use this module directly but via the toplevel API of this package.
 
5
# Copyright (C) 2011 Wilbert Berendsen, placed in the public domain.
 
6
 
 
7
from ctypes import byref, create_string_buffer
 
8
 
 
9
from .pm_ctypes import (
 
10
    libpm, libpt,
 
11
    pmHostError, PmEvent,
 
12
    PmTimeProcPtr, NullTimeProcPtr,
 
13
    PortMidiStreamPtr,
 
14
)
 
15
 
 
16
from . import MidiException
 
17
 
 
18
 
 
19
__all__ = [
 
20
    'TRUE',
 
21
    'FALSE',
 
22
    'Initialize',
 
23
    'Terminate',
 
24
    'CountDevices',
 
25
    'GetDeviceInfo',
 
26
    'GetDefaultInputDeviceID',
 
27
    'GetDefaultOutputDeviceID',
 
28
    'GetErrorText',
 
29
    'Time',
 
30
    'Input',
 
31
    'Output',
 
32
]
 
33
 
 
34
 
 
35
FALSE = 0
 
36
TRUE = 1
 
37
 
 
38
 
 
39
def Initialize():
 
40
    libpm.Pm_Initialize()
 
41
    # equiv to TIME_START: start timer w/ ms accuracy
 
42
    libpt.Pt_Start(1, NullTimeProcPtr, None)
 
43
 
 
44
 
 
45
def Terminate():
 
46
    libpm.Pm_Terminate()
 
47
 
 
48
 
 
49
def GetDeviceInfo(device_id):
 
50
    info_ptr = libpm.Pm_GetDeviceInfo(device_id)
 
51
    if info_ptr:
 
52
        info = info_ptr.contents
 
53
        return (
 
54
            info.interf,
 
55
            info.name,
 
56
            bool(info.input),
 
57
            bool(info.output),
 
58
            bool(info.opened),
 
59
        )
 
60
 
 
61
 
 
62
CountDevices = libpm.Pm_CountDevices
 
63
GetDefaultInputDeviceID = libpm.Pm_GetDefaultInputDeviceID
 
64
GetDefaultOutputDeviceID = libpm.Pm_GetDefaultOutputDeviceID
 
65
GetErrorText = libpm.Pm_GetErrorText
 
66
Time = libpt.Pt_Time
 
67
 
 
68
 
 
69
class Output(object):
 
70
    buffer_size = 1024
 
71
    def __init__(self, device_id, latency=0):
 
72
        self.device_id = device_id
 
73
        self.latency = latency
 
74
        self._midi_stream = PortMidiStreamPtr()
 
75
        self._open = False
 
76
        self._open_device()
 
77
 
 
78
    def _open_device(self):
 
79
        err = libpm.Pm_OpenOutput(byref(self._midi_stream), self.device_id,
 
80
            None, 0, NullTimeProcPtr, None, self.latency)
 
81
        _check_error(err)
 
82
        self._open = True
 
83
 
 
84
    def Close(self):
 
85
        if self._open and GetDeviceInfo(self.device_id)[4]:
 
86
            err = libpm.Pm_Abort(self._midi_stream)
 
87
            _check_error(err)
 
88
            err = libpm.Pm_Close(self._midi_stream)
 
89
            _check_error(err)
 
90
            self._open = False
 
91
    
 
92
    __del__ = Close
 
93
 
 
94
    def Write(self, data):
 
95
        bufsize = self.buffer_size
 
96
 
 
97
        if len(data) > bufsize:
 
98
            raise ValueError("too much data for buffer")
 
99
 
 
100
        BufferType = PmEvent * bufsize
 
101
        buf = BufferType()
 
102
 
 
103
        for i, event in enumerate(data):
 
104
            msg, buf[i].timestamp = event
 
105
            if len(msg) > 4 or len(msg) < 1:
 
106
                raise ValueError("invalid message size")
 
107
            message = 0
 
108
            for j, byte in enumerate(msg):
 
109
                message += ((byte & 0xFF) << (8*j))
 
110
            buf[i].message = message
 
111
        err = libpm.Pm_Write(self._midi_stream, buf, len(data))
 
112
        _check_error(err)
 
113
 
 
114
    def WriteShort(self, status, data1=0, data2=0):
 
115
        buf = PmEvent()
 
116
        buf.timestamp = libpt.Pt_Time()
 
117
        buf.message = (((data2 << 16) & 0xFF0000) |
 
118
            ((data1 << 8) & 0xFF00) | (status & 0xFF))
 
119
        err = libpm.Pm_Write(self._midi_stream, buf, 1)
 
120
        _check_error(err)
 
121
 
 
122
    def WriteSysEx(self, timestamp, msg):
 
123
        """msg may be a tuple or list of ints, or a bytes string."""
 
124
        if isinstance(msg, (tuple, list)):
 
125
            msg = array.array('B', msg).tostring()
 
126
        cur_time = Time()
 
127
        err = libpm.Pm_WriteSysEx(self._midi_stream, timestamp, msg)
 
128
        _check_error(err)
 
129
        while Time() == cur_time:
 
130
            pass
 
131
 
 
132
 
 
133
class Input(object):
 
134
    def __init__(self, device_id, bufsize=1024):
 
135
        self.device_id = device_id
 
136
        self.buffer_size = bufsize
 
137
        self._midi_stream = PortMidiStreamPtr()
 
138
        self._open = False
 
139
        self._open_device()
 
140
 
 
141
    def _open_device(self):
 
142
        err = libpm.Pm_OpenInput(byref(self._midi_stream), self.device_id,
 
143
            None, 100, NullTimeProcPtr, None)
 
144
        _check_error(err)
 
145
        self._open = True
 
146
 
 
147
    def Close(self):
 
148
        """Closes a midi stream, flushing any pending buffers."""
 
149
        if self._open and GetDeviceInfo(self.device_id)[4]:
 
150
            err = libpm.Pm_Abort(self._midi_stream)
 
151
            _check_error(err)
 
152
            err = libpm.Pm_Close(self._midi_stream)
 
153
            _check_error(err)
 
154
            self._opened = False
 
155
        
 
156
    __del__ = Close
 
157
 
 
158
    def Poll(self):
 
159
        return libpm.Pm_Poll(self._midi_stream)
 
160
 
 
161
    def Read(self, length):
 
162
        bufsize = self.buffer_size
 
163
        BufferType = PmEvent * bufsize
 
164
        buf = BufferType()
 
165
 
 
166
        if not 1 <= length <= bufsize:
 
167
            raise ValueError("invalid length")
 
168
        num_events = libpm.Pm_Read(self._midi_stream, buf, length)
 
169
        _check_error(num_events)
 
170
 
 
171
        data = []
 
172
        for i in range(num_events):
 
173
            ev = buf[i]
 
174
            msg = ev.message
 
175
            msg = (msg & 255, (msg>>8) & 255, (msg>>16) & 255, (msg>>24) & 255)
 
176
            data.append((msg, ev.timestamp))
 
177
        return data
 
178
 
 
179
 
 
180
def _check_error(err_no):
 
181
    if err_no < 0:
 
182
        if err_no == pmHostError:
 
183
            err_msg = create_string_buffer('\000' * 256)
 
184
            libpm.Pm_GetHostErrorText(err_msg, 256)
 
185
            err_msg = err_msg.value
 
186
        else:
 
187
            err_msg = libpm.Pm_GetErrorText(err_no)
 
188
        raise MidiException(
 
189
            "PortMIDI-ctypes error [{0}]: {1}".format(err_no, err_msg))
 
190
 
 
191