Skip to content

Commit

Permalink
Merge branch 'ddbv2-retries' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
toastdriven committed Apr 29, 2013
2 parents f48a8c3 + e3da8b1 commit 12d8efb
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 4 deletions.
4 changes: 4 additions & 0 deletions boto/dynamodb2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,9 @@ class InternalServerError(JSONResponseError):
pass


class ValidationException(JSONResponseError):
pass


class ItemCollectionSizeLimitExceededException(JSONResponseError):
pass
65 changes: 61 additions & 4 deletions boto/dynamodb2/layer1.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
from binascii import crc32

import json
import boto
Expand All @@ -30,9 +31,10 @@

class DynamoDBConnection(AWSQueryConnection):
"""
Amazon DynamoDB **Overview**
This is the Amazon DynamoDB API Reference. This guide provides
descriptions and samples of the Amazon DynamoDB API.
Amazon DynamoDB is a fast, highly scalable, highly available,
cost-effective non-relational database service. Amazon DynamoDB
removes traditional scalability limitations on data storage while
maintaining low latency and predictable performance.
"""
APIVersion = "2012-08-10"
DefaultRegionName = "us-east-1"
Expand All @@ -49,17 +51,23 @@ class DynamoDBConnection(AWSQueryConnection):
"ResourceNotFoundException": exceptions.ResourceNotFoundException,
"InternalServerError": exceptions.InternalServerError,
"ItemCollectionSizeLimitExceededException": exceptions.ItemCollectionSizeLimitExceededException,
"ValidationException": exceptions.ValidationException,
}

NumberRetries = 10


def __init__(self, **kwargs):
region = kwargs.pop('region', None)
validate_checksums = kwargs.pop('validate_checksums', True)
if not region:
region = RegionInfo(self, self.DefaultRegionName,
self.DefaultRegionEndpoint)
kwargs['host'] = region.endpoint
AWSQueryConnection.__init__(self, **kwargs)
self.region = region
self._validate_checksums = boto.config.getbool(
'DynamoDB', 'validate_checksums', validate_checksums)

def _required_auth_capability(self):
return ['hmac-v4']
Expand Down Expand Up @@ -1392,7 +1400,8 @@ def make_request(self, action, body):
method='POST', path='/', auth_path='/', params={},
headers=headers, data=body)
response = self._mexe(http_request, sender=None,
override_num_retries=10)
override_num_retries=self.NumberRetries,
retry_handler=self._retry_handler)
response_body = response.read()
boto.log.debug(response_body)
if response.status == 200:
Expand All @@ -1405,3 +1414,51 @@ def make_request(self, action, body):
raise exception_class(response.status, response.reason,
body=json_body)

def _retry_handler(self, response, i, next_sleep):
status = None
if response.status == 400:
response_body = response.read()
boto.log.debug(response_body)
data = json.loads(response_body)
if 'ProvisionedThroughputExceededException' in data.get('__type'):
self.throughput_exceeded_events += 1
msg = "%s, retry attempt %s" % (
'ProvisionedThroughputExceededException',
i
)
next_sleep = self._exponential_time(i)
i += 1
status = (msg, i, next_sleep)
if i == self.NumberRetries:
# If this was our last retry attempt, raise
# a specific error saying that the throughput
# was exceeded.
raise exceptions.ProvisionedThroughputExceededException(
response.status, response.reason, data)
elif 'ConditionalCheckFailedException' in data.get('__type'):
raise exceptions.ConditionalCheckFailedException(
response.status, response.reason, data)
elif 'ValidationException' in data.get('__type'):
raise exceptions.ValidationException(
response.status, response.reason, data)
else:
raise self.ResponseError(response.status, response.reason,
data)
expected_crc32 = response.getheader('x-amz-crc32')
if self._validate_checksums and expected_crc32 is not None:
boto.log.debug('Validating crc32 checksum for body: %s',
response.read())
actual_crc32 = crc32(response.read()) & 0xffffffff
expected_crc32 = int(expected_crc32)
if actual_crc32 != expected_crc32:
msg = ("The calculated checksum %s did not match the expected "
"checksum %s" % (actual_crc32, expected_crc32))
status = (msg, i + 1, self._exponential_time(i))
return status

def _exponential_time(self, i):
if i == 0:
next_sleep = 0
else:
next_sleep = 0.05 * (2 ** i)
return next_sleep

0 comments on commit 12d8efb

Please sign in to comment.