34
34
def __init__(self, secret_key):
35
secret_key = secret_key.encode()
36
self.hmac = hmac.new(secret_key, digestmod=hashlib.sha1)
35
self.secret_key = secret_key.encode()
36
self.hmac = hmac.new(self.secret_key, digestmod=hashlib.sha1)
38
self.hmac_256 = hmac.new(secret_key, digestmod=hashlib.sha256)
38
self.hmac_256 = hmac.new(self.secret_key, digestmod=hashlib.sha256)
40
def _v4_creds(self, credentials):
42
Detect if the credentials are for a v4 signed request, since AWS
43
removed the SignatureVersion field from the v4 request spec...
44
This expects a dict of the request headers to be passed in the
45
credentials dict, since the recommended way to pass v4 creds is
46
via the 'Authorization' header
47
see http://docs.aws.amazon.com/general/latest/gr/
48
sigv4-signed-request-examples.html
50
Alternatively X-Amz-Algorithm can be specified as a query parameter,
51
and the authentication data can also be passed as query parameters.
53
Note a hash of the request body is also required in the credentials
54
for v4 auth to work in the body_hash key, calculated via:
55
hashlib.sha256(req.body).hexdigest()
58
auth_str = credentials['headers']['Authorization']
59
if auth_str.startswith('AWS4-HMAC-SHA256'):
62
# Alternatively the Authorization data can be passed via
63
# the query params list, check X-Amz-Algorithm=AWS4-HMAC-SHA256
65
if (credentials['params']['X-Amz-Algorithm'] ==
40
73
def generate(self, credentials):
41
74
"""Generate auth string according to what SignatureVersion is given."""
42
if credentials['params']['SignatureVersion'] == '0':
75
signature_version = credentials['params'].get('SignatureVersion')
76
if signature_version == '0':
43
77
return self._calc_signature_0(credentials['params'])
44
if credentials['params']['SignatureVersion'] == '1':
78
if signature_version == '1':
45
79
return self._calc_signature_1(credentials['params'])
46
if credentials['params']['SignatureVersion'] == '2':
80
if signature_version == '2':
47
81
return self._calc_signature_2(credentials['params'],
48
82
credentials['verb'],
49
83
credentials['host'],
50
84
credentials['path'])
51
raise Exception('Unknown Signature Version: %s' %
52
credentials['params']['SignatureVersion'])
85
if self._v4_creds(credentials):
86
return self._calc_signature_4(credentials['params'],
90
credentials['headers'],
91
credentials['body_hash'])
93
if signature_version is not None:
94
raise Exception('Unknown signature version: %s' %
97
raise Exception('Unexpected signature format')
55
100
def _get_utf8_value(value):
87
148
current_hmac = self.hmac
88
149
params['SignatureMethod'] = 'HmacSHA1'
93
val = self._get_utf8_value(params[key])
94
val = urllib.quote(val, safe='-_~')
95
pairs.append(urllib.quote(key, safe='') + '=' + val)
150
string_to_sign += self._canonical_qs(params)
98
151
current_hmac.update(string_to_sign)
99
152
b64 = base64.b64encode(current_hmac.digest())
155
def _calc_signature_4(self, params, verb, server_string, path, headers,
157
"""Generate AWS signature version 4 string."""
160
return hmac.new(key, self._get_utf8_value(msg),
161
hashlib.sha256).digest()
163
def signature_key(datestamp, region_name, service_name):
165
Signature key derivation, see
166
http://docs.aws.amazon.com/general/latest/gr/
167
signature-v4-examples.html#signature-v4-examples-python
169
k_date = sign(self._get_utf8_value("AWS4" + self.secret_key),
171
k_region = sign(k_date, region_name)
172
k_service = sign(k_region, service_name)
173
k_signing = sign(k_service, "aws4_request")
176
def auth_param(param_name):
178
Get specified auth parameter, provided via one of:
179
- the Authorization header
180
- the X-Amz-* query parameters
183
auth_str = headers['Authorization']
184
param_str = auth_str.partition(
185
'%s=' % param_name)[2].split(',')[0]
187
param_str = params.get('X-Amz-%s' % param_name)
192
Get the X-Amz-Date value, which can be either a header or a parameter.
194
Note AWS supports parsing the Date header also, but this is not
195
currently supported here as it will require some format mangling.
196
So the X-Amz-Date value must be YYYYMMDDTHHMMSSZ format, then it
197
can be used to match against the YYYYMMDD format provided in the
200
http://docs.aws.amazon.com/general/latest/gr/
201
sigv4-date-handling.html
204
return headers['X-Amz-Date']
206
return params.get('X-Amz-Date')
208
def canonical_header_str():
209
# Get the list of headers to include, from either
210
# - the Authorization header (SignedHeaders key)
211
# - the X-Amz-SignedHeaders query parameter
212
headers_lower = dict((k.lower().strip(), v.strip())
213
for (k, v) in headers.iteritems())
215
sh_str = auth_param('SignedHeaders')
216
for h in sh_str.split(';'):
217
if h not in headers_lower:
220
# Note we discard any port suffix
221
header_list.append('%s:%s' %
222
(h, headers_lower[h].split(':')[0]))
224
header_list.append('%s:%s' % (h, headers_lower[h]))
225
return '\n'.join(header_list) + '\n'
227
# Create canonical request:
228
# http://docs.aws.amazon.com/general/latest/gr/
229
# sigv4-create-canonical-request.html
230
# Get parameters and headers in expected string format
231
cr = "\n".join((verb.upper(), path,
232
self._canonical_qs(params),
233
canonical_header_str(),
234
auth_param('SignedHeaders'),
237
# Check the date, reject any request where the X-Amz-Date doesn't
238
# match the credential scope
239
credential = auth_param('Credential')
240
credential_split = credential.split('/')
241
credential_scope = '/'.join(credential_split[1:])
242
credential_date = credential_split[1]
243
param_date = date_param()
244
if not param_date.startswith(credential_date):
245
raise Exception('Request date mismatch error')
247
# Create the string to sign
248
# http://docs.aws.amazon.com/general/latest/gr/
249
# sigv4-create-string-to-sign.html
250
string_to_sign = '\n'.join(('AWS4-HMAC-SHA256',
253
hashlib.sha256(cr).hexdigest()))
255
# Calculate the derived key, this requires a datestamp, region
256
# and service, which can be extracted from the credential scope
257
(req_region, req_service) = credential_split[2:4]
258
s_key = signature_key(credential_date, req_region, req_service)
259
# Finally calculate the signature!
260
signature = hmac.new(s_key, self._get_utf8_value(string_to_sign),
261
hashlib.sha256).hexdigest()