YAML DSL Specification
This is the full specification for the fyrn YAML DSL. Flows are declarative YAML files that define how data moves between systems.
Top-level structure
Section titled “Top-level structure”flow: <string> # Required. Flow identifier.version: <integer> # Required. Positive integer.description: <string> # Optional.type: standard | saga | fan-out # Optional. Inferred from structure if omitted.
source: <SourceConfig> # Required.target: <TargetConfig> # Required for simple flows.steps: <StepConfig[]> # Required for multi-step / saga flows.mapping: <MappingBlock> # For simple flows (source → target).fan_out: <FanOutConfig> # Required for fan-out flows.
on_error: <ErrorConfig> # Optional.on_step_failure: <OnStepFailure> # Optional. For saga flows.policies: <PolicyRef[]> # Optional. Array of policy objects.limits: <LimitsConfig> # Optional.Structural constraints
Section titled “Structural constraints”A flow must have at least one of:
target— simple flow (source → transform → target)stepswith length > 0 — multi-step or saga flowfan_out— fan-out flow
Type inference
Section titled “Type inference”If type is omitted:
- If
fan_outis present →fan-out - Otherwise →
standard
Source config
Section titled “Source config”source: connector: <string> # Required. Connector instance name. trigger: webhook | poll | schedule | manual | api event: <string> # Optional. Event type (e.g., "orders/create"). schedule: <string> # Optional. Cron expression (e.g., "0 6 * * *"). timezone: <string> # Optional. IANA timezone (e.g., "Europe/Helsinki"). poll_interval: <string> # Optional. Duration string. poll: <PollConfig> # Optional. Required when trigger is "poll". dedup: <DedupConfig> # Optional. ordering: <OrderingConfig> # Optional. aggregate: <AggregateConfig> # Optional.Trigger types
Section titled “Trigger types”| Trigger | Description |
|---|---|
webhook | Inbound HTTP push from external system |
poll | Periodic polling of an external API |
schedule | Cron-based execution |
manual | Triggered manually via CLI or UI |
api | Triggered via a published API endpoint |
Poll config
Section titled “Poll config”source: trigger: poll poll: url: <string> # Required. URL to poll. method: GET | POST # Default: GET. headers: # Optional. Key-value pairs. Authorization: "Bearer {{instance.api_key}}" params: # Optional. Query parameters. limit: "100" records_path: <string> # Dot-notation path to array in response. pagination: <PaginationConfig>Pagination strategies
Section titled “Pagination strategies”Cursor-based:
pagination: strategy: cursor cursor_path: paging.next.after # Dot path to cursor in response. cursor_param: after # Query param name to pass cursor. max_pages: 100 # Default: 100. page_size: 50 # Optional.Offset-based:
pagination: strategy: offset page_size: 50 # Required. offset_param: offset # Default: "offset". total_path: meta.total_count # Dot path to total count in response. max_pages: 100Link header (RFC 5988):
pagination: strategy: link_header max_pages: 100Next token:
pagination: strategy: next_token token_path: meta.next_token # Dot path to token in response. token_param: page_token # Query param name. max_pages: 100Target config
Section titled “Target config”target: connector: <string> # Required. Connector instance name. endpoint: <string> # Required. API path (e.g., "/api/sales-orders"). method: GET | POST | PUT | PATCH | DELETE # Required.Step types
Section titled “Step types”Steps are used in multi-step, saga, and fan-out flows. Each step has a name (required) and optional when condition.
steps: - name: <string> # Required. Unique step identifier. when: <condition> # Optional. Condition expression.The step type is determined by which fields are present.
Tag step
Section titled “Tag step”Tags the message for later tagged() condition checks.
- name: classify-order when: source.total_price > 1000 tag: high-valueCall step
Section titled “Call step”Calls an external connector/API and optionally stores the result.
- name: enrich-customer call: crm-api # Required. Connector to call. endpoint: /customers/lookup # Optional. API endpoint. action: reserve # Optional. Action name (saga steps). params: # Optional. Mapping expressions. email: source.customer.email store_as: customer_data # Optional. Store result as variable. compensate: # Optional. Saga compensation. action: release params: reservation_id: result.reservation_idSwitch step
Section titled “Switch step”Conditional routing to different targets.
- name: route-to-warehouse switch: - when: source.shipping_address.country == "FI" target: warehouse-eu mapping: # Optional. Per-case mapping. order_id: source.id - when: source.shipping_address.country == "US" target: warehouse-us - default: true # Default case (no "when"). target: warehouse-internationalNotify step
Section titled “Notify step”Sends a notification to a channel. Both channel and message must be present.
- name: notify-if-high-value when: tagged(high-value) channel: "#big-orders" message: "New order from {{source.customer.name}}"Emit step
Section titled “Emit step”Triggers another flow (fire-and-forget). Three forms:
String shorthand:
- name: notify-audit emit: audit-log-flowObject with payload:
- name: send-notification emit: flow: notification-flow payload: subject: "'New order received'" order_id: source.idArray (multi-emit):
- name: high-value-alerts when: source.total_price > 1000 emit: - flow: vip-handler payload: customer: source.customer.email - flow: fraud-checkWait step
Section titled “Wait step”Pauses processing for a duration. Maximum: 7 days.
- name: wait-before-followup wait: 10m # Duration: "10s", "5m", "1h", "1d". timeout: 1h # Optional. Max wait before timeout action. on_timeout: dead-letter # dead-letter | continue | failDeliver step
Section titled “Deliver step”Delivers to a target connector within a multi-step flow.
- name: deliver-to-erp target: erp-system mapping: order_id: source.idMapping expressions
Section titled “Mapping expressions”Mappings are Record<string, expression>. Each expression is a string parsed into one of five types.
Expression types
Section titled “Expression types”| Type | Example |
|---|---|
| Field access | source.customer.email |
| Template | "Order {{source.id}} for {{source.customer.name}}" |
| Conditional | source.paid ? "confirmed" : "pending" |
| Literal | "fixed-value", 42, -10.5, true |
| Array map | source.line_items[] -> each |
Field access
Section titled “Field access”order_id: source.id # Simpleemail: source.customer.email # Nesteditems: source.items[] # Arrayfirst_item: source.items[0] # IndexedField references are detected by containing a dot, starting with known prefixes (source., result., item., env.), containing [, or containing ->.
Pipe transform syntax
Section titled “Pipe transform syntax”<field_or_literal> | <transform1> | <transform2>(arg)Transforms chain left-to-right. See the Transforms Guide for the complete function reference.
mapping: currency: source.currency | uppercase total: source.total_price | decimal(2) formatted_code: 'source.raw_code | trim | uppercase | pad_left(8, "0")' processed_at: '"" | now'Template strings
Section titled “Template strings”message: "Order {{source.id}} totaling {{source.total | decimal(2)}}"Must have matching {{ }} pairs. Supports inline pipe transforms.
Conditional (ternary)
Section titled “Conditional (ternary)”status: source.paid ? "confirmed" : "pending"Literals
Section titled “Literals”fixed_string: "fixed-value"single_quoted: 'also literal'number: 42negative: -10.5boolean: trueArray mapping
Section titled “Array mapping”# Default iterator (item.*)lines: source.line_items[] -> each: sku: item.sku qty: item.quantity
# Named iteratorlines: source.line_items[] -> each as line: sku: line.sku qty: line.quantity
# Nested arrays (requires "as <name>" at depth > 0)orders: source.orders[] -> each as order: id: order.id items: order.lines[] -> each as line: order_ref: order.id # Parent scope access sku: line.skuRules:
- Top-level array maps can omit
as <name>(defaults toitem.*) - Nested levels must use
as <name> - Reserved names:
source,result,env,item - Maximum nesting depth: 5 levels
- Inner loops inherit all outer loop scopes
Mapping reference shorthand
Section titled “Mapping reference shorthand”mapping: use standard-order-mappingReferences a shared/reusable mapping definition.
Data context prefixes
Section titled “Data context prefixes”| Prefix | Description |
|---|---|
source. | Fields from the inbound payload |
result. | Result of a call step (via store_as) |
item. | Default iterator variable in array maps |
env. | Environment variables |
Condition expressions
Section titled “Condition expressions”Used in when: fields on steps, switch cases, and fan-out targets.
Grammar
Section titled “Grammar”expression = or_expror_expr = and_expr ('or' and_expr)*and_expr = unary_expr ('and' unary_expr)*unary_expr = 'not' unary_expr | primaryprimary = '(' expression ')' | atomatom = tagged_call | comparisonOperators
Section titled “Operators”| Type | Example |
|---|---|
| Comparison | source.amount > 1000 |
| Equality | source.status == "active" |
| Inequality | source.status != "draft" |
| Tagged | tagged(high-value) |
| AND | source.country == "FI" and source.amount > 1000 |
| OR | source.type == "order" or source.type == "return" |
| NOT | not tagged(processed) |
| Grouped | (source.type == "order" or source.type == "return") and source.status != "draft" |
Precedence (highest to lowest): not → and → or
Keywords and, or, not are only matched at word boundaries.
Fan-out config
Section titled “Fan-out config”type: fan-out
fan_out: routing_key: <string> # Required. Expression (e.g., "source.client_id"). targets: # Required. Min 1 target. - name: <string> # Required. instance: <string> # Required. Connector instance name. endpoint: <string> # Required. API endpoint. method: POST # Default: POST. mapping: <MappingBlock> # Optional. Per-target mapping. when: <condition> # Optional. Routing condition.Example:
flow: payroll-distributionversion: 1type: fan-outsource: connector: payroll-inbound trigger: webhookfan_out: routing_key: source.client_id targets: - name: sap-hr instance: sap-production endpoint: /api/hr/payroll method: POST when: routing_key == "acme-corp" mapping: employee_id: source.employee.id gross_pay: source.amounts.gross | decimal(2) - name: default-hr instance: bamboohr-production endpoint: /api/payroll method: POST mapping: employeeId: source.employee.idon_error: retry: 3x exponential(30s) then: dead-letterDedup config
Section titled “Dedup config”Deduplicates messages by key within a time window. Placed under source:.
source: dedup: key: <string | string[]> # Required. Field path(s). window: <duration> # Required. Duration string.Single key:
dedup: key: source.id window: 24hComposite key:
dedup: key: [source.entity_id, source.event_type] window: 1hOrdering config
Section titled “Ordering config”Ensures in-order processing per key. Placed under source:.
source: ordering: key: <string | string[]> # Required. Field path(s). mode: strict | best-effort # Default: strict. lock_ttl: <duration> # Optional. Default: 60s.| Mode | Behavior |
|---|---|
strict | Guarantees in-order delivery. Messages queued per key. |
best-effort | Attempts ordering without blocking on lock contention. |
When both dedup and ordering are configured, dedup runs first, then ordering lock is acquired.
Aggregate config
Section titled “Aggregate config”Buffers messages and emits aggregated output. Placed under source:. At least one of window or count must be specified.
source: aggregate: window: <duration> # Optional. Time window. count: <integer> # Optional. Flush after N messages. group_by: <string | string[]> # Optional. Field path(s) for grouping. output: # Required. Aggregate expressions. <field>: <aggregate_expr> max_buffer: <integer> # Default: 10000.Aggregate functions
Section titled “Aggregate functions”| Function | Description |
|---|---|
count() | Count of messages in buffer |
sum(field.path) | Sum of numeric field values |
min(field.path) | Minimum value |
max(field.path) | Maximum value |
first(field.path) | Value from first message |
last(field.path) | Value from last message |
collect() | Collect all messages into array |
collect_field(field.path) | Collect a specific field from all messages |
group.key | Reference to the current group key value |
Example:
source: connector: shopify trigger: webhook event: order.created aggregate: window: 5m count: 100 group_by: source.customer_id output: customer_id: group.key order_count: count() total_amount: sum(source.total) min_amount: min(source.total) max_amount: max(source.total) first_order: first(source.order_id) last_order: last(source.order_id) all_orders: collect() order_ids: collect_field(source.order_id) max_buffer: 5000Error handling
Section titled “Error handling”on_error
Section titled “on_error”on_error: retry: <retry_string> # Optional. fallback: <string> # Optional. Fallback action name. then: <string> # Optional. Action after retries: "dead-letter", "alert". ref: <string> # Optional. Reference to shared error policy.Retry string format
Section titled “Retry string format”<maxAttempts>x <strategy>(<baseDelay>)| Field | Description |
|---|---|
maxAttempts | Integer number of retry attempts |
strategy | exponential, linear, or fixed |
baseDelay | Duration with optional unit: 30s, 1m |
Examples:
on_error: retry: 3x exponential(30s) # 3 retries, exponential, 30s base then: dead-letter
on_error: retry: 5x exponential(60s) # 5 retries, 60s base then: dead-letter
on_error: retry: 2x fixed(10s) # 2 retries, fixed 10s delay then: alertDefault (when retry string doesn’t match pattern): 3 attempts, exponential, 30s base.
on_step_failure (saga)
Section titled “on_step_failure (saga)”on_step_failure: strategy: compensate-previous # Required. then: alert(ops-team) # Optional.PII policies
Section titled “PII policies”Defined under policies: as an array. Three built-in PII policy types:
pii-mask
Section titled “pii-mask”Masks specified field paths (replaces with ***).
policies: - pii-mask: - customer.email - customer.phonepii-encrypt
Section titled “pii-encrypt”Encrypts specified field paths using AES-256-GCM.
policies: - pii-encrypt: - customer.ssn - payment.card_numberpii-strip
Section titled “pii-strip”Strips fields for specific targets (completely removed from payload).
policies: - pii-strip: fields: - customer.ssn - payment.card_number targets: - analytics - external-partnerArbitrary policies
Section titled “Arbitrary policies”The schema accepts arbitrary policy records for extensibility:
policies: - audit: fullLimits config
Section titled “Limits config”limits: max_payload_size: <size_string> # Default: 10MB. max_array_size: <integer> # Max items in an array. chunk_size: <integer> # Auto-chunking size.Size string format
Section titled “Size string format”Pattern: <number><unit> where unit is b, kb, mb, or gb.
limits: max_payload_size: 5mb max_array_size: 1000 chunk_size: 100Compensate block (saga)
Section titled “Compensate block (saga)”Defines rollback actions for saga-flow call steps. Params are parsed as mapping expressions.
- name: reserve-inventory call: inventory-api action: reserve params: items: source.line_items compensate: action: release params: reservation_id: result.reservation_idDuration strings
Section titled “Duration strings”Used in dedup.window, ordering.lock_ttl, aggregate.window, wait, timeout.
Pattern: <number><unit>
| Unit | Meaning | Example |
|---|---|---|
s | Seconds | 30s |
m | Minutes | 5m |
h | Hours | 1h |
d | Days | 7d |
Complete flow examples
Section titled “Complete flow examples”Tier 1: Simple flow
Section titled “Tier 1: Simple flow”flow: shopify-orders-to-erpversion: 2source: connector: shopify trigger: webhook event: orders/createtarget: connector: erp-system endpoint: /api/sales-orders method: POSTmapping: order_id: source.id total: source.total_price | decimal(2) customer_email: source.customer.email | lowercase currency: source.currency | uppercase processed_at: '"" | now'on_error: retry: 3x exponential(30s) then: dead-letterTier 2: Multi-step flow
Section titled “Tier 2: Multi-step flow”flow: order-processing-pipelineversion: 1source: connector: shopify trigger: webhook event: orders/createsteps: - name: classify-order when: source.total_price > 1000 tag: high-value
- name: enrich-customer call: crm-api endpoint: /customers/lookup params: email: source.customer.email store_as: customer_data
- name: route-to-warehouse switch: - when: source.shipping_address.country == "FI" target: warehouse-eu - default: true target: warehouse-international
- name: notify-vip when: tagged(high-value) channel: "#big-orders" message: "High-value order from {{source.customer.name}}"Tier 3: Saga flow
Section titled “Tier 3: Saga flow”flow: fulfillment-sagaversion: 1type: sagasource: connector: order-service trigger: webhook event: order/confirmedsteps: - name: reserve-inventory call: inventory-api action: reserve params: items: source.line_items compensate: action: release params: reservation_id: result.reservation_id
- name: charge-payment call: payment-api action: charge params: amount: source.total_price compensate: action: refund params: charge_id: result.charge_id
- name: create-shipment call: shipping-api action: create params: address: source.shipping_addresson_step_failure: strategy: compensate-previous then: alert(ops-team)on_error: retry: 2x exponential(60s) then: dead-letterTier 4: Fan-out flow
Section titled “Tier 4: Fan-out flow”flow: payroll-distributionversion: 1type: fan-outsource: connector: payroll-inbound trigger: webhookfan_out: routing_key: source.client_id targets: - name: sap-hr instance: sap-production endpoint: /api/hr/payroll method: POST when: routing_key == "acme-corp" mapping: employee_id: source.employee.id gross_pay: source.amounts.gross | decimal(2) - name: default-hr instance: bamboohr-production endpoint: /api/payroll method: POST mapping: employeeId: source.employee.idon_error: retry: 3x exponential(30s) then: dead-letterWith dedup, ordering, and aggregation
Section titled “With dedup, ordering, and aggregation”flow: aggregate-ordersversion: 1source: connector: shopify trigger: webhook event: orders/create dedup: key: source.id window: 24h ordering: key: source.customer_id mode: strict aggregate: window: 5m group_by: source.customer_id output: customer_id: group.key order_count: count() total: sum(source.total_price) order_ids: collect_field(source.id)target: connector: crm-system endpoint: /api/customer-summary method: POSTmapping: customer_id: source.customer_id order_count: source.order_count total: source.total | decimal(2)on_error: retry: 3x exponential(30s) then: dead-letterWith PII policies
Section titled “With PII policies”flow: order-to-analyticsversion: 1source: connector: order-service trigger: webhooktarget: connector: analytics-warehouse endpoint: /api/events method: POSTmapping: order_id: source.id amount: source.total_price | decimal(2) customer_email: source.customer.email customer_ssn: source.customer.ssnpolicies: - pii-mask: - customer.email - pii-encrypt: - customer.ssn - pii-strip: fields: - customer.ssn targets: - analytics-warehouselimits: max_payload_size: 5mbon_error: retry: 3x exponential(30s) then: dead-letterExecution pipeline order
Section titled “Execution pipeline order”When compiled, the runtime executes operations in this order:
- receive — always first
- dedup — if configured (before transforms)
- ordering — if configured (after dedup)
- aggregate — if configured (after ordering)
- Then one of:
- Simple flow: transform → deliver
- Multi-step flow: sequence of tag, call, store, condition, deliver, notify, emit, wait operations
- Fan-out flow: fan-out