1
# nxt.sensor module -- Classes to read LEGO Mindstorms NXT sensors
2
# Copyright (C) 2006,2007 Douglas P Lau
3
# Copyright (C) 2009 Marcus Wanner, Paulo Vieira, rhn
4
# Copyright (C) 2010,2011 Marcus Wanner
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# GNU General Public License for more details.
16
from nxt.error import I2CError, I2CPendingError, DirProtError
19
from time import sleep, time
24
def __init__(self, version, product_id, sensor_type):
25
self.version = version
26
self.product_id = product_id
27
self.sensor_type = sensor_type
29
def clarifybinary(self, instr, label):
31
outstr += (label + ': `' + instr + '`\n')
33
outstr += (hex(ord(char))+', ')
39
outstr += (self.clarifybinary(str(self.version), 'Version'))
40
outstr += (self.clarifybinary(str(self.product_id), 'Product ID'))
41
outstr += (self.clarifybinary(str(self.sensor_type), 'Type'))
44
class BaseDigitalSensor(Sensor):
45
"""Object for digital sensors. I2C_ADDRESS is the dictionary storing name
46
to i2c address mappings. It should be updated in every subclass. When
47
subclassing this class, make sure to call add_compatible_sensor to add
48
compatible sensor data.
51
I2C_ADDRESS = {'version': (0x00, '8s'),
52
'product_id': (0x08, '8s'),
53
'sensor_type': (0x10, '8s'),
54
# 'factory_zero': (0x11, 1), # is this really correct?
55
'factory_scale_factor': (0x12, 'B'),
56
'factory_scale_divisor': (0x13, 'B'),
59
def __init__(self, brick, port, check_compatible=True):
60
"""Creates a BaseDigitalSensor. If check_compatible is True, queries
61
the sensor for its name, and if a wrong sensor class was used, prints
64
super(BaseDigitalSensor, self).__init__(brick, port)
65
self.set_input_mode(Type.LOW_SPEED_9V, Mode.RAW)
66
self.last_poll = time()
67
self.poll_delay = 0.01
68
sleep(0.1) # Give I2C time to initialize
69
#Don't do type checking if this class has no compatible sensors listed.
70
try: self.compatible_sensors
71
except AttributeError: check_compatible = False
73
sensor = self.get_sensor_info()
74
if not sensor in self.compatible_sensors:
75
print ('WARNING: Wrong sensor class chosen for sensor ' +
76
str(sensor.product_id) + ' on port ' + str(port) + '. ' + """
77
You may be using the wrong type of sensor or may have connected the cable
78
incorrectly. If you are sure you're using the correct sensor class for the
79
sensor, this message is likely in error and you should disregard it and file a
80
bug report, including the output of get_sensor_info(). This message can be
81
suppressed by passing "check_compatible=False" when creating the sensor object.""")
83
def _ls_get_status(self, n_bytes):
86
b = self.brick.ls_get_status(self.port)
89
except I2CPendingError:
91
raise I2CError, 'ls_get_status timeout'
93
def _i2c_command(self, address, value, format):
94
"""Writes an i2c value to the given address. value must be a string. value is
95
a tuple of values corresponding to the given format.
97
value = struct.pack(format, *value)
98
msg = chr(self.I2C_DEV) + chr(address) + value
100
if self.last_poll+self.poll_delay > now:
101
diff = now - self.last_poll
102
sleep(self.poll_delay - diff)
103
self.last_poll = time()
104
self.brick.ls_write(self.port, msg, 0)
106
def _i2c_query(self, address, format):
107
"""Reads an i2c value from given address, and returns a value unpacked
108
according to the given format. Format is the same as in the struct
109
module. See http://docs.python.org/library/struct.html#format-strings
111
n_bytes = struct.calcsize(format)
112
msg = chr(self.I2C_DEV) + chr(address)
114
if self.last_poll+self.poll_delay > now:
115
diff = now - self.last_poll
116
sleep(self.poll_delay - diff)
117
self.last_poll = time()
118
self.brick.ls_write(self.port, msg, n_bytes)
120
self._ls_get_status(n_bytes)
122
#we should clear the buffer no matter what happens
123
data = self.brick.ls_read(self.port)
124
if len(data) < n_bytes:
125
raise I2CError, 'Read failure: Not enough bytes'
126
data = struct.unpack(format, data[-n_bytes:])
129
def read_value(self, name):
130
"""Reads a value from the sensor. Name must be a string found in
131
self.I2C_ADDRESS dictionary. Entries in self.I2C_ADDRESS are in the
132
name: (address, format) form, with format as in the struct module.
133
Be careful on unpacking single variables - struct module puts them in
134
tuples containing only one element.
136
address, fmt = self.I2C_ADDRESS[name]
139
return self._i2c_query(address, fmt)
142
raise I2CError, "read_value timeout"
144
def write_value(self, name, value):
145
"""Writes value to the sensor. Name must be a string found in
146
self.I2C_ADDRESS dictionary. Entries in self.I2C_ADDRESS are in the
147
name: (address, format) form, with format as in the struct module.
148
value is a tuple of values corresponding to the format from
149
self.I2C_ADDRESS dictionary.
151
address, fmt = self.I2C_ADDRESS[name]
152
self._i2c_command(address, value, fmt)
154
def get_sensor_info(self):
155
version = self.read_value('version')[0].split('\0')[0]
156
product_id = self.read_value('product_id')[0].split('\0')[0]
157
sensor_type = self.read_value('sensor_type')[0].split('\0')[0]
158
return SensorInfo(version, product_id, sensor_type)
161
def add_compatible_sensor(cls, version, product_id, sensor_type):
162
"""Adds an entry in the compatibility table for the sensor. If version
163
is None, then it's the default class for this model. If product_id is
164
None, then this is the default class for this vendor.
167
cls.compatible_sensors
168
except AttributeError:
169
cls.compatible_sensors = []
171
cls.compatible_sensors.append(SCompatibility(version, product_id,
173
add_mapping(cls, version, product_id, sensor_type)
176
class SCompatibility(SensorInfo):
177
"""An object that helps manage the sensor mappings"""
178
def __eq__(self, other):
179
if self.product_id is None:
180
return self.product_id == other.product_id
181
elif self.version is None:
182
return (self.product_id == other.product_id and
183
self.sensor_type == other.sensor_type)
185
return (self.version == other.version and
186
self.product_id == other.product_id and
187
self.sensor_type == other.sensor_type)
192
def add_mapping(cls, version, product_id, sensor_type):
193
"None means any other value"
194
if product_id not in sensor_mappings:
195
sensor_mappings[product_id] = {}
196
models = sensor_mappings[product_id]
198
if sensor_type is None:
199
if sensor_type in models:
200
raise ValueError('Already registered!')
201
models[sensor_type] = cls
204
if sensor_type not in models:
205
models[sensor_type] = {}
206
versions = models[sensor_type]
208
if version in versions:
209
raise ValueError('Already registered!')
211
versions[version] = cls
214
class SearchError(Exception):
218
def find_class(info):
219
"""Returns an appropriate class for the given SensorInfo"""
220
dic = sensor_mappings
221
for val, msg in zip((info.product_id, info.sensor_type, info.version),
222
('Vendor', 'Model', 'Version')):
228
raise SearchError(msg + ' not found')
229
return dic[info.sensor_type][None]