# Aggregation
Aggregation buffers incoming messages and emits combined output based on time windows, count thresholds, or both. Use it to batch orders, summarize events, or reduce API call volume.
## Configuration

Add an `aggregate` block under `source:`. At least one of `window` or `count` must be specified.
```yaml
source:
  connector: shopify
  trigger: webhook
  event: order.created
  aggregate:
    window: 5m                    # Flush every 5 minutes
    count: 100                    # Or after 100 messages, whichever comes first
    group_by: source.customer_id  # Optional. Group messages by field.
    output:                       # Required. Aggregate expressions.
      customer_id: group.key
      order_count: count()
      total_amount: sum(source.total)
    max_buffer: 5000              # Default: 10000
```

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| `window` | duration | Conditional | — | Time window for aggregation |
| `count` | integer | Conditional | — | Flush after N messages |
| `group_by` | string or string[] | No | — | Field path(s) for grouping |
| `output` | Record<string, string> | Yes | — | Aggregate expressions |
| `max_buffer` | integer | No | 10000 | Max messages per buffer |
## Aggregate functions

| Function | Description |
|---|---|
| `count()` | Count of messages in the buffer |
| `sum(field.path)` | Sum of numeric field values |
| `min(field.path)` | Minimum value |
| `max(field.path)` | Maximum value |
| `first(field.path)` | Value from the first message |
| `last(field.path)` | Value from the last message |
| `collect()` | Collect all messages into an array |
| `collect_field(field.path)` | Collect a specific field from all messages |
| `group.key` | Reference to the current group key value |
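As an illustration, several of these functions can be combined in a single `output` block. This is a sketch only; the `source.created_at` and `source.status` field paths are hypothetical placeholders, not fields fyrn defines:

```yaml
aggregate:
  window: 10m
  output:
    event_count: count()                          # number of buffered messages
    first_seen: first(source.created_at)          # field from the first message
    last_seen: last(source.created_at)            # field from the last message
    status_values: collect_field(source.status)   # array of one field per message
    raw_events: collect()                         # array of the full messages
```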
## Time-based windows

Flush the buffer after a time period elapses:

```yaml
aggregate:
  window: 5m
  output:
    event_count: count()
    events: collect()
```

All messages received in the 5-minute window are aggregated and emitted as a single output.
## Count-based windows

Flush after a specific number of messages:

```yaml
aggregate:
  count: 50
  output:
    event_count: count()
    events: collect()
```

## Combined windows

When both `window` and `count` are set, the buffer flushes when either condition is met, whichever comes first:

```yaml
aggregate:
  window: 5m
  count: 100
  output:
    batch_size: count()
    records: collect()
```

## Grouping with `group_by`

Group messages by a field value. Each group has its own buffer and flushes independently.

```yaml
aggregate:
  window: 5m
  group_by: source.customer_id
  output:
    customer_id: group.key
    order_count: count()
    total_amount: sum(source.total)
    order_ids: collect_field(source.order_id)
```

## Multiple group keys

```yaml
aggregate:
  window: 10m
  group_by: [source.region, source.product_category]
  output:
    group: group.key
    sales_count: count()
    revenue: sum(source.amount)
```

## Flush behavior
A buffer flushes when any one of three conditions is met: the window timer expires (e.g., every 60 seconds), the number of buffered records reaches the `count` threshold, or the total buffer size hits `max_buffer`. Whichever trigger fires first wins; the buffer then resets, and records that arrive afterward start the next buffer cycle.
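For instance, a buffer configured with all three limits (a sketch using only the fields from the configuration table above) flushes on whichever trigger fires first:

```yaml
aggregate:
  window: 60s      # flush when the 60-second timer expires...
  count: 500       # ...or when 500 messages have been buffered...
  max_buffer: 1000 # ...or when the buffer hits its hard cap (default 10000)
  output:
    batch_size: count()
    records: collect()
```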
On flush, fyrn emits a single record per group key containing the fields defined in your `output` block. Scalar aggregations like `sum()`, `count()`, `min()`, and `max()` resolve to their final values; `collect()` produces an array. This output record then passes through any downstream transform steps and into deliver exactly like a normal source record: the pipeline sees one event per group, not the original N.
The aggregated output replaces `source.*` for downstream mapping: the fields defined in the `output` block become the new source data.
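A minimal sketch of this replacement: downstream `mapping` refers to the aggregate's `output` fields as `source.*`, not to the original message fields (the `orders` and `revenue` mapping names here are hypothetical):

```yaml
source:
  connector: shopify
  trigger: webhook
  event: order.created
  aggregate:
    window: 5m
    output:
      order_count: count()
      total_amount: sum(source.total)

mapping:
  # source.total no longer exists downstream; only the output fields do.
  orders: source.order_count
  revenue: source.total_amount
```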
## Example: Aggregate order line items

Batch individual order events into a single summary per customer:
```yaml
flow: customer-order-summary
version: 1

source:
  connector: shopify
  trigger: webhook
  event: orders/create
  aggregate:
    window: 5m
    count: 100
    group_by: source.customer_id
    output:
      customer_id: group.key
      order_count: count()
      total_amount: sum(source.total_price)
      min_order: min(source.total_price)
      max_order: max(source.total_price)
      first_order_id: first(source.id)
      last_order_id: last(source.id)
      all_order_ids: collect_field(source.id)
    max_buffer: 5000

target:
  connector: crm-system
  endpoint: /api/customer-summaries
  method: POST

mapping:
  customer_id: source.customer_id
  order_count: source.order_count | to_integer
  total_amount: source.total_amount | decimal(2)
  order_range: "{{source.min_order}} - {{source.max_order}}"

on_error:
  retry: 3x exponential(30s)
  then: dead-letter
```

## What’s next
- Dedup & Ordering — Prevent duplicates and ensure message order
- Batch Processing — Process arrays of records through a flow
- DSL Reference — Full `aggregate` specification