17
17
# Requires Python 2.4+ and OpenSSL 1.0+
20
23
import xml.sax.saxutils as saxutils
21
24
import azurelinuxagent.common.conf as conf
22
25
from azurelinuxagent.common.exception import ProtocolNotFoundError
23
26
from azurelinuxagent.common.future import httpclient, bytebuffer
24
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, findtext, \
25
getattrib, gettext, remove_bom, get_bytes_from_pem
27
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, \
28
findtext, getattrib, gettext, remove_bom, get_bytes_from_pem
26
29
import azurelinuxagent.common.utils.fileutil as fileutil
27
30
from azurelinuxagent.common.utils.cryptutil import CryptUtil
28
31
from azurelinuxagent.common.protocol.restapi import *
66
69
"""Slim layer to adapt wire protocol data to metadata protocol interface"""
68
71
# TODO: Clean-up goal state processing
69
# At present, some methods magically update GoalState (e.g., get_vmagent_manifests), others (e.g., get_vmagent_pkgs)
70
# assume its presence. A better approach would make an explicit update call that returns the incarnation number and
71
# establishes that number as the "context" for all other calls (either by updating the internal state of the protocol or
72
# by having callers pass the incarnation number to the method).
72
# At present, some methods magically update GoalState (e.g.,
73
# get_vmagent_manifests), others (e.g., get_vmagent_pkgs)
74
# assume its presence. A better approach would make an explicit update
75
# call that returns the incarnation number and
76
# establishes that number as the "context" for all other calls (either by
77
# updating the internal state of the protocol or
78
# by having callers pass the incarnation number to the method).
74
80
def __init__(self, endpoint):
75
81
if endpoint is None:
369
375
logger.verbose("Check blob type.")
370
376
timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
372
resp = self.client.call_storage_service(restutil.http_head, url, {
373
"x-ms-date": timestamp,
374
'x-ms-version': self.__class__.__storage_version__
378
resp = self.client.call_storage_service(
382
"x-ms-date": timestamp,
383
"x-ms-version": self.__class__.__storage_version__
376
385
except HttpError as e:
377
386
raise ProtocolError((u"Failed to get status blob type: {0}"
387
396
def put_block_blob(self, url, data):
388
397
logger.verbose("Upload block blob")
389
398
timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
390
resp = self.client.call_storage_service(restutil.http_put, url, data,
392
"x-ms-date": timestamp,
393
"x-ms-blob-type": "BlockBlob",
394
"Content-Length": ustr(len(data)),
395
"x-ms-version": self.__class__.__storage_version__
399
resp = self.client.call_storage_service(
404
"x-ms-date": timestamp,
405
"x-ms-blob-type": "BlockBlob",
406
"Content-Length": ustr(len(data)),
407
"x-ms-version": self.__class__.__storage_version__
397
409
if resp.status != httpclient.CREATED:
398
410
raise UploadError(
399
411
"Failed to upload block blob: {0}".format(resp.status))
408
420
# Align to 512 bytes
409
421
page_blob_size = int((len(data) + 511) / 512) * 512
410
resp = self.client.call_storage_service(restutil.http_put, url, "",
412
"x-ms-date": timestamp,
413
"x-ms-blob-type": "PageBlob",
414
"Content-Length": "0",
415
"x-ms-blob-content-length": ustr(page_blob_size),
416
"x-ms-version": self.__class__.__storage_version__
422
resp = self.client.call_storage_service(
427
"x-ms-date": timestamp,
428
"x-ms-blob-type": "PageBlob",
429
"Content-Length": "0",
430
"x-ms-blob-content-length": ustr(page_blob_size),
431
"x-ms-version": self.__class__.__storage_version__
418
433
if resp.status != httpclient.CREATED:
419
434
raise UploadError(
420
435
"Failed to clean up page blob: {0}".format(resp.status))
437
452
buf = bytearray(buf_size)
438
453
buf[0: content_size] = data[start: end]
439
454
resp = self.client.call_storage_service(
440
restutil.http_put, url, bytebuffer(buf),
442
459
"x-ms-date": timestamp,
443
460
"x-ms-range": "bytes={0}-{1}".format(start, page_end - 1),
465
482
attr_type = 'mt:bool'
466
483
elif param_type is float:
467
484
attr_type = 'mt:float64'
468
return param_format.format(param.name, saxutils.quoteattr(ustr(param.value)),
485
return param_format.format(param.name,
486
saxutils.quoteattr(ustr(param.value)),
501
519
now = time.time()
502
520
if now - self.last_request < 1:
503
521
logger.verbose("Last request issued less than 1 second ago")
504
logger.verbose("Sleep {0} second to avoid throttling.",
505
SHORT_WAITING_INTERVAL)
522
logger.verbose("Sleep {0} second to avoid throttling.",
523
SHORT_WAITING_INTERVAL)
506
524
time.sleep(SHORT_WAITING_INTERVAL)
507
525
self.last_request = now
509
527
self.req_count += 1
510
528
if self.req_count % 3 == 0:
511
logger.verbose("Sleep {0} second to avoid throttling.",
512
SHORT_WAITING_INTERVAL)
529
logger.verbose("Sleep {0} second to avoid throttling.",
530
SHORT_WAITING_INTERVAL)
513
531
time.sleep(SHORT_WAITING_INTERVAL)
514
532
self.req_count = 0
516
534
def call_wireserver(self, http_req, *args, **kwargs):
518
Call wire server. Handle throttling(403) and Resource Gone(410)
536
Call wire server; handle throttling (403), resource gone (410) and
537
service unavailable (503).
520
539
self.prevent_throttling()
521
540
for retry in range(0, 3):
522
541
resp = http_req(*args, **kwargs)
523
542
if resp.status == httpclient.FORBIDDEN:
524
logger.warn("Sending too much request to wire server")
525
logger.info("Sleep {0} second to avoid throttling.",
543
logger.warn("Sending too many requests to wire server. ")
544
logger.info("Sleeping {0}s to avoid throttling.",
526
545
LONG_WAITING_INTERVAL)
527
546
time.sleep(LONG_WAITING_INTERVAL)
547
elif resp.status == httpclient.SERVICE_UNAVAILABLE:
548
logger.warn("Service temporarily unavailable, sleeping {0}s "
549
"before retrying.", LONG_WAITING_INTERVAL)
550
time.sleep(LONG_WAITING_INTERVAL)
528
551
elif resp.status == httpclient.GONE:
529
552
msg = args[0] if len(args) > 0 else ""
530
553
raise WireProtocolResourceGone(msg)
533
raise ProtocolError(("Calling wire server failed: {0}"
534
"").format(resp.status))
556
raise ProtocolError(("Calling wire server failed: "
557
"{0}").format(resp.status))
536
559
def decode_config(self, data):
543
566
def fetch_config(self, uri, headers):
545
resp = self.call_wireserver(restutil.http_get, uri,
568
resp = self.call_wireserver(restutil.http_get,
547
571
except HttpError as e:
548
572
raise ProtocolError(ustr(e))
550
if (resp.status != httpclient.OK):
574
if resp.status != httpclient.OK:
551
575
raise ProtocolError("{0} - {1}".format(resp.status, uri))
553
577
return self.decode_config(resp.read())
566
590
except IOError as e:
567
591
raise ProtocolError("Failed to write cache: {0}".format(e))
569
def call_storage_service(self, http_req, *args, **kwargs):
594
def call_storage_service(http_req, *args, **kwargs):
571
596
Call storage service, handle SERVICE_UNAVAILABLE(503)
599
# force the chk_proxy arg to True, since all calls to storage should
600
# use a configured proxy
601
kwargs['chk_proxy'] = True
573
603
for retry in range(0, 3):
574
604
resp = http_req(*args, **kwargs)
575
605
if resp.status == httpclient.SERVICE_UNAVAILABLE:
576
logger.warn("Storage service is not avaible temporaryly")
577
logger.info("Will retry later, in {0} seconds",
606
logger.warn("Storage service is temporarily unavailable. ")
607
logger.info("Will retry in {0} seconds. ",
578
608
LONG_WAITING_INTERVAL)
579
609
time.sleep(LONG_WAITING_INTERVAL)
582
raise ProtocolError(("Calling storage endpoint failed: {0}"
583
"").format(resp.status))
612
raise ProtocolError(("Calling storage endpoint failed: "
613
"{0}").format(resp.status))
585
615
def fetch_manifest(self, version_uris):
586
616
for version_uri in version_uris:
587
617
logger.verbose("Fetch ext handler manifest: {0}", version_uri.uri)
589
resp = self.call_storage_service(restutil.http_get,
590
version_uri.uri, None,
619
resp = self.call_storage_service(
592
623
except HttpError as e:
593
624
raise ProtocolError(ustr(e))
694
725
def get_hosting_env(self):
    """
    Return the hosting environment object, creating it on first use.

    While self.hosting_env is None, the XML text is read through
    self.fetch_cache() from HOSTING_ENV_FILE_NAME under
    conf.get_lib_dir(), wrapped in a HostingEnv and stored on self.
    Later calls return the stored object without reading the file again.
    """
    if self.hosting_env is None:
        local_file = os.path.join(conf.get_lib_dir(),
                                  HOSTING_ENV_FILE_NAME)
        xml_text = self.fetch_cache(local_file)
        self.hosting_env = HostingEnv(xml_text)
    return self.hosting_env
701
733
def get_shared_conf(self):
    """
    Return the shared configuration object, creating it on first use.

    While self.shared_conf is None, the XML text is read through
    self.fetch_cache() from SHARED_CONF_FILE_NAME under
    conf.get_lib_dir(), wrapped in a SharedConfig and stored on self.
    Later calls return the stored object without reading the file again.
    """
    if self.shared_conf is None:
        local_file = os.path.join(conf.get_lib_dir(),
                                  SHARED_CONF_FILE_NAME)
        xml_text = self.fetch_cache(local_file)
        self.shared_conf = SharedConfig(xml_text)
    return self.shared_conf
774
807
role_prop_uri = ROLE_PROP_URI.format(self.endpoint)
775
808
headers = self.get_header_for_xml_content()
777
resp = self.call_wireserver(restutil.http_post, role_prop_uri,
778
role_prop, headers=headers)
810
resp = self.call_wireserver(restutil.http_post,
779
814
except HttpError as e:
780
raise ProtocolError((u"Failed to send role properties: {0}"
815
raise ProtocolError((u"Failed to send role properties: "
782
817
if resp.status != httpclient.ACCEPTED:
783
raise ProtocolError((u"Failed to send role properties: {0}"
784
u", {1}").format(resp.status, resp.read()))
818
raise ProtocolError((u"Failed to send role properties: "
819
u",{0}: {1}").format(resp.status,
786
822
def report_health(self, status, substatus, description):
787
823
goal_state = self.get_goal_state()
795
831
health_report_uri = HEALTH_REPORT_URI.format(self.endpoint)
796
832
headers = self.get_header_for_xml_content()
798
resp = self.call_wireserver(restutil.http_post, health_report_uri,
799
health_report, headers=headers, max_retry=8)
834
# 30 retries with 10s sleep gives ~5min for wireserver updates;
835
# this is retried 3 times with 15s sleep before throwing a
836
# ProtocolError, for a total of ~15min.
837
resp = self.call_wireserver(restutil.http_post,
800
842
except HttpError as e:
801
raise ProtocolError((u"Failed to send provision status: {0}"
843
raise ProtocolError((u"Failed to send provision status: "
803
845
if resp.status != httpclient.OK:
804
raise ProtocolError((u"Failed to send provision status: {0}"
805
u", {1}").format(resp.status, resp.read()))
846
raise ProtocolError((u"Failed to send provision status: "
847
u",{0}: {1}").format(resp.status,
807
850
def send_event(self, provider_id, event_str):
808
851
uri = TELEMETRY_URI.format(self.endpoint)
821
864
if resp.status != httpclient.OK:
822
865
logger.verbose(resp.read())
823
raise ProtocolError("Failed to send events:{0}".format(resp.status))
867
"Failed to send events:{0}".format(resp.status))
825
869
def report_event(self, event_list):
880
925
xml_doc = parse_doc(xml_text)
881
926
preferred = find(xml_doc, "Preferred")
882
927
self.preferred = findtext(preferred, "Version")
883
logger.info("Fabric preferred wire protocol version:{0}", self.preferred)
928
logger.info("Fabric preferred wire protocol version:{0}",
885
931
self.supported = []
886
932
supported = find(xml_doc, "Supported")
887
933
supported_version = findall(supported, "Version")
888
934
for node in supported_version:
889
935
version = gettext(node)
890
logger.verbose("Fabric supported wire protocol version:{0}", version)
936
logger.verbose("Fabric supported wire protocol version:{0}",
891
938
self.supported.append(version)
893
940
def get_preferred(self):
1176
1224
ext.sequenceNumber = seqNo
1177
1225
ext.publicSettings = handler_settings.get("publicSettings")
1178
1226
ext.protectedSettings = handler_settings.get("protectedSettings")
1179
thumbprint = handler_settings.get("protectedSettingsCertThumbprint")
1227
thumbprint = handler_settings.get(
1228
"protectedSettingsCertThumbprint")
1180
1229
ext.certificateThumbprint = thumbprint
1181
1230
ext_handler.properties.extensions.append(ext)
1192
1241
def parse(self, xml_text):
1193
1242
xml_doc = parse_doc(xml_text)
1194
self._handle_packages(findall(find(xml_doc, "Plugins"), "Plugin"), False)
1195
self._handle_packages(findall(find(xml_doc, "InternalPlugins"), "Plugin"), True)
1243
self._handle_packages(findall(find(xml_doc,
1247
self._handle_packages(findall(find(xml_doc,
1197
1252
def _handle_packages(self, packages, isinternal):
1198
1253
for package in packages:
1199
1254
version = findtext(package, "Version")
1201
disallow_major_upgrade = findtext(package, "DisallowMajorVersionUpgrade")
1256
disallow_major_upgrade = findtext(package,
1257
"DisallowMajorVersionUpgrade")
1202
1258
if disallow_major_upgrade is None:
1203
1259
disallow_major_upgrade = ''
1204
1260
disallow_major_upgrade = disallow_major_upgrade.lower() == "true"