~webreg-fd/nfcpy/dev-lites

« back to all changes in this revision

Viewing changes to examples/phdc-test-agent.py

  • Committer: frank.dawidowsky at sony
  • Date: 2013-10-01 07:35:34 UTC
  • mfrom: (143.1.29 trunk)
  • Revision ID: frank.dawidowsky@eu.sony.com-20131001073534-69pu292zbyelihwx
merged with main branch

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python
 
2
# -*- coding: latin-1 -*-
 
3
# -----------------------------------------------------------------------------
 
4
# Copyright 2010-2013 Stephen Tiedemann <stephen.tiedemann@gmail.com>
 
5
#
 
6
# Licensed under the EUPL, Version 1.1 or - as soon they 
 
7
# will be approved by the European Commission - subsequent
 
8
# versions of the EUPL (the "Licence");
 
9
# You may not use this work except in compliance with the
 
10
# Licence.
 
11
# You may obtain a copy of the Licence at:
 
12
#
 
13
# http://www.osor.eu/eupl
 
14
#
 
15
# Unless required by applicable law or agreed to in
 
16
# writing, software distributed under the Licence is
 
17
# distributed on an "AS IS" basis,
 
18
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 
19
# express or implied.
 
20
# See the Licence for the specific language governing
 
21
# permissions and limitations under the Licence.
 
22
# -----------------------------------------------------------------------------
 
23
 
 
24
import logging
 
25
log = logging.getLogger('main')
 
26
 
 
27
import os
 
28
import sys
 
29
import time
 
30
import string
 
31
import struct
 
32
import os.path
 
33
import inspect
 
34
import argparse
 
35
import Queue as queue
 
36
from threading import Thread, Lock
 
37
 
 
38
sys.path.insert(1, os.path.split(sys.path[0])[0])
 
39
from cli import CommandLineInterface, TestError
 
40
 
 
41
import nfc
 
42
import nfc.ndef
 
43
import nfc.llcp
 
44
 
 
45
def info(message, prefix="  "):
 
46
    log.info(prefix + message)
 
47
 
 
48
class PhdcAgent(Thread):
 
49
    def __init__(self):
 
50
        super(PhdcAgent, self).__init__()
 
51
        self.oqueue = queue.Queue()
 
52
        self.iqueue = queue.Queue()
 
53
 
 
54
    def enqueue(self, apdu):
 
55
        if apdu is None or len(apdu) > 0:
 
56
            self.iqueue.put(apdu)
 
57
 
 
58
    def dequeue(self, timeout):
 
59
        try:
 
60
            apdu = self.oqueue.get(block=True, timeout=timeout)
 
61
        except queue.Empty:
 
62
            apdu = ""
 
63
        return apdu
 
64
                
 
65
    def send(self, apdu):
 
66
        log.info("[ieee] >>> {0}".format(str(apdu).encode("hex")))
 
67
        self.oqueue.put(apdu)
 
68
 
 
69
    def recv(self, timeout):
 
70
        try:
 
71
            apdu = self.iqueue.get(block=True, timeout=timeout)
 
72
        except queue.Empty:
 
73
            pass
 
74
        else:
 
75
            log.info("[ieee] <<< {0}".format(str(apdu).encode("hex")))
 
76
            return apdu
 
77
 
 
78
class PhdcTagAgent(PhdcAgent):
 
79
    def __init__(self, tag, cmd, apdu=bytearray(), flags='\x00'):
 
80
        super(PhdcTagAgent, self).__init__()
 
81
        self.terminate = False
 
82
        self.mc = 1
 
83
        attr = nfc.tag.tt3.NdefAttributeData()
 
84
        attr.version = "1.0"
 
85
        attr.nbr, attr.nbw = 12, 8
 
86
        attr.capacity = 1024
 
87
        attr.writeable = True
 
88
        attr.length = 7 + len(apdu)
 
89
    
 
90
        phd_rec = nfc.ndef.Record("urn:nfc:wkt:PHD", data=flags + apdu)
 
91
        phd_msg = nfc.ndef.Message(phd_rec)
 
92
        
 
93
        self.ndef_data_area = str(attr) + bytearray(attr.capacity)
 
