目录

雾计算离散事件仿真框架

基于 SimPy 离散事件引擎 + NetworkX 网络拓扑的雾计算/边缘计算仿真框架。提供拓扑构建、服务放置、任务调度、性能监控四大核心能力,支持通过策略模式灵活扩展。

架构总览

┌─────────────────────────────────────────────────────────────┐
│                      Orchestrator                           │
│              (顶层编排器, 统一入口)                            │
├─────────────┬─────────────────┬─────────────────────────────┤
│    core/    │   topology/     │         fog/                │
│  仿真引擎    │   网络拓扑       │      雾计算业务               │
│             │                 │                              │
│  • Engine   │  • Builder      │  • Entity (FogNode, Task..) │
│  • Monitor  │  • Router       │  • Placement (策略模式)      │
│  (SimPy)    │  (NetworkX)     │  • Scheduler (策略模式)      │
└─────────────┴─────────────────┴─────────────────────────────┘
  • core/ — SimPy 仿真引擎封装 + 三类监控器(资源/任务/网络)
  • topology/ — NetworkX 拓扑构建器 + 最短路径路由计算
  • fog/ — 雾计算领域实体 + 可插拔的放置策略与调度策略
  • orchestrator.py — 三模块的统一编排,提供配置→运行→报告的完整生命周期

三个模块层级独立,无循环依赖。仅 fog/placement.py 依赖 topology/routing.py 的 Router。

快速开始

安装

pip install -r requirements.txt

依赖:simpy>=4.0 networkx>=3.0

最小示例

from orchestrator import Orchestrator
from topology.network import NodeSpec, LinkSpec
from fog.entities import Application, Service

# 1. 创建编排器
orch = Orchestrator(name="demo", until=100)

# 2. 构建拓扑:1云 + 1雾 + 1边缘
nodes = [
    NodeSpec(name="cloud", node_type="cloud", cpu_cores=16, ram=65536),
    NodeSpec(name="fog",   node_type="fog",   cpu_cores=4,  ram=8192),
    NodeSpec(name="edge",  node_type="edge",  cpu_cores=1,  ram=512),
]
links = [
    LinkSpec("cloud", "fog", bandwidth=10000, latency=15),
    LinkSpec("fog", "edge",  bandwidth=1000,  latency=2),
]
orch.build_topology(nodes, links)

# 3. 注册应用并放置服务
app = Application(name="iot-app", services=[
    Service(name="data-ingest",  cpu_demand=0.5, ram_demand=256,  exec_time=0.1),
    Service(name="data-process", cpu_demand=2.0, ram_demand=1024, exec_time=0.3),
])
orch.register_application(app)
orch.place_services()

# 4. 运行仿真
orch.run()

# 5. 查看报告
print(orch.report())

运行 python examples/00_minimal_demo.py 可执行此示例。

模块详解

core/ — 仿真引擎

SimulationEngine 封装 SimPy 环境,提供进程注册、事件控制、资源工厂:

from core.engine import SimulationEngine

engine = SimulationEngine(until=100)

def my_process(env, name):
    while True:
        yield env.timeout(5.0)
        print(f"[{env.now}] {name}")

engine.register(my_process, "worker-1")
engine.run()  # 阻塞至 t=100

三类监控器:

监控器 用途 关键方法
ResourceMonitor 记录节点 CPU/RAM 利用率 utilization(node), time_series(node, metric)
TaskMonitor 跟踪任务生命周期延迟/成功率 avg_latency(), success_rate()
NetworkMonitor 记录链路字节传输 link_usage(src, dst, bw, interval)

topology/ — 网络拓扑

TopologyBuilder 基于 NetworkX 构建网络图:

from topology.network import TopologyBuilder, NodeSpec, LinkSpec
from topology.routing import Router

tb = TopologyBuilder()
tb.add_node(NodeSpec(name="cloud-1", node_type="cloud", cpu_cores=64, cpu_speed=3.5, ram=262144))
tb.add_node(NodeSpec(name="fog-1",   node_type="fog",   cpu_cores=8,  cpu_speed=2.5, ram=16384))
tb.add_link(LinkSpec("cloud-1", "fog-1", bandwidth=10000, latency=20.0, link_type="wan"))

print(tb.summary())
print(tb.is_connected())

