#!/usr/bin/env python3
"""
pfSense XML Integration Script
Automatically processes pfSense XML backup files and integrates them into network scan results
"""

import argparse
import ipaddress
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional

from pfsense_xml_parser import PfSenseXMLParser

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PfSenseIntegrator:
    """Integrates pfSense XML data into network scan results"""

    # Placeholder used wherever a value cannot be read from the configuration.
    _UNKNOWN = 'unknown'

    # Device fields that _add_pfsense_device fills with assumptions rather
    # than facts read from the XML; they must not replace scanned values.
    _ASSUMED_DEVICE_FIELDS = ('open_ports', 'ssh_accessible')

    def __init__(self, xml_files: List[str]):
        """Remember the XML files to load; nothing is parsed yet.

        Args:
            xml_files: Paths of pfSense XML backup files.
        """
        self.xml_files = xml_files
        # Parsed configurations keyed by the firewall's hostname.
        self.pfsense_configs: Dict[str, Dict[str, Any]] = {}

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @classmethod
    def _hostname_of(cls, pfsense_config: Dict) -> str:
        """Return the configured hostname, or 'unknown' when it is absent."""
        system = pfsense_config.get('system') or {}
        return system.get('hostname') or cls._UNKNOWN

    @classmethod
    def _interface_network(cls, iface_config: Dict) -> Optional[str]:
        """Return the network CIDR an interface belongs to.

        The interface's own ``network_cidr`` is preferred because that is the
        value ``_add_pfsense_networks`` compares segments against. Otherwise
        the network is derived from ``ipaddr``/``subnet`` so that the host
        bits are cleared (``192.168.1.1/24`` -> ``192.168.1.0/24``).

        Returns:
            The CIDR string, the raw ``ipaddr/subnet`` text when it cannot be
            parsed as an address, or None when address or subnet is missing.
        """
        declared = iface_config.get('network_cidr')
        if declared and declared != cls._UNKNOWN:
            return declared

        ipaddr = iface_config.get('ipaddr')
        subnet = iface_config.get('subnet')
        if not ipaddr or not subnet:
            return None

        raw = f"{ipaddr}/{subnet}"
        try:
            return str(ipaddress.ip_interface(raw).network)
        except ValueError:
            # Not a literal address (e.g. a symbolic value); keep the raw text.
            return raw

    @staticmethod
    def _allowed_ip_networks(peer: Dict) -> List[str]:
        """Return ``address/mask`` strings for a WireGuard peer's allowed IPs.

        Entries without an address or mask are skipped with a warning instead
        of raising KeyError.
        """
        networks = []
        for entry in peer.get('allowed_ips') or []:
            address = entry.get('address')
            mask = entry.get('mask')
            # A mask of 0 is valid (default route), so test for absence only.
            if not address or mask is None or mask == '':
                logger.warning("Skipping malformed WireGuard allowed IP: %r", entry)
                continue
            networks.append(f"{address}/{mask}")
        return networks

    @staticmethod
    def _append_segment(segments: List[Dict], name: str, cidr: str,
                        gateway: Optional[str], is_vpn: bool) -> Dict:
        """Create an empty segment, append it to ``segments`` and return it."""
        segment = {
            'name': name,
            'cidr': cidr,
            'gateway': gateway,
            'is_vpn': is_vpn,
            'devices': [],
        }
        segments.append(segment)
        return segment

    @staticmethod
    def _has_cidr(segments: List[Dict], cidr: str) -> bool:
        """Return True when some segment already carries ``cidr``."""
        return any(segment.get('cidr') == cidr for segment in segments)

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    def load_pfsense_configs(self):
        """Load and parse all pfSense XML files

        Files that fail to load or parse are logged and skipped, so one bad
        file does not prevent the others from being used. Results are stored
        in ``self.pfsense_configs`` keyed by hostname.
        """
        logger.info("Loading %s pfSense configuration files...", len(self.xml_files))

        for xml_file in self.xml_files:
            try:
                parser = PfSenseXMLParser(xml_file)
                if not parser.load_xml():
                    logger.error("Failed to load: %s", xml_file)
                    continue

                config = parser.parse_all()
                hostname = self._hostname_of(config)
                if hostname in self.pfsense_configs:
                    # Two files with the same (or a missing) hostname share a
                    # key; say so instead of dropping the earlier one silently.
                    logger.warning(
                        "Duplicate pfSense hostname %r: %s replaces the "
                        "previously loaded configuration", hostname, xml_file)
                self.pfsense_configs[hostname] = config
                logger.info("Loaded pfSense config: %s", hostname)
            except Exception as e:
                # Deliberately broad: this is the per-file best-effort boundary.
                logger.error("Error parsing %s: %s", xml_file, e)

    # ------------------------------------------------------------------
    # Scan integration
    # ------------------------------------------------------------------

    def integrate_with_scan(self, scan_file: str, output_file: str):
        """Integrate pfSense data into network scan results

        Args:
            scan_file: Path of the JSON network scan to read.
            output_file: Path the enhanced JSON is written to.

        Raises:
            OSError: If either file cannot be opened.
            json.JSONDecodeError: If ``scan_file`` is not valid JSON.
        """
        logger.info("Integrating pfSense data into %s", scan_file)

        # Load network scan
        with open(scan_file, 'r', encoding='utf-8') as f:
            scan_data = json.load(f)

        # Add pfSense configurations
        scan_data['pfsense_configs'] = self.pfsense_configs

        # Enhance network segments with pfSense data
        self._enhance_segments_with_pfsense(scan_data)

        # Save enhanced scan
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(scan_data, f, indent=2)

        logger.info("Enhanced scan saved to %s", output_file)

    def _enhance_segments_with_pfsense(self, scan_data: Dict):
        """Enhance network segments with pfSense-specific information

        Mutates ``scan_data['segments']`` in place, creating the list when the
        scan has none so that added segments are not lost on save.
        """
        segments = scan_data.get('segments')
        if segments is None:
            segments = scan_data['segments'] = []

        for pfsense_name, pfsense_config in self.pfsense_configs.items():
            logger.info("Processing pfSense: %s", pfsense_name)

            # Find or create segment for this pfSense
            pfsense_segment = self._find_or_create_pfsense_segment(segments, pfsense_config)

            # Add pfSense device if not already present
            self._add_pfsense_device(pfsense_segment, pfsense_config)

            # Add networks discovered from pfSense config
            self._add_pfsense_networks(segments, pfsense_config)

    def _find_or_create_pfsense_segment(self, segments: List[Dict], pfsense_config: Dict) -> Dict:
        """Find existing segment or create new one for pfSense

        The LAN interface's network is used when the interface has both an
        address and a subnet; otherwise a segment whose name starts with the
        hostname is reused, or a placeholder segment is created.

        Returns:
            The segment dict (an element of ``segments``).
        """
        hostname = self._hostname_of(pfsense_config)
        interfaces = pfsense_config.get('interfaces', {})

        # Look for LAN interface
        lan_interface = interfaces.get('lan')
        if lan_interface and lan_interface.get('ipaddr') and lan_interface.get('subnet'):
            lan_network = self._interface_network(lan_interface)

            # Find existing segment
            for segment in segments:
                if segment.get('cidr') == lan_network:
                    return segment

            # Create new segment
            return self._append_segment(
                segments, f"{hostname}_LAN", lan_network,
                lan_interface.get('ipaddr'), False)

        # Fallback: create segment with hostname
        for segment in segments:
            if (segment.get('name') or '').startswith(hostname):
                return segment

        return self._append_segment(
            segments, f"{hostname}_networks", self._UNKNOWN, None, False)

    def _add_pfsense_device(self, segment: Dict, pfsense_config: Dict) -> Dict:
        """Add pfSense device to segment

        When the segment already holds a device with the firewall's LAN IP,
        that device is enriched instead of duplicated. Values the scan already
        knows are not replaced by ``None`` or by the assumed defaults below.

        Returns:
            The device dict that now lives in ``segment['devices']``.
        """
        hostname = self._hostname_of(pfsense_config)
        domain = (pfsense_config.get('system') or {}).get('domain', '')

        # Find LAN interface for IP
        interfaces = pfsense_config.get('interfaces', {})
        lan_interface = interfaces.get('lan')
        # A missing address becomes 'unknown' rather than None, so it can never
        # match a scanned device that merely lacks an 'ip' key.
        pfsense_ip = (lan_interface.get('ipaddr') if lan_interface else None) or self._UNKNOWN

        # Create pfSense device
        device = {
            'ip': pfsense_ip,
            'hostname': f"{hostname}.{domain}" if domain else hostname,
            'mac': None,
            'manufacturer': None,
            'os_type': 'pfSense (FreeBSD)',
            'os_version': pfsense_config.get('version', 'unknown'),
            'device_type': 'firewall',
            'open_ports': [22, 80, 443],  # Common pfSense ports
            'ssh_accessible': False,  # Assume not accessible for security
            'services': ['pfSense Firewall', 'Web GUI', 'SSH'],
            'routes': self._extract_routes(pfsense_config),
            'interfaces': self._extract_interfaces(pfsense_config),
            'pfsense_config': pfsense_config
        }

        devices = segment.setdefault('devices', [])

        # Check if device already exists
        for existing_device in devices:
            if self._is_same_device(existing_device, device):
                # Update existing device with pfSense info
                self._merge_device(existing_device, device)
                return existing_device

        # Add new device
        devices.append(device)
        return device

    @classmethod
    def _is_same_device(cls, existing_device: Dict, device: Dict) -> bool:
        """Return True when ``existing_device`` is the firewall in ``device``.

        Devices match on IP; when the IP is the 'unknown' placeholder the
        hostname must match too, so unrelated placeholders are not merged.
        """
        if existing_device.get('ip') != device['ip']:
            return False
        if device['ip'] != cls._UNKNOWN:
            return True
        return existing_device.get('hostname') == device['hostname']

    @classmethod
    def _merge_device(cls, existing_device: Dict, device: Dict) -> None:
        """Copy pfSense-derived fields onto a device found by the scan."""
        for key, value in device.items():
            if value is None:
                # Nothing known from the config: keep what the scan found.
                existing_device.setdefault(key, None)
            elif key in cls._ASSUMED_DEVICE_FIELDS and existing_device.get(key) is not None:
                # The scan measured this; the pfSense value is only a guess.
                continue
            else:
                existing_device[key] = value

    def _extract_routes(self, pfsense_config: Dict) -> List[Dict]:
        """Extract routing information from pfSense config

        Returns:
            One entry per static route, followed by one 'default' entry for
            every gateway flagged ``defaultgw``.
        """
        # Static routes
        routes = [
            {
                'destination': route.get('network', ''),
                'gateway': route.get('gateway', ''),
                'interface': 'static',
                'description': route.get('description', ''),
                'type': 'static'
            }
            for route in pfsense_config.get('static_routes') or []
        ]

        # Default route from gateways
        for gw_name, gw_config in (pfsense_config.get('gateways') or {}).items():
            if gw_config.get('defaultgw'):
                routes.append({
                    'destination': 'default',
                    'gateway': gw_config.get('gateway', ''),
                    'interface': gw_config.get('interface', ''),
                    'description': f"Default gateway: {gw_name}",
                    'type': 'default'
                })

        return routes

    def _extract_interfaces(self, pfsense_config: Dict) -> List[Dict]:
        """Extract interface information

        Returns:
            One flat dict per configured interface; missing values become ''
            (or 'physical' for the type).
        """
        return [
            {
                'name': iface_name,
                'description': iface_config.get('description', ''),
                'physical_interface': iface_config.get('interface', ''),
                'ip_address': iface_config.get('ipaddr', ''),
                'subnet': iface_config.get('subnet', ''),
                'network_cidr': iface_config.get('network_cidr', ''),
                'gateway': iface_config.get('gateway', ''),
                'mtu': iface_config.get('mtu', ''),
                'type': iface_config.get('type', 'physical')
            }
            for iface_name, iface_config in (pfsense_config.get('interfaces') or {}).items()
        ]

    def _add_pfsense_networks(self, segments: List[Dict], pfsense_config: Dict):
        """Add networks discovered from pfSense configuration

        Appends a segment for every non-WAN interface network and every
        WireGuard peer network that ``segments`` does not contain yet.
        """
        hostname = self._hostname_of(pfsense_config)
        interfaces = pfsense_config.get('interfaces', {})

        for iface_name, iface_config in interfaces.items():
            if iface_name == 'wan':
                continue  # Skip WAN for now

            network_cidr = iface_config.get('network_cidr')
            if not network_cidr or network_cidr == self._UNKNOWN:
                continue

            # Check if segment already exists
            if self._has_cidr(segments, network_cidr):
                continue

            description = iface_config.get('description') or ''
            self._append_segment(
                segments,
                f"{hostname}_{iface_name.upper()}",
                network_cidr,
                iface_config.get('ipaddr'),
                iface_name.startswith('opt') and 'wireguard' in description.lower())
            logger.info("Added network segment: %s", network_cidr)

        # Add WireGuard networks
        wireguard = pfsense_config.get('wireguard', {})
        for peer in wireguard.get('peers', []):
            peer_label = (peer.get('description') or self._UNKNOWN).replace(' ', '_')
            for network in self._allowed_ip_networks(peer):
                if self._has_cidr(segments, network):
                    continue
                self._append_segment(
                    segments, f"WireGuard_{peer_label}", network, None, True)
                logger.info("Added WireGuard network: %s", network)

    # ------------------------------------------------------------------
    # Markdown summary
    # ------------------------------------------------------------------

    def generate_network_summary(self, output_file: str):
        """Generate a human-readable network summary

        Writes a Markdown document with one section per loaded firewall.

        Args:
            output_file: Path of the Markdown file to write.

        Raises:
            OSError: If ``output_file`` cannot be written.
        """
        # NOTE(review): the single leading space in the nested list items below
        # is reproduced as found; confirm the intended Markdown indentation.
        summary = []
        summary.append("# Network Topology Summary")
        summary.append("Generated from pfSense XML configurations\n")

        for pfsense_name, config in self.pfsense_configs.items():
            summary.append(f"## pfSense Firewall: {pfsense_name}")
            summary.append(f"**Version:** {config.get('version', 'unknown')}")
            summary.append(f"**Domain:** {config.get('system', {}).get('domain', 'unknown')}\n")

            summary.extend(self._summarize_interfaces(config.get('interfaces', {})))
            summary.extend(self._summarize_static_routes(config.get('static_routes', [])))
            summary.extend(self._summarize_wireguard(config.get('wireguard', {})))
            summary.extend(self._summarize_dhcp(config.get('dhcp', {})))

        # Write summary
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write('\n'.join(summary))

        logger.info("Network summary saved to %s", output_file)

    @staticmethod
    def _summarize_interfaces(interfaces: Dict) -> List[str]:
        """Return the 'Network Interfaces' section lines, or [] if none."""
        if not interfaces:
            return []

        lines = ["### Network Interfaces"]
        for iface_name, iface in interfaces.items():
            ip = iface.get('ipaddr', 'DHCP')
            network = iface.get('network_cidr', '')
            desc = iface.get('description', iface_name.upper())

            lines.append(f"- **{desc}** ({iface_name}): {ip}")
            if network:
                lines.append(f" - Network: {network}")
            if iface.get('gateway'):
                lines.append(f" - Gateway: {iface.get('gateway')}")
        lines.append("")
        return lines

    @staticmethod
    def _summarize_static_routes(routes: List[Dict]) -> List[str]:
        """Return the 'Static Routes' section lines, or [] if none."""
        if not routes:
            return []

        lines = ["### Static Routes"]
        for route in routes:
            network = route.get('network', '')
            gateway = route.get('gateway', '')
            desc = route.get('description', '')
            lines.append(f"- {network} via {gateway}")
            if desc:
                lines.append(f" *{desc}*")
        lines.append("")
        return lines

    def _summarize_wireguard(self, wg: Dict) -> List[str]:
        """Return the 'WireGuard VPN' section lines, or [] when disabled."""
        if not (wg.get('enabled') and wg.get('tunnels')):
            return []

        lines = ["### WireGuard VPN"]
        for tunnel in wg.get('tunnels', []):
            name = tunnel.get('name', 'unknown')
            port = tunnel.get('listenport', 'unknown')
            desc = tunnel.get('description', '')
            lines.append(f"- **Tunnel {name}** (Port {port})")
            if desc:
                lines.append(f" *{desc}*")

            # NOTE(review): every peer is listed under every tunnel; the peers
            # are not filtered by tunnel here - confirm that is intended.
            for peer in wg.get('peers', []):
                desc = peer.get('description', 'unknown')
                networks = self._allowed_ip_networks(peer)
                if networks:
                    lines.append(f" - Peer: {desc} - Networks: {', '.join(networks)}")
        lines.append("")
        return lines

    @staticmethod
    def _summarize_dhcp(dhcp: Dict) -> List[str]:
        """Return the 'DHCP Configuration' section lines, or [] if none."""
        if not dhcp:
            return []

        lines = ["### DHCP Configuration"]
        for iface_name, dhcp_config in dhcp.items():
            # An entry without an 'enabled' key is treated as enabled.
            if not dhcp_config.get('enabled', True):
                continue

            range_from = dhcp_config.get('range_from', '')
            range_to = dhcp_config.get('range_to', '')
            if range_from and range_to:
                lines.append(f"- **{iface_name.upper()}**: {range_from} - {range_to}")

            static_maps = dhcp_config.get('static_maps', [])
            if static_maps:
                lines.append(" **Static Mappings:**")
                for static in static_maps:
                    ip = static.get('ipaddr', '')
                    mac = static.get('mac', '')
                    hostname = static.get('hostname', '')
                    lines.append(f" - {ip} ({mac}) - {hostname}")
        lines.append("")
        return lines