94
        self.ndef_data_area[16:16+7+len(apdu)] = bytearray(str(phd_msg))
 
95
 
 
96
        tag.add_service(0x0009, self.ndef_read, self.ndef_write)
 
97
        tag.add_service(0x000B, self.ndef_read, lambda: False)
 
98
        self.tag = tag
 
99
        self.cmd = cmd
 
100
        
 
101
        self.ndef_read_lock = Lock()
 
102
        self.ndef_write_lock = Lock()
 
103
 
 
104
    def ndef_read(self, block, read_begin, read_end):
 
105
        if read_begin is True:
 
106
            self.ndef_read_lock.acquire()
 
107
        try:
 
108
            if block < len(self.ndef_data_area) / 16:
 
109
                data = self.ndef_data_area[block*16:(block+1)*16]
 
110
                log.debug("[tt3] got read block #{0} {1}".format(
 
111
                        block, str(data).encode("hex")))
 
112
                return data
 
113
            else:
 
114
                log.debug("[tt3] got read block #{0}".format(block))
 
115
        finally:
 
116
            if read_end is True:
 
117
                self.ndef_read_lock.release()
 
118
    
 
119
    def ndef_write(self, block, data, write_begin, write_end):
 
120
        if write_begin is True:
 
121
            self.ndef_write_lock.acquire()
 
122
        try:
 
123
            log.debug("[tt3] got write block #{0} {1}".format(
 
124
                    block, str(data).encode("hex")))
 
125
            if block < len(self.ndef_data_area) / 16:
 
126
                self.ndef_data_area[block*16:(block+1)*16] = data
 
127
                return True
 
128
        finally:
 
129
            if write_end is True:
 
130
                self.ndef_write_lock.release()
 
131
                apdu = self.recv_phd_message()
 
132
                if apdu is not None:
 
133
                    self.enqueue(apdu)
 
134
                    Thread(target=self.send_phd_message).start()
 
135
            
 
136
    def recv_phd_message(self):
 
137
        attr = nfc.tag.tt3.NdefAttributeData(self.ndef_data_area[0:16])
 
138
        if attr.valid and not attr.writing and attr.length > 0:
 
139
            #print str(self.ndef_data_area[16:16+attr.length]).encode("hex")
 
140
            try:
 
141
                message = nfc.ndef.Message(
 
142
                    self.ndef_data_area[16:16+attr.length])
 
143
            except nfc.ndef.LengthError:
 
144
                return None
 
145
 
 
146
            if message.type == "urn:nfc:wkt:PHD":
 
147
                data = bytearray(message[0].data)
 
148
                if data[0] & 0x8F == 0x80 | (self.mc % 16):
 
149
                    log.info("[phdc] <<< " + str(data).encode("hex"))
 
150
                    self.mc += 1
 
151
                    attr.length = 0
 
152
                    self.ndef_data_area[0:16] = bytearray(str(attr))
 
153
                    return data[1:]
 
154
                   
 
155
    def send_phd_message(self):
 
156
        apdu = self.dequeue(timeout=0.1)
 
157
        data = bytearray([0x80 | (self.mc % 16)]) + apdu
 
158
        record = nfc.ndef.Record("urn:nfc:wkt:PHD", data=str(data))
 
159
        with self.ndef_read_lock:
 
160
            if not self.terminate:
 
161
                log.info("[phdc] >>> " + str(data).encode("hex"))
 
162
                data = bytearray(str(nfc.ndef.Message(record)))
 
163
                attr = nfc.tag.tt3.NdefAttributeData(self.ndef_data_area[0:16])
 
164
                attr.length = len(data)
 
165
                self.ndef_data_area[0:16+attr.length] = str(attr) + data
 
166
                self.mc += 1
 
167
        
 
168
    def run(self):
 
169
        log.info("entering phdc agent run loop")
 
170
        command, self.cmd = self.cmd, None
 
171
        while not (command is None or self.terminate is True):
 
172
            response = self.tag.process_command(command)
 
173
            try:
 
174
                command = self.tag.send_response(response, timeout=1)
 
175
            except nfc.clf.TimeoutError:
 
176
                log.info("no command received within 1 second")
 
177
                break
 
