2
# vim: ai ts=4 sts=4 et sw=4 encoding=utf-8
5
from __future__ import with_statement
15
# 'encoding', (max_normal, max_csm)
19
MAX_CSM_SEGMENTS = 255
21
# used to track csm reference numbers per receiver
23
__ref_lock = threading.Lock()
25
def get_outbound_pdus(text, recipient):
27
Returns a list of PDUs to send the provided
28
text to the given recipient.
30
If everything fits in one message, the list
31
will have just one PDU.
33
Otherwise it will be a list of Concatenated SM PDUs
35
If the message goes beyond the max length for a CSM
36
(it's gotta be _REALLY BIG_), this will raise a 'ValueError'
40
# first figure out the encoding
41
# if 'gsm', encode it to account for
42
# multi-byte char length
45
encoded_text = text.encode('gsm')
50
csm_max = MSG_LIMITS[encoding][1]
51
if len(encoded_text)>(MAX_CSM_SEGMENTS*csm_max):
52
raise ValueError('Message text too long')
54
# see if we are under the single PDU limit
55
if len(encoded_text)<=MSG_LIMITS[encoding][0]:
56
return [OutboundGsmPdu(text, recipient)]
58
# ok, we are a CSM, so lets figure out
63
if recipient not in __csm_refs:
64
__csm_refs[recipient]=0
65
csm_ref = __csm_refs[recipient] % 256
66
__csm_refs[recipient]+=1
69
num = int(math.ceil(len(encoded_text)/float(MSG_LIMITS[encoding][0])))
71
for seq in range(num):
73
seg_txt = encoded_text[i:i+csm_max]
75
# a little silly to encode, decode, then have PDU
76
# re-encode but keeps PDU API clean
77
seg_txt = seg_txt.decode('gsm')
91
class SmsParseException(Exception):
    """Raised when a received PDU string cannot be parsed.

    Used by the PDU parsing code in this file for inputs it does not
    understand (e.g. not an SMS-DELIVER, unknown encoding, invalid
    GSM-encoded data).
    """
94
class SmsEncodeException(Exception):
    """Raised when text cannot be turned into an outbound PDU.

    Used by the outbound PDU code in this file, e.g. when the text
    length exceeds the limit for its encoding.
    """
101
self.csm_total = None
105
self.pdu_string = None
110
Return a useful multiline rep of self
113
header='Addressee: %s\nLength: %s\nSent %s' % \
114
(self.address, len(self.text), self.sent_ts)
117
csm_info='\nCSM: %d of %d for Ref# %d' % (self.csm_seq, self.csm_total,self.csm_ref)
118
return '%s%s\nMessage: \n%s\nPDU: %s' % (header, csm_info,self.text,self.pdu_string)
121
class OutboundGsmPdu(GsmPdu):
123
Formatted outbound PDU. Basically just
126
Don't instantiate directly! Use 'get_outbound_pdus()'
127
which will return a list of PDUs needed to
132
def __init__(self, text, recipient, csm_ref=None, csm_seq=None, csm_total=None):
133
GsmPdu.__init__(self)
135
self.address = recipient
137
self.gsm_text = None # if we are gsm, put the gsm encoded str here
138
self.is_csm = csm_ref is not None
139
self.csm_ref = ( None if csm_ref is None else int(csm_ref) )
140
self.csm_seq = ( None if csm_seq is None else int(csm_seq) )
141
self.csm_total = ( None if csm_total is None else int(csm_total) )
144
# following does two things:
145
# 1. Raises exception if text cannot be encoded GSM
146
# 2. measures the number of chars after encoding
147
# since GSM is partially multi-byte, a string
148
# in GSM can be longer than the obvious num of chars
149
# e.g. 'hello' is 5 but 'hello^' is _7_
150
self.gsm_text=self.text.encode('gsm')
151
num_chars=len(self.gsm_text)
153
num_chars=len(self.text)
156
max = MSG_LIMITS[self.encoding][1]
158
max = MSG_LIMITS[self.encoding][0]
161
raise SmsEncodeException('Text length too great')
165
return ( 'gsm' if self.is_gsm else 'ucs2' )
169
return self.gsm_text is not None
173
return not self.is_gsm
175
def __get_pdu_string(self):
176
# now put the PDU string together
177
# first octet is SMSC info, 00 means get from stored on SIM
179
# Next is 'SMS-SUBMIT First Octet' -- '11' means submit w/validity.
180
# '51' means Concatenated SM w/validity
181
pdu.append('51' if self.is_csm else '11')
182
# Next is 'message' reference. '00' means phone can set this
184
# now recipient number, first type
185
if self.address[0]=='+':
186
num = self.address[1:]
187
type = '91' # international
190
type = 'A8' # national number
195
num = _twiddle(num, False)
196
pdu.append('%02X' % num_len) # length
204
pdu.append('00' if self.is_gsm else '08')
206
# validity period, just default to 4 days
209
# Now the fun! Make the user data (the text message)
211
# 1. If we are a CSM, need the CSM header
212
# 2. If we are a CSM and GSM, need to pad the data
216
# data header always starts the same:
217
# length: 5 octets '05'
219
# length of CSM info, 3 octets '03'
220
udh='050003%02X%02X%02X' % (self.csm_ref, self.csm_total, self.csm_seq)
223
# padding is number of bits to pad-out beyond
224
# the header to make everything land on a '7-bit'
225
# boundary rather than 8-bit.
226
# Can calculate as 7 - (UDH*8 % 7), but the UDH
227
# is always 48, so padding is always 1
230
# now encode contents
232
_pack_septets(self.gsm_text, padding=padding)
234
else self.text.encode('utf_16_be')
236
encoded_sm = encoded_sm.encode('hex').upper()
238
# and get the data length which is in septets
239
# if GSM, and octets otherwise
241
# just take length of encoded gsm text
242
# as each char becomes a septet when encoded
243
udl = len(self.gsm_text)
245
udl+=7 # header is always 7 septets (inc. padding)
247
# in this case just the byte length of content + header
248
udl = (len(encoded_sm)+len(udh))/2
250
# now add it all to the pdu
251
pdu.append('%02X' % udl)
253
pdu.append(encoded_sm)
256
def __set_pdu_string(self, val):
258
pdu_string=property(__get_pdu_string, __set_pdu_string)
260
class ReceivedGsmPdu(GsmPdu):
262
A nice little class to parse a PDU and give you useful
265
Maybe one day it will let you set text and sender info and
266
ask it to write itself out as a PDU!
269
def __init__(self, pdu_str):
270
GsmPdu.__init__(self)
272
# here are the properties that are set below in the
275
self.tp_mms = False # more messages to send
276
self.tp_sri = False # status report indication
277
self.address = None # phone number of sender as string
278
self.sent_ts = None # Datetime of when SMSC stamped the message, roughly when sent
279
self.text = None # string of message contents
280
self.pdu_string = pdu_str.upper() # original data as a string
281
self.is_csm = False # is this one of a sequence of concatenated messages?
282
self.csm_ref = 0 # reference number
283
self.csm_seq = 0 # this chunks sequence num, 1-based
284
self.csm_total = 0 # number of chunks total
285
self.encoding = None # either 'gsm' or 'ucs2'
291
This is truly hideous, just don't look below this line!
293
It's times like this that I miss closed-compiled source...
297
def __parse_pdu(self):
298
pdu=self.pdu_string # make copy
300
# grab smsc header, and throw away
301
# length is held in first octet
302
smsc_len,pdu=_consume_one_int(pdu)
304
# consume smsc header
305
c,pdu=_consume(pdu, smsc_len)
307
# grab the deliver octet
308
deliver_attrs,pdu=_consume_one_int(pdu)
310
if deliver_attrs & 0x03 != 0:
311
raise SmsParseException("Not a SMS-DELIVER, we ignore")
313
self.tp_mms=deliver_attrs & 0x04 # more messages to send
314
self.tp_sri=deliver_attrs & 0x20 # Status report indication
315
tp_udhi=deliver_attrs & 0x40 # There is a user data header in the user data portion
316
# get the sender number.
317
# First the length which is given in 'nibbles' (half octets)
318
# so divide by 2 and round up for odd
319
sender_dec_len,pdu=_consume_one_int(pdu)
320
sender_len=int(math.ceil(sender_dec_len/2.0))
322
# next is sender id type
323
sender_type,pdu=_consume(pdu,1)
325
# now the number itself, (unparsed)
326
num,pdu=_consume(pdu,sender_len)
328
# now parse the number
329
self.address=_parse_phone_num(sender_type,num)
331
# now the protocol id
332
# we only understand SMS (0)
333
tp_pid,pdu=_consume_one_int(pdu)
336
raise SmsParseException("Not SMS protocol, bailing")
338
# get and interpret DCS (char encoding info)
339
self.encoding,pdu=_consume(pdu,1,_read_dcs)
340
if self.encoding not in ['gsm','ucs2']:
341
raise SmsParseException("Don't understand short message encoding")
343
#get and interpret timestamp
344
self.sent_ts,pdu=_consume(pdu,7,_read_ts)
346
# ok, how long is ud?
347
# note, if encoding is GSM this is num 7-bit septets
348
# if ucs2, it's num bytes
349
udl,pdu=_consume_one_int(pdu)
351
# Now to deal with the User Data header!
353
# yup, we got one, probably part of a 'concatenated short message',
354
# what happens when you type too much text and your phone sends
357
# in fact this is the _only_ case we care about
359
# get the header length
360
udhl,pdu=_consume_decimal(pdu)
362
# now loop through consuming the header
363
# and looking to see if we are a csm
366
# get info about the element
367
ie_type,pdu=_consume_one_int(pdu)
368
ie_l,pdu=_consume_decimal(pdu)
369
ie_d,pdu=_consume(pdu,ie_l)
370
i+=(ie_l+2) # move index up for all bytes read
374
(ref,self.csm_total,self.csm_seq),r=_consume_bytes(ie_d,3)
375
self.csm_ref=ref % 256 # the definition is 'modulo 256'
376
# ok, done with header
378
# now see if we are gsm, in which case we need to unpack bits
379
if self.encoding=='gsm':
380
# if we had a data header, we need to figure out padding
382
# num septets * 7 bits minus
383
# 8 * header length (+1 for length indicator octet)
384
# mod'd by 7 to get the number of leftover padding bits
385
padding=((7*udl) - (8*(udhl+1))) % 7
391
self.text=_unpack_septets(pdu, padding).decode('gsm')
392
except Exception, ex:
393
# we have bogus data! But don't die
394
# as we are used deeply embedded
395
raise SmsParseException('GSM encoded data is invalid')
398
# we are just good old UCS2
399
# problem is, we don't necessarily know the byte order
400
# some phones include it, some--including some
401
# popular Nokia's _don't_, in which case it
402
# seems they use big-endian...
406
if bom==codecs.BOM_UTF16_LE.encode('hex'):
407
decoded_text=pdu[4:].decode('hex').decode('utf_16_le')
409
decoded_text=pdu.decode('hex').decode('utf_16_be')
410
self.text=decoded_text
411
# some phones add a leading <cr> so strip it
412
self.text=self.text.strip()
415
# And all the ugly helper functions
419
# make an int for masking
422
# for an SMS, as opposed to a 'voice mail waiting'
423
# indicator, first 4-bits must be zero
428
dcs &= 0x0c # mask off everything but bits 3&2
434
# not a type we know about, but should never get here
438
"""Convert slot to Byte boundary"""
441
def _consume(seq, num,func=None):
443
Consume the num of BYTES
445
return a tuple of (consumed,remainder)
447
func -- a function to call on the consumed. Result in tuple[0]
457
def _consume_decimal(seq):
458
"""read 2 chars as a decimal"""
459
return (int(seq[0:2],10),seq[2:])
461
def _consume_one_int(seq):
    """Consume one byte from the front of *seq*.

    Returns a tuple of (int, remainder_of_seq): the integer value of the
    first byte as produced by _consume_bytes, and the rest of *seq*.
    """
    values, remainder = _consume_bytes(seq, 1)
    return (values[0], remainder)
471
def _consume_bytes(seq, num=1):
    """Consume *num* bytes from the front of *seq* (2 chars per byte).

    Each two-character slice is converted to an int using base 16.
    Returns a tuple of ([byte, ...], remainder).

    Raises ValueError (from int()) when a slice is not valid hex, such
    as the empty slice produced when *seq* runs out early.
    """
    # _B presumably maps a byte count to a character count -- see its
    # definition elsewhere in this file.
    width = _B(num)
    # named 'octets' rather than 'bytes' to avoid shadowing the builtin
    octets = [int(seq[i:i + 2], 16) for i in range(0, width, 2)]
    return (octets, seq[width:])
484
def _twiddle(seq, decode=True):
485
seq=seq.upper() # just in case
487
for i in range(0,len(seq)-1,2):
488
result.extend((seq[i+1],seq[i]))
490
if len(result)<len(seq) and not decode:
491
# encoding odd length
492
result.extend(('F',seq[-1]))
493
elif decode and result[-1:][0]=='F':
497
return ''.join(result)
499
def _parse_phone_num(num_type,seq):
502
return _unpack_septets(seq).decode('gsm')
504
# sender number is encoded in DECIMAL with each octet swapped, and
505
# padded to even length with F
506
# so 1 415 555 1212 is: 41 51 55 15 12 f2
512
return '%s%s' % (intl_code,num)
514
def _chop(seq, how_much):
    """Chop the given number of octets off the front of *seq*.

    Returns what is left of *seq* after the chopped portion.
    """
    # the octet count is converted to an index into seq by _B
    offset = _B(how_much)
    return seq[offset:]
518
TS_MATCHER=re.compile(r'^(..)(..)(..)(..)(..)(..)(..)$')
524
m = TS_MATCHER.match(ts)
526
print "TS not valid: %s" % ts
527
return datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
529
yr,mo,dy,hr,mi,se=[int(g) for g in m.groups()[:-1]]
531
# handle time-zone separately to deal with
532
# the MSB bit for negative
533
tz = int(m.groups()[-1],16)
538
# now convert BACK to dec rep,
539
# I know, ridiculous, but that's
541
tz = int('%02X' % tz)
544
tz_offset = -tz_offset
545
tz_delta = datetime.timedelta(hours=tz_offset)
547
# year is 2 digit! Yeah! Y2K problem again!!
553
# python sucks with timezones,
554
# so create UTC not using this offset
557
# parse TS and adjust for TZ to get into UTC
558
dt = datetime.datetime(yr,mo,dy,hr,mi,se, tzinfo=pytz.utc) - tz_delta
559
except ValueError, ex:
560
# Timestamp was bogus, set it to UTC now
561
dt = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
568
s = ("%1d" % (n & 1)) + s
572
def _unpack_septets(seq,padding=0):
574
this function taken from:
575
http://offog.org/darcs/misccode/desms.py
577
Thank you Adam Sampson <ats@offog.org>!
580
# Unpack 7-bit characters
581
msgbytes,r = _consume_bytes(seq,len(seq)/2)
583
asbinary = ''.join(map(_to_binary, msgbytes))
585
asbinary = asbinary[:-padding]
587
while len(asbinary) >= 7:
588
chars.append(int(asbinary[-7:], 2))
589
asbinary = asbinary[:-7]
590
return "".join(map(chr, chars))
592
def _pack_septets(str, padding=0):
593
bytes=[ord(c) for c in str]
595
asbinary = ''.join([_to_binary(b)[1:] for b in bytes])
597
for i in range(padding):
600
# zero extend last octet if needed
601
extra = len(asbinary) % 8
603
for i in range(8-extra):
604
asbinary='0'+asbinary
606
# convert back to bytes
608
for i in range(0,len(asbinary),8):
609
bytes.append(int(asbinary[i:i+8],2))
611
return ''.join([chr(b) for b in bytes])
613
if __name__ == "__main__":
614
# poor man's unit tests
617
"07912180958729F6040B814151733717F500009011709055902B0148",
618
"07912180958729F6400B814151733717F500009070208044148AA0050003160201986FF719C47EBBCF20F6DB7D06B1DFEE3388FD769F41ECB7FB0C62BFDD6710FBED3E83D8ECB73B0D62BFDD67109BFD76A741613719C47EBBCF20F6DB7D06BCF61BC466BF41ECF719C47EBBCF20F6D",
619
"07912180958729F6440B814151733717F500009070207095828AA00500030E0201986FF719C47EBBCF20F6DB7D06B1DFEE3388FD769F41ECB7FB0C62BFDD6710FBED3E83D8ECB7",
620
"07912180958729F6040B814151733717F500009070103281418A09D93728FFDE940303",
621
"07912180958729F6040B814151733717F500009070102230438A02D937",
622
"0791227167830001040C912271271640910008906012024514001C002E004020AC00A300680065006C006C006F002000E900EC006B00F0",
623
"07917283010010F5040BC87238880900F10000993092516195800AE8329BFD4697D9EC37",
624
"0791448720900253040C914497035290960000500151614414400DD4F29C9E769F41E17338ED06",
625
"0791448720003023440C91449703529096000050015132532240A00500037A020190E9339A9D3EA3E920FA1B1466B341E472193E079DD3EE73D85DA7EB41E7B41C1407C1CBF43228CC26E3416137390F3AABCFEAB3FAAC3EABCFEAB3FAAC3EABCFEAB3FAAC3EABCFEAB3FADC3EB7CFED73FBDC3EBF5D4416D9457411596457137D87B7E16438194E86BBCF6D16D9055D429548A28BE822BA882E6370196C2A8950E291E822BA88",
626
"0791448720003023440C91449703529096000050015132537240310500037A02025C4417D1D52422894EE5B17824BA8EC423F1483C129BC725315464118FCDE011247C4A8B44",
627
"07914477790706520414D06176198F0EE361F2321900005001610013334014C324350B9287D12079180D92A3416134480E",
628
"0791448720003023440C91449703529096000050016121855140A005000301060190F5F31C447F83C8E5327CEE0221EBE73988FE0691CB65F8DC05028190F5F31C447F83C8E5327CEE028140C8FA790EA2BF41E472193E7781402064FD3C07D1DF2072B90C9FBB402010B27E9E83E86F10B95C86CF5D2064FD3C07D1DF2072B90C9FBB40C8FA790EA2BF41E472193E7781402064FD3C07D1DF2072B90C9FBB402010B27E9E83E8",
629
"0791448720003023440C91449703529096000050016121850240A0050003010602DE2072B90C9FBB402010B27E9E83E86F10B95C86CF5D201008593FCF41F437885C2EC3E72E100884AC9FE720FA1B442E97E1731708593FCF41F437885C2EC3E72E100884AC9FE720FA1B442E97E17317080442D6CF7310FD0D2297CBF0B90B040221EBE73988FE0691CB65F8DC05028190F5F31C447F83C8E5327CEE028140C8FA790EA2BF41",
630
"0791448720003023440C91449703529096000050016121854240A0050003010603C8E5327CEE0221EBE73988FE0691CB65F8DC05028190F5F31C447F83C8E5327CEE028140C8FA790EA2BF41E472193E7781402064FD3C07D1DF2072B90C9FBB402010B27E9E83E86F10B95C86CF5D201008593FCF41F437885C2EC3E72E10B27E9E83E86F10B95C86CF5D201008593FCF41F437885C2EC3E72E100884AC9FE720FA1B442E97E1",
631
"0791448720003023400C91449703529096000050016121853340A005000301060540C8FA790EA2BF41E472193E7781402064FD3C07D1DF2072B90C9FBB402010B27E9E83E86F10B95C86CF5D201008593FCF41F437885C2EC3E72E100884AC9FE720FA1B442E97E17317080442D6CF7310FD0D2297CBF0B90B84AC9FE720FA1B442E97E17317080442D6CF7310FD0D2297CBF0B90B040221EBE73988FE0691CB65F8DC05028190",
632
"0791448720003023440C914497035290960000500161218563402A050003010606EAE73988FE0691CB65F8DC05028190F5F31C447F83C8E5327CEE0281402010",
637
p.dump() for p in get_outbound_pdus(
638
u'\u5c71hellohello hellohello hellohello hellohello hellohello hellohello hellohello hellohello hellohello hellohello hellohello hellohello hellohello hellohello hellohello hellohello',
645
print '\n-------- Received ----------\nPDU: %s\n' % p
646
rp = ReceivedGsmPdu(p)
648
op = get_outbound_pdus(rp.text, rp.address)[0]
649
print '\nOut ------> \n'
651
print '-----------------------------'