目录

🌊 MoonFlow

基于 MoonBit 的确定性工作流引擎 — 崩溃可恢复、执行可重放、步骤可重试

MoonBit Tests License


一句话介绍

MoonFlow 是一个 确定性工作流执行引擎。你用声明式 DSL 定义工作流步骤,引擎负责调度执行。进程崩溃?重放事件日志,从断点继续,不丢状态,不重复执行。

为什么需要它

传统写法:手工状态机,崩溃后从头再来
  let status = "pending"
  if status == "pending" { do_step1(); status = "step1_done" }
  if status == "step1_done" { do_step2(); status = "step2_done" }
  // 💥 崩溃!status 丢失,step1 重复执行,数据错乱

MoonFlow:声明式 DSL,崩溃后从断点恢复
  workflow("order")
    .then("step1", ...)
    .then("step2", ...)
    .then("step3", ...)
  // 💥 崩溃 → resume_workflow(id) → 自动跳过 step1, step2,继续 step3

三大核心能力

能力 说明
🔄 崩溃恢复 所有状态变化记录为事件日志,进程重启后自动从断点继续执行
🎬 确定性重放 replay_events() 纯函数 — 同一份事件日志永远重建相同状态,便于调试和审计
📝 声明式 DSL 4 行代码定义完整工作流:workflow().then().parallel().branch().build()

30 秒快速上手

// 1. 创建引擎
let engine = @moonflow.new_memory_engine()

// 2. 注册步骤
engine.register("hello", fn(input) -> @moonflow.Async[Result[Json, @moonflow.WorkflowError]] {
  println("Hello World!")
  @moonflow.Async::pure(Ok(input))
})

// 3. 定义工作流
let wf = @moonflow.workflow("my-first-wf")
  .then("hello", @moonflow.StepConfig::default("hello"))
  .build()

// 4. 执行!
let result = @moonflow.Async::run(engine.start(wf, Json::null()))

功能矩阵

┌────────────────────────────────────────────────────────────┐
│  🏗️ 工作流 DSL                                             │
│  ├─ .then(step, config)        顺序步骤                    │
│  ├─ .parallel(group, [...])    并行步骤组                  │
│  ├─ .branch(name, cond, T, F)  条件分支                    │
│  └─ .sub_workflow(name, def)   子工作流嵌套                │
├────────────────────────────────────────────────────────────┤
│  ⚙️ 执行引擎                                               │
│  ├─ Engine::start()            启动工作流                  │
│  ├─ Engine::resume_workflow()  崩溃恢复 (跳过已完成→继续)  │
│  ├─ 自动重试                   max_retry + 递归执行        │
│  └─ 超时检测                   timeout_ms + StepTimeout    │
├────────────────────────────────────────────────────────────┤
│  💾 持久化                                                 │
│  ├─ EventLog                   不可变追加事件日志          │
│  ├─ Storage trait              Async 存储接口              │
│  ├─ MemoryStorage              内存 (测试用)               │
│  └─ FileStorage                文件 (持久化)               │
├────────────────────────────────────────────────────────────┤
│  🔍 运维能力                                               │
│  ├─ 工作流验证器               validate_workflow()         │
│  ├─ 查询 API                   query_status/summary        │
│  ├─ 结果 API                   get_result/error            │
│  ├─ 补偿回滚                   补偿计划 + 执行             │
│  ├─ 补偿回滚                   补偿计划 + 执行             │
│  ├─ 中间件                     logging/retry/circuit-breaker│
│  ├─ 指标监控                   60 项指标 + JSON 导出       │
│  ├─ 事件查询                   filter/step/time/stats     │
│  ├─ 错误分类                   Transient/Permanent/Timeout │
│  └─ 基准测试                   吞吐量/重放/内存           │
├────────────────────────────────────────────────────────────┤
│  📡 信号 & 批量                                          │
│  ├─ wait_signal(name, signal)   暂停等待外部信号          │
│  ├─ engine.signal(id, name)     发送信号恢复工作流        │
│  └─ engine.execute_batch()      批量并行 + fail-fast       │
├────────────────────────────────────────────────────────────┤
│  🧩 模式库                                               │
│  ├─ pattern_retry_backoff       指数退避重试              │
│  ├─ pattern_timeout             超时包装                  │
│  ├─ pattern_fallback            主备切换                  │
│  ├─ PatternCircuitBreaker       熔断器                    │
│  └─ PatternBulkhead             隔舱限流                  │
├────────────────────────────────────────────────────────────┤
│  🔗 分布式事务 (Saga)                                      │
│  ├─ Saga::step(forward, compensate)  正向+补偿配对         │
│  ├─ 自动补偿回滚                    反向执行补偿           │
│  └─ step_by_name()                  复用已注册处理器       │
├────────────────────────────────────────────────────────────┤
│  🪆 子工作流                                              │
│  ├─ SubWorkflow StepType             嵌套工作流            │
│  ├─ 子工作流定义存储                  engine.workflow_defs  │
│  └─ 输出合并到父工作流                透明传递              │
└────────────────────────────────────────────────────────────┘

