Enriching Data with Lambda

Enriching data with Lambda requires a Lambda that has execution access to another Lambda that is deployed as a synchronous microservice:

################################################
# Lambda
# Reads from Kinesis Data Stream, processes data, writes to Kinesis Data Stream
################################################

module "lambda_processor" {
  source        = "../../../build/terraform/aws/lambda"
  function_name = "substation_processor"
  description   = "Substation Lambda that reads data from a Kinesis Data Stream, processes data, and writes data to a Kinesis Data Stream"
  appconfig_id  = aws_appconfig_application.substation.id
  kms_arn       = module.kms_substation.arn
  image_uri     = "${module.ecr_substation.repository_url}:latest"
  architectures = ["arm64"]

  env = {
    "AWS_MAX_ATTEMPTS" : 10
    "SUBSTATION_CONFIG" : "/applications/substation/environments/prod/configurations/substation_processor"
    "SUBSTATION_HANDLER" : "AWS_KINESIS"
    "SUBSTATION_DEBUG" : 1
    "SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
  }

  tags = {
    owner = "example"
  }
}

################################################
# Trigger settings
################################################

resource "aws_lambda_event_source_mapping" "lambda_esm_processor" {
  event_source_arn                   = module.kinesis_raw.arn
  function_name                      = module.lambda_processor.arn
  maximum_batching_window_in_seconds = 30
  batch_size                         = 100
  parallelization_factor             = 1
  starting_position                  = "LATEST"
}

################################################
# Lambda enrichment
################################################

# Allows processor Lambda to synchronously invoke any Lambda
module "iam_lambda_processor_lambda_execute" {
  source    = "../../../build/terraform/aws/iam"
  resources = ["*"]
}

module "iam_lambda_processor_lambda_execute_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"
  id     = "${module.lambda_processor.name}_lambda_execute"
  policy = module.iam_lambda_processor_lambda_execute.lambda_execute_policy
  roles = [
    module.lambda_processor.role
  ]
}
// config for the processor Lambda
local sub = import '../../../../build/config/substation.libsonnet';

local lambda = import 'lambda.libsonnet';

{
  sink: sub.interfaces.sink.stdout,
  transform: {
    type: 'batch',
    settings: {
      processors:
        lambda.processors
    },
  },
}
local sub = import '../../../../build/config/substation.libsonnet';

local processors = [
  // synchronously invokes a Lambda with the entire contents of the object
  // the results of the invocation are set into the `lambda` key in the object
  {
    processors: [
      sub.interfaces.processor.aws_lambda(
        options={function_name:'external_enrichment'},
        settings={key:'@this', set_key:'lambda'},
      ),
    ],
  },
];

// flattens the `processors` array into a single array; required for compiling into config.jsonnet
{
  processors: sub.helpers.flatten_processors(processors),
}