eni_parser.py 14 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. ENI (EtherCAT Network Information) Parser
  5. Copyright (c) 2025, sakumisu
  6. SPDX-License-Identifier: Apache-2.0
  7. """
  8. import xml.etree.ElementTree as ET
  9. import sys
  10. import os
  11. from typing import Dict, List, Tuple, Optional
  12. class ENIParser:
  13. def __init__(self):
  14. self.slaves = []
  15. def parse_hex_value(self, hex_str: str) -> int:
  16. """解析十六进制字符串"""
  17. if not hex_str:
  18. return 0
  19. hex_str = hex_str.strip()
  20. if hex_str.startswith('#x'):
  21. return int(hex_str[2:], 16)
  22. elif hex_str.startswith('0x'):
  23. return int(hex_str[2:], 16)
  24. else:
  25. try:
  26. return int(hex_str, 16)
  27. except:
  28. try:
  29. return int(hex_str, 10)
  30. except:
  31. return 0
  32. def parse_slave_info(self, slave_elem):
  33. """解析从站基本信息"""
  34. slave_info = {}
  35. info_elem = slave_elem.find('Info')
  36. if info_elem is not None:
  37. name_elem = info_elem.find('Name')
  38. if name_elem is not None:
  39. slave_info['name'] = name_elem.text.strip() if name_elem.text else ""
  40. vendor_id_elem = info_elem.find('VendorId')
  41. if vendor_id_elem is not None:
  42. slave_info['vendor_id'] = int(vendor_id_elem.text)
  43. product_code_elem = info_elem.find('ProductCode')
  44. if product_code_elem is not None:
  45. slave_info['product_code'] = int(product_code_elem.text)
  46. revision_no_elem = info_elem.find('RevisionNo')
  47. if revision_no_elem is not None:
  48. slave_info['revision_no'] = int(revision_no_elem.text)
  49. return slave_info
  50. def parse_pdo_entry(self, entry_elem):
  51. """解析单个PDO条目"""
  52. entry_info = {}
  53. # 解析Index
  54. index_elem = entry_elem.find('Index')
  55. if index_elem is not None and index_elem.text:
  56. entry_info['index'] = self.parse_hex_value(index_elem.text)
  57. else:
  58. entry_info['index'] = 0x0000
  59. # 解析SubIndex
  60. subindex_elem = entry_elem.find('SubIndex')
  61. if subindex_elem is not None and subindex_elem.text:
  62. entry_info['subindex'] = int(subindex_elem.text)
  63. else:
  64. entry_info['subindex'] = 0x00
  65. # 解析BitLen
  66. bitlen_elem = entry_elem.find('BitLen')
  67. if bitlen_elem is not None and bitlen_elem.text:
  68. entry_info['bit_length'] = int(bitlen_elem.text)
  69. else:
  70. entry_info['bit_length'] = 16
  71. # 解析Name (作为注释)
  72. name_elem = entry_elem.find('Name')
  73. if name_elem is not None and name_elem.text:
  74. entry_info['name'] = name_elem.text.strip()
  75. else:
  76. # 如果Index是0或#x0,标记为Padding
  77. if entry_info['index'] == 0:
  78. entry_info['name'] = 'Padding'
  79. else:
  80. entry_info['name'] = f'Object_{entry_info["index"]:04X}'
  81. # 解析DataType
  82. datatype_elem = entry_elem.find('DataType')
  83. if datatype_elem is not None and datatype_elem.text:
  84. entry_info['data_type'] = datatype_elem.text.strip()
  85. else:
  86. entry_info['data_type'] = 'UINT'
  87. # 解析Comment
  88. comment_elem = entry_elem.find('Comment')
  89. if comment_elem is not None and comment_elem.text:
  90. entry_info['comment'] = comment_elem.text.strip()
  91. else:
  92. entry_info['comment'] = ''
  93. return entry_info
  94. def parse_process_data(self, slave_elem):
  95. """解析过程数据配置"""
  96. process_data = {
  97. 'rx_pdos': [], # 输出PDO (主站->从站)
  98. 'tx_pdos': [], # 输入PDO (从站->主站)
  99. 'syncs': []
  100. }
  101. process_elem = slave_elem.find('ProcessData')
  102. if process_elem is None:
  103. return process_data
  104. # 解析RxPDO (输出)
  105. for rxpdo_elem in process_elem.findall('RxPdo'):
  106. pdo_info = {}
  107. # 解析PDO Index
  108. index_elem = rxpdo_elem.find('Index')
  109. if index_elem is not None:
  110. pdo_info['index'] = self.parse_hex_value(index_elem.text)
  111. # 解析PDO Name
  112. name_elem = rxpdo_elem.find('Name')
  113. if name_elem is not None:
  114. pdo_info['name'] = name_elem.text.strip() if name_elem.text else ""
  115. # 解析所有Entry
  116. entries = []
  117. for entry_elem in rxpdo_elem.findall('Entry'):
  118. entry_info = self.parse_pdo_entry(entry_elem)
  119. entries.append(entry_info)
  120. pdo_info['entries'] = entries
  121. process_data['rx_pdos'].append(pdo_info)
  122. # 解析TxPDO (输入)
  123. for txpdo_elem in process_elem.findall('TxPdo'):
  124. pdo_info = {}
  125. # 解析PDO Index
  126. index_elem = txpdo_elem.find('Index')
  127. if index_elem is not None:
  128. pdo_info['index'] = self.parse_hex_value(index_elem.text)
  129. # 解析PDO Name
  130. name_elem = txpdo_elem.find('Name')
  131. if name_elem is not None:
  132. pdo_info['name'] = name_elem.text.strip() if name_elem.text else ""
  133. # 解析所有Entry
  134. entries = []
  135. for entry_elem in txpdo_elem.findall('Entry'):
  136. entry_info = self.parse_pdo_entry(entry_elem)
  137. entries.append(entry_info)
  138. pdo_info['entries'] = entries
  139. process_data['tx_pdos'].append(pdo_info)
  140. # 解析同步管理器配置
  141. sm2_elem = process_elem.find('Sm2')
  142. if sm2_elem is not None:
  143. sm_info = {
  144. 'index': 2,
  145. 'direction': 'EC_DIR_OUTPUT',
  146. 'type': sm2_elem.find('Type').text if sm2_elem.find('Type') is not None else 'Outputs'
  147. }
  148. process_data['syncs'].append(sm_info)
  149. sm3_elem = process_elem.find('Sm3')
  150. if sm3_elem is not None:
  151. sm_info = {
  152. 'index': 3,
  153. 'direction': 'EC_DIR_INPUT',
  154. 'type': sm3_elem.find('Type').text if sm3_elem.find('Type') is not None else 'Inputs'
  155. }
  156. process_data['syncs'].append(sm_info)
  157. return process_data
  158. def parse_eni(self, eni_file: str) -> bool:
  159. """解析ENI文件"""
  160. try:
  161. tree = ET.parse(eni_file)
  162. root = tree.getroot()
  163. # 解析从站配置
  164. for slave_elem in root.findall('.//Slave'):
  165. slave_info = self.parse_slave_info(slave_elem)
  166. process_data = self.parse_process_data(slave_elem)
  167. slave_config = {
  168. 'info': slave_info,
  169. 'process_data': process_data
  170. }
  171. self.slaves.append(slave_config)
  172. return True
  173. except Exception as e:
  174. print(f"Error parsing ENI file: {e}")
  175. import traceback
  176. traceback.print_exc()
  177. return False
  178. def generate_slave_name(self, slave_info):
  179. """生成从站名称标识符"""
  180. name = slave_info.get('name', 'slave')
  181. # 清理名称,只保留字母数字和下划线
  182. clean_name = ''.join(c if c.isalnum() or c == '_' else '_' for c in name.lower())
  183. clean_name = clean_name.replace('__', '_').strip('_')
  184. # 根据产品代码生成后缀
  185. product_code = slave_info.get('product_code', 0)
  186. return f'eni_{product_code:04x}'
  187. def generate_c_code(self) -> str:
  188. """生成C代码"""
  189. lines = [
  190. "/*",
  191. " * Generated CherryECAT PDO configuration from ENI file",
  192. " * Auto-generated - do not modify manually",
  193. " */",
  194. "",
  195. "#include \"ec_master.h\"",
  196. ""
  197. ]
  198. for slave_idx, slave in enumerate(self.slaves):
  199. slave_info = slave['info']
  200. process_data = slave['process_data']
  201. slave_name = self.generate_slave_name(slave_info)
  202. lines.append(f"// Slave {slave_idx + 1}: {slave_info.get('name', 'Unknown')}")
  203. lines.append(f"// Vendor ID: 0x{slave_info.get('vendor_id', 0):08X}")
  204. lines.append(f"// Product Code: 0x{slave_info.get('product_code', 0):08X}")
  205. lines.append("")
  206. # 生成RxPDO entries (输出)
  207. rx_entries_generated = set()
  208. for pdo in process_data['rx_pdos']:
  209. pdo_index = pdo.get('index', 0)
  210. pdo_hex = f"{pdo_index:04x}"
  211. entries_name = f"{slave_name}_output_pdo_entries"
  212. if entries_name not in rx_entries_generated:
  213. lines.append(f"static ec_pdo_entry_info_t {entries_name}[] = {{")
  214. # 生成每个entry
  215. for entry in pdo.get('entries', []):
  216. comment = entry.get('name', 'Padding')
  217. lines.append(f" {{ 0x{entry['index']:04x}, 0x{entry['subindex']:02x}, 0x{entry['bit_length']:02x} }}, // {comment}")
  218. lines.append("};")
  219. lines.append("")
  220. rx_entries_generated.add(entries_name)
  221. # 生成TxPDO entries (输入)
  222. tx_entries_generated = set()
  223. for pdo in process_data['tx_pdos']:
  224. pdo_index = pdo.get('index', 0)
  225. pdo_hex = f"{pdo_index:04x}"
  226. entries_name = f"{slave_name}_input_pdo_entries"
  227. if entries_name not in tx_entries_generated:
  228. lines.append(f"static ec_pdo_entry_info_t {entries_name}[] = {{")
  229. # 生成每个entry
  230. for entry in pdo.get('entries', []):
  231. comment = entry.get('name', 'Padding')
  232. lines.append(f" {{ 0x{entry['index']:04x}, 0x{entry['subindex']:02x}, 0x{entry['bit_length']:02x} }}, // {comment}")
  233. lines.append("};")
  234. lines.append("")
  235. tx_entries_generated.add(entries_name)
  236. # 生成统一的PDO info数组(合并RxPDO和TxPDO)
  237. if process_data['rx_pdos'] or process_data['tx_pdos']:
  238. lines.append(f"static ec_pdo_info_t {slave_name}_pdos[] = {{")
  239. # 添加RxPDO (输出)
  240. for pdo in process_data['rx_pdos']:
  241. pdo_index = pdo.get('index', 0)
  242. entries_name = f"{slave_name}_output_pdo_entries"
  243. entry_count = len(pdo.get('entries', []))
  244. lines.append(f" {{ 0x{pdo_index:04x}, {entry_count}, {entries_name} }},")
  245. # 添加TxPDO (输入)
  246. for pdo in process_data['tx_pdos']:
  247. pdo_index = pdo.get('index', 0)
  248. entries_name = f"{slave_name}_input_pdo_entries"
  249. entry_count = len(pdo.get('entries', []))
  250. lines.append(f" {{ 0x{pdo_index:04x}, {entry_count}, {entries_name} }},")
  251. lines.append("};")
  252. lines.append("")
  253. # 生成同步管理器配置
  254. if process_data['rx_pdos'] or process_data['tx_pdos']:
  255. lines.append(f"static ec_sync_info_t {slave_name}_syncs[] = {{")
  256. pdo_index = 0 # PDO数组中的索引
  257. # 添加SM2 (输出)
  258. if process_data['rx_pdos']:
  259. rx_pdo_count = len(process_data['rx_pdos'])
  260. lines.append(f" {{ 2, EC_DIR_OUTPUT, {rx_pdo_count}, &{slave_name}_pdos[{pdo_index}], EC_WD_DISABLE }},")
  261. pdo_index += rx_pdo_count
  262. # 添加SM3 (输入)
  263. if process_data['tx_pdos']:
  264. tx_pdo_count = len(process_data['tx_pdos'])
  265. lines.append(f" {{ 3, EC_DIR_INPUT, {tx_pdo_count}, &{slave_name}_pdos[{pdo_index}], EC_WD_DISABLE }},")
  266. lines.append("};")
  267. lines.append("")
  268. return "\n".join(lines)
  269. def main():
  270. if len(sys.argv) != 3:
  271. print("Usage: python eni_parser.py <input.xml> <output.h>")
  272. print(" input.xml - ENI XML file")
  273. print(" output.h - Output C header file")
  274. sys.exit(1)
  275. input_file = sys.argv[1]
  276. output_file = sys.argv[2]
  277. if not os.path.exists(input_file):
  278. print(f"Error: Input file '{input_file}' not found")
  279. sys.exit(1)
  280. # 创建解析器
  281. parser = ENIParser()
  282. # 解析ENI文件
  283. print(f"Parsing ENI file: {input_file}")
  284. if not parser.parse_eni(input_file):
  285. print("Failed to parse ENI file")
  286. sys.exit(1)
  287. # 生成C代码
  288. print("Generating C code...")
  289. c_code = parser.generate_c_code()
  290. # 写入输出文件
  291. try:
  292. with open(output_file, 'w') as f:
  293. f.write(c_code)
  294. print(f"✓ Successfully converted '{input_file}' to '{output_file}'")
  295. print(f"✓ Generated C code for {len(parser.slaves)} slave(s)")
  296. # 显示生成的PDO映射信息
  297. for slave_idx, slave in enumerate(parser.slaves):
  298. process_data = slave['process_data']
  299. print(f"✓ Slave {slave_idx + 1}:")
  300. for pdo in process_data['rx_pdos']:
  301. print(f" - RxPDO 0x{pdo.get('index', 0):04X}: {len(pdo.get('entries', []))} entries")
  302. for pdo in process_data['tx_pdos']:
  303. print(f" - TxPDO 0x{pdo.get('index', 0):04X}: {len(pdo.get('entries', []))} entries")
  304. except Exception as e:
  305. print(f"Error writing output file: {e}")
  306. sys.exit(1)
  307. if __name__ == "__main__":
  308. main()