Autoscaling Kinesis Data Streams

Substation uses a custom autoscaling service that manages the shard count of Kinesis Data Streams based on data volume and size. Every Kinesis Data Stream that runs within a deployment can share the same autoscaling service.

The service supports multiple "speeds" of autoscaling based on two Lambda environment variables:

  • SUBSTATION_AUTOSCALING_UPSCALE_DATAPOINTS: the number of consecutive datapoints that must be observed in 5 minutes before the stream will scale up / out
  • SUBSTATION_AUTOSCALING_DOWNSCALE_DATAPOINTS: the number of consecutive datapoints that must be observed in 60 minutes before the stream will scale down / in

Multiple autoscaling services can run in the same deployment; for example, a "quick" autoscaler alongside a "slow" autoscaler.

Autoscaling includes deployment of these resources:

  • SNS topic for sending autoscale notifications
  • Lambda that executes the autoscaling application
# Used for deploying and maintaining the Kinesis Data Streams autoscaling application
# Not required if deployments don't use Kinesis Data Streams

# SNS topic for sending autoscale notifications
# The topic's KMS master key comes from the kms_substation module
resource "aws_sns_topic" "autoscaling_topic" {
  kms_master_key_id = module.kms_substation.key_id
  name              = "substation_autoscaling"

  tags = { owner = "example" }
}

# Lambda that executes the autoscaling application
module "lambda_autoscaling" {
  source = "../../../build/terraform/aws/lambda"

  function_name = "substation_autoscaling"
  description   = "Autoscales Kinesis streams based on data volume and size"
  architectures = ["arm64"]
  image_uri     = "${module.ecr_autoscaling.repository_url}:latest"
  appconfig_id  = aws_appconfig_application.substation.id
  kms_arn       = module.kms_substation.arn

  # Upscale:   1 out of 5 datapoints must breach the upper threshold
  # Downscale: 60 out of 60 datapoints must breach the lower threshold
  env = {
    SUBSTATION_AUTOSCALING_DOWNSCALE_DATAPOINTS = 60
    SUBSTATION_AUTOSCALING_UPSCALE_DATAPOINTS   = 1
  }

  tags = { owner = "example" }

  depends_on = [
    module.ecr_autoscaling.repository_url,
    aws_appconfig_application.substation,
  ]
}

# Subscribes the autoscaling Lambda to the autoscaling SNS topic
resource "aws_sns_topic_subscription" "autoscaling_subscription" {
  depends_on = [module.lambda_autoscaling.name]

  protocol  = "lambda"
  endpoint  = module.lambda_autoscaling.arn
  topic_arn = aws_sns_topic.autoscaling_topic.arn
}

# Allows the SNS service to invoke the autoscaling Lambda, restricted to
# invocations whose source is the autoscaling topic
# No explicit depends_on is needed: function_name references
# module.lambda_autoscaling.name, which already orders this resource after
# that module output
resource "aws_lambda_permission" "autoscaling_invoke" {
  statement_id  = "AllowExecutionFromSNS"
  action        = "lambda:InvokeFunction"
  function_name = module.lambda_autoscaling.name
  principal     = "sns.amazonaws.com"
  source_arn    = aws_sns_topic.autoscaling_topic.arn
}

# Required for updating shard counts on Kinesis streams
# Resources can be isolated; "*" covers all streams
module "autoscaling_kinesis_modify" {
  source    = "../../../build/terraform/aws/iam"
  resources = ["*"]
}

# Attaches the Kinesis modify policy to the autoscaling Lambda's role
module "autoscaling_kinesis_modify_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"

  id     = "substation_autoscaling_kinesis_modify_attachment"
  policy = module.autoscaling_kinesis_modify.kinesis_modify_policy
  roles  = [module.lambda_autoscaling.role]
}

# Required for reading active shard counts for Kinesis streams
# Resources can be isolated; "*" covers all streams
module "autoscaling_kinesis_read" {
  source    = "../../../build/terraform/aws/iam"
  resources = ["*"]
}

# Attaches the Kinesis read policy to the autoscaling Lambda's role
module "autoscaling_kinesis_read_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"

  id     = "substation_autoscaling_kinesis_read_attachment"
  policy = module.autoscaling_kinesis_read.kinesis_read_policy
  roles  = [module.lambda_autoscaling.role]
}

# Required for resetting CloudWatch alarm states
# Resources can be isolated; "*" covers all resources
module "autoscaling_cloudwatch_modify" {
  source    = "../../../build/terraform/aws/iam"
  resources = ["*"]
}

# Attaches the CloudWatch modify policy to the autoscaling Lambda's role
module "autoscaling_cloudwatch_modify_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"

  id     = "substation_autoscaling_cloudwatch_modify_attachment"
  policy = module.autoscaling_cloudwatch_modify.cloudwatch_modify_policy
  roles  = [module.lambda_autoscaling.role]
}