178
            except nfc.clf.TransmissionError:
 
179
                break
 
180
        log.info("leaving phdc agent run loop")
 
181
 
 
182
    def stop(self):
 
183
        self.terminate = True
 
184
        self.join(timeout=10.0)
 
185
        
 
186
thermometer_assoc_req = \
 
187
    "E200 0032 8000 0000" \
 
188
    "0001 002A 5079 0026" \
 
189
    "8000 0000 8000 8000" \
 
190
    "0000 0000 0000 0080" \
 
191
    "0000 0008 3132 3334" \
 
192
    "3536 3738 0320 0001" \
 
193
    "0100 0000 0000"
 
194
 
 
195
thermometer_assoc_res = \
 
196
    "E300 002C 0003 5079" \
 
197
    "0026 8000 0000 8000" \
 
198
    "8000 0000 0000 0000" \
 
199
    "8000 0000 0008 3837" \
 
200
    "3635 3433 3231 0000" \
 
201
    "0000 0000 0000 0000" \
 
202
 
 
203
assoc_release_req = "E40000020000"
 
204
assoc_release_res = "E50000020000"
 
205
 
 
206
phdc_tag_agent_description = """
 
207
Execute some Personal Health Device Communication (PHDC) tests running
 
208
as a Tag Agent. The reader device must have the PHDC validation R/W
 
209
Mode Test Manager running.
 
210
"""
 
211
class PhdcTagAgentTest(CommandLineInterface):
 
212
    def __init__(self):
 
213
        parser = argparse.ArgumentParser(
 
214
            usage='%(prog)s [OPTION]...',
 
215
            formatter_class=argparse.RawDescriptionHelpFormatter,
 
216
            description=phdc_tag_agent_description)
 
217
        super(PhdcTagAgentTest, self).__init__(
 
218
            parser, groups="test card dbg clf")
 
219
 
 
220
    def on_card_startup(self, clf, targets):
 
221
        idm = bytearray.fromhex("02FE") + os.urandom(6)
 
222
        pmm = bytearray.fromhex("01E0000000FFFF00")
 
223
        sys = bytearray.fromhex("12FC")
 
224
        return [nfc.clf.TTF(br=212, idm=idm, pmm=pmm, sys=sys)]
 
225
    
 
226
    def test_00(self, tag, command):
 
227
        """Send data read from scenario file"""
 
228
 
 
229
        agent = PhdcTagAgent(tag, command)
 
230
        agent.start()
 
231
        info("entering ieee agent")
 
232
 
 
233
        try:
 
234
            with open("scenario.txt") as f:
 
235
                for line in f:
 
236
                    if line.startswith('#'):
 
237
                        continue
 
238
                    apdu = bytearray.fromhex(line.strip())
 
239
                    agent.send(apdu)
 
240
                    apdu = agent.recv(timeout=5.0)
 
241
                    if apdu is None:
 
242
                        raise TestError("no data received")
 
243
        except IOError as e:
 
244
            log.error(e)
 
245
            time.sleep(0.1)
 
246
 
 
247
        info("leaving ieee agent")
 
248
 
 
249
        if agent.is_alive():
 
250
            agent.stop()
 
251
 
 
252
    def test_01(self, tag, command):
 
253
        """Discovery, association and release"""
 
254
        
 
255
        agent = PhdcTagAgent(tag, command)
 
256
        agent.start()
 
257
        info("entering ieee agent")
 
258
 
 
259
        apdu = bytearray.fromhex(thermometer_assoc_req)
 
260
        info("send thermometer association request")
 
261
        agent.send(apdu)
 
262
 
 
263
        apdu = agent.recv(timeout=5.0)
 
264
        if apdu is None:
 
265
            raise TestError("no data received")
 
266
 
 
267
        if apdu.startswith("\xE3\x00"):
 
268
            info("rcvd association response")
 
269
 
 
270
        time.sleep(3.0)
 
271
 
 
272
        apdu = bytearray.fromhex(assoc_release_req)
 
273
        info("send association release request")
 
274
        agent.send(apdu)
 
275
 
 
276
        apdu = agent.recv(timeout=5.0)
 
277
        if apdu is None:
 
278
            raise TestError("no data received")
 
