目录

Nexu DataMate Agent:基于 Nexent 的“数据—知识—洞察”数据处理智能体 Demo

本仓库实现赛题任务一:基于 Nexent 的数据处理智能体实现。项目用一个可运行、可复现的 Python Demo 展示:智能体理解自然语言数据处理任务,自动规划 DataMate 风格算子 DAG,执行医疗文本/表格 ETL、医学实体抽取、医疗知识图谱三元组生成和洞察报告输出。

设计目标:即使本地没有完整 Nexent/DataMate 部署,也可以通过 Docker/CLI/API/MCP 完整跑通;如果已有 Nexent 或 DataMate 服务,可通过 HTTP/MCP 适配器接入。

1. 功能覆盖

赛题要求 本仓库实现
支持结构化或非结构化数据 data/raw/medical_notes.csvdata/raw/medical_notes.txt
任务理解(清洗、抽取、转换) DataTaskPlanner 根据自然语言任务识别 ETL、抽取、KG、洞察意图
多算子组合与调度 DataMate 风格算子注册表 + DAG 执行器,自动处理依赖
状态跟踪与异常处理 OperatorRun 记录每步状态、耗时、指标、错误;失败时跳过下游
典型流程自动执行 医疗 CSV ETL → 医学实体抽取 → KG 三元组 → 洞察报告
可运行 Demo CLI、HTTP API、最小 MCP stdio server、Docker Compose
与 DataMate/Nexent 集成 导出 datamate_workflow.json;提供 Nexent 工具 schema、HTTP /nexent/invoke 与 MCP tools/call

2. 架构

flowchart LR
    U["用户/Nexent Agent\n自然语言任务"] --> P["DataTaskPlanner\n任务理解与DAG规划"]
    P --> W["DataMate Workflow JSON\n可导入/适配"]
    P --> E["PlanExecutor\n依赖调度/状态跟踪"]
    E --> O1["load_csv/load_text"]
    O1 --> O2["validate_schema"]
    O2 --> O3["clean_table"]
    O3 --> O4["extract_medical_entities"]
    O4 --> O5["build_knowledge_graph"]
    O5 --> O6["generate_insights"]
    O6 --> S["JSON/CSV/Markdown 输出"]
    N["Nexent HTTP/MCP Tool"] --> P
    D["可选 DataMate HTTP Adapter"] -.-> E

核心模块:

  • src/nexu_datamate_agent/planner.py:任务理解与算子 DAG 规划。
  • src/nexu_datamate_agent/operators.py:DataMate 风格算子实现。
  • src/nexu_datamate_agent/executor.py:拓扑执行、状态跟踪、异常处理。
  • src/nexu_datamate_agent/agent.py:对外智能体接口。
  • src/nexu_datamate_agent/api_server.py:HTTP API/Nexent 自定义工具接口。
  • src/nexu_datamate_agent/mcp_server.py:最小 MCP stdio server。
  • src/nexu_datamate_agent/integrations.py:Nexent/DataMate 适配层。

3. 快速运行

3.1 本地 Python 运行

无需第三方依赖,Python 3.9+ 即可。

cd /Users/wangyue/Documents/nexu
PYTHONPATH=src python3 -m nexu_datamate_agent run \
  --task "对医疗病历CSV执行ETL清洗,抽取疾病、症状、用药和检验指标,生成医疗知识图谱三元组,并输出洞察报告。文本列: note" \
  --input data/raw/medical_notes.csv \
  --output-dir runs/demo \
  --mode medical \
  --text-column note

或使用脚本:

./scripts/run_demo.sh

3.2 查看输出

运行后会生成:

runs/demo/
├── agent_plan.json            # 智能体内部计划
├── datamate_workflow.json     # DataMate 风格 workflow DAG
├── clean_rows.csv             # 清洗后数据
├── entities.json              # 医学实体抽取结果
├── kg_triples.json            # 知识图谱三元组
├── knowledge_graph.json       # 节点/边图结构
├── insights.json              # 机器可读洞察
├── insight_report.md          # 人类可读报告
└── run_summary.json           # 状态追踪与指标汇总

3.3 Docker 运行

docker compose up --build

另开终端调用 API:

