第 20 篇:搭建集成测试基础框架 + 模拟器闭环测试
下面我们将一步一步详细完成集成测试基础框架搭建,并实现模拟器闭环测试(串口收到数据 → 协议解析 → VirtualDevice 状态更新 → 可选返回响应)。
目标:
- 不依赖真实串口硬件
- 使用 mock 完全控制输入/输出
- 验证 Pelco-D/P 命令到达 VirtualDevice 后状态是否正确变化
- 为后续宏 → 串口 → 模拟器链路打基础
20.1 项目结构调整(添加 integration 目录)
在项目根目录下创建以下结构(如果还没有):
KBD300A/ ├── core/ │ ├── protocol/ │ ├── macro/ │ ├── serial/ │ ├── simulator/ │ └── ... ├── ui/ ├── config/ ├── resources/ ├── tests/ ← 新建 │ ├── unit/ ← 已有的单元测试 │ ├── integration/ ← 集成测试放这里 │ │ ├── test_simulator_loop.py │ │ ├── test_macro_to_serial.py │ │ └── ... │ ├── conftest.py ← 全局 fixture │ └── test_data/ ← 测试用 Pelco 命令样本 └── requirements-dev.txt ← 测试依赖requirements-dev.txt(安装这些包,Python 3.7 兼容):
pytest==6.2.5 pytest-mock==3.6.1 pytest-qt==4.0.2 # 如果涉及 Qt 信号等待 coverage==5.5 # 可选,覆盖率统计安装:
pipinstall-r requirements-dev.txt20.2 创建 conftest.py(全局 fixture)
tests/conftest.py
# tests/conftest.pyimportpytestfromunittest.mockimportMagicMockfromcore.simulator.virtual_deviceimportVirtualDevicefromcore.protocolimportget_protocolfromcore.serial.managerimportSerialManager@pytest.fixturedefvirtual_device():"""每个测试一个干净的虚拟设备"""returnVirtualDevice(cam_id=1)@pytest.fixturedefmock_serial():"""模拟 serial.Serial 对象"""mock_ser=MagicMock()mock_ser.is_open=Truemock_ser.in_waiting=0mock_ser.read.side_effect=lambdasize:b""# 默认返回空mock_ser.write=MagicMock()returnmock_ser@pytest.fixturedefmock_serial_manager(mock_serial):"""模拟 SerialManager,注入 mock 串口"""manager=SerialManager(port="COMfake",baud=9600,protocol="D")manager._ser=mock_serial# 直接替换底层串口manager.is_open=Truereturnmanager@pytest.fixturedefpelco_d_protocol(mock_serial_manager):"""Pelco-D 协议实例"""returnget_protocol(mock_serial_manager,"D")@pytest.fixturedefpelco_p_protocol(mock_serial_manager):"""Pelco-P 协议实例"""returnget_protocol(mock_serial_manager,"P")20.3 准备测试数据(Pelco 命令样本)
在tests/test_data/pelco_commands.json中保存一些典型命令(方便复用):
{"pelco_d":{"ptz_right_fast":{"bytes":"FF0100001020004F","description":"地址1,右转速度32","expected_pan":32.0},"call_preset_5":{"bytes":"FF010000071F0067","description":"调用预置位5","preset_id":5},"aux_on_2":{"bytes":"FF0100000B02000E","description":"AUX2 开"}},"pelco_p":{"ptz_up_medium":{"bytes":"A0010008100800AF","description":"地址1,上仰速度8"}}}加载方式示例:
importjsonfrompathlibimportPath TEST_DATA=json.loads(Path("tests/test_data/pelco_commands.json").read_text())20.4 实现模拟器闭环测试(核心文件)
tests/integration/test_simulator_loop.py
# tests/integration/test_simulator_loop.pyimportpytestimporttimefromunittest.mockimportcallfromcore.serial.workerimportSerialWorkerfromcore.protocol.pelco_dimportPelcoDProtocolfromcore.simulator.virtual_deviceimportVirtualDevicefromtests.conftestimportmock_serial_manager# 如果需要defload_command_hex(hex_str):"""将 hex 字符串转为 bytes"""returnbytes.fromhex(hex_str)@pytest.mark.integrationdeftest_pelco_d_ptz_command_updates_virtual_device(virtual_device,mock_serial,pelco_d_protocol):""" 测试:收到 Pelco-D 右转命令 → VirtualDevice pan 角度增加 """# 准备输入:右转速度 32 的命令cmd_hex="FF 01 00 00 20 00 21"# 校验和 0x21 = 1+0+0+32+0cmd_bytes=load_command_hex(cmd_hex.replace(" ",""))# 模拟 SerialWorker 收到数据(直接注入 buffer)worker=SerialWorker(port="COMfake",baud=9600,protocol="D")worker._ser=mock_serial worker._buffer=bytearray(cmd_bytes)# 关键:注入 VirtualDevice 到协议层(需要小改 protocol 代码支持)# 临时方案:在测试中 monkey-patch 或直接调用 processpelco_d_protocol.virtual_device=virtual_device# 假设已添加属性# 模拟一次数据处理(实际项目中可调用 worker._process_buffer())parsed=pelco_d_protocol.parse_response(cmd_bytes)# 或者更真实:模拟 worker 处理流程worker._buffer.extend(cmd_bytes)worker._process_buffer()# 如果 _process_buffer 是 public 或可访问# 检查 VirtualDevice 状态status=virtual_device.get_status_dict()assert"pan"instatus pan_value=float(status["pan"].rstrip("°"))assertpan_value>0,"右转命令应使 pan 角度增加"# 可选:验证具体角度(根据 VirtualDevice 内部实现比例)# 假设 VirtualDevice 将 speed 32 映射为 +32 度assertabs(pan_value-32.0)<5.0,f"预期 pan ≈32,实际{pan_value}"@pytest.mark.integrationdeftest_pelco_d_call_preset_triggers_action(virtual_device,mock_serial,pelco_d_protocol):"""测试调用预置位命令是否触发 VirtualDevice 相应逻辑"""cmd_hex="FF 01 00 00 07 05 0D"# 调用预置位 5,校验和 1+0+0+7+5=13=0x0Dcmd_bytes=load_command_hex(cmd_hex.replace(" ",""))# 注入pelco_d_protocol.virtual_device=virtual_device# 处理命令worker=SerialWorker(port="COMfake",baud=9600,protocol="D")worker._ser=mock_serial worker._buffer=bytearray(cmd_bytes)worker._process_buffer()# 或直接调用协议 parse# 假设 VirtualDevice 在 process_command 中记录 preset 调用# 你可以添加断言,例如检查日志或状态# 或者临时添加一个调用计数器属性到 VirtualDeviceasserthasattr(virtual_device,"last_preset_called")assertvirtual_device.last_preset_called==5@pytest.mark.integrationdeftest_pelco_p_aux_on_updates_aux_state(virtual_device,mock_serial):"""Pelco-P 打开 AUX2 测试"""cmd_hex="A0 01 00 0B 02 00 0E AF"# AUX2 ONcmd_bytes=load_command_hex(cmd_hex.replace(" ",""))protocol=PelcoDProtocol(mock_serial)# 改用 P 协议protocol.virtual_device=virtual_device# 处理parsed=protocol._parse_response(cmd_bytes)assertparsed.get("type")=="aux"assertparsed.get("aux_id")==2assertparsed.get("state")=="on"# 验证虚拟设备状态assertvirtual_device.aux_states[2]isTrue20.5 小幅修改生产代码以便测试
在core/simulator/virtual_device.py中添加:
classVirtualDevice:def__init__(self,cam_id:int=1):...self.last_preset_called=None# 测试用defprocess_command(self,data:bytes)->Optional[bytes]:parsed=parse_pelco_packet(data)ifparsed.get("type")=="preset"andparsed.get("operation")=="call":self.last_preset_called=parsed.get("preset_id")# ... 原有逻辑在core/protocol/base.py或具体协议类中添加:
classProtocolBase(ABC):def__init__(self,serial_mgr):self.serial_mgr=serial_mgr self.virtual_device=None# 测试时注入20.6 运行测试
# 在项目根目录执行pytest tests/integration/ -v --tb=short# 只跑模拟器闭环测试pytest tests/integration/test_simulator_loop.py -v# 带覆盖率pytest --cov=core --cov-report=html tests/integration/👉上一篇 :pytest集成测试(serial + protocol + macro)
👉总目录:Python开发软键盘全程总览
👉下一篇: