2
# -*- coding: latin-1 -*-
3
# -----------------------------------------------------------------------------
4
# Copyright 2010-2013 Stephen Tiedemann <stephen.tiedemann@gmail.com>
6
# Licensed under the EUPL, Version 1.1 or - as soon they
7
# will be approved by the European Commission - subsequent
8
# versions of the EUPL (the "Licence");
9
# You may not use this work except in compliance with the
11
# You may obtain a copy of the Licence at:
13
# http://www.osor.eu/eupl
15
# Unless required by applicable law or agreed to in
16
# writing, software distributed under the Licence is
17
# distributed on an "AS IS" basis,
18
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
20
# See the Licence for the specific language governing
21
# permissions and limitations under the Licence.
22
# -----------------------------------------------------------------------------
25
log = logging.getLogger('main')
36
from threading import Thread, Lock
38
sys.path.insert(1, os.path.split(sys.path[0])[0])
39
from cli import CommandLineInterface, TestError
45
def info(message, prefix=" "):
46
log.info(prefix + message)
48
class PhdcAgent(Thread):
50
super(PhdcAgent, self).__init__()
51
self.oqueue = queue.Queue()
52
self.iqueue = queue.Queue()
54
def enqueue(self, apdu):
55
if apdu is None or len(apdu) > 0:
58
def dequeue(self, timeout):
60
apdu = self.oqueue.get(block=True, timeout=timeout)
66
log.info("[ieee] >>> {0}".format(str(apdu).encode("hex")))
69
def recv(self, timeout):
71
apdu = self.iqueue.get(block=True, timeout=timeout)
75
log.info("[ieee] <<< {0}".format(str(apdu).encode("hex")))
78
class PhdcTagAgent(PhdcAgent):
79
def __init__(self, tag, cmd, apdu=bytearray(), flags='\x00'):
80
super(PhdcTagAgent, self).__init__()
81
self.terminate = False
83
attr = nfc.tag.tt3.NdefAttributeData()
85
attr.nbr, attr.nbw = 12, 8
88
attr.length = 7 + len(apdu)
90
phd_rec = nfc.ndef.Record("urn:nfc:wkt:PHD", data=flags + apdu)
91
phd_msg = nfc.ndef.Message(phd_rec)
93
self.ndef_data_area = str(attr) + bytearray(attr.capacity)
94
self.ndef_data_area[16:16+7+len(apdu)] = bytearray(str(phd_msg))
96
tag.add_service(0x0009, self.ndef_read, self.ndef_write)
97
tag.add_service(0x000B, self.ndef_read, lambda: False)
101
self.ndef_read_lock = Lock()
102
self.ndef_write_lock = Lock()
104
def ndef_read(self, block, read_begin, read_end):
105
if read_begin is True:
106
self.ndef_read_lock.acquire()
108
if block < len(self.ndef_data_area) / 16:
109
data = self.ndef_data_area[block*16:(block+1)*16]
110
log.debug("[tt3] got read block #{0} {1}".format(
111
block, str(data).encode("hex")))
114
log.debug("[tt3] got read block #{0}".format(block))
117
self.ndef_read_lock.release()
119
def ndef_write(self, block, data, write_begin, write_end):
120
if write_begin is True:
121
self.ndef_write_lock.acquire()
123
log.debug("[tt3] got write block #{0} {1}".format(
124
block, str(data).encode("hex")))
125
if block < len(self.ndef_data_area) / 16:
126
self.ndef_data_area[block*16:(block+1)*16] = data
129
if write_end is True:
130
self.ndef_write_lock.release()
131
apdu = self.recv_phd_message()
134
Thread(target=self.send_phd_message).start()
136
def recv_phd_message(self):
137
attr = nfc.tag.tt3.NdefAttributeData(self.ndef_data_area[0:16])
138
if attr.valid and not attr.writing and attr.length > 0:
139
#print str(self.ndef_data_area[16:16+attr.length]).encode("hex")
141
message = nfc.ndef.Message(
142
self.ndef_data_area[16:16+attr.length])
143
except nfc.ndef.LengthError:
146
if message.type == "urn:nfc:wkt:PHD":
147
data = bytearray(message[0].data)
148
if data[0] & 0x8F == 0x80 | (self.mc % 16):
149
log.info("[phdc] <<< " + str(data).encode("hex"))
152
self.ndef_data_area[0:16] = bytearray(str(attr))
155
def send_phd_message(self):
156
apdu = self.dequeue(timeout=0.1)
157
data = bytearray([0x80 | (self.mc % 16)]) + apdu
158
record = nfc.ndef.Record("urn:nfc:wkt:PHD", data=str(data))
159
with self.ndef_read_lock:
160
if not self.terminate:
161
log.info("[phdc] >>> " + str(data).encode("hex"))
162
data = bytearray(str(nfc.ndef.Message(record)))
163
attr = nfc.tag.tt3.NdefAttributeData(self.ndef_data_area[0:16])
164
attr.length = len(data)
165
self.ndef_data_area[0:16+attr.length] = str(attr) + data
169
log.info("entering phdc agent run loop")
170
command, self.cmd = self.cmd, None
171
while not (command is None or self.terminate is True):
172
response = self.tag.process_command(command)
174
command = self.tag.send_response(response, timeout=1)
175
except nfc.clf.TimeoutError:
176
log.info("no command received within 1 second")
178
except nfc.clf.TransmissionError:
180
log.info("leaving phdc agent run loop")
183
self.terminate = True
184
self.join(timeout=10.0)
186
thermometer_assoc_req = \
187
"E200 0032 8000 0000" \
188
"0001 002A 5079 0026" \
189
"8000 0000 8000 8000" \
190
"0000 0000 0000 0080" \
191
"0000 0008 3132 3334" \
192
"3536 3738 0320 0001" \
195
thermometer_assoc_res = \
196
"E300 002C 0003 5079" \
197
"0026 8000 0000 8000" \
198
"8000 0000 0000 0000" \
199
"8000 0000 0008 3837" \
200
"3635 3433 3231 0000" \
201
"0000 0000 0000 0000" \
203
assoc_release_req = "E40000020000"
204
assoc_release_res = "E50000020000"
206
phdc_tag_agent_description = """
207
Execute some Personal Health Device Communication (PHDC) tests running
208
as a Tag Agent. The reader device must have the PHDC validation R/W
209
Mode Test Manager running.
211
class PhdcTagAgentTest(CommandLineInterface):
213
parser = argparse.ArgumentParser(
214
usage='%(prog)s [OPTION]...',
215
formatter_class=argparse.RawDescriptionHelpFormatter,
216
description=phdc_tag_agent_description)
217
super(PhdcTagAgentTest, self).__init__(
218
parser, groups="test card dbg clf")
220
def on_card_startup(self, clf, targets):
221
idm = bytearray.fromhex("02FE") + os.urandom(6)
222
pmm = bytearray.fromhex("01E0000000FFFF00")
223
sys = bytearray.fromhex("12FC")
224
return [nfc.clf.TTF(br=212, idm=idm, pmm=pmm, sys=sys)]
226
def test_00(self, tag, command):
227
"""Send data read from scenario file"""
229
agent = PhdcTagAgent(tag, command)
231
info("entering ieee agent")
234
with open("scenario.txt") as f:
236
if line.startswith('#'):
238
apdu = bytearray.fromhex(line.strip())
240
apdu = agent.recv(timeout=5.0)
242
raise TestError("no data received")
247
info("leaving ieee agent")
252
def test_01(self, tag, command):
253
"""Discovery, association and release"""
255
agent = PhdcTagAgent(tag, command)
257
info("entering ieee agent")
259
apdu = bytearray.fromhex(thermometer_assoc_req)
260
info("send thermometer association request")
263
apdu = agent.recv(timeout=5.0)
265
raise TestError("no data received")
267
if apdu.startswith("\xE3\x00"):
268
info("rcvd association response")
272
apdu = bytearray.fromhex(assoc_release_req)
273
info("send association release request")
276
apdu = agent.recv(timeout=5.0)
278
raise TestError("no data received")
280
if apdu.startswith("\xE5\x00"):
281
info("rcvd association release response")
283
info("leaving ieee agent")
288
def test_02(self, tag, command):
289
"""Association after release"""
291
agent = PhdcTagAgent(tag, command)
293
info("entering ieee agent")
295
apdu = bytearray.fromhex(thermometer_assoc_req)
296
info("send thermometer association request")
299
apdu = agent.recv(timeout=5.0)
301
raise TestError("no data received")
302
if apdu.startswith("\xE3\x00"):
303
info("rcvd association response")
305
apdu = bytearray.fromhex(assoc_release_req)
306
info("send association release request")
309
apdu = agent.recv(timeout=5.0)
311
raise TestError("no data received")
312
if apdu.startswith("\xE5\x00"):
313
info("rcvd association release response")
315
info("leaving ieee agent")
319
info("entering ieee agent")
321
apdu = bytearray.fromhex(thermometer_assoc_req)
322
info("send thermometer association request")
325
apdu = agent.recv(timeout=5.0)
327
raise TestError("no data received")
328
if apdu.startswith("\xE3\x00"):
329
info("rcvd association response")
332
info("now move devices out of communication range")
334
info("leaving ieee agent")
339
def test_03(self, tag, command):
340
"""Activation with invalid settings"""
342
agent = PhdcTagAgent(tag, command, flags='\x02')
343
info("sending with non-zero message counter")
348
def test_04(self, tag, command):
349
"""Activation with invalid RFU value"""
351
agent = PhdcTagAgent(tag, command, flags='\x40')
352
info("sending with non-zero reserved field")
355
info("entering ieee agent")
357
info("leaving ieee agent")
361
phdc_p2p_agent_description = """
362
Execute some Personal Health Device Communication (PHDC) tests. The
363
peer device must have the PHDC validation test manager running.
365
class PhdcP2pAgentTest(CommandLineInterface):
367
parser = argparse.ArgumentParser(
368
usage='%(prog)s [OPTION]...',
369
formatter_class=argparse.RawDescriptionHelpFormatter,
370
description=phdc_p2p_agent_description)
371
super(PhdcP2pAgentTest, self).__init__(
372
parser, groups="test llcp dbg clf")
374
def test_00(self, llc):
375
"""Send data read from scenario file"""
377
socket = nfc.llcp.Socket(llc, nfc.llcp.DATA_LINK_CONNECTION)
378
socket.setsockopt(nfc.llcp.SO_RCVBUF, 2)
379
socket.connect("urn:nfc:sn:phdc")
380
peer_sap = socket.getpeername()
381
log.info("connected with phdc manager at sap {0}".format(peer_sap))
382
log.info("entering ieee agent")
385
with open("scenario.txt") as f:
387
if line.startswith('#'):
390
apdu = bytearray.fromhex(line)
391
apdu = struct.pack(">H", len(apdu)) + apdu
392
log.info("send {0}".format(str(apdu).encode("hex")))
393
socket.send(str(apdu))
396
log.info("rcvd {0}".format(str(apdu).encode("hex")))
400
log.info("leaving ieee agent")
403
def test_01(self, llc):
404
"""Connect, associate and release"""
406
socket = nfc.llcp.Socket(llc, nfc.llcp.DATA_LINK_CONNECTION)
407
socket.setsockopt(nfc.llcp.SO_RCVBUF, 2)
408
service_name = "urn:nfc:sn:phdc"
410
socket.connect(service_name)
411
except nfc.llcp.ConnectRefused:
412
raise TestError("could not connect to {0!r}".format(service_name))
414
peer_sap = socket.getpeername()
415
info("connected with phdc manager at sap {0}".format(peer_sap))
416
info("entering ieee agent")
418
apdu = bytearray.fromhex(thermometer_assoc_req)
419
apdu = struct.pack(">H", len(apdu)) + apdu
420
info("send thermometer association request")
421
info("send {0}".format(str(apdu).encode("hex")))
422
socket.send(str(apdu))
425
info("rcvd {0}".format(str(apdu).encode("hex")))
426
if apdu.startswith("\xE3\x00"):
427
info("rcvd association response")
431
apdu = bytearray.fromhex(assoc_release_req)
432
apdu = struct.pack(">H", len(apdu)) + apdu
433
info("send association release request")
434
info("send {0}".format(str(apdu).encode("hex")))
435
socket.send(str(apdu))
438
info("rcvd {0}".format(str(apdu).encode("hex")))
439
if apdu.startswith("\xE5\x00"):
440
info("rcvd association release response")
442
info("leaving ieee agent")
445
def test_02(self, llc):
446
"""Association after release"""
448
socket = nfc.llcp.Socket(llc, nfc.llcp.DATA_LINK_CONNECTION)
449
socket.setsockopt(nfc.llcp.SO_RCVBUF, 2)
450
service_name = "urn:nfc:sn:phdc"
452
socket.connect(service_name)
453
except nfc.llcp.ConnectRefused:
454
raise TestError("could not connect to {0!r}".format(service_name))
456
peer_sap = socket.getpeername()
457
info("connected with phdc manager at sap {0}".format(peer_sap))
458
info("entering ieee agent")
460
apdu = bytearray.fromhex(thermometer_assoc_req)
461
apdu = struct.pack(">H", len(apdu)) + apdu
462
info("send thermometer association request")
463
info("send {0}".format(str(apdu).encode("hex")))
464
socket.send(str(apdu))
467
info("rcvd {0}".format(str(apdu).encode("hex")))
468
if apdu.startswith("\xE3\x00"):
469
info("rcvd association response")
473
socket = nfc.llcp.Socket(llc, nfc.llcp.DATA_LINK_CONNECTION)
474
socket.setsockopt(nfc.llcp.SO_RCVBUF, 2)
475
socket.connect("urn:nfc:sn:phdc")
476
peer_sap = socket.getpeername()
477
info("connected with phdc manager at sap {0}".format(peer_sap))
478
info("entering ieee agent")
480
apdu = bytearray.fromhex(thermometer_assoc_req)
481
apdu = struct.pack(">H", len(apdu)) + apdu
482
info("send thermometer association request")
483
info("send {0}".format(str(apdu).encode("hex")))
484
socket.send(str(apdu))
487
info("rcvd {0}".format(str(apdu).encode("hex")))
488
if apdu.startswith("\xE3\x00"):
489
info("rcvd association response")
493
apdu = bytearray.fromhex(assoc_release_req)
494
apdu = struct.pack(">H", len(apdu)) + apdu
495
info("send association release request")
496
info("send {0}".format(str(apdu).encode("hex")))
497
socket.send(str(apdu))
500
info("rcvd {0}".format(str(apdu).encode("hex")))
501
if apdu.startswith("\xE5\x00"):
502
info("rcvd association release response")
504
info("leaving ieee agent")
506
def test_03(self, llc):
507
"""Fragmentation and reassembly"""
509
socket = nfc.llcp.Socket(llc, nfc.llcp.DATA_LINK_CONNECTION)
510
socket.setsockopt(nfc.llcp.SO_RCVBUF, 2)
511
service_name = "urn:nfc:xsn:nfc-forum.org:phdc-validation"
513
socket.connect(service_name)
514
except nfc.llcp.ConnectRefused:
515
raise TestError("could not connect to {0!r}".format(service_name))
517
peer_sap = socket.getpeername()
518
info("connected with phdc manager at sap {0}".format(peer_sap))
520
miu = socket.getsockopt(nfc.llcp.SO_SNDMIU)
522
apdu = os.urandom(2176)
523
log.info("send ieee apdu of size {0} byte".format(len(apdu)))
524
apdu = struct.pack(">H", len(apdu)) + apdu
525
for i in range(0, len(apdu), miu):
526
socket.send(str(apdu[i:i+miu]))
531
size = struct.unpack(">H", data[0:2])[0]
533
while len(apdu) < size:
535
if data == None: break
536
log.info("rcvd {0} byte data".format(len(data)))
538
info("rcvd {0} byte apdu".format(len(apdu)))
540
rcvd_apdu = apdu[::-1]
541
if rcvd_apdu != sent_apdu:
542
raise TestError("received data does not equal sent data")
546
if __name__ == '__main__':
547
try: mode, sys.argv = sys.argv[1], sys.argv[0:1] + sys.argv[2:]
548
except IndexError: mode = None
550
if mode is None or mode not in ("p2p", "tag"):
551
print("{0} requires 'p2p' or 'tag' as first argument."
552
.format(sys.argv[0]))
554
PhdcP2pAgentTest().run()
556
PhdcTagAgentTest().run()