./scripts/curl_api_demo.sh

或:

curl -sS http://127.0.0.1:8088/health
curl -sS http://127.0.0.1:8088/nexent/tools
curl -sS -H 'Content-Type: application/json' \
  -d @examples/full_pipeline_request.json \
  http://127.0.0.1:8088/run | python3 -m json.tool --ensure-ascii false

4. 接入 DeepSeek Flash 任务规划器

默认情况下,本项目使用本地规则规划器,保证无网络、无模型 key 也能复现。现在已接入可选的 DeepSeek Flash 规划增强:设置环境变量并添加 --llm-planner 后,智能体会先调用 deepseek-v4-flash 直接生成完整 DataMate workflow DAG;系统会校验算子 ID、参数、依赖和环路,如果 API 不可用或 DAG 不合法,会自动回退到本地规则规划器。

配置:

cd /Users/wangyue/Documents/nexu
cp .env.example .env
# 编辑 .env 或直接 export,注意不要提交真实 key
export DEEPSEEK_API_KEY=<YOUR_DEEPSEEK_API_KEY>
export DEEPSEEK_MODEL=deepseek-v4-flash

CLI 调用:

PYTHONPATH=src python3 -m nexu_datamate_agent run \
  --llm-planner \
  --llm-model deepseek-v4-flash \
  --task "对医疗病历CSV执行ETL清洗,抽取疾病、症状、用药和检验指标,生成医疗知识图谱三元组,并输出洞察报告。文本列: note" \
  --input data/raw/medical_notes.csv \
  --output-dir runs/demo_llm \
  --mode medical \
  --text-column note

脚本调用:

export DEEPSEEK_API_KEY=<YOUR_DEEPSEEK_API_KEY>
./scripts/run_deepseek_demo.sh

API 调用时传入:

{
  "use_llm_planner": true,
  "llm_model": "deepseek-v4-flash"
}

也可以用全局开关:

export NEXU_USE_LLM_PLANNER=1

LLM 输出的计划会落盘到 agent_plan.jsondatamate_workflow.json。其中 datamate_workflow.jsonnodes/edges 来自 DeepSeek 生成并通过本地校验后的 DAG,而不是固定 pipeline。

5. 配置化医学词典与算子目录

医学词典、lab 规则、KG 谓词和洞察建议已经从代码拆出:

configs/medical_knowledge.json

可配置内容包括:

  • entity_patterns:疾病、症状、药物、检查等别名词典。
  • lab_patterns:检验指标正则、单位、置信度。
  • lab_flags:异常检验阈值和提示语。
  • kg_predicates:实体类型到知识图谱关系的映射。
  • recommendation_rules:洞察报告建议规则。

算子目录也已经从代码拆出:

configs/operators.json

运行时可通过环境变量替换:

export MEDICAL_KNOWLEDGE_CONFIG=configs/medical_knowledge.json
export OPERATOR_CATALOG_CONFIG=configs/operators.json

JSON 默认可直接用;如果安装了 PyYAML,也支持 .yaml/.yml

6. 真实 DataMate / Nexent 服务接入

本项目现在提供真实服务适配层:

src/nexu_datamate_agent/integrations.py

DataMate 官方 Docker Compose 拆为 gateway 和 Python backend。本适配器同时支持:

  • DATAMATE_BASE_URL:gateway,例如 http://127.0.0.1:18080
  • DATAMATE_PYTHON_BASE_URL:Python backend,例如 http://127.0.0.1:18000

当前官方开源栈优先暴露 operator/cleaning API;未配置通用 pipeline import 路径时,datamate-register 会把本项目 workflow 中的算子元数据导入 /api/operators/create

DataMate 相关命令:

export DATAMATE_BASE_URL=http://your-datamate-host:8080
export DATAMATE_PYTHON_BASE_URL=http://your-datamate-python-host:18000
export DATAMATE_API_KEY=your-token   # 如服务需要

