Skip to content

Commit

Permalink
refactor Lambda API (localstack#1436)
Browse files Browse the repository at this point in the history
  • Loading branch information
whummer authored Jul 24, 2019
1 parent f607a35 commit f0eb916
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 71 deletions.
118 changes: 72 additions & 46 deletions localstack/services/awslambda/lambda_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@
BUCKET_MARKER_LOCAL = '__local__'


class ClientError(Exception):
def __init__(self, msg, code=400):
super(ClientError, self).__init__(msg)
self.code = code
self.msg = msg

def get_response(self):
if isinstance(self.msg, Response):
return self.msg
return error_response(self.msg, self.code)


class LambdaContext(object):

def __init__(self, func_details, qualifier=None):
Expand Down Expand Up @@ -276,11 +288,9 @@ def get_function_version(arn, version):


def publish_new_function_version(arn):
versions = arn_to_lambda.get(arn).versions
if len(versions) == 1:
last_version = 0
else:
last_version = max([int(key) for key in versions.keys() if key != '$LATEST'])
func_details = arn_to_lambda.get(arn)
versions = func_details.versions
last_version = func_details.max_version()
versions[str(last_version + 1)] = {
'CodeSize': versions.get('$LATEST').get('CodeSize'),
'CodeSha256': versions.get('$LATEST').get('CodeSha256'),
Expand Down Expand Up @@ -411,12 +421,12 @@ def get_zip_bytes(function_code):
s3_client.download_fileobj(function_code['S3Bucket'], function_code['S3Key'], bytes_io)
zip_file_content = bytes_io.getvalue()
except Exception as e:
return error_response('Unable to fetch Lambda archive from S3: %s' % e, 404)
raise ClientError('Unable to fetch Lambda archive from S3: %s' % e, 404)
elif 'ZipFile' in function_code:
zip_file_content = function_code['ZipFile']
zip_file_content = base64.b64decode(zip_file_content)
else:
return error_response('No valid Lambda archive specified.', 400)
raise ClientError('No valid Lambda archive specified.')
return zip_file_content


Expand All @@ -436,7 +446,7 @@ def get_java_handler(zip_file_content, handler, main_file):
with zipfile.ZipFile(BytesIO(zip_file_content)) as zip_ref:
jar_entries = [e for e in zip_ref.infolist() if e.filename.endswith('.jar')]
if len(jar_entries) != 1:
raise Exception('Expected exactly one *.jar entry in zip file, found %s' % len(jar_entries))
raise ClientError('Expected exactly one *.jar entry in zip file, found %s' % len(jar_entries))
zip_file_content = zip_ref.read(jar_entries[0].filename)
LOG.info('Found jar file %s with %s bytes in Lambda zip archive' %
(jar_entries[0].filename, len(zip_file_content)))
Expand All @@ -448,50 +458,62 @@ def execute(event, context):
event, context, handler=handler, main_file=main_file)
return result
return execute
return error_response(
'Unable to extract Java Lambda handler - file is not a valid zip/jar files', 400, error_type='ValidationError')
raise ClientError(error_response(
'Unable to extract Java Lambda handler - file is not a valid zip/jar files', 400, error_type='ValidationError'))


def set_archive_code(code, lambda_name, zip_file_content=None):
# get file content
zip_file_content = zip_file_content or get_zip_bytes(code)

# get metadata
lambda_arn = func_arn(lambda_name)
lambda_details = arn_to_lambda[lambda_arn]
is_local_mount = code.get('S3Bucket') == BUCKET_MARKER_LOCAL

# Stop/remove any containers that this arn uses.
LAMBDA_EXECUTOR.cleanup(lambda_arn)

def set_function_code(code, lambda_name):
if is_local_mount:
# Mount or use a local folder lambda executors can reference
# WARNING: this means we're pointing lambda_cwd to a local path in the user's
# file system! We must ensure that there is no data loss (i.e., we must *not* add
# this folder to TMP_FILES or similar).
return code['S3Key']

# Save the zip file to a temporary file that the lambda executors can reference
code_sha_256 = base64.standard_b64encode(hashlib.sha256(zip_file_content).digest())
lambda_details.get_version('$LATEST')['CodeSize'] = len(zip_file_content)
lambda_details.get_version('$LATEST')['CodeSha256'] = code_sha_256.decode('utf-8')
tmp_dir = '%s/zipfile.%s' % (config.TMP_FOLDER, short_uid())
mkdir(tmp_dir)
tmp_file = '%s/%s' % (tmp_dir, LAMBDA_ZIP_FILE_NAME)
save_file(tmp_file, zip_file_content)
TMP_FILES.append(tmp_dir)
lambda_details.cwd = tmp_dir
return tmp_dir


def set_function_code(code, lambda_name, lambda_cwd=None):

def generic_handler(event, context):
raise Exception(('Unable to find executor for Lambda function "%s". ' +
raise ClientError(('Unable to find executor for Lambda function "%s". ' +
'Note that Node.js and .NET Core Lambdas currently require LAMBDA_EXECUTOR=docker') % lambda_name)

lambda_cwd = None
arn = func_arn(lambda_name)
lambda_details = arn_to_lambda[arn]
runtime = lambda_details.runtime
handler_name = lambda_details.handler
lambda_environment = lambda_details.envvars
if not handler_name:
handler_name = LAMBDA_DEFAULT_HANDLER

# Stop/remove any containers that this arn uses.
LAMBDA_EXECUTOR.cleanup(arn)
zip_file_content = None
handler_name = lambda_details.handler or LAMBDA_DEFAULT_HANDLER
is_local_mount = code.get('S3Bucket') == BUCKET_MARKER_LOCAL

if is_local_mount:
# Mount or use a local folder lambda executors can reference
# WARNING: this means we're pointing lambda_cwd to a local path in the user's
# file system! We must ensure that there is no data loss (i.e., we must *not* add
# this folder to TMP_FILES or similar).
lambda_cwd = code['S3Key']
else:
# Save the zip file to a temporary file that the lambda executors can reference
zip_file_content = get_zip_bytes(code)
if isinstance(zip_file_content, Response):
return zip_file_content
code_sha_256 = base64.standard_b64encode(hashlib.sha256(zip_file_content).digest())
lambda_details.get_version('$LATEST')['CodeSize'] = len(zip_file_content)
lambda_details.get_version('$LATEST')['CodeSha256'] = code_sha_256.decode('utf-8')
tmp_dir = '%s/zipfile.%s' % (config.TMP_FOLDER, short_uid())
mkdir(tmp_dir)
tmp_file = '%s/%s' % (tmp_dir, LAMBDA_ZIP_FILE_NAME)
save_file(tmp_file, zip_file_content)
TMP_FILES.append(tmp_dir)
lambda_cwd = tmp_dir
lambda_cwd = lambda_cwd or set_archive_code(code, lambda_name)

# Save the zip file to a temporary file that the lambda executors can reference
zip_file_content = get_zip_bytes(code)

# get local lambda working directory
tmp_file = '%s/%s' % (lambda_cwd, LAMBDA_ZIP_FILE_NAME)

# Set the appropriate lambda handler.
lambda_handler = generic_handler
Expand All @@ -502,7 +524,7 @@ def generic_handler(event, context):
# save the zip_file_content as a .jar here.
if is_jar_archive(zip_file_content):
jar_tmp_file = '{working_dir}/{file_name}'.format(
working_dir=tmp_dir, file_name=LAMBDA_JAR_FILE_NAME)
working_dir=lambda_cwd, file_name=LAMBDA_JAR_FILE_NAME)
save_file(jar_tmp_file, zip_file_content)

lambda_handler = get_java_handler(zip_file_content, handler_name, tmp_file)
Expand All @@ -515,7 +537,7 @@ def generic_handler(event, context):
if not is_local_mount:
# Lambda code must be uploaded in Zip format
if not is_zip_file(zip_file_content):
raise Exception(
raise ClientError(
'Uploaded Lambda code for runtime ({}) is not in Zip format'.format(runtime))
unzip(tmp_file, lambda_cwd)

Expand All @@ -532,9 +554,9 @@ def generic_handler(event, context):
if not is_local_mount or not use_docker() or config.LAMBDA_REMOTE_DOCKER:
file_list = run('ls -la %s' % lambda_cwd)
LOG.debug('Lambda archive content:\n%s' % file_list)
return error_response(
raise ClientError(error_response(
'Unable to find handler script in Lambda archive.', 400,
error_type='ValidationError')
error_type='ValidationError'))

if runtime.startswith('python') and not use_docker():
try:
Expand All @@ -544,7 +566,7 @@ def generic_handler(event, context):
lambda_cwd=lambda_cwd,
lambda_env=lambda_environment)
except Exception as e:
raise Exception('Unable to get handler function from lambda code.', e)
raise ClientError('Unable to get handler function from lambda code.', e)

add_function_mapping(lambda_name, lambda_handler, lambda_cwd)

Expand All @@ -554,6 +576,8 @@ def generic_handler(event, context):
def do_list_functions():
funcs = []
for f_arn, func in arn_to_lambda.items():
if type(func) != LambdaFunction:
continue
func_name = f_arn.split(':function:')[-1]
arn = func_arn(func_name)
func_details = arn_to_lambda.get(arn)
Expand Down Expand Up @@ -607,7 +631,7 @@ def forward_to_fallback_url(func_arn, data):
if re.match(r'^https?://.+', config.LAMBDA_FALLBACK_URL):
response = safe_requests.post(config.LAMBDA_FALLBACK_URL, data)
return response.content
raise Exception('Unexpected value for LAMBDA_FALLBACK_URL: %s' % config.LAMBDA_FALLBACK_URL)
raise ClientError('Unexpected value for LAMBDA_FALLBACK_URL: %s' % config.LAMBDA_FALLBACK_URL)


# ------------
Expand Down Expand Up @@ -663,6 +687,8 @@ def create_function():
return jsonify(result or {})
except Exception as e:
arn_to_lambda.pop(arn, None)
if isinstance(e, ClientError):
return e.get_response()
return error_response('Unknown error: %s %s' % (e, traceback.format_exc()))


Expand Down
6 changes: 4 additions & 2 deletions localstack/services/kinesis/kinesis_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ def return_response(self, method, path, data, headers, response):
if action in (ACTION_CREATE_STREAM, ACTION_DELETE_STREAM):
event_type = (event_publisher.EVENT_KINESIS_CREATE_STREAM if action == ACTION_CREATE_STREAM
else event_publisher.EVENT_KINESIS_DELETE_STREAM)
event_publisher.fire_event(
event_type, payload={'n': event_publisher.get_hash(data.get('StreamName'))})
payload = {'n': event_publisher.get_hash(data.get('StreamName'))}
if action == ACTION_CREATE_STREAM:
payload['s'] = data.get('ShardCount')
event_publisher.fire_event(event_type, payload=payload)
elif action == ACTION_PUT_RECORD:
response_body = json.loads(to_str(response.content))
event_record = {
Expand Down
1 change: 1 addition & 0 deletions localstack/utils/analytics/event_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
EVENT_KINESIS_DELETE_STREAM = 'kns.ds'
EVENT_LAMBDA_CREATE_FUNC = 'lmb.cf'
EVENT_LAMBDA_DELETE_FUNC = 'lmb.df'
EVENT_LAMBDA_INVOKE_FUNC = 'lmb.if'
EVENT_SQS_CREATE_QUEUE = 'sqs.cq'
EVENT_SQS_DELETE_QUEUE = 'sqs.dq'
EVENT_SNS_CREATE_TOPIC = 'sns.ct'
Expand Down
4 changes: 4 additions & 0 deletions localstack/utils/aws/aws_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ def __init__(self, arn):
def get_version(self, version):
return self.versions.get(version)

def max_version(self):
versions = [int(key) for key in self.versions.keys() if key != '$LATEST']
return versions and max(versions) or 0

def name(self):
# Example ARN: arn:aws:lambda:aws-region:acct-id:function:helloworld:1
return self.id.split(':')[6]
Expand Down
3 changes: 2 additions & 1 deletion localstack/utils/aws/aws_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,13 @@ def dynamodb_stream_arn(table_name, account_id=None):


def lambda_function_arn(function_name, account_id=None):
pattern = 'arn:aws:lambda:.*:.*:function:.*'
pattern = 'arn:aws:lambda:.*:.*:(function|layer):.*'
if re.match(pattern, function_name):
return function_name
if ':' in function_name:
raise Exception('Lambda function name should not contain a colon ":"')
account_id = get_account_id(account_id)
pattern = re.sub(r'\([^\|]+\|.+\)', 'function', pattern)
return pattern.replace('.*', '%s') % (get_local_region(), account_id, function_name)


Expand Down
22 changes: 18 additions & 4 deletions localstack/utils/cloudwatch/cloudwatch_util.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
from datetime import datetime
from flask import Response
from localstack import config
from localstack.utils.common import now_utc
from localstack.utils.aws import aws_stack
from localstack.utils.common import now_utc
from localstack.utils.analytics import event_publisher


# ---------------
# Lambda metrics
# ---------------

def dimension_lambda(kwargs):
func_name = kwargs.get('func_name')
if not func_name:
func_name = kwargs.get('func_arn').split(':function:')[1].split(':')[0]
func_name = _func_name(kwargs)
return [{
'Name': 'FunctionName',
'Value': func_name
Expand Down Expand Up @@ -55,14 +54,29 @@ def publish_lambda_result(time_before, result, kwargs):
# ---------------


def _func_name(kwargs):
func_name = kwargs.get('func_name')
if not func_name:
func_name = kwargs.get('func_arn').split(':function:')[1].split(':')[0]
return func_name


def publish_event(time_before, result, kwargs):
event_publisher.fire_event(
event_publisher.EVENT_LAMBDA_INVOKE_FUNC,
payload={'f': _func_name(kwargs), 'd': now_utc() - time_before, 'r': result[0]})


def publish_result(ns, time_before, result, kwargs):
if ns == 'lambda':
publish_lambda_result(time_before, result, kwargs)
publish_event(time_before, 'success', kwargs)


def publish_error(ns, time_before, e, kwargs):
if ns == 'lambda':
publish_lambda_error(time_before, kwargs)
publish_event(time_before, 'error', kwargs)


def cloudwatched(ns):
Expand Down
14 changes: 9 additions & 5 deletions localstack/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ def is_number(s):
try:
float(s) # for int, long and float
return True
except ValueError:
except (TypeError, ValueError):
return False


Expand Down Expand Up @@ -646,10 +646,7 @@ def clear_list(l):
def cleanup_tmp_files():
for tmp in TMP_FILES:
try:
if os.path.isdir(tmp):
run('rm -rf "%s"' % tmp)
else:
os.remove(tmp)
rm_rf(tmp)
except Exception:
pass # file likely doesn't exist, or permission denied
del TMP_FILES[:]
Expand All @@ -663,6 +660,13 @@ def new_tmp_file():
return tmp_path


def new_tmp_dir():
folder = new_tmp_file()
rm_rf(folder)
mkdir(folder)
return folder


def is_ip_address(addr):
try:
socket.inet_aton(addr)
Expand Down
Loading

0 comments on commit f0eb916

Please sign in to comment.