2
# This module provides the same api via ctypes as John Harrison's pyrex-based
4
# Don't use this module directly but via the toplevel API of this package.
5
# Copyright (C) 2011 Wilbert Berendsen, placed in the public domain.
7
from ctypes import byref, create_string_buffer
9
from .pm_ctypes import (
12
PmTimeProcPtr, NullTimeProcPtr,
16
from . import MidiException
26
'GetDefaultInputDeviceID',
27
'GetDefaultOutputDeviceID',
41
# Equivalent to TIME_START: start the PortTime timer with millisecond accuracy.
# NOTE(review): the first argument (1) is presumably the timer resolution in
# milliseconds, and NullTimeProcPtr / None presumably mean "no callback, no
# user data" -- confirm against the PortTime Pt_Start documentation.
# NOTE(review): the enclosing function header is not visible in this view.
42
libpt.Pt_Start(1, NullTimeProcPtr, None)
49
# Fetch the PortMidi device record for `device_id`.
# Other code in this file indexes the result (e.g. `GetDeviceInfo(...)[4]`),
# so the function returns an indexable value built from the record.
# NOTE(review): only part of this function is visible here (the embedded
# line numbers skip 51 and 53-61), so any NULL check on the pointer and the
# exact value returned cannot be described from this view.
def GetDeviceInfo(device_id):
50
info_ptr = libpm.Pm_GetDeviceInfo(device_id)
52
# Dereference the pointer returned above to reach the record itself.
info = info_ptr.contents
62
# Expose selected PortMidi entry points under this module's public names.
# Each name is bound directly to the attribute looked up on `libpm`, so a
# call goes straight to the library function with no Python-side wrapper.
CountDevices = libpm.Pm_CountDevices
63
GetDefaultInputDeviceID = libpm.Pm_GetDefaultInputDeviceID
64
GetDefaultOutputDeviceID = libpm.Pm_GetDefaultOutputDeviceID
65
# NOTE(review): the type callers get back from GetErrorText depends on how
# pm_ctypes declares the function's restype -- confirm it is a string type.
GetErrorText = libpm.Pm_GetErrorText
71
# Store the settings for an output stream and create its stream handle.
#
# device_id: PortMidi device identifier; later passed to Pm_OpenOutput and
#     GetDeviceInfo.
# latency: stored as-is and passed as the last argument of Pm_OpenOutput;
#     defaults to 0.
# NOTE(review): the remainder of this method (embedded lines 75-77) is not
# visible in this view.
def __init__(self, device_id, latency=0):
72
self.device_id = device_id
73
self.latency = latency
74
# Fresh stream pointer; _open_device hands it to Pm_OpenOutput by reference.
self._midi_stream = PortMidiStreamPtr()
78
# Open the output device; PortMidi fills in self._midi_stream via byref().
# NOTE(review): per the Pm_OpenOutput signature the remaining arguments are
# presumably driver info (None), buffer size (0), time procedure
# (NullTimeProcPtr), time info (None) and latency -- confirm.
# NOTE(review): the handling of `err` is on lines not visible in this view.
def _open_device(self):
79
err = libpm.Pm_OpenOutput(byref(self._midi_stream), self.device_id,
80
None, 0, NullTimeProcPtr, None, self.latency)
85
# NOTE(review): fragment -- the enclosing method header and the checks on
# `err` are not visible, and the original indentation is lost, so it cannot
# be told from here whether Pm_Close runs inside or after the `if`.
# Element 4 of the GetDeviceInfo result is presumably the device's "opened"
# flag -- confirm against GetDeviceInfo's return value.
if self._open and GetDeviceInfo(self.device_id)[4]:
86
err = libpm.Pm_Abort(self._midi_stream)
88
err = libpm.Pm_Close(self._midi_stream)
94
# Send a batch of timestamped MIDI events to the output stream.
#
# data: sized sequence of (msg, timestamp) pairs; msg is a sequence of 1 to 4
#     integers, one per MIDI byte.
# Raises ValueError if data holds more entries than self.buffer_size, or if
#     a msg is empty or longer than 4 bytes.
# NOTE(review): the lines that create `buf`, reset `message` and check `err`
# are not visible in this view (embedded numbers 96, 99, 101-102, 107 and
# 112-113 are missing).
def Write(self, data):
95
bufsize = self.buffer_size
97
if len(data) > bufsize:
98
raise ValueError("too much data for buffer")
100
# Array type with room for `bufsize` PmEvent records.
BufferType = PmEvent * bufsize
103
for i, event in enumerate(data):
104
# Unpack the pair, storing the timestamp straight into the i-th event.
msg, buf[i].timestamp = event
105
if len(msg) > 4 or len(msg) < 1:
106
raise ValueError("invalid message size")
108
# Pack the bytes into one integer, first byte in the lowest 8 bits.
for j, byte in enumerate(msg):
109
message += ((byte & 0xFF) << (8*j))
110
buf[i].message = message
111
# Only the first len(data) entries of the buffer are handed to PortMidi.
err = libpm.Pm_Write(self._midi_stream, buf, len(data))
114
# Send one short MIDI message stamped with the current PortTime time.
#
# status, data1, data2: the message bytes. Each is reduced to 8 bits and
#     packed into a single integer: status in bits 0-7, data1 in bits 8-15,
#     data2 in bits 16-23. data1 and data2 default to 0.
# NOTE(review): the line that creates `buf` and the check on `err` are not
# visible in this view (embedded numbers 115 and 120-121 are missing).
def WriteShort(self, status, data1=0, data2=0):
116
buf.timestamp = libpt.Pt_Time()
117
buf.message = (((data2 << 16) & 0xFF0000) |
118
((data1 << 8) & 0xFF00) | (status & 0xFF))
119
# A single event is written.
err = libpm.Pm_Write(self._midi_stream, buf, 1)
122
# Send a system-exclusive message with the given timestamp.
#
# timestamp: passed unchanged to Pm_WriteSysEx.
# msg: tuple or list of ints (converted to a byte string first), or an
#     already-built bytes string (passed through unchanged).
# NOTE(review): the assignment of `cur_time`, the check on `err` and the
# body of the final `while` loop are not visible in this view.
def WriteSysEx(self, timestamp, msg):
123
"""msg may be a tuple or list of ints, or a bytes string."""
124
if isinstance(msg, (tuple, list)):
125
# NOTE(review): array.array.tostring() was deprecated in Python 3.2 and
# removed in 3.9; tobytes() is the replacement. Also, no `import array` is
# visible among the imports shown in this view -- confirm it exists.
msg = array.array('B', msg).tostring()
127
err = libpm.Pm_WriteSysEx(self._midi_stream, timestamp, msg)
129
# Loops for as long as Time() still returns the value held in cur_time.
while Time() == cur_time:
134
# Store the settings for an input stream and create its stream handle.
#
# device_id: PortMidi device identifier; later passed to Pm_OpenInput and
#     GetDeviceInfo.
# bufsize: stored as self.buffer_size, which Read uses to size its event
#     buffer and to bound the requested length; defaults to 1024.
# NOTE(review): the remainder of this method (embedded lines 138-140) is not
# visible in this view.
def __init__(self, device_id, bufsize=1024):
135
self.device_id = device_id
136
self.buffer_size = bufsize
137
# Fresh stream pointer; _open_device hands it to Pm_OpenInput by reference.
self._midi_stream = PortMidiStreamPtr()
141
# Open the input device; PortMidi fills in self._midi_stream via byref().
# NOTE(review): per the Pm_OpenInput signature the literal 100 is presumably
# the library-side buffer size. It is hard-coded rather than taken from
# self.buffer_size (default 1024) -- confirm this difference is intended.
# NOTE(review): the handling of `err` is on lines not visible in this view.
def _open_device(self):
142
err = libpm.Pm_OpenInput(byref(self._midi_stream), self.device_id,
143
None, 100, NullTimeProcPtr, None)
148
# NOTE(review): fragment -- the enclosing method header and the checks on
# `err` are not visible, and the original indentation is lost, so it cannot
# be told from here whether Pm_Close runs inside or after the `if`.
# NOTE(review): the docstring says pending buffers are flushed, but the code
# calls Pm_Abort, which the PortMidi documentation describes as terminating
# outgoing messages immediately -- confirm which wording is accurate.
"""Closes a midi stream, flushing any pending buffers."""
149
# Element 4 of the GetDeviceInfo result is presumably the device's "opened"
# flag -- confirm against GetDeviceInfo's return value.
if self._open and GetDeviceInfo(self.device_id)[4]:
150
err = libpm.Pm_Abort(self._midi_stream)
152
err = libpm.Pm_Close(self._midi_stream)
159
# Hand back the raw Pm_Poll result for this stream, without passing it
# through _check_error.
# NOTE(review): fragment -- the enclosing method header is not visible.
return libpm.Pm_Poll(self._midi_stream)
161
# Read up to `length` pending MIDI events from the input stream.
#
# length: maximum number of events to fetch; must satisfy
#     1 <= length <= self.buffer_size.
# Raises ValueError("invalid length") when length is outside that range.
# Each event is appended to `data` as ((b0, b1, b2, b3), timestamp), where
# b0..b3 are the four bytes of the packed message, lowest byte first.
# NOTE(review): the lines that create `buf` and `data`, load `ev` and `msg`
# inside the loop, and return the result are not visible in this view;
# presumably `data` is what gets returned -- confirm.
def Read(self, length):
162
bufsize = self.buffer_size
163
# Array type with room for `bufsize` PmEvent records.
BufferType = PmEvent * bufsize
166
if not 1 <= length <= bufsize:
167
raise ValueError("invalid length")
168
num_events = libpm.Pm_Read(self._midi_stream, buf, length)
169
# The Pm_Read result is both the event count and the status passed on to
# _check_error.
_check_error(num_events)
172
for i in range(num_events):
175
# Split the packed 32-bit message into four bytes, lowest byte first.
msg = (msg & 255, (msg>>8) & 255, (msg>>16) & 255, (msg>>24) & 255)
176
data.append((msg, ev.timestamp))
180
# Build the error message for a PortMidi status code.
#
# err_no: status value returned by a libpm call.
# For pmHostError the text comes from Pm_GetHostErrorText; the other visible
# branch obtains it from Pm_GetErrorText(err_no). The text is formatted into
# "PortMIDI-ctypes error [<err_no>]: <text>".
# NOTE(review): the outer condition, the `else` line and the statement that
# consumes the formatted message (embedded lines 181, 186, 188) are not
# visible in this view; presumably the message is raised as a MidiException
# for error codes only -- confirm.
def _check_error(err_no):
182
if err_no == pmHostError:
183
# 256-character scratch buffer for the host error text.
# NOTE(review): on Python 3, ctypes.create_string_buffer accepts bytes or an
# int size, not str, so this call would raise TypeError there;
# create_string_buffer(256) gives the same zero-filled buffer on 2 and 3.
err_msg = create_string_buffer('\000' * 256)
184
libpm.Pm_GetHostErrorText(err_msg, 256)
185
# Keep only the text up to the first NUL.
err_msg = err_msg.value
187
err_msg = libpm.Pm_GetErrorText(err_no)
189
"PortMIDI-ctypes error [{0}]: {1}".format(err_no, err_msg))