~ubuntu-branches/debian/sid/waagent/sid

« back to all changes in this revision

Viewing changes to azurelinuxagent/common/protocol/wire.py

  • Committer: Package Import Robot
  • Author(s): Bastian Blank
  • Date: 2016-10-05 14:10:15 UTC
  • mfrom: (1.2.6)
  • Revision ID: package-import@ubuntu.com-20161005141015-qufn5j541xvy9cpe
Tags: 2.2.0-1
New upstream version.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
#
17
17
# Requires Python 2.4+ and OpenSSL 1.0+
18
18
 
 
19
import json
 
20
import os
 
21
import re
19
22
import time
20
23
import xml.sax.saxutils as saxutils
21
24
import azurelinuxagent.common.conf as conf
22
25
from azurelinuxagent.common.exception import ProtocolNotFoundError
23
26
from azurelinuxagent.common.future import httpclient, bytebuffer
24
 
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, findtext, \
25
 
    getattrib, gettext, remove_bom, get_bytes_from_pem
 
27
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, \
 
28
    findtext, getattrib, gettext, remove_bom, get_bytes_from_pem
26
29
import azurelinuxagent.common.utils.fileutil as fileutil
27
30
from azurelinuxagent.common.utils.cryptutil import CryptUtil
28
31
from azurelinuxagent.common.protocol.restapi import *
66
69
    """Slim layer to adapt wire protocol data to metadata protocol interface"""
67
70
 
68
71
    # TODO: Clean-up goal state processing
69
 
    #   At present, some methods magically update GoalState (e.g., get_vmagent_manifests), others (e.g., get_vmagent_pkgs)
70
 
    #   assume its presence. A better approach would make an explicit update call that returns the incarnation number and
71
 
    #   establishes that number the "context" for all other calls (either by updating the internal state of the protocol or
72
 
    #   by having callers pass the incarnation number to the method).
 
72
    #  At present, some methods magically update GoalState (e.g.,
 
73
    #  get_vmagent_manifests), others (e.g., get_vmagent_pkgs)
 
74
    #  assume its presence. A better approach would make an explicit update
 
75
    #  call that returns the incarnation number and
 
76
    #  establishes that number as the "context" for all other calls (either by
 
77
    #  updating the internal state of the protocol or
 
78
    #  by having callers pass the incarnation number to the method).
73
79
 
74
80
    def __init__(self, endpoint):
75
81
        if endpoint is None:
269
275
        "timestampUTC": timestamp
270
276
    }
271
277
    if len(v1_sub_status) != 0:
272
 
        v1_ext_status['substatus'] = v1_sub_status
 
278
        v1_ext_status['status']['substatus'] = v1_sub_status
273
279
    return v1_ext_status
274
280
 
275
281
 
369
375
        logger.verbose("Check blob type.")
370
376
        timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
371
377
        try:
372
 
            resp = self.client.call_storage_service(restutil.http_head, url, {
373
 
                "x-ms-date": timestamp,
374
 
                'x-ms-version': self.__class__.__storage_version__
375
 
            })
 
378
            resp = self.client.call_storage_service(
 
379
                restutil.http_head,
 
380
                url,
 
381
                {
 
382
                    "x-ms-date": timestamp,
 
383
                    "x-ms-version": self.__class__.__storage_version__
 
384
                })
376
385
        except HttpError as e:
377
386
            raise ProtocolError((u"Failed to get status blob type: {0}"
378
387
                                 u"").format(e))
387
396
    def put_block_blob(self, url, data):
388
397
        logger.verbose("Upload block blob")
389
398
        timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
390
 
        resp = self.client.call_storage_service(restutil.http_put, url, data,
391
 
                        {
392
 
                            "x-ms-date": timestamp,
393
 
                            "x-ms-blob-type": "BlockBlob",
394
 
                            "Content-Length": ustr(len(data)),
395
 
                            "x-ms-version": self.__class__.__storage_version__
396
 
                        })
 
399
        resp = self.client.call_storage_service(
 
400
            restutil.http_put,
 
401
            url,
 
402
            data,
 
403
            {
 
404
                "x-ms-date": timestamp,
 
405
                "x-ms-blob-type": "BlockBlob",
 
406
                "Content-Length": ustr(len(data)),
 
407
                "x-ms-version": self.__class__.__storage_version__
 
408
            })
397
409
        if resp.status != httpclient.CREATED:
398
410
            raise UploadError(
399
411
                "Failed to upload block blob: {0}".format(resp.status))
407
419
 
408
420
        # Align to 512 bytes
409
421
        page_blob_size = int((len(data) + 511) / 512) * 512
