目录

video_yolov4_tiny

基于 PNNA NPU 的 YOLOv4-tiny 视频目标检测系统,使用 3 线程流水线实现并行加速,支持 TCP 断线自动重连。

架构

主线程 (I/O + 显示)
  │
  ├── pre_q ──> 线程1 (预处理)
  │                 │
  │                 ├── infer_q ──> 线程2 (推理)
  │                                      │
  │                                      ├── post_q ──> 线程3 (后处理 + 画框)
  │                                                           │
  │                                                           ├── display_q ──> 主线程显示
  └───────────────────────────────────────────────────────────────┘

队列设计

所有队列使用 queue.Queue(maxsize=N) 限制内存:

队列 容量 用途
pre_q 2 主线程 → 预处理线程
infer_q 2 预处理线程 → 推理线程
post_q 2 推理线程 → 后处理线程
display_q 4 后处理线程 → 主线程显示

线程职责

线程1 — 预处理 (_preprocess_worker)

  • pre_q 取原始帧
  • resize 到 416×416 → BGR→RGB → HWC→CHW → 扁平化为 uint8 字节
  • 保留原始分辨率帧,送入 infer_q

线程2 — 推理 (_infer_worker)

  • infer_q 取预处理数据
  • 调用 PNNAClient.infer() 发送到 NPU 并阻塞等待结果
  • 唯一访问 socket 的线程,天然串行化 NPU 访问(NPU 不支持流水线)
  • 检测到断连时,触发协调关闭所有工作线程,通知主线程重连

线程3 — 后处理 + 画框 (_postprocess_worker)

  • post_q 取 NPU 原始输出
  • 解码 uint8 张量 → YOLO 后处理(含 DIoU NMS)→ 检测结果
  • 原始分辨率图像上绘制边界框和标签
  • 送入 display_q

主线程

  • 打开视频 → 连接 NPU → 启动 3 个 daemon 线程
  • 循环:读帧 → 检查连接状态 → push_frame() 超时轮询 → 排空 display_q 取最新结果 → 显示
  • 检测到断连后,停止管线 → 重连 → 创建新管线继续
  • 键盘控制:q/ESC 退出,+/- 缩放,0 重置缩放