def _default_output_path(scan_file: str) -> str:
    """Return ``<stem>_enhanced<suffix>`` next to ``scan_file``.

    Unlike a plain ``str.replace('.json', ...)`` this never returns the input
    path itself (which would overwrite the scan) and only touches the file
    name, not directories that happen to contain '.json'.
    """
    scan_path = Path(scan_file)
    suffix = scan_path.suffix or '.json'
    return str(scan_path.with_name(f"{scan_path.stem}_enhanced{suffix}"))


def main():
    """Command line interface

    Returns:
        Process exit status: 0 on success, 1 when no configuration could be
        loaded or an input/output file could not be processed.
    """
    parser = argparse.ArgumentParser(description='Integrate pfSense XML configs with network scan')
    parser.add_argument('xml_files', nargs='+', help='pfSense XML configuration files')
    parser.add_argument('-s', '--scan', help='Network scan JSON file to enhance')
    parser.add_argument('-o', '--output', help='Output enhanced scan file')
    parser.add_argument('--summary', help='Generate network summary markdown file')
    parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Initialize integrator
    integrator = PfSenseIntegrator(args.xml_files)
    integrator.load_pfsense_configs()

    if not integrator.pfsense_configs:
        logger.error("No pfSense configurations loaded!")
        return 1

    # Status glyphs are written as escapes so the source stays ASCII and the
    # characters cannot be mangled by a wrong file encoding again.
    check_mark = "\u2705"
    bar_chart = "\U0001F4CA"

    # Generate summary if requested
    if args.summary:
        try:
            integrator.generate_network_summary(args.summary)
        except OSError as e:
            logger.error("Could not write summary %s: %s", args.summary, e)
            return 1
        print(f"{check_mark} Network summary generated: {args.summary}")

    # Integrate with scan if provided
    if args.scan:
        output_file = args.output or _default_output_path(args.scan)
        try:
            integrator.integrate_with_scan(args.scan, output_file)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError.
            logger.error("Could not enhance %s: %s", args.scan, e)
            return 1
        print(f"{check_mark} Enhanced scan saved: {output_file}")
    elif args.output:
        logger.warning("--output is ignored because no --scan file was given")

    # Show summary
    print(f"\n{bar_chart} Loaded {len(integrator.pfsense_configs)} pfSense configurations:")
    for name in integrator.pfsense_configs:
        print(f" - {name}")

    return 0


if __name__ == '__main__':
    # SystemExit works even when the site-provided exit() helper is absent.
    raise SystemExit(main())