# Required for updating CloudWatch alarm variables
# Resources can be isolated; "*" covers all resources
module "autoscaling_cloudwatch_write" {
  source    = "../../../build/terraform/aws/iam"
  resources = ["*"]
}

# Attaches the CloudWatch write policy to the autoscaling Lambda's role
module "autoscaling_cloudwatch_write_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"

  id     = "substation_autoscaling_cloudwatch_write_attachment"
  policy = module.autoscaling_cloudwatch_write.cloudwatch_write_policy
  roles  = [module.lambda_autoscaling.role]
}

# Used for deploying and maintaining the Kinesis Data Streams autoscaling application
# Not required if deployments don't use Kinesis Data Streams

# SNS topic for sending autoscale notifications
# The topic's KMS master key comes from the kms_substation module
resource "aws_sns_topic" "autoscaling_topic" {
  kms_master_key_id = module.kms_substation.key_id
  name              = "substation_autoscaling"

  tags = { owner = "example" }
}

# Lambda that executes the autoscaling application
module "lambda_autoscaling" {
  source = "../../../build/terraform/aws/lambda"

  function_name = "substation_autoscaling"
  description   = "Autoscales Kinesis streams based on data volume and size"
  architectures = ["arm64"]
  image_uri     = "${module.ecr_autoscaling.repository_url}:latest"
  appconfig_id  = aws_appconfig_application.substation.id
  kms_arn       = module.kms_substation.arn

  # Upscale (default):   5 out of 5 datapoints must breach the upper threshold
  # Downscale (default): 57 out of 60 datapoints must breach the lower threshold
  env = {
    SUBSTATION_AUTOSCALING_DOWNSCALE_DATAPOINTS = 57
    SUBSTATION_AUTOSCALING_UPSCALE_DATAPOINTS   = 5
  }

  tags = { owner = "example" }

  depends_on = [
    module.ecr_autoscaling.repository_url,
    aws_appconfig_application.substation,
  ]
}

# Subscribes the autoscaling Lambda to the autoscaling SNS topic
resource "aws_sns_topic_subscription" "autoscaling_subscription" {
  depends_on = [module.lambda_autoscaling.name]

  protocol  = "lambda"
  endpoint  = module.lambda_autoscaling.arn
  topic_arn = aws_sns_topic.autoscaling_topic.arn
}

# Allows the SNS service to invoke the autoscaling Lambda, restricted to
# invocations whose source is the autoscaling topic
# No explicit depends_on is needed: function_name references
# module.lambda_autoscaling.name, which already orders this resource after
# that module output
resource "aws_lambda_permission" "autoscaling_invoke" {
  statement_id  = "AllowExecutionFromSNS"
  action        = "lambda:InvokeFunction"
  function_name = module.lambda_autoscaling.name
  principal     = "sns.amazonaws.com"
  source_arn    = aws_sns_topic.autoscaling_topic.arn
}

# Required for updating shard counts on Kinesis streams
# Resources can be isolated; "*" covers all streams
module "autoscaling_kinesis_modify" {
  source    = "../../../build/terraform/aws/iam"
  resources = ["*"]
}

# Attaches the Kinesis modify policy to the autoscaling Lambda's role
module "autoscaling_kinesis_modify_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"

  id     = "substation_autoscaling_kinesis_modify_attachment"
  policy = module.autoscaling_kinesis_modify.kinesis_modify_policy
  roles  = [module.lambda_autoscaling.role]
}

# Required for reading active shard counts for Kinesis streams
# Resources can be isolated; "*" covers all streams
module "autoscaling_kinesis_read" {
  source    = "../../../build/terraform/aws/iam"
  resources = ["*"]
}

# Attaches the Kinesis read policy to the autoscaling Lambda's role
module "autoscaling_kinesis_read_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"

  id     = "substation_autoscaling_kinesis_read_attachment"
  policy = module.autoscaling_kinesis_read.kinesis_read_policy
  roles  = [module.lambda_autoscaling.role]
}

# Required for resetting CloudWatch alarm states
# Resources can be isolated; "*" covers all resources
module "autoscaling_cloudwatch_modify" {
  source    = "../../../build/terraform/aws/iam"
  resources = ["*"]
}

# Attaches the CloudWatch modify policy to the autoscaling Lambda's role
module "autoscaling_cloudwatch_modify_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"

  id     = "substation_autoscaling_cloudwatch_modify_attachment"
  policy = module.autoscaling_cloudwatch_modify.cloudwatch_modify_policy
  roles  = [module.lambda_autoscaling.role]
}

# Required for updating CloudWatch alarm variables
# Resources can be isolated; "*" covers all resources
module "autoscaling_cloudwatch_write" {
  source    = "../../../build/terraform/aws/iam"
  resources = ["*"]
}

# Attaches the CloudWatch write policy to the autoscaling Lambda's role
module "autoscaling_cloudwatch_write_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"

  id     = "substation_autoscaling_cloudwatch_write_attachment"
  policy = module.autoscaling_cloudwatch_write.cloudwatch_write_policy
  roles  = [module.lambda_autoscaling.role]
}