PYTHONPATH=src python3 -m nexu_datamate_agent datamate-health
PYTHONPATH=src python3 -m nexu_datamate_agent datamate-probe
PYTHONPATH=src python3 -m nexu_datamate_agent datamate-convert --workflow runs/demo/datamate_workflow.json --output deployment/datamate/create_pipeline_request.json
PYTHONPATH=src python3 -m nexu_datamate_agent datamate-register --workflow runs/demo/datamate_workflow.json
PYTHONPATH=src python3 -m nexu_datamate_agent datamate-execute --workflow runs/demo/datamate_workflow.json

说明:真实 DataMate cleaning 执行需要已有 src_dataset_id;未传 DATAMATE_SRC_DATASET_ID 时,本命令会完成官方算子目录注册并返回 execution_supported=false

Nexent 官方 Docker Compose 是多服务拆分:config API 默认 5010,runtime/Agent API 默认 5014,northbound/A2A 默认 5013,MCP SSE/management 默认 5011/5015。本项目适配器优先读取 NEXENT_CONFIG_BASE_URLNEXENT_RUNTIME_BASE_URL;如果只设置旧的 NEXENT_BASE_URL,则会作为两者的 fallback。

export NEXENT_CONFIG_BASE_URL=http://your-nexent-host:5010
export NEXENT_RUNTIME_BASE_URL=http://your-nexent-host:5014
export NEXENT_API_KEY=your-token   # 如服务需要

PYTHONPATH=src python3 -m nexu_datamate_agent nexent-health
PYTHONPATH=src python3 -m nexu_datamate_agent nexent-probe
PYTHONPATH=src python3 -m nexu_datamate_agent nexent-mcp-config --port 18089 --output deployment/nexent/mcp_add_from_config.json
PYTHONPATH=src python3 -m nexu_datamate_agent nexent-register-mcp --payload deployment/nexent/mcp_add_from_config.json
PYTHONPATH=src python3 -m nexu_datamate_agent nexent-register-openapi --server-url http://127.0.0.1:8088
PYTHONPATH=src python3 -m nexu_datamate_agent nexent-invoke-agent --agent-id default --message "清洗医疗CSV并生成图谱"

真实 Nexent Agent smoke(无外部 LLM key 的可复现路径):

# 1) 本 Agent 作为容器加入 Nexent 网络,保证 Nexent 容器可访问 OpenAPI server_url
docker run -d --name nexu-agent-real --network nexent_network -p 18089:8088 \
  -w /app -v "$PWD:/app" \
  -e PYTHONPATH=/app/src \
  -e MEDICAL_NER_BACKEND=tiny \
  -e TINY_NER_MODEL=configs/tiny_medical_ner_model.json \
  -e NEXU_ENTITY_ACCELERATOR=auto \
  python:3.11-slim python -m nexu_datamate_agent serve --host 0.0.0.0 --port 8088

# 2) 导入为 Nexent OpenAPI/MCP tool
export NEXU_AGENT_BASE_URL=http://nexu-agent-real:8088
PYTHONPATH=src python3 -m nexu_datamate_agent nexent-register-openapi \
  --server-url "$NEXU_AGENT_BASE_URL" \
  --service-name nexu-datamate-agent
curl http://127.0.0.1:15010/tool/scan_tool

# 3) 可选:无真实 LLM key 时启动 OpenAI-compatible mock,用于验证真实 Nexent Agent 调度链路
docker run -d --name nexu-openai-mock --network nexent_network -p 18100:18100 \
  -w /app -v "$PWD:/app" \
  python:3.11-slim python /app/scripts/mock_openai_server.py --host 0.0.0.0 --port 18100

# 4) 若 Nexent v2.2.1 镜像的 CodeAgent parser 仍为空 tag,执行一次兼容补丁
./scripts/patch_nexent_code_tags.sh

上述 smoke 在真实 Nexent POST /agent/run 中应能看到 parsetoken_countfinal_answer 事件,且本项目输出目录包含 run_summary.jsonentities.jsonkg_triples.jsoninsight_report.md 等文件。

官方服务源码准备脚本:

./scripts/setup_official_services.sh
python3 scripts/verify_official_services_layout.py --root "$HOME/modelengine-group"
python3 scripts/generate_official_compose_overrides.py --output-dir deployment/official-overrides
python3 scripts/prepare_official_envs.py --root "$HOME/modelengine-group" --activate
./scripts/start_official_services.sh --root "$HOME/modelengine-group" --service all --pull

