Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions content/docs/guides/metadata/flow.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,36 @@ is a separate, larger capability ([ADR-0037](https://github.com/objectstack-ai/f
Track B); the aggregating node covers the common parallel/batch-approval demand
without it. Worked example: `showcase_invoice_signoff` in the showcase app.

### Batch approval — a `map` node, one item at a time

"Sign off **every** task in the release, in turn" is a **`map` node**: it runs a
per-item subflow for each element of a collection, **sequentially**. Unlike
`loop` (whose body runs synchronously and cannot pause), each item is a full
child run, so the per-item subflow may **durably pause** on its own `approval` —
the map waits for that item's decision, then moves to the next.

```typescript
{
id: 'signoffs',
type: 'map',
label: 'Sign off each task',
config: {
collection: '{items}', // array (e.g. the release's tasks)
iteratorVariable: 'task',
flowName: 'one_task_signoff', // the per-item subflow (it may pause)
itemObject: 'showcase_task', // when an item is a record, it becomes the child's `$record`
outputVariable: 'signoffResults',// each item's subflow output, collected in order
},
}
```

The run holds a **single program counter** the whole time: only one item's
approval is open at any moment; when it is decided the engine **re-enters** the
map node to start the next item. v1 is sequential and fail-fast (the first item
whose subflow fails fails the map). Concurrent fan-out (all items at once) is the
larger [ADR-0037](https://github.com/objectstack-ai/framework/blob/main/docs/adr/0037-token-scope-tree-execution.md)
Track B work. Worked example: `showcase_release_signoff` → `showcase_one_task_signoff`.

### Nested pause — pausing inside a subflow

A pausing node inside a `subflow` suspends the **whole chain as linked runs**:
Expand Down
102 changes: 102 additions & 0 deletions examples/app-showcase/src/flows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -917,11 +917,113 @@ export const InvoiceDualSignoffFlow = defineFlow({
],
});

/**
* One Task Sign-off — a reusable per-item **approval subflow**, invoked once
* per task by {@link ReleaseSignoffFlow}'s `map` node. The mapped task is
* exposed to this subflow as its record, so the `approval` node opens against
* *that* task.
*/
export const OneTaskSignoffSubflow = defineFlow({
name: 'showcase_one_task_signoff',
label: 'One Task Sign-off (per-item subflow)',
description: 'Reusable subflow: requests sign-off on a single task. Invoked per item by the batch sign-off map.',
type: 'autolaunched',
template: true,
variables: [{ name: 'decision', type: 'text', isOutput: true }],
nodes: [
{ id: 'start', type: 'start', label: 'Start' },
{
id: 'review',
type: 'approval',
label: 'Task Sign-off',
config: {
approvers: [{ type: 'role', value: 'manager' }],
behavior: 'first_response',
lockRecord: false,
},
},
{ id: 'mark_ok', type: 'assignment', label: 'Approved', config: { assignments: { decision: 'approved' } } },
{ id: 'mark_no', type: 'assignment', label: 'Rejected', config: { assignments: { decision: 'rejected' } } },
{ id: 'end_ok', type: 'end', label: 'Signed Off' },
{ id: 'end_no', type: 'end', label: 'Declined' },
],
edges: [
{ id: 'e1', source: 'start', target: 'review' },
{ id: 'e2', source: 'review', target: 'mark_ok', label: 'approve' },
{ id: 'e3', source: 'review', target: 'mark_no', label: 'reject' },
{ id: 'e4', source: 'mark_ok', target: 'end_ok' },
{ id: 'e5', source: 'mark_no', target: 'end_no' },
],
});

/**
* Release Sign-off — the worked **batch-approval** example (ADR-0037 Track A2:
* the sequential `map` / multi-instance node).
*
* "Every task in the release must be signed off, one at a time" is a **single
* `map` node** over the task list. For each task it runs the
* {@link OneTaskSignoffSubflow}, which **pauses** on its `approval`; when that
* task is decided, the map **re-enters** and moves to the next task — the run
* holds a single program counter throughout (no token tree). The per-task
* decisions are collected into `signoffResults`, then the owner is notified.
*
* Trigger it with the tasks to sign off, e.g.:
* POST /api/v1/automation/showcase_release_signoff/trigger
* { "params": { "items": [ {task record}, {task record} ] } }
* then decide each task's approval in turn via /api/v1/approvals.
*/
export const ReleaseSignoffFlow = defineFlow({
name: 'showcase_release_signoff',
label: 'Release Sign-off (batch approval / map)',
description: 'Signs off every task in a release one at a time via a map node — demonstrates batch approval (ADR-0037 Track A2).',
type: 'autolaunched',
variables: [
{ name: 'items', type: 'list', isInput: true },
{ name: 'signoffResults', type: 'list', isOutput: true },
],
nodes: [
{ id: 'start', type: 'start', label: 'Start' },
{
id: 'signoffs',
type: 'map',
label: 'Sign off each task',
config: {
collection: '{items}',
iteratorVariable: 'task',
flowName: 'showcase_one_task_signoff',
itemObject: 'showcase_task',
outputVariable: 'signoffResults',
},
},
{
id: 'notify_done',
type: 'notify',
label: 'Notify: Release Cleared',
config: {
topic: 'release.signoff',
recipients: ['admin@objectos.ai'],
channels: ['inbox'],
severity: 'info',
title: 'Release sign-off complete',
message: 'Every task in the release has been signed off.',
},
},
{ id: 'end', type: 'end', label: 'End' },
],
edges: [
{ id: 'e1', source: 'start', target: 'signoffs' },
{ id: 'e2', source: 'signoffs', target: 'notify_done' },
{ id: 'e3', source: 'notify_done', target: 'end' },
],
});

export const allFlows = [
TaskCompletedFlow,
ReassignWizardFlow,
BudgetApprovalFlow,
InvoiceDualSignoffFlow,
OneTaskSignoffSubflow,
ReleaseSignoffFlow,
TaskCompletedSlackFlow,
TaskAssignedNotifyFlow,
ScheduledDigestFlow,
Expand Down
3 changes: 3 additions & 0 deletions packages/services/service-automation/src/builtin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import { registerConnectorNodes } from './connector-nodes.js';
import { registerNotifyNode } from './notify-node.js';
import { registerWaitNode } from './wait-node.js';
import { registerSubflowNode } from './subflow-node.js';
import { registerMapNode } from './map-node.js';

export { registerLogicNodes } from './logic-nodes.js';
export { registerLoopNode } from './loop-node.js';
Expand All @@ -53,6 +54,7 @@ export { registerConnectorNodes } from './connector-nodes.js';
export { registerNotifyNode } from './notify-node.js';
export { registerWaitNode, parseIsoDuration, rearmSuspendedWaitTimers } from './wait-node.js';
export { registerSubflowNode } from './subflow-node.js';
export { registerMapNode } from './map-node.js';

/**
* Seed every built-in node executor into the engine. Called by
Expand All @@ -71,6 +73,7 @@ export function installBuiltinNodes(engine: AutomationEngine, ctx: PluginContext
registerNotifyNode(engine, ctx);
registerWaitNode(engine, ctx);
registerSubflowNode(engine, ctx);
registerMapNode(engine, ctx);

const types = engine.getRegisteredNodeTypes();
ctx.logger.info(
Expand Down
166 changes: 166 additions & 0 deletions packages/services/service-automation/src/builtin/map-node.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import { describe, it, expect, beforeEach } from 'vitest';
import { AutomationEngine } from '../engine.js';
import type { NodeExecutor } from '../engine.js';
import { InMemorySuspendedRunStore } from '../suspended-run-store.js';
import { registerMapNode } from './map-node.js';

function silentLogger() {
return { info() {}, warn() {}, error() {}, debug() {}, child() { return silentLogger(); } } as any;
}
function ctx() {
return { logger: silentLogger(), getService() { throw new Error('none'); } } as any;
}

/**
* Wire an engine with the `map` node, a per-item child flow, and a parent flow
* that maps over `{items}`. `childNodes` is the child flow's middle node(s)
* (between start and end) — pass a `pauser` to exercise per-item durable pause.
*/
function setup(childNodes: Array<{ id: string; type: string }>, captured: unknown[]) {
const engine = new AutomationEngine(silentLogger());
registerMapNode(engine, ctx());

// Child marker: copies the mapped item (passed as params.val) to the child's
// output variable `result`.
engine.registerNodeExecutor({
type: 'itemmark',
async execute(_node, variables, context) {
variables.set('result', (context as any)?.params?.val);
return { success: true };
},
} as NodeExecutor);
// Pauses the child (stands in for an approval / screen / wait).
engine.registerNodeExecutor({
type: 'pauser',
async execute() { return { success: true, suspend: true }; },
} as NodeExecutor);
// Fails the child terminally.
engine.registerNodeExecutor({
type: 'failer',
async execute() { return { success: false, error: 'boom' }; },
} as NodeExecutor);
// Parent checker after the map node: captures the collected results array.
engine.registerNodeExecutor({
type: 'mapcheck',
async execute(_node, variables) {
captured.push(variables.get('mapped'));
return { success: true };
},
} as NodeExecutor);

const seq = [{ id: 'cs', type: 'start' }, ...childNodes, { id: 'ce', type: 'end' }];
engine.registerFlow('child_flow', {
name: 'child_flow',
label: 'Child',
type: 'autolaunched',
variables: [{ name: 'result', type: 'text', isOutput: true }],
nodes: seq.map(n => ({ label: n.id, ...n })),
edges: seq.slice(0, -1).map((n, i) => ({ id: `c${i}`, source: n.id, target: seq[i + 1].id })),
} as never);

engine.registerFlow('parent_flow', {
name: 'parent_flow',
label: 'Parent',
type: 'autolaunched',
variables: [{ name: 'items', type: 'list', isInput: true }],
nodes: [
{ id: 'ps', type: 'start', label: 'Start' },
{
id: 'do_map', type: 'map', label: 'For each',
config: { flowName: 'child_flow', collection: '{items}', iteratorVariable: 'item', input: { val: '{item}' }, outputVariable: 'mapped' },
},
{ id: 'chk', type: 'mapcheck', label: 'Check' },
{ id: 'pe', type: 'end', label: 'End' },
],
edges: [
{ id: 'p1', source: 'ps', target: 'do_map' },
{ id: 'p2', source: 'do_map', target: 'chk' },
{ id: 'p3', source: 'chk', target: 'pe' },
],
} as never);

return engine;
}

const childRunId = (engine: AutomationEngine) =>
engine.listSuspendedRuns().find(r => r.flowName === 'child_flow')?.runId;

describe('map node executor (sequential multi-instance)', () => {
let captured: unknown[];
beforeEach(() => { captured = []; });

it('runs a synchronous per-item subflow over the collection, collecting results in order', async () => {
const engine = setup([{ id: 'cm', type: 'itemmark' }], captured);
const result = await engine.execute('parent_flow', { params: { items: ['a', 'b', 'c'] } });

expect(result.success).toBe(true);
expect(result.status).toBeUndefined(); // ran to completion, no pause
expect(captured).toEqual([[{ result: 'a' }, { result: 'b' }, { result: 'c' }]]);
expect(engine.listSuspendedRuns()).toHaveLength(0);
});

it('completes immediately on an empty collection', async () => {
const engine = setup([{ id: 'cm', type: 'itemmark' }], captured);
const result = await engine.execute('parent_flow', { params: { items: [] } });
expect(result.success).toBe(true);
expect(captured).toEqual([[]]);
});

it('drives items ONE AT A TIME when each subflow pauses, re-entering on resume', async () => {
const engine = setup([{ id: 'cp', type: 'pauser' }, { id: 'cm', type: 'itemmark' }], captured);

// Item 0 pauses → the parent suspends at the map node.
const r0 = await engine.execute('parent_flow', { params: { items: ['a', 'b', 'c'] } });
expect(r0.status).toBe('paused');
const parent = engine.listSuspendedRuns().find(x => x.flowName === 'parent_flow')!;
expect(parent.nodeId).toBe('do_map');
expect(parent.correlation).toMatch(/^map:/);
expect(captured).toHaveLength(0); // not done yet

// Resume item 0's child → bubbles → re-enters map → item 1 pauses.
const id0 = childRunId(engine)!;
const after0 = await engine.resume(id0);
expect(after0.success).toBe(true);
expect(engine.listSuspendedRuns().some(x => x.flowName === 'parent_flow')).toBe(true); // still paused
expect(captured).toHaveLength(0);

// Resume item 1 → item 2 pauses.
await engine.resume(childRunId(engine)!);
expect(captured).toHaveLength(0);

// Resume item 2 → all done → parent continues past the map node.
await engine.resume(childRunId(engine)!);
expect(captured).toEqual([[{ result: 'a' }, { result: 'b' }, { result: 'c' }]]);
expect(engine.listSuspendedRuns()).toHaveLength(0);
});

it('fails the map fast when an item subflow fails', async () => {
const engine = setup([{ id: 'cf', type: 'failer' }], captured);
const result = await engine.execute('parent_flow', { params: { items: ['a', 'b'] } });
expect(result.success).toBe(false);
expect(result.error).toMatch(/item 0.*failed/i);
expect(captured).toEqual([]); // downstream never ran
});

it('survives a process restart mid-map: resume on a fresh engine continues the sequence', async () => {
const store = new InMemorySuspendedRunStore();
const engine = setup([{ id: 'cp', type: 'pauser' }, { id: 'cm', type: 'itemmark' }], captured);
engine.setSuspendedRunStore(store);

await engine.execute('parent_flow', { params: { items: ['a', 'b'] } });
const id0 = childRunId(engine)!;
expect((await store.list()).length).toBe(2); // parent (map) + child item 0

// "Restart": a fresh engine sharing only the durable store + the registry.
const capturedB: unknown[] = [];
const engineB = setup([{ id: 'cp', type: 'pauser' }, { id: 'cm', type: 'itemmark' }], capturedB);
engineB.setSuspendedRunStore(store);

await engineB.resume(id0); // item 0 done → item 1 pauses
await engineB.resume(childRunId(engineB)!); // item 1 done → all done
expect(capturedB).toEqual([[{ result: 'a' }, { result: 'b' }]]);
expect(await store.list()).toHaveLength(0);
});
});
Loading
Loading