1
# -*- test-case-name: twisted.test.test_nmea -*-
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
5
"""NMEA 0183 implementation
7
Maintainer: Bob Ippolito
9
The following NMEA 0183 sentences are currently understood::
12
GPRMC (position and time)
13
GPGSA (active satellites)
15
The following NMEA 0183 sentences require implementation::
16
None really, the others aren't generally useful or implemented in most devices anyhow
18
Other desired features::
19
- An NMEA 0183 producer to emulate GPS devices (?)
23
from twisted.protocols import basic
24
from twisted.python.compat import reduce
26
# Position fix status codes (reported by the "fix" sentence decoder).
POSFIX_INVALID = 0
POSFIX_SPS = 1
POSFIX_DGPS = 2
POSFIX_PPS = 3

# First element of the active-satellites mode pair.
MODE_AUTO = 'A'
MODE_FORCED = 'M'

# Second element of the active-satellites mode pair.
MODE_NOFIX = 1
MODE_2D = 2
MODE_3D = 3
30
class InvalidSentence(Exception):
    """Raised by NMEAReceiver.lineReceived for a sentence it cannot accept:
    one that does not begin with '$', one whose sentence type is unknown,
    or one whose fields are not valid for its sentence type.
    """
33
class InvalidChecksum(Exception):
    """Raised by NMEAReceiver.lineReceived when the checksum transmitted
    after '*' differs from the XOR calculated over the message characters.
    """
36
class NMEAReceiver(basic.LineReceiver):
37
"""This parses most common NMEA-0183 messages, presumably from a serial GPS device at 4800 bps
43
# Each entry maps an NMEA sentence type to a suffix; lineReceived looks up
# "decode_<suffix>" and "handle_<suffix>" on the receiver for that sentence.
'GPGSA': 'activesatellites',
45
'GPRMC': 'positiontime',
46
'GPGSV': 'viewsatellites', # not implemented
47
'GPVTG': 'course', # not implemented
48
'GPALM': 'almanac', # not implemented
49
'GPGRS': 'range', # not implemented
50
'GPGST': 'noise', # not implemented
51
'GPMSS': 'beacon', # not implemented
52
'GPZDA': 'time', # not implemented
53
# Consulted by lineReceived for lines that do not begin with '$'.
# generally you may miss the beginning of the first message
54
ignore_invalid_sentence = 1
55
# When false, lineReceived raises InvalidChecksum if the transmitted
# checksum differs from the calculated one; when true the check is skipped.
# checksums shouldn't be invalid
56
ignore_checksum_mismatch = 0
57
# When false, lineReceived raises InvalidSentence for a sentence type that
# has no entry in dispatch.
# ignore unknown sentence types
58
ignore_unknown_sentencetypes = 0
59
# When true, decode_positiontime moves any decoded year above 2073 back by
# 100 years.
# do we want to even bother checking to see if it's from the 20th century?
60
convert_dates_before_y2k = 1
62
def lineReceived(self, line):
    """Parse one NMEA 0183 sentence and pass its decoded fields to a handler.

    The sentence type (the first comma-separated field) is looked up in
    C{self.dispatch}; the remaining fields are passed to
    C{decode_<name>} and the decoded tuple is passed on to
    C{handle_<name>}.

    @param line: one received line, expected to have the form
        C{$<message>*<checksum>}.
    @return: whatever the handler returns, or C{None} when the line is
        ignored or no decoder/handler pair exists for the sentence type.
    @raise InvalidSentence: if the line does not begin with C{$} (and
        C{ignore_invalid_sentence} is false), if the sentence type is
        unknown (and C{ignore_unknown_sentencetypes} is false), or if the
        decoder fails on the fields.
    @raise InvalidChecksum: if the transmitted checksum differs from the
        calculated one (and C{ignore_checksum_mismatch} is false).
    """
    if not line.startswith('$'):
        if self.ignore_invalid_sentence:
            # Typically the tail of a sentence whose beginning was missed.
            return
        raise InvalidSentence("%r does not begin with $" % (line,))
    # message is everything between $ and *, checksum is xor of all ASCII
    # values of the message
    strmessage, checksum = line[1:].strip().split('*')
    message = strmessage.split(',')
    sentencetype, message = message[0], message[1:]
    dispatch = self.dispatch.get(sentencetype, None)
    if (not dispatch) and (not self.ignore_unknown_sentencetypes):
        raise InvalidSentence("sentencetype %r" % (sentencetype,))
    if not self.ignore_checksum_mismatch:
        checksum = int(checksum, 16)
        calculated_checksum = reduce(operator.xor, map(ord, strmessage))
        if checksum != calculated_checksum:
            raise InvalidChecksum("Given 0x%02X != 0x%02X" % (checksum, calculated_checksum))
    handler = getattr(self, "handle_%s" % dispatch, None)
    decoder = getattr(self, "decode_%s" % dispatch, None)
    if not (dispatch and handler and decoder):
        # missing dispatch, handler, or decoder: nothing to call
        return
    # Decode separately from handling so that only decoder failures are
    # reported as an invalid sentence; handler errors propagate unchanged.
    try:
        decoded = decoder(*message)
    except Exception:
        raise InvalidSentence("%r is not a valid %s (%s) sentence" % (line, sentencetype, dispatch))
    return handler(*decoded)
