flowchart TD
A["视频采集"] --> B["pre_q"]
B --> C["线程1: 预处理 resize 416×416×3"]
C --> D["infer_q"]
D --> E["线程2: 发送数据至NPU 等待返回结果"]
E --> F["post_q"]
F --> G["线程3: 后处理 + 画框 13×13×255 + 26×26×255"]
G --> H["display_q"]
H --> I["主线程: 显示"]
E -->|断线| J["重连循环"]
J --> E
架构
graph LR
A[主线程 I/O + 显示] -->|pre_q| B[线程1 预处理]
B -->|infer_q| C[线程2 NPU通信]
C -->|post_q| D[线程3 后处理 + 画框]
D -->|display_q| A
video_yolov4_tiny
基于 PNNA NPU 的 YOLOv4-tiny 实时视频目标检测系统。通过 TCP 与 NPU 设备通信,将 YOLOv4-tiny 模型推理卸载到专用硬件上执行。上位机负责视频读取、图像预处理、后处理(DIoU NMS)和结果可视化,采用 3 线程流水线架构将预处理、NPU 通信(发送数据并等待 NPU 完成推理)、后处理并行化,提升吞吐量。支持 NPU 断线自动重连、指数退避重试、实时 FPS 显示和键盘交互控制。
流程图
flowchart TD A["视频采集"] --> B["pre_q"] B --> C["线程1: 预处理resize 416×416×3"] C --> D["infer_q"] D --> E["线程2: 发送数据至NPU
等待返回结果"] E --> F["post_q"] F --> G["线程3: 后处理 + 画框
13×13×255 + 26×26×255"] G --> H["display_q"] H --> I["主线程: 显示"] E -->|断线| J["重连循环"] J --> E
架构
graph LR A[主线程I/O + 显示] -->|pre_q| B[线程1
预处理] B -->|infer_q| C[线程2
NPU通信] C -->|post_q| D[线程3
后处理 + 画框] D -->|display_q| A
队列设计
所有队列使用
queue.Queue(maxsize=N)限制内存:pre_qinfer_qpost_qdisplay_q线程职责
线程1 — 预处理 (
_preprocess_worker)pre_q取原始帧infer_q线程2 — NPU通信 (
_infer_worker)infer_q取预处理数据PNNAClient.infer()发送到 NPU 并阻塞等待结果线程3 — 后处理 + 画框 (
_postprocess_worker)post_q取 NPU 原始输出display_q主线程
push_frame()超时轮询 → 排空display_q取最新结果 → 显示q/ESC退出,+/-缩放,0重置缩放关键设计决策
push_frame使用阻塞put(),管线满时主线程等待,自动将读取速率与处理速率对齐,不丢帧PNNAClientsocket 引用,阻塞式 NPU 通信天然保证同一时刻只有一个请求在飞display_q,只显示最新完成帧,避免积压cv2.imshow和cv2.waitKey必须在主线程调用(Windows 要求),已遵守model_preloaded=True)断线重连机制
对应 DSP 端固件的断线重连改造(模型常驻 + 循环 accept + TCP Keep-Alive + 链路监测),上位机实现了对应的自动重连功能。
DSP 端关键参数
上位机重连架构
1.
communication.py— 核心重连层ConnectionLostError异常类:区分”连接断开”和”超时”,前者需重连,后者可重试SO_KEEPALIVE(跨平台),在支持的平台设置TCP_KEEPIDLE=3s / TCP_KEEPINTVL=1s / TCP_KEEPCNT=3select()**:_send_request()将 socket 设为非阻塞,使用send()+select()实现 5 秒硬截止时间的可靠发送。解决了 Windows 上sendall()在 TCP 缓冲区满时不响应settimeout()的问题connect_with_retry()**:指数退避重连(1s → 2s → 4s → 8s → 15s 封顶),max_attempts=0表示无限重试reconnect()**:关闭旧 socket → 无限重试连接 → 成功后重建msgpack.Unpackerconnection_lost**:threading.Event,线程2 (NPU通信) 在检测到断连时置位,主线程轮询此标志is_connected属性:快速查询连接状态model_preloaded**:为True时add_model()/release_model()变为空操作2.
thread_pipeline.py— 线程2 (NPU通信) 感知断连_infer_worker捕获ConnectionLostError:connection_lost.set()通知主线程shutdown_event.set()协调关闭所有工作线程infer()返回None(推理超时)仅跳过当前帧,不触发关闭push_frame(timeout=0.5)**:队列满时 0.5 秒超时返回,主线程可轮询连接状态和键盘事件3.
main_video.py— 外层重连循环connection_lost,避免向已断开管线推帧cap和frame_idx保持不变,恢复后从断点继续4.
main.py— 单图检测重试ConnectionLostError→client.reconnect()→ 重试time.sleep(1)→ 重试完整重连时序
线程安全
client.sockclient._connectedclient.connection_lostthreading.Event内置线程安全client.unpackerqueue.Queue内置线程安全pipeline._shutdownthreading.Event内置线程安全项目结构
配置
在
main_video.py中修改:DSP 端固件需为新版(模型常驻 + 循环 accept),上位机使用
model_preloaded=True模式。运行
键盘快捷键
q/ESC+/=-0数据流
测试结果
日志如下所示: