Enriching Data with DynamoDB
Enriching data with DynamoDB requires a Lambda function that has read access to a DynamoDB table:
################################################
# DynamoDB table
# Metadata can be read by any Lambda in a deployment
################################################

# Table keyed on a single string partition key ("PK") and encrypted with
# the deployment's Substation KMS key.
module "dynamodb" {
  source     = "../../../build/terraform/aws/dynamodb"
  kms_arn    = module.kms_substation.arn
  table_name = "substation"
  hash_key   = "PK"

  attributes = [
    {
      name = "PK"
      type = "S"
    }
  ]
}
################################################
# Lambda
# Reads from Kinesis Data Stream, processes data, writes to Kinesis Data Stream
################################################

# Container-based Lambda; its Substation app config is pulled from AppConfig
# at the path in SUBSTATION_CONFIG, and the AWS_KINESIS handler is selected
# via SUBSTATION_HANDLER.
module "lambda_processor" {
  source        = "../../../build/terraform/aws/lambda"
  function_name = "substation_processor"
  description   = "Substation Lambda that reads data from a Kinesis Data Stream, processes data, and writes data to a Kinesis Data Stream"
  appconfig_id  = aws_appconfig_application.substation.id
  kms_arn       = module.kms_substation.arn
  image_uri     = "${module.ecr_substation.repository_url}:latest"
  architectures = ["arm64"]

  env = {
    "AWS_MAX_ATTEMPTS" : 10
    "SUBSTATION_CONFIG" : "/applications/substation/environments/prod/configurations/substation_processor"
    "SUBSTATION_HANDLER" : "AWS_KINESIS"
    "SUBSTATION_DEBUG" : 1
    "SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
  }

  tags = {
    owner = "example"
  }
}
################################################
# Trigger settings
################################################

# Connects the raw Kinesis stream to the processor Lambda. Records are
# delivered in batches of up to 100, buffered for at most 30 seconds,
# starting from the latest position in the stream.
resource "aws_lambda_event_source_mapping" "lambda_esm_processor" {
  event_source_arn                   = module.kinesis_raw.arn
  function_name                      = module.lambda_processor.arn
  maximum_batching_window_in_seconds = 30
  batch_size                         = 100
  parallelization_factor             = 1
  starting_position                  = "LATEST"
}
################################################
# DynamoDB enrichment
################################################

# IAM policy scoped to the enrichment table's ARN.
module "iam_lambda_processor_dynamodb_read" {
  source = "../../../build/terraform/aws/iam"

  resources = [
    module.dynamodb.arn
  ]
}
# Attaches the table's read policy to the processor Lambda's execution role
# so the function can query the DynamoDB table at runtime.
module "iam_lambda_processor_dynamodb_read_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"
  id     = "${module.lambda_processor.name}_dynamodb_read"
  policy = module.iam_lambda_processor_dynamodb_read.dynamodb_read_policy

  roles = [
    module.lambda_processor.role
  ]
}
// Config for the processor Lambda: runs the DynamoDB enrichment processors
// (defined in dynamodb.libsonnet) as a batch transform and writes results
// to stdout.
local sub = import '../../../../build/config/substation.libsonnet';
local dynamodb = import 'dynamodb.libsonnet';

{
  sink: sub.interfaces.sink.stdout,
  transform: {
    type: 'batch',
    settings: {
      processors: dynamodb.processors,
    },
  },
}
local sub = import '../../../../build/config/substation.libsonnet';

// Metadata key where the DynamoDB request payload is staged.
local ddb_payload = '!metadata example.ddb';

local processors = [
  // creates a DynamoDB payload in metadata by inserting a value into
  // the ddb_payload partition key field
  // the partition key of the table will be queried with the value 'example'
  {
    processor: sub.interfaces.processor.insert(
      options={ value: 'example' },
      settings={ set_key: sub.helpers.key.append(ddb_payload, 'PK') },
    ),
  },  // bug fix: this comma was missing, which made the array invalid Jsonnet
  // queries the DynamoDB table using the DynamoDB payload
  // items from the DynamoDB table are set into the `dynamodb` key in the
  // object (per set_key below; the original comment said `ddb`)
  {
    processor: sub.interfaces.processor.aws_dynamodb(
      options={ table: 'substation', key_condition_expression: 'PK = :pk' },
      settings={ key: ddb_payload, set_key: 'dynamodb' },
    ),
  },
];

// flattens the `processors` array into a single array; required for compiling
// into config.jsonnet
{
  processors: sub.helpers.flatten_processors(processors),
}
Updated over 1 year ago