~libra-core/libra/master

« back to all changes in this revision

Viewing changes to libra/common/api/gearman_client.py

  • Committer: German Eichberger
  • Date: 2014-06-04 21:52:37 UTC
  • Revision ID: git-v1:dc82de4a3ecc9fa031ba4729b294f31d55af0dce
api serve rnow uses gear library; remove yelp's gearman

Change-Id: Iaa81fc34161992a98a9449b8c58e80484d048d16

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
# under the License.
14
14
 
15
15
import eventlet
 
16
import gear
 
17
import json
 
18
 
16
19
eventlet.monkey_patch()
17
20
import ipaddress
18
 
from libra.common.json_gearman import JSONGearmanClient
19
21
from libra.common.api.lbaas import LoadBalancer, db_session, Device, Node, Vip
20
22
from libra.common.api.lbaas import HealthMonitor, Counters
21
23
from libra.common.api.lbaas import loadbalancers_devices
22
24
from libra.common.api.mnb import update_mnb
23
25
from libra.openstack.common import log
24
26
from pecan import conf
25
 
 
 
27
from time import sleep
26
28
 
27
29
LOG = log.getLogger(__name__)
28
 
 
 
30
POLL_COUNT = 10
 
31
POLL_SLEEP = 10
29
32
 
30
33
gearman_workers = [
31
34
    'UPDATE',  # Create/Update a Load Balancer.
39
42
]
40
43
 
41
44
 
 
45
class DisconnectClient(gear.Client):
 
46
    def handleDisconnect(self, job):
 
47
        job.disconnect = True
 
48
 
 
49
 
 
50
class DisconnectJob(gear.Job):
 
51
    def __init__(self, name, arguments):
 
52
        super(DisconnectJob, self).__init__(name, arguments)
 
53
        self.disconnect = False
 
54
 
 
55
 
42
56
def submit_job(job_type, host, data, lbid):
43
57
    eventlet.spawn_n(client_job, job_type, str(host), data, lbid)
44
58
 
122
136
        self.host = host
123
137
        self.lbid = lbid
124
138
 
125
 
        server_list = []
 
139
        self.gear_client = DisconnectClient()
 
140
 
126
141
        for server in conf.gearman.server:
127
142
            ghost, gport = server.split(':')
128
 
            server_list.append({'host': ghost,
129
 
                                'port': int(gport),
130
 
                                'keyfile': conf.gearman.ssl_key,
131
 
                                'certfile': conf.gearman.ssl_cert,
132
 
                                'ca_certs': conf.gearman.ssl_ca,
133
 
                                'keepalive': conf.gearman.keepalive,
134
 
                                'keepcnt': conf.gearman.keepcnt,
135
 
                                'keepidle': conf.gearman.keepidle,
136
 
                                'keepintvl': conf.gearman.keepintvl})
137
 
        self.gearman_client = JSONGearmanClient(server_list)
 
143
            self.gear_client.addServer(ghost,
 
144
                                       int(gport),
 
145
                                       conf.gearman.ssl_key,
 
146
                                       conf.gearman.ssl_cert,
 
147
                                       conf.gearman.ssl_ca)
138
148
 
139
149
    def send_assign(self, data):
140
150
        NULL = None  # For pep8
522
532
                           mnb_data["tenantid"])
523
533
 
524
534
    def _send_message(self, message, response_name):
525
 
        job_status = self.gearman_client.submit_job(
526
 
            self.host, message, background=False, wait_until_complete=True,
527
 
            max_retries=10, poll_timeout=120.0
528
 
        )
529
 
        if job_status.state == 'UNKNOWN':
530
 
            # Gearman server connection failed
531
 
            LOG.error('Could not talk to gearman server')
532
 
            return False, "System error communicating with load balancer"
533
 
        if job_status.timed_out:
534
 
            # Job timed out
535
 
            LOG.warning(
536
 
                'Gearman timeout talking to {0}'.format(self.host)
537
 
            )
 
535
 
 
536
        self.gear_client.waitForServer()
 
537
 
 
538
        job = DisconnectJob(self.host, json.dumps(message))
 
539
 
 
540
        self.gear_client.submitJob(job)
 
541
 
 
542
        pollcount = 0
 
543
        # Would like to make these config file settings
 
544
        while not job.complete and pollcount < POLL_COUNT:
 
545
            sleep(POLL_SLEEP)
 
546
            pollcount += 1
 
547
 
 
548
        if job.disconnect:
 
549
            LOG.error('Gearman Job server fail - disconnect')
 
550
            return False, "Gearman Job server fail - "\
 
551
                "disconnect communicating with load balancer"
 
552
 
 
553
        # We timed out waiting for the job to finish
 
554
        if not job.complete:
 
555
            LOG.warning('Gearman timeout talking to {0}'.format(self.host))
538
556
            return False, "Timeout error communicating with load balancer"
539
 
        LOG.debug(job_status.result)
540
 
        if 'badRequest' in job_status.result:
541
 
            error = job_status.result['badRequest']['validationErrors']
 
557
 
 
558
        result = json.loads(job.data[0])
 
559
 
 
560
        LOG.debug(result)
 
561
 
 
562
        if 'badRequest' in result:
 
563
            error = result['badRequest']['validationErrors']
542
564
            return False, error['message']
543
 
        if job_status.result[response_name] == 'FAIL':
 
565
        if result[response_name] == 'FAIL':
544
566
            # Worker says 'no'
545
 
            if 'hpcs_error' in job_status.result:
546
 
                error = job_status.result['hpcs_error']
 
567
            if 'hpcs_error' in result:
 
568
                error = result['hpcs_error']
547
569
            else:
548
570
                error = 'Load Balancer error'
549
571
            LOG.error(
551
573
            )
552
574
            return False, error
553
575
        LOG.info('Gearman success from {0}'.format(self.host))
554
 
        return True, job_status.result
 
576
        return True, result
 
 
b'\\ No newline at end of file'