Reading From and Writing to Kinesis Data Streams
Reading from and writing to Kinesis Data Streams includes deployment of these resources:
- Unprocessed ("raw") Kinesis Data Stream that data is read from
- Processed ("transformed") Kinesis Data Stream that data is written to
- Processor Lambda that reads, transforms, and writes data
This pattern partially implements the two-phase mutation pipeline design recommended by the project maintainers.
Never read and write to the same Kinesis Data Stream!
################################################
# Kinesis Data Stream
# Unprocessed (raw) data is written to this stream
################################################
module "kinesis_raw" {
source = "../../../build/terraform/aws/kinesis"
kms_key_id = module.kms_substation.arn
stream_name = "substation_raw"
autoscaling_topic = aws_sns_topic.autoscaling_topic.arn
tags = {
owner = "example"
}
}
################################################
## Raw data stream read permissions
################################################
module "iam_kinesis_raw_read" {
source = "../../../build/terraform/aws/iam"
resources = [
module.kinesis_raw.arn,
]
}
module "iam_kinesis_raw_read_attachment" {
source = "../../../build/terraform/aws/iam_attachment"
id = "substation_kinesis_raw_read"
policy = module.iam_kinesis_raw_read.kinesis_read_policy
roles = [
module.lambda_processor.role,
]
}
################################################
## Raw data stream write permissions
################################################
module "iam_kinesis_raw_write" {
source = "../../../build/terraform/aws/iam"
resources = [
module.kinesis_raw.arn,
]
}
module "iam_kinesis_raw_write_attachment" {
source = "../../../build/terraform/aws/iam_attachment"
id = "substation_kinesis_raw_write"
policy = module.iam_kinesis_raw_write.kinesis_write_policy
roles = [
# add resources that will write to the Kinesis Data Stream
]
}
################################################
# Kinesis Data Stream
# Processed (transformed) data is written to this stream
################################################
module "kinesis_processed" {
source = "../../../build/terraform/aws/kinesis"
kms_key_id = module.kms_substation.arn
stream_name = "substation_processed"
autoscaling_topic = aws_sns_topic.autoscaling_topic.arn
tags = {
owner = "example"
}
}
################################################
## Processed data stream read permissions
################################################
module "iam_kinesis_processed_read" {
source = "../../../build/terraform/aws/iam"
resources = [
module.kinesis_processed.arn,
]
}
module "iam_kinesis_processed_read_attachment" {
source = "../../../build/terraform/aws/iam_attachment"
id = "substation_kinesis_processed_read"
policy = module.iam_kinesis_processed_read.kinesis_read_policy
roles = [
# add resources that will read from the Kinesis Data Stream
]
}
################################################
## Processed data stream write permissions
################################################
module "iam_kinesis_processed_write" {
source = "../../../build/terraform/aws/iam"
resources = [
module.kinesis_processed.arn,
]
}
module "iam_kinesis_processed_write_attachment" {
source = "../../../build/terraform/aws/iam_attachment"
id = "substation_kinesis_processed_write"
policy = module.iam_kinesis_processed_write.kinesis_write_policy
roles = [
module.lambda_processor.role,
]
}
################################################
# Lambda
# Reads from Kinesis Data Stream, processes data, writes to Kinesis Data Stream
################################################
module "lambda_processor" {
source = "../../../build/terraform/aws/lambda"
function_name = "substation_processor"
description = "Substation Lambda that reads data from a Kinesis Data Stream, processes data, and writes data to a Kinesis Data Stream"
appconfig_id = aws_appconfig_application.substation.id
kms_arn = module.kms_substation.arn
image_uri = "${module.ecr_substation.repository_url}:latest"
architectures = ["arm64"]
env = {
"AWS_MAX_ATTEMPTS" : 10
"SUBSTATION_CONFIG" : "/applications/substation/environments/prod/configurations/substation_processor"
"SUBSTATION_HANDLER" : "AWS_KINESIS"
"SUBSTATION_DEBUG" : 1
"SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
}
tags = {
owner = "example"
}
}
################################################
# Trigger settings
################################################
resource "aws_lambda_event_source_mapping" "lambda_esm_processor" {
event_source_arn = module.kinesis_raw.arn
function_name = module.lambda_processor.arn
maximum_batching_window_in_seconds = 30
batch_size = 100
parallelization_factor = 1
starting_position = "LATEST"
}
// config for the processor Lambda
local sub = import '../../../../build/config/substation.libsonnet';
{
sink: sub.interfaces.sink.aws_kinesis(stream='substation_processed'),
// add processor configurations to enable data transformation or
// use the "transfer" transform to send data as-is to the sink
transform: {
type: 'batch',
settings: {
processors:
// + foo.processors
// + bar.processors
// + baz.processors
},
},
}
Updated over 1 year ago