如果服务器已有 8080/3000/5432 等服务,使用生成的 override 文件避免端口冲突。例如:

# Nexent:官方 compose + 安全端口 override
cd "$HOME/modelengine-group/nexent/docker"
docker compose -f docker-compose.yml -f /path/to/nexu/deployment/official-overrides/nexent.override.yml up -d
export NEXENT_CONFIG_BASE_URL=http://127.0.0.1:15010
export NEXENT_RUNTIME_BASE_URL=http://127.0.0.1:15014
export NEXENT_NORTHBOUND_BASE_URL=http://127.0.0.1:15013

# DataMate:官方 compose + 安全端口 override
cd "$HOME/modelengine-group/DataMate"
REGISTRY=ghcr.io/modelengine-group/ docker compose \
  -f deployment/docker/datamate/docker-compose.yml \
  -f /path/to/nexu/deployment/official-overrides/datamate.override.yml up -d
export DATAMATE_BASE_URL=http://127.0.0.1:18080
export DATAMATE_PYTHON_BASE_URL=http://127.0.0.1:18000

将本 Agent 注册到真实服务:

export NEXENT_CONFIG_BASE_URL=http://your-nexent-config-host:5010
export NEXENT_RUNTIME_BASE_URL=http://your-nexent-runtime-host:5014
export DATAMATE_BASE_URL=http://your-datamate-host:port
export NEXU_AGENT_BASE_URL=http://your-agent-host:8088
./scripts/register_with_real_services.sh
./scripts/real_services_smoke.sh

官方 API 映射:

  • DataMate workflow 注册默认走 POST /api/v1/pipelines,payload 为 CreatePipelineRequest
  • DataMate workflow 执行默认走 POST /api/v1/pipelines/{pipelineId}/execute
  • 如果未设置 DATAMATE_WORKFLOW_REGISTER_PATH,当前默认走官方 Python backend 的 /api/operators/create 导入算子目录。
  • Nexent MCP 注册默认走 config API 的 POST /mcp/add-from-config,payload 为官方 AddContainerMcpServiceRequest
  • Nexent OpenAPI 工具导入默认走 config API 的 POST /tool/openapi_service
  • Nexent Agent 调用默认走 runtime API 的 POST /agent/run,payload 使用官方 AgentRequest 字段:queryagent_idconversation_idhistory 等。

如果目标部署的 REST 路径不同,可以通过 .env.example 中的 DATAMATE_*_PATHNEXENT_*_PATH 修改端点模板,不需要改代码。HTTP API 也暴露了:

GET  /integrations/datamate/health
POST /integrations/datamate/register-workflow
POST /integrations/datamate/execute-workflow
GET  /integrations/nexent/health
POST /integrations/nexent/register-tool
POST /integrations/nexent/mcp-config
POST /integrations/nexent/register-mcp
POST /integrations/nexent/invoke-agent

7. 可选小模型 / 医疗 NER

datamate.extract_medical_entities 现在支持四种后端:

  • dictionary:默认配置词典 + 正则,离线可复现。
  • tiny:本仓库内置的无依赖 Tiny Medical NER 小模型,模型文件为 configs/tiny_medical_ner_model.json
  • transformers:使用 HuggingFace token-classification 医疗 NER/通用 NER 小模型。
  • hybrid:词典/正则 + tiny/transformers 小模型结果融合。

训练/更新内置 Tiny NER:

PYTHONPATH=src python3 scripts/train_tiny_ner.py --train data/raw/medical_ner_train.jsonl --output configs/tiny_medical_ner_model.json
export MEDICAL_NER_BACKEND=tiny
export TINY_NER_MODEL=configs/tiny_medical_ner_model.json

如需 HuggingFace 医疗 NER 小模型,安装可选依赖:

pip install -e '.[ner]'

配置示例:

export MEDICAL_NER_BACKEND=hybrid
export MEDICAL_NER_MODEL=/path/to/local/medical-ner-model
# GPU/NPU/CPU device 由 Transformers device 参数控制,例如 0;为空则自动。
export MEDICAL_NER_DEVICE=

在没有 transformers 时,系统不会影响主流程;可以使用内置 tiny 后端或默认配置化医学词典。

