Aggregation

Aggregation buffers incoming messages and emits combined output based on time windows, count thresholds, or both. Use it to batch orders, summarize events, or reduce API call volume.

Add an `aggregate` block under `source:`. At least one of `window` or `count` must be specified.

```yaml
source:
  connector: shopify
  trigger: webhook
  event: order.created
aggregate:
  window: 5m                    # Flush every 5 minutes
  count: 100                    # Or after 100 messages, whichever comes first
  group_by: source.customer_id  # Optional. Group messages by field.
  output:                       # Required. Aggregate expressions.
    customer_id: group.key
    order_count: count()
    total_amount: sum(source.total)
  max_buffer: 5000              # Default: 10000.
```
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| `window` | duration | Conditional | | Time window for aggregation |
| `count` | integer | Conditional | | Flush after N messages |
| `group_by` | string or string[] | No | | Field path(s) for grouping |
| `output` | `Record<string, string>` | Yes | | Aggregate expressions |
| `max_buffer` | integer | No | 10000 | Max messages per buffer |

| Function | Description |
|---|---|
| `count()` | Count of messages in the buffer |
| `sum(field.path)` | Sum of numeric field values |
| `min(field.path)` | Minimum value |
| `max(field.path)` | Maximum value |
| `first(field.path)` | Value from the first message |
| `last(field.path)` | Value from the last message |
| `collect()` | Collect all messages into an array |
| `collect_field(field.path)` | Collect a specific field from all messages |
| `group.key` | Reference to the current group key value |

Flush the buffer after a time period elapses:

```yaml
aggregate:
  window: 5m
  output:
    event_count: count()
    events: collect()
```

All messages received in the 5-minute window are aggregated and emitted as a single output.
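The window semantics can be sketched in plain Python. This is an illustrative model, not fyrn internals: the class name `WindowBuffer` and its methods are hypothetical, and the output dict mirrors the `count()` and `collect()` expressions from the example above.

```python
import time

class WindowBuffer:
    """Illustrative sketch of time-based flushing (not fyrn internals)."""

    def __init__(self, window_seconds):
        self.window = window_seconds
        self.messages = []
        self.window_start = time.monotonic()

    def add(self, message):
        # Messages accumulate until the window timer expires.
        self.messages.append(message)

    def maybe_flush(self):
        """Return one combined output if the window has elapsed, else None."""
        if time.monotonic() - self.window_start < self.window:
            return None
        output = {
            "event_count": len(self.messages),  # count()
            "events": list(self.messages),      # collect()
        }
        self.messages.clear()
        self.window_start = time.monotonic()
        return output
```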


Flush after a specific number of messages:

```yaml
aggregate:
  count: 50
  output:
    event_count: count()
    events: collect()
```

When both `window` and `count` are set, the buffer flushes as soon as either condition is met, whichever comes first:

```yaml
aggregate:
  window: 5m
  count: 100
  output:
    batch_size: count()
    records: collect()
```
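The "whichever comes first" rule reduces to a simple predicate. A minimal sketch, assuming the config above (`window: 5m`, `count: 100`); the function name `should_flush` is hypothetical:

```python
def should_flush(buffered_count, elapsed_seconds, *, window=300, count=100):
    """Flush when either the window elapses or the count threshold is hit.

    window=300 seconds corresponds to `window: 5m`; count to `count: 100`.
    """
    return elapsed_seconds >= window or buffered_count >= count
```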

Group messages by a field value. Each group has its own buffer and flushes independently.

```yaml
aggregate:
  window: 5m
  group_by: source.customer_id
  output:
    customer_id: group.key
    order_count: count()
    total_amount: sum(source.total)
    order_ids: collect_field(source.order_id)
```

Group by multiple fields by passing an array; the group key becomes the combination of the field values:

```yaml
aggregate:
  window: 10m
  group_by: [source.region, source.product_category]
  output:
    group: group.key
    sales_count: count()
    revenue: sum(source.amount)
```
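Per-group buffering can be modeled as a dictionary of independent buffers keyed by the resolved `group_by` value(s). A sketch under those assumptions; the helper `group_key` is hypothetical, and multiple paths produce a composite tuple key, mirroring `group_by: [source.region, source.product_category]`:

```python
from collections import defaultdict

def group_key(message, group_by):
    """Resolve dotted field path(s) against a nested message dict."""
    def resolve(path):
        value = message
        for part in path.split("."):
            value = value[part]
        return value

    paths = group_by if isinstance(group_by, list) else [group_by]
    key = tuple(resolve(p) for p in paths)
    return key[0] if len(key) == 1 else key

# Each distinct group key gets its own buffer that fills and flushes independently.
buffers = defaultdict(list)
for msg in [
    {"source": {"region": "eu", "product_category": "shoes", "amount": 10}},
    {"source": {"region": "us", "product_category": "shoes", "amount": 20}},
    {"source": {"region": "eu", "product_category": "shoes", "amount": 5}},
]:
    buffers[group_key(msg, ["source.region", "source.product_category"])].append(msg)
```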

A buffer flushes when any one of three conditions is met: the window timer expires, the number of buffered messages reaches the `count` threshold, or the buffer size hits `max_buffer`. Whichever condition is met first triggers the flush; messages arriving afterwards start the next buffer cycle.

On flush, fyrn emits a single record per group key containing the fields defined in your `output` block. Scalar aggregations like `sum()`, `count()`, `min()`, and `max()` resolve to their final values; `collect()` produces an array. This output record then passes through any downstream transform steps and into deliver exactly like a normal source record: the pipeline sees one event per group, not the original N.
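How the `output` expressions could resolve on flush can be sketched as a reduction over the buffered messages. This is illustrative only, not fyrn's evaluator; the helpers `get` and `flush` are hypothetical names:

```python
def get(message, path):
    """Resolve a dotted field path like 'source.total' against a message dict."""
    value = message
    for part in path.split("."):
        value = value[part]
    return value

def flush(messages, key):
    """Reduce a group's buffer to the single record defined by the output block."""
    return {
        "customer_id": key,                                              # group.key
        "order_count": len(messages),                                    # count()
        "total_amount": sum(get(m, "source.total") for m in messages),   # sum(source.total)
        "order_ids": [get(m, "source.order_id") for m in messages],      # collect_field(source.order_id)
    }
```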

The aggregated output replaces `source.*` for downstream mapping: the fields defined in the `output` block become the new source data.


Batch individual order events into a single summary per customer:

```yaml
flow: customer-order-summary
version: 1
source:
  connector: shopify
  trigger: webhook
  event: orders/create
aggregate:
  window: 5m
  count: 100
  group_by: source.customer_id
  output:
    customer_id: group.key
    order_count: count()
    total_amount: sum(source.total_price)
    min_order: min(source.total_price)
    max_order: max(source.total_price)
    first_order_id: first(source.id)
    last_order_id: last(source.id)
    all_order_ids: collect_field(source.id)
  max_buffer: 5000
target:
  connector: crm-system
  endpoint: /api/customer-summaries
  method: POST
mapping:
  customer_id: source.customer_id
  order_count: source.order_count | to_integer
  total_amount: source.total_amount | decimal(2)
  order_range: "{{source.min_order}} - {{source.max_order}}"
on_error:
  retry: 3x exponential(30s)
  then: dead-letter
```