Send

Send transforms deliver data to an external system.

Substation's send transforms differ from other transforms in two ways:

  • Data Passthrough: All data processed by a send transform passes through, without modification, to the next configured transform.
  • Data Batching: All data is batched in memory before being sent to an external system. Each batch can be further processed by applying auxiliary transforms before it is sent.
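Both behaviors can be shown in a single configuration. This is a minimal sketch, assuming the standard `substation.libsonnet` import and a top-level `transforms` array:

```jsonnet
local sub = import 'substation.libsonnet';

{
  transforms: [
    // Events are batched in memory and written to DynamoDB...
    sub.tf.send.aws.dynamodb({ table_name: 'substation' }),
    // ...and every event still passes through, unmodified, to this transform.
    sub.tf.send.stdout(),
  ],
}
```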

send.aws.dynamodb

Writes JSON objects as items to an AWS DynamoDB table.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| table_name | string | The DynamoDB table name that items are written to. | Yes |
| batch.count | integer | Maximum number of items to batch before emitting a new array. Defaults to 1,000 items. | No |
| batch.size | integer | Maximum size (in bytes) of items to batch before emitting a new array. Defaults to 1MB. | No |
| batch.duration | string | Maximum duration to batch items before emitting a new array. Defaults to 1m. | No |
| auxiliary_transforms | []object | Transforms that are applied to batched data in a sub-pipeline before sending data externally. Defaults to an empty list (no additional transformation is applied). | No |
| object.source_key | string | Retrieves a value from an object for transformation. | No |
| object.batch_key | string | Retrieves a value from an object that is used to organize batched data. No default; all data is batched into the same array. | No |
| aws.region | string | AWS region that the DynamoDB table is in. Defaults to the AWS_REGION and AWS_DEFAULT_REGION environment variables. | No |
| aws.role_arn | string | AWS role that is used to authenticate. Defaults to an empty string (no role assumption is used). | No |
| retry.count | integer | Maximum number of times to retry queries to the DynamoDB table. Defaults to the AWS_MAX_ATTEMPTS environment variable. | No |

Example

```jsonnet
// Long form:
sub.transform.send.aws.dynamodb(
  settings={ table_name: 'substation' }
)
// Shorthand:
sub.tf.send.aws.dynamodb({ table_name: 'substation' })
```
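The optional settings can be combined in one call. This sketch assumes the dotted settings in the table map to nested objects (e.g. `batch.count` as `batch: { count: ... }`); the role ARN is hypothetical:

```jsonnet
sub.tf.send.aws.dynamodb({
  table_name: 'substation',
  // Assumed nesting for the dotted batch.* settings: flush after 500 items,
  // 512KB, or 30 seconds, whichever comes first.
  batch: { count: 500, size: 512 * 1024, duration: '30s' },
  // Hypothetical cross-account role used for authentication.
  aws: { role_arn: 'arn:aws:iam::123456789012:role/substation' },
})
```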

send.aws.kinesis_data_firehose

Puts data into an AWS Kinesis Data Firehose stream.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| stream_name | string | The Kinesis Data Firehose stream name that data is put into. | Yes |
| batch.duration | string | Maximum duration to batch items before emitting a new array. Defaults to 1m. | No |
| auxiliary_transforms | []object | Transforms that are applied to batched data in a sub-pipeline before sending data externally. Defaults to an empty list (no additional transformation is applied). | No |
| object.batch_key | string | Retrieves a value from an object that is used to organize batched data. No default; all data is batched into the same array. | No |
| aws.region | string | AWS region that the Firehose stream is in. Defaults to the AWS_REGION and AWS_DEFAULT_REGION environment variables. | No |
| aws.role_arn | string | AWS role that is used to authenticate. Defaults to an empty string (no role assumption is used). | No |
| retry.count | integer | Maximum number of times to retry putting data into the Firehose stream. Defaults to the AWS_MAX_ATTEMPTS environment variable. | No |

Example

```jsonnet
// Long form:
sub.transform.send.aws.kinesis_data_firehose(
  settings={ stream_name: 'substation' }
)
// Shorthand:
sub.tf.send.aws.firehose({ stream_name: 'substation' })
```

send.aws.kinesis_data_stream

Puts data into an AWS Kinesis Data Stream.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| stream_name | string | The Kinesis Data Stream name that data is put into. | Yes |
| batch.duration | string | Maximum duration to batch items before emitting a new array. Defaults to 1m. | No |
| object.batch_key | string | Retrieves a value from an object that is used to organize batched data. No default; all data is batched into the same array. | No |
| auxiliary_transforms | []object | Transforms that are applied to batched data in a sub-pipeline before sending data externally. Defaults to an empty list (no additional transformation is applied). | No |
| aws.region | string | AWS region that the Kinesis stream is in. Defaults to the AWS_REGION and AWS_DEFAULT_REGION environment variables. | No |
| aws.role_arn | string | AWS role that is used to authenticate. Defaults to an empty string (no role assumption is used). | No |
| retry.count | integer | Maximum number of times to retry putting data into the Kinesis stream. Defaults to the AWS_MAX_ATTEMPTS environment variable. | No |
| use_batch_key_as_partition_key | boolean | Determines if the value retrieved using object.batch_key should be used as the Kinesis record's partition key. Defaults to false (partition key is a random UUID). | No |
| enable_record_aggregation | boolean | Determines if records should be aggregated using the Kinesis Producer Library. Defaults to false (no aggregation is used). | No |

Example

```jsonnet
// Long form:
sub.transform.send.aws.kinesis_data_stream(
  settings={ stream_name: 'substation' }
)
// Shorthand:
sub.tf.send.aws.kinesis_data_stream({ stream_name: 'substation' })
```
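The partitioning settings work together. In this sketch, `tenant` is a hypothetical event field, and the dotted `object.batch_key` setting is assumed to map to a nested object:

```jsonnet
sub.tf.send.aws.kinesis_data_stream({
  stream_name: 'substation',
  // Group batches by the 'tenant' field...
  object: { batch_key: 'tenant' },
  // ...and reuse that value as each record's partition key instead of a random UUID.
  use_batch_key_as_partition_key: true,
  // Aggregate records using the Kinesis Producer Library format.
  enable_record_aggregation: true,
})
```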

send.aws.lambda

Asynchronously invokes an AWS Lambda function, sending data as the payload.

If you need to synchronously invoke a Lambda function, then use the enrich AWS Lambda transform.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| function_name | string | The Lambda function name that is asynchronously invoked. | Yes |
| batch.duration | string | Maximum duration to batch items before emitting a new array. Defaults to 1m. | No |
| object.batch_key | string | Retrieves a value from an object that is used to organize batched data. No default; all data is batched into the same array. | No |
| auxiliary_transforms | []object | Transforms that are applied to batched data in a sub-pipeline before sending data externally. Defaults to an empty list (no additional transformation is applied). | No |
| aws.region | string | AWS region that the Lambda function is in. Defaults to the AWS_REGION and AWS_DEFAULT_REGION environment variables. | No |
| aws.role_arn | string | AWS role that is used to authenticate. Defaults to an empty string (no role assumption is used). | No |
| retry.count | integer | Maximum number of times to retry invoking the Lambda function. Defaults to the AWS_MAX_ATTEMPTS environment variable. | No |

Example

```jsonnet
// Long form:
sub.transform.send.aws.lambda(
  settings={ function_name: 'substation' }
)
// Shorthand:
sub.tf.send.aws.lambda({ function_name: 'substation' })
```

send.aws.s3

Writes data as an object to an AWS S3 bucket.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| bucket_name | string | The S3 bucket name that data is written to. | Yes |
| batch.count | integer | Maximum number of items to batch before emitting a new array. Defaults to 1,000 items. | No |
| batch.size | integer | Maximum size (in bytes) of items to batch before emitting a new array. Defaults to 1MB. | No |
| batch.duration | string | Maximum duration to batch items before emitting a new array. Defaults to 1m. | No |
| object.batch_key | string | Retrieves a value from an object that is used to organize batched data. No default; all data is batched into the same array. | No |
| auxiliary_transforms | []object | Transforms that are applied to batched data in a sub-pipeline before sending data externally. Defaults to an empty list (no additional transformation is applied). | No |
| aws.region | string | AWS region that the S3 bucket is in. Defaults to the AWS_REGION and AWS_DEFAULT_REGION environment variables. | No |
| aws.role_arn | string | AWS role that is used to authenticate. Defaults to an empty string (no role assumption is used). | No |
| retry.count | integer | Maximum number of times to retry writes to the S3 bucket. Defaults to the AWS_MAX_ATTEMPTS environment variable. | No |
| file_path | object | Determines how the name of the object is constructed. Defaults to year/month/day/uuid.extension. | No |
| use_batch_key_as_prefix | boolean | Determines if the value retrieved using object.batch_key should replace the prefix value in file_path. Defaults to false. | No |

Example

```jsonnet
// Long form:
sub.transform.send.aws.s3(
  settings={ bucket_name: 'substation', file_path: { prefix: 'prefix' } }
)
// Shorthand:
sub.tf.send.aws.s3({ bucket_name: 'substation', file_path: { prefix: 'prefix' } })
```
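object.batch_key and use_batch_key_as_prefix can combine to route batches to per-value prefixes. In this sketch, `region` is a hypothetical event field, and the dotted `object.batch_key` setting is assumed to map to a nested object:

```jsonnet
sub.tf.send.aws.s3({
  bucket_name: 'substation',
  // Each distinct value of the 'region' field becomes its own batch...
  object: { batch_key: 'region' },
  // ...and that value replaces the static 'logs' prefix in the object name.
  file_path: { prefix: 'logs' },
  use_batch_key_as_prefix: true,
})
```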

send.aws.sns

Sends data to an AWS SNS topic.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| arn | string | The SNS topic ARN that data is sent to. | Yes |
| batch.duration | string | Maximum duration to batch items before emitting a new array. Defaults to 1m. | No |
| auxiliary_transforms | []object | Transforms that are applied to batched data in a sub-pipeline before sending data externally. Defaults to an empty list (no additional transformation is applied). | No |
| object.batch_key | string | Retrieves a value from an object that is used to organize batched data. No default; all data is batched into the same array. | No |
| aws.region | string | AWS region that the SNS topic is in. Defaults to the AWS_REGION and AWS_DEFAULT_REGION environment variables. | No |
| aws.role_arn | string | AWS role that is used to authenticate. Defaults to an empty string (no role assumption is used). | No |
| retry.count | integer | Maximum number of times to retry sending data to the SNS topic. Defaults to the AWS_MAX_ATTEMPTS environment variable. | No |

Example

```jsonnet
// Long form:
sub.transform.send.aws.sns(
  settings={ arn: 'arn:aws:sns:us-east-1:123456789012:substation' }
)
// Shorthand:
sub.tf.send.aws.sns({ arn: 'arn:aws:sns:us-east-1:123456789012:substation' })
```

send.aws.sqs

Sends data to an AWS SQS queue.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| arn | string | The SQS queue ARN that data is sent to. | Yes |
| batch.duration | string | Maximum duration to batch items before emitting a new array. Defaults to 1m. | No |
| auxiliary_transforms | []object | Transforms that are applied to batched data in a sub-pipeline before sending data externally. Defaults to an empty list (no additional transformation is applied). | No |
| object.batch_key | string | Retrieves a value from an object that is used to organize batched data. No default; all data is batched into the same array. | No |
| aws.region | string | AWS region that the SQS queue is in. Defaults to the AWS_REGION and AWS_DEFAULT_REGION environment variables. | No |
| aws.role_arn | string | AWS role that is used to authenticate. Defaults to an empty string (no role assumption is used). | No |
| retry.count | integer | Maximum number of times to retry sending data to the SQS queue. Defaults to the AWS_MAX_ATTEMPTS environment variable. | No |

Example

```jsonnet
// Long form:
sub.transform.send.aws.sqs(
  settings={ arn: 'arn:aws:sqs:us-east-1:123456789012:substation' }
)
// Shorthand:
sub.tf.send.aws.sqs({ arn: 'arn:aws:sqs:us-east-1:123456789012:substation' })
```

send.file

Writes data to a file.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| batch.count | integer | Maximum number of items to batch before emitting a new array. Defaults to 1,000 items. | No |
| batch.size | integer | Maximum size (in bytes) of items to batch before emitting a new array. Defaults to 1MB. | No |
| batch.duration | string | Maximum duration to batch items before emitting a new array. Defaults to 1m. | No |
| auxiliary_transforms | []object | Transforms that are applied to batched data in a sub-pipeline before sending data externally. Defaults to an empty list (no additional transformation is applied). | No |
| object.batch_key | string | Retrieves a value from an object that is used to organize batched data. No default; all data is batched into the same array. | No |
| file_path | object | Determines how the name of the file is constructed. Defaults to year/month/day/uuid.extension. | No |
| use_batch_key_as_prefix | boolean | Determines if the value retrieved using object.batch_key should replace the prefix value in file_path. Defaults to false. | No |

Example

```jsonnet
// Long form:
sub.transform.send.file()
// Shorthand:
sub.tf.send.file()
```

send.http.post

POSTs data to an HTTP(S) URL.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| url | string | The HTTP(S) URL used in the POST request. URLs support loading secrets. | Yes |
| batch.count | integer | Maximum number of items to batch before emitting a new array. Defaults to 1,000 items. | No |
| batch.size | integer | Maximum size (in bytes) of items to batch before emitting a new array. Defaults to 1MB. | No |
| batch.duration | string | Maximum duration to batch items before emitting a new array. Defaults to 1m. | No |
| auxiliary_transforms | []object | Transforms that are applied to batched data in a sub-pipeline before sending data externally. Defaults to an empty list (no additional transformation is applied). | No |
| object.batch_key | string | Retrieves a value from an object that is used to organize batched data. No default; all data is batched into the same array. | No |
| headers | []object | An array of objects that contain HTTP headers sent in the request. Header values support loading secrets. Defaults to an empty list (no headers are used). | No |

Example

```jsonnet
// Long form:
sub.transform.send.http.post(
  settings={ url: 'api.foo.com' }
)
// Shorthand:
sub.tf.send.http.post({ url: 'api.foo.com' })
```
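Headers are supplied as an array of objects. The `{ key, value }` shape below is an assumption, and the URL and header value are hypothetical:

```jsonnet
sub.tf.send.http.post({
  url: 'https://api.foo.com',
  // Assumed header object shape; header values may also load secrets.
  headers: [
    { key: 'Content-Type', value: 'application/json' },
  ],
})
```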

send.stdout

Sends data to stdout.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| batch.count | integer | Maximum number of items to batch before emitting a new array. Defaults to 1,000 items. | No |
| batch.size | integer | Maximum size (in bytes) of items to batch before emitting a new array. Defaults to 1MB. | No |
| batch.duration | string | Maximum duration to batch items before emitting a new array. Defaults to 1m. | No |
| auxiliary_transforms | []object | Transforms that are applied to batched data in a sub-pipeline before sending data externally. Defaults to an empty list (no additional transformation is applied). | No |
| object.batch_key | string | Retrieves a value from an object that is used to organize batched data. No default; all data is batched into the same array. | No |

Example

```jsonnet
// Long form:
sub.transform.send.stdout()
// Shorthand:
sub.tf.send.stdout()
```

File-Based Send Transforms

Send transforms that deliver file-like objects have specific settings that determine the path, format, and compression for each file.

file_path Settings

Determines how the name of the file is constructed.

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| prefix | string | String value that is prepended to the file path. | No |
| time_format | string | Inserts a formatted datetime string into the file path. Must be one of: pattern-based layouts; unix (epoch, supports fractions of a second); unix_milli (epoch milliseconds). | No |
| uuid | boolean | Inserts a random UUID into the file path. In most configurations, this becomes the file name. | No |
| suffix | string | String value that is appended to the file name. | No |

Use Cases

Random, Date-Based Files

```jsonnet
{
  // creates the file pattern `year/month/day/uuid.extension`
  file_path: {
    time_format: '2006/01/02',
    uuid: true,
  }
}
```
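Prefixed, Suffixed Files

A variation on the same pattern that adds a static prefix and suffix around the default name; the prefix and suffix values are hypothetical:

```jsonnet
{
  // creates the file pattern `logs/year/month/day/uuid.json.gz`
  file_path: {
    prefix: 'logs',
    time_format: '2006/01/02',
    uuid: true,
    suffix: '.json.gz',
  }
}
```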