| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- ENI (EtherCAT Network Information) Parser
- Copyright (c) 2025, sakumisu
- SPDX-License-Identifier: Apache-2.0
- """
- import xml.etree.ElementTree as ET
- import sys
- import os
- from typing import Dict, List, Tuple, Optional
- class ENIParser:
- def __init__(self):
- self.slaves = []
- def parse_hex_value(self, hex_str: str) -> int:
- """解析十六进制字符串"""
- if not hex_str:
- return 0
- hex_str = hex_str.strip()
- if hex_str.startswith('#x'):
- return int(hex_str[2:], 16)
- elif hex_str.startswith('0x'):
- return int(hex_str[2:], 16)
- else:
- try:
- return int(hex_str, 16)
- except:
- try:
- return int(hex_str, 10)
- except:
- return 0
- def parse_slave_info(self, slave_elem):
- """解析从站基本信息"""
- slave_info = {}
- info_elem = slave_elem.find('Info')
- if info_elem is not None:
- name_elem = info_elem.find('Name')
- if name_elem is not None:
- slave_info['name'] = name_elem.text.strip() if name_elem.text else ""
- vendor_id_elem = info_elem.find('VendorId')
- if vendor_id_elem is not None:
- slave_info['vendor_id'] = int(vendor_id_elem.text)
- product_code_elem = info_elem.find('ProductCode')
- if product_code_elem is not None:
- slave_info['product_code'] = int(product_code_elem.text)
- revision_no_elem = info_elem.find('RevisionNo')
- if revision_no_elem is not None:
- slave_info['revision_no'] = int(revision_no_elem.text)
- return slave_info
- def parse_pdo_entry(self, entry_elem):
- """解析单个PDO条目"""
- entry_info = {}
- # 解析Index
- index_elem = entry_elem.find('Index')
- if index_elem is not None and index_elem.text:
- entry_info['index'] = self.parse_hex_value(index_elem.text)
- else:
- entry_info['index'] = 0x0000
- # 解析SubIndex
- subindex_elem = entry_elem.find('SubIndex')
- if subindex_elem is not None and subindex_elem.text:
- entry_info['subindex'] = int(subindex_elem.text)
- else:
- entry_info['subindex'] = 0x00
- # 解析BitLen
- bitlen_elem = entry_elem.find('BitLen')
- if bitlen_elem is not None and bitlen_elem.text:
- entry_info['bit_length'] = int(bitlen_elem.text)
- else:
- entry_info['bit_length'] = 16
- # 解析Name (作为注释)
- name_elem = entry_elem.find('Name')
- if name_elem is not None and name_elem.text:
- entry_info['name'] = name_elem.text.strip()
- else:
- # 如果Index是0或#x0,标记为Padding
- if entry_info['index'] == 0:
- entry_info['name'] = 'Padding'
- else:
- entry_info['name'] = f'Object_{entry_info["index"]:04X}'
- # 解析DataType
- datatype_elem = entry_elem.find('DataType')
- if datatype_elem is not None and datatype_elem.text:
- entry_info['data_type'] = datatype_elem.text.strip()
- else:
- entry_info['data_type'] = 'UINT'
- # 解析Comment
- comment_elem = entry_elem.find('Comment')
- if comment_elem is not None and comment_elem.text:
- entry_info['comment'] = comment_elem.text.strip()
- else:
- entry_info['comment'] = ''
- return entry_info
- def parse_process_data(self, slave_elem):
- """解析过程数据配置"""
- process_data = {
- 'rx_pdos': [], # 输出PDO (主站->从站)
- 'tx_pdos': [], # 输入PDO (从站->主站)
- 'syncs': []
- }
- process_elem = slave_elem.find('ProcessData')
- if process_elem is None:
- return process_data
- # 解析RxPDO (输出)
- for rxpdo_elem in process_elem.findall('RxPdo'):
- pdo_info = {}
- # 解析PDO Index
- index_elem = rxpdo_elem.find('Index')
- if index_elem is not None:
- pdo_info['index'] = self.parse_hex_value(index_elem.text)
- # 解析PDO Name
- name_elem = rxpdo_elem.find('Name')
- if name_elem is not None:
- pdo_info['name'] = name_elem.text.strip() if name_elem.text else ""
- # 解析所有Entry
- entries = []
- for entry_elem in rxpdo_elem.findall('Entry'):
- entry_info = self.parse_pdo_entry(entry_elem)
- entries.append(entry_info)
- pdo_info['entries'] = entries
- process_data['rx_pdos'].append(pdo_info)
- # 解析TxPDO (输入)
- for txpdo_elem in process_elem.findall('TxPdo'):
- pdo_info = {}
- # 解析PDO Index
- index_elem = txpdo_elem.find('Index')
- if index_elem is not None:
- pdo_info['index'] = self.parse_hex_value(index_elem.text)
- # 解析PDO Name
- name_elem = txpdo_elem.find('Name')
- if name_elem is not None:
- pdo_info['name'] = name_elem.text.strip() if name_elem.text else ""
- # 解析所有Entry
- entries = []
- for entry_elem in txpdo_elem.findall('Entry'):
- entry_info = self.parse_pdo_entry(entry_elem)
- entries.append(entry_info)
- pdo_info['entries'] = entries
- process_data['tx_pdos'].append(pdo_info)
- # 解析同步管理器配置
- sm2_elem = process_elem.find('Sm2')
- if sm2_elem is not None:
- sm_info = {
- 'index': 2,
- 'direction': 'EC_DIR_OUTPUT',
- 'type': sm2_elem.find('Type').text if sm2_elem.find('Type') is not None else 'Outputs'
- }
- process_data['syncs'].append(sm_info)
- sm3_elem = process_elem.find('Sm3')
- if sm3_elem is not None:
- sm_info = {
- 'index': 3,
- 'direction': 'EC_DIR_INPUT',
- 'type': sm3_elem.find('Type').text if sm3_elem.find('Type') is not None else 'Inputs'
- }
- process_data['syncs'].append(sm_info)
- return process_data
- def parse_eni(self, eni_file: str) -> bool:
- """解析ENI文件"""
- try:
- tree = ET.parse(eni_file)
- root = tree.getroot()
- # 解析从站配置
- for slave_elem in root.findall('.//Slave'):
- slave_info = self.parse_slave_info(slave_elem)
- process_data = self.parse_process_data(slave_elem)
- slave_config = {
- 'info': slave_info,
- 'process_data': process_data
- }
- self.slaves.append(slave_config)
- return True
- except Exception as e:
- print(f"Error parsing ENI file: {e}")
- import traceback
- traceback.print_exc()
- return False
- def generate_slave_name(self, slave_info):
- """生成从站名称标识符"""
- name = slave_info.get('name', 'slave')
- # 清理名称,只保留字母数字和下划线
- clean_name = ''.join(c if c.isalnum() or c == '_' else '_' for c in name.lower())
- clean_name = clean_name.replace('__', '_').strip('_')
- # 根据产品代码生成后缀
- product_code = slave_info.get('product_code', 0)
- return f'eni_{product_code:04x}'
- def generate_c_code(self) -> str:
- """生成C代码"""
- lines = [
- "/*",
- " * Generated CherryECAT PDO configuration from ENI file",
- " * Auto-generated - do not modify manually",
- " */",
- "",
- "#include \"ec_master.h\"",
- ""
- ]
- for slave_idx, slave in enumerate(self.slaves):
- slave_info = slave['info']
- process_data = slave['process_data']
- slave_name = self.generate_slave_name(slave_info)
- lines.append(f"// Slave {slave_idx + 1}: {slave_info.get('name', 'Unknown')}")
- lines.append(f"// Vendor ID: 0x{slave_info.get('vendor_id', 0):08X}")
- lines.append(f"// Product Code: 0x{slave_info.get('product_code', 0):08X}")
- lines.append("")
- # 生成RxPDO entries (输出)
- rx_entries_generated = set()
- for pdo in process_data['rx_pdos']:
- pdo_index = pdo.get('index', 0)
- pdo_hex = f"{pdo_index:04x}"
- entries_name = f"{slave_name}_output_pdo_entries"
- if entries_name not in rx_entries_generated:
- lines.append(f"static ec_pdo_entry_info_t {entries_name}[] = {{")
- # 生成每个entry
- for entry in pdo.get('entries', []):
- comment = entry.get('name', 'Padding')
- lines.append(f" {{ 0x{entry['index']:04x}, 0x{entry['subindex']:02x}, 0x{entry['bit_length']:02x} }}, // {comment}")
- lines.append("};")
- lines.append("")
- rx_entries_generated.add(entries_name)
- # 生成TxPDO entries (输入)
- tx_entries_generated = set()
- for pdo in process_data['tx_pdos']:
- pdo_index = pdo.get('index', 0)
- pdo_hex = f"{pdo_index:04x}"
- entries_name = f"{slave_name}_input_pdo_entries"
- if entries_name not in tx_entries_generated:
- lines.append(f"static ec_pdo_entry_info_t {entries_name}[] = {{")
- # 生成每个entry
- for entry in pdo.get('entries', []):
- comment = entry.get('name', 'Padding')
- lines.append(f" {{ 0x{entry['index']:04x}, 0x{entry['subindex']:02x}, 0x{entry['bit_length']:02x} }}, // {comment}")
- lines.append("};")
- lines.append("")
- tx_entries_generated.add(entries_name)
- # 生成统一的PDO info数组(合并RxPDO和TxPDO)
- if process_data['rx_pdos'] or process_data['tx_pdos']:
- lines.append(f"static ec_pdo_info_t {slave_name}_pdos[] = {{")
- # 添加RxPDO (输出)
- for pdo in process_data['rx_pdos']:
- pdo_index = pdo.get('index', 0)
- entries_name = f"{slave_name}_output_pdo_entries"
- entry_count = len(pdo.get('entries', []))
- lines.append(f" {{ 0x{pdo_index:04x}, {entry_count}, {entries_name} }},")
- # 添加TxPDO (输入)
- for pdo in process_data['tx_pdos']:
- pdo_index = pdo.get('index', 0)
- entries_name = f"{slave_name}_input_pdo_entries"
- entry_count = len(pdo.get('entries', []))
- lines.append(f" {{ 0x{pdo_index:04x}, {entry_count}, {entries_name} }},")
- lines.append("};")
- lines.append("")
- # 生成同步管理器配置
- if process_data['rx_pdos'] or process_data['tx_pdos']:
- lines.append(f"static ec_sync_info_t {slave_name}_syncs[] = {{")
- pdo_index = 0 # PDO数组中的索引
- # 添加SM2 (输出)
- if process_data['rx_pdos']:
- rx_pdo_count = len(process_data['rx_pdos'])
- lines.append(f" {{ 2, EC_DIR_OUTPUT, {rx_pdo_count}, &{slave_name}_pdos[{pdo_index}], EC_WD_DISABLE }},")
- pdo_index += rx_pdo_count
- # 添加SM3 (输入)
- if process_data['tx_pdos']:
- tx_pdo_count = len(process_data['tx_pdos'])
- lines.append(f" {{ 3, EC_DIR_INPUT, {tx_pdo_count}, &{slave_name}_pdos[{pdo_index}], EC_WD_DISABLE }},")
- lines.append("};")
- lines.append("")
- return "\n".join(lines)
- def main():
- if len(sys.argv) != 3:
- print("Usage: python eni_parser.py <input.xml> <output.h>")
- print(" input.xml - ENI XML file")
- print(" output.h - Output C header file")
- sys.exit(1)
- input_file = sys.argv[1]
- output_file = sys.argv[2]
- if not os.path.exists(input_file):
- print(f"Error: Input file '{input_file}' not found")
- sys.exit(1)
- # 创建解析器
- parser = ENIParser()
- # 解析ENI文件
- print(f"Parsing ENI file: {input_file}")
- if not parser.parse_eni(input_file):
- print("Failed to parse ENI file")
- sys.exit(1)
- # 生成C代码
- print("Generating C code...")
- c_code = parser.generate_c_code()
- # 写入输出文件
- try:
- with open(output_file, 'w') as f:
- f.write(c_code)
- print(f"✓ Successfully converted '{input_file}' to '{output_file}'")
- print(f"✓ Generated C code for {len(parser.slaves)} slave(s)")
- # 显示生成的PDO映射信息
- for slave_idx, slave in enumerate(parser.slaves):
- process_data = slave['process_data']
- print(f"✓ Slave {slave_idx + 1}:")
- for pdo in process_data['rx_pdos']:
- print(f" - RxPDO 0x{pdo.get('index', 0):04X}: {len(pdo.get('entries', []))} entries")
- for pdo in process_data['tx_pdos']:
- print(f" - TxPDO 0x{pdo.get('index', 0):04X}: {len(pdo.get('entries', []))} entries")
- except Exception as e:
- print(f"Error writing output file: {e}")
- sys.exit(1)
- if __name__ == "__main__":
- main()
|