279
 
 
280
        if apdu.startswith("\xE5\x00"):
 
281
            info("rcvd association release response")
 
282
 
 
283
        info("leaving ieee agent")
 
284
 
 
285
        if agent.is_alive():
 
286
            agent.stop()
 
287
        
 
288
    def test_02(self, tag, command):
 
289
        """Association after release"""
 
290
        
 
291
        agent = PhdcTagAgent(tag, command)
 
292
        agent.start()
 
293
        info("entering ieee agent")
 
294
 
 
295
        apdu = bytearray.fromhex(thermometer_assoc_req)
 
296
        info("send thermometer association request")
 
297
        agent.send(apdu)
 
298
 
 
299
        apdu = agent.recv(timeout=5.0)
 
300
        if apdu is None:
 
301
            raise TestError("no data received")
 
302
        if apdu.startswith("\xE3\x00"):
 
303
            info("rcvd association response")
 
304
 
 
305
        apdu = bytearray.fromhex(assoc_release_req)
 
306
        info("send association release request")
 
307
        agent.send(apdu)
 
308
 
 
309
        apdu = agent.recv(timeout=5.0)
 
310
        if apdu is None:
 
311
            raise TestError("no data received")
 
312
        if apdu.startswith("\xE5\x00"):
 
313
            info("rcvd association release response")
 
314
 
 
315
        info("leaving ieee agent")
 
316
 
 
317
        time.sleep(3.0)
 
318
 
 
319
        info("entering ieee agent")
 
320
 
 
321
        apdu = bytearray.fromhex(thermometer_assoc_req)
 
322
        info("send thermometer association request")
 
323
        agent.send(apdu)
 
324
 
 
325
        apdu = agent.recv(timeout=5.0)
 
326
        if apdu is None:
 
327
            raise TestError("no data received")
 
328
        if apdu.startswith("\xE3\x00"):
 
329
            info("rcvd association response")
 
330
 
 
331
        time.sleep(1.0)
 
332
        info("now move devices out of communication range")
 
333
 
 
334
        info("leaving ieee agent")
 
335
        
 
336
        if agent.is_alive():
 
337
            agent.stop()
 
338
        
 
339
    def test_03(self, tag, command):
 
340
        """Activation with invalid settings"""
 
341
        
 
342
        agent = PhdcTagAgent(tag, command, flags='\x02')
 
343
        info("sending with non-zero message counter")
 
344
        agent.start()
 
345
        if agent.is_alive():
 
346
            agent.stop()
 
347
        
 
348
    def test_04(self, tag, command):
 
349
        """Activation with invalid RFU value"""
 
350
        
 
351
        agent = PhdcTagAgent(tag, command, flags='\x40')
 
352
        info("sending with non-zero reserved field")
 
353
        agent.start()
 
354
            
 
355
        info("entering ieee agent")
 
356
        time.sleep(3.0)
 
357
        info("leaving ieee agent")
 
358
        if agent.is_alive():
 
359
            agent.stop()
 
360
        
 
361
phdc_p2p_agent_description = """
 
362
Execute some Personal Health Device Communication (PHDC) tests. The
 
363
peer device must have the PHDC validation test manager running.
 
364
"""
 
365
class PhdcP2pAgentTest(CommandLineInterface):
 
366
    def __init__(self):
 
367
        parser = argparse.ArgumentParser(
 
368
            usage='%(prog)s [OPTION]...',
 
369
            formatter_class=argparse.RawDescriptionHelpFormatter,
 
370
            description=phdc_p2p_agent_description)
 
371
        super(PhdcP2pAgentTest, self).__init__(
 
372
            parser, groups="test llcp dbg clf")
 
373
 
 
374
    def test_00(self, llc):
 
375
        """Send data read from scenario file"""
 
376
 
 
377
        socket = nfc.llcp.Socket(llc, nfc.llcp.DATA_LINK_CONNECTION)
 
378
        socket.setsockopt(nfc.llcp.SO_RCVBUF, 2)
 
379
        socket.connect("urn:nfc:sn:phdc")
 
380
        peer_sap = socket.getpeername()
 
381
        log.info("connected with phdc manager at sap {0}".format(peer_sap))
 
