Reading From and Writing to Kinesis Data Streams

Reading from and writing to Kinesis Data Streams includes deployment of these resources:

  • Unprocessed ("raw") Kinesis Data Stream that data is read from
  • Processed ("transformed") Kinesis Data Stream that data is written to
  • Processor Lambda that reads, transforms, and writes data

This pattern partially implements the two-phase mutation pipeline design recommended by the project maintainers.

🚧

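Never read from and write to the same Kinesis Data Stream! Any record the processor writes back to its own source stream would trigger the processor again, creating an endless processing loop.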

################################################
# Kinesis Data Stream
# Unprocessed (raw) data is written to this stream
################################################

module "kinesis_raw" {
  source = "../../../build/terraform/aws/kinesis"

  # Stream identity; encrypted with the shared Substation KMS key.
  stream_name = "substation_raw"
  kms_key_id  = module.kms_substation.arn

  # ARN of the SNS topic handed to the module's `autoscaling_topic` input.
  autoscaling_topic = aws_sns_topic.autoscaling_topic.arn

  tags = {
    owner = "example"
  }
}

################################################
## Raw data stream read permissions
################################################

module "iam_kinesis_raw_read" {
  source = "../../../build/terraform/aws/iam"

  # Generated policies are scoped to the raw stream only.
  resources = [module.kinesis_raw.arn]
}

# Attaches the raw-stream read policy to the processor Lambda's role, which
# consumes the raw stream as its event source.
module "iam_kinesis_raw_read_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"

  id     = "substation_kinesis_raw_read"
  policy = module.iam_kinesis_raw_read.kinesis_read_policy
  roles  = [module.lambda_processor.role]
}

################################################
## Raw data stream write permissions
################################################

module "iam_kinesis_raw_write" {
  source = "../../../build/terraform/aws/iam"

  # Generated policies are scoped to the raw stream only.
  resources = [
    module.kinesis_raw.arn,
  ]
}

# Attaches the raw-stream write policy to whatever produces raw data.
module "iam_kinesis_raw_write_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"
  id     = "substation_kinesis_raw_write"
  policy = module.iam_kinesis_raw_write.kinesis_write_policy
  roles = [
    # add resources that will write to the Kinesis Data Stream
    # NOTE(review): this list is empty as shipped. Whether the iam_attachment
    # module accepts zero roles depends on its implementation (the AWS
    # provider's aws_iam_policy_attachment may reject an attachment with no
    # users/roles/groups) — confirm before applying.
  ]
}

################################################
# Kinesis Data Stream
# Processed (transformed) data is written to this stream
################################################

module "kinesis_processed" {
  source = "../../../build/terraform/aws/kinesis"

  # Stream identity; encrypted with the shared Substation KMS key.
  stream_name = "substation_processed"
  kms_key_id  = module.kms_substation.arn

  # ARN of the SNS topic handed to the module's `autoscaling_topic` input.
  autoscaling_topic = aws_sns_topic.autoscaling_topic.arn

  tags = {
    owner = "example"
  }
}

################################################
## Processed data stream read permissions
################################################

module "iam_kinesis_processed_read" {
  source = "../../../build/terraform/aws/iam"

  # Generated policies are scoped to the processed stream only.
  resources = [
    module.kinesis_processed.arn,
  ]
}

# Attaches the processed-stream read policy to downstream consumers.
module "iam_kinesis_processed_read_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"
  id     = "substation_kinesis_processed_read"
  policy = module.iam_kinesis_processed_read.kinesis_read_policy
  roles = [
    # add resources that will read from the Kinesis Data Stream
    # NOTE(review): this list is empty as shipped. Whether the iam_attachment
    # module accepts zero roles depends on its implementation (the AWS
    # provider's aws_iam_policy_attachment may reject an attachment with no
    # users/roles/groups) — confirm before applying.
  ]
}

################################################
## Processed data stream write permissions
################################################

module "iam_kinesis_processed_write" {
  source = "../../../build/terraform/aws/iam"

  # Generated policies are scoped to the processed stream only.
  resources = [module.kinesis_processed.arn]
}

# Attaches the processed-stream write policy to the processor Lambda's role,
# which writes transformed records to the processed stream.
module "iam_kinesis_processed_write_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"

  id     = "substation_kinesis_processed_write"
  policy = module.iam_kinesis_processed_write.kinesis_write_policy
  roles  = [module.lambda_processor.role]
}

################################################
# Lambda
# Reads from Kinesis Data Stream, processes data, writes to Kinesis Data Stream
################################################

module "lambda_processor" {
  source = "../../../build/terraform/aws/lambda"

  function_name = "substation_processor"
  description   = "Substation Lambda that reads data from a Kinesis Data Stream, processes data, and writes data to a Kinesis Data Stream"

  # Container image built from the Substation ECR repository, run on Graviton.
  # NOTE(review): ":latest" is a mutable tag; pushing a new image does not
  # change this string, so confirm how function redeploys are triggered.
  image_uri     = "${module.ecr_substation.repository_url}:latest"
  architectures = ["arm64"]

  # AppConfig application and KMS key the module wires into the function.
  appconfig_id = aws_appconfig_application.substation.id
  kms_arn      = module.kms_substation.arn

  env = {
    # AppConfig path of the Jsonnet-generated processor configuration.
    "SUBSTATION_CONFIG" : "/applications/substation/environments/prod/configurations/substation_processor"
    "SUBSTATION_HANDLER" : "AWS_KINESIS"
    "SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
    # NOTE(review): presumably enables debug output; consider disabling
    # outside of examples.
    "SUBSTATION_DEBUG" : 1
    # AWS SDK setting: maximum attempts per API call.
    "AWS_MAX_ATTEMPTS" : 10
  }

  tags = {
    owner = "example"
  }
}

################################################
# Trigger settings
################################################

resource "aws_lambda_event_source_mapping" "lambda_esm_processor" {
  # The processor is triggered by the raw stream (it writes to the processed
  # stream, so the two must never be the same).
  function_name    = module.lambda_processor.arn
  event_source_arn = module.kinesis_raw.arn

  # Start from records added after the mapping is created.
  starting_position = "LATEST"

  # Up to 100 records per invocation, waiting at most 30 seconds to fill a
  # batch, with one concurrent batch per shard.
  batch_size                         = 100
  maximum_batching_window_in_seconds = 30
  parallelization_factor             = 1
}
// config for the processor Lambda
local sub = import '../../../../build/config/substation.libsonnet';

{
  // Transformed records are written to the stream named
  // 'substation_processed' (the processed stream created by Terraform).
  sink: sub.interfaces.sink.aws_kinesis(stream='substation_processed'),
  // add processor configurations to enable data transformation or
  // use the "transfer" transform to send data as-is to the sink
  transform: {
    type: 'batch',
    settings: {
      // `processors` must have a value for the file to parse. Start from an
      // empty array and uncomment lines below to concatenate processor
      // arrays onto it (e.g. `[] + foo.processors + bar.processors`).
      processors: []
      // + foo.processors
      // + bar.processors
      // + baz.processors
    },
  },
}