90
def decode_position(self, latitude, ns, longitude, ew, utc, status):
# Decodes the raw string fields of a position sentence.
# NOTE(review): this listing jumps from original line 92 to 104, so the
# handling of `status` and the return statement are not visible here;
# confirm against the full file.
91
# Convert the latitude/longitude fields plus hemisphere letters via the
# shared helper.
latitude, longitude = self._decode_latlon(latitude, ns, longitude, ew)
92
# Convert the UTC time field to seconds via the shared helper.
utc = self._decode_utc(utc)
104
def decode_positiontime(self, utc, status, latitude, ns, longitude, ew, speed, course, utcdate, magvar, magdir):
# Decodes the raw string fields of a position-and-time sentence.
# NOTE(review): the original line numbers in this listing have gaps
# (106->112, 112->115, 122->132, ...), so the handling of speed, status and
# magdir, any conditionals guarding the float() conversions, and the return
# statement are not visible here; confirm against the full file.
105
utc = self._decode_utc(utc)
106
latitude, longitude = self._decode_latlon(latitude, ns, longitude, ew)
112
course = float(course)
115
# utcdate is sliced as day [0:2], month [2:4], two-digit year [4:6]; the
# year is first placed in the 2000s.
utcdate = 2000+int(utcdate[4:6]), int(utcdate[2:4]), int(utcdate[0:2])
116
if self.convert_dates_before_y2k and utcdate[0] > 2073:
117
# GPS was invented by the US DoD in 1973, but NMEA uses 2 digit year.
118
# Highly unlikely that we'll be using NMEA or this twisted module in 70 years,
119
# but remotely possible that you'll be using it to play back data from the 20th century.
120
utcdate = (utcdate[0] - 100, utcdate[1], utcdate[2])
122
magvar = float(magvar)
132
# UTC seconds past utcdate
134
# UTC (year, month, day)
136
# None or magnetic variation in degrees (west is negative)
140
def _decode_utc(self, utc):
141
utc_hh, utc_mm, utc_ss = map(float, (utc[:2], utc[2:4], utc[4:]))
142
return utc_hh * 3600.0 + utc_mm * 60.0 + utc_ss
144
def _decode_latlon(self, latitude, ns, longitude, ew):
145
latitude = float(latitude[:2]) + float(latitude[2:])/60.0
148
longitude = float(longitude[:3]) + float(longitude[3:])/60.0
150
longitude = -longitude
151
return (latitude, longitude)
153
def decode_activesatellites(self, mode1, mode2, *args):
# Decodes the raw string fields of an active-satellites sentence.
# NOTE(review): the original line numbers in this listing have gaps
# (154->158, 158->161), so the loop that builds `satlist` and the return
# statement are not visible here; confirm against the full file.
154
# The first 12 trailing fields are the satellite entries; the remaining
# fields must be exactly three values, converted to float as
# (pdop, hdop, vdop).
satellites, (pdop, hdop, vdop) = args[:12], map(float, args[12:])
158
satlist.append(int(n))
161
mode = (mode1, int(mode2))
163
# satellite list by channel
165
# (MODE_AUTO/MODE_FORCED, MODE_NOFIX/MODE_2D/MODE_3D)
167
# position dilution of precision
169
# horizontal dilution of precision
171
# vertical dilution of precision
175
def decode_fix(self, utc, latitude, ns, longitude, ew, posfix, satellites, hdop, altitude, altitude_units, geoid_separation, geoid_separation_units, dgps_age, dgps_station_id):
# Decodes the raw string fields of a fix sentence.
# NOTE(review): the original line numbers in this listing have gaps
# (177->179, 179->181, 183->187, ...), so the conversion of posfix and hdop,
# any else-branches or guards around the conversions below, and the return
# statement are not visible here; confirm against the full file.
176
latitude, longitude = self._decode_latlon(latitude, ns, longitude, ew)
177
utc = self._decode_utc(utc)
179
satellites = int(satellites)
181
# Altitude is kept as a (value, units) pair.
altitude = (float(altitude), altitude_units)
182
# The geoid separation field may be empty, so it is only converted when
# present.
if geoid_separation != '':
183
geoid = (float(geoid_separation), geoid_separation_units)
187
dgps = (float(dgps_age), dgps_station_id)
191
# seconds since 00:00 UTC
195
# longitude (degrees)
197
# position fix status (POSFIX_INVALID, POSFIX_SPS, POSFIX_DGPS, POSFIX_PPS)
199
# number of satellites used for fix 0 <= satellites <= 12
201
# horizontal dilution of precision
203
# None or (altitude according to WGS-84 ellipsoid, units (typically 'M' for meters))
205
# None or (geoid separation according to WGS-84 ellipsoid, units (typically 'M' for meters))
207
# (age of dgps data in seconds, dgps station id)