8. NPU 优化与性能对比

实体抽取增加了候选词预筛加速路径:

export NEXU_ENTITY_ACCELERATOR=cpu              # 默认
export NEXU_ENTITY_ACCELERATOR=cpu_prefilter
export NEXU_ENTITY_ACCELERATOR=npu_prefilter
export NEXU_ENTITY_ACCELERATOR=custom_prefilter # NexuEntityPrefilter 自定义算子/语义回退

npu_prefilter 会尝试检测 torch_npu / Ascend NPU;可用时使用 NPU 做批量候选矩阵计算,不可用时自动回退 CPU 并在 benchmark 里记录原因。

custom_prefilter 对应本仓库设计的 Ascend C 自定义算子 NexuEntityPrefilter

  • 设计文档:docs/custom_npu_operator.md
  • IR/Host/Kernel:custom_ops/entity_prefilter/
  • 构建脚本:custom_ops/entity_prefilter/scripts/build_custom_op.sh
  • 未安装原生 ACLNN/Python 绑定时,会用 torch_npu semantic reference 或 CPU 路径保持端到端可跑。

性能对比脚本:

PYTHONPATH=src python3 scripts/benchmark_npu.py --rows 5000 --repeat 5 --include-custom --output runs/npu_benchmark.json
PYTHONPATH=src python3 scripts/benchmark_custom_op.py --rows 1000 --repeat 3 --output runs/custom_op_benchmark.json

输出包含:

  • NPU 检测状态。
  • CPU prefilter 耗时。
  • NPU prefilter 或 fallback 耗时。
  • candidate 数量。

9. CLI 命令

# 执行完整流程
PYTHONPATH=src python3 -m nexu_datamate_agent run --task "清洗医疗CSV并生成知识图谱和洞察" --input data/raw/medical_notes.csv --output-dir runs/demo

# 只生成计划,并输出 DataMate workflow JSON
PYTHONPATH=src python3 -m nexu_datamate_agent plan --task "ETL并生成知识图谱" --input data/raw/medical_notes.csv --output-dir runs/plan --format datamate

# 列出算子目录
PYTHONPATH=src python3 -m nexu_datamate_agent operators

# 启动 HTTP API
PYTHONPATH=src python3 -m nexu_datamate_agent serve --host 0.0.0.0 --port 8088

# 启动 MCP stdio server,可在 Nexent/MCP Host 中注册
PYTHONPATH=src python3 -m nexu_datamate_agent mcp

10. Nexent 集成方式

Nexent 可通过工具/MCP 接入外部能力。本 Demo 暴露一个工具:

{
  "name": "process_data_task",
  "description": "理解自然语言数据处理任务,规划并执行 DataMate 风格算子 DAG,输出清洗数据、医疗实体、知识图谱与洞察报告。"
}

5.1 HTTP 工具方式

启动服务:

PYTHONPATH=src python3 -m nexu_datamate_agent serve --host 0.0.0.0 --port 8088

工具元数据:

curl http://127.0.0.1:8088/nexent/tools
curl http://127.0.0.1:8088/openapi.json

调用:

curl -H 'Content-Type: application/json' \
  -d '{"name":"process_data_task","arguments":{"task":"清洗医疗CSV并生成知识图谱和洞察","input_path":"data/raw/medical_notes.csv","output_dir":"runs/nexent_call"}}' \
  http://127.0.0.1:8088/nexent/invoke

5.2 MCP 方式

在支持 MCP 的 Nexent/Agent Host 中配置 stdio command:

{
  "mcpServers": {
    "nexu-datamate-agent": {
      "command": "python3",
      "args": ["-m", "nexu_datamate_agent", "mcp"],
      "env": {"PYTHONPATH": "/app/src"}
    }
  }
}

本仓库的 MCP server 支持:

  • initialize
  • tools/list
  • tools/call

5.3 真实 Nexent 官方注册

官方 Nexent 推荐通过 MCP 或 OpenAPI service 导入外部工具:

# MCP container-style config payload
PYTHONPATH=src python3 -m nexu_datamate_agent nexent-mcp-config --port 18089 --output deployment/nexent/mcp_add_from_config.json
PYTHONPATH=src python3 -m nexu_datamate_agent nexent-register-mcp --payload deployment/nexent/mcp_add_from_config.json

