news 2026/4/17 11:21:51

can(6) canopen python库使用

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
can(6) canopen python库使用

1.发送消息

# canopen_sender.py - CANopen 发送端(主节点) import canopen import can import time def start_canopen_sender(): """启动 CANopen 发送端,连接虚拟总线并发送消息""" # 统一的虚拟总线配置(必须与 bus_creator.py 一致) VIRTUAL_CHANNEL = "virtual_canopen_bus" BITRATE = 500000 MASTER_NODE_ID = 1 # 主节点 ID SLAVE_NODE_ID = 2 # 接收端(从节点)ID # 1. 初始化 CANopen 网络,连接虚拟总线 network = canopen.Network() network.connect( interface="virtual", channel=VIRTUAL_CHANNEL, bitrate=BITRATE ) print(f"✅ CANopen 发送端已连接虚拟总线:{VIRTUAL_CHANNEL}") master_node = network.add_node(MASTER_NODE_ID, object_dictionary=None) # 2. 发送 CANopen 心跳报文(主节点自身心跳) print("\n===== 发送主节点心跳报文 =====") heartbeat_id = 0x700 + MASTER_NODE_ID # CANopen 心跳 ID 规则:0x700 + 节点ID heartbeat_msg = can.Message( arbitration_id=heartbeat_id, data=[0x05], # 0x05=运行状态 is_extended_id=False, channel=VIRTUAL_CHANNEL ) network.bus.send(heartbeat_msg) print(f"📤 发送心跳报文:ID=0x{heartbeat_id:X},数据=0x05(运行状态)") # 3. 发送 SDO 请求(向从节点读取设备名称 0x1008:0x00) print("\n===== 发送 SDO 读取请求 =====") sdo_request_id = 0x600 + SLAVE_NODE_ID # SDO 请求 ID 规则:0x600 + 从节点ID # SDO 读取请求数据(格式:0x40 + 索引高字节 + 索引低字节 + 子索引 + 保留) sdo_request_data = b'\x40\x08\x10\x00\x00\x00\x00\x00' # 读取 0x1008:0x00 sdo_msg = can.Message( arbitration_id=sdo_request_id, data=sdo_request_data, is_extended_id=False, channel=VIRTUAL_CHANNEL ) network.bus.send(sdo_msg) print(f"📤 发送 SDO 请求:ID=0x{sdo_request_id:X},数据={sdo_request_data.hex()}") # 4. 监听是否有从节点响应(可选) print("\n===== 监听从节点响应(5秒)=====") start_time = time.time() while time.time() - start_time < 5: resp_msg = network.bus.recv(timeout=0.5) if resp_msg: print(f"📥 收到响应:ID=0x{resp_msg.arbitration_id:X},数据={resp_msg.data.hex()}") network.disconnect() if __name__ == "__main__": start_canopen_sender()

2.接收

