YAML DSL Specification

This is the full specification for the fyrn YAML DSL. Flows are declarative YAML files that define how data moves between systems.

flow: <string> # Required. Flow identifier.
version: <integer> # Required. Positive integer.
description: <string> # Optional.
type: standard | saga | fan-out # Optional. Inferred from structure if omitted.
source: <SourceConfig> # Required.
target: <TargetConfig> # Required for simple flows.
steps: <StepConfig[]> # Required for multi-step / saga flows.
mapping: <MappingBlock> # For simple flows (source → target).
fan_out: <FanOutConfig> # Required for fan-out flows.
on_error: <ErrorConfig> # Optional.
on_step_failure: <OnStepFailure> # Optional. For saga flows.
policies: <PolicyRef[]> # Optional. Array of policy objects.
limits: <LimitsConfig> # Optional.

A flow must have at least one of:

  • target — simple flow (source → transform → target)
  • steps with length > 0 — multi-step or saga flow
  • fan_out — fan-out flow

If type is omitted:

  • If fan_out is present → fan-out
  • Otherwise → standard
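
The validation and inference rules above can be sketched in a few lines of Python. This is only an illustration under the stated rules, not the actual fyrn implementation; the function name `infer_flow_type` and the dict representation of a parsed flow are assumptions.

```python
def infer_flow_type(flow: dict) -> str:
    """Return the flow type, inferring it from structure when 'type' is omitted.

    `flow` is assumed to be the parsed YAML document as a plain dict.
    """
    has_target = "target" in flow
    has_steps = bool(flow.get("steps"))  # steps must have length > 0
    has_fan_out = "fan_out" in flow

    if not (has_target or has_steps or has_fan_out):
        raise ValueError("flow needs at least one of: target, steps, fan_out")

    if "type" in flow:
        return flow["type"]  # an explicit type always wins
    # The inference rules only ever produce fan-out or standard.
    return "fan-out" if has_fan_out else "standard"
```

Because the listed rules never infer `saga`, a saga flow has to declare `type: saga` explicitly under this reading.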

source:
  connector: <string> # Required. Connector instance name.
  trigger: webhook | poll | schedule | manual | api
  event: <string> # Optional. Event type (e.g., "orders/create").
  schedule: <string> # Optional. Cron expression (e.g., "0 6 * * *").
  timezone: <string> # Optional. IANA timezone (e.g., "Europe/Helsinki").
  poll_interval: <string> # Optional. Duration string.
  poll: <PollConfig> # Optional. Required when trigger is "poll".
  dedup: <DedupConfig> # Optional.
  ordering: <OrderingConfig> # Optional.
  aggregate: <AggregateConfig> # Optional.

Trigger   Description
webhook   Inbound HTTP push from external system
poll      Periodic polling of an external API
schedule  Cron-based execution
manual    Triggered manually via CLI or UI
api       Triggered via a published API endpoint

source:
  trigger: poll
  poll:
    url: <string> # Required. URL to poll.
    method: GET | POST # Default: GET.
    headers: # Optional. Key-value pairs.
      Authorization: "Bearer {{instance.api_key}}"
    params: # Optional. Query parameters.
      limit: "100"
    records_path: <string> # Dot-notation path to array in response.
    pagination: <PaginationConfig>

Cursor-based:

pagination:
  strategy: cursor
  cursor_path: paging.next.after # Dot path to cursor in response.
  cursor_param: after # Query param name to pass cursor.
  max_pages: 100 # Default: 100.
  page_size: 50 # Optional.

Offset-based:

pagination:
  strategy: offset
  page_size: 50 # Required.
  offset_param: offset # Default: "offset".
  total_path: meta.total_count # Dot path to total count in response.
  max_pages: 100

Link header (RFC 5988, since superseded by RFC 8288):

pagination:
  strategy: link_header
  max_pages: 100

Next token:

pagination:
  strategy: next_token
  token_path: meta.next_token # Dot path to token in response.
  token_param: page_token # Query param name.
  max_pages: 100
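
To make the cursor strategy concrete, here is a minimal Python sketch of how `records_path`, `cursor_path`, `cursor_param`, and `max_pages` might interact. The helper names `get_path` and `poll_with_cursor`, and the `fetch` callback that stands in for the HTTP request, are hypothetical; the real runtime may behave differently (for example in how it treats an empty cursor).

```python
from typing import Any, Callable, Dict, List, Optional


def get_path(data: Any, path: str) -> Optional[Any]:
    """Resolve a dot-notation path such as 'paging.next.after'; None if missing."""
    for key in path.split("."):
        if not isinstance(data, dict) or key not in data:
            return None
        data = data[key]
    return data


def poll_with_cursor(
    fetch: Callable[[Dict[str, str]], dict],  # hypothetical: query params -> parsed JSON
    records_path: str,
    cursor_path: str,
    cursor_param: str,
    max_pages: int = 100,
) -> List[Any]:
    """Collect records page by page until the cursor disappears or max_pages is hit."""
    records: List[Any] = []
    params: Dict[str, str] = {}
    for _ in range(max_pages):
        page = fetch(params)
        records.extend(get_path(page, records_path) or [])
        cursor = get_path(page, cursor_path)
        if not cursor:  # assumption: a missing or empty cursor means "last page"
            break
        params = {cursor_param: cursor}
    return records
```

The `next_token` strategy has the same shape with `token_path` and `token_param` in place of the cursor fields.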

target:
  connector: <string> # Required. Connector instance name.
  endpoint: <string> # Required. API path (e.g., "/api/sales-orders").
  method: GET | POST | PUT | PATCH | DELETE # Required.

Steps are used in multi-step, saga, and fan-out flows. Each step has a required name and an optional when condition.

steps:
  - name: <string> # Required. Unique step identifier.
    when: <condition> # Optional. Condition expression.

The step type is determined by which fields are present.

A tag step tags the message for later tagged() condition checks.

- name: classify-order
  when: source.total_price > 1000
  tag: high-value

A call step calls an external connector/API and optionally stores the result.

- name: enrich-customer
  call: crm-api # Required. Connector to call.
  endpoint: /customers/lookup # Optional. API endpoint.
  action: reserve # Optional. Action name (saga steps).
  params: # Optional. Mapping expressions.
    email: source.customer.email
  store_as: customer_data # Optional. Store result as variable.
  compensate: # Optional. Saga compensation.
    action: release
    params:
      reservation_id: result.reservation_id

A switch step routes the message conditionally to different targets.

- name: route-to-warehouse
  switch:
    - when: source.shipping_address.country == "FI"
      target: warehouse-eu
      mapping: # Optional. Per-case mapping.
        order_id: source.id
    - when: source.shipping_address.country == "US"
      target: warehouse-us
    - default: true # Default case (no "when").
      target: warehouse-international

A notify step sends a notification to a channel. Both channel and message must be present.

- name: notify-if-high-value
  when: tagged(high-value)
  channel: "#big-orders"
  message: "New order from {{source.customer.name}}"

An emit step triggers another flow (fire-and-forget). It has three forms:

String shorthand:

- name: notify-audit
  emit: audit-log-flow

Object with payload:

- name: send-notification
  emit:
    flow: notification-flow
    payload:
      subject: "'New order received'"
      order_id: source.id

Array (multi-emit):

- name: high-value-alerts
  when: source.total_price > 1000
  emit:
    - flow: vip-handler
      payload:
        customer: source.customer.email
    - flow: fraud-check

A wait step pauses processing for a duration. The maximum is 7 days.

- name: wait-before-followup
  wait: 10m # Duration: "10s", "5m", "1h", "1d".
  timeout: 1h # Optional. Max wait before timeout action.
  on_timeout: dead-letter # dead-letter | continue | fail

A deliver step delivers to a target connector within a multi-step flow.

- name: deliver-to-erp
  target: erp-system
  mapping:
    order_id: source.id

Mappings are Record<string, expression>. Each expression is a string parsed into one of five types.

Type          Example
Field access  source.customer.email
Template      "Order {{source.id}} for {{source.customer.name}}"
Conditional   source.paid ? "confirmed" : "pending"
Literal       "fixed-value", 42, -10.5, true
Array map     source.line_items[] -> each

Field access:

order_id: source.id # Simple
email: source.customer.email # Nested
items: source.items[] # Array
first_item: source.items[0] # Indexed

Field references are detected by containing a dot, starting with known prefixes (source., result., item., env.), containing [, or containing ->.
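
The detection rule above can be read as a simple predicate. The sketch below is a hypothetical reading, not the parser's actual code; in particular, it assumes that quoted strings and plain numbers are classified as literals before the field-reference check runs, since a literal such as -10.5 also contains a dot.

```python
import re

KNOWN_PREFIXES = ("source.", "result.", "item.", "env.")
NUMBER = re.compile(r"-?\d+(\.\d+)?")


def looks_like_field_reference(expr: str) -> bool:
    """Apply the documented heuristics to one mapping expression."""
    expr = expr.strip()
    # Assumption: numbers and quoted strings are treated as literals first.
    if NUMBER.fullmatch(expr) or expr[:1] in ('"', "'"):
        return False
    return (
        "." in expr
        or expr.startswith(KNOWN_PREFIXES)
        or "[" in expr
        or "->" in expr
    )
```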

<field_or_literal> | <transform1> | <transform2>(arg)

Transforms chain left-to-right. See the Transforms Guide for the complete function reference.

mapping:
  currency: source.currency | uppercase
  total: source.total_price | decimal(2)
  formatted_code: 'source.raw_code | trim | uppercase | pad_left(8, "0")'
  processed_at: '"" | now'
  message: "Order {{source.id}} totaling {{source.total | decimal(2)}}"
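
As an illustration of left-to-right chaining, the following Python sketch applies a pipeline string to a value. The transform implementations (`trim`, `uppercase`, `lowercase`, `pad_left`) are assumed behaviors written only for this example; the Transforms Guide remains the authoritative reference. The naive split on `|` would also break on a pipe character inside a quoted argument.

```python
import ast
import re

# Hypothetical implementations of a few transforms, for illustration only.
TRANSFORMS = {
    "trim": lambda v: str(v).strip(),
    "uppercase": lambda v: str(v).upper(),
    "lowercase": lambda v: str(v).lower(),
    "pad_left": lambda v, width, fill: str(v).rjust(width, fill),
}
STAGE = re.compile(r"(\w+)(?:\((.*)\))?")


def apply_pipeline(value, pipeline: str):
    """Apply 'name | name(arg, ...)' stages to value, left to right."""
    for stage in pipeline.split("|"):
        match = STAGE.fullmatch(stage.strip())
        if not match:
            raise ValueError(f"malformed transform: {stage!r}")
        name, raw_args = match.group(1), match.group(2)
        # Parse arguments such as 8, "0" as Python literals.
        args = ast.literal_eval(f"({raw_args},)") if raw_args else ()
        value = TRANSFORMS[name](value, *args)
    return value
```

With these assumed transforms, `apply_pipeline("  ab12 ", 'trim | uppercase | pad_left(8, "0")')` yields `"0000AB12"`: each stage receives the output of the stage before it.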

Templates must have matching {{ }} pairs. They support inline pipe transforms.

Conditional:

status: 'source.paid ? "confirmed" : "pending"'

Literal:

fixed_string: "fixed-value"
single_quoted: 'also literal'
number: 42
negative: -10.5
boolean: true

Array map:

# Default iterator (item.*)
lines:
  source.line_items[] -> each:
    sku: item.sku
    qty: item.quantity

# Named iterator
lines:
  source.line_items[] -> each as line:
    sku: line.sku
    qty: line.quantity

# Nested arrays (requires "as <name>" at depth > 0)
orders:
  source.orders[] -> each as order:
    id: order.id
    items:
      order.lines[] -> each as line:
        order_ref: order.id # Parent scope access
        sku: line.sku

Rules:

  • Top-level array maps can omit as <name> (defaults to item.*)
  • Nested levels must use as <name>
  • Reserved names: source, result, env, item
  • Maximum nesting depth: 5 levels
  • Inner loops inherit all outer loop scopes

mapping: use standard-order-mapping

The use form references a shared, reusable mapping definition.

Prefix   Description
source.  Fields from the inbound payload
result.  Result of a call step (via store_as)
item.    Default iterator variable in array maps
env.     Environment variables

Conditions are used in when: fields on steps, switch cases, and fan-out targets.

expression = or_expr
or_expr = and_expr ('or' and_expr)*
and_expr = unary_expr ('and' unary_expr)*
unary_expr = 'not' unary_expr | primary
primary = '(' expression ')' | atom
atom = tagged_call | comparison

Type        Example
Comparison  source.amount > 1000
Equality    source.status == "active"
Inequality  source.status != "draft"
Tagged      tagged(high-value)
AND         source.country == "FI" and source.amount > 1000
OR          source.type == "order" or source.type == "return"
NOT         not tagged(processed)
Grouped     (source.type == "order" or source.type == "return") and source.status != "draft"

Precedence (highest to lowest): not, and, or

Keywords and, or, not are only matched at word boundaries.
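
The grammar maps naturally onto a recursive-descent evaluator with one method per rule. The Python sketch below is illustrative only: the class and function names are hypothetical, the spec shows only `>`, `==`, and `!=` (the other comparison operators here are an assumption), and field lookup handles just `source.` paths. Because the tokenizer emits whole words, a field such as `source.order` never matches the keyword `or`.

```python
import operator
import re

# Assumption: the spec shows >, == and !=; <, >= and <= are added for illustration.
OPS = {"==": operator.eq, "!=": operator.ne, ">": operator.gt,
       "<": operator.lt, ">=": operator.ge, "<=": operator.le}
TOKEN = re.compile(r'\s*(\(|\)|==|!=|>=|<=|>|<|"[^"]*"|[^\s()=!<>]+)')


def tokenize(text):
    """Split a condition into parentheses, operators, strings and words."""
    text, tokens, pos = text.strip(), [], 0
    while pos < len(text):
        match = TOKEN.match(text, pos)
        if not match:
            raise ValueError(f"unexpected character at position {pos}")
        tokens.append(match.group(1))
        pos = match.end()
    return tokens


class ConditionParser:
    def __init__(self, tokens, payload, tags):
        self.tokens, self.pos = tokens, 0
        self.payload, self.tags = payload, tags

    def peek(self):
        return self.tokens[self.pos] if self.pos < len(self.tokens) else None

    def take(self, expected=None):
        token = self.peek()
        if token is None or (expected is not None and token != expected):
            raise ValueError(f"expected {expected or 'a token'}, got {token!r}")
        self.pos += 1
        return token

    def expression(self):  # or_expr = and_expr ('or' and_expr)*
        value = self.and_expr()
        while self.peek() == "or":
            self.take()
            right = self.and_expr()  # always parse the right-hand side
            value = value or right
        return value

    def and_expr(self):  # and_expr = unary_expr ('and' unary_expr)*
        value = self.unary_expr()
        while self.peek() == "and":
            self.take()
            right = self.unary_expr()
            value = value and right
        return value

    def unary_expr(self):  # 'not' unary_expr | primary
        if self.peek() == "not":
            self.take()
            return not self.unary_expr()
        return self.primary()

    def primary(self):  # '(' expression ')' | atom
        if self.peek() == "(":
            self.take()
            value = self.expression()
            self.take(")")
            return value
        return self.atom()

    def atom(self):  # tagged_call | comparison
        left = self.take()
        if left == "tagged":
            self.take("(")
            tag = self.take()
            self.take(")")
            return tag in self.tags
        op, right = self.take(), self.take()
        if op not in OPS:
            raise ValueError(f"unknown operator {op!r}")
        return OPS[op](self.operand(left), self.operand(right))

    def operand(self, token):
        if token.startswith('"'):
            return token[1:-1]  # quoted string literal
        try:
            return float(token)  # numeric literal
        except ValueError:
            value = {"source": self.payload}  # field path such as source.amount
            for key in token.split("."):
                value = value[key]
            return value


def evaluate(condition, payload, tags=frozenset()):
    parser = ConditionParser(tokenize(condition), payload, tags)
    result = parser.expression()
    if parser.peek() is not None:
        raise ValueError(f"unexpected token {parser.peek()!r}")
    return bool(result)
```

The nesting of the methods is what enforces precedence: `expression` calls `and_expr`, which calls `unary_expr`, so `not` binds tightest and `or` loosest.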


type: fan-out
fan_out:
  routing_key: <string> # Required. Expression (e.g., "source.client_id").
  targets: # Required. Min 1 target.
    - name: <string> # Required.
      instance: <string> # Required. Connector instance name.
      endpoint: <string> # Required. API endpoint.
      method: POST # Default: POST.
      mapping: <MappingBlock> # Optional. Per-target mapping.
      when: <condition> # Optional. Routing condition.

Example:

flow: payroll-distribution
version: 1
type: fan-out
source:
  connector: payroll-inbound
  trigger: webhook
fan_out:
  routing_key: source.client_id
  targets:
    - name: sap-hr
      instance: sap-production
      endpoint: /api/hr/payroll
      method: POST
      when: routing_key == "acme-corp"
      mapping:
        employee_id: source.employee.id
        gross_pay: source.amounts.gross | decimal(2)
    - name: default-hr
      instance: bamboohr-production
      endpoint: /api/payroll
      method: POST
      mapping:
        employeeId: source.employee.id
on_error:
  retry: 3x exponential(30s)
  then: dead-letter

The dedup block deduplicates messages by key within a time window. It is placed under source:.

source:
  dedup:
    key: <string | string[]> # Required. Field path(s).
    window: <duration> # Required. Duration string.

Single key:

dedup:
  key: source.id
  window: 24h

Composite key:

dedup:
  key: [source.entity_id, source.event_type]
  window: 1h
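
A rough Python sketch of windowed deduplication with a single or composite key follows. The `Deduplicator` class is hypothetical and keeps state in memory; whether the real window is measured from the first sighting of a key (as here) or slides with each duplicate is not stated in this specification.

```python
from typing import Any, Dict, List, Tuple, Union


def get_path(message: dict, path: str) -> Any:
    """Resolve a path such as 'source.entity_id' against the inbound message."""
    value: Any = {"source": message}
    for key in path.split("."):
        value = value[key]
    return value


class Deduplicator:
    def __init__(self, key: Union[str, List[str]], window_seconds: int):
        self.key_paths = [key] if isinstance(key, str) else list(key)
        self.window_seconds = window_seconds
        self.seen: Dict[Tuple[Any, ...], float] = {}  # key values -> first-seen time

    def is_duplicate(self, message: dict, now: float) -> bool:
        """Return True if the same key was already seen inside the window."""
        key = tuple(get_path(message, p) for p in self.key_paths)
        first_seen = self.seen.get(key)
        if first_seen is not None and now - first_seen < self.window_seconds:
            return True
        self.seen[key] = now  # new key, or the previous window has expired
        return False
```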

The ordering block ensures in-order processing per key. It is placed under source:.

source:
  ordering:
    key: <string | string[]> # Required. Field path(s).
    mode: strict | best-effort # Default: strict.
    lock_ttl: <duration> # Optional. Default: 60s.

Mode         Behavior
strict       Guarantees in-order delivery. Messages queued per key.
best-effort  Attempts ordering without blocking on lock contention.

When both dedup and ordering are configured, dedup runs first, then ordering lock is acquired.


The aggregate block buffers messages and emits aggregated output. It is placed under source:. At least one of window or count must be specified.

source:
  aggregate:
    window: <duration> # Optional. Time window.
    count: <integer> # Optional. Flush after N messages.
    group_by: <string | string[]> # Optional. Field path(s) for grouping.
    output: # Required. Aggregate expressions.
      <field>: <aggregate_expr>
    max_buffer: <integer> # Default: 10000.

Function                   Description
count()                    Count of messages in buffer
sum(field.path)            Sum of numeric field values
min(field.path)            Minimum value
max(field.path)            Maximum value
first(field.path)          Value from first message
last(field.path)           Value from last message
collect()                  Collect all messages into array
collect_field(field.path)  Collect a specific field from all messages
group.key                  Reference to the current group key value

Example:

source:
  connector: shopify
  trigger: webhook
  event: order.created
  aggregate:
    window: 5m
    count: 100
    group_by: source.customer_id
    output:
      customer_id: group.key
      order_count: count()
      total_amount: sum(source.total)
      min_amount: min(source.total)
      max_amount: max(source.total)
      first_order: first(source.order_id)
      last_order: last(source.order_id)
      all_orders: collect()
      order_ids: collect_field(source.order_id)
    max_buffer: 5000
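
To show what the output block in this example computes, here is a small Python sketch that groups buffered messages by `customer_id` and evaluates each aggregate function for a group. The function names are hypothetical, and the flush triggers (window, count, max_buffer) are deliberately left out; the sketch covers only the per-group arithmetic.

```python
from collections import defaultdict
from typing import Any, Dict, List


def summarise(group_key: Any, messages: List[dict]) -> Dict[str, Any]:
    """Evaluate the example's output expressions for one group of messages."""
    totals = [m["total"] for m in messages]
    order_ids = [m["order_id"] for m in messages]
    return {
        "customer_id": group_key,       # group.key
        "order_count": len(messages),   # count()
        "total_amount": sum(totals),    # sum(source.total)
        "min_amount": min(totals),      # min(source.total)
        "max_amount": max(totals),      # max(source.total)
        "first_order": order_ids[0],    # first(source.order_id)
        "last_order": order_ids[-1],    # last(source.order_id)
        "all_orders": list(messages),   # collect()
        "order_ids": order_ids,         # collect_field(source.order_id)
    }


def aggregate(messages: List[dict], group_by: str = "customer_id") -> List[dict]:
    """Group buffered messages by one field and summarise each group."""
    groups: Dict[Any, List[dict]] = defaultdict(list)
    for message in messages:
        groups[message[group_by]].append(message)
    return [summarise(key, group) for key, group in groups.items()]
```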

on_error:
  retry: <retry_string> # Optional.
  fallback: <string> # Optional. Fallback action name.
  then: <string> # Optional. Action after retries: "dead-letter", "alert".
  ref: <string> # Optional. Reference to shared error policy.

Retry string format:

<maxAttempts>x <strategy>(<baseDelay>)

Field        Description
maxAttempts  Integer number of retry attempts
strategy     exponential, linear, or fixed
baseDelay    Duration with optional unit: 30s, 1m

Examples:

on_error:
  retry: 3x exponential(30s) # 3 retries, exponential, 30s base
  then: dead-letter

on_error:
  retry: 5x exponential(60s) # 5 retries, 60s base
  then: dead-letter

on_error:
  retry: 2x fixed(10s) # 2 retries, fixed 10s delay
  then: alert

Default (when retry string doesn’t match pattern): 3 attempts, exponential, 30s base.
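
The retry string format and its fallback can be sketched as a small parser. This is a hypothetical illustration: the `RetryPolicy` type and the regular expression are assumptions, as is treating a delay without a unit as seconds and accepting the same s/m/h/d units as other duration strings.

```python
import re
from typing import NamedTuple


class RetryPolicy(NamedTuple):
    max_attempts: int
    strategy: str
    base_delay_seconds: int


# Documented default when the string does not match the pattern.
DEFAULT_POLICY = RetryPolicy(3, "exponential", 30)
PATTERN = re.compile(r"(\d+)x\s+(exponential|linear|fixed)\((\d+)([smhd]?)\)")
# Assumption: a missing unit means seconds.
SECONDS_PER_UNIT = {"": 1, "s": 1, "m": 60, "h": 3600, "d": 86400}


def parse_retry(text: str) -> RetryPolicy:
    """Parse '<maxAttempts>x <strategy>(<baseDelay>)' or fall back to the default."""
    match = PATTERN.fullmatch(text.strip())
    if not match:
        return DEFAULT_POLICY
    attempts, strategy, amount, unit = match.groups()
    return RetryPolicy(int(attempts), strategy, int(amount) * SECONDS_PER_UNIT[unit])
```

Note that the silent fallback means a typo such as `3 x exponential(30s)` still yields a working policy, just not necessarily the intended one.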

on_step_failure:
  strategy: compensate-previous # Required.
  then: alert(ops-team) # Optional.

Policies are defined under policies: as an array. There are three built-in PII policy types.

pii-mask masks the specified field paths (replaces values with ***).

policies:
  - pii-mask:
      - customer.email
      - customer.phone

pii-encrypt encrypts the specified field paths using AES-256-GCM.

policies:
  - pii-encrypt:
      - customer.ssn
      - payment.card_number

pii-strip strips fields for specific targets (the fields are removed from the payload entirely).

policies:
  - pii-strip:
      fields:
        - customer.ssn
        - payment.card_number
      targets:
        - analytics
        - external-partner
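
The difference between masking and stripping can be illustrated with a short Python sketch over a nested payload. The function names are hypothetical and encryption is omitted; the sketch assumes field paths are dot-separated keys into the payload, which this specification does not spell out.

```python
import copy
from typing import Any, List, Optional, Tuple


def _parent_and_leaf(payload: dict, path: str) -> Tuple[Optional[Any], str]:
    """Walk to the dict that holds the last path segment; None if unreachable."""
    *parents, leaf = path.split(".")
    node: Any = payload
    for key in parents:
        node = node.get(key) if isinstance(node, dict) else None
        if node is None:
            return None, leaf
    return node, leaf


def apply_pii_mask(payload: dict, paths: List[str]) -> dict:
    """Return a copy with each listed field replaced by '***'."""
    masked = copy.deepcopy(payload)
    for path in paths:
        node, leaf = _parent_and_leaf(masked, path)
        if isinstance(node, dict) and leaf in node:
            node[leaf] = "***"
    return masked


def apply_pii_strip(payload: dict, fields: List[str],
                    targets: List[str], target: str) -> dict:
    """Return a copy with the fields removed, but only for the listed targets."""
    if target not in targets:
        return payload
    stripped = copy.deepcopy(payload)
    for path in fields:
        node, leaf = _parent_and_leaf(stripped, path)
        if isinstance(node, dict):
            node.pop(leaf, None)
    return stripped
```

A masked field keeps its key with a placeholder value, whereas a stripped field disappears from the payload sent to the listed targets.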

The schema accepts arbitrary policy records for extensibility:

policies:
  - audit: full

limits:
  max_payload_size: <size_string> # Default: 10MB.
  max_array_size: <integer> # Max items in an array.
  chunk_size: <integer> # Auto-chunking size.

Pattern: <number><unit> where unit is b, kb, mb, or gb.

limits:
  max_payload_size: 5mb
  max_array_size: 1000
  chunk_size: 100
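
A size string of this form could be parsed as below. The sketch makes two assumptions that the specification does not state: units are matched case-insensitively (the default is written 10MB while the example uses 5mb), and each unit is a power of 1024 rather than 1000.

```python
import re

# Assumption: binary multiples; the spec does not say 1000 vs 1024.
BYTES_PER_UNIT = {"b": 1, "kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3}


def parse_size(text: str) -> int:
    """Convert '<number><unit>' (unit: b, kb, mb, gb) into a byte count."""
    match = re.fullmatch(r"(\d+)(b|kb|mb|gb)", text.strip().lower())
    if not match:
        raise ValueError(f"invalid size string: {text!r}")
    return int(match.group(1)) * BYTES_PER_UNIT[match.group(2)]
```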

The compensate block defines rollback actions for saga-flow call steps. Its params are parsed as mapping expressions.

- name: reserve-inventory
  call: inventory-api
  action: reserve
  params:
    items: source.line_items
  compensate:
    action: release
    params:
      reservation_id: result.reservation_id

Duration strings are used in dedup.window, ordering.lock_ttl, aggregate.window, wait, and timeout.

Pattern: <number><unit>

Unit  Meaning  Example
s     Seconds  30s
m     Minutes  5m
h     Hours    1h
d     Days     7d
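
For illustration, a duration string can be converted to seconds as follows, with the 7-day cap on wait steps checked separately. Function names are hypothetical, and the sketch assumes the number part is a non-negative integer.

```python
import re

SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 3600, "d": 86400}
MAX_WAIT_SECONDS = 7 * 86400  # wait steps are limited to 7 days


def parse_duration(text: str) -> int:
    """Convert '<number><unit>' (unit: s, m, h, d) into seconds."""
    match = re.fullmatch(r"(\d+)([smhd])", text.strip())
    if not match:
        raise ValueError(f"invalid duration: {text!r}")
    return int(match.group(1)) * SECONDS_PER_UNIT[match.group(2)]


def validate_wait(text: str) -> int:
    """Parse a wait duration and enforce the documented 7-day maximum."""
    seconds = parse_duration(text)
    if seconds > MAX_WAIT_SECONDS:
        raise ValueError(f"wait of {text!r} exceeds the 7-day maximum")
    return seconds
```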

Simple flow:

flow: shopify-orders-to-erp
version: 2
source:
  connector: shopify
  trigger: webhook
  event: orders/create
target:
  connector: erp-system
  endpoint: /api/sales-orders
  method: POST
mapping:
  order_id: source.id
  total: source.total_price | decimal(2)
  customer_email: source.customer.email | lowercase
  currency: source.currency | uppercase
  processed_at: '"" | now'
on_error:
  retry: 3x exponential(30s)
  then: dead-letter

Multi-step flow:

flow: order-processing-pipeline
version: 1
source:
  connector: shopify
  trigger: webhook
  event: orders/create
steps:
  - name: classify-order
    when: source.total_price > 1000
    tag: high-value
  - name: enrich-customer
    call: crm-api
    endpoint: /customers/lookup
    params:
      email: source.customer.email
    store_as: customer_data
  - name: route-to-warehouse
    switch:
      - when: source.shipping_address.country == "FI"
        target: warehouse-eu
      - default: true
        target: warehouse-international
  - name: notify-vip
    when: tagged(high-value)
    channel: "#big-orders"
    message: "High-value order from {{source.customer.name}}"

Saga flow:

flow: fulfillment-saga
version: 1
type: saga
source:
  connector: order-service
  trigger: webhook
  event: order/confirmed
steps:
  - name: reserve-inventory
    call: inventory-api
    action: reserve
    params:
      items: source.line_items
    compensate:
      action: release
      params:
        reservation_id: result.reservation_id
  - name: charge-payment
    call: payment-api
    action: charge
    params:
      amount: source.total_price
    compensate:
      action: refund
      params:
        charge_id: result.charge_id
  - name: create-shipment
    call: shipping-api
    action: create
    params:
      address: source.shipping_address
on_step_failure:
  strategy: compensate-previous
  then: alert(ops-team)
on_error:
  retry: 2x exponential(60s)
  then: dead-letter

Fan-out flow:

flow: payroll-distribution
version: 1
type: fan-out
source:
  connector: payroll-inbound
  trigger: webhook
fan_out:
  routing_key: source.client_id
  targets:
    - name: sap-hr
      instance: sap-production
      endpoint: /api/hr/payroll
      method: POST
      when: routing_key == "acme-corp"
      mapping:
        employee_id: source.employee.id
        gross_pay: source.amounts.gross | decimal(2)
    - name: default-hr
      instance: bamboohr-production
      endpoint: /api/payroll
      method: POST
      mapping:
        employeeId: source.employee.id
on_error:
  retry: 3x exponential(30s)
  then: dead-letter

Flow with dedup, ordering, and aggregation:

flow: aggregate-orders
version: 1
source:
  connector: shopify
  trigger: webhook
  event: orders/create
  dedup:
    key: source.id
    window: 24h
  ordering:
    key: source.customer_id
    mode: strict
  aggregate:
    window: 5m
    group_by: source.customer_id
    output:
      customer_id: group.key
      order_count: count()
      total: sum(source.total_price)
      order_ids: collect_field(source.id)
target:
  connector: crm-system
  endpoint: /api/customer-summary
  method: POST
mapping:
  customer_id: source.customer_id
  order_count: source.order_count
  total: source.total | decimal(2)
on_error:
  retry: 3x exponential(30s)
  then: dead-letter

Flow with PII policies and limits:

flow: order-to-analytics
version: 1
source:
  connector: order-service
  trigger: webhook
target:
  connector: analytics-warehouse
  endpoint: /api/events
  method: POST
mapping:
  order_id: source.id
  amount: source.total_price | decimal(2)
  customer_email: source.customer.email
  customer_ssn: source.customer.ssn
policies:
  - pii-mask:
      - customer.email
  - pii-encrypt:
      - customer.ssn
  - pii-strip:
      fields:
        - customer.ssn
      targets:
        - analytics-warehouse
limits:
  max_payload_size: 5mb
on_error:
  retry: 3x exponential(30s)
  then: dead-letter

When compiled, the runtime executes operations in this order:

  1. receive — always first
  2. dedup — if configured (before transforms)
  3. ordering — if configured (after dedup)
  4. aggregate — if configured (after ordering)
  5. Then one of:
    • Simple flow: transform → deliver
    • Multi-step flow: sequence of tag, call, store, condition, deliver, notify, emit, wait operations
    • Fan-out flow: fan-out