#!/usr/bin/env python3
"""
pfSense XML Configuration Parser
Extracts comprehensive network information from pfSense backup XML files
"""

import argparse
import json
import logging
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any, Dict, List, Optional

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class PfSenseXMLParser:
    """Parser for pfSense XML configuration files.

    Construct with the path of a configuration backup, then call
    ``parse_all()`` (or ``export_json()``).  The individual ``get_*``
    methods read from ``self.root`` and return an empty container when no
    document has been loaded or the relevant section is absent.
    """

    # (substring of the device name, reported interface type); first match wins.
    _INTERFACE_TYPE_MARKERS = (
        ('tun_wg', 'wireguard'),
        ('ovpn', 'openvpn'),
        ('ipsec', 'ipsec'),
    )

    def __init__(self, xml_file: str):
        self.xml_file = xml_file
        self.tree = None
        self.root = None
        self.hostname = None

    # ------------------------------------------------------------------
    # Loading and small helpers
    # ------------------------------------------------------------------
    def load_xml(self) -> bool:
        """Load and parse the XML file.

        Returns:
            True when the file was parsed, False otherwise (the error is
            logged, never raised).
        """
        try:
            self.tree = ET.parse(self.xml_file)
            self.root = self.tree.getroot()
        except Exception as e:
            # Deliberately broad: this is the I/O boundary and callers rely
            # on a boolean result rather than an exception.
            logger.error("Error loading XML file: %s", e)
            return False

        # Prefer the system hostname; a bare './/hostname' search returns the
        # first match in document order, which may be a DHCP static mapping.
        hostname_elem = self.root.find('system/hostname')
        if hostname_elem is None:
            hostname_elem = self.root.find('.//hostname')
        # Always reassign so a reload never keeps a stale hostname.
        self.hostname = hostname_elem.text if hostname_elem is not None else None

        logger.info("Loaded pfSense config: %s", self.hostname or 'Unknown')
        return True

    def get_text(self, element: Optional[ET.Element], default: str = "") -> str:
        """Return ``element.text``, or *default* when the element is missing or empty."""
        if element is not None and element.text:
            return element.text
        return default

    def _section(self, path: str) -> Optional[ET.Element]:
        """Return the first element matching *path* under the root, or None."""
        if self.root is None:
            return None
        return self.root.find(path)

    def _text_fields(self, elem: ET.Element, fields) -> Dict[str, Any]:
        """Build ``{key: text of child <tag>}`` for each ``(key, tag)`` pair, in order."""
        return {key: self.get_text(elem.find(tag)) for key, tag in fields}

    def _classify_interface(self, device: str) -> str:
        """Map a device name to 'wireguard', 'openvpn', 'ipsec' or 'physical'."""
        for marker, kind in self._INTERFACE_TYPE_MARKERS:
            if marker in device:
                return kind
        return 'physical'

    def _endpoint(self, elem: Optional[ET.Element], include_any: bool) -> Dict[str, Any]:
        """Describe a rule's <source>/<destination>; empty dict when absent."""
        if elem is None:
            return {}
        endpoint = self._text_fields(elem, (('address', 'address'), ('port', 'port')))
        if include_any:
            endpoint['any'] = elem.find('any') is not None
        return endpoint

    def _gateway_record(self, elem: ET.Element) -> Dict[str, Any]:
        """Build the gateway dict for one gateway element."""
        record = self._text_fields(elem, (
            ('name', 'name'),
            ('interface', 'interface'),
            ('gateway', 'gateway'),
            ('monitor', 'monitor'),
            ('description', 'descr'),
        ))
        record['defaultgw'] = elem.find('defaultgw') is not None
        return record

    # ------------------------------------------------------------------
    # Section extractors
    # ------------------------------------------------------------------
    def get_interfaces(self) -> Dict[str, Dict]:
        """Extract network interfaces configuration.

        Only children of <interfaces> named 'wan', 'lan' or starting with
        'opt' are reported, keyed by tag name.
        """
        interfaces: Dict[str, Dict] = {}
        section = self._section('interfaces')
        if section is None:
            return interfaces

        for elem in section:
            name = elem.tag
            if name not in ('wan', 'lan') and not name.startswith('opt'):
                continue
            device = self.get_text(elem.find('if'))
            interfaces[name] = {
                'name': name,
                'description': self.get_text(elem.find('descr')),
                'enabled': elem.find('enable') is not None,
                'interface': device,
                'ipaddr': self.get_text(elem.find('ipaddr')),
                'subnet': self.get_text(elem.find('subnet')),
                'gateway': self.get_text(elem.find('gateway')),
                'mtu': self.get_text(elem.find('mtu')),
                'mss': self.get_text(elem.find('mss')),
                'spoofmac': self.get_text(elem.find('spoofmac')),
                'type': self._classify_interface(device),
            }
        return interfaces

    def get_static_routes(self) -> List[Dict]:
        """Extract static routes (one dict per <route> under <staticroutes>)."""
        section = self._section('staticroutes')
        if section is None:
            return []
        return [
            self._text_fields(route_elem, (
                ('network', 'network'),
                ('gateway', 'gateway'),
                ('description', 'descr'),
            ))
            for route_elem in section.findall('route')
        ]

    def get_gateways(self) -> Dict[str, Dict]:
        """Extract gateway configuration, keyed by gateway name.

        Every <gateway_item> under <gateways> is examined.  Two layouts are
        accepted: a <gateway_item> that wraps <item> children, and a
        <gateway_item> that is itself the gateway record.  Entries without a
        name are skipped.
        """
        gateways: Dict[str, Dict] = {}
        section = self._section('gateways')
        if section is None:
            return gateways

        for container in section.findall('gateway_item'):
            # Wrapped layout if <item> children exist, else the element itself.
            candidates = container.findall('item') or [container]
            for gw_elem in candidates:
                record = self._gateway_record(gw_elem)
                if record['name']:
                    gateways[record['name']] = record
        return gateways

    def get_dhcp_config(self) -> Dict[str, Dict]:
        """Extract DHCP server configuration, keyed by interface tag name.

        'enabled' reflects the presence of an <enable> child, matching how
        interfaces are handled.
        """
        dhcp_config: Dict[str, Dict] = {}
        section = self._section('dhcpd')
        if section is None:
            return dhcp_config

        for dhcp_elem in section:
            config: Dict[str, Any] = {
                'enabled': dhcp_elem.find('enable') is not None,
                'range': {},
                'static_mappings': [],
            }

            # DHCP range
            range_elem = dhcp_elem.find('range')
            if range_elem is not None:
                config['range'] = self._text_fields(range_elem, (('from', 'from'), ('to', 'to')))

            # DHCP options
            config.update(self._text_fields(dhcp_elem, (
                ('defaultleasetime', 'defaultleasetime'),
                ('maxleasetime', 'maxleasetime'),
                ('gateway', 'gateway'),
                ('domain', 'domain'),
                ('domainsearchlist', 'domainsearchlist'),
                ('ddnsdomain', 'ddnsdomain'),
            )))

            # Only the first two DNS servers and the first NTP server are kept.
            dns_servers = dhcp_elem.findall('dnsserver')
            ntp_servers = dhcp_elem.findall('ntpserver')
            config['dns1'] = self.get_text(dns_servers[0]) if dns_servers else ''
            config['dns2'] = self.get_text(dns_servers[1]) if len(dns_servers) > 1 else ''
            config['ntpserver'] = self.get_text(ntp_servers[0]) if ntp_servers else ''

            # Static mappings
            for staticmap_elem in dhcp_elem.findall('staticmap'):
                config['static_mappings'].append(self._text_fields(staticmap_elem, (
                    ('mac', 'mac'),
                    ('ipaddr', 'ipaddr'),
                    ('hostname', 'hostname'),
                    ('description', 'descr'),
                )))

            dhcp_config[dhcp_elem.tag] = config
        return dhcp_config

    def get_wireguard_config(self) -> Dict[str, Any]:
        """Extract WireGuard configuration (enabled flag, tunnels, peers)."""
        wg_config: Dict[str, Any] = {
            'enabled': False,
            'tunnels': [],
            'peers': [],
        }
        # Searched anywhere in the document, not just directly under the root.
        wg_elem = self._section('.//wireguard')
        if wg_elem is None:
            return wg_config

        # Check if enabled
        config_elem = wg_elem.find('config')
        if config_elem is not None:
            wg_config['enabled'] = self.get_text(config_elem.find('enable')) == 'on'

        # Extract tunnels
        tunnels_elem = wg_elem.find('tunnels')
        if tunnels_elem is not None:
            for item_elem in tunnels_elem.findall('item'):
                wg_config['tunnels'].append({
                    'name': self.get_text(item_elem.find('name')),
                    'enabled': self.get_text(item_elem.find('enabled')) == 'yes',
                    'description': self.get_text(item_elem.find('descr')),
                    'listenport': self.get_text(item_elem.find('listenport')),
                    'publickey': self.get_text(item_elem.find('publickey')),
                    'mtu': self.get_text(item_elem.find('mtu')),
                })

        # Extract peers
        peers_elem = wg_elem.find('peers')
        if peers_elem is not None:
            for item_elem in peers_elem.findall('item'):
                peer: Dict[str, Any] = {
                    'enabled': self.get_text(item_elem.find('enabled')) == 'yes',
                    'tunnel': self.get_text(item_elem.find('tun')),
                    'description': self.get_text(item_elem.find('descr')),
                    'publickey': self.get_text(item_elem.find('publickey')),
                    'persistentkeepalive': self.get_text(item_elem.find('persistentkeepalive')),
                    'allowed_ips': [],
                }
                allowedips_elem = item_elem.find('allowedips')
                if allowedips_elem is not None:
                    peer['allowed_ips'] = [
                        self._text_fields(row_elem, (
                            ('address', 'address'),
                            ('mask', 'mask'),
                            ('description', 'descr'),
                        ))
                        for row_elem in allowedips_elem.findall('row')
                    ]
                wg_config['peers'].append(peer)

        return wg_config

    def get_openvpn_config(self) -> Dict[str, Any]:
        """Extract OpenVPN configuration.

        Servers are only reported as "configured"; clients expose their
        description, server address and interface.
        """
        # NOTE(review): this reads <ovpnserver>/<ovpnclient> directly under
        # the root; confirm these tag names against the configs you parse.
        ovpn_config: Dict[str, Any] = {
            'servers': [],
            'clients': [],
        }
        if self.root is None:
            return ovpn_config

        # OpenVPN servers: presence only, details are not extracted.
        if self.root.find('ovpnserver') is not None:
            ovpn_config['servers'].append({
                'configured': True,
                'description': 'OpenVPN Server configured',
            })

        # OpenVPN clients
        ovpnclient_elem = self.root.find('ovpnclient')
        if ovpnclient_elem is not None:
            for item_elem in ovpnclient_elem.findall('item'):
                ovpn_config['clients'].append({
                    'enabled': True,
                    'description': self.get_text(item_elem.find('descr')),
                    'server_addr': self.get_text(item_elem.find('server_addr')),
                    'interface': self.get_text(item_elem.find('interface')),
                })

        return ovpn_config

    def get_firewall_rules(self) -> List[Dict]:
        """Extract firewall rules (<rule> children of <filter>).

        A rule is reported as disabled when it carries a <disabled> child.
        """
        rules: List[Dict] = []
        section = self._section('filter')
        if section is None:
            return rules

        for rule_elem in section.findall('rule'):
            rule = self._text_fields(rule_elem, (
                ('id', 'id'),
                ('tracker', 'tracker'),
                ('type', 'type'),
                ('interface', 'interface'),
                ('ipprotocol', 'ipprotocol'),
                ('protocol', 'protocol'),
                ('description', 'descr'),
            ))
            rule['enabled'] = rule_elem.find('disabled') is None
            rule['source'] = self._endpoint(rule_elem.find('source'), include_any=True)
            rule['destination'] = self._endpoint(rule_elem.find('destination'), include_any=True)
            rule['log'] = rule_elem.find('log') is not None
            rules.append(rule)
        return rules

    def get_nat_rules(self) -> List[Dict]:
        """Extract NAT rules (<rule> children of <nat>).

        A rule is reported as disabled when it carries a <disabled> child.
        """
        nat_rules: List[Dict] = []
        section = self._section('nat')
        if section is None:
            return nat_rules

        for rule_elem in section.findall('rule'):
            nat_rules.append({
                'description': self.get_text(rule_elem.find('descr')),
                'interface': self.get_text(rule_elem.find('interface')),
                'protocol': self.get_text(rule_elem.find('protocol')),
                'source': self._endpoint(rule_elem.find('source'), include_any=False),
                'destination': self._endpoint(rule_elem.find('destination'), include_any=False),
                'target': self.get_text(rule_elem.find('target')),
                'local_port': self.get_text(rule_elem.find('local-port')),
                'enabled': rule_elem.find('disabled') is None,
            })
        return nat_rules

    def get_dns_config(self) -> Dict[str, Any]:
        """Extract DNS servers and domain from <system>.

        'search_domains' is part of the result shape but is never populated here.
        """
        dns_config: Dict[str, Any] = {
            'servers': [],
            'domain': '',
            'search_domains': [],
        }
        system_elem = self._section('system')
        if system_elem is None:
            return dns_config

        dns_config['servers'] = [
            self.get_text(dns_elem) for dns_elem in system_elem.findall('dnsserver')
        ]
        dns_config['domain'] = self.get_text(system_elem.find('domain'))
        return dns_config

    def get_system_info(self) -> Dict[str, Any]:
        """Extract system information; empty dict when <system> is absent."""
        system_elem = self._section('system')
        if system_elem is None:
            return {}
        info = self._text_fields(system_elem, (
            ('hostname', 'hostname'),
            ('domain', 'domain'),
            ('timezone', 'timezone'),
            ('language', 'language'),
        ))
        # <version> is a child of the document root, not of <system>.
        info['version'] = self.get_text(self.root.find('version'))
        return info

    # ------------------------------------------------------------------
    # Aggregation and export
    # ------------------------------------------------------------------
    def parse_all(self) -> Dict[str, Any]:
        """Load the file and return every section in one dict.

        Returns:
            The aggregated configuration, or ``{}`` when loading fails.
        """
        if not self.load_xml():
            return {}

        logger.info("Parsing pfSense configuration: %s", self.hostname)

        return {
            'hostname': self.hostname,
            'system': self.get_system_info(),
            'interfaces': self.get_interfaces(),
            'static_routes': self.get_static_routes(),
            'gateways': self.get_gateways(),
            'dhcp': self.get_dhcp_config(),
            'wireguard': self.get_wireguard_config(),
            'openvpn': self.get_openvpn_config(),
            'firewall_rules': self.get_firewall_rules(),
            'nat_rules': self.get_nat_rules(),
            'dns': self.get_dns_config(),
        }

    def export_json(self, output_file: str, data: Optional[Dict[str, Any]] = None):
        """Export parsed data to a JSON file.

        Args:
            output_file: Destination path; overwritten if it exists.
            data: Result of a previous ``parse_all()`` call.  When omitted
                the file is parsed here, so callers that already hold the
                data can avoid a second parse.
        """
        if data is None:
            data = self.parse_all()

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        logger.info("Exported pfSense config to %s", output_file)


def main(argv: Optional[List[str]] = None) -> int:
    """Command line interface.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, 1 when the file cannot be parsed.
    """
    arg_parser = argparse.ArgumentParser(description='Parse pfSense XML configuration files')
    arg_parser.add_argument('xml_file', help='pfSense XML configuration file')
    arg_parser.add_argument('-o', '--output', help='Output JSON file')
    arg_parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
    args = arg_parser.parse_args(argv)

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Parse the XML file
    config_parser = PfSenseXMLParser(args.xml_file)
    data = config_parser.parse_all()

    if not data:
        logger.error("Failed to parse XML file")
        return 1

    # Output
    if args.output:
        # Reuse the data parsed above instead of parsing the file again.
        config_parser.export_json(args.output, data)
        print(f"✅ Parsed pfSense config and saved to {args.output}")
    else:
        print(json.dumps(data, indent=2, ensure_ascii=False))

    return 0


if __name__ == '__main__':
    sys.exit(main())