GET /integrations/datamate/health
POST /integrations/datamate/register-workflow
POST /integrations/datamate/execute-workflow
GET /integrations/nexent/health
POST /integrations/nexent/register-tool
POST /integrations/nexent/mcp-config
POST /integrations/nexent/register-mcp
POST /integrations/nexent/invoke-agent
7. 可选小模型 / 医疗 NER
datamate.extract_medical_entities 现在支持四种后端:
dictionary:默认配置词典 + 正则,离线可复现。
tiny:本仓库内置的无依赖 Tiny Medical NER 小模型,模型文件为 configs/tiny_medical_ner_model.json。
transformers:使用 HuggingFace token-classification 医疗 NER/通用 NER 小模型。
Nexu DataMate Agent:基于 Nexent 的“数据—知识—洞察”数据处理智能体 Demo
本仓库实现赛题任务一:基于 Nexent 的数据处理智能体实现。项目用一个可运行、可复现的 Python Demo 展示:智能体理解自然语言数据处理任务,自动规划 DataMate 风格算子 DAG,执行医疗文本/表格 ETL、医学实体抽取、医疗知识图谱三元组生成和洞察报告输出。
1. 功能覆盖
data/raw/medical_notes.csv和data/raw/medical_notes.txtDataTaskPlanner根据自然语言任务识别 ETL、抽取、KG、洞察意图OperatorRun记录每步状态、耗时、指标、错误;失败时跳过下游datamate_workflow.json;提供 Nexent 工具 schema、HTTP/nexent/invoke与 MCPtools/call2. 架构
flowchart LR U["用户/Nexent Agent\n自然语言任务"] --> P["DataTaskPlanner\n任务理解与DAG规划"] P --> W["DataMate Workflow JSON\n可导入/适配"] P --> E["PlanExecutor\n依赖调度/状态跟踪"] E --> O1["load_csv/load_text"] O1 --> O2["validate_schema"] O2 --> O3["clean_table"] O3 --> O4["extract_medical_entities"] O4 --> O5["build_knowledge_graph"] O5 --> O6["generate_insights"] O6 --> S["JSON/CSV/Markdown 输出"] N["Nexent HTTP/MCP Tool"] --> P D["可选 DataMate HTTP Adapter"] -.-> E核心模块:
src/nexu_datamate_agent/planner.py:任务理解与算子 DAG 规划。src/nexu_datamate_agent/operators.py:DataMate 风格算子实现。src/nexu_datamate_agent/executor.py:拓扑执行、状态跟踪、异常处理。src/nexu_datamate_agent/agent.py:对外智能体接口。src/nexu_datamate_agent/api_server.py:HTTP API/Nexent 自定义工具接口。src/nexu_datamate_agent/mcp_server.py:最小 MCP stdio server。src/nexu_datamate_agent/integrations.py:Nexent/DataMate 适配层。3. 快速运行
3.1 本地 Python 运行
无需第三方依赖,Python 3.9+ 即可。
或使用脚本:
3.2 查看输出
运行后会生成:
3.3 Docker 运行
另开终端调用 API:
或:
4. 接入 DeepSeek Flash 任务规划器
默认情况下,本项目使用本地规则规划器,保证无网络、无模型 key 也能复现。现在已接入可选的 DeepSeek Flash 规划增强:设置环境变量并添加
--llm-planner后,智能体会先调用deepseek-v4-flash直接生成完整 DataMate workflow DAG;系统会校验算子 ID、参数、依赖和环路,如果 API 不可用或 DAG 不合法,会自动回退到本地规则规划器。配置:
CLI 调用:
脚本调用:
API 调用时传入:
也可以用全局开关:
LLM 输出的计划会落盘到
agent_plan.json和datamate_workflow.json。其中datamate_workflow.json的nodes/edges来自 DeepSeek 生成并通过本地校验后的 DAG,而不是固定 pipeline。5. 配置化医学词典与算子目录
医学词典、lab 规则、KG 谓词和洞察建议已经从代码拆出:
可配置内容包括:
entity_patterns:疾病、症状、药物、检查等别名词典。lab_patterns:检验指标正则、单位、置信度。lab_flags:异常检验阈值和提示语。kg_predicates:实体类型到知识图谱关系的映射。recommendation_rules:洞察报告建议规则。算子目录也已经从代码拆出:
运行时可通过环境变量替换:
JSON 默认可直接用;如果安装了 PyYAML,也支持
.yaml/.yml。6. 真实 DataMate / Nexent 服务接入
本项目现在提供真实服务适配层:
DataMate 官方 Docker Compose 拆为 gateway 和 Python backend。本适配器同时支持:
DATAMATE_BASE_URL:gateway,例如http://127.0.0.1:18080DATAMATE_PYTHON_BASE_URL:Python backend,例如http://127.0.0.1:18000当前官方开源栈优先暴露 operator/cleaning API;未配置通用 pipeline import 路径时,
datamate-register会把本项目 workflow 中的算子元数据导入/api/operators/create。DataMate 相关命令:
Nexent 官方 Docker Compose 是多服务拆分:config API 默认
5010,runtime/Agent API 默认5014,northbound/A2A 默认5013,MCP SSE/management 默认5011/5015。本项目适配器优先读取NEXENT_CONFIG_BASE_URL和NEXENT_RUNTIME_BASE_URL;如果只设置旧的NEXENT_BASE_URL,则会作为两者的 fallback。真实 Nexent Agent smoke(无外部 LLM key 的可复现路径):
上述 smoke 在真实 Nexent
POST /agent/run中应能看到parse、token_count、final_answer事件,且本项目输出目录包含run_summary.json、entities.json、kg_triples.json、insight_report.md等文件。官方服务源码准备脚本:
如果服务器已有 8080/3000/5432 等服务,使用生成的 override 文件避免端口冲突。例如:
将本 Agent 注册到真实服务:
官方 API 映射:
POST /api/v1/pipelines,payload 为CreatePipelineRequest。POST /api/v1/pipelines/{pipelineId}/execute。DATAMATE_WORKFLOW_REGISTER_PATH,当前默认走官方 Python backend 的/api/operators/create导入算子目录。POST /mcp/add-from-config,payload 为官方AddContainerMcpServiceRequest。POST /tool/openapi_service。POST /agent/run,payload 使用官方AgentRequest字段:query、agent_id、conversation_id、history等。如果目标部署的 REST 路径不同,可以通过
.env.example中的DATAMATE_*_PATH和NEXENT_*_PATH修改端点模板,不需要改代码。HTTP API 也暴露了:7. 可选小模型 / 医疗 NER
datamate.extract_medical_entities现在支持四种后端:dictionary:默认配置词典 + 正则,离线可复现。tiny:本仓库内置的无依赖 Tiny Medical NER 小模型,模型文件为configs/tiny_medical_ner_model.json。transformers:使用 HuggingFace token-classification 医疗 NER/通用 NER 小模型。hybrid:词典/正则 + tiny/transformers 小模型结果融合。训练/更新内置 Tiny NER:
如需 HuggingFace 医疗 NER 小模型,安装可选依赖:
配置示例:
在没有
transformers时,系统不会影响主流程;可以使用内置tiny后端或默认配置化医学词典。8. NPU 优化与性能对比
实体抽取增加了候选词预筛加速路径:
npu_prefilter会尝试检测torch_npu/ Ascend NPU;可用时使用 NPU 做批量候选矩阵计算,不可用时自动回退 CPU 并在 benchmark 里记录原因。custom_prefilter对应本仓库设计的 Ascend C 自定义算子NexuEntityPrefilter:docs/custom_npu_operator.mdcustom_ops/entity_prefilter/custom_ops/entity_prefilter/scripts/build_custom_op.shtorch_npusemantic reference 或 CPU 路径保持端到端可跑。性能对比脚本:
输出包含:
9. CLI 命令
10. Nexent 集成方式
Nexent 可通过工具/MCP 接入外部能力。本 Demo 暴露一个工具:
5.1 HTTP 工具方式
启动服务:
工具元数据:
调用:
5.2 MCP 方式
在支持 MCP 的 Nexent/Agent Host 中配置 stdio command:
本仓库的 MCP server 支持:
initializetools/listtools/call5.3 真实 Nexent 官方注册
官方 Nexent 推荐通过 MCP 或 OpenAPI service 导入外部工具:
11. DataMate 集成方式
本 Demo 的算子接口使用 DataMate 风格元数据:
执行后会导出:
agent_plan.json:内部执行计划。datamate_workflow.json:节点/边/参数形式的 workflow,可按目标 DataMate 版本转换导入。如果已有 DataMate 服务,可设置:
然后在
DataMateHTTPAdapter中将本地算子调用替换为远端调用。默认保留本地执行以保证赛题验收可复现。12. 典型输出示例
insight_report.md摘要会包含:13. 测试
14. 性能验证 baseline
可生成合成医疗文本表格并记录各算子耗时,为后续 NPU/异构算力优化提供 CPU baseline:
输出
runs/benchmark/benchmark_summary.json,包含端到端耗时、吞吐和每个算子的duration_ms。若后续将实体抽取、正则匹配或聚合逻辑迁移到 NPU,可直接复用该脚本做 A/B 对比。15. 评分点对应说明
架构设计合理性(5)
Agent、Planner、Executor、Operator、Integration Adapter分层明确。智能体能力(10)
功能完整性(10)
工程质量(5)
16. 可扩展方向
DataTaskPlanner._interpret替换为小模型推理。build_knowledge_graph输出导入 Neo4j/TuGraph 等图数据库。DataMateHTTPAdapter对接真实 DataMate workflow runtime。17. 官方资料