410
 
        resp = self.client.call_storage_service(restutil.http_put, url, "",
411
 
                        {
412
 
                            "x-ms-date": timestamp,
413
 
                            "x-ms-blob-type": "PageBlob",
414
 
                            "Content-Length": "0",
415
 
                            "x-ms-blob-content-length": ustr(page_blob_size),
416
 
                            "x-ms-version": self.__class__.__storage_version__
417
 
                        })
 
422
        resp = self.client.call_storage_service(
 
423
            restutil.http_put,
 
424
            url,
 
425
            "",
 
426
            {
 
427
                "x-ms-date": timestamp,
 
428
                "x-ms-blob-type": "PageBlob",
 
429
                "Content-Length": "0",
 
430
                "x-ms-blob-content-length": ustr(page_blob_size),
 
431
                "x-ms-version": self.__class__.__storage_version__
 
432
            })
418
433
        if resp.status != httpclient.CREATED:
419
434
            raise UploadError(
420
435
                "Failed to clean up page blob: {0}".format(resp.status))
437
452
            buf = bytearray(buf_size)
438
453
            buf[0: content_size] = data[start: end]
439
454
            resp = self.client.call_storage_service(
440
 
                restutil.http_put, url, bytebuffer(buf),
 
455
                restutil.http_put,
 
456
                url,
 
457
                bytebuffer(buf),
441
458
                {
442
459
                    "x-ms-date": timestamp,
443
460
                    "x-ms-range": "bytes={0}-{1}".format(start, page_end - 1),
465
482
        attr_type = 'mt:bool'
466
483
    elif param_type is float:
467
484
        attr_type = 'mt:float64'
468
 
    return param_format.format(param.name, saxutils.quoteattr(ustr(param.value)),
 
485
    return param_format.format(param.name,
 
486
                               saxutils.quoteattr(ustr(param.value)),
469
487
                               attr_type)
470
488
 
471
489
 
501
519
        now = time.time()
502
520
        if now - self.last_request < 1:
503
521
            logger.verbose("Last request issued less than 1 second ago")
504
 
            logger.verbose("Sleep {0} second to avoid throttling.", 
505
 
                        SHORT_WAITING_INTERVAL)
 
522
            logger.verbose("Sleep {0} second to avoid throttling.",
 
523
                           SHORT_WAITING_INTERVAL)
506
524
            time.sleep(SHORT_WAITING_INTERVAL)
507
525
        self.last_request = now
508
526
 
509
527
        self.req_count += 1
510
528
        if self.req_count % 3 == 0:
511
 
            logger.verbose("Sleep {0} second to avoid throttling.", 
512
 
                        SHORT_WAITING_INTERVAL)
 
529
            logger.verbose("Sleep {0} second to avoid throttling.",
 
530
                           SHORT_WAITING_INTERVAL)
513
531
            time.sleep(SHORT_WAITING_INTERVAL)
514
532
            self.req_count = 0
515
533
 
516
534
    def call_wireserver(self, http_req, *args, **kwargs):
517
535
        """
518
 
        Call wire server. Handle throttling(403) and Resource Gone(410)
 
536
        Call wire server; handle throttling (403), resource gone (410) and
 
537
        service unavailable (503).
519
538
        """
520
539
        self.prevent_throttling()
521
540
        for retry in range(0, 3):
522
541
            resp = http_req(*args, **kwargs)
523
542
            if resp.status == httpclient.FORBIDDEN:
524
 
                logger.warn("Sending too much request to wire server")
525
 
                logger.info("Sleep {0} second to avoid throttling.",
 
543
                logger.warn("Sending too many requests to wire server. ")
 
544
                logger.info("Sleeping {0}s to avoid throttling.",
526
545
                            LONG_WAITING_INTERVAL)
527
546
                time.sleep(LONG_WAITING_INTERVAL)
 
547
            elif resp.status == httpclient.SERVICE_UNAVAILABLE:
 
548
                logger.warn("Service temporarily unavailable, sleeping {0}s "
 
549
                            "before retrying.", LONG_WAITING_INTERVAL)
 
550
                time.sleep(LONG_WAITING_INTERVAL)
528
551
            elif resp.status == httpclient.GONE:
529
552
                msg = args[0] if len(args) > 0 else ""
530
553
                raise WireProtocolResourceGone(msg)
531
554
            else:
532
555
                return resp
533
 
        raise ProtocolError(("Calling wire server failed: {0}"
534
 
                             "").format(resp.status))
 
556
        raise ProtocolError(("Calling wire server failed: "
 
557
                             "{0}").format(resp.status))
535
558
 
536
559
    def decode_config(self, data):
537
560
        if data is None:
542
565
 
543
566
    def fetch_config(self, uri, headers):
544
567
        try:
545
 
            resp = self.call_wireserver(restutil.http_get, uri,
 
568
            resp = self.call_wireserver(restutil.http_get,
 
569
                                        uri,
546
570
                                        headers=headers)
547
571
        except HttpError as e:
548
572
            raise ProtocolError(ustr(e))
549
573
 
550
 
        if (resp.status != httpclient.OK):
 
574
        if resp.status != httpclient.OK:
551
575
            raise ProtocolError("{0} - {1}".format(resp.status, uri))
552
576
 
553
577
        return self.decode_config(resp.read())
566
590
        except IOError as e:
567
591
            raise ProtocolError("Failed to write cache: {0}".format(e))
568
592
 
569
 
    def call_storage_service(self, http_req, *args, **kwargs):
 
593
    @staticmethod
 
594
    def call_storage_service(http_req, *args, **kwargs):
570
595
        """ 
571
596
        Call storage service, handle SERVICE_UNAVAILABLE(503)
572
597
        """
 
598
 
 
599
        # force the chk_proxy arg to True, since all calls to storage should
 
600
        #  use a configured proxy
 
601
        kwargs['chk_proxy'] = True
 
602
 
573
603
        for retry in range(0, 3):
574
604
            resp = http_req(*args, **kwargs)
575
605
            if resp.status == httpclient.SERVICE_UNAVAILABLE:
576
 
                logger.warn("Storage service is not avaible temporaryly")
577
 
                logger.info("Will retry later, in {0} seconds",
 
606
                logger.warn("Storage service is temporarily unavailable. ")
 
607
                logger.info("Will retry in {0} seconds. ",
578
608
                            LONG_WAITING_INTERVAL)
579
609
                time.sleep(LONG_WAITING_INTERVAL)
580
610
            else:
581
611
                return resp
582
 
        raise ProtocolError(("Calling storage endpoint failed: {0}"
583
 
                             "").format(resp.status))
 
612
        raise ProtocolError(("Calling storage endpoint failed: "
 
613
                             "{0}").format(resp.status))
584
614
 
585
615
    def fetch_manifest(self, version_uris):
586
616
        for version_uri in version_uris:
587
617
            logger.verbose("Fetch ext handler manifest: {0}", version_uri.uri)
588
618
            try:
589
 
                resp = self.call_storage_service(restutil.http_get,
590
 
                                                 version_uri.uri, None,
591
 
                                                 chk_proxy=True)
 
619
                resp = self.call_storage_service(
 
620
                    restutil.http_get,
 
621
                    version_uri.uri,
 
622
                    None)
592
623
            except HttpError as e:
593
624
                raise ProtocolError(ustr(e))
594
625
 
693
724
 
694
725
    def get_hosting_env(self):
695
726
        if (self.hosting_env is None):
696
 
            local_file = os.path.join(conf.get_lib_dir(), HOSTING_ENV_FILE_NAME)
 
727
            local_file = os.path.join(conf.get_lib_dir(),
 
728
                                      HOSTING_ENV_FILE_NAME)
697
729
            xml_text = self.fetch_cache(local_file)
698
730
            self.hosting_env = HostingEnv(xml_text)
699
731
        return self.hosting_env
700
732
 
701
733
    def get_shared_conf(self):
702
734
        if (self.shared_conf is None):
703
 
            local_file = os.path.join(conf.get_lib_dir(), SHARED_CONF_FILE_NAME)
 
735
            local_file = os.path.join(conf.get_lib_dir(),
 
736
                                      SHARED_CONF_FILE_NAME)
704
737
            xml_text = self.fetch_cache(local_file)
705
738
            self.shared_conf = SharedConfig(xml_text)
706
739
        return self.shared_conf
774
807
        role_prop_uri = ROLE_PROP_URI.format(self.endpoint)
775
808
        headers = self.get_header_for_xml_content()
776
809
        try:
777
 
            resp = self.call_wireserver(restutil.http_post, role_prop_uri,
778
 
                                        role_prop, headers=headers)
 
810
            resp = self.call_wireserver(restutil.http_post,
 
811
                                        role_prop_uri,
 
812
                                        role_prop,
 
813
                                        headers=headers)
779
814
        except HttpError as e:
780
 
            raise ProtocolError((u"Failed to send role properties: {0}"
781
 
                                 u"").format(e))
 
815
            raise ProtocolError((u"Failed to send role properties: "
 
816
                                 u"{0}").format(e))
782
817
        if resp.status != httpclient.ACCEPTED:
783
 
            raise ProtocolError((u"Failed to send role properties: {0}"
784
 
                                 u", {1}").format(resp.status, resp.read()))
 
818
            raise ProtocolError((u"Failed to send role properties: "
 
819
                                 u",{0}: {1}").format(resp.status,
 
820
                                                      resp.read()))
785
821
 
786
822
    def report_health(self, status, substatus, description):
787
823
        goal_state = self.get_goal_state()
795
831
        health_report_uri = HEALTH_REPORT_URI.format(self.endpoint)
796
832
        headers = self.get_header_for_xml_content()
797
833
        try:
798
 
            resp = self.call_wireserver(restutil.http_post, health_report_uri,
799
 
                                        health_report, headers=headers, max_retry=8)
 
834
            # 30 retries with 10s sleep gives ~5min for wireserver updates;
 
835
            # this is retried 3 times with 15s sleep before throwing a
 
836
            # ProtocolError, for a total of ~15min.
 
837
            resp = self.call_wireserver(restutil.http_post,
 
838
                                        health_report_uri,
 
839
                                        health_report,
 
840
                                        headers=headers,
 
841
                                        max_retry=30)
800
842
        except HttpError as e:
801
 
            raise ProtocolError((u"Failed to send provision status: {0}"
802
 
                                 u"").format(e))
 
843
            raise ProtocolError((u"Failed to send provision status: "
 
844
                                 u"{0}").format(e))
803
845
        if resp.status != httpclient.OK:
804
 
            raise ProtocolError((u"Failed to send provision status: {0}"
805
 
                                 u", {1}").format(resp.status, resp.read()))
 
846
            raise ProtocolError((u"Failed to send provision status: "
 
847
                                 u",{0}: {1}").format(resp.status,
 
848
                                                      resp.read()))
806
849
 
807
850
    def send_event(self, provider_id, event_str):
808
851
        uri = TELEMETRY_URI.format(self.endpoint)
820
863
 
821
864
        if resp.status != httpclient.OK:
822
865
            logger.verbose(resp.read())
823
 
            raise ProtocolError("Failed to send events:{0}".format(resp.status))
 
866
            raise ProtocolError(
 
867
                "Failed to send events:{0}".format(resp.status))
824
868
 
825
869
    def report_event(self, event_list):
826
870
        buf = {}
867
911
            "x-ms-guest-agent-public-x509-cert": cert
868
912
        }
869
913
 
 
914
 
870
915
class VersionInfo(object):
871
916
    def __init__(self, xml_text):
872
917
        """
880
925
        xml_doc = parse_doc(xml_text)
881
926
        preferred = find(xml_doc, "Preferred")
882
927
        self.preferred = findtext(preferred, "Version")
883
 
        logger.info("Fabric preferred wire protocol version:{0}", self.preferred)
 
928
        logger.info("Fabric preferred wire protocol version:{0}",
 
929
                    self.preferred)
884
930
 
885
931
        self.supported = []
886
932
        supported = find(xml_doc, "Supported")
887
933
        supported_version = findall(supported, "Version")
888
934
        for node in supported_version:
889
935
            version = gettext(node)
890
 
            logger.verbose("Fabric supported wire protocol version:{0}", version)
 
936
            logger.verbose("Fabric supported wire protocol version:{0}",
 
937
                           version)
891
938
            self.supported.append(version)
892
939
 
893
940
    def get_preferred(self):
980
1027
        # Not used currently
981
1028
        return self
982
1029
 
 
1030
 
983
1031
class Certificates(object):
984
1032
    """
985
1033
    Object containing certificates of host and provisioned user.
1176
1224
            ext.sequenceNumber = seqNo
1177
1225
            ext.publicSettings = handler_settings.get("publicSettings")
1178
1226
            ext.protectedSettings = handler_settings.get("protectedSettings")
1179
 
            thumbprint = handler_settings.get("protectedSettingsCertThumbprint")
 
1227
            thumbprint = handler_settings.get(
 
1228
                "protectedSettingsCertThumbprint")
1180
1229
            ext.certificateThumbprint = thumbprint
1181
1230
            ext_handler.properties.extensions.append(ext)
1182
1231
 
1191
1240
 
1192
1241
    def parse(self, xml_text):
1193
1242
        xml_doc = parse_doc(xml_text)
1194
 
        self._handle_packages(findall(find(xml_doc, "Plugins"), "Plugin"), False)
1195
 
        self._handle_packages(findall(find(xml_doc, "InternalPlugins"), "Plugin"), True)
 
1243
        self._handle_packages(findall(find(xml_doc,
 
1244
                                           "Plugins"),
 
1245
                                      "Plugin"),
 
1246
                              False)
 
1247
        self._handle_packages(findall(find(xml_doc,
 
1248
                                           "InternalPlugins"),
 
1249
                                      "Plugin"),
 
1250
                              True)
1196
1251
 
1197
1252
    def _handle_packages(self, packages, isinternal):
1198
1253
        for package in packages:
1199
1254
            version = findtext(package, "Version")
1200
1255
 
1201
 
            disallow_major_upgrade = findtext(package, "DisallowMajorVersionUpgrade")
 
1256
            disallow_major_upgrade = findtext(package,
 
1257
                                              "DisallowMajorVersionUpgrade")
1202
1258
            if disallow_major_upgrade is None:
1203
1259
                disallow_major_upgrade = ''
1204
1260
            disallow_major_upgrade = disallow_major_upgrade.lower() == "true"