Utility
Utility transforms provide testing or debugging functions.
utility.control
Generates a control message based on a batch configuration.
Settings
Field | Type | Description | Required |
---|---|---|---|
batch.count | int | Maximum number of items to buffer before emitting a new array. Defaults to 1,000 items. | No |
batch.size | int | Maximum size (in bytes) of items to buffer before emitting a new array. Defaults to 1MB. | No |
batch.duration | string | Maximum duration to buffer items before emitting a new array. Defaults to 1m. | No |
Example
sub.transform.utility.control(
settings={ batch: { count: 1000 } }
)
sub.tf.util.ctrl({ batch: { count: 1000 } })
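The three batch limits can be read as alternative triggers. The sketch below is a minimal illustration, not Substation's implementation. It assumes that reaching any one limit ends the current batch, and that 1MB means 1,000,000 bytes. `BatchTracker` and its `add` method are hypothetical names introduced only for this example.

```python
from dataclasses import dataclass


@dataclass
class BatchTracker:
    """Hypothetical helper for illustration; not part of Substation."""

    max_count: int = 1000       # batch.count default from the table above
    max_size: int = 1_000_000   # batch.size default (1MB), assumed to be 10^6 bytes
    max_duration: float = 60.0  # batch.duration default (1m), expressed in seconds
    count: int = 0
    size: int = 0
    started: float = 0.0

    def add(self, item: bytes, now: float) -> bool:
        """Buffer an item; return True if a limit was hit and the batch was reset first."""
        if self.count == 0:
            self.started = now
        # Assumption: any single limit is enough to end the current batch.
        limit_hit = self.count > 0 and (
            self.count + 1 > self.max_count
            or self.size + len(item) > self.max_size
            or now - self.started >= self.max_duration
        )
        if limit_hit:
            self.count, self.size, self.started = 0, 0, now
        self.count += 1
        self.size += len(item)
        return limit_hit


tracker = BatchTracker(max_count=2)
print([tracker.add(b"x", now=0.0) for _ in range(3)])  # [False, False, True]
```

In this sketch a `True` return marks the point where a control message would be generated.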
utility.delay
Delays transformation for a duration of time.
Settings
Field | Type | Description | Required |
---|---|---|---|
duration | string | Duration of time to sleep. | Yes |
Example
sub.transform.utility.delay(
settings={duration: '1m'}
)
sub.tf.util.delay({duration: '1m'})
utility.drop
Drops the message and returns nothing.
Example
sub.transform.utility.drop()
sub.tf.util.drop()
utility.err
Generates an error with a custom message.
Settings
Field | Type | Description | Required |
---|---|---|---|
message | string | Message returned in the error. | Yes |
Example
sub.transform.utility.err(
settings={message: 'test'}
)
sub.tf.util.err({message: 'test'})
utility.metric.bytes
Generates a metric that reports the sum of message bytes that have passed through the transform.
Settings
Field | Type | Description | Required |
---|---|---|---|
metric.name | string | Name of the metric. | Yes |
metric.destination | object | Metrics Destination configuration that reports the metric to an external system. | Yes |
metric.attributes | map | Map (dictionary) of strings that are included in the metric as attributes or labels. | No |
Example
sub.transform.utility.metric.bytes(
settings={ metric: { name: 'BytesReceived', destination: { type: 'aws_cloudwatch_embedded_metrics' } } },
)
sub.tf.util.metric.bytes({ metric: { name: 'BytesReceived', destination: { type: 'aws_cloudwatch_embedded_metrics' } } })
utility.metric.count
Generates a metric that reports the count of messages that have passed through the transform.
Settings
Field | Type | Description | Required |
---|---|---|---|
metric.name | string | Name of the metric. | Yes |
metric.destination | object | Metrics Destination configuration that reports the metric to an external system. | Yes |
metric.attributes | map | Map (dictionary) of strings that are included in the metric as attributes or labels. | No |
Example
sub.transform.utility.metric.count(
settings={ metric: { name: 'MessagesReceived', destination: { type: 'aws_cloudwatch_embedded_metrics' } } },
)
sub.tf.util.metric.count({ metric: { name: 'MessagesReceived', destination: { type: 'aws_cloudwatch_embedded_metrics' } } })
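utility.metric.bytes and utility.metric.count differ only in what they accumulate. As a rough sketch (the `summarize` function is hypothetical and ignores destinations and attributes), the two reported values for a set of messages could be computed like this:

```python
from typing import Dict, List


def summarize(messages: List[bytes]) -> Dict[str, int]:
    """Hypothetical helper: the values the two metric transforms would report."""
    return {
        "bytes": sum(len(m) for m in messages),  # utility.metric.bytes: sum of message sizes
        "count": len(messages),                  # utility.metric.count: number of messages
    }


print(summarize([b"abc", b"de"]))  # {'bytes': 5, 'count': 2}
```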
utility.metric.freshness
Generates metrics that report the freshness of messages that have passed through the transform.
Freshness is measured as the difference between a time field in the message and the time the message passes through the transform. This generates "success" and "failure" metrics (i.e., fresh and stale) that are distinguishable using the attribute named FreshnessType.
This can be used to measure the freshness of individual pipeline components, segments of a pipeline, or an end-to-end pipeline.
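The freshness check described above can be sketched as a comparison of a message's age against the threshold. This is an illustration under stated assumptions, not the transform's actual code. It assumes times are Unix seconds, that a message whose age equals the threshold still counts as fresh, and that the FreshnessType values are spelled "Success" and "Failure". The function name `freshness_type` is hypothetical.

```python
def freshness_type(message_time: float, now: float, threshold: float) -> str:
    """Hypothetical helper: classify a message as fresh ("Success") or stale ("Failure")."""
    age = now - message_time  # time elapsed since the message's own timestamp
    # Assumption: an age exactly equal to the threshold is still treated as fresh.
    return "Success" if age <= threshold else "Failure"


# With a 5 minute (300 second) threshold:
print(freshness_type(message_time=1000.0, now=1200.0, threshold=300.0))  # Success
print(freshness_type(message_time=1000.0, now=1400.0, threshold=300.0))  # Failure
```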
Settings
Field | Type | Description | Required |
---|---|---|---|
threshold | string | Delta between the two datetime values that determines if a message is fresh or stale. | Yes |
object.source_key | string | Retrieves a value from an object for measurement. | Yes |
metric.name | string | Name of the metric. | Yes |
metric.destination | object | Metrics Destination configuration that reports the metric to an external system. | Yes |
metric.attributes | map | Map (dictionary) of strings that are included in the metric as attributes or labels. | No |
Example
sub.transform.utility.metric.freshness(
settings={
threshold: '5m',
object: { source_key: 'timestamp' },
metric: { name: 'MessageFreshness', destination: { type: 'aws_cloudwatch_embedded_metrics' } },
},
)
sub.tf.util.metric.freshness({
threshold: '5m',
obj: { src: 'timestamp' },
metric: { name: 'MessageFreshness', destination: { type: 'aws_cloudwatch_embedded_metrics' } },
})
utility.secret
Retrieves a secret from an external secret management system and loads it into the internal Secrets Store.
Settings
Field | Type | Description | Required |
---|---|---|---|
secret | object | The Secret configuration for retrieving and loading the secret. | Yes |
Example
sub.transform.utility.secret(
settings={ secret: sub.secrets.environment_variable(
settings={ id: 'ENV_VAR', name: 'SUBSTATION_SECRET' }
) }
)
sub.tf.util.secret({ secret: sub.secrets.environment_variable({ id: 'ENV_VAR', name: 'SUBSTATION_SECRET' }) })
Use Cases
Conditionally Drop Messages
Messages can be conditionally dropped by wrapping the utility.drop transform with meta.switch:
sub.transform.meta.switch(
settings={ cases: [
{
condition: [...],
transform: sub.tf.util.drop(),
},
] },
)
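The effect of this pattern can be sketched outside of the configuration language. The snippet below is a simplified model, not Substation's implementation. It assumes the first matching case wins and that a message matching no case passes through unchanged. The `switch` and `drop` functions are hypothetical stand-ins for meta.switch and utility.drop.

```python
from typing import Callable, List, Tuple

Transform = Callable[[bytes], List[bytes]]
Case = Tuple[Callable[[bytes], bool], Transform]


def drop(message: bytes) -> List[bytes]:
    """Stand-in for utility.drop: discards the message and returns nothing."""
    return []


def switch(cases: List[Case], message: bytes) -> List[bytes]:
    """Stand-in for meta.switch: apply the transform of the first matching case."""
    for condition, transform in cases:
        if condition(message):
            return transform(message)
    # Assumption: messages that match no case pass through unchanged.
    return [message]


cases = [(lambda m: m.startswith(b"debug"), drop)]
print(switch(cases, b"debug: noisy"))  # []
print(switch(cases, b"info: keep me"))  # [b'info: keep me']
```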