# 路径计算
router = Router(tb.graph)
path, cost = router.shortest_path("fog-1", "cloud-1")
print(f"最短路径: {' -> '.join(path)}, 延迟={cost:.1f}ms")

Router 提供最短路径、全对延迟矩阵、可达节点查询等功能。

NodeSpec 字段: name, node_type(cloud/fog/edge/switch), cpu_cores, cpu_speed, ram, storage, power_idle, power_max, x, y, meta

LinkSpec 字段: src, dst, bandwidth(Mbps), latency(ms), link_type, meta

fog/ — 雾计算业务逻辑

领域实体:

from fog.entities import FogNode, Application, Service, Task

# 定义服务
svc = Service(name="video-analyze", cpu_demand=2.0, ram_demand=1024,
              exec_time=0.3, data_size_in=500, data_size_out=200)

# 定义应用(一组服务的 DAG)
app = Application(name="smart-factory", services=[svc1, svc2, svc3])

# 创建任务
task = Task(task_id="t-001", app_name="smart-factory",
            service_name="video-analyze", deadline=10.0,
            data_payload=2.5)

# FogNode 运行时状态(编排器自动创建)
node = FogNode(name="fog-1", node_type="fog",
               total_cpu=20, total_ram=16384)
print(node.can_host(svc))   # True/False
node.allocate(svc)          # 扣减资源

放置策略(PlacementStrategy 抽象基类):

策略 算法 说明
ClosestPlacement 最小网络延迟 默认策略,选择离用户最近的节点
LeastLoadedPlacement 最小 CPU 利用率 需要传入 node_states
LatencyAwarePlacement 延迟+负载加权 latency_weight 控制权重(默认 0.6)
from fog.placement import LeastLoadedPlacement, LatencyAwarePlacement

orch.set_placement_strategy(LeastLoadedPlacement(orch.node_states))
# 或
orch.set_placement_strategy(LatencyAwarePlacement(orch.node_states, latency_weight=0.7))

调度策略(TaskScheduler 抽象基类):

策略 数据结构 说明
FIFOScheduler deque 先入先出(默认)
PriorityScheduler 最小堆 task.meta["priority"] 越小越优先
DeadlineScheduler 最小堆 最早截止时间优先 (EDF)
from fog.scheduler import PriorityScheduler, DeadlineScheduler

orch.set_scheduler(PriorityScheduler())
# 或
orch.set_scheduler(DeadlineScheduler())

完整示例:智能工厂

from orchestrator import Orchestrator
from topology.network import NodeSpec, LinkSpec
from fog.entities import Application, Service, Task
from fog.placement import LeastLoadedPlacement
from fog.scheduler import PriorityScheduler

orch = Orchestrator(name="智能工厂雾计算仿真", until=500)

# 拓扑:1云 + 2雾网关 + 3边缘设备
nodes = [
    NodeSpec(name="cloud",         node_type="cloud", cpu_cores=32, ram=131072),
    NodeSpec(name="fog-gateway-1", node_type="fog",   cpu_cores=8,  ram=16384),
    NodeSpec(name="fog-gateway-2", node_type="fog",   cpu_cores=8,  ram=16384),
    NodeSpec(name="edge-sensor-1", node_type="edge",  cpu_cores=1,  ram=512),
    NodeSpec(name="edge-sensor-2", node_type="edge",  cpu_cores=1,  ram=512),
    NodeSpec(name="edge-camera-1", node_type="edge",  cpu_cores=2,  ram=1024),
]
links = [
    LinkSpec("cloud", "fog-gateway-1", bandwidth=10000, latency=15.0, link_type="wan"),
    LinkSpec("cloud", "fog-gateway-2", bandwidth=10000, latency=15.0, link_type="wan"),
    LinkSpec("fog-gateway-1", "fog-gateway-2", bandwidth=5000, latency=3.0),
    LinkSpec("fog-gateway-1", "edge-sensor-1", bandwidth=100, latency=1.0, link_type="wireless"),
    LinkSpec("fog-gateway-1", "edge-sensor-2", bandwidth=100, latency=1.0, link_type="wireless"),
    LinkSpec("fog-gateway-2", "edge-camera-1", bandwidth=500, latency=2.0, link_type="wireless"),
]
orch.build_topology(nodes, links)

