# Source branch: ~tribaal/txaws/xss-hardening
# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
# Licenced under the txaws licence available at /LICENSE in the txaws source.

"""EC2 client support."""

__all__ = ['EC2Client']

from base64 import b64encode
from urllib import quote

from twisted.web.client import getPage

from txaws import credentials
from txaws.util import iso8601time, XML


class Instance(object):
    """Plain record describing one Amazon EC2 instance.

    :attrib instanceId: The instance ID, stored exactly as given.
    :attrib instanceState: The instance state, stored exactly as given.
    """

    def __init__(self, instanceId, instanceState):
        """Store the identifier and state on the new object.

        :param instanceId: The instance ID of this instance.
        :param instanceState: The state of this instance.
        """
        self.instanceId, self.instanceState = instanceId, instanceState


class EC2Client(object):
    """Client object for issuing EC2 queries and parsing their responses."""

    # XML namespace prefix (ElementTree "{uri}tag" form) for the 2008-12-01
    # EC2 API documents.
    NS = '{http://ec2.amazonaws.com/doc/2008-12-01/}'

    def __init__(self, creds=None, query_factory=None):
        """Create an EC2Client.

        :param creds: Explicit credentials to use. If None, credentials are
            inferred as per txaws.credentials.AWSCredentials.
        :param query_factory: Callable invoked as
            ``query_factory(action, creds)`` to build each query. If None,
            the module's ``Query`` class is used.
        """
        self.creds = credentials.AWSCredentials() if creds is None else creds
        self.query_factory = (
            Query if query_factory is None else query_factory)

    def describe_instances(self):
        """Describe current instances.

        Builds a 'DescribeInstances' query with this client's credentials,
        submits it, and chains ``_parse_Reservation`` onto the result.

        :return: Whatever ``addCallback`` on the submitted query's result
            returns (a deferred when the default ``Query`` is used).
        """
        query = self.query_factory('DescribeInstances', self.creds)
        return query.submit().addCallback(self._parse_Reservation)

    def _parse_Reservation(self, xml_bytes):
        """Turn a DescribeInstances response body into Instance objects.

        :param xml_bytes: The raw XML of the response, handed to ``XML``.
        :return: A list with one ``Instance`` per child of every
            'instancesSet' found under the 'reservationSet' element.
        """
        ns = self.NS
        tree = XML(xml_bytes)
        # Walk reservationSet -> each reservation -> instancesSet -> each
        # instance, reading the ID and the name nested under instanceState.
        return [
            Instance(
                node.findtext(ns + 'instanceId'),
                node.find(ns + 'instanceState').findtext(ns + 'name'))
            for reservation in tree.find(ns + 'reservationSet')
            for node in reservation.find(ns + 'instancesSet')]


class Query(object):
    """A query that may be submitted to EC2.

    The request parameters live in ``self.params``; the query is signed with
    the supplied credentials and then fetched with ``getPage``.
    """

    def __init__(self, action, creds, other_params=None, time_tuple=None):
        """Create a Query to submit to EC2.

        :param action: Value for the 'Action' request parameter.
        :param creds: Credentials object. Its ``access_key`` attribute is sent
            as 'AWSAccessKeyId' and its ``sign`` method is used by ``sign``.
        :param other_params: Optional mapping of extra request parameters;
            entries override the defaults built here.
        :param time_tuple: Passed to ``iso8601time`` to produce 'Timestamp'.
        """
        # Required params (2008-12-01 API):
        # Version, SignatureVersion, SignatureMethod, Action, AWSAccessKeyId,
        # Timestamp || Expires, Signature.
        self.params = {'Version': '2008-12-01',
            'SignatureVersion': '2',
            'SignatureMethod': 'HmacSHA1',
            'Action': action,
            'AWSAccessKeyId': creds.access_key,
            'Timestamp': iso8601time(time_tuple),
            }
        if other_params:
            self.params.update(other_params)
        self.method = 'GET'
        self.host = 'ec2.amazonaws.com'
        self.uri = '/'
        self.creds = creds

    def canonical_query_params(self):
        """Return the canonical query params (used in signing).

        :return: 'key=value' pairs, each side encoded with ``encode``, in
            ``sorted_params`` order and joined with '&'.
        """
        return '&'.join(
            '%s=%s' % (self.encode(key), self.encode(value))
            for key, value in self.sorted_params())

    def encode(self, a_string):
        """Encode a_string as per the canonicalisation encoding rules.

        See the AWS dev reference page 90 (2008-12-01 version).
        :return: a_string percent-encoded, with '~' added to the characters
            ``quote`` always leaves alone (so '/' is encoded).
        """
        return quote(a_string, safe='~')

    def signing_text(self):
        """Return the text to be signed when signing the query.

        :return: The method, host, uri and canonical query params, separated
            by newlines.
        """
        return '\n'.join(
            (self.method, self.host, self.uri, self.canonical_query_params()))

    def sign(self):
        """Sign this query using its built in credentials.

        This prepares it to be sent, and should be done as the last step before
        submitting the query. Signing is done automatically - this is a public
        method to facilitate testing.

        Signing is repeatable: any 'Signature' already present in the params
        is discarded first, so signing (or submitting) the same query twice
        yields the same signature.
        """
        # A stale signature must never be part of the text being signed,
        # otherwise the second signature is computed over the first one.
        self.params.pop('Signature', None)
        self.params['Signature'] = self.creds.sign(self.signing_text())

    def sorted_params(self):
        """Return the query params sorted appropriately for signing.

        :return: A list of (key, value) pairs sorted by ``sorted``.
        """
        return sorted(self.params.items())

    def submit(self):
        """Submit this query.

        The query is signed first, then requested from
        ``http://<host><uri>?<canonical query params>``.

        :return: A deferred from twisted.web.client.getPage
        """
        self.sign()
        url = 'http://%s%s?%s' % (self.host, self.uri,
            self.canonical_query_params())
        return getPage(url, method=self.method)