~dongpo-deng/sahana-eden/test

« back to all changes in this revision

Viewing changes to modules/pygsm/devicewrapper.py

  • Committer: Deng Dongpo
  • Date: 2010-08-01 09:29:44 UTC
  • Revision ID: dongpo@dhcp-21193.iis.sinica.edu.tw-20100801092944-8t9obt4xtl7otesb
initial

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
# vim: ai ts=4 sts=4 et sw=4 encoding=utf-8
 
3
 
 
4
# arch: pacman -S python-pyserial
 
5
# debian/ubuntu: apt-get install python-serial
 
6
import serial
 
7
import re
 
8
import errors
 
9
 
 
10
class DeviceWrapper(object):
 
11
    
 
12
    def __init__(self, logger, *args, **kwargs):
 
13
        self.device = serial.Serial(*args, **kwargs)
 
14
        self.logger = logger
 
15
 
 
16
    def isOpen(self):
 
17
        return self.device.isOpen()
 
18
 
 
19
    def close(self):
 
20
        self.device.close()
 
21
    
 
22
    def write(self, str):
 
23
        self.device.write(str)
 
24
            
 
25
    def _read(self, read_term=None, read_timeout=None):
 
26
        """Read from the modem (blocking) until _terminator_ is hit,
 
27
           (defaults to \r\n, which reads a single "line"), and return."""
 
28
        
 
29
        buffer = []
 
30
 
 
31
        # if a different timeout was requested just
 
32
        # for _this_ read, store and override the
 
33
        # current device setting (not thread safe!)
 
34
        if read_timeout is not None:
 
35
            old_timeout = self.device.timeout
 
36
            self.device.timeout = read_timeout
 
37
 
 
38
        def __reset_timeout():
 
39
            """restore the device's previous timeout
 
40
               setting, if we overrode it earlier."""
 
41
            if read_timeout is not None:
 
42
                self.device.timeout =\
 
43
                    old_timeout
 
44
 
 
45
        # the default terminator reads
 
46
        # until a newline is hit
 
47
        if read_term is None:
 
48
            read_term = "\r\n"
 
49
 
 
50
        while(True):
 
51
            buf = self.device.read()
 
52
            buffer.append(buf)            
 
53
            # if a timeout was hit, raise an exception including the raw data that
 
54
            # we've already read (in case the calling func was _expecting_ a timeout
 
55
            # (wouldn't it be nice if serial.Serial.read returned None for this?)
 
56
            if buf == '':
 
57
                __reset_timeout()
 
58
                raise(errors.GsmReadTimeoutError(buffer))
 
59
 
 
60
            # if last n characters of the buffer match the read
 
61
            # terminator, return what we've received so far
 
62
            if ''.join(buffer[-len(read_term):]) == read_term:
 
63
                buf_str = ''.join(buffer)
 
64
                __reset_timeout()
 
65
 
 
66
                self._log(repr(buf_str), 'read')
 
67
                return buf_str
 
68
 
 
69
 
 
70
    def read_lines(self, read_term=None, read_timeout=None):
 
71
        """Read from the modem (blocking) one line at a time until a response
 
72
           terminator ("OK", "ERROR", or "CMx ERROR...") is hit, then return
 
73
           a list containing the lines."""
 
74
        buffer = []
 
75
 
 
76
        # keep on looping until a command terminator
 
77
        # is encountered. these are NOT the same as the
 
78
        # "read_term" argument - only OK or ERROR is valid
 
79
        while(True):
 
80
            buf = self._read(
 
81
                read_term=read_term,
 
82
                read_timeout=read_timeout)
 
83
 
 
84
            buf = buf.strip()
 
85
            buffer.append(buf)
 
86
 
 
87
            # most commands return OK for success, but there
 
88
            # are some exceptions. we're not checking those
 
89
            # here (unlike RubyGSM), because they should be
 
90
            # handled when they're _expected_
 
91
            if buf == "OK":
 
92
                return buffer
 
93
 
 
94
            # some errors contain useful error codes, so raise a
 
95
            # proper error with a description from pygsm/errors.py
 
96
            m = re.match(r"^\+(CM[ES]) ERROR: (\d+)$", buf)
 
97
            if m is not None:
 
98
                type, code = m.groups()
 
99
                raise(errors.GsmModemError(type, int(code)))
 
100
 
 
101
            # ...some errors are not so useful
 
102
            # (at+cmee=1 should enable error codes)
 
103
            if buf == "ERROR":
 
104
                raise(errors.GsmModemError)
 
105
 
 
106
    def _log(self, str, type="debug"):
 
107
        if hasattr(self, "logger"):
 
108
            self.logger(self, str, type)    
 
 
b'\\ No newline at end of file'