# canopen_receiver.py - CANopen 接收端(从节点) import canopen import can import time import threading class CANopenReceiver: """CANopen 接收端,监听并响应虚拟总线消息""" def __init__(self): # 统一的虚拟总线配置(与 bus_creator.py 一致) self.VIRTUAL_CHANNEL = "virtual_canopen_bus" self.BITRATE = 500000 self.SLAVE_NODE_ID = 2 # 从节点 ID self.network: canopen.Network = None self.running = False def start(self): """启动接收端,连接虚拟总线并监听消息""" try: # 1. 连接虚拟总线 self.network = canopen.Network() self.network.connect( interface="virtual", channel=self.VIRTUAL_CHANNEL, bitrate=self.BITRATE ) print(f"✅ CANopen 接收端已连接虚拟总线:{self.VIRTUAL_CHANNEL}") self.running = True # 2. 启动监听线程 listen_thread = threading.Thread(target=self._listen_loop, daemon=True) listen_thread.start() print(f"ℹ️ 开始监听虚拟总线消息(节点 ID={self.SLAVE_NODE_ID}),按 Ctrl+C 退出") # 3. 保持脚本运行 while self.running: time.sleep(1) except KeyboardInterrupt: print("\n⚠️ 接收到退出信号,停止监听...") except Exception as e: print(f"❌ 接收端启动失败:{e}") finally: self.running = False self.network.disconnect() print("✅ 接收端已断开虚拟总线连接") def _listen_loop(self): """循环监听虚拟总线消息,并响应 SDO 请求""" while self.running: msg = self.network.bus.recv(timeout=0.1) # 非阻塞接收 if msg: self._process_message(msg) def _process_message(self, msg: can.Message): """解析并处理收到的 CANopen 消息""" arb_id = msg.arbitration_id data = msg.data.hex() print(f"\n📥 收到消息:ID=0x{arb_id:X},数据={data}") # 识别并响应 SDO 请求(0x600 + 从节点ID) if arb_id == 0x600 + self.SLAVE_NODE_ID: print(" → 收到 SDO 读取请求,准备响应...") self._respond_sdo_request(msg) # 识别心跳报文(0x700 + 节点ID) elif 0x700 <= arb_id < 0x800: node_id = arb_id - 0x700 state = msg.data[0] if len(msg.data) > 0 else 0 state_map = {0: "初始化", 5: "运行", 127: "故障"} state_desc = state_map.get(state, f"未知({state})") print(f" → 心跳报文:节点{node_id} 状态={state_desc}") def _respond_sdo_request(self, req_msg: can.Message): """响应 SDO 读取请求(模拟从节点返回设备名称)""" # SDO 响应 ID 规则:0x580 + 从节点ID sdo_resp_id = 0x580 + self.SLAVE_NODE_ID # 模拟返回设备名称 "VirtualCAN"(截断为 4 字节示例) resp_data = b'\x43\x08\x10\x00\x08\x56\x69\x72' # 响应格式 + "Vir..." resp_msg = can.Message( arbitration_id=sdo_resp_id, data=resp_data, is_extended_id=False, channel=self.VIRTUAL_CHANNEL ) self.network.bus.send(resp_msg) print(f"📤 发送 SDO 响应:ID=0x{sdo_resp_id:X},数据={resp_data.hex()}") if __name__ == "__main__": receiver = CANopenReceiver() receiver.start()

3.代码解释

3.1 interface="virtual", -- CAN 总线的通信接口类型

其他类型

  • Windows:interface='pcan'(PCAN 硬件)、interface='kvaser'(Kvaser 硬件);
  • 虚拟总线(跨平台):interface='virtual'(python-can 内置虚拟总线)。
  • interface = ‘socketcan'Linux 系统下 Python 操作 CAN 总线的专属接口类型,依赖 Linux 的 SocketCAN 协议栈,需配合 Linux 的 CAN 接口(物理 / 虚拟)使用
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/15 6:58:05

macbert模型介绍

后续会继续更新&#xff0c;感兴趣的友友给博主点个免费的关注吧~

作者头像 李华
网站建设 2026/4/16 18:08:26

学长亲荐2026继续教育必备TOP9一键生成论文工具测评

学长亲荐2026继续教育必备TOP9一键生成论文工具测评 2026年继续教育论文工具测评&#xff1a;为何需要一份专业榜单&#xff1f; 在当前继续教育日益普及的背景下&#xff0c;越来越多的学员面临论文写作的挑战。无论是选题困难、资料查找繁琐&#xff0c;还是格式规范不熟悉&a…

作者头像 李华
网站建设 2026/4/14 21:12:54

LLM优化CRISPR设计脱靶率砍半

&#x1f4dd; 博客主页&#xff1a;Jax的CSDN主页 LLM驱动的CRISPR脱靶率优化&#xff1a;从理论到实践的突破目录LLM驱动的CRISPR脱靶率优化&#xff1a;从理论到实践的突破 引言&#xff1a;基因编辑的安全瓶颈与LLM的破局机遇 维度一&#xff1a;技术应用场景——从实验室到…

作者头像 李华
网站建设 2026/4/14 15:07:57

揭秘AI论文降重内幕:9款工具实测,AI率从64%降至8%

开头&#xff1a;90%的学生都不知道的AI论文“生死劫” 你是否经历过这样的绝望&#xff1f;花3天用AI生成的论文初稿&#xff0c;提交后被导师打回&#xff0c;理由是“AI痕迹过重”&#xff1b;熬夜改了5版&#xff0c;查重时AI率仍高达40%&#xff0c;甚至被系统标记为“疑…

作者头像 李华
网站建设 2026/4/15 14:49:58

基于STM32单片机的汽车疲劳驾驶监测系统设计

基于STM32单片机的汽车疲劳驾驶监测系统设计摘要随着汽车保有量的持续增长&#xff0c;交通安全问题日益受到社会关注。疲劳驾驶和酒后驾驶是导致交通事故的主要人为因素之一。本文设计了一种基于STM32单片机的汽车疲劳驾驶监测系统&#xff0c;通过集成MAX30102心率血氧传感器…

作者头像 李华