Enriching Data with Lambda
Enriching data with Lambda requires a Lambda that has execution access to another Lambda that is deployed as a synchronous microservice:
################################################
# Lambda
# Reads from Kinesis Data Stream, processes data, writes to Kinesis Data Stream
################################################
module "lambda_processor" {
source = "../../../build/terraform/aws/lambda"
function_name = "substation_processor"
description = "Substation Lambda that reads data from a Kinesis Data Stream, processes data, and writes data to a Kinesis Data Stream"
appconfig_id = aws_appconfig_application.substation.id
kms_arn = module.kms_substation.arn
image_uri = "${module.ecr_substation.repository_url}:latest"
architectures = ["arm64"]
env = {
"AWS_MAX_ATTEMPTS" : 10
"SUBSTATION_CONFIG" : "/applications/substation/environments/prod/configurations/substation_processor"
"SUBSTATION_HANDLER" : "AWS_KINESIS"
"SUBSTATION_DEBUG" : 1
"SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
}
tags = {
owner = "example"
}
}
################################################
# Trigger settings
################################################
resource "aws_lambda_event_source_mapping" "lambda_esm_processor" {
event_source_arn = module.kinesis_raw.arn
function_name = module.lambda_processor.arn
maximum_batching_window_in_seconds = 30
batch_size = 100
parallelization_factor = 1
starting_position = "LATEST"
}
################################################
# Lambda enrichment
################################################
# Allows processor Lambda to synchronously invoke any Lambda
module "iam_lambda_processor_lambda_execute" {
source = "../../../build/terraform/aws/iam"
resources = ["*"]
}
module "iam_lambda_processor_lambda_execute_attachment" {
source = "../../../build/terraform/aws/iam_attachment"
id = "${module.lambda_processor.name}_lambda_execute"
policy = module.iam_lambda_processor_lambda_execute.lambda_execute_policy
roles = [
module.lambda_processor.role
]
}
// config for the processor Lambda
local sub = import '../../../../build/config/substation.libsonnet';
local lambda = import 'lambda.libsonnet';
{
sink: sub.interfaces.sink.stdout,
transform: {
type: 'batch',
settings: {
processors:
lambda.processors
},
},
}
local sub = import '../../../../build/config/substation.libsonnet';
local processors = [
// synchronously invokes a Lambda with the entire contents of the object
// the results of the invocation are set into the `lambda` key in the object
{
processors: [
sub.interfaces.processor.aws_lambda(
options={function_name:'external_enrichment'},
settings={key:'@this', set_key:'lambda'},
),
],
},
];
// flattens the `processors` array into a single array; required for compiling into config.jsonnet
{
processors: sub.helpers.flatten_processors(processors),
}
Updated over 1 year ago