diff --git a/pfsense_xml_parser.py b/pfsense_xml_parser.py new file mode 100755 index 0000000..6d5545b --- /dev/null +++ b/pfsense_xml_parser.py @@ -0,0 +1,503 @@ +#!/usr/bin/env python3 +""" +pfSense XML Configuration Parser +Extracts comprehensive network information from pfSense backup XML files +""" + +import xml.etree.ElementTree as ET +import json +import argparse +from typing import Dict, List, Optional, Any +from pathlib import Path +import logging + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +class PfSenseXMLParser: + """Parser for pfSense XML configuration files""" + + def __init__(self, xml_file: str): + self.xml_file = xml_file + self.tree = None + self.root = None + self.hostname = None + + def load_xml(self) -> bool: + """Load and parse the XML file""" + try: + self.tree = ET.parse(self.xml_file) + self.root = self.tree.getroot() + + # Extract hostname + hostname_elem = self.root.find('.//hostname') + if hostname_elem is not None: + self.hostname = hostname_elem.text + + logger.info(f"Loaded pfSense config: {self.hostname or 'Unknown'}") + return True + except Exception as e: + logger.error(f"Error loading XML file: {e}") + return False + + def get_text(self, element: Optional[ET.Element], default: str = "") -> str: + """Safely get text from XML element""" + if element is not None and element.text: + return element.text + return default + + def get_interfaces(self) -> Dict[str, Dict]: + """Extract network interfaces configuration""" + interfaces = {} + + if self.root is None: + return interfaces + + interfaces_elem = self.root.find('interfaces') + if interfaces_elem is None: + return interfaces + + for iface_elem in interfaces_elem: + iface_name = iface_elem.tag + if iface_name in ['wan', 'lan'] or iface_name.startswith('opt'): + interface = { + 'name': iface_name, + 'description': self.get_text(iface_elem.find('descr')), + 'enabled': iface_elem.find('enable') is not None, + 'interface': self.get_text(iface_elem.find('if')), + 'ipaddr': self.get_text(iface_elem.find('ipaddr')), + 'subnet': self.get_text(iface_elem.find('subnet')), + 'gateway': self.get_text(iface_elem.find('gateway')), + 'mtu': self.get_text(iface_elem.find('mtu')), + 'mss': self.get_text(iface_elem.find('mss')), + 'spoofmac': self.get_text(iface_elem.find('spoofmac')), + 'type': 'physical' + } + + # Determine interface type + if 'tun_wg' in interface['interface']: + interface['type'] = 'wireguard' + elif 'ovpn' in interface['interface']: + interface['type'] = 'openvpn' + elif 'ipsec' in interface['interface']: + interface['type'] = 'ipsec' + + interfaces[iface_name] = interface + + return interfaces + + def get_static_routes(self) -> List[Dict]: + """Extract static routes""" + routes = [] + + if self.root is None: + return routes + + staticroutes_elem = self.root.find('staticroutes') + if staticroutes_elem is None: + return routes + + for route_elem in staticroutes_elem: + if route_elem.tag == 'route': + route = { + 'network': self.get_text(route_elem.find('network')), + 'gateway': self.get_text(route_elem.find('gateway')), + 'description': self.get_text(route_elem.find('descr')) + } + routes.append(route) + + return routes + + def get_gateways(self) -> Dict[str, Dict]: + """Extract gateway configuration""" + gateways = {} + + if self.root is None: + return gateways + + gateways_elem = self.root.find('gateways') + if gateways_elem is None: + return gateways + + gateway_item_elem = gateways_elem.find('gateway_item') + if gateway_item_elem is not None: + for gw_elem in gateway_item_elem: + if gw_elem.tag == 'item': + name = self.get_text(gw_elem.find('name')) + if name: + gateway = { + 'name': name, + 'interface': self.get_text(gw_elem.find('interface')), + 'gateway': self.get_text(gw_elem.find('gateway')), + 'monitor': self.get_text(gw_elem.find('monitor')), + 'description': self.get_text(gw_elem.find('descr')), + 'defaultgw': gw_elem.find('defaultgw') is not None + } + gateways[name] = gateway + + return gateways + + def get_dhcp_config(self) -> Dict[str, Dict]: + """Extract DHCP server configuration""" + dhcp_config = {} + + if self.root is None: + return dhcp_config + + dhcpd_elem = self.root.find('dhcpd') + if dhcpd_elem is None: + return dhcp_config + + for dhcp_item in dhcpd_elem: + iface_name = dhcp_item.tag + dhcp_elem = dhcp_item + + config = { + 'enabled': True, + 'range': {}, + 'static_mappings': [] + } + + # DHCP range + range_elem = dhcp_elem.find('range') + if range_elem is not None: + config['range'] = { + 'from': self.get_text(range_elem.find('from')), + 'to': self.get_text(range_elem.find('to')) + } + + # DHCP options + config.update({ + 'defaultleasetime': self.get_text(dhcp_elem.find('defaultleasetime')), + 'maxleasetime': self.get_text(dhcp_elem.find('maxleasetime')), + 'gateway': self.get_text(dhcp_elem.find('gateway')), + 'domain': self.get_text(dhcp_elem.find('domain')), + 'domainsearchlist': self.get_text(dhcp_elem.find('domainsearchlist')), + 'ddnsdomain': self.get_text(dhcp_elem.find('ddnsdomain')), + 'dns1': '', + 'dns2': '', + 'ntpserver': '' + }) + + # DNS servers + dns_servers = dhcp_elem.findall('dnsserver') + if dns_servers: + config['dns1'] = self.get_text(dns_servers[0]) + if len(dns_servers) > 1: + config['dns2'] = self.get_text(dns_servers[1]) + + # NTP servers + ntp_servers = dhcp_elem.findall('ntpserver') + if ntp_servers: + config['ntpserver'] = self.get_text(ntp_servers[0]) + + # Static mappings + for staticmap_elem in dhcp_elem.findall('staticmap'): + mapping = { + 'mac': self.get_text(staticmap_elem.find('mac')), + 'ipaddr': self.get_text(staticmap_elem.find('ipaddr')), + 'hostname': self.get_text(staticmap_elem.find('hostname')), + 'description': self.get_text(staticmap_elem.find('descr')) + } + config['static_mappings'].append(mapping) + + dhcp_config[iface_name] = config + + return dhcp_config + + def get_wireguard_config(self) -> Dict[str, Any]: + """Extract WireGuard configuration""" + wg_config = { + 'enabled': False, + 'tunnels': [], + 'peers': [] + } + + if self.root is None: + return wg_config + + # Find WireGuard configuration + wg_elem = self.root.find('.//wireguard') + if wg_elem is None: + return wg_config + + # Check if enabled + config_elem = wg_elem.find('config') + if config_elem is not None: + wg_config['enabled'] = self.get_text(config_elem.find('enable')) == 'on' + + # Extract tunnels + tunnels_elem = wg_elem.find('tunnels') + if tunnels_elem is not None: + for item_elem in tunnels_elem.findall('item'): + tunnel = { + 'name': self.get_text(item_elem.find('name')), + 'enabled': self.get_text(item_elem.find('enabled')) == 'yes', + 'description': self.get_text(item_elem.find('descr')), + 'listenport': self.get_text(item_elem.find('listenport')), + 'publickey': self.get_text(item_elem.find('publickey')), + 'mtu': self.get_text(item_elem.find('mtu')) + } + wg_config['tunnels'].append(tunnel) + + # Extract peers + peers_elem = wg_elem.find('peers') + if peers_elem is not None: + for item_elem in peers_elem.findall('item'): + peer = { + 'enabled': self.get_text(item_elem.find('enabled')) == 'yes', + 'tunnel': self.get_text(item_elem.find('tun')), + 'description': self.get_text(item_elem.find('descr')), + 'publickey': self.get_text(item_elem.find('publickey')), + 'persistentkeepalive': self.get_text(item_elem.find('persistentkeepalive')), + 'allowed_ips': [] + } + + # Extract allowed IPs + allowedips_elem = item_elem.find('allowedips') + if allowedips_elem is not None: + for row_elem in allowedips_elem.findall('row'): + ip_info = { + 'address': self.get_text(row_elem.find('address')), + 'mask': self.get_text(row_elem.find('mask')), + 'description': self.get_text(row_elem.find('descr')) + } + peer['allowed_ips'].append(ip_info) + + wg_config['peers'].append(peer) + + return wg_config + + def get_openvpn_config(self) -> Dict[str, Any]: + """Extract OpenVPN configuration""" + ovpn_config = { + 'servers': [], + 'clients': [] + } + + if self.root is None: + return ovpn_config + + # OpenVPN servers + ovpnserver_elem = self.root.find('ovpnserver') + if ovpnserver_elem is not None: + # This is a complex configuration, extract basic info + ovpn_config['servers'].append({ + 'configured': True, + 'description': 'OpenVPN Server configured' + }) + + # OpenVPN clients + ovpnclient_elem = self.root.find('ovpnclient') + if ovpnclient_elem is not None: + for item_elem in ovpnclient_elem.findall('item'): + client = { + 'enabled': True, + 'description': self.get_text(item_elem.find('descr')), + 'server_addr': self.get_text(item_elem.find('server_addr')), + 'interface': self.get_text(item_elem.find('interface')) + } + ovpn_config['clients'].append(client) + + return ovpn_config + + def get_firewall_rules(self) -> List[Dict]: + """Extract firewall rules""" + rules = [] + + if self.root is None: + return rules + + filter_elem = self.root.find('filter') + if filter_elem is None: + return rules + + for rule_elem in filter_elem.findall('rule'): + rule = { + 'id': self.get_text(rule_elem.find('id')), + 'tracker': self.get_text(rule_elem.find('tracker')), + 'type': self.get_text(rule_elem.find('type')), + 'interface': self.get_text(rule_elem.find('interface')), + 'ipprotocol': self.get_text(rule_elem.find('ipprotocol')), + 'protocol': self.get_text(rule_elem.find('protocol')), + 'description': self.get_text(rule_elem.find('descr')), + 'enabled': True, + 'source': {}, + 'destination': {}, + 'log': rule_elem.find('log') is not None + } + + # Source + source_elem = rule_elem.find('source') + if source_elem is not None: + rule['source'] = { + 'address': self.get_text(source_elem.find('address')), + 'port': self.get_text(source_elem.find('port')), + 'any': source_elem.find('any') is not None + } + + # Destination + dest_elem = rule_elem.find('destination') + if dest_elem is not None: + rule['destination'] = { + 'address': self.get_text(dest_elem.find('address')), + 'port': self.get_text(dest_elem.find('port')), + 'any': dest_elem.find('any') is not None + } + + rules.append(rule) + + return rules + + def get_nat_rules(self) -> List[Dict]: + """Extract NAT rules""" + nat_rules = [] + + if self.root is None: + return nat_rules + + nat_elem = self.root.find('nat') + if nat_elem is None: + return nat_rules + + for rule_elem in nat_elem.findall('rule'): + rule = { + 'description': self.get_text(rule_elem.find('descr')), + 'interface': self.get_text(rule_elem.find('interface')), + 'protocol': self.get_text(rule_elem.find('protocol')), + 'source': {}, + 'destination': {}, + 'target': self.get_text(rule_elem.find('target')), + 'local_port': self.get_text(rule_elem.find('local-port')), + 'enabled': True + } + + # Source + source_elem = rule_elem.find('source') + if source_elem is not None: + rule['source'] = { + 'address': self.get_text(source_elem.find('address')), + 'port': self.get_text(source_elem.find('port')) + } + + # Destination + dest_elem = rule_elem.find('destination') + if dest_elem is not None: + rule['destination'] = { + 'address': self.get_text(dest_elem.find('address')), + 'port': self.get_text(dest_elem.find('port')) + } + + nat_rules.append(rule) + + return nat_rules + + def get_dns_config(self) -> Dict[str, Any]: + """Extract DNS configuration""" + dns_config = { + 'servers': [], + 'domain': '', + 'search_domains': [] + } + + if self.root is None: + return dns_config + + # DNS servers + system_elem = self.root.find('system') + if system_elem is not None: + for dns_elem in system_elem.findall('dnsserver'): + dns_config['servers'].append(self.get_text(dns_elem)) + + # Domain + domain_elem = system_elem.find('domain') + if domain_elem is not None and domain_elem.text: + dns_config['domain'] = domain_elem.text + + return dns_config + + def get_system_info(self) -> Dict[str, Any]: + """Extract system information""" + system_info = {} + + if self.root is None: + return system_info + + system_elem = self.root.find('system') + if system_elem is not None: + system_info = { + 'hostname': self.get_text(system_elem.find('hostname')), + 'domain': self.get_text(system_elem.find('domain')), + 'timezone': self.get_text(system_elem.find('timezone')), + 'language': self.get_text(system_elem.find('language')), + 'version': self.get_text(self.root.find('version')) + } + + return system_info + + def parse_all(self) -> Dict[str, Any]: + """Parse all configuration and return comprehensive data""" + if not self.load_xml(): + return {} + + logger.info(f"Parsing pfSense configuration: {self.hostname}") + + config_data = { + 'hostname': self.hostname, + 'system': self.get_system_info(), + 'interfaces': self.get_interfaces(), + 'static_routes': self.get_static_routes(), + 'gateways': self.get_gateways(), + 'dhcp': self.get_dhcp_config(), + 'wireguard': self.get_wireguard_config(), + 'openvpn': self.get_openvpn_config(), + 'firewall_rules': self.get_firewall_rules(), + 'nat_rules': self.get_nat_rules(), + 'dns': self.get_dns_config() + } + + return config_data + + def export_json(self, output_file: str): + """Export parsed data to JSON file""" + data = self.parse_all() + + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + logger.info(f"Exported pfSense config to {output_file}") + + +def main(): + """Command line interface""" + parser = argparse.ArgumentParser(description='Parse pfSense XML configuration files') + parser.add_argument('xml_file', help='pfSense XML configuration file') + parser.add_argument('-o', '--output', help='Output JSON file') + parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output') + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + # Parse the XML file + parser = PfSenseXMLParser(args.xml_file) + data = parser.parse_all() + + if not data: + logger.error("Failed to parse XML file") + return 1 + + # Output + if args.output: + parser.export_json(args.output) + print(f"✅ Parsed pfSense config and saved to {args.output}") + else: + print(json.dumps(data, indent=2, ensure_ascii=False)) + + return 0 + + +if __name__ == '__main__': + exit(main())