change
基于 MoonBit 的确定性工作流引擎 — 崩溃可恢复、执行可重放、步骤可重试
MoonFlow 是一个 确定性工作流执行引擎。你用声明式 DSL 定义工作流步骤,引擎负责调度执行。进程崩溃?重放事件日志,从断点继续,不丢状态,不重复执行。
传统写法:手工状态机,崩溃后从头再来 let status = "pending" if status == "pending" { do_step1(); status = "step1_done" } if status == "step1_done" { do_step2(); status = "step2_done" } // 💥 崩溃!status 丢失,step1 重复执行,数据错乱 MoonFlow:声明式 DSL,崩溃后从断点恢复 workflow("order") .then("step1", ...) .then("step2", ...) .then("step3", ...) // 💥 崩溃 → resume_workflow(id) → 自动跳过 step1, step2,继续 step3
replay_events()
workflow().then().parallel().branch().build()
// 1. 创建引擎 let engine = @moonflow.new_memory_engine() // 2. 注册步骤 engine.register("hello", fn(input) -> @moonflow.Async[Result[Json, @moonflow.WorkflowError]] { println("Hello World!") @moonflow.Async::pure(Ok(input)) }) // 3. 定义工作流 let wf = @moonflow.workflow("my-first-wf") .then("hello", @moonflow.StepConfig::default("hello")) .build() // 4. 执行! let result = @moonflow.Async::run(engine.start(wf, Json::null()))
┌────────────────────────────────────────────────────────────┐ │ 🏗️ 工作流 DSL │ │ ├─ .then(step, config) 顺序步骤 │ │ ├─ .parallel(group, [...]) 并行步骤组 │ │ ├─ .branch(name, cond, T, F) 条件分支 │ │ └─ .sub_workflow(name, def) 子工作流嵌套 │ ├────────────────────────────────────────────────────────────┤ │ ⚙️ 执行引擎 │ │ ├─ Engine::start() 启动工作流 │ │ ├─ Engine::resume_workflow() 崩溃恢复 (跳过已完成→继续) │ │ ├─ 自动重试 max_retry + 递归执行 │ │ └─ 超时检测 timeout_ms + StepTimeout │ ├────────────────────────────────────────────────────────────┤ │ 💾 持久化 │ │ ├─ EventLog 不可变追加事件日志 │ │ ├─ Storage trait Async 存储接口 │ │ ├─ MemoryStorage 内存 (测试用) │ │ └─ FileStorage 文件 (持久化) │ ├────────────────────────────────────────────────────────────┤ │ 🔍 运维能力 │ │ ├─ 工作流验证器 validate_workflow() │ │ ├─ 查询 API query_status/summary │ │ ├─ 结果 API get_result/error │ │ ├─ 补偿回滚 补偿计划 + 执行 │ │ ├─ 补偿回滚 补偿计划 + 执行 │ │ ├─ 中间件 logging/retry/circuit-breaker│ │ ├─ 指标监控 60 项指标 + JSON 导出 │ │ ├─ 事件查询 filter/step/time/stats │ │ ├─ 错误分类 Transient/Permanent/Timeout │ │ └─ 基准测试 吞吐量/重放/内存 │ ├────────────────────────────────────────────────────────────┤ │ 📡 信号 & 批量 │ │ ├─ wait_signal(name, signal) 暂停等待外部信号 │ │ ├─ engine.signal(id, name) 发送信号恢复工作流 │ │ └─ engine.execute_batch() 批量并行 + fail-fast │ ├────────────────────────────────────────────────────────────┤ │ 🧩 模式库 │ │ ├─ pattern_retry_backoff 指数退避重试 │ │ ├─ pattern_timeout 超时包装 │ │ ├─ pattern_fallback 主备切换 │ │ ├─ PatternCircuitBreaker 熔断器 │ │ └─ PatternBulkhead 隔舱限流 │ ├────────────────────────────────────────────────────────────┤ │ 🔗 分布式事务 (Saga) │ │ ├─ Saga::step(forward, compensate) 正向+补偿配对 │ │ ├─ 自动补偿回滚 反向执行补偿 │ │ └─ step_by_name() 复用已注册处理器 │ ├────────────────────────────────────────────────────────────┤ │ 🪆 子工作流 │ │ ├─ SubWorkflow StepType 嵌套工作流 │ │ ├─ 子工作流定义存储 engine.workflow_defs │ │ └─ 输出合并到父工作流 透明传递 │ └────────────────────────────────────────────────────────────┘
let engine = @moonflow.new_memory_engine() // 注册5个步骤 (含补偿步骤) engine.register("validate_order", validate_handler) engine.register("reserve_stock", reserve_handler) engine.register("process_payment", payment_handler) engine.register("send_confirmation",email_handler) engine.register("release_stock", compensate_handler) // 失败时释放库存 // 声明式定义 let wf = @moonflow.workflow("order-processing") .then("validate_order", StepConfig::default("validate_order")) .then("reserve_stock", StepConfig::with_retry("reserve_stock", 5)) .then("process_payment", StepConfig::with_timeout("process_payment", 10000)) .then("send_confirmation",StepConfig::default("send_confirmation")) .build() let result = @moonflow.Async::run(engine.start(wf, input)) // 💥 模拟崩溃后恢复 let engine2 = @moonflow.new_memory_engine() // ... 重新注册 handlers ... engine2.resume_workflow(wf_id, wf) // 自动跳过已完成步骤
let wf = @moonflow.workflow("etl-pipeline") .then("extract_csv", StepConfig::default("extract_csv")) .then("validate_schema", StepConfig::default("validate_schema")) // 👇 三个转换步骤并行执行 .parallel("transform_all", [ "transform_users", "transform_orders", "transform_products" ]) .then("load_database", StepConfig::with_retry("load_database", 3)) .then("send_report", StepConfig::default("send_report")) .build()
// 转账 Saga: debit_A → credit_B // 失败时自动补偿: reverse_debit_A ← reverse_credit_B let saga = @moonflow.Saga::new(engine, storage, "transfer") .step("debit_A", fn(input) { /* 从A扣款 */ @moonflow.Async::pure(Ok(input)) }, fn(input) { /* 补偿: 退回A */ @moonflow.Async::pure(Ok(input)) }) .step("credit_B", fn(input) { /* 向B入账 */ @moonflow.Async::pure(Ok(input)) }, fn(input) { /* 补偿: 扣回B */ @moonflow.Async::pure(Ok(input)) }) let result = @moonflow.Async::run(saga.execute(input)) // 成功: 两个正向步骤都执行 // credit_B失败: 自动执行 compensate(debit_A),完整回滚
// 子工作流 let child_wf = @moonflow.workflow("child") .then("child_step", @moonflow.StepConfig::default("child_step")) .build() engine.workflow_defs["sub1"] = child_wf // 存储定义 // 父工作流中包含子工作流 let parent_wf = @moonflow.workflow("parent") .then("pre_check", ...) .sub_workflow("sub1", "child") // 👈 嵌套子工作流 .then("post_process", ...) .build()
WorkflowEvent
WorkflowError
StepType
Storage
TimerProvider
Engine[S : Storage]
✅ moon build — 0 errors ✅ moon test — 72/72 passed ✅ moon run examples/order_workflow ✅ moon run examples/etl_pipeline 📦 moonbitlang/async@0.19.3
moon add moonbitlang/moonflow
moon run examples/order_workflow # 电商订单流程 moon run examples/etl_pipeline # ETL 数据管道
moon test # 72 tests, 0 failures
Apache-2.0
🌊 MoonFlow
一句话介绍
MoonFlow 是一个 确定性工作流执行引擎。你用声明式 DSL 定义工作流步骤,引擎负责调度执行。进程崩溃?重放事件日志,从断点继续,不丢状态,不重复执行。
为什么需要它
三大核心能力
replay_events()纯函数 — 同一份事件日志永远重建相同状态,便于调试和审计workflow().then().parallel().branch().build()30 秒快速上手
功能矩阵
示例:电商订单工作流
示例:ETL 数据管道 (含并行)
示例:Saga 分布式事务
示例:子工作流嵌套
MoonBit 特性运用
WorkflowEvent(7 事件)、WorkflowError(7 错误)、StepType(5 类型含 Signal)Storage、TimerProvider— 可热替换后端Engine[S : Storage]— 编译期类型安全项目状态
安装
运行示例
运行测试
文档
测试矩阵
License
Apache-2.0