382
        log.info("entering ieee agent")
 
383
 
 
384
        try:
 
385
            with open("scenario.txt") as f:
 
386
                for line in f:
 
387
                    if line.startswith('#'):
 
388
                        continue
 
389
 
 
390
                    apdu = bytearray.fromhex(line)
 
391
                    apdu = struct.pack(">H", len(apdu)) + apdu
 
392
                    log.info("send {0}".format(str(apdu).encode("hex")))
 
393
                    socket.send(str(apdu))
 
394
 
 
395
                    apdu = socket.recv()
 
396
                    log.info("rcvd {0}".format(str(apdu).encode("hex")))
 
397
        except IOError as e:
 
398
            log.error(e)
 
399
 
 
400
        log.info("leaving ieee agent")
 
401
        socket.close()
 
402
 
 
403
    def test_01(self, llc):
 
404
        """Connect, associate and release"""
 
405
        
 
406
        socket = nfc.llcp.Socket(llc, nfc.llcp.DATA_LINK_CONNECTION)
 
407
        socket.setsockopt(nfc.llcp.SO_RCVBUF, 2)
 
408
        service_name = "urn:nfc:sn:phdc"
 
409
        try:
 
410
            socket.connect(service_name)
 
411
        except nfc.llcp.ConnectRefused:
 
412
            raise TestError("could not connect to {0!r}".format(service_name))
 
413
        
 
414
        peer_sap = socket.getpeername()
 
415
        info("connected with phdc manager at sap {0}".format(peer_sap))
 
416
        info("entering ieee agent")
 
417
 
 
418
        apdu = bytearray.fromhex(thermometer_assoc_req)
 
419
        apdu = struct.pack(">H", len(apdu)) + apdu
 
420
        info("send thermometer association request")
 
421
        info("send {0}".format(str(apdu).encode("hex")))
 
422
        socket.send(str(apdu))
 
423
 
 
424
        apdu = socket.recv()
 
425
        info("rcvd {0}".format(str(apdu).encode("hex")))
 
426
        if apdu.startswith("\xE3\x00"):
 
427
            info("rcvd association response")
 
428
 
 
429
        time.sleep(3.0)
 
430
 
 
431
        apdu = bytearray.fromhex(assoc_release_req)
 
432
        apdu = struct.pack(">H", len(apdu)) + apdu
 
433
        info("send association release request")
 
434
        info("send {0}".format(str(apdu).encode("hex")))
 
435
        socket.send(str(apdu))
 
436
 
 
437
        apdu = socket.recv()
 
438
        info("rcvd {0}".format(str(apdu).encode("hex")))
 
439
        if apdu.startswith("\xE5\x00"):
 
440
            info("rcvd association release response")
 
441
 
 
442
        info("leaving ieee agent")
 
443
        socket.close()
 
444
 
 
445
    def test_02(self, llc):
 
446
        """Association after release"""
 
447
 
 
448
        socket = nfc.llcp.Socket(llc, nfc.llcp.DATA_LINK_CONNECTION)
 
449
        socket.setsockopt(nfc.llcp.SO_RCVBUF, 2)
 
450
        service_name = "urn:nfc:sn:phdc"
 
451
        try:
 
452
            socket.connect(service_name)
 
453
        except nfc.llcp.ConnectRefused:
 
454
            raise TestError("could not connect to {0!r}".format(service_name))
 
455
        
 
456
        peer_sap = socket.getpeername()
 
457
        info("connected with phdc manager at sap {0}".format(peer_sap))
 
458
        info("entering ieee agent")
 
459
 
 
460
        apdu = bytearray.fromhex(thermometer_assoc_req)
 
461
        apdu = struct.pack(">H", len(apdu)) + apdu
 
462
        info("send thermometer association request")
 
463
        info("send {0}".format(str(apdu).encode("hex")))
 
464
        socket.send(str(apdu))
 
465
 
 
466
        apdu = socket.recv()
 
467
        info("rcvd {0}".format(str(apdu).encode("hex")))
 
468
        if apdu.startswith("\xE3\x00"):
 
469
            info("rcvd association response")
 
470
 
 
471
        socket.close()
 
