~bernanet84/bordergateway/BorderGateway

« back to all changes in this revision

Viewing changes to MessageQueue.py

  • Committer: Luca Bernardini
  • Date: 2010-04-19 16:21:45 UTC
  • Revision ID: luca@luca-laptop-20100419162145-uivu5nlysugexrnv
Initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# 2010 Luca Bernardini
 
2
# MessageQueue.py  berna
 
3
 
 
4
from Queue import Queue
 
5
from Queue import Full
 
6
#from Queue import Empty
 
7
from Timeout import Timeout
 
8
import string
 
9
 
 
10
 
 
11
class GetAllQueue(Queue):
 
12
    # Override these methods to implement other queue organizations
 
13
    # (e.g. stack or priority queue).
 
14
    # These will only be called with appropriate locks held
 
15
 
 
16
    # Get all item from the queue in a list
 
17
    def _get(self):
 
18
        list = []
 
19
        while(self._qsize() > 0):
 
20
            item = self.queue.popleft()
 
21
            list.append(item)
 
22
        return list
 
23
 
 
24
class MessageQueue():
 
25
    """ Coda FIFO per qualsiasi messaggio di semplice testo\
 
26
        Spedisce il blocco di messaggi che contiene in quel momento:\
 
27
        - allo scadere di un timeout che riparte da ogni consegna dei blocchi di messaggio\
 
28
        - al raggiungimento di una soglia di numero di messaggi"""
 
29
    timer = None
 
30
    period = None
 
31
    threshold = None    
 
32
    sendBlock = None
 
33
    queue = None
 
34
    name = None
 
35
    
 
36
    def __init__(self, name, timeout = 10, threshold = 10, sendBlock = None):
 
37
        self.queue = GetAllQueue(-1)
 
38
        self.threshold = threshold
 
39
        self.sendBlock = sendBlock
 
40
        self.period = timeout
 
41
        self.timer = Timeout(self.deliveryBlockMSG,self.period,-1)
 
42
        self.name = name
 
43
    
 
44
    def getName(self):
 
45
        return self.name
 
46
   
 
47
    def insertMSG(self, msg):
 
48
        """ Inserisci un messaggio in coda e torna 1.\
 
49
            Se il messaggio fa raggiungere la soglia, spedisci blocco messaggi e torna 0"""
 
50
        msg = string.strip(msg)
 
51
        self.queue.put(msg, block=True, timeout=None)
 
52
        if self.numMSG() >= self.threshold:
 
53
            self.deliveryBlockMSG()
 
54
            return 0
 
55
        else:  return 1
 
56
    
 
57
    def deliveryBlockMSG(self):
 
58
        """ Costruisce il blocco dei messaggi che contiene in questo istante\
 
59
            e li passa come argomento alla funzione di callback fornita nel costruttore"""
 
60
        if self.queue.empty():
 
61
            print "** QUEUE "+self.name+": Timeout scaduto ma queue vuota"
 
62
            return 0    
 
63
        else:
 
64
            list = self.queue.get()
 
65
            block = ""
 
66
            for x in list[0:(len(list)-1)]:
 
67
                block = block + str(x) +" || "
 
68
            block = block + list[len(list)-1]
 
69
            self.sendBlock(block)
 
70
            print "** QUEUE "+self.name+": Send BLOCK!"
 
71
            return 1
 
72
 
 
73
    # Legge il numero dei messaggi senza bloccarsi (non acquisisce lock)
 
74
    def numMSG(self):
 
75
        return self.queue._qsize();
 
76
    
 
77
if __name__ == '__main__':
 
78
    from twisted.internet import reactor
 
79
    from time import sleep
 
80
    
 
81
    def testBatch():
 
82
        list = ["ciao","come","stai"]
 
83
        print list
 
84
        block = ""
 
85
        for x in list[0:(len(list)-1)]:
 
86
            block = block + str(x) +" || "
 
87
        block = block + list[len(list)-1]
 
88
        print block
 
89
 
 
90
 
 
91
    def insertA(arg):
 
92
        for i in range(50):
 
93
            arg[0].insertMSG("msg A."+str(i))
 
94
            sleep(1)
 
95
        print "end insert A"
 
96
    
 
97
    def insertB(arg):
 
98
        for i in range(50):
 
99
            arg[0].insertMSG("msg B."+str(i))
 
100
            sleep(3)
 
101
        print "end insert B"
 
102
    
 
103
    def insertC(arg):
 
104
        for i in range(50):
 
105
            arg[0].insertMSG("msg C"+str(i))
 
106
            sleep(5)
 
107
        print "end insert C"
 
108
 
 
109
    def test1(msg):
 
110
        print msg
 
111
        #reactor.crash()
 
112
 
 
113
    queue = MessageQueue(0.5, 5, test1)
 
114
    print queue.period
 
115
    print queue.threshold
 
116
    reactor.callInThread(insertA,[queue])
 
117
    reactor.callInThread(insertB,[queue])
 
118
    reactor.callInThread(insertC,[queue])
 
119
    reactor.run()
 
120
 
 
121
    
 
 
b'\\ No newline at end of file'