~ubuntu-branches/ubuntu/vivid/nxt-python/vivid

« back to all changes in this revision

Viewing changes to nxt/sensor/digital.py

  • Committer: Package Import Robot
  • Author(s): Scott Kitterman
  • Date: 2012-01-01 21:20:35 UTC
  • Revision ID: package-import@ubuntu.com-20120101212035-zzs76zi8fyndgbhj
Tags: upstream-2.2.1
ImportĀ upstreamĀ versionĀ 2.2.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# nxt.sensor module -- Classes to read LEGO Mindstorms NXT sensors
 
2
# Copyright (C) 2006,2007  Douglas P Lau
 
3
# Copyright (C) 2009  Marcus Wanner, Paulo Vieira, rhn
 
4
# Copyright (C) 2010,2011  Marcus Wanner
 
5
#
 
6
# This program is free software: you can redistribute it and/or modify
 
7
# it under the terms of the GNU General Public License as published by
 
8
# the Free Software Foundation, either version 3 of the License, or
 
9
# (at your option) any later version.
 
10
#
 
11
# This program is distributed in the hope that it will be useful,
 
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
# GNU General Public License for more details.
 
15
 
 
16
from nxt.error import I2CError, I2CPendingError, DirProtError
 
17
 
 
18
from common import *
 
19
from time import sleep, time
 
20
import struct
 
21
 
 
22
 
 
23
class SensorInfo:
 
24
    def __init__(self, version, product_id, sensor_type):
 
25
        self.version = version
 
26
        self.product_id = product_id
 
27
        self.sensor_type = sensor_type
 
28
    
 
29
    def clarifybinary(self, instr, label):
 
30
        outstr = ''
 
31
        outstr += (label + ': `' + instr + '`\n')
 
32
        for char in instr:
 
33
            outstr += (hex(ord(char))+', ')
 
34
        outstr += ('\n')
 
35
        return outstr
 
36
    
 
37
    def __str__(self):
 
38
        outstr = ''
 
39
        outstr += (self.clarifybinary(str(self.version), 'Version'))
 
40
        outstr += (self.clarifybinary(str(self.product_id), 'Product ID'))
 
41
        outstr += (self.clarifybinary(str(self.sensor_type), 'Type'))
 
42
        return outstr
 
43
 
 
44
class BaseDigitalSensor(Sensor):
 
45
    """Object for digital sensors. I2C_ADDRESS is the dictionary storing name
 
46
    to i2c address mappings. It should be updated in every subclass. When
 
47
    subclassing this class, make sure to call add_compatible_sensor to add
 
48
    compatible sensor data.
 
49
    """
 
50
    I2C_DEV = 0x02
 
51
    I2C_ADDRESS = {'version': (0x00, '8s'),
 
52
        'product_id': (0x08, '8s'),
 
53
        'sensor_type': (0x10, '8s'),
 
54
#        'factory_zero': (0x11, 1),      # is this really correct?
 
55
        'factory_scale_factor': (0x12, 'B'),
 
56
        'factory_scale_divisor': (0x13, 'B'),
 
57
    }
 
58
    
 
59
    def __init__(self, brick, port, check_compatible=True):
 
60
        """Creates a BaseDigitalSensor. If check_compatible is True, queries
 
61
        the sensor for its name, and if a wrong sensor class was used, prints
 
62
        a warning.
 
63
        """
 
64
        super(BaseDigitalSensor, self).__init__(brick, port)
 
65
        self.set_input_mode(Type.LOW_SPEED_9V, Mode.RAW)
 
66
        self.last_poll = time()
 
67
        self.poll_delay = 0.01
 
68
        sleep(0.1)  # Give I2C time to initialize
 
69
        #Don't do type checking if this class has no compatible sensors listed.
 
70
        try: self.compatible_sensors
 
71
        except AttributeError: check_compatible = False
 
72
        if check_compatible:
 
73
            sensor = self.get_sensor_info()
 
74
            if not sensor in self.compatible_sensors:
 
75
                print ('WARNING: Wrong sensor class chosen for sensor ' + 
 
76
                          str(sensor.product_id) + ' on port ' + str(port) + '. ' + """
 
77
You may be using the wrong type of sensor or may have connected the cable
 
78
incorrectly. If you are sure you're using the correct sensor class for the
 
79
sensor, this message is likely in error and you should disregard it and file a
 
80
bug report, including the output of get_sensor_info(). This message can be
 
81
suppressed by passing "check_compatible=False" when creating the sensor object.""")
 
82
 
 
83
    def _ls_get_status(self, n_bytes):
 
84
        for n in range(10):
 
85
            try:
 
86
                b = self.brick.ls_get_status(self.port)
 
87
                if b >= n_bytes:
 
88
                    return b
 
89
            except I2CPendingError:
 
90
                pass
 
91
        raise I2CError, 'ls_get_status timeout'
 
92
 
 
93
    def _i2c_command(self, address, value, format):
 
94
        """Writes an i2c value to the given address. value must be a string. value is
 
95
        a tuple of values corresponding to the given format.
 
96
        """
 
97
        value = struct.pack(format, *value)
 
98
        msg = chr(self.I2C_DEV) + chr(address) + value
 
99
        now = time()
 
100
        if self.last_poll+self.poll_delay > now:
 
101
            diff = now - self.last_poll
 
102
            sleep(self.poll_delay - diff)
 
103
        self.last_poll = time()
 
104
        self.brick.ls_write(self.port, msg, 0)
 
105
 
 
106
    def _i2c_query(self, address, format):
 
107
        """Reads an i2c value from given address, and returns a value unpacked
 
108
        according to the given format. Format is the same as in the struct
 
109
        module. See http://docs.python.org/library/struct.html#format-strings
 
110
        """
 
111
        n_bytes = struct.calcsize(format)
 
112
        msg = chr(self.I2C_DEV) + chr(address)
 
113
        now = time()
 
114
        if self.last_poll+self.poll_delay > now:
 
115
            diff = now - self.last_poll
 
116
            sleep(self.poll_delay - diff)
 
117
        self.last_poll = time()
 
118
        self.brick.ls_write(self.port, msg, n_bytes)
 
119
        try:
 
120
            self._ls_get_status(n_bytes)
 
121
        finally:
 
122
            #we should clear the buffer no matter what happens
 
123
            data = self.brick.ls_read(self.port)
 
124
        if len(data) < n_bytes:
 
125
            raise I2CError, 'Read failure: Not enough bytes'
 
126
        data = struct.unpack(format, data[-n_bytes:])
 
127
        return data
 
128
        
 
129
    def read_value(self, name):
 
130
        """Reads a value from the sensor. Name must be a string found in
 
131
        self.I2C_ADDRESS dictionary. Entries in self.I2C_ADDRESS are in the
 
132
        name: (address, format) form, with format as in the struct module.
 
133
        Be careful on unpacking single variables - struct module puts them in
 
134
        tuples containing only one element.
 
135
        """
 
136
        address, fmt = self.I2C_ADDRESS[name]
 
137
        for n in range(3):
 
138
            try:
 
139
                return self._i2c_query(address, fmt)
 
140
            except DirProtError:
 
141
                pass
 
142
        raise I2CError, "read_value timeout"
 
143
 
 
144
    def write_value(self, name, value):
 
145
        """Writes value to the sensor. Name must be a string found in
 
146
        self.I2C_ADDRESS dictionary. Entries in self.I2C_ADDRESS are in the
 
147
        name: (address, format) form, with format as in the struct module.
 
148
        value is a tuple of values corresponding to the format from
 
149
        self.I2C_ADDRESS dictionary.
 
150
        """
 
151
        address, fmt = self.I2C_ADDRESS[name]
 
152
        self._i2c_command(address, value, fmt)
 
153
    
 
154
    def get_sensor_info(self):
 
155
        version = self.read_value('version')[0].split('\0')[0]
 
156
        product_id = self.read_value('product_id')[0].split('\0')[0]
 
157
        sensor_type = self.read_value('sensor_type')[0].split('\0')[0]
 
158
        return SensorInfo(version, product_id, sensor_type)
 
159
        
 
160
    @classmethod
 
161
    def add_compatible_sensor(cls, version, product_id, sensor_type):
 
162
        """Adds an entry in the compatibility table for the sensor. If version
 
163
        is None, then it's the default class for this model. If product_id is
 
164
        None, then this is the default class for this vendor.
 
165
        """
 
166
        try:
 
167
            cls.compatible_sensors
 
168
        except AttributeError:
 
169
            cls.compatible_sensors = []
 
170
        finally:
 
171
            cls.compatible_sensors.append(SCompatibility(version, product_id,
 
172
                                                                            sensor_type))
 
173
            add_mapping(cls, version, product_id, sensor_type)
 
174
            
 
175
            
 
176
class SCompatibility(SensorInfo):
 
177
    """An object that helps manage the sensor mappings"""
 
178
    def __eq__(self, other):
 
179
        if self.product_id is None:
 
180
            return self.product_id == other.product_id
 
181
        elif self.version is None:
 
182
            return (self.product_id == other.product_id and
 
183
                    self.sensor_type == other.sensor_type)
 
184
        else:
 
185
            return (self.version == other.version and
 
186
                    self.product_id == other.product_id and
 
187
                    self.sensor_type == other.sensor_type)
 
188
 
 
189
sensor_mappings = {}
 
190
 
 
191
 
 
192
def add_mapping(cls, version, product_id, sensor_type):
 
193
    "None means any other value"
 
194
    if product_id not in sensor_mappings:
 
195
        sensor_mappings[product_id] = {}
 
196
    models = sensor_mappings[product_id]
 
197
       
 
198
    if sensor_type is None:
 
199
        if sensor_type in models:
 
200
            raise ValueError('Already registered!')
 
201
        models[sensor_type] = cls
 
202
        return
 
203
 
 
204
    if sensor_type not in models:
 
205
        models[sensor_type] = {}
 
206
    versions = models[sensor_type]
 
207
    
 
208
    if version in versions:
 
209
        raise ValueError('Already registered!')
 
210
    else:
 
211
        versions[version] = cls
 
212
 
 
213
 
 
214
class SearchError(Exception):
 
215
    pass
 
216
 
 
217
 
 
218
def find_class(info):
 
219
    """Returns an appropriate class for the given SensorInfo"""
 
220
    dic = sensor_mappings
 
221
    for val, msg in zip((info.product_id, info.sensor_type, info.version),
 
222
                                    ('Vendor', 'Model', 'Version')):
 
223
        if val in dic:
 
224
            dic = dic[val]
 
225
        elif None in dic:
 
226
            dic = dic[None]
 
227
        else:
 
228
            raise SearchError(msg + ' not found')
 
229
        return dic[info.sensor_type][None]