Utility

Utility transforms provide testing or debugging functions.

utility.control

Generates a control message based on a batch configuration.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| batch.count | int | Maximum number of items to buffer before emitting a new array. Defaults to 1,000 items. | No |
| batch.size | int | Maximum size (in bytes) of items to buffer before emitting a new array. Defaults to 1MB. | No |
| batch.duration | string | Maximum duration to buffer items before emitting a new array. Defaults to 1m. | No |

Example

sub.transform.utility.control(
  settings={ batch: { count: 1000 } }
)
sub.tf.util.ctrl({ batch: { count: 1000 } })
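
The three batch settings can be read as independent limits, where reaching any one of them closes the current batch. The sketch below models that reading in Python. It is not Substation's implementation: the `BatchTracker` class, its field names, the "reached means due" comparison, and the interpretation of 1MB as 1,000,000 bytes are all assumptions made for illustration.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class BatchTracker:
    """Hypothetical model of the batch.count, batch.size and batch.duration limits."""

    max_count: int = 1000        # batch.count default
    max_size: int = 1_000_000    # batch.size default (1MB, assumed to be 10^6 bytes)
    max_duration: float = 60.0   # batch.duration default (1m), in seconds
    count: int = 0
    size: int = 0
    started: float = field(default_factory=time.monotonic)

    def add(self, item: bytes, now: Optional[float] = None) -> bool:
        """Record one item and report whether a control message would be due."""
        now = time.monotonic() if now is None else now
        self.count += 1
        self.size += len(item)
        due = (
            self.count >= self.max_count
            or self.size >= self.max_size
            or now - self.started >= self.max_duration
        )
        if due:
            # Start a fresh batch once any limit has been reached.
            self.count, self.size, self.started = 0, 0, now
        return due
```

With `batch: { count: 1000 }` and the other limits left at their defaults, this model would signal on every 1,000th item, or earlier if the buffered items reach the size limit or a minute passes first.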

utility.delay

Delays transformation for a duration of time.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| duration | string | Duration of time to sleep. | Yes |

Example

sub.transform.utility.delay(
  settings={duration: '1m'}
)
sub.tf.util.delay({duration: '1m'})

utility.drop

Drops the message and returns nothing.

Example

sub.transform.utility.drop()
sub.tf.util.drop()

utility.err

Generates an error with a custom message.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| message | string | Message returned in the error. | Yes |

Example

sub.transform.utility.err(
  settings={message: 'test'}
)
sub.tf.util.err({message: 'test'})

utility.metric.bytes

Generates a metric that reports the sum of message bytes that have passed through the transform.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| metric.name | string | Name of the metric. | Yes |
| metric.destination | object | Metrics Destination configuration that reports the metric to an external system. | Yes |
| metric.attributes | map | Map (dictionary) of strings that are included in the metric as attributes or labels. | No |

Example

sub.transform.utility.metric.bytes(
  settings={ metric: { name: 'BytesReceived', destination: { type: 'aws_cloudwatch_embedded_metrics' } } },
)
sub.tf.util.metric.bytes({ metric: { name: 'BytesReceived', destination: { type: 'aws_cloudwatch_embedded_metrics' } } })

utility.metric.count

Generates a metric that reports the count of messages that have passed through the transform.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| metric.name | string | Name of the metric. | Yes |
| metric.destination | object | Metrics Destination configuration that reports the metric to an external system. | Yes |
| metric.attributes | map | Map (dictionary) of strings that are included in the metric as attributes or labels. | No |

Example

sub.transform.utility.metric.count(
  settings={ metric: { name: 'MessagesReceived', destination: { type: 'aws_cloudwatch_embedded_metrics' } } },
)
sub.tf.util.metric.count({ metric: { name: 'MessagesReceived', destination: { type: 'aws_cloudwatch_embedded_metrics' } } })
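
Conceptually, utility.metric.bytes and utility.metric.count keep running totals while messages pass through. The Python sketch below illustrates that idea only. The `PassThroughMetrics` class is hypothetical, and it assumes that messages are forwarded unchanged and that message size is the length of the raw bytes.

```python
from dataclasses import dataclass


@dataclass
class PassThroughMetrics:
    """Hypothetical running totals for messages seen by a metric transform."""

    count: int = 0        # what utility.metric.count would report
    total_bytes: int = 0  # what utility.metric.bytes would report

    def observe(self, message: bytes) -> bytes:
        """Update both totals and hand the message back unmodified."""
        self.count += 1
        self.total_bytes += len(message)
        return message
```

In a real pipeline the totals would be reported to the configured metric.destination rather than held in memory.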

utility.metric.freshness

Generates metrics that report the freshness of messages that have passed through the transform.

Freshness is measured by the difference between a time field in the message and the time the message passes through the transform. This generates "success" and "failure" metrics (i.e. fresh and stale) that are distinguishable using the attribute named FreshnessType.

This can be used to measure the freshness of individual pipeline components, segments of a pipeline, or an end-to-end pipeline.
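
The freshness calculation described above can be sketched in Python as follows. This is an illustration under stated assumptions, not the transform's actual code: the function name is hypothetical, the time values are assumed to be already parsed into `datetime` objects, the literal labels "success" and "failure" are assumed values for the FreshnessType attribute, and a message exactly at the threshold is assumed to count as fresh.

```python
from datetime import datetime, timedelta, timezone


def freshness_type(message_time: datetime, observed_at: datetime, threshold: timedelta) -> str:
    """Classify a message as fresh ("success") or stale ("failure")."""
    # Age is the gap between the message's time field and the moment
    # the message passes through the transform.
    age = observed_at - message_time
    return "success" if age <= threshold else "failure"


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
threshold = timedelta(minutes=5)  # corresponds to threshold: '5m'

print(freshness_type(now - timedelta(minutes=2), now, threshold))  # success
print(freshness_type(now - timedelta(minutes=9), now, threshold))  # failure
```

Placing the transform at different points in a pipeline changes what `observed_at` represents, which is how the same measurement can cover one component, a segment, or the whole pipeline.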

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| threshold | string | Delta between the two datetime values that determines if a message is fresh or stale. | Yes |
| object.source_key | string | Retrieves a value from an object for measurement. | Yes |
| metric.name | string | Name of the metric. | Yes |
| metric.destination | object | Metrics Destination configuration that reports the metric to an external system. | Yes |
| metric.attributes | map | Map (dictionary) of strings that are included in the metric as attributes or labels. | No |

Example

sub.transform.utility.metric.freshness(
  settings={
    threshold: '5m',
    object: { source_key: 'timestamp' },
    metric: { name: 'MessageFreshness', destination: { type: 'aws_cloudwatch_embedded_metrics' } },
  },
)
sub.tf.util.metric.freshness({
  threshold: '5m',
  obj: { src: 'timestamp' },
  metric: { name: 'MessageFreshness', destination: { type: 'aws_cloudwatch_embedded_metrics' } },
})

utility.secret

Retrieves a secret from an external secret management system and loads it into the internal Secrets Store.

Settings

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| secret | object | The Secret configuration for retrieving and loading the secret. | Yes |

Example

sub.transform.utility.secret(
  settings={ secret: sub.secrets.environment_variable(
    settings={ id: 'ENV_VAR', name: 'SUBSTATION_SECRET' }
  ) }
)
sub.tf.util.secret({ secret: sub.secrets.environment_variable({ id: 'ENV_VAR', name: 'SUBSTATION_SECRET' }) })
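
As a rough mental model of the example above, the transform reads a value from an external source (here, an environment variable) and stores it under an identifier for later lookup. The Python sketch below is hypothetical throughout: the function, the dictionary standing in for the Secrets Store, and the reading of `id` as the lookup key and `name` as the environment variable name are assumptions, not confirmed behaviour.

```python
import os


def load_env_secret(store, secret_id, name, env=os.environ):
    """Copy the environment variable `name` into `store` under `secret_id`."""
    value = env.get(name)
    if value is None:
        # Fail loudly instead of storing an empty secret.
        raise KeyError(f"environment variable {name!r} is not set")
    store[secret_id] = value


secrets_store = {}  # stand-in for the internal Secrets Store
load_env_secret(
    secrets_store,
    secret_id="ENV_VAR",
    name="SUBSTATION_SECRET",
    env={"SUBSTATION_SECRET": "example-value"},  # injected for the demo
)
print(secrets_store["ENV_VAR"])  # example-value
```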

Use Cases

Conditionally Drop Messages

Messages can be conditionally dropped by wrapping the utility.drop transform with meta.switch:

sub.transform.meta.switch(
  settings={ cases: [
    {
      // Placeholder: replace [...] with the condition that selects messages to drop.
      condition: [...],
      transform: sub.tf.utility.drop(),
    },
  ] },
)
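
The effect of wrapping a drop in a switch can be modelled in Python as below. This sketches the control flow only, under assumptions that are not confirmed by the text: cases are tried in order, the first matching case wins, and a message that matches no case passes through unchanged. The `switch` and `drop` helpers and the `level` field are hypothetical.

```python
def drop(message):
    """Model of utility.drop: discard the message and return nothing."""
    return []


def switch(cases):
    """Build a transform that applies the first case whose condition matches."""
    def apply(message):
        for condition, transform in cases:
            if condition(message):
                return transform(message)
        return [message]  # no case matched: keep the message
    return apply


# Drop debug-level messages and keep everything else.
drop_debug = switch([(lambda m: m.get("level") == "debug", drop)])

messages = [{"level": "debug"}, {"level": "error"}]
kept = [out for m in messages for out in drop_debug(m)]
print(kept)  # [{'level': 'error'}]
```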