# OpenAPI service import;server-url 必须是 Nexent 容器能访问的地址
PYTHONPATH=src python3 -m nexu_datamate_agent serve --host 0.0.0.0 --port 8088
PYTHONPATH=src python3 -m nexu_datamate_agent nexent-register-openapi --server-url http://host.docker.internal:8088

11. DataMate 集成方式

本 Demo 的算子接口使用 DataMate 风格元数据:

OperatorSpec(
  id="datamate.clean_table",
  name="Clean Structured Table",
  inputs={"rows": {"type": "array<object>"}},
  outputs={"rows": {"type": "array<object>"}}
)

执行后会导出:

  • agent_plan.json:内部执行计划。
  • datamate_workflow.json:节点/边/参数形式的 workflow,可按目标 DataMate 版本转换导入。

如果已有 DataMate 服务,可设置:

export DATAMATE_BASE_URL="http://your-datamate-host:port"
export DATAMATE_ENDPOINT_TEMPLATE="/api/operators/{operator_id}/execute"

然后在 DataMateHTTPAdapter 中将本地算子调用替换为远端调用。默认保留本地执行以保证赛题验收可复现。

12. 典型输出示例

insight_report.md 摘要会包含:

  • 输入记录数、患者数、抽取实体数、知识三元组数。
  • 清洗质量:去重行数、空值数量、字段标准化结果。
  • 高频疾病、症状、用药。
  • HbA1c、血糖、LDL-C、CRP、WBC 等异常检验提示。
  • 后续算子优化建议。

13. 测试

PYTHONPATH=src python3 -m unittest discover -s tests -v

14. 性能验证 baseline

可生成合成医疗文本表格并记录各算子耗时,为后续 NPU/异构算力优化提供 CPU baseline:

PYTHONPATH=src python3 scripts/benchmark_pipeline.py --rows 1000 --repeat 3 --output-dir runs/benchmark

输出 runs/benchmark/benchmark_summary.json,包含端到端耗时、吞吐和每个算子的 duration_ms。若后续将实体抽取、正则匹配或聚合逻辑迁移到 NPU,可直接复用该脚本做 A/B 对比。

15. 评分点对应说明

架构设计合理性(5)

  • AgentPlannerExecutorOperatorIntegration Adapter 分层明确。
  • 算子无智能体状态依赖,便于替换为 DataMate 原生算子或远程算子。

智能体能力(10)

  • 自然语言任务解析为 ETL/抽取/KG/洞察意图。
  • 自动补齐依赖步骤,例如要求生成 KG 时自动加入实体抽取。
  • 生成可解释 DAG 和 DataMate workflow JSON。

功能完整性(10)

  • 支持 CSV/TSV/TXT。
  • 完整跑通:加载 → 校验 → 清洗 → 抽取 → KG → 洞察 → 保存。
  • 生成可验证输出文件与报告。

工程质量(5)

  • 无第三方依赖即可运行。
  • Docker、脚本、测试、示例数据齐全。
  • 错误和状态信息结构化输出。

16. 可扩展方向

  • 用小于 1B 的模型微调/蒸馏任务规划器,将 DataTaskPlanner._interpret 替换为小模型推理。
  • 将实体抽取替换为医疗 NER 模型,并通过 NPU 推理加速。
  • build_knowledge_graph 输出导入 Neo4j/TuGraph 等图数据库。
  • DataMateHTTPAdapter 对接真实 DataMate workflow runtime。
  • 将算子热点(正则抽取、聚合、图构建)迁移到 NPU/昇腾 CANN/Triton 类内核并进行性能对比。

17. 官方资料

关于

基于 Nexent/DataMate 的数据—知识—洞察智能体与 Ascend NPU 自定义算子实现

145.0 KB
邀请码
    Gitlink(确实开源)
  • 加入我们
  • 官网邮箱:gitlink@ccf.org.cn
  • QQ群
  • QQ群
  • 公众号
  • 公众号

版权所有:中国计算机学会技术支持:开源发展技术委员会
京ICP备13000930号-9 京公网安备 11010802047560号