关键设计决策

  1. 阻塞推送push_frame 使用阻塞 put(),管线满时主线程等待,自动将读取速率与处理速率对齐,不丢帧
  2. NPU 串行化:仅线程2 持有 PNNAClient socket 引用,阻塞式推理天然保证同一时刻只有一个推理在飞
  3. 显示队列排空:主线程每次循环排空 display_q,只显示最新完成帧,避免积压
  4. OpenCV GUIcv2.imshowcv2.waitKey 必须在主线程调用(Windows 要求),已遵守
  5. 有界队列:防止预处理/推理/后处理速率不匹配导致内存暴涨
  6. 模型预加载:DSP 端启动时一次性加载模型并常驻内存,上位机不再发送模型数据(model_preloaded=True

断线重连机制

对应 DSP 端固件的断线重连改造(模型常驻 + 循环 accept + TCP Keep-Alive + 链路监测),上位机实现了对应的自动重连功能。

DSP 端关键参数

DSP 参数 对上位机的意义
TCP Keep-Alive (idle/intvl/cnt) 3s / 1s / 3次 DSP 最快 ~6s 检测到死连接并关闭 socket
SO_RCVTIMEO 5s DSP recv() 等待数据超时
链路监测轮询 每 3s 硬件断线检测延迟
accept() 循环 即时 断连后 DSP 立刻回到 accept 等待新连接

上位机重连架构

1. communication.py — 核心重连层

  • ConnectionLostError 异常类:区分”连接断开”和”超时”,前者需重连,后者可重试
  • Socket Keep-Alive:启用 SO_KEEPALIVE(跨平台),在支持的平台设置 TCP_KEEPIDLE=3s / TCP_KEEPINTVL=1s / TCP_KEEPCNT=3
  • **非阻塞发送 + select()**:_send_request() 将 socket 设为非阻塞,使用 send() + select() 实现 5 秒硬截止时间的可靠发送。解决了 Windows 上 sendall() 在 TCP 缓冲区满时不响应 settimeout() 的问题
  • **connect_with_retry()**:指数退避重连(1s → 2s → 4s → 8s → 15s 封顶),max_attempts=0 表示无限重试
  • **reconnect()**:关闭旧 socket → 无限重试连接 → 成功后重建 msgpack.Unpacker
  • **connection_lost**:threading.Event,推理线程在检测到断连时置位,主线程轮询此标志
  • is_connected 属性:快速查询连接状态
  • **model_preloaded**:为 Trueadd_model() / release_model() 变为空操作

2. thread_pipeline.py — 推理线程感知断连

  • _infer_worker 捕获 ConnectionLostError
    • 设置 connection_lost.set() 通知主线程
    • 设置 shutdown_event.set() 协调关闭所有工作线程
    • 发送 None 哨兵值解除下游线程阻塞
    • 退出线程
  • 超时处理infer() 返回 None(推理超时)仅跳过当前帧,不触发关闭
  • **push_frame(timeout=0.5)**:队列满时 0.5 秒超时返回,主线程可轮询连接状态和键盘事件

3. main_video.py — 外层重连循环

run_once() 外层 while True:   ← 重连循环
  ├── 创建 VideoPipeline
  ├── pipeline.start()
  ├── connection_lost.clear()
  │
  └── 内层 while True:        ← 逐帧处理
        ├── 检查 connection_lost(前置检测)
        ├── cap.read()
        ├── push_frame(timeout=0.5)  ← 超时轮询 connection_lost + 键盘
        ├── 检查 connection_lost(push 后检测)
        ├── 排空 display_q → 显示
        ├── cv2.waitKey(1) → 键盘处理
        └── 若 connection_lost.is_set():
              → pipeline.stop()
              → client.reconnect()
              → break 回到外层
  • 断连检测前置:推帧前检查 connection_lost,避免向已断开管线推帧
  • 超时推帧:0.5s 超时,期间轮询连接状态和键盘,界面始终响应
  • 视频位置保持:重连期间 capframe_idx 保持不变,恢复后从断点继续

4. main.py — 单图检测重试

  • 推理包裹重试循环(最多 3 次):
    • ConnectionLostErrorclient.reconnect() → 重试
    • 推理超时 → time.sleep(1) → 重试

完整重连时序

T0:     正常推理中
T1:     网线断开
T2:     DSP Keep-Alive 检测(~6s)→ DSP 关闭 socket → 回到 accept()
T3:     上位机 infer() → recv() 超时 10s → 返回 None → 跳过帧
T4:     下一帧 infer() → _send_request() → 非阻塞 send → 缓冲区满
        → select() 等待 5s → deadline 到期 → ConnectionLostError
T5:     _infer_worker 捕获 → connection_lost.set() + shutdown_event.set()
T6:     主线程 push_frame() 超时 → 检测到 connection_lost.is_set()
        → pipeline.stop() → client.reconnect()
        → connect 指数退避重试: 1s, 2s, 4s, 8s, 15s...
T7:     DSP 网络恢复,accept() 接受新连接
T8:     主机重连成功 → 创建新 pipeline → 继续推理

总恢复时间:约 15-17 秒(最坏情况)

线程安全

共享资源 访问者 安全机制
client.sock 主线程(reconnect) + 推理线程(infer) reconnect 前先 pipeline.stop() 确保推理线程已退出
client._connected 多线程 CPython GIL 下 bool 赋值是原子的
client.connection_lost 主线程 + 推理线程 threading.Event 内置线程安全
client.unpacker 仅推理线程 单线程访问,reconnect 后新建
Pipeline 队列 主线程 + 工作线程 queue.Queue 内置线程安全
pipeline._shutdown 主线程 + 推理线程 threading.Event 内置线程安全

项目结构

├── main.py                  # 单图检测入口(支持重试)
├── main_video.py            # 视频检测入口(3线程并行 + 断线重连)
├── thread_pipeline.py       # 3线程流水线核心实现
├── communication.py         # PNNA NPU TCP 通信协议(含重连机制)
├── preprocess.py            # 图像预处理(resize/色彩转换/布局变换)
├── yolov4_tiny.py           # YOLOv4-tiny 后处理(DIoU NMS)
├── visualization.py         # 检测结果可视化 + 缩放显示
├── resource/
│   └── yolov4-tiny/
│       ├── yolov4-tiny_u8.nb   # 量化模型文件
│       ├── 3.mp4               # 测试视频
│       ├── dog.jpg             # 测试图片
│       └── input_0.dat         # 测试输入
└── README.md

配置

main_video.py 中修改:

TARGET_IP = "192.168.23.102"   # NPU 设备 IP
MODEL_PATH = "./resource/yolov4-tiny/yolov4-tiny_u8.nb"
VIDEO_PATH = "./resource/yolov4-tiny/3.mp4"

DSP 端固件需为新版(模型常驻 + 循环 accept),上位机使用 model_preloaded=True 模式。

运行

# 视频检测(3线程并行 + 断线重连)
python main_video.py

# 单图检测
python main.py

键盘快捷键

按键 功能
q / ESC 退出
+ / = 放大显示
- 缩小显示
0 重置缩放

数据流

cap.read() 原始帧 (H×W×3)
  │
  ├─ push_frame(timeout=0.5) ──> [pre_q:2]
  │                                    │
  │                                    └─ 线程1: resize(416×416) → BGR→RGB → HWC→CHW → bytes
  │                                                              │
  │                                     保留原始帧 + bytes ──> [infer_q:2]
  │                                                              │
  │                                                              └─ 线程2: client.infer(bytes) ──> NPU
  │                                                                                   │
  │                                                         原始帧 + raw_outputs ──> [post_q:2]
  │                                                                                   │
  │                                                                                   └─ 线程3: decode → YOLO post → draw on 原图
  │                                                                                                        │
  │                                                                     annotated_frame ──> [display_q:4]
  │                                                                                             │
  └─ get_result() 排空取最新 ── resize(scale) ── cv2.imshow() <──────────────────────────────────┘

测试结果

日志如下所示:

[OK] Connected to 192.168.23.102
[Log] Received 2 output tensors.
Tensor 0: shape=(13, 13, 255), dtype=uint8, size=43095 bytes
Tensor 1: shape=(26, 26, 255), dtype=uint8, size=172380 bytes

=== Detection Results ===
boxes number: 4
16 0.895512 0.289805 0.649326 0.259186 0.569164
7 0.740758 0.759681 0.218824 0.305246 0.157431
2 0.414782 0.759821 0.222086 0.286552 0.162486
1 0.635977 0.423077 0.508929 0.660444 0.612446

Result saved: H:\work_space\PycharmProjects\video_yolov4_tiny\dog_result.jpg
Press any key to close window...
[Log] Model preloaded on DSP, skipping release_model.
关于

解析视频,量化,与DSP端通讯,收到数据后进行反量化并显示画框视频

5.8 MB
邀请码
    Gitlink(确实开源)
  • 加入我们
  • 官网邮箱:gitlink@ccf.org.cn
  • QQ群
  • QQ群
  • 公众号
  • 公众号

版权所有:中国计算机学会技术支持:开源发展技术委员会
京ICP备13000930号-9 京公网安备 11010802032778号