Enriching Data with DynamoDB

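Enriching data with DynamoDB requires a Lambda that has read access to a DynamoDB table. The Terraform configuration below creates the table, the processor Lambda and its Kinesis trigger, and the IAM policy that grants the Lambda read access to the table; the Jsonnet configuration that follows it queries the table from the processor: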

################################################
# DynamoDB table
# Metadata can be read by any Lambda in a deployment
################################################

# Declares the "substation" DynamoDB table through the shared dynamodb module.
# The table defines a single attribute, "PK", which is also its hash key.
module "dynamodb" {
  source = "../../../build/terraform/aws/dynamodb"

  table_name = "substation"

  # KMS key ARN handed to the module
  # NOTE(review): presumably used to encrypt the table -- confirm in the module
  kms_arn = module.kms_substation.arn

  # "S" is the DynamoDB attribute type for strings
  hash_key = "PK"
  attributes = [
    { name = "PK", type = "S" },
  ]
}
################################################
# Lambda
# Reads from Kinesis Data Stream, processes data, writes to Kinesis Data Stream
################################################

# Processor Lambda, built from the ":latest" image in the Substation ECR
# repository and targeting the arm64 architecture.
module "lambda_processor" {
  source = "../../../build/terraform/aws/lambda"

  function_name = "substation_processor"
  description   = "Substation Lambda that reads data from a Kinesis Data Stream, processes data, and writes data to a Kinesis Data Stream"
  image_uri     = "${module.ecr_substation.repository_url}:latest"
  architectures = ["arm64"]

  appconfig_id = aws_appconfig_application.substation.id
  kms_arn      = module.kms_substation.arn

  # NOTE(review): presumably exported as the function's environment variables -- confirm in the module
  env = {
    "AWS_MAX_ATTEMPTS"   = 10
    "SUBSTATION_CONFIG"  = "/applications/substation/environments/prod/configurations/substation_processor"
    "SUBSTATION_HANDLER" = "AWS_KINESIS"
    "SUBSTATION_DEBUG"   = 1
    "SUBSTATION_METRICS" = "AWS_CLOUDWATCH_EMBEDDED_METRICS"
  }

  tags = { owner = "example" }
}

################################################
# Trigger settings
################################################

# Event source mapping that connects the raw Kinesis stream to the processor Lambda.
resource "aws_lambda_event_source_mapping" "lambda_esm_processor" {
  function_name    = module.lambda_processor.arn
  event_source_arn = module.kinesis_raw.arn

  # begin reading at the newest records in the stream
  starting_position = "LATEST"

  # batching: at most 100 records per invocation, gathered for at most 30 seconds
  batch_size                         = 100
  maximum_batching_window_in_seconds = 30

  # one batch processed concurrently per shard
  parallelization_factor = 1
}

################################################
# DynamoDB enrichment
################################################

# IAM policy module scoped to the ARN of the DynamoDB table.
module "iam_lambda_processor_dynamodb_read" {
  source = "../../../build/terraform/aws/iam"

  resources = [module.dynamodb.arn]
}

# Attaches the IAM module's dynamodb_read_policy output to the processor Lambda's role.
module "iam_lambda_processor_dynamodb_read_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"

  # attachment ID is derived from the Lambda's name
  id     = "${module.lambda_processor.name}_dynamodb_read"
  policy = module.iam_lambda_processor_dynamodb_read.dynamodb_read_policy
  roles  = [module.lambda_processor.role]
}
// Configuration for the processor Lambda: a 'batch' transform that runs the
// processors defined in dynamodb.libsonnet, paired with the stdout sink.
local dynamodb = import 'dynamodb.libsonnet';
local sub = import '../../../../build/config/substation.libsonnet';

// transform settings, kept separate from the top-level object for readability
local transform = {
  type: 'batch',
  settings: { processors: dynamodb.processors },
};

{
  sink: sub.interfaces.sink.stdout,
  transform: transform,
}
local sub = import '../../../../build/config/substation.libsonnet';

// metadata key under which the DynamoDB query payload is built
local ddb_payload = '!metadata example.ddb';

local processors = [
  // creates a DynamoDB payload in metadata by inserting a value into
  // the ddb_payload partition key field
  // the partition key of the table will be queried with the value 'example'
  {
    processor: sub.interfaces.processor.insert(
      options={ value: 'example' },
      settings={ set_key: sub.helpers.key.append(ddb_payload, 'PK') },
    ),
  },
  // queries the DynamoDB table using the DynamoDB payload
  // items from the DynamoDB table are set into the `dynamodb` key in the object
  {
    processor: sub.interfaces.processor.aws_dynamodb(
      options={ table: 'substation', key_condition_expression: 'PK = :pk' },
      settings={ key: ddb_payload, set_key: 'dynamodb' },
    ),
  },
];

// flattens the `processors` array into a single array; required for compiling into config.jsonnet
{
  processors: sub.helpers.flatten_processors(processors),
}