Reading From and Writing to S3 Buckets

Reading from and writing to an S3 bucket includes deployment of these resources:

  • S3 bucket that data is read from and written to
  • Processor Lambda that reads, transforms, and writes data

If an S3 bucket is used as an intermediary and for long-term storage in a deployment, then we recommend using a single bucket and organizing it with key prefixes.

################################################
# S3 bucket
# Storage for raw and processed data
################################################

module "s3" {
  source  = "../../../build/terraform/aws/s3"
  # Encrypt bucket contents with the shared Substation KMS key.
  kms_arn = module.kms_substation.arn
  # S3 buckets are globally unique, replace `foo` with a unique identifier (e.g., company name)
  bucket  = "foo-substation-example"
}

################################################
## Read permissions
################################################

module "iam_s3_read" {
  source = "../../../build/terraform/aws/iam"
  # Scope the policy to every object in the bucket (`/*` matches object keys).
  # NOTE(review): object-level actions (e.g., GetObject) use this ARN form; if
  # the iam module also grants bucket-level actions (e.g., ListBucket), the
  # bare bucket ARN may be required as well -- confirm against the module.
  resources = [
    "${module.s3.arn}/*",
  ]
}

module "iam_s3_read_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"
  # Attachment ID derived from the bucket name to keep it unique per bucket.
  id     = "${module.s3.name}_s3_read"
  policy = module.iam_s3_read.s3_read_policy
  # Grant read access to the processor Lambda's execution role.
  roles = [
    module.lambda_processor.role,
  ]
}

################################################
## Write permissions
################################################

module "iam_s3_write" {
  source = "../../../build/terraform/aws/iam"
  # Scope the policy to every object in the bucket (`/*` matches object keys).
  resources = [
    "${module.s3.arn}/*",
  ]
}

module "iam_s3_write_attachment" {
  source = "../../../build/terraform/aws/iam_attachment"
  # Fix: the original `id` referenced `module.lambda_processed_s3_sink`, a
  # module that is not defined in this configuration and would fail `terraform
  # plan`. Derive the ID from the bucket name instead, matching the naming
  # convention used by the read attachment.
  id     = "${module.s3.name}_s3_write"
  policy = module.iam_s3_write.s3_write_policy
  # Grant write access to the processor Lambda's execution role.
  roles = [
    module.lambda_processor.role,
  ]
}
################################################
# Lambda
# Reads from S3 bucket, processes data, writes to S3 bucket
################################################

module "lambda_processor" {
  source        = "../../../build/terraform/aws/lambda"
  function_name = "substation_processor"
  description   = "Substation Lambda that reads data from an S3 bucket, processes data, and writes data to an S3 bucket"
  appconfig_id  = aws_appconfig_application.substation.id
  kms_arn       = module.kms_substation.arn
  # Deploy the container image tagged `latest` from the Substation ECR repo.
  image_uri     = "${module.ecr_substation.repository_url}:latest"
  architectures = ["arm64"]

  env = {
    # Maximum AWS SDK retry attempts.
    "AWS_MAX_ATTEMPTS" : 10
    # AppConfig path that the extension prefetches this function's Substation
    # configuration from.
    "AWS_APPCONFIG_EXTENSION_PREFETCH_LIST" : "/applications/substation/environments/prod/configurations/substation_processor"
    # Run the application with the S3 event handler.
    "SUBSTATION_HANDLER" : "AWS_S3"
    "SUBSTATION_DEBUG" : 1
    "SUBSTATION_METRICS" : "AWS_CLOUDWATCH_EMBEDDED_METRICS"
  }

  tags = {
    owner = "example"
  }

  # Fix: `depends_on` entries must be resource or module addresses without
  # attribute access. The original listed the output reference
  # `module.ecr_substation.repository_url`, which Terraform rejects; depend on
  # the module itself instead.
  depends_on = [
    aws_appconfig_application.substation,
    module.ecr_substation,
  ]
}

################################################
# Trigger settings
################################################

# Allow the S3 bucket to invoke the processor Lambda; required before the
# bucket notification below can be created.
resource "aws_lambda_permission" "lambda_processor" {
  statement_id  = "AllowExecutionFromS3Bucket"
  action        = "lambda:InvokeFunction"
  function_name = module.lambda_processor.name
  principal     = "s3.amazonaws.com"
  # Restrict invocation to events originating from this bucket.
  source_arn    = module.s3.arn
}

# Invoke the processor Lambda whenever a new object lands under `ingest/`.
# Filtering on the `ingest/` prefix also prevents a recursive loop: the
# Lambda writes its output under a different prefix, so its own writes do
# not retrigger it.
resource "aws_s3_bucket_notification" "lambda_notification_processor" {
  bucket = module.s3.id

  lambda_function {
    lambda_function_arn = module.lambda_processor.arn
    events              = ["s3:ObjectCreated:*"]
    # enable prefix filtering based on the source service that is writing objects to the bucket
    filter_prefix       = "ingest/"
  }

  # The invoke permission must exist before S3 will accept the notification.
  depends_on = [
    aws_lambda_permission.lambda_processor,
  ]
}
// config for the processor Lambda
local sub = import '../../../../build/config/substation.libsonnet';

{
  // objects have this path: foo-substation-example/processed/[year]/[month]/[day]/
  sink: sub.interfaces.sink.aws_s3(bucket='foo-substation-example', prefix='processed'),
  // add processor configurations to enable data transformation or
  // use the "transfer" transform to send data as-is to the sink
  transform: {
    type: 'batch',
    settings: {
      // Fix: the original left `processors` with no value (its candidate
      // values were commented out), which is a Jsonnet syntax error.
      // Default to an empty list and append imported processor libraries
      // as needed, e.g.:
      //   processors: [] + foo.processors + bar.processors
      processors: [],
    },
  },
}