diff --git a/content/docs/guides/metadata/flow.mdx b/content/docs/guides/metadata/flow.mdx index 6f2bbe444..306977bde 100644 --- a/content/docs/guides/metadata/flow.mdx +++ b/content/docs/guides/metadata/flow.mdx @@ -349,6 +349,36 @@ is a separate, larger capability ([ADR-0037](https://github.com/objectstack-ai/f Track B); the aggregating node covers the common parallel/batch-approval demand without it. Worked example: `showcase_invoice_signoff` in the showcase app. +### Batch approval — a `map` node, one item at a time + +"Sign off **every** task in the release, in turn" is a **`map` node**: it runs a +per-item subflow for each element of a collection, **sequentially**. Unlike +`loop` (whose body runs synchronously and cannot pause), each item is a full +child run, so the per-item subflow may **durably pause** on its own `approval` — +the map waits for that item's decision, then moves to the next. + +```typescript +{ + id: 'signoffs', + type: 'map', + label: 'Sign off each task', + config: { + collection: '{items}', // array (e.g. the release's tasks) + iteratorVariable: 'task', + flowName: 'one_task_signoff', // the per-item subflow (it may pause) + itemObject: 'showcase_task', // when an item is a record, it becomes the child's `$record` + outputVariable: 'signoffResults',// each item's subflow output, collected in order + }, +} +``` + +The run holds a **single program counter** the whole time: only one item's +approval is open at any moment; when it is decided the engine **re-enters** the +map node to start the next item. v1 is sequential and fail-fast (the first item +whose subflow fails fails the map). Concurrent fan-out (all items at once) is the +larger [ADR-0037](https://github.com/objectstack-ai/framework/blob/main/docs/adr/0037-token-scope-tree-execution.md) +Track B work. Worked example: `showcase_release_signoff` → `showcase_one_task_signoff`. + ### Nested pause — pausing inside a subflow A pausing node inside a `subflow` suspends the **whole chain as linked runs**: diff --git a/examples/app-showcase/src/flows/index.ts b/examples/app-showcase/src/flows/index.ts index 5b965a31b..c5f47c682 100644 --- a/examples/app-showcase/src/flows/index.ts +++ b/examples/app-showcase/src/flows/index.ts @@ -917,11 +917,113 @@ export const InvoiceDualSignoffFlow = defineFlow({ ], }); +/** + * One Task Sign-off — a reusable per-item **approval subflow**, invoked once + * per task by {@link ReleaseSignoffFlow}'s `map` node. The mapped task is + * exposed to this subflow as its record, so the `approval` node opens against + * *that* task. + */ +export const OneTaskSignoffSubflow = defineFlow({ + name: 'showcase_one_task_signoff', + label: 'One Task Sign-off (per-item subflow)', + description: 'Reusable subflow: requests sign-off on a single task. Invoked per item by the batch sign-off map.', + type: 'autolaunched', + template: true, + variables: [{ name: 'decision', type: 'text', isOutput: true }], + nodes: [ + { id: 'start', type: 'start', label: 'Start' }, + { + id: 'review', + type: 'approval', + label: 'Task Sign-off', + config: { + approvers: [{ type: 'role', value: 'manager' }], + behavior: 'first_response', + lockRecord: false, + }, + }, + { id: 'mark_ok', type: 'assignment', label: 'Approved', config: { assignments: { decision: 'approved' } } }, + { id: 'mark_no', type: 'assignment', label: 'Rejected', config: { assignments: { decision: 'rejected' } } }, + { id: 'end_ok', type: 'end', label: 'Signed Off' }, + { id: 'end_no', type: 'end', label: 'Declined' }, + ], + edges: [ + { id: 'e1', source: 'start', target: 'review' }, + { id: 'e2', source: 'review', target: 'mark_ok', label: 'approve' }, + { id: 'e3', source: 'review', target: 'mark_no', label: 'reject' }, + { id: 'e4', source: 'mark_ok', target: 'end_ok' }, + { id: 'e5', source: 'mark_no', target: 'end_no' }, + ], +}); + +/** + * Release Sign-off — the worked **batch-approval** example (ADR-0037 Track A2: + * the sequential `map` / multi-instance node). + * + * "Every task in the release must be signed off, one at a time" is a **single + * `map` node** over the task list. For each task it runs the + * {@link OneTaskSignoffSubflow}, which **pauses** on its `approval`; when that + * task is decided, the map **re-enters** and moves to the next task — the run + * holds a single program counter throughout (no token tree). The per-task + * decisions are collected into `signoffResults`, then the owner is notified. + * + * Trigger it with the tasks to sign off, e.g.: + * POST /api/v1/automation/showcase_release_signoff/trigger + * { "params": { "items": [ {task record}, {task record} ] } } + * then decide each task's approval in turn via /api/v1/approvals. + */ +export const ReleaseSignoffFlow = defineFlow({ + name: 'showcase_release_signoff', + label: 'Release Sign-off (batch approval / map)', + description: 'Signs off every task in a release one at a time via a map node — demonstrates batch approval (ADR-0037 Track A2).', + type: 'autolaunched', + variables: [ + { name: 'items', type: 'list', isInput: true }, + { name: 'signoffResults', type: 'list', isOutput: true }, + ], + nodes: [ + { id: 'start', type: 'start', label: 'Start' }, + { + id: 'signoffs', + type: 'map', + label: 'Sign off each task', + config: { + collection: '{items}', + iteratorVariable: 'task', + flowName: 'showcase_one_task_signoff', + itemObject: 'showcase_task', + outputVariable: 'signoffResults', + }, + }, + { + id: 'notify_done', + type: 'notify', + label: 'Notify: Release Cleared', + config: { + topic: 'release.signoff', + recipients: ['admin@objectos.ai'], + channels: ['inbox'], + severity: 'info', + title: 'Release sign-off complete', + message: 'Every task in the release has been signed off.', + }, + }, + { id: 'end', type: 'end', label: 'End' }, + ], + edges: [ + { id: 'e1', source: 'start', target: 'signoffs' }, + { id: 'e2', source: 'signoffs', target: 'notify_done' }, + { id: 'e3', source: 'notify_done', target: 'end' }, + ], +}); + export const allFlows = [ TaskCompletedFlow, ReassignWizardFlow, BudgetApprovalFlow, InvoiceDualSignoffFlow, + OneTaskSignoffSubflow, + ReleaseSignoffFlow, TaskCompletedSlackFlow, TaskAssignedNotifyFlow, ScheduledDigestFlow, diff --git a/packages/services/service-automation/src/builtin/index.ts b/packages/services/service-automation/src/builtin/index.ts index b516a1bbc..1ef6f674e 100644 --- a/packages/services/service-automation/src/builtin/index.ts +++ b/packages/services/service-automation/src/builtin/index.ts @@ -41,6 +41,7 @@ import { registerConnectorNodes } from './connector-nodes.js'; import { registerNotifyNode } from './notify-node.js'; import { registerWaitNode } from './wait-node.js'; import { registerSubflowNode } from './subflow-node.js'; +import { registerMapNode } from './map-node.js'; export { registerLogicNodes } from './logic-nodes.js'; export { registerLoopNode } from './loop-node.js'; @@ -53,6 +54,7 @@ export { registerConnectorNodes } from './connector-nodes.js'; export { registerNotifyNode } from './notify-node.js'; export { registerWaitNode, parseIsoDuration, rearmSuspendedWaitTimers } from './wait-node.js'; export { registerSubflowNode } from './subflow-node.js'; +export { registerMapNode } from './map-node.js'; /** * Seed every built-in node executor into the engine. Called by @@ -71,6 +73,7 @@ export function installBuiltinNodes(engine: AutomationEngine, ctx: PluginContext registerNotifyNode(engine, ctx); registerWaitNode(engine, ctx); registerSubflowNode(engine, ctx); + registerMapNode(engine, ctx); const types = engine.getRegisteredNodeTypes(); ctx.logger.info( diff --git a/packages/services/service-automation/src/builtin/map-node.test.ts b/packages/services/service-automation/src/builtin/map-node.test.ts new file mode 100644 index 000000000..8a245faa4 --- /dev/null +++ b/packages/services/service-automation/src/builtin/map-node.test.ts @@ -0,0 +1,166 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect, beforeEach } from 'vitest'; +import { AutomationEngine } from '../engine.js'; +import type { NodeExecutor } from '../engine.js'; +import { InMemorySuspendedRunStore } from '../suspended-run-store.js'; +import { registerMapNode } from './map-node.js'; + +function silentLogger() { + return { info() {}, warn() {}, error() {}, debug() {}, child() { return silentLogger(); } } as any; +} +function ctx() { + return { logger: silentLogger(), getService() { throw new Error('none'); } } as any; +} + +/** + * Wire an engine with the `map` node, a per-item child flow, and a parent flow + * that maps over `{items}`. `childNodes` is the child flow's middle node(s) + * (between start and end) — pass a `pauser` to exercise per-item durable pause. + */ +function setup(childNodes: Array<{ id: string; type: string }>, captured: unknown[]) { + const engine = new AutomationEngine(silentLogger()); + registerMapNode(engine, ctx()); + + // Child marker: copies the mapped item (passed as params.val) to the child's + // output variable `result`. + engine.registerNodeExecutor({ + type: 'itemmark', + async execute(_node, variables, context) { + variables.set('result', (context as any)?.params?.val); + return { success: true }; + }, + } as NodeExecutor); + // Pauses the child (stands in for an approval / screen / wait). + engine.registerNodeExecutor({ + type: 'pauser', + async execute() { return { success: true, suspend: true }; }, + } as NodeExecutor); + // Fails the child terminally. + engine.registerNodeExecutor({ + type: 'failer', + async execute() { return { success: false, error: 'boom' }; }, + } as NodeExecutor); + // Parent checker after the map node: captures the collected results array. + engine.registerNodeExecutor({ + type: 'mapcheck', + async execute(_node, variables) { + captured.push(variables.get('mapped')); + return { success: true }; + }, + } as NodeExecutor); + + const seq = [{ id: 'cs', type: 'start' }, ...childNodes, { id: 'ce', type: 'end' }]; + engine.registerFlow('child_flow', { + name: 'child_flow', + label: 'Child', + type: 'autolaunched', + variables: [{ name: 'result', type: 'text', isOutput: true }], + nodes: seq.map(n => ({ label: n.id, ...n })), + edges: seq.slice(0, -1).map((n, i) => ({ id: `c${i}`, source: n.id, target: seq[i + 1].id })), + } as never); + + engine.registerFlow('parent_flow', { + name: 'parent_flow', + label: 'Parent', + type: 'autolaunched', + variables: [{ name: 'items', type: 'list', isInput: true }], + nodes: [ + { id: 'ps', type: 'start', label: 'Start' }, + { + id: 'do_map', type: 'map', label: 'For each', + config: { flowName: 'child_flow', collection: '{items}', iteratorVariable: 'item', input: { val: '{item}' }, outputVariable: 'mapped' }, + }, + { id: 'chk', type: 'mapcheck', label: 'Check' }, + { id: 'pe', type: 'end', label: 'End' }, + ], + edges: [ + { id: 'p1', source: 'ps', target: 'do_map' }, + { id: 'p2', source: 'do_map', target: 'chk' }, + { id: 'p3', source: 'chk', target: 'pe' }, + ], + } as never); + + return engine; +} + +const childRunId = (engine: AutomationEngine) => + engine.listSuspendedRuns().find(r => r.flowName === 'child_flow')?.runId; + +describe('map node executor (sequential multi-instance)', () => { + let captured: unknown[]; + beforeEach(() => { captured = []; }); + + it('runs a synchronous per-item subflow over the collection, collecting results in order', async () => { + const engine = setup([{ id: 'cm', type: 'itemmark' }], captured); + const result = await engine.execute('parent_flow', { params: { items: ['a', 'b', 'c'] } }); + + expect(result.success).toBe(true); + expect(result.status).toBeUndefined(); // ran to completion, no pause + expect(captured).toEqual([[{ result: 'a' }, { result: 'b' }, { result: 'c' }]]); + expect(engine.listSuspendedRuns()).toHaveLength(0); + }); + + it('completes immediately on an empty collection', async () => { + const engine = setup([{ id: 'cm', type: 'itemmark' }], captured); + const result = await engine.execute('parent_flow', { params: { items: [] } }); + expect(result.success).toBe(true); + expect(captured).toEqual([[]]); + }); + + it('drives items ONE AT A TIME when each subflow pauses, re-entering on resume', async () => { + const engine = setup([{ id: 'cp', type: 'pauser' }, { id: 'cm', type: 'itemmark' }], captured); + + // Item 0 pauses → the parent suspends at the map node. + const r0 = await engine.execute('parent_flow', { params: { items: ['a', 'b', 'c'] } }); + expect(r0.status).toBe('paused'); + const parent = engine.listSuspendedRuns().find(x => x.flowName === 'parent_flow')!; + expect(parent.nodeId).toBe('do_map'); + expect(parent.correlation).toMatch(/^map:/); + expect(captured).toHaveLength(0); // not done yet + + // Resume item 0's child → bubbles → re-enters map → item 1 pauses. + const id0 = childRunId(engine)!; + const after0 = await engine.resume(id0); + expect(after0.success).toBe(true); + expect(engine.listSuspendedRuns().some(x => x.flowName === 'parent_flow')).toBe(true); // still paused + expect(captured).toHaveLength(0); + + // Resume item 1 → item 2 pauses. + await engine.resume(childRunId(engine)!); + expect(captured).toHaveLength(0); + + // Resume item 2 → all done → parent continues past the map node. + await engine.resume(childRunId(engine)!); + expect(captured).toEqual([[{ result: 'a' }, { result: 'b' }, { result: 'c' }]]); + expect(engine.listSuspendedRuns()).toHaveLength(0); + }); + + it('fails the map fast when an item subflow fails', async () => { + const engine = setup([{ id: 'cf', type: 'failer' }], captured); + const result = await engine.execute('parent_flow', { params: { items: ['a', 'b'] } }); + expect(result.success).toBe(false); + expect(result.error).toMatch(/item 0.*failed/i); + expect(captured).toEqual([]); // downstream never ran + }); + + it('survives a process restart mid-map: resume on a fresh engine continues the sequence', async () => { + const store = new InMemorySuspendedRunStore(); + const engine = setup([{ id: 'cp', type: 'pauser' }, { id: 'cm', type: 'itemmark' }], captured); + engine.setSuspendedRunStore(store); + + await engine.execute('parent_flow', { params: { items: ['a', 'b'] } }); + const id0 = childRunId(engine)!; + expect((await store.list()).length).toBe(2); // parent (map) + child item 0 + + // "Restart": a fresh engine sharing only the durable store + the registry. + const capturedB: unknown[] = []; + const engineB = setup([{ id: 'cp', type: 'pauser' }, { id: 'cm', type: 'itemmark' }], capturedB); + engineB.setSuspendedRunStore(store); + + await engineB.resume(id0); // item 0 done → item 1 pauses + await engineB.resume(childRunId(engineB)!); // item 1 done → all done + expect(capturedB).toEqual([[{ result: 'a' }, { result: 'b' }]]); + expect(await store.list()).toHaveLength(0); + }); +}); diff --git a/packages/services/service-automation/src/builtin/map-node.ts b/packages/services/service-automation/src/builtin/map-node.ts new file mode 100644 index 000000000..845dcb6c5 --- /dev/null +++ b/packages/services/service-automation/src/builtin/map-node.ts @@ -0,0 +1,155 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import type { PluginContext } from '@objectstack/core'; +import { defineActionDescriptor } from '@objectstack/spec/automation'; +import type { AutomationContext } from '@objectstack/spec/contracts'; +import type { AutomationEngine } from '../engine.js'; +import { interpolate } from './template.js'; + +/** Hard cap on map fan-out — turns a runaway collection into a clean error. */ +const MAX_MAP_ITEMS = 10_000; + +/** + * `map` built-in node — **sequential multi-instance** (ADR-0037 Track A2). + * + * Runs a per-item **subflow** for each element of a collection, **one item at a + * time**, and continues once every item's subflow has completed — collecting + * each result into `config.outputVariable`. Unlike `loop` (whose body region + * runs synchronously and cannot pause), each item here is a full child run, so + * the per-item subflow **may durably pause** (an `approval` / `screen` / + * `wait`). This is the worked "batch approval" shape: *approve each item in + * turn, then continue.* + * + * Mechanism (no token tree — one program counter, ADR-0037): + * - The node tracks its progress in flow variables (`${nodeId}.$mapState`). + * - For item *k* it invokes `config.flowName` via `engine.execute`, tagging the + * child run with `$parentRunId` + `$parentMapNode` so the engine knows to + * bubble the child's completion **back into this node** (not past it). + * - If the child completes synchronously, the result is recorded and the loop + * advances inline. If the child **pauses**, the parent suspends at this node + * (`correlation: 'map:'`); when the child later completes, the + * engine **re-enters** this node (it reads `$mapItemOutput` / `$mapItemDone`, + * records the item, and starts the next). + * + * v1 is **sequential and fail-fast**: items run in order; the first item whose + * subflow fails fails the map. Concurrent fan-out + partial aggregation is a + * deliberate follow-up (ADR-0037 — needs N:1 aggregation + serialization). + */ +export function registerMapNode(engine: AutomationEngine, ctx: PluginContext): void { + engine.registerNodeExecutor({ + type: 'map', + descriptor: defineActionDescriptor({ + type: 'map', + version: '1.0.0', + name: 'Map', + description: 'Run a per-item subflow for each element of a collection, one at a time (each item may pause).', + icon: 'list-check', + category: 'logic', + source: 'builtin', + // Each item's subflow may pause, so the map suspends and resumes per item. + supportsPause: true, + isAsync: true, + }), + async execute(node, variables, context) { + const cfg = (node.config ?? {}) as Record; + const flowName = + typeof cfg.flowName === 'string' ? cfg.flowName : typeof cfg.flow === 'string' ? cfg.flow : undefined; + if (!flowName) { + return { success: false, error: `map '${node.id}': config.flowName (the per-item subflow) is required` }; + } + + const iteratorVariable = + typeof cfg.iteratorVariable === 'string' && cfg.iteratorVariable ? cfg.iteratorVariable : 'item'; + const indexVariable = + typeof cfg.indexVariable === 'string' && cfg.indexVariable ? cfg.indexVariable : undefined; + const outVar = + typeof cfg.outputVariable === 'string' && cfg.outputVariable ? cfg.outputVariable : undefined; + + // Resolve the collection (template / bare var / already-an-array). + const rawCollection = cfg.collection; + let collection: unknown; + if (Array.isArray(rawCollection)) { + collection = rawCollection; + } else if (typeof rawCollection === 'string') { + collection = interpolate(rawCollection, variables, context ?? ({} as AutomationContext)); + if (collection == null && variables.has(rawCollection)) collection = variables.get(rawCollection); + } + if (!Array.isArray(collection)) { + return { success: false, error: `map '${node.id}': collection '${String(rawCollection)}' did not resolve to an array` }; + } + if (collection.length > MAX_MAP_ITEMS) { + return { success: false, error: `map '${node.id}': collection length ${collection.length} exceeds the ${MAX_MAP_ITEMS} cap` }; + } + + // ── Progress state, carried across re-entries in the variable map. ── + const stateKey = `${node.id}.$mapState`; + const state = (variables.get(stateKey) as { started: number; results: unknown[] } | undefined) ?? { + started: 0, + results: [], + }; + + // Re-entry: the previously-started item's subflow just completed (the + // engine bubbled its output here). Record it and clear the handoff vars. + if (variables.get(`${node.id}.$mapItemDone`) === true) { + state.results.push(variables.get(`${node.id}.$mapItemOutput`) ?? null); + variables.delete(`${node.id}.$mapItemDone`); + variables.delete(`${node.id}.$mapItemOutput`); + } + + const parentRunId = variables.get('$runId'); + + // Drive items in order. Synchronous items advance inline; a pausing item + // suspends the run and is resumed via re-entry. + while (state.started < collection.length) { + const idx = state.started; + const item = collection[idx]; + variables.set(iteratorVariable, item); + if (indexVariable) variables.set(indexVariable, idx); + + const rawInput = (cfg.input && typeof cfg.input === 'object' ? cfg.input : {}) as Record; + const params = interpolate(rawInput, variables, context ?? ({} as AutomationContext)) as Record; + + // When the mapped item IS a record (has an id), expose it as the + // child's `record` + `object` so a per-item `approval` / `update_record` + // targets that item — the natural "approve each row" shape. Otherwise + // the item is just data, passed via `params`. + const itemIsRecord = item != null && typeof item === 'object' && typeof (item as any).id === 'string'; + const itemObject = typeof cfg.itemObject === 'string' ? cfg.itemObject : (context as any)?.object; + + const childContext = { + ...(context ?? {}), + params, + ...(itemIsRecord ? { record: item, object: itemObject } : {}), + ...(parentRunId != null + ? { $parentRunId: String(parentRunId), $parentMapNode: node.id } + : {}), + } as AutomationContext; + + const child = await engine.execute(flowName, childContext); + + if (child.status === 'paused') { + if (!child.runId) { + return { success: false, error: `map '${node.id}': item ${idx} paused without a run id — cannot link the runs` }; + } + // Mark this item started and suspend; the engine re-enters on bubble. + state.started = idx + 1; + variables.set(stateKey, state); + return { success: true, suspend: true, correlation: `map:${child.runId}` }; + } + if (!child.success) { + return { success: false, error: `map '${node.id}': item ${idx} (subflow '${flowName}') failed: ${child.error ?? 'unknown error'}` }; + } + // Synchronous completion — record and advance. + state.started = idx + 1; + state.results.push(child.output ?? null); + } + + // All items done. + variables.set(stateKey, state); + if (outVar) variables.set(outVar, state.results); + return { success: true, output: { results: state.results, count: state.results.length } }; + }, + }); + + ctx.logger.info('[Map Node] 1 built-in node executor registered'); +} diff --git a/packages/services/service-automation/src/engine.ts b/packages/services/service-automation/src/engine.ts index 778a30949..2dbaccf52 100644 --- a/packages/services/service-automation/src/engine.ts +++ b/packages/services/service-automation/src/engine.ts @@ -1141,7 +1141,16 @@ export class AutomationEngine implements IAutomationService { const context = run.context; try { - await this.traverseNext(node, flow, variables, context, steps, signal?.branchLabel); + // ── Map re-entry (sequential multi-instance, ADR-0037 A2). + // A run paused at a `map` node (correlation `map:`) + // does NOT continue past the node on resume — it RE-RUNS the + // node so the executor can record the just-completed unit and + // start the next item. The default path continues past the node. + if (typeof run.correlation === 'string' && run.correlation.startsWith('map:')) { + await this.executeNode(node, flow, variables, context, steps); + } else { + await this.traverseNext(node, flow, variables, context, steps, signal?.branchLabel); + } // Collect output variables const output: Record = {}; @@ -1264,10 +1273,17 @@ export class AutomationEngine implements IAutomationService { * caller who resumed the child. */ private async bubbleToParent(run: SuspendedRun, output: Record): Promise { - const parentRunId = (run.context as Record | undefined)?.$parentRunId; + const ctx = run.context as Record | undefined; + const parentRunId = ctx?.$parentRunId; if (typeof parentRunId !== 'string' || !parentRunId) return; try { - const sig = this.buildSubflowResumeSignal(run.context, output); + // A `map` child (ADR-0037 A2): hand the unit's output to the map + // node + flag the completion, so on re-entry it records this item + // and starts the next. A plain subflow child uses the 1:1 mapping. + const mapNode = ctx?.$parentMapNode; + const sig = typeof mapNode === 'string' && mapNode + ? { variables: { [`${mapNode}.$mapItemOutput`]: output ?? null, [`${mapNode}.$mapItemDone`]: true } } + : this.buildSubflowResumeSignal(run.context, output); const parentRes = await this.resumeInternal(parentRunId, sig, false); if (!parentRes.success) { this.logger.warn( diff --git a/packages/spec/src/automation/flow.zod.ts b/packages/spec/src/automation/flow.zod.ts index 7ca9e97c4..11885f2fd 100644 --- a/packages/spec/src/automation/flow.zod.ts +++ b/packages/spec/src/automation/flow.zod.ts @@ -36,6 +36,7 @@ export const FlowNodeAction = z.enum([ 'screen', // Screen / User-Input Element 'wait', // Delay/Sleep 'subflow', // Call another flow + 'map', // Sequential multi-instance — per-item subflow, each may pause (ADR-0037 A2) 'connector_action', // Zapier-style integration action 'parallel_gateway', // BPMN Parallel Gateway — AND-split (all outgoing branches execute concurrently) 'join_gateway', // BPMN Join Gateway — AND-join (waits for all incoming branches to complete)