Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Add S3 SigV4 Presigning #2349

Merged
merged 1 commit into from
Jun 27, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Add S3 sigv4 presigning
  • Loading branch information
danielgtaylor committed Jun 27, 2014
commit 16729da27b95d6dbbd81bcebb43bcf099ce23fd3
53 changes: 53 additions & 0 deletions boto/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,11 @@ def determine_region_name(self, host):
if part == 's3':
# If it's by itself, the region is the previous part.
region_name = parts[-offset]

# Unless it's Vhosted classic
if region_name == 'amazonaws':
region_name = 'us-east-1'

break
elif part.startswith('s3-'):
region_name = self.clean_region_name(part)
Expand Down Expand Up @@ -666,6 +671,54 @@ def add_auth(self, req, **kwargs):
req = self.mangle_path_and_params(req)
return super(S3HmacAuthV4Handler, self).add_auth(req, **kwargs)

def presign(self, req, expires, iso_date=None):
"""
Presign a request using SigV4 query params. Takes in an HTTP request
and an expiration time in seconds and returns a URL.

http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
"""
if iso_date is None:
iso_date = datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')

region = self.determine_region_name(req.host)
service = self.determine_service_name(req.host)

params = {
'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
'X-Amz-Credential': '%s/%s/%s/%s/aws4_request' % (
self._provider.access_key,
iso_date[:8],
region,
service
),
'X-Amz-Date': iso_date,
'X-Amz-Expires': expires,
'X-Amz-SignedHeaders': 'host'
}

if self._provider.security_token:
params['X-Amz-Security-Token'] = self._provider.security_token

req.params.update(params)

cr = self.canonical_request(req)

# We need to replace the payload SHA with a constant
cr = '\n'.join(cr.split('\n')[:-1]) + '\nUNSIGNED-PAYLOAD'

# Date header is expected for string_to_sign, but unused otherwise
req.headers['X-Amz-Date'] = iso_date

sts = self.string_to_sign(req, cr)
signature = self.signature(req, sts)

# Add signature to params now that we have it
req.params['X-Amz-Signature'] = signature

return 'https://%s%s?%s' % (req.host, req.path,
urllib.urlencode(req.params))


class QueryAuthHandler(AuthHandler):
"""
Expand Down
25 changes: 25 additions & 0 deletions boto/s3/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,34 @@ def build_post_form_args(self, bucket_name, key, expires_in=6000,

return {"action": url, "fields": fields}

def generate_url_sigv4(self, expires_in, method, bucket='', key='',
headers=None, force_http=False,
response_headers=None, version_id=None,
iso_date=None):
path = self.calling_format.build_path_base(bucket, key)
auth_path = self.calling_format.build_auth_path(bucket, key)
host = self.calling_format.build_host(self.server_name(), bucket)

params = {}
if version_id is not None:
params['VersionId'] = version_id

http_request = self.build_base_http_request(method, path, auth_path,
headers=headers, host=host,
params=params)

return self._auth_handler.presign(http_request, expires_in,
iso_date=iso_date)

def generate_url(self, expires_in, method, bucket='', key='', headers=None,
query_auth=True, force_http=False, response_headers=None,
expires_in_absolute=False, version_id=None):
if self._auth_handler.capability[0] == 'hmac-v4-s3':
# Handle the special sigv4 case
return self.generate_url_sigv4(expires_in, method, bucket=bucket,
key=key, headers=headers, force_http=force_http,
response_headers=response_headers, version_id=version_id)

headers = headers or {}
if expires_in_absolute:
expires = int(expires_in)
Expand Down
43 changes: 43 additions & 0 deletions tests/unit/s3/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,49 @@ def test_sigv4_opt_in(self):
)


class TestSigV4Presigned(MockServiceWithConfigTestCase):
connection_class = S3Connection

def test_sigv4_presign(self):
self.config = {
's3': {
'use-sigv4': True,
}
}

conn = self.connection_class(
aws_access_key_id='less',
aws_secret_access_key='more',
host='s3.amazonaws.com'
)

# Here we force an input iso_date to ensure we always get the
# same signature.
url = conn.generate_url_sigv4(86400, 'GET', bucket='examplebucket',
key='test.txt', iso_date='20140625T000000Z')

self.assertIn('a937f5fbc125d98ac8f04c49e0204ea1526a7b8ca058000a54c192457be05b7d', url)

def test_sigv4_presign_optional_params(self):
self.config = {
's3': {
'use-sigv4': True,
}
}

conn = self.connection_class(
aws_access_key_id='less',
aws_secret_access_key='more',
security_token='token',
host='s3.amazonaws.com'
)

url = conn.generate_url_sigv4(86400, 'GET', bucket='examplebucket',
key='test.txt', version_id=2)

self.assertIn('VersionId=2', url)
self.assertIn('X-Amz-Security-Token=token', url)


class TestUnicodeCallingFormat(AWSMockServiceTestCase):
connection_class = S3Connection
Expand Down