Aggregate

Aggregate transforms aggregate or disaggregate data.

aggregate.to.array

Transforms data by aggregating it into a JSON array.

{"a":"b"}
{"c":"d"}
{"e":"f"}
[{"a":"b"},{"c":"d"},{"e":"f"}]

Settings

FieldTypeDescriptionRequired
batch.countintMaximum number of items to buffer before emitting a new array.

Defaults to 1,000 items.
No
batch.sizeintMaximum size (in bytes) of items to buffer before emitting a new array.

Defaults to 1MB.
No
batch.durationstringMaximum duration to buffer items before emitting a new array.

Defaults to 1m.
No
object.source_keystringRetrieves a value from an object for transformation.No
object.target_keystringPlaces a value into an object after transformation.No
object.batch_keystringRetrieves a value from an object that is used to group batched data.

No default, all data is batched into the same array.
No

Example

sub.transform.aggregate.to.array()
sub.tf.agg.to.arr()

aggregate.from.array

Transforms data by disaggregating it from a JSON array. Values from the array become new messages.

[{"a":"b"},{"c":"d"},{"e":"f"}]
{"a":"b"}
{"c":"d"}
{"e":"f"}

Settings

FieldTypeDescriptionRequired
object.source_keystringRetrieves a value from an object for transformation.No
object.target_keystringPlaces a value into an object after transformation.No

Example

sub.transform.aggregate.from.array()
sub.tf.agg.from.arr()

aggregate.to.string

Transforms data by aggregating it to a string.

{"a":"b"}
{"c":"d"}
{"e":"f"}
{"a":"b"}\n{"c":"d"}\n{"e":"f"}

Settings

FieldTypeDescriptionRequired
batch.countintMaximum number of items to buffer before emitting a new string.

Defaults to 1,000 items.
No
batch.sizeintMaximum size (in bytes) of items to buffer before emitting a new string.

Defaults to 1MB.
No
batch.durationstringMaximum duration to buffer items before emitting a new string.

Defaults to 5m.
No
object.source_keystringRetrieves a value from an object for transformation.No
object.batch_keystringRetrieves a value from an object that is used to organize batched data.

No default, all data is batched into the same string.
No
separatorstringSeparator substring that is used to aggregate data.Yes

Example

sub.transform.aggregate.to.string(
  settings={separator: '\n'}
)
sub.tf.agg.to.str({separator: '\n'})

aggregate.from.string

Transforms data by disaggregating it from a string.

{"a":"b"}\n{"c":"d"}\n{"e":"f"}
{"a":"b"}
{"c":"d"}
{"e":"f"}

Settings

FieldTypeDescriptionRequired
separatorstringSeparator substring that is used to disaggregate data.Yes

Example

sub.transform.aggregate.from.string(
  settings={separator: '\n'}
)
sub.tf.agg.from.str({separator: '\n'})

Use Cases

Organizing Data by Key

The buffer.key setting can be used to dynamically organize data based on values inside of objects. For example, if the buffer.key is set to "key", then these results are produced by the aggregate.to.array transform:

{"a":"b","key":"x"}
{"c":"d","key":"y"}
{"e":"f","key":"x"}
[{"a":"b","key":"x"},{"e":"f","key":"x"}]
[{"c":"d","key":"y"}]