# 视频分析应用(3 个微服务)
app = Application(name="video-analytics", services=[
    Service(name="frame-capture",    cpu_demand=0.5, ram_demand=256,  exec_time=0.05),
    Service(name="object-detection", cpu_demand=2.0, ram_demand=1024, exec_time=0.3),
    Service(name="alert-service",    cpu_demand=0.2, ram_demand=128,  exec_time=0.02),
])
orch.register_application(app)

# 自定义策略
orch.set_placement_strategy(LeastLoadedPlacement(orch.node_states))
orch.set_scheduler(PriorityScheduler())
orch.place_services()

# 手动注入高优先级告警
urgent = Task(task_id="alarm-001", app_name="video-analytics",
              service_name="alert-service", deadline=10.0,
              meta={"priority": 0})  # priority=0 最高优先
orch.submit_task(urgent)

# 运行并查看结果
orch.run()
print(orch.report())

# 结构化指标(用于下游分析)
metrics = orch.metrics()
# {'simulation_time': 500.0, 'topology_nodes': 6, 'topology_edges': 6,
#  'avg_latency': 12.3, 'success_rate': 0.98, 'cpu_utilization': 0.45}

运行 python examples/03_fog_computing.py 可执行此完整示例。

Orchestrator API

方法 阶段 说明
build_topology(nodes, links) 配置 构建网络拓扑,初始化 FogNode 状态
register_application(app) 配置 注册应用及其服务
set_placement_strategy(s) 配置 设置放置策略(默认 ClosestPlacement)
set_scheduler(s) 配置 设置调度策略(默认 FIFO)
place_services() 配置 为所有服务选择部署节点
run(until=None) 运行 启动仿真(注册后台进程并启动引擎)
submit_task(task) 运行 手动注入外部任务
now() 运行 返回当前仿真时间
report() 报告 返回人类可读的多行文本报告
metrics() 报告 返回结构化 dict(simulation_time, avg_latency, success_rate, cpu_utilization…)

扩展指南

添加新放置策略

from fog.placement import PlacementStrategy

class EnergyAwarePlacement(PlacementStrategy):
    def __init__(self, node_states):
        self.node_states = node_states

    def select(self, service_name, candidates, router, user_node):
        # 选择功耗最低的节点
        best = min(candidates, key=lambda n: self.node_states[n].power_consumed)
        return best

orch.set_placement_strategy(EnergyAwarePlacement(orch.node_states))

添加新调度策略

from fog.scheduler import TaskScheduler
import heapq

class ShortestJobFirstScheduler(TaskScheduler):
    def __init__(self, service_map):
        self._heap = []
        self._counter = 0
        self._service_map = service_map  # service_name -> Service

    def enqueue(self, task):
        svc = self._service_map.get(task.service_name)
        exec_time = svc.exec_time if svc else float("inf")
        heapq.heappush(self._heap, (exec_time, self._counter, task))
        self._counter += 1

    def dequeue(self):
        return heapq.heappop(self._heap)[2] if self._heap else None

    def queue_size(self):
        return len(self._heap)

项目结构

simulation/
├── README.md
├── requirements.txt
├── orchestrator.py          # 顶层编排器
├── core/
│   ├── engine.py            # SimPy 仿真引擎封装
│   └── monitors.py          # ResourceMonitor / TaskMonitor / NetworkMonitor
├── topology/
│   ├── network.py           # TopologyBuilder (NetworkX 封装) + NodeSpec / LinkSpec
│   └── routing.py           # Router (最短路径、延迟矩阵、可达节点)
├── fog/
│   ├── entities.py          # FogNode / Application / Service / Task
│   ├── placement.py         # 放置策略 (Closest / LeastLoaded / LatencyAware)
│   └── scheduler.py         # 调度策略 (FIFO / Priority / Deadline)
└── examples/
    ├── 00_minimal_demo.py   # 最小端到端示例
    ├── 01_basic_topology.py # 纯拓扑层演示
    ├── 02_event_simulation.py # 纯 SimPy 引擎演示
    └── 03_fog_computing.py  # 完整三层集成示例
关于
49.0 KB
邀请码
    Gitlink(确实开源)
  • 加入我们
  • 官网邮箱:gitlink@ccf.org.cn
  • QQ群
  • QQ群
  • 公众号
  • 公众号

版权所有:中国计算机学会技术支持:开源发展技术委员会
京ICP备13000930号-9 京公网安备 11010802047560号