esi_parser.py 17 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. ESI(EtherCAT Slave Information) to EEPROM Binary Converter
  5. Copyright (c) 2025, sakumisu
  6. SPDX-License-Identifier: Apache-2.0
  7. """
  8. import xml.etree.ElementTree as ET
  9. import struct
  10. import sys
  11. import os
  12. from typing import Dict, List, Tuple, Optional
  13. class EtherCATXMLParser:
  14. def __init__(self):
  15. # 设备基本信息
  16. self.vendor_id = 0x00000000 # 默认厂商ID
  17. self.product_code = 0x00000000 # 默认产品代码
  18. self.revision_no = 0x00000000 # 默认版本号
  19. self.serial_number = 0x00000000 # 序列号
  20. self.device_name = ""
  21. self.device_type = ""
  22. # 邮箱配置
  23. self.mailbox_protocols = 0x0
  24. self.boot_rx_mailbox = {}
  25. self.boot_tx_mailbox = {}
  26. self.std_rx_mailbox = {}
  27. self.std_tx_mailbox = {}
  28. # 字符串表
  29. self.strings = []
  30. # 类别数据
  31. self.categories = []
  32. def parse_hex_value(self, hex_str: str) -> int:
  33. """解析十六进制字符串"""
  34. if not hex_str:
  35. return 0
  36. hex_str = hex_str.strip()
  37. if hex_str.startswith('#x'):
  38. return int(hex_str[2:], 16)
  39. elif hex_str.startswith('0x'):
  40. return int(hex_str[2:], 16)
  41. else:
  42. try:
  43. return int(hex_str, 16)
  44. except:
  45. return int(hex_str, 10)
  46. def parse_device_info(self, device_elem):
  47. """解析设备基本信息"""
  48. # 获取产品代码和版本号
  49. type_elem = device_elem.find('Type')
  50. if type_elem is not None:
  51. product_code = type_elem.get('ProductCode')
  52. if product_code:
  53. self.product_code = self.parse_hex_value(product_code)
  54. revision_no = type_elem.get('RevisionNo')
  55. if revision_no:
  56. self.revision_no = self.parse_hex_value(revision_no)
  57. # 获取设备名称
  58. name_elem = device_elem.find('Name')
  59. if name_elem is not None and name_elem.text:
  60. self.device_name = name_elem.text.strip()
  61. # 获取设备类型
  62. type_name = device_elem.find('Type/Name')
  63. if type_name is not None and type_name.text:
  64. self.device_type = type_name.text.strip()
  65. def parse_vendor_info(self, vendor_elem):
  66. """解析厂商信息"""
  67. vendor_id_elem = vendor_elem.find('Id')
  68. if vendor_id_elem is not None and vendor_id_elem.text:
  69. self.vendor_id = self.parse_hex_value(vendor_id_elem.text)
  70. def parse_mailbox_info(self, device_elem):
  71. """解析邮箱信息"""
  72. mailbox_elem = device_elem.find('.//Mailbox')
  73. if mailbox_elem is not None:
  74. # 检查支持的协议
  75. self.mailbox_protocols = 0
  76. if mailbox_elem.find('CoE') is not None:
  77. self.mailbox_protocols |= 0x04 # CoE
  78. if mailbox_elem.find('FoE') is not None:
  79. self.mailbox_protocols |= 0x08 # FoE
  80. if mailbox_elem.find('EoE') is not None:
  81. self.mailbox_protocols |= 0x10 # EoE
  82. if mailbox_elem.find('SoE') is not None:
  83. self.mailbox_protocols |= 0x20 # SoE
  84. # 从SM配置中获取邮箱地址和大小
  85. sm_elems = device_elem.findall('.//Sm')
  86. for i, sm_elem in enumerate(sm_elems):
  87. start_addr = self.parse_hex_value(sm_elem.get('StartAddress', '0'))
  88. size = self.parse_hex_value(sm_elem.get('DefaultSize', '0'))
  89. if i == 0: # MBoxOut (接收)
  90. self.boot_rx_mailbox = {"offset": start_addr, "size": size}
  91. self.std_rx_mailbox = {"offset": start_addr, "size": size}
  92. elif i == 1: # MBoxIn (发送)
  93. self.boot_tx_mailbox = {"offset": start_addr, "size": size}
  94. self.std_tx_mailbox = {"offset": start_addr, "size": size}
  95. def add_string(self, text: str) -> int:
  96. """添加字符串到字符串表,返回索引"""
  97. if not text:
  98. return 0
  99. # 检查是否已存在
  100. for i, existing in enumerate(self.strings):
  101. if existing == text:
  102. return i + 1
  103. # 添加新字符串
  104. self.strings.append(text)
  105. return len(self.strings)
  106. def create_strings_category(self) -> bytes:
  107. """创建字符串类别(Category 10)"""
  108. if not self.strings:
  109. return b''
  110. data = bytearray()
  111. # 字符串数量
  112. data.append(len(self.strings))
  113. # 每个字符串: 长度 + 内容
  114. for string in self.strings:
  115. string_bytes = string.encode('ascii', errors='replace')
  116. data.append(len(string_bytes))
  117. data.extend(string_bytes)
  118. # 填充到偶数长度
  119. if len(data) % 2:
  120. data.append(0)
  121. return bytes(data)
  122. def create_general_category(self) -> bytes:
  123. """创建通用类别(Category 30)"""
  124. data = bytearray()
  125. # Group Type String Index (2 bytes)
  126. group_idx = self.add_string("ECAT_Device")
  127. data.extend(struct.pack('<H', group_idx))
  128. # Image Name String Index (2 bytes)
  129. image_idx = self.add_string("ECAT_CIA402")
  130. data.extend(struct.pack('<H', image_idx))
  131. # Order Number String Index (2 bytes)
  132. order_idx = self.add_string("")
  133. data.extend(struct.pack('<H', order_idx))
  134. # Device Name String Index (2 bytes)
  135. name_idx = self.add_string(self.device_name)
  136. data.extend(struct.pack('<H', name_idx))
  137. # CoE Details (2 bytes) - 支持SDO, PDO配置
  138. coe_details = 0x0027 # Enable SDO, SDO Info, PDO Assign, PDO Config
  139. data.extend(struct.pack('<H', coe_details))
  140. # FoE Details (2 bytes)
  141. foe_details = 0x0000
  142. data.extend(struct.pack('<H', foe_details))
  143. # EoE Details (2 bytes)
  144. eoe_details = 0x0000
  145. data.extend(struct.pack('<H', eoe_details))
  146. # SoE Channels (1 byte)
  147. soe_channels = 0x00
  148. data.append(soe_channels)
  149. # DS402 Channels (1 byte)
  150. ds402_channels = 0x01
  151. data.append(ds402_channels)
  152. # SysmanClass (1 byte)
  153. sysman_class = 0x00
  154. data.append(sysman_class)
  155. # Flags (1 byte)
  156. flags = 0x01 # Enable SafeOp
  157. data.append(flags)
  158. # Current Consumption (2 bytes)
  159. current = 0x0000
  160. data.extend(struct.pack('<H', current))
  161. # Group Type and Image Name for 2nd device (if any)
  162. data.extend(struct.pack('<H', 0x0000)) # Group Type 2
  163. data.extend(struct.pack('<H', 0x0000)) # Image Name 2
  164. # Physical Memory Address (2 bytes)
  165. phys_addr = 0x0000
  166. data.extend(struct.pack('<H', phys_addr))
  167. # 填充到偶数长度
  168. if len(data) % 2:
  169. data.append(0)
  170. return bytes(data)
  171. def create_fmmu_category(self) -> bytes:
  172. """创建FMMU类别(Category 40)"""
  173. data = bytearray()
  174. # FMMU配置 - 8个FMMU
  175. fmmu_configs = [
  176. 0x01, # FMMU0: Outputs
  177. 0x02, # FMMU1: Inputs
  178. 0x03, # FMMU2: MBox State
  179. 0x00, # FMMU3: Unused
  180. 0x00, # FMMU4: Unused
  181. 0x00, # FMMU5: Unused
  182. 0x00, # FMMU6: Unused
  183. 0x00, # FMMU7: Unused
  184. ]
  185. for config in fmmu_configs:
  186. data.append(config)
  187. return bytes(data)
  188. def create_sm_category(self) -> bytes:
  189. """创建同步管理器类别(Category 41)"""
  190. data = bytearray()
  191. # SM配置数据结构: StartAddr(2) + Length(2) + ControlByte(1) + Enable(1)
  192. sm_configs = [
  193. # SM0: MBoxOut (接收邮箱)
  194. (self.boot_rx_mailbox["offset"], self.boot_rx_mailbox["size"], 0x26, 0x01),
  195. # SM1: MBoxIn (发送邮箱)
  196. (self.boot_tx_mailbox["offset"], self.boot_tx_mailbox["size"], 0x22, 0x01),
  197. # SM2: Process Data Output
  198. (0x1100, 0x0000, 0x64, 0x00), # 长度为0表示未配置
  199. # SM3: Process Data Input
  200. (0x1400, 0x0000, 0x20, 0x00), # 长度为0表示未配置
  201. # SM4-7: 未使用
  202. (0x0000, 0x0000, 0x00, 0x00),
  203. (0x0000, 0x0000, 0x00, 0x00),
  204. (0x0000, 0x0000, 0x00, 0x00),
  205. (0x0000, 0x0000, 0x00, 0x00),
  206. ]
  207. for start_addr, length, control, enable in sm_configs:
  208. data.extend(struct.pack('<H', start_addr)) # Start Address
  209. data.extend(struct.pack('<H', length)) # Length
  210. data.append(control) # Control Byte
  211. data.append(enable) # Enable
  212. return bytes(data)
  213. def create_category(self, category_type: int, data: bytes) -> bytes:
  214. """创建类别头部+数据"""
  215. header = bytearray()
  216. # Category Type (2 bytes)
  217. header.extend(struct.pack('<H', category_type))
  218. # Category Size in words (2 bytes)
  219. size_words = (len(data) + 1) // 2
  220. header.extend(struct.pack('<H', size_words))
  221. return bytes(header) + data
  222. def generate_eeprom(self) -> bytes:
  223. """生成完整的EEPROM数据,参考eeprom.h的格式"""
  224. eeprom_data = bytearray()
  225. # === EEPROM Header (固定128字节) ===
  226. # PDI Control (2 bytes) - 0x800C (Digital I/O + SII EEPROM)
  227. eeprom_data.extend(struct.pack('<H', 0x800C))
  228. # PDI Configuration (2 bytes) - 0x6681
  229. eeprom_data.extend(struct.pack('<H', 0x6681))
  230. # Sync Impulse Length (2 bytes)
  231. eeprom_data.extend(struct.pack('<H', 0x0000))
  232. # PDI Configuration 2 (2 bytes)
  233. eeprom_data.extend(struct.pack('<H', 0x0000))
  234. # Station Alias (2 bytes)
  235. eeprom_data.extend(struct.pack('<H', 0x3412))
  236. # Reserved (2 bytes)
  237. eeprom_data.extend(struct.pack('<H', 0x0000))
  238. # Checksum (2 bytes) - 稍后计算
  239. checksum_pos = len(eeprom_data)
  240. eeprom_data.extend(struct.pack('<H', 0x0077)) # 临时值
  241. # Vendor ID (4 bytes)
  242. eeprom_data.extend(struct.pack('<L', self.vendor_id))
  243. # Product Code (4 bytes)
  244. eeprom_data.extend(struct.pack('<L', self.product_code))
  245. # Revision Number (4 bytes)
  246. eeprom_data.extend(struct.pack('<L', self.revision_no))
  247. # Serial Number (4 bytes)
  248. eeprom_data.extend(struct.pack('<L', self.serial_number))
  249. # Bootstrap Mailbox Receive Offset (2 bytes)
  250. eeprom_data.extend(struct.pack('<H', self.boot_rx_mailbox["offset"]))
  251. # Bootstrap Mailbox Receive Size (2 bytes)
  252. eeprom_data.extend(struct.pack('<H', self.boot_rx_mailbox["size"]))
  253. # Bootstrap Mailbox Send Offset (2 bytes)
  254. eeprom_data.extend(struct.pack('<H', self.boot_tx_mailbox["offset"]))
  255. # Bootstrap Mailbox Send Size (2 bytes)
  256. eeprom_data.extend(struct.pack('<H', self.boot_tx_mailbox["size"]))
  257. # Standard Mailbox Receive Offset (2 bytes)
  258. eeprom_data.extend(struct.pack('<H', self.std_rx_mailbox["offset"]))
  259. # Standard Mailbox Receive Size (2 bytes)
  260. eeprom_data.extend(struct.pack('<H', self.std_rx_mailbox["size"]))
  261. # Standard Mailbox Send Offset (2 bytes)
  262. eeprom_data.extend(struct.pack('<H', self.std_tx_mailbox["offset"]))
  263. # Standard Mailbox Send Size (2 bytes)
  264. eeprom_data.extend(struct.pack('<H', self.std_tx_mailbox["size"]))
  265. # Mailbox Protocol (2 bytes)
  266. eeprom_data.extend(struct.pack('<H', self.mailbox_protocols))
  267. # Reserved bytes to reach 128 bytes header
  268. current_size = len(eeprom_data)
  269. header_size = 128
  270. if current_size < header_size:
  271. eeprom_data.extend(b'\x00' * (header_size - current_size))
  272. # === Categories Section ===
  273. # Category 10: Strings
  274. strings_data = self.create_strings_category()
  275. if strings_data:
  276. eeprom_data.extend(self.create_category(10, strings_data))
  277. # Category 30: General
  278. general_data = self.create_general_category()
  279. eeprom_data.extend(self.create_category(30, general_data))
  280. # Category 40: FMMU
  281. fmmu_data = self.create_fmmu_category()
  282. eeprom_data.extend(self.create_category(40, fmmu_data))
  283. # Category 41: SyncM
  284. sm_data = self.create_sm_category()
  285. eeprom_data.extend(self.create_category(41, sm_data))
  286. # End of Categories marker
  287. eeprom_data.extend(struct.pack('<H', 0xFFFF))
  288. eeprom_data.extend(struct.pack('<H', 0x0000))
  289. # 填充到合适的大小 (通常是2KB)
  290. target_size = 2048
  291. if len(eeprom_data) < target_size:
  292. eeprom_data.extend(b'\xFF' * (target_size - len(eeprom_data)))
  293. # 重新计算校验和 (SII头部校验和)
  294. checksum = 0
  295. # 计算前14字节的校验和,跳过校验和字段本身
  296. for i in range(0, 14, 2):
  297. if i != 12: # 跳过校验和位置
  298. word = struct.unpack('<H', eeprom_data[i:i+2])[0]
  299. checksum += word
  300. checksum = (~checksum + 1) & 0xFFFF # 2's complement
  301. # 更新校验和
  302. struct.pack_into('<H', eeprom_data, checksum_pos, checksum)
  303. return bytes(eeprom_data)
  304. def parse_xml(self, xml_file: str) -> bool:
  305. """解析XML文件"""
  306. try:
  307. tree = ET.parse(xml_file)
  308. root = tree.getroot()
  309. # 查找并解析厂商信息
  310. vendor_elem = root.find('.//Vendor')
  311. if vendor_elem is not None:
  312. self.parse_vendor_info(vendor_elem)
  313. # 查找并解析设备信息
  314. device_elem = root.find('.//Device')
  315. if device_elem is not None:
  316. self.parse_device_info(device_elem)
  317. self.parse_mailbox_info(device_elem)
  318. print(f"Parsed XML: Vendor=0x{self.vendor_id:08X}, Product=0x{self.product_code:08X}")
  319. print(f"Device Name: {self.device_name}")
  320. print(f"Mailbox RX: 0x{self.std_rx_mailbox['offset']:04X}({self.std_rx_mailbox['size']})")
  321. print(f"Mailbox TX: 0x{self.std_tx_mailbox['offset']:04X}({self.std_tx_mailbox['size']})")
  322. return True
  323. except Exception as e:
  324. print(f"Error parsing XML file: {e}")
  325. import traceback
  326. traceback.print_exc()
  327. return False
  328. def generate_c_header(self, array_name: str = "cherryecat_eepromdata") -> str:
  329. """生成C语言头文件格式的数组"""
  330. eeprom_data = self.generate_eeprom()
  331. lines = [
  332. "/*",
  333. f"The EEPROM data is created based on EtherCAT Slave Information (ESI) XML file.",
  334. f"Generated {len(eeprom_data)} bytes of EEPROM data",
  335. f"Vendor ID: 0x{self.vendor_id:08X}",
  336. f"Product Code: 0x{self.product_code:08X}",
  337. f"Revision: 0x{self.revision_no:08X}",
  338. f"Device Name: {self.device_name}",
  339. "*/",
  340. f"unsigned char {array_name}[] = {{",
  341. ]
  342. # 按16字节一行格式化数据
  343. for i in range(0, len(eeprom_data), 16):
  344. chunk = eeprom_data[i:i+16]
  345. hex_values = [f"0x{b:02X}" for b in chunk]
  346. line = ",".join(hex_values)
  347. if i + 16 < len(eeprom_data):
  348. line += ","
  349. lines.append(line)
  350. lines.append("};")
  351. return "\n".join(lines)
  352. def main():
  353. if len(sys.argv) < 3:
  354. print("Usage: python esi_parse.py <input.xml> <output.bin> [output.h]")
  355. print(" input.xml - EtherCAT ESI XML file")
  356. print(" output.bin - Output binary EEPROM file")
  357. print(" output.h - Optional C header file output")
  358. sys.exit(1)
  359. input_file = sys.argv[1]
  360. output_file = sys.argv[2]
  361. header_file = sys.argv[3] if len(sys.argv) > 3 else None
  362. if not os.path.exists(input_file):
  363. print(f"Error: Input file '{input_file}' not found")
  364. sys.exit(1)
  365. # 创建解析器
  366. parser = EtherCATXMLParser()
  367. # 解析XML
  368. print(f"Parsing XML file: {input_file}")
  369. if not parser.parse_xml(input_file):
  370. print("Failed to parse XML file")
  371. sys.exit(1)
  372. # 生成EEPROM数据
  373. print("Generating EEPROM data...")
  374. eeprom_data = parser.generate_eeprom()
  375. # 写入二进制文件
  376. try:
  377. with open(output_file, 'wb') as f:
  378. f.write(eeprom_data)
  379. print(f"✓ Successfully converted '{input_file}' to '{output_file}'")
  380. print(f"✓ Generated {len(eeprom_data)} bytes of EEPROM data")
  381. print(f"✓ Vendor ID: 0x{parser.vendor_id:08X}")
  382. print(f"✓ Product Code: 0x{parser.product_code:08X}")
  383. print(f"✓ Revision: 0x{parser.revision_no:08X}")
  384. print(f"✓ Device Name: {parser.device_name}")
  385. except Exception as e:
  386. print(f"Error writing binary file: {e}")
  387. sys.exit(1)
  388. # 生成C头文件(可选)
  389. if header_file:
  390. try:
  391. header_content = parser.generate_c_header()
  392. with open(header_file, 'w') as f:
  393. f.write(header_content)
  394. print(f"✓ Generated C header file: {header_file}")
  395. except Exception as e:
  396. print(f"Error writing header file: {e}")
  397. if __name__ == "__main__":
  398. main()