Meta

Meta transforms apply other transforms.

meta.err

Catches errors returned by other transforms and returns messages.

Settings

FieldTypeDescriptionRequired
transformobjectTransform that is applied.Yes
error_messages[]arrayRegular expressions that match error messages and determine if the error should be caught.

Defaults to an empty list (all errors are caught).
No

Example

sub.tf.meta.err(
  settings={transform: sub.tf.enrich.http.get([...])}
)
sub.tf.meta.err({transform: sub.tf.enrich.http.get([...])})

meta.for_each

Applies transform to every element of a JSON array.

Settings

FieldTypeDescriptionRequired
object.source_keystringRetrieves a value from an object for transformation.Yes
object.target_keystringPlaces a value into an object after transformation.Yes
transformobjectTransform that is applied.Yes

Example

sub.tf.meta.for_each(
  settings={object: {source_key: 'arr', target_key: 'arr'}, transform: sub.tf.enrich.http.get([...])}
)
sub.tf.meta.for_each({obj: {src: 'arr', trg: 'arr'}, transform: sub.tf.enrich.http.get([...])})

meta.kv_store.lock

Transforms data by acquiring a lock in a key-value store before executing a transform. Any error in the transform will cause keys to unlock.

This transform can be used to achieve "exactly-once" semantics in other transforms, including send transforms.

Settings

FieldTypeDescriptionRequired
object.source_keystringRetrieves a value from an object that is used as the key to lock the item in the KV store.

Defaults to an empty string (the SHA256 hash of the message is the key).
No
object.ttl_keystringRetrieves a value from an object that is used as the time-to-live (TTL) of the item set into the KV store. This value must be an integer that represents the Unix time when the item will be evicted from the store. Any precision greater than seconds (e.g., milliseconds, nanoseconds) is truncated to seconds.

Defaults to an empty string (no TTL is used when locking items in the store).
No
prefixstringString that is prepended to the value retrieved by object.source_key.

Defaults to an empty string (no prefix is used).
No
ttl_offsetstringAn offset used to determine the time-to-live (TTL) of the item set into the KV store. If object.ttl_key is configured, then this value is added to the TTL value retrieved from the object. If object.ttl_key is not used, then this value is added to the current time.

For example, if object.ttl_key is not configured and the offset is "1d" (1 day), then the value will be evicted from the store when more than 1 day from now has passed.

Defaults to an empty string (no TTL is used when locking items in the store).
No
transformobjectTransform that is applied.Yes
kv_storeobjectThe KV store configuration settings. Refer to each KV store backend described in Key-Value Stores for more information.Yes

Example

sub.transform.meta.kv_store.lock(settings={
  kv_store: kv,
  ttl_offset: '1m',
  transform: sub.tf.send.stdout(),  // Only unique messages are sent to stdout.
})
sub.tf.meta.kv.lock(settings={
  kv_store: kv,
  ttl_offset: '1m',
  transform: sub.tf.send.stdout(),  // Only unique messages are sent to stdout.
})

meta.metric.duration

Generates a metric that reports the execution time (duration) of the transform.

Settings

FieldTypeDescriptionRequired
metric.namestringName of the metric.Yes
metric.destinationobjectMetrics Destination configuration that reports the metric to an external system.Yes
transformobjectTransform that is applied.Yes
metric.attributesmapMap (dictionary) of strings that are included in the metric as attributes or labels.No

Example

sub.tf.meta.metric.duration(
  settings={
    metric: { name: 'ObjectCopyDuration', destination: { type: 'aws_cloudwatch_embedded_metrics' } },
    transform: sub.tf.object.copy(
      settings={ object: { source_key: 'a', target_key: 'z' } },
    ),
  },
)
sub.tf.meta.metric.duration({
  metric: { name: 'ObjectCopyDuration', destination: { type: 'aws_cloudwatch_embedded_metrics' } },
  transform: sub.tf.object.copy({ obj: { src: 'a', trg: 'z' } }),
})

meta.pipeline

Applies multiple transform in a series.

Settings

FieldTypeDescriptionRequired
object.source_keystringRetrieves a value from an object for transformation.No
object.target_keystringSets a value into an object after transformation.No
transformsobjectTransforms that are applied.Yes

Example

sub.tf.meta.pipeline(
  settings={transforms: [
    sub.tf.enrich.http.get([...]),
    sub.tf.object.copy([...]),
  ]}
)
sub.tf.meta.pipe(
  settings={transforms: [
    sub.tf.enrich.http.get([...]),
    sub.tf.obj.cp([...]),
  ]}
)

meta.switch

Conditionally applies transforms using conditional statements:

  • if
  • if ... else
  • if ... elif ... else

Settings

FieldTypeDescriptionRequired
cases.condition[]objectArray of conditions that apply a transform.No
cases.transform[]objectArray of transforms conditionally applied.Yes

Example

sub.tf.meta.switch(
  settings={ cases: [
    {condition: [...], transform: [...]},
    {condition: [...], transform: [...]},
  ]}
)

Use Cases

If Statement

An if statement is created by configuring a single meta.switch statement. This examples copies a string value if it contains a specific character:

sub.tf.meta.switch(
  settings={ cases: [
    { 
      condition: sub.cnd.any(sub.cnd.string.contains(settings={object: {source_key: 'a'}, value: 'b'}),
      transform: sub.tf.object.copy(settings={object: {source_key: 'a', target_key: 'z'}}),
    }
  ]}
)
sub.tf.meta.switch(
  settings={ switch: [
    { 
      condition: sub.cnd.any(sub.cnd.str.has(settings={object: {src: 'a'}, value: 'b'}),
      transform: sub.tf.obj.cp(settings={object: {src: 'a', trg: 'z'}}),
    }
  ]}
)
sub.pattern.tf.conditional(
  // tf.conditional pattern automatically applies the ANY operator if no operator is provided.
  condition=sub.cnd.str.contains(settings={obj: {src: 'a'}, value: 'b'},
  transform=sub.tf.obj.cp(settings={obj: {src: 'a', trg: 'z'}}),
)

If ... Else Statement

An if ... else statement is created by configuring two meta.switch statements:

  • The first statement is configured with a condition
  • The second statement is configured without a condition

This example aggregates JSON text into an array, otherwise the messages are dropped if they are not valid JSON.

sub.tf.meta.switch(
  settings={ cases: [
    { 
      condition: sub.cnd.any(sub.cnd.format.json()),
      transform: sub.tf.aggregate.to.array(),
    },
    {
      transform: sub.tf.utility.drop(),
    }
  ]}
)
sub.tf.meta.switch(
  settings={ switch: [
    { 
      condition: sub.cnd.any(sub.cnd.fmt.json()),
      transform: sub.tf.agg.to.arr(),
    },
    {
      transform: sub.tf.util.drop(),
    }
  ]}
)

More statements can be added to create if ... elif ... else statements.