2
# MessageQueue.py berna
4
from Queue import Queue
6
#from Queue import Empty
7
from Timeout import Timeout
11
class GetAllQueue(Queue):
12
# Override these methods to implement other queue organizations
13
# (e.g. stack or priority queue).
14
# These will only be called with appropriate locks held
16
# Get all item from the queue in a list
19
while(self._qsize() > 0):
20
item = self.queue.popleft()
25
""" Coda FIFO per qualsiasi messaggio di semplice testo\
26
Spedisce il blocco di messaggi che contiene in quel momento:\
27
- allo scadere di un timeout che riparte da ogni consegna dei blocchi di messaggio\
28
- al raggiungimento di una soglia di numero di messaggi"""
36
def __init__(self, name, timeout = 10, threshold = 10, sendBlock = None):
37
self.queue = GetAllQueue(-1)
38
self.threshold = threshold
39
self.sendBlock = sendBlock
41
self.timer = Timeout(self.deliveryBlockMSG,self.period,-1)
47
def insertMSG(self, msg):
48
""" Inserisci un messaggio in coda e torna 1.\
49
Se il messaggio fa raggiungere la soglia, spedisci blocco messaggi e torna 0"""
50
msg = string.strip(msg)
51
self.queue.put(msg, block=True, timeout=None)
52
if self.numMSG() >= self.threshold:
53
self.deliveryBlockMSG()
57
def deliveryBlockMSG(self):
58
""" Costruisce il blocco dei messaggi che contiene in questo istante\
59
e li passa come argomento alla funzione di callback fornita nel costruttore"""
60
if self.queue.empty():
61
print "** QUEUE "+self.name+": Timeout scaduto ma queue vuota"
64
list = self.queue.get()
66
for x in list[0:(len(list)-1)]:
67
block = block + str(x) +" || "
68
block = block + list[len(list)-1]
70
print "** QUEUE "+self.name+": Send BLOCK!"
73
# Legge il numero dei messaggi senza bloccarsi (non acquisisce lock)
75
return self.queue._qsize();
77
if __name__ == '__main__':
78
from twisted.internet import reactor
79
from time import sleep
82
list = ["ciao","come","stai"]
85
for x in list[0:(len(list)-1)]:
86
block = block + str(x) +" || "
87
block = block + list[len(list)-1]
93
arg[0].insertMSG("msg A."+str(i))
99
arg[0].insertMSG("msg B."+str(i))
105
arg[0].insertMSG("msg C"+str(i))
113
queue = MessageQueue(0.5, 5, test1)
115
print queue.threshold
116
reactor.callInThread(insertA,[queue])
117
reactor.callInThread(insertB,[queue])
118
reactor.callInThread(insertC,[queue])
b'\\ No newline at end of file'