~bernanet84/bordergateway/BorderGateway

« back to all changes in this revision

Viewing changes to Broker.py

  • Committer: Luca Bernardini
  • Date: 2010-04-19 16:21:45 UTC
  • Revision ID: luca@luca-laptop-20100419162145-uivu5nlysugexrnv
Initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# 2010 Luca Bernardini
 
2
# Broker.py  berna
 
3
 
 
4
from MessageQueue import MessageQueue
 
5
from time import sleep
 
6
from twisted.internet import reactor
 
7
 
 
8
class QueueMngException(Exception):
 
9
    def __init__(self, value):
 
10
        self.value = str(value)
 
11
 
 
12
    def __str__(self):
 
13
        return repr(self.value)
 
14
 
 
15
class Broker():
 
16
    # Il broker e un componente unico
 
17
    # Entita passiva creata nel main principale
 
18
    # Viene chiamata in causa da IncomingSIPHandler.handleIncoming quando arriva un messaggio SIP
 
19
    global_config = None
 
20
    
 
21
    # Un QueueHandler per ogni dominio destinatario.
 
22
    # Dizionario del tipo {"fqdn": [QueueHandler,"uriBGW","addrBGW",portBGW]}
 
23
    _naming_dict = None
 
24
    
 
25
    # Chiede servizi di connessione MSRP al SessionMSRPHandler
 
26
    sendMSRP = None
 
27
    
 
28
    # lista di queue
 
29
    queueDict = None
 
30
    rr = None
 
31
    
 
32
    ready = False
 
33
    policy = None
 
34
    
 
35
    def __init__(self, global_config):
 
36
        self.global_config = global_config
 
37
        self.sendMSRP = None
 
38
        self.queueDict = {}
 
39
        self.rr = {}
 
40
    
 
41
    def setSendMSRP(self, send):
 
42
        self.sendMSRP = send
 
43
 
 
44
    def inizializeQueueByFile(self, filename):
 
45
        if filename is None:
 
46
            return None
 
47
        f = open(filename,'r')
 
48
        lines = f.readlines()
 
49
        for line in lines:
 
50
            s = line.split(',')
 
51
            name, method, timeout, threshold = s
 
52
            queue = MessageQueue(name, float(timeout.strip()), int(threshold.strip()), self.sendMSRP)
 
53
            self.addQueue(method.strip(), queue)
 
54
        f.close()
 
55
        self.ready = True
 
56
        return
 
57
 
 
58
    def recvMessageSIP(self, msg, method, destHost, destPort):
 
59
        if not (self.ready):
 
60
            print "** BROKER: non sono pronto"
 
61
            raise QueueMngException
 
62
        else:
 
63
            #LOG print "Broker: MESSAGGIO ricevuto e da ritrasmettere a "+str(destHost)+":"+str(destPort)
 
64
            queue = self.scheduleQueue(method)
 
65
            queue.insertMSG(msg)
 
66
    
 
67
    def scheduleQueue(self, method):
 
68
        if self.queueDict == None:
 
69
            print "** BROKER: Non ci sono code"
 
70
            return -1
 
71
        if not(self.queueDict.has_key(method)):
 
72
            if not(self.queueDict.has_key("ALL")):
 
73
                print "** BROKER: Metodo non servito"
 
74
                return -1
 
75
            else:
 
76
                method = "ALL"
 
77
        try:
 
78
            i = self.rr[method].next()
 
79
        except StopIteration:
 
80
            self.rr[method] = iter(self.queueDict[method])
 
81
            i = self.rr[method].next()
 
82
        print "** BROKER: Select queue "+i.getName()+" - method "+method
 
83
        return i
 
84
    
 
85
    def addQueue(self, method, queue):
 
86
        if self.queueDict.has_key(method):
 
87
                self.queueDict[method].append(queue)
 
88
        else:
 
89
                self.queueDict[method] = [queue]
 
90
                self.rr[method] = iter(self.queueDict[method])
 
91
        return
 
92
    
 
93
    def delQueue(self, method, queue):
 
94
        pass
 
95
 
 
96
 
 
97
### CODICE DI TEST
 
98
 
 
99
class ManagerFittizio():
 
100
 
 
101
    def __init__(self):
 
102
        self.connected = True
 
103
    
 
104
    def sendMSRP(self, msg):
 
105
        print "INVIATO BLOCCO!"
 
106
    
 
107
    def isConnected(self):
 
108
        return self.connected
 
109
    
 
110
if __name__ == '__main__':
 
111
    
 
112
    m = ManagerFittizio()
 
113
    global_config = {}
 
114
    broker = Broker(global_config)
 
115
    broker.setSendMSRP(m)
 
116
    
 
117
    broker.inizializeQueueByFile("queueConfig.txt")
 
118
    
 
119
    print broker.queueDict
 
120
    print broker.rr
 
121
    
 
122
    f = open('test/resp.txt','r')
 
123
    res = f.read()
 
124
    f = open('test/reqTo.txt','r')
 
125
    req = f.read()
 
126
    f.close()
 
127
    
 
128
    for x in range(100):
 
129
        broker.recvMessageSIP(req, "INVITE", "biloxi.it", 9999)
 
130
        sleep(0.2)
 
131
        broker.recvMessageSIP(res, "INVITE", "atlanta.it", 10000)
 
132
    
 
133
    sleep(30)
 
 
b'\\ No newline at end of file'