472
 
 
473
        socket = nfc.llcp.Socket(llc, nfc.llcp.DATA_LINK_CONNECTION)
 
474
        socket.setsockopt(nfc.llcp.SO_RCVBUF, 2)
 
475
        socket.connect("urn:nfc:sn:phdc")
 
476
        peer_sap = socket.getpeername()
 
477
        info("connected with phdc manager at sap {0}".format(peer_sap))
 
478
        info("entering ieee agent")
 
479
 
 
480
        apdu = bytearray.fromhex(thermometer_assoc_req)
 
481
        apdu = struct.pack(">H", len(apdu)) + apdu
 
482
        info("send thermometer association request")
 
483
        info("send {0}".format(str(apdu).encode("hex")))
 
484
        socket.send(str(apdu))
 
485
 
 
486
        apdu = socket.recv()
 
487
        info("rcvd {0}".format(str(apdu).encode("hex")))
 
488
        if apdu.startswith("\xE3\x00"):
 
489
            info("rcvd association response")
 
490
 
 
491
        time.sleep(3.0)
 
492
 
 
493
        apdu = bytearray.fromhex(assoc_release_req)
 
494
        apdu = struct.pack(">H", len(apdu)) + apdu
 
495
        info("send association release request")
 
496
        info("send {0}".format(str(apdu).encode("hex")))
 
497
        socket.send(str(apdu))
 
498
 
 
499
        apdu = socket.recv()
 
500
        info("rcvd {0}".format(str(apdu).encode("hex")))
 
501
        if apdu.startswith("\xE5\x00"):
 
502
            info("rcvd association release response")
 
503
 
 
504
        info("leaving ieee agent")
 
505
 
 
506
    def test_03(self, llc):
 
507
        """Fragmentation and reassembly"""
 
508
        
 
509
        socket = nfc.llcp.Socket(llc, nfc.llcp.DATA_LINK_CONNECTION)
 
510
        socket.setsockopt(nfc.llcp.SO_RCVBUF, 2)
 
511
        service_name = "urn:nfc:xsn:nfc-forum.org:phdc-validation"
 
512
        try:
 
513
            socket.connect(service_name)
 
514
        except nfc.llcp.ConnectRefused:
 
515
            raise TestError("could not connect to {0!r}".format(service_name))
 
516
        
 
517
        peer_sap = socket.getpeername()
 
518
        info("connected with phdc manager at sap {0}".format(peer_sap))
 
519
 
 
520
        miu = socket.getsockopt(nfc.llcp.SO_SNDMIU)
 
521
        
 
522
        apdu = os.urandom(2176)
 
523
        log.info("send ieee apdu of size {0} byte".format(len(apdu)))
 
524
        apdu = struct.pack(">H", len(apdu)) + apdu
 
525
        for i in range(0, len(apdu), miu):
 
526
            socket.send(str(apdu[i:i+miu]))
 
527
 
 
528
        sent_apdu = apdu[2:]
 
529
 
 
530
        data = socket.recv()
 
531
        size = struct.unpack(">H", data[0:2])[0]
 
532
        apdu = data[2:]
 
533
        while len(apdu) < size:
 
534
            data = socket.recv()
 
535
            if data == None: break
 
536
            log.info("rcvd {0} byte data".format(len(data)))
 
537
            apdu += data
 
538
        info("rcvd {0} byte apdu".format(len(apdu)))
 
539
 
 
540
        rcvd_apdu = apdu[::-1]
 
541
        if rcvd_apdu != sent_apdu:
 
542
            raise TestError("received data does not equal sent data")
 
543
 
 
544
        socket.close()
 
545
    
 
546
if __name__ == '__main__':
 
547
    try: mode, sys.argv = sys.argv[1], sys.argv[0:1] + sys.argv[2:]
 
548
    except IndexError: mode = None
 
549
 
 
550
    if mode is None or mode not in ("p2p", "tag"):
 
551
        print("{0} requires 'p2p' or 'tag' as first argument."
 
552
              .format(sys.argv[0]))
 
553
    elif mode == "p2p":
 
554
        PhdcP2pAgentTest().run()
 
555
    elif mode == "tag":
 
556
        PhdcTagAgentTest().run()