~tribaal/txaws/xss-hardening

7.6.9 by Duncan McGreggor
PEP-8 clean ups.
1
# Copyright (c) 2009 Canonical Ltd <duncan.mcgreggor@canonical.com>
2
# Licenced under the txaws licence available at /LICENSE in the txaws source.
3
7.6.29 by Duncan McGreggor
- Updated the AWSError class to inherit from the Twisted web error class.
4
from twisted.web.error import Error
7.6.9 by Duncan McGreggor
PEP-8 clean ups.
5
50.1.5 by Duncan McGreggor
* Moved the shared logic out of EC2Error and into AWSError.
6
from txaws.util import XML
7
22.2.11 by Duncan McGreggor
Added missing newline.
8
7.6.29 by Duncan McGreggor
- Updated the AWSError class to inherit from the Twisted web error class.
9
class AWSError(Error):
7.6.1 by Duncan McGreggor
Added exception modules and tests.
10
    """
11
    A base class for txAWS errors.
12
    """
63 by Thomas Hervé
Fix test suite against latest release of Twisted, which requires the code argument
13
    def __init__(self, xml_bytes, status, message=None, response=None):
50.1.5 by Duncan McGreggor
* Moved the shared logic out of EC2Error and into AWSError.
14
        super(AWSError, self).__init__(status, message, response)
15
        if not xml_bytes:
16
            raise ValueError("XML cannot be empty.")
17
        self.original = xml_bytes
18
        self.errors = []
19
        self.request_id = ""
20
        self.host_id = ""
21
        self.parse()
22
23
    def __str__(self):
24
        return self._get_error_message_string()
25
26
    def __repr__(self):
27
        return "<%s object with %s>" % (
28
            self.__class__.__name__, self._get_error_code_string())
29
30
    def _set_request_id(self, tree):
31
        request_id_node = tree.find(".//RequestID")
32
        if hasattr(request_id_node, "text"):
33
            text = request_id_node.text
34
            if text:
35
                self.request_id = text
36
37
    def _set_host_id(self, tree):
38
        host_id = tree.find(".//HostID")
39
        if hasattr(host_id, "text"):
40
            text = host_id.text
41
            if text:
42
                self.host_id = text
43
44
    def _get_error_code_string(self):
45
        count = len(self.errors)
46
        error_code = self.get_error_codes()
47
        if count > 1:
48
            return "Error count: %s" % error_code
49
        else:
50
            return "Error code: %s" % error_code
51
52
    def _get_error_message_string(self):
53
        count = len(self.errors)
54
        error_message = self.get_error_messages()
55
        if count > 1:
56
            return "%s." % error_message
57
        else:
58
            return "Error Message: %s" % error_message
59
60
    def _node_to_dict(self, node):
61
        data = {}
62
        for child in node:
63
            if child.tag and child.text:
64
                data[child.tag] = child.text
65
        return data
66
67
    def _check_for_html(self, tree):
68
        if tree.tag == "html":
69
            message = "Could not parse HTML in the response."
70
            raise AWSResponseParseError(message)
71
72
    def _set_400_error(self, tree):
73
        """
74
        This method needs to be implemented by subclasses.
75
        """
76
77
    def _set_500_error(self, tree):
78
        self._set_request_id(tree)
79
        self._set_host_id(tree)
80
        data = self._node_to_dict(tree)
81
        if data:
82
            self.errors.append(data)
83
84
    def parse(self, xml_bytes=""):
85
        if not xml_bytes:
86
            xml_bytes = self.original
87
        self.original = xml_bytes
88
        tree = XML(xml_bytes.strip())
89
        self._check_for_html(tree)
90
        self._set_request_id(tree)
91
        if self.status:
92
            status = int(self.status)
93
        else:
94
            status = 400
95
        if status >= 500:
96
            self._set_500_error(tree)
97
        else:
98
            self._set_400_error(tree)
99
100
    def has_error(self, errorString):
101
        for error in self.errors:
102
            if errorString in error.values():
103
                return True
104
        return False
105
106
    def get_error_codes(self):
107
        count = len(self.errors)
108
        if count > 1:
109
            return count
110
        elif count == 0:
111
            return
112
        else:
113
            return self.errors[0]["Code"]
114
115
    def get_error_messages(self):
116
        count = len(self.errors)
117
        if count > 1:
118
            return "Multiple EC2 Errors"
119
        elif count == 0:
120
            return "Empty error list"
121
        else:
122
            return self.errors[0]["Message"]
123
44.1.2 by Duncan McGreggor
- Added docstring for ec2_error_wrapper.
124
125
126
class AWSResponseParseError(Exception):
127
    """
128
    txAWS was unable to parse the server response.
129
    """