~ubuntu-branches/ubuntu/utopic/exabgp/utopic

« back to all changes in this revision

Viewing changes to lib/exabgp/reactor/listener.py

  • Committer: Package Import Robot
  • Author(s): Henry-Nicolas Tourneur
  • Date: 2014-03-08 19:07:00 UTC
  • mfrom: (1.1.8)
  • Revision ID: package-import@ubuntu.com-20140308190700-xjbibpg1g6001c9x
Tags: 3.3.1-1
* New upstream release
* Bump python minimal required version (2.7)
* Closes: #726066 Debian packaging improvements proposed by Vincent Bernat
* Closes: #703774 not existent rundir (/var/run/exabgp) after reboot

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# encoding: utf-8
 
2
"""
 
3
listener.py
 
4
 
 
5
Created by Thomas Mangin on 2013-07-11.
 
6
Copyright (c) 2013-2013 Exa Networks. All rights reserved.
 
7
"""
 
8
 
 
9
import socket
 
10
 
 
11
from exabgp.util.errstr import errstr
 
12
 
 
13
from exabgp.protocol.family import AFI
 
14
#from exabgp.util.coroutine import each
 
15
from exabgp.util.ip import isipv4,isipv6
 
16
from exabgp.reactor.network.error import error,errno,NetworkError,BindingError,AcceptError
 
17
from exabgp.reactor.network.incoming import Incoming
 
18
#from exabgp.bgp.message.open import Open
 
19
#from exabgp.bgp.message.notification import Notify
 
20
 
 
21
from exabgp.logger import Logger
 
22
 
 
23
 
 
24
class Listener (object):
        """Bind non-blocking TCP listening sockets and yield accepted connections.

        One listening socket is created per entry of ``hosts``, all on the same
        ``port``. ``self._sockets`` maps each listening socket object to the
        ``(host, port)`` pair it was bound to.
        """

        def __init__ (self,hosts,port,backlog=200):
                """Record the listening parameters; no socket is opened here.

                hosts:   iterable of IP address strings to listen on
                port:    TCP port used for every host
                backlog: value passed to socket.listen()
                """
                self._hosts = hosts
                self._port = port
                self._backlog = backlog

                self.serving = False
                self._sockets = {}
                #self._connected = {}
                self.logger = Logger()

        def _bind (self,ip,port):
                """Return a non-blocking socket listening on (ip, port).

                Returns None when ``ip`` is neither an IPv6 nor an IPv4 address
                according to isipv6() / isipv4().

                Raises BindingError when the socket can not be created, bound
                or put in listening mode.
                """
                s = None
                try:
                        if isipv6(ip):
                                family = socket.AF_INET6
                                # (host, port, flowinfo, scope_id)
                                address = (ip,port,0,0)
                        elif isipv4(ip):
                                family = socket.AF_INET
                                address = (ip,port)
                        else:
                                return None

                        s = socket.socket(family, socket.SOCK_STREAM, socket.IPPROTO_TCP)
                        try:
                                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                        except (socket.error,AttributeError):
                                # best effort only: listening still works without SO_REUSEADDR
                                pass
                        s.bind(address)
                        s.setblocking(0)
                        ##s.settimeout(0.0)
                        s.listen(self._backlog)
                        return s
                except socket.error as e:
                        # do not leak the file descriptor of a socket we failed to set up
                        if s is not None:
                                s.close()
                        if e.args[0] == errno.EADDRINUSE:
                                raise BindingError('could not listen on %s:%d, the port already in use by another application' % (ip,port))
                        elif e.args[0] == errno.EADDRNOTAVAIL:
                                raise BindingError('could not listen on %s:%d, this is an invalid address' % (ip,port))
                        else:
                                raise BindingError('could not listen on %s:%d (%s)' % (ip,port,errstr(e)))

        def start (self):
                """Bind every configured host which is not already bound.

                Sets ``self.serving`` to True on success. A NetworkError raised
                while binding is logged as critical and re-raised.
                """
                try:
                        # _sockets is keyed by socket object, so the already-bound
                        # (host, port) pairs are its values, not its keys
                        bound = set(self._sockets.values())
                        for host in self._hosts:
                                where = (host,self._port)
                                if where in bound:
                                        continue
                                s = self._bind(host,self._port)
                                if s is None:
                                        # _bind returns None for a host which is not an IP address;
                                        # storing it would break connected() on None.accept()
                                        self.logger.network('could not listen on %s:%d, this is not an IP address' % where,'critical')
                                        continue
                                self._sockets[s] = where
                                bound.add(where)
                except NetworkError as e:
                        self.logger.network(str(e),'critical')
                        raise
                self.serving = True

        # @each
        def connected (self):
                """Generator yielding at most one Incoming connection per call.

                Listening sockets are polled in turn; the first one with a
                pending connection is accepted, wrapped in Incoming and yielded.
                Sockets whose accept() would block are skipped. Nothing is
                yielded when the listener is not serving.

                Raises AcceptError (logged as critical) on any other socket error.
                """
                if not self.serving:
                        return

                try:
                        for sock in list(self._sockets):
                                try:
                                        io, _ = sock.accept()
                                        # AF_INET6 names are 4-tuples and AF_INET names 2-tuples,
                                        # only the address (first element) is needed
                                        peer_name = io.getpeername()[0]
                                        sock_name = io.getsockname()[0]
                                        if sock.family == socket.AF_INET6:
                                                afi = AFI.ipv6
                                        else:
                                                afi = AFI.ipv4
                                        # NOTE(review): the original code passed the getsockname()
                                        # address first and the getpeername() address second; that
                                        # order is preserved here - confirm against Incoming
                                        yield Incoming(afi,sock_name,peer_name,io)
                                        break
                                except socket.error as e:
                                        if e.errno in error.block:
                                                continue
                                        raise AcceptError('could not accept a new connection (%s)' % errstr(e))
                except NetworkError as e:
                        self.logger.network(str(e),'critical')
                        raise

        def stop (self):
                """Close every listening socket and mark the listener as not serving."""
                if not self.serving:
                        return

                for sock,(ip,port) in self._sockets.items():
                        sock.close()
                        self.logger.network('stopped listening on %s:%d' % (ip,port),'info')

                self._sockets = {}
                self.serving = False