init
基于 SimPy 离散事件引擎 + NetworkX 网络拓扑的雾计算/边缘计算仿真框架。提供拓扑构建、服务放置、任务调度、性能监控四大核心能力,支持通过策略模式灵活扩展。
┌─────────────────────────────────────────────────────────────┐ │ Orchestrator │ │ (顶层编排器, 统一入口) │ ├─────────────┬─────────────────┬─────────────────────────────┤ │ core/ │ topology/ │ fog/ │ │ 仿真引擎 │ 网络拓扑 │ 雾计算业务 │ │ │ │ │ │ • Engine │ • Builder │ • Entity (FogNode, Task..) │ │ • Monitor │ • Router │ • Placement (策略模式) │ │ (SimPy) │ (NetworkX) │ • Scheduler (策略模式) │ └─────────────┴─────────────────┴─────────────────────────────┘
core/
topology/
fog/
orchestrator.py
三个模块层级独立,无循环依赖。仅 fog/placement.py 依赖 topology/routing.py 的 Router。
fog/placement.py
topology/routing.py
pip install -r requirements.txt
依赖:simpy>=4.0 networkx>=3.0
simpy>=4.0
networkx>=3.0
from orchestrator import Orchestrator from topology.network import NodeSpec, LinkSpec from fog.entities import Application, Service # 1. 创建编排器 orch = Orchestrator(name="demo", until=100) # 2. 构建拓扑:1云 + 1雾 + 1边缘 nodes = [ NodeSpec(name="cloud", node_type="cloud", cpu_cores=16, ram=65536), NodeSpec(name="fog", node_type="fog", cpu_cores=4, ram=8192), NodeSpec(name="edge", node_type="edge", cpu_cores=1, ram=512), ] links = [ LinkSpec("cloud", "fog", bandwidth=10000, latency=15), LinkSpec("fog", "edge", bandwidth=1000, latency=2), ] orch.build_topology(nodes, links) # 3. 注册应用并放置服务 app = Application(name="iot-app", services=[ Service(name="data-ingest", cpu_demand=0.5, ram_demand=256, exec_time=0.1), Service(name="data-process", cpu_demand=2.0, ram_demand=1024, exec_time=0.3), ]) orch.register_application(app) orch.place_services() # 4. 运行仿真 orch.run() # 5. 查看报告 print(orch.report())
运行 python examples/00_minimal_demo.py 可执行此示例。
python examples/00_minimal_demo.py
SimulationEngine 封装 SimPy 环境,提供进程注册、事件控制、资源工厂:
SimulationEngine
from core.engine import SimulationEngine engine = SimulationEngine(until=100) def my_process(env, name): while True: yield env.timeout(5.0) print(f"[{env.now}] {name}") engine.register(my_process, "worker-1") engine.run() # 阻塞至 t=100
三类监控器:
ResourceMonitor
utilization(node)
time_series(node, metric)
TaskMonitor
avg_latency()
success_rate()
NetworkMonitor
link_usage(src, dst, bw, interval)
TopologyBuilder 基于 NetworkX 构建网络图:
TopologyBuilder
from topology.network import TopologyBuilder, NodeSpec, LinkSpec from topology.routing import Router tb = TopologyBuilder() tb.add_node(NodeSpec(name="cloud-1", node_type="cloud", cpu_cores=64, cpu_speed=3.5, ram=262144)) tb.add_node(NodeSpec(name="fog-1", node_type="fog", cpu_cores=8, cpu_speed=2.5, ram=16384)) tb.add_link(LinkSpec("cloud-1", "fog-1", bandwidth=10000, latency=20.0, link_type="wan")) print(tb.summary()) print(tb.is_connected()) # 路径计算 router = Router(tb.graph) path, cost = router.shortest_path("fog-1", "cloud-1") print(f"最短路径: {' -> '.join(path)}, 延迟={cost:.1f}ms")
Router 提供最短路径、全对延迟矩阵、可达节点查询等功能。
Router
NodeSpec 字段: name, node_type(cloud/fog/edge/switch), cpu_cores, cpu_speed, ram, storage, power_idle, power_max, x, y, meta
NodeSpec
name, node_type(cloud/fog/edge/switch), cpu_cores, cpu_speed, ram, storage, power_idle, power_max, x, y, meta
LinkSpec 字段: src, dst, bandwidth(Mbps), latency(ms), link_type, meta
LinkSpec
src, dst, bandwidth(Mbps), latency(ms), link_type, meta
领域实体:
from fog.entities import FogNode, Application, Service, Task # 定义服务 svc = Service(name="video-analyze", cpu_demand=2.0, ram_demand=1024, exec_time=0.3, data_size_in=500, data_size_out=200) # 定义应用(一组服务的 DAG) app = Application(name="smart-factory", services=[svc1, svc2, svc3]) # 创建任务 task = Task(task_id="t-001", app_name="smart-factory", service_name="video-analyze", deadline=10.0, data_payload=2.5) # FogNode 运行时状态(编排器自动创建) node = FogNode(name="fog-1", node_type="fog", total_cpu=20, total_ram=16384) print(node.can_host(svc)) # True/False node.allocate(svc) # 扣减资源
放置策略(PlacementStrategy 抽象基类):
PlacementStrategy
ClosestPlacement
LeastLoadedPlacement
node_states
LatencyAwarePlacement
latency_weight
from fog.placement import LeastLoadedPlacement, LatencyAwarePlacement orch.set_placement_strategy(LeastLoadedPlacement(orch.node_states)) # 或 orch.set_placement_strategy(LatencyAwarePlacement(orch.node_states, latency_weight=0.7))
调度策略(TaskScheduler 抽象基类):
TaskScheduler
FIFOScheduler
deque
PriorityScheduler
task.meta["priority"]
DeadlineScheduler
from fog.scheduler import PriorityScheduler, DeadlineScheduler orch.set_scheduler(PriorityScheduler()) # 或 orch.set_scheduler(DeadlineScheduler())
from orchestrator import Orchestrator from topology.network import NodeSpec, LinkSpec from fog.entities import Application, Service, Task from fog.placement import LeastLoadedPlacement from fog.scheduler import PriorityScheduler orch = Orchestrator(name="智能工厂雾计算仿真", until=500) # 拓扑:1云 + 2雾网关 + 3边缘设备 nodes = [ NodeSpec(name="cloud", node_type="cloud", cpu_cores=32, ram=131072), NodeSpec(name="fog-gateway-1", node_type="fog", cpu_cores=8, ram=16384), NodeSpec(name="fog-gateway-2", node_type="fog", cpu_cores=8, ram=16384), NodeSpec(name="edge-sensor-1", node_type="edge", cpu_cores=1, ram=512), NodeSpec(name="edge-sensor-2", node_type="edge", cpu_cores=1, ram=512), NodeSpec(name="edge-camera-1", node_type="edge", cpu_cores=2, ram=1024), ] links = [ LinkSpec("cloud", "fog-gateway-1", bandwidth=10000, latency=15.0, link_type="wan"), LinkSpec("cloud", "fog-gateway-2", bandwidth=10000, latency=15.0, link_type="wan"), LinkSpec("fog-gateway-1", "fog-gateway-2", bandwidth=5000, latency=3.0), LinkSpec("fog-gateway-1", "edge-sensor-1", bandwidth=100, latency=1.0, link_type="wireless"), LinkSpec("fog-gateway-1", "edge-sensor-2", bandwidth=100, latency=1.0, link_type="wireless"), LinkSpec("fog-gateway-2", "edge-camera-1", bandwidth=500, latency=2.0, link_type="wireless"), ] orch.build_topology(nodes, links) # 视频分析应用(3 个微服务) app = Application(name="video-analytics", services=[ Service(name="frame-capture", cpu_demand=0.5, ram_demand=256, exec_time=0.05), Service(name="object-detection", cpu_demand=2.0, ram_demand=1024, exec_time=0.3), Service(name="alert-service", cpu_demand=0.2, ram_demand=128, exec_time=0.02), ]) orch.register_application(app) # 自定义策略 orch.set_placement_strategy(LeastLoadedPlacement(orch.node_states)) orch.set_scheduler(PriorityScheduler()) orch.place_services() # 手动注入高优先级告警 urgent = Task(task_id="alarm-001", app_name="video-analytics", service_name="alert-service", deadline=10.0, meta={"priority": 0}) # priority=0 最高优先 orch.submit_task(urgent) # 运行并查看结果 orch.run() print(orch.report()) # 结构化指标(用于下游分析) metrics = orch.metrics() # {'simulation_time': 500.0, 'topology_nodes': 6, 'topology_edges': 6, # 'avg_latency': 12.3, 'success_rate': 0.98, 'cpu_utilization': 0.45}
运行 python examples/03_fog_computing.py 可执行此完整示例。
python examples/03_fog_computing.py
build_topology(nodes, links)
register_application(app)
set_placement_strategy(s)
set_scheduler(s)
place_services()
run(until=None)
submit_task(task)
now()
report()
metrics()
from fog.placement import PlacementStrategy class EnergyAwarePlacement(PlacementStrategy): def __init__(self, node_states): self.node_states = node_states def select(self, service_name, candidates, router, user_node): # 选择功耗最低的节点 best = min(candidates, key=lambda n: self.node_states[n].power_consumed) return best orch.set_placement_strategy(EnergyAwarePlacement(orch.node_states))
from fog.scheduler import TaskScheduler import heapq class ShortestJobFirstScheduler(TaskScheduler): def __init__(self, service_map): self._heap = [] self._counter = 0 self._service_map = service_map # service_name -> Service def enqueue(self, task): svc = self._service_map.get(task.service_name) exec_time = svc.exec_time if svc else float("inf") heapq.heappush(self._heap, (exec_time, self._counter, task)) self._counter += 1 def dequeue(self): return heapq.heappop(self._heap)[2] if self._heap else None def queue_size(self): return len(self._heap)
simulation/ ├── README.md ├── requirements.txt ├── orchestrator.py # 顶层编排器 ├── core/ │ ├── engine.py # SimPy 仿真引擎封装 │ └── monitors.py # ResourceMonitor / TaskMonitor / NetworkMonitor ├── topology/ │ ├── network.py # TopologyBuilder (NetworkX 封装) + NodeSpec / LinkSpec │ └── routing.py # Router (最短路径、延迟矩阵、可达节点) ├── fog/ │ ├── entities.py # FogNode / Application / Service / Task │ ├── placement.py # 放置策略 (Closest / LeastLoaded / LatencyAware) │ └── scheduler.py # 调度策略 (FIFO / Priority / Deadline) └── examples/ ├── 00_minimal_demo.py # 最小端到端示例 ├── 01_basic_topology.py # 纯拓扑层演示 ├── 02_event_simulation.py # 纯 SimPy 引擎演示 └── 03_fog_computing.py # 完整三层集成示例
版权所有:中国计算机学会技术支持:开源发展技术委员会 京ICP备13000930号-9 京公网安备 11010802047560号
雾计算离散事件仿真框架
基于 SimPy 离散事件引擎 + NetworkX 网络拓扑的雾计算/边缘计算仿真框架。提供拓扑构建、服务放置、任务调度、性能监控四大核心能力,支持通过策略模式灵活扩展。
架构总览
core/— SimPy 仿真引擎封装 + 三类监控器(资源/任务/网络)topology/— NetworkX 拓扑构建器 + 最短路径路由计算fog/— 雾计算领域实体 + 可插拔的放置策略与调度策略orchestrator.py— 三模块的统一编排,提供配置→运行→报告的完整生命周期三个模块层级独立,无循环依赖。仅
fog/placement.py依赖topology/routing.py的 Router。快速开始
安装
依赖:
simpy>=4.0networkx>=3.0最小示例
运行
python examples/00_minimal_demo.py可执行此示例。模块详解
core/— 仿真引擎SimulationEngine封装 SimPy 环境,提供进程注册、事件控制、资源工厂:三类监控器:
ResourceMonitorutilization(node),time_series(node, metric)TaskMonitoravg_latency(),success_rate()NetworkMonitorlink_usage(src, dst, bw, interval)topology/— 网络拓扑TopologyBuilder基于 NetworkX 构建网络图:Router提供最短路径、全对延迟矩阵、可达节点查询等功能。NodeSpec字段:name, node_type(cloud/fog/edge/switch), cpu_cores, cpu_speed, ram, storage, power_idle, power_max, x, y, metaLinkSpec字段:src, dst, bandwidth(Mbps), latency(ms), link_type, metafog/— 雾计算业务逻辑领域实体:
放置策略(
PlacementStrategy抽象基类):ClosestPlacementLeastLoadedPlacementnode_statesLatencyAwarePlacementlatency_weight控制权重(默认 0.6)调度策略(
TaskScheduler抽象基类):FIFOSchedulerdequePrioritySchedulertask.meta["priority"]越小越优先DeadlineScheduler完整示例:智能工厂
运行
python examples/03_fog_computing.py可执行此完整示例。Orchestrator API
build_topology(nodes, links)register_application(app)set_placement_strategy(s)set_scheduler(s)place_services()run(until=None)submit_task(task)now()report()metrics()扩展指南
添加新放置策略
添加新调度策略
项目结构