示例:电商订单工作流

let engine = @moonflow.new_memory_engine()

// 注册5个步骤 (含补偿步骤)
engine.register("validate_order",   validate_handler)
engine.register("reserve_stock",    reserve_handler)
engine.register("process_payment",  payment_handler)
engine.register("send_confirmation",email_handler)
engine.register("release_stock",    compensate_handler)  // 失败时释放库存

// 声明式定义
let wf = @moonflow.workflow("order-processing")
  .then("validate_order",   StepConfig::default("validate_order"))
  .then("reserve_stock",    StepConfig::with_retry("reserve_stock", 5))
  .then("process_payment",  StepConfig::with_timeout("process_payment", 10000))
  .then("send_confirmation",StepConfig::default("send_confirmation"))
  .build()

let result = @moonflow.Async::run(engine.start(wf, input))

// 💥 模拟崩溃后恢复
let engine2 = @moonflow.new_memory_engine()
// ... 重新注册 handlers ...
engine2.resume_workflow(wf_id, wf)  // 自动跳过已完成步骤

示例:ETL 数据管道 (含并行)

let wf = @moonflow.workflow("etl-pipeline")
  .then("extract_csv",      StepConfig::default("extract_csv"))
  .then("validate_schema",  StepConfig::default("validate_schema"))
  // 👇 三个转换步骤并行执行
  .parallel("transform_all", [
    "transform_users",
    "transform_orders",
    "transform_products"
  ])
  .then("load_database",    StepConfig::with_retry("load_database", 3))
  .then("send_report",      StepConfig::default("send_report"))
  .build()

示例:Saga 分布式事务

// 转账 Saga: debit_A → credit_B
// 失败时自动补偿: reverse_debit_A ← reverse_credit_B
let saga = @moonflow.Saga::new(engine, storage, "transfer")
  .step("debit_A",
    fn(input) { /* 从A扣款 */ @moonflow.Async::pure(Ok(input)) },
    fn(input) { /* 补偿: 退回A */ @moonflow.Async::pure(Ok(input)) })
  .step("credit_B",
    fn(input) { /* 向B入账 */ @moonflow.Async::pure(Ok(input)) },
    fn(input) { /* 补偿: 扣回B */ @moonflow.Async::pure(Ok(input)) })

let result = @moonflow.Async::run(saga.execute(input))
// 成功: 两个正向步骤都执行
// credit_B失败: 自动执行 compensate(debit_A),完整回滚

示例:子工作流嵌套

// 子工作流
let child_wf = @moonflow.workflow("child")
  .then("child_step", @moonflow.StepConfig::default("child_step"))
  .build()
engine.workflow_defs["sub1"] = child_wf  // 存储定义

// 父工作流中包含子工作流
let parent_wf = @moonflow.workflow("parent")
  .then("pre_check", ...)
  .sub_workflow("sub1", "child")         // 👈 嵌套子工作流
  .then("post_process", ...)
  .build()

MoonBit 特性运用

特性 运用
ADT 枚举 WorkflowEvent (7 事件)、WorkflowError (7 错误)、StepType (5 类型含 Signal)
Trait 系统 StorageTimerProvider — 可热替换后端
泛型 + 约束 Engine[S : Storage] — 编译期类型安全
Result 类型 零 panic,全部可恢复错误
模式匹配 引擎调度基于穷尽模式匹配
WASM 编译 可嵌入浏览器 / 边缘环境

项目状态

✅ moon build    — 0 errors
✅ moon test     — 72/72 passed
✅ moon run examples/order_workflow
✅ moon run examples/etl_pipeline
📦 moonbitlang/async@0.19.3

安装

moon add moonbitlang/moonflow

运行示例

moon run examples/order_workflow    # 电商订单流程
moon run examples/etl_pipeline      # ETL 数据管道

运行测试

moon test    # 72 tests, 0 failures

文档

测试矩阵

分类 编号 数量
引擎基础 T01-T05, T14-T20, T25-T30 18
确定性回放 T06-T10, T21-T24 9
快照序列化 T11-T13, T31-T33 6
崩溃恢复 T34-T42 9
Saga 事务 T43-T48 6
子工作流 T49-T50 2
事件查询 T51-T55 5
基准/压力 T56-T60 5
扩展功能 T61-T72 12
信号/模式/边界 T73-T82 10
总计 82

License

Apache-2.0

邀请码