Event-driven, Serverless Architectures with AWS Lambda, SQS, DynamoDB, and API Gateway

In this post, we will explore modern application development using an event-driven, serverless architecture on AWS. To demonstrate this architecture, we will integrate several fully managed services, all part of the AWS Serverless Computing platform, including Lambda, API Gateway, SQS, S3, and DynamoDB. The result will be an application composed of small, easily deployable, loosely coupled, independently scalable serverless components.

What is ‘Event-Driven’?

According to Otavio Ferreira, Manager, Amazon SNS, and James Hood, Senior Software Development Engineer, in their AWS Compute Blog post, Enriching Event-Driven Architectures with AWS Event Fork Pipelines, “Many customers are choosing to build event-driven applications in which subscriber services automatically perform work in response to events triggered by publisher services. This architectural pattern can make services more reusable, interoperable, and scalable.” This description of an event-driven architecture captures the essence of this post: every interaction between the application’s components occurs as the direct result of an event.

What is ‘Serverless’?

Mistakenly, many of us think of serverless as just functions (aka Function-as-a-Service, or FaaS). On AWS, Lambda is only one of many fully managed services that make up the AWS Serverless Computing platform. So, what is ‘serverless’? According to AWS, “Serverless applications don’t require provisioning, maintaining, and administering servers for backend components such as compute, databases, storage, stream processing, message queueing, and more.”

AWS Technologies

In this demonstration, we will use several of these AWS serverless services, including AWS Lambda, Amazon API Gateway, Amazon SQS, Amazon S3, and Amazon DynamoDB.

Architecture

The high-level architecture for the platform provisioned and deployed in this post is illustrated in the diagram below. There are two separate workflows. In the first workflow (top), data is extracted from CSV files placed in S3, transformed, queued to SQS, and written to DynamoDB, using Python-based Lambda functions. In the second workflow (bottom), data is manipulated in DynamoDB through interactions with a RESTful API, exposed via an API Gateway, and backed by Node.js-based Lambda functions.

Source Code

All source code for this post is available on GitHub in a single public repository, serverless-sqs-dynamo-demo. To clone the GitHub repository, execute the following command.

git clone --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/serverless-sqs-dynamo-demo.git

The project is structured as follows.

.
├── README.md
├── lambda_apigtw_to_dynamodb
│   ├── app.js
│   ├── events
│   ├── node_modules
│   ├── package.json
│   └── tests
├── lambda_s3_to_sqs
│   ├── __init__.py
│   ├── app.py
│   ├── requirements.txt
│   └── tests
├── lambda_sqs_to_dynamodb
│   ├── __init__.py
│   ├── app.py
│   ├── requirements.txt
│   └── tests
├── requirements.txt
├── sample_data
│   ├── data.csv
│   ├── data_bad_msg.csv
│   └── data_good_msg.csv
└── template.yaml

Prerequisites

The demonstration assumes you already have an AWS account. You will need the latest copy of the AWS CLI, SAM CLI, and Python 3 installed on your development machine.
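
A quick way to confirm the tooling is in place is to check each version from the command line (exact output will vary by release).

aws --version
sam --version
python3 --version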

Deploying the Project

Before diving into the code, we will deploy the project to AWS. Conveniently, all of the project’s resources are codified in an AWS Serverless Application Model (SAM) template. According to the official SAM GitHub project documentation, AWS SAM is based on AWS CloudFormation; a serverless application is defined in a CloudFormation template and deployed as a CloudFormation stack.
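
For orientation, the skeleton below shows the general shape of a SAM template: standard CloudFormation YAML plus the Serverless transform. This is an illustrative sketch only; the resource name and properties shown are placeholders, not the contents of the project’s actual template.yaml.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
  ExampleFunction: # placeholder logical name, for illustration only
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.7 # placeholder runtime
      Handler: app.lambda_handler
      CodeUri: lambda_s3_to_sqs/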

Template Parameter

CloudFormation will create and uniquely name the SQS queues and the DynamoDB table. However, to avoid circular references, a common issue when creating resources associated with S3 event notifications, it is easier to use a pre-existing bucket. To start, change the SAM template’s DataBucketName parameter default value to your own S3 bucket name; this bucket is where we will eventually push the CSV data files. Alternatively, override the default value with the SAM CLI’s --parameter-overrides option when running the commands shown next.

Parameters:
  DataBucketName:
    Type: String
    Description: S3 bucket where CSV files are processed
    Default: your-data-bucket-name

SAM CLI Commands

# change me
S3_BUILD_BUCKET=your_build_bucket_name
STACK_NAME=your_cloudformation_stack_name

# validate
sam validate --template template.yaml

aws cloudformation validate-template \
  --template-body file://template.yaml

# build
sam build --template template.yaml

# package
sam package \
  --output-template-file packaged.yaml \
  --s3-bucket $S3_BUILD_BUCKET

# deploy
sam deploy --template-file packaged.yaml \
  --stack-name $STACK_NAME \
  --capabilities CAPABILITY_IAM \
  --debug

Test the Deployed Application

Once the CloudFormation stack has deployed without error, copying a CSV file to the S3 bucket is the quickest way to confirm everything is working. The project includes test data files with 20 rows of test message data. Below is a sample of the CSV file, which is included in the project. The data was collected from IoT devices that measured response times from wired versus wireless devices on a LAN; the message details are immaterial to this demonstration.

timestamp,location,source,local_dest,local_avg,remote_dest,remote_avg
1559040909.3853335,location-03,wireless,router-1,4.39,device-1,9.09
1559040919.5273902,location-03,wireless,router-1,0.49,device-1,16.75
1559040929.6446512,location-03,wireless,router-1,0.56,device-1,8.31
1559040939.7712135,location-03,wireless,router-1,1.64,device-1,9.4
1559040949.891723,location-03,wireless,router-1,1.18,device-1,9.07
1559040960.011338,location-03,wireless,router-1,0.42,device-1,8.4
1559040970.1319716,location-03,wireless,router-1,1.73,device-1,8.66
1559040980.2533505,location-03,wireless,router-1,0.67,device-1,8.61
1559040990.3816211,location-03,wireless,router-1,1.27,device-1,10.87
1559041000.5105414,location-03,wireless,router-1,1.63,device-1,10.08

# change me
S3_DATA_BUCKET=your_data_bucket_name

aws s3 cp sample_data/data.csv s3://$S3_DATA_BUCKET

Event-Driven Patterns

There are three distinct and discrete event-driven dataflows within the demonstration’s architecture:

  1. S3 Event Source for Lambda (S3 to SQS)
  2. SQS Event Source for Lambda (SQS to DynamoDB)
  3. API Gateway Event Source for Lambda (API Gateway to DynamoDB)

S3 Event Source for Lambda

Whenever a file is copied into the target S3 bucket, an S3 Event Notification triggers an asynchronous invocation of a Lambda function. According to AWS, when you invoke a function asynchronously, Lambda sends the event to an internal queue; a separate process reads events from the queue and executes your Lambda function.
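
For reference, the handler below extracts the bucket name and object key from the event notification payload, whose relevant shape is roughly the following (abbreviated; the values are illustrative placeholders).

# abbreviated S3 event notification structure; values are placeholders
event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'your-data-bucket-name'},
            'object': {'key': 'data.csv'}
        }
    }]
}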

import urllib.parse


def lambda_handler(event, context):
    # identify the bucket and object key from the S3 event notification
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(
        event['Records'][0]['s3']['object']['key'],
        encoding='utf-8'
    )
    # read the CSV file from S3 and queue each row as a message
    messages = read_csv_file(bucket, key)
    process_messages(messages)
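
The process_messages helper transforms each CSV row and queues it to SQS as a message pre-formatted for the DynamoDB low-level API, similar to the example below (the table name is a placeholder). Note that the original Unix timestamp has been split into separate date and time attributes, which serve as the table’s keys.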
{
  "TableName": "your-dynamodb-table-name",
  "Item": {
    "date": {"S": "2001-01-01"},
    "time": {"S": "09:01:05"},
    "location": {"S": "location-03"},
    "source": {"S": "wireless"},
    "local_dest": {"S": "router-1"},
    "local_avg": {"N": "5.55"},
    "remote_dest": {"S": "device-1"},
    "remote_avg": {"N": "10.10"}
  }
}

SQS Event Source for Lambda

According to AWS, SQS offers two types of message queues, Standard and FIFO (First-In-First-Out). An SQS FIFO queue is designed to guarantee that messages are processed exactly once, in the exact order that they are sent. A Standard SQS queue offers maximum throughput, best-effort ordering, and at-least-once delivery.
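
For context, the upstream Lambda publishes each transformed row to the queue with a Method message attribute, which the consumer function below uses to select a DynamoDB operation. A minimal sketch of that publish step, assuming the queue URL is supplied via a QUEUE_URL environment variable, might look like this.

import json
import os

import boto3

sqs = boto3.client('sqs')

# a pre-formatted DynamoDB request, as shown earlier (placeholder values)
message = {
    'TableName': 'your-dynamodb-table-name',
    'Item': {'date': {'S': '2001-01-01'}, 'time': {'S': '09:01:05'}}
}

# the 'Method' attribute tells the consumer which DynamoDB operation to perform
sqs.send_message(
    QueueUrl=os.environ['QUEUE_URL'],  # assumption: queue URL passed via env var
    MessageBody=json.dumps(message),
    MessageAttributes={
        'Method': {'DataType': 'String', 'StringValue': 'POST'}
    }
)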

import logging
from json import loads

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamo_client = boto3.client('dynamodb')


def lambda_handler(event, context):
    # map the SQS message attribute 'Method' to a DynamoDB operation
    operations = {
        'DELETE': lambda dynamo, x: dynamo.delete_item(**x),
        'POST': lambda dynamo, x: dynamo.put_item(**x),
        'PUT': lambda dynamo, x: dynamo.update_item(**x),
        'GET': lambda dynamo, x: dynamo.get_item(**x),
        'GET_ALL': lambda dynamo, x: dynamo.scan(**x),
    }

    for record in event['Records']:
        # the message body is a pre-formatted DynamoDB request
        payload = loads(record['body'], parse_float=str)
        operation = record['messageAttributes']['Method']['stringValue']
        if operation in operations:
            try:
                operations[operation](dynamo_client, payload)
            except Exception as e:
                logger.error(e)
        else:
            logger.error('Unsupported method \'{}\''.format(operation))
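
In the SAM template, wiring the queue to this function is a matter of declaring an SQS event source on the function definition. A hedged sketch, in which the logical queue name and batch size are illustrative rather than the project’s actual values:

Events:
  SqsEvent:
    Type: SQS
    Properties:
      Queue: !GetAtt MessageQueue.Arn # illustrative logical resource name
      BatchSize: 10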

API Gateway Event Source for Lambda

Examining the API Gateway management console, you should observe that CloudFormation created a new Edge-optimized API. The API contains several resources and their associated HTTP methods.

'use strict';

const AWS = require('aws-sdk');

// reuse the DocumentClient and table name across warm invocations
const docClient = new AWS.DynamoDB.DocumentClient();
let tableName = null;

exports.getMessage = async (event, context) => {
    if (tableName == null) {
        tableName = process.env.TABLE_NAME;
    }
    const params = {
        TableName: tableName,
        Key: {
            "date": event.pathParameters.date,
            "time": event.queryStringParameters.time
        }
    };
    console.debug(params.Key);
    return await new Promise((resolve, reject) => {
        docClient.get(params, (error, data) => {
            if (error) {
                console.error(`getMessage ERROR=${error.stack}`);
                resolve({
                    statusCode: 400,
                    error: `Could not get messages: ${error.stack}`
                });
            } else {
                console.info(`getMessage data=${JSON.stringify(data)}`);
                resolve({
                    statusCode: 200,
                    body: JSON.stringify(data)
                });
            }
        });
    });
};
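
Note that the handler resolves, rather than rejects, the promise when DynamoDB returns an error, responding with a 400 status code. If the promise were rejected, the invocation would surface as a function error, and API Gateway’s proxy integration would return a generic 502 instead of a well-formed HTTP response.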

Test the API

To test the Lambda functions called by our API, we can use the sam local invoke command, part of the SAM CLI. This command lets us invoke the Lambda functions locally, without deploying them to AWS, by triggering events for the functions to handle. This is useful as we continue to develop, test, and re-deploy the Lambda functions to our Development, Staging, and Production environments.

{
  "body": "",
  "resource": "/",
  "path": "/message",
  "httpMethod": "GET",
  "isBase64Encoded": false,
  "queryStringParameters": {
    "time": "06:45:43"
  },
  "pathParameters": {
    "date": "2000-01-01"
  },
  "stageVariables": {}
}
# change me (required by the local Lambda functions)
TABLE_NAME=your-dynamodb-table-name

# local testing (all CRUD functions)
sam local invoke PostMessageFunction \
  --event lambda_apigtw_to_dynamodb/events/event_postMessage.json

sam local invoke GetMessageFunction \
  --event lambda_apigtw_to_dynamodb/events/event_getMessage.json

sam local invoke GetMessagesFunction \
  --event lambda_apigtw_to_dynamodb/events/event_getMessages.json

sam local invoke PutMessageFunction \
  --event lambda_apigtw_to_dynamodb/events/event_putMessage.json

sam local invoke DeleteMessageFunction \
  --event lambda_apigtw_to_dynamodb/events/event_deleteMessage.json

Testing the Deployed API

To test the actual deployed API, we can call one of the API’s resources using an HTTP client, such as Postman. To locate the URL used to invoke an API resource, look at the ‘Prod’ Stage for the new API, found in the Stages tab of the API Gateway console; for example, the Invoke URL for the POST HTTP method of the /message resource.
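
As a sketch, you could also call the deployed API with curl. Assuming the resource exposes the date as a path parameter and the time as a query string, as in the test event earlier, a GET request might look like the following (the API ID and region are placeholders).

curl -X GET \
  "https://your-api-id.execute-api.us-east-1.amazonaws.com/Prod/message/2000-01-01?time=06:45:43"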

Cleaning Up

When you are finished with the demonstration, remove the test data from the S3 bucket and delete the CloudFormation stack to avoid incurring further charges.

# change me
S3_DATA_BUCKET=your_data_bucket_name
STACK_NAME=your_stack_name

aws s3 rm s3://$S3_DATA_BUCKET/data.csv # and any other objects

aws cloudformation delete-stack \
  --stack-name $STACK_NAME

