Compare commits

...

3 Commits

Author SHA1 Message Date
mindesbunister
b8e06617e8 Add pfSense XML integration and complete workflow automation
- Add pfsense_integrator.py for automatic XML parsing and integration
- Add complete_workflow.sh for one-command network discovery
- Enhance integrated_scanner.py to auto-integrate pfSense XML files
- Update README with pfSense XML features and workflow
- Generate comprehensive network summaries from XML configs
- Support for WireGuard, OpenVPN, IPsec, routing, DHCP, firewall rules
2025-10-10 11:23:09 +02:00
mindesbunister
afe8903454 Add comprehensive network mapper and workflow script
- comprehensive_mapper.py: Combines network scanning with pfSense XML parsing
- run_network_mapping.sh: Complete workflow script for network discovery
- Successfully tested with both pfSense XML files and live network scan
- Generates comprehensive JSON data and SVG network diagrams
- Includes WireGuard VPN topology, static routes, and DHCP mappings
2025-10-10 11:14:37 +02:00
mindesbunister
7621e1829d Add pfSense XML configuration parser
- Parse pfSense backup XML files to extract network configuration
- Extract interfaces, static routes, gateways, DHCP, WireGuard, OpenVPN
- Extract firewall rules, NAT rules, DNS configuration
- Generate structured JSON output for network diagram generation
- Tested on both pfSense configurations (gw-nue01, gw-st01)
2025-10-10 11:12:33 +02:00
9 changed files with 1768 additions and 0 deletions

View File

@@ -12,6 +12,8 @@ A comprehensive network topology discovery tool that scans local and VPN-connect
- 📊 **SVG Diagram Generation**: Creates visual network topology diagrams
- 🔄 **Routing Analysis**: Extracts and analyzes routing tables from routers
- 📝 **JSON Export**: Structured data output for further processing
- 📄 **pfSense XML Parsing**: Automatically parses pfSense backup XML files for complete configuration analysis
- 🔗 **Automatic Integration**: Seamlessly integrates pfSense XML data into network scans
## Requirements
@@ -205,6 +207,23 @@ The scanner produces JSON with the following structure:
firefox network_topology.svg
```
### Complete Automated Workflow
For the ultimate network discovery experience, use the automated workflow script:
```bash
./complete_workflow.sh
```
This script will:
1. ✅ Verify system requirements
2. 🔍 Run integrated network scan (including pfSense XML if present)
3. 🎨 Generate SVG network diagram
4. 📋 Create network summary (if pfSense XML files exist)
5. 📊 Display statistics and next steps
**One-command network discovery!**
## SSH Access Setup
For automated scanning, SSH key-based authentication is recommended:
@@ -362,3 +381,27 @@ Created for comprehensive network topology discovery and visualization.
---
**Note**: Always ensure you have proper authorization before scanning networks. This tool performs active network reconnaissance.
### pfSense XML Integration
The scanner can automatically parse pfSense backup XML files to extract comprehensive configuration data:
- **Network Interfaces**: All interface configurations, IP addresses, VLANs
- **Routing Tables**: Static routes, dynamic routing, gateway configurations
- **VPN Configurations**: WireGuard tunnels, OpenVPN servers/clients, IPsec
- **Firewall Rules**: NAT rules, port forwarding, access control lists
- **DHCP Services**: Server configurations, static mappings, lease pools
- **DNS Settings**: Resolvers, domain configurations
- **System Information**: Hostname, domain, version, services
**Automatic Integration**: Place pfSense XML backup files in the scanner directory, and they will be automatically parsed and integrated into network scans.
**Manual Parsing**: Use `pfsense_integrator.py` to work with XML files independently:
```bash
# Parse XML files and generate summary
./pfsense_integrator.py *.xml --summary network_summary.md
# Integrate with existing scan
./pfsense_integrator.py *.xml -s scan.json -o enhanced_scan.json
```

124
complete_workflow.sh Executable file
View File

@@ -0,0 +1,124 @@
#!/bin/bash
# Complete Network Discovery Workflow
# Automatically scans network, integrates pfSense XML, and generates diagrams
set -e
echo "=========================================="
echo "Complete Network Discovery Workflow"
echo "=========================================="
echo ""
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
log_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
log_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
log_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Check if we're in the right directory
if [ ! -f "integrated_scanner.py" ]; then
log_error "integrated_scanner.py not found. Please run this script from the network scanner directory."
exit 1
fi
# Check for pfSense XML files
XML_FILES=$(ls *.xml 2>/dev/null | wc -l)
if [ "$XML_FILES" -gt 0 ]; then
log_info "Found $XML_FILES pfSense XML configuration file(s)"
else
log_warning "No pfSense XML files found. Network scan will proceed without pfSense integration."
fi
# Step 1: Run system verification
log_info "Step 1: Verifying system requirements..."
if ./test_system.py >/dev/null 2>&1; then
log_success "System verification passed"
else
log_error "System verification failed. Please check the output above."
exit 1
fi
# Step 2: Run integrated network scan
log_info "Step 2: Running integrated network scan..."
SCAN_OUTPUT="network_scan_$(date +%Y%m%d_%H%M%S).json"
if ./integrated_scanner.py -o "$SCAN_OUTPUT" -v; then
log_success "Network scan completed: $SCAN_OUTPUT"
else
log_error "Network scan failed"
exit 1
fi
# Step 3: Generate SVG diagram
log_info "Step 3: Generating network diagram..."
SVG_OUTPUT="${SCAN_OUTPUT%.json}.svg"
if ./svg_generator.py "$SCAN_OUTPUT" -o "$SVG_OUTPUT"; then
log_success "SVG diagram generated: $SVG_OUTPUT"
else
log_error "SVG generation failed"
exit 1
fi
# Step 4: Generate pfSense summary if XML files exist
if [ "$XML_FILES" -gt 0 ]; then
log_info "Step 4: Generating pfSense network summary..."
SUMMARY_OUTPUT="network_summary_$(date +%Y%m%d_%H%M%S).md"
if ./pfsense_integrator.py *.xml --summary "$SUMMARY_OUTPUT"; then
log_success "Network summary generated: $SUMMARY_OUTPUT"
else
log_warning "Network summary generation failed"
fi
fi
# Step 5: Show results summary
echo ""
echo "=========================================="
log_success "Network Discovery Complete!"
echo "=========================================="
echo ""
echo "Generated files:"
echo " 📊 Network Scan: $SCAN_OUTPUT"
echo " 🎨 Network Diagram: $SVG_OUTPUT"
if [ "$XML_FILES" -gt 0 ]; then
echo " 📋 Network Summary: $SUMMARY_OUTPUT"
fi
echo ""
# Show network statistics
if command -v jq >/dev/null 2>&1; then
echo "Network Statistics:"
TOTAL_SEGMENTS=$(jq '.segments | length' "$SCAN_OUTPUT")
TOTAL_DEVICES=$(jq '[.segments[].devices[]] | length' "$SCAN_OUTPUT")
PFSENSE_DEVICES=$(jq '[.segments[].devices[] | select(.device_type=="firewall")] | length' "$SCAN_OUTPUT")
echo " 📡 Network Segments: $TOTAL_SEGMENTS"
echo " 🖥️ Total Devices: $TOTAL_DEVICES"
echo " 🛡️ pfSense Firewalls: $PFSENSE_DEVICES"
echo ""
fi
echo "Next steps:"
echo " 1. Open $SVG_OUTPUT in your web browser to view the network diagram"
if [ "$XML_FILES" -gt 0 ]; then
echo " 2. Review $SUMMARY_OUTPUT for detailed pfSense configuration"
fi
echo " 3. Examine $SCAN_OUTPUT for complete network data (use jq for querying)"
echo ""
log_success "Workflow completed successfully! 🎉"

355
comprehensive_mapper.py Executable file
View File

@@ -0,0 +1,355 @@
#!/usr/bin/env python3
"""
Comprehensive Network Diagram Generator
Combines network scanning with pfSense XML parsing for complete network topology
"""
import json
import argparse
import logging
from pathlib import Path
from typing import Dict, List, Any
from network_scanner import NetworkScanner
from pfsense_xml_parser import PfSenseXMLParser
from svg_generator import NetworkDiagramGenerator
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class ComprehensiveNetworkMapper:
"""Complete network mapping solution combining multiple data sources"""
def __init__(self, config: Dict):
self.config = config
self.network_data = {}
self.pfsense_data = {}
self.combined_data = {
'scan_timestamp': None,
'data_sources': [],
'segments': [],
'pfsense_firewalls': [],
'wireguard_networks': [],
'openvpn_networks': [],
'routing_table': [],
'dhcp_leases': [],
'static_mappings': []
}
def load_network_scan(self, scan_file: str):
"""Load network scan data"""
logger.info(f"Loading network scan data from {scan_file}")
try:
with open(scan_file, 'r') as f:
self.network_data = json.load(f)
self.combined_data['data_sources'].append('network_scan')
logger.info("Network scan data loaded successfully")
except Exception as e:
logger.error(f"Error loading network scan: {e}")
def load_pfsense_configs(self, xml_files: List[str]):
"""Load and parse pfSense XML configuration files"""
for xml_file in xml_files:
if Path(xml_file).exists():
logger.info(f"Parsing pfSense config: {xml_file}")
parser = PfSenseXMLParser(xml_file)
data = parser.parse_all()
if data:
hostname = data.get('hostname', f'pfsense_{len(self.pfsense_data)}')
self.pfsense_data[hostname] = data
self.combined_data['data_sources'].append(f'pfsense_{hostname}')
self.combined_data['pfsense_firewalls'].append(data)
logger.info(f"pfSense config {hostname} loaded successfully")
else:
logger.error(f"Failed to parse pfSense config: {xml_file}")
else:
logger.warning(f"pfSense config file not found: {xml_file}")
def merge_network_data(self):
"""Merge all data sources into comprehensive network map"""
logger.info("Merging network data sources...")
# Start with network scan data if available
if self.network_data:
self.combined_data.update({
'scan_timestamp': self.network_data.get('scan_timestamp'),
'segments': self.network_data.get('segments', [])
})
# Process pfSense data
self._process_pfsense_data()
# Extract additional network information
self._extract_network_topology()
logger.info("Network data merging complete")
def _process_pfsense_data(self):
"""Process and integrate pfSense configuration data"""
for hostname, pfsense in self.pfsense_data.items():
logger.info(f"Processing pfSense data for {hostname}")
# Add interfaces as network segments
interfaces = pfsense.get('interfaces', {})
for iface_name, iface_data in interfaces.items():
if iface_data.get('ipaddr') and iface_data.get('ipaddr') != 'dhcp':
try:
# Create network segment from interface
ip = iface_data['ipaddr']
subnet = iface_data.get('subnet', '24')
network_cidr = f"{ip.rsplit('.', 1)[0]}.0/{subnet}"
segment = {
'name': f"{hostname}_{iface_name}",
'cidr': network_cidr,
'gateway': iface_data.get('gateway', ip),
'interface': iface_data.get('interface'),
'pfsense_host': hostname,
'is_vpn': iface_data.get('type') in ['wireguard', 'openvpn'],
'devices': []
}
# Add gateway device
gateway_device = {
'ip': ip,
'hostname': hostname,
'device_type': 'firewall',
'os_type': 'pfSense',
'interface': iface_name,
'pfsense_config': True
}
segment['devices'].append(gateway_device)
self.combined_data['segments'].append(segment)
# Track VPN networks
if iface_data.get('type') == 'wireguard':
self.combined_data['wireguard_networks'].append({
'network': network_cidr,
'interface': iface_data.get('interface'),
'pfsense': hostname,
'peers': pfsense.get('wireguard', {}).get('peers', [])
})
elif iface_data.get('type') == 'openvpn':
self.combined_data['openvpn_networks'].append({
'network': network_cidr,
'interface': iface_data.get('interface'),
'pfsense': hostname
})
except Exception as e:
logger.debug(f"Error processing interface {iface_name}: {e}")
# Add static routes
static_routes = pfsense.get('static_routes', [])
for route in static_routes:
route_info = {
'network': route.get('network'),
'gateway': route.get('gateway'),
'description': route.get('description'),
'pfsense': hostname
}
self.combined_data['routing_table'].append(route_info)
# Add DHCP information
dhcp_config = pfsense.get('dhcp', {})
for iface_name, dhcp_data in dhcp_config.items():
if dhcp_data.get('static_mappings'):
for mapping in dhcp_data['static_mappings']:
mapping_info = {
'ip': mapping.get('ipaddr'),
'mac': mapping.get('mac'),
'hostname': mapping.get('hostname'),
'description': mapping.get('description'),
'interface': iface_name,
'pfsense': hostname
}
self.combined_data['static_mappings'].append(mapping_info)
def _extract_network_topology(self):
"""Extract network topology information from all sources"""
logger.info("Extracting network topology...")
# Process WireGuard peer information
for wg_net in self.combined_data['wireguard_networks']:
for peer in wg_net.get('peers', []):
for allowed_ip in peer.get('allowed_ips', []):
# Create segment for remote networks
if allowed_ip.get('mask') and allowed_ip['mask'] != '32':
remote_segment = {
'name': f"WG_Remote_{allowed_ip['address']}_{allowed_ip['mask']}",
'cidr': f"{allowed_ip['address']}/{allowed_ip['mask']}",
'gateway': wg_net.get('network', '').split('/')[0].rsplit('.', 1)[0] + '.1',
'is_vpn': True,
'vpn_type': 'wireguard',
'pfsense_host': wg_net.get('pfsense'),
'devices': []
}
self.combined_data['segments'].append(remote_segment)
def generate_svg_diagram(self, output_file: str):
"""Generate SVG network diagram"""
logger.info(f"Generating SVG diagram: {output_file}")
# Use the existing SVG generator
generator = NetworkDiagramGenerator(self.combined_data)
generator.generate_svg(output_file)
logger.info(f"SVG diagram generated: {output_file}")
def export_comprehensive_json(self, output_file: str):
"""Export comprehensive network data"""
logger.info(f"Exporting comprehensive data to {output_file}")
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(self.combined_data, f, indent=2, ensure_ascii=False)
logger.info(f"Comprehensive network data exported to {output_file}")
def print_summary(self):
"""Print comprehensive network summary"""
print("\n" + "="*80)
print("COMPREHENSIVE NETWORK MAPPING SUMMARY")
print("="*80)
print(f"\nData Sources: {', '.join(self.combined_data['data_sources'])}")
print(f"Network Segments: {len(self.combined_data['segments'])}")
print(f"pfSense Firewalls: {len(self.combined_data['pfsense_firewalls'])}")
print(f"WireGuard Networks: {len(self.combined_data['wireguard_networks'])}")
print(f"OpenVPN Networks: {len(self.combined_data['openvpn_networks'])}")
print(f"Static Routes: {len(self.combined_data['routing_table'])}")
print(f"DHCP Static Mappings: {len(self.combined_data['static_mappings'])}")
# Network segments summary
print(f"\n{'='*80}")
print("NETWORK SEGMENTS")
print(f"{'='*80}")
for segment in self.combined_data['segments']:
vpn_indicator = " (VPN)" if segment.get('is_vpn') else ""
pfsense_indicator = f" [{segment.get('pfsense_host')}]" if segment.get('pfsense_host') else ""
print(f"\n📡 {segment['name']}{vpn_indicator}{pfsense_indicator}")
print(f" CIDR: {segment['cidr']}")
print(f" Gateway: {segment.get('gateway', 'N/A')}")
print(f" Devices: {len(segment.get('devices', []))}")
# Show devices
for device in segment.get('devices', [])[:5]: # Show first 5
print(f"{device.get('ip', 'N/A')} - {device.get('hostname', 'N/A')} ({device.get('device_type', 'N/A')})")
if len(segment.get('devices', [])) > 5:
print(f" ... and {len(segment.get('devices', [])) - 5} more devices")
# pfSense summary
if self.combined_data['pfsense_firewalls']:
print(f"\n{'='*80}")
print("PFSENSE FIREWALLS")
print(f"{'='*80}")
for pfsense in self.combined_data['pfsense_firewalls']:
hostname = pfsense.get('hostname', 'Unknown')
print(f"\n🛡️ {hostname}")
print(f" Interfaces: {len(pfsense.get('interfaces', {}))}")
print(f" Static Routes: {len(pfsense.get('static_routes', []))}")
wg_config = pfsense.get('wireguard', {})
if wg_config.get('enabled'):
print(f" WireGuard: {len(wg_config.get('tunnels', []))} tunnels, {len(wg_config.get('peers', []))} peers")
dhcp_config = pfsense.get('dhcp', {})
total_mappings = sum(len(dhcp.get('static_mappings', [])) for dhcp in dhcp_config.values())
print(f" DHCP Static Mappings: {total_mappings}")
# VPN Networks
if self.combined_data['wireguard_networks'] or self.combined_data['openvpn_networks']:
print(f"\n{'='*80}")
print("VPN NETWORKS")
print(f"{'='*80}")
for wg_net in self.combined_data['wireguard_networks']:
print(f"\n🔐 WireGuard: {wg_net['network']} via {wg_net['pfsense']}")
print(f" Interface: {wg_net['interface']}")
print(f" Peers: {len(wg_net.get('peers', []))}")
for ovpn_net in self.combined_data['openvpn_networks']:
print(f"\n🔐 OpenVPN: {ovpn_net['network']} via {ovpn_net['pfsense']}")
print(f" Interface: {ovpn_net['interface']}")
print(f"\n{'='*80}")
print("NETWORK TOPOLOGY ANALYSIS COMPLETE")
print(f"{'='*80}")
def main():
"""Command line interface"""
parser = argparse.ArgumentParser(
description='Comprehensive Network Diagram Generator with pfSense Integration'
)
parser.add_argument('-c', '--config', default='config.json',
help='Network scanner configuration file')
parser.add_argument('-s', '--scan-data', help='Existing network scan JSON file')
parser.add_argument('-p', '--pfsense-xml', nargs='+',
help='pfSense XML configuration files')
parser.add_argument('-o', '--output', default='comprehensive_network.json',
help='Output comprehensive JSON file')
parser.add_argument('--svg', help='Generate SVG diagram')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose output')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Load configuration
config = {}
if Path(args.config).exists():
try:
with open(args.config, 'r') as f:
config = json.load(f)
except Exception as e:
logger.warning(f"Could not load config {args.config}: {e}")
# Initialize comprehensive mapper
mapper = ComprehensiveNetworkMapper(config)
# Load network scan data if provided
if args.scan_data and Path(args.scan_data).exists():
mapper.load_network_scan(args.scan_data)
else:
logger.info("No network scan data provided - will use pfSense data only")
# Load pfSense configurations
pfsense_files = args.pfsense_xml or []
# Auto-detect pfSense XML files if none specified
if not pfsense_files:
xml_files = list(Path('.').glob('config-*.xml'))
if xml_files:
pfsense_files = [str(f) for f in xml_files]
logger.info(f"Auto-detected pfSense XML files: {pfsense_files}")
if pfsense_files:
mapper.load_pfsense_configs(pfsense_files)
else:
logger.warning("No pfSense XML files found")
# Merge all data
mapper.merge_network_data()
# Print summary
mapper.print_summary()
# Export comprehensive data
mapper.export_comprehensive_json(args.output)
print(f"\n✅ Comprehensive network data saved to: {args.output}")
# Generate SVG if requested
if args.svg:
svg_file = args.svg
mapper.generate_svg_diagram(svg_file)
print(f"✅ SVG network diagram generated: {svg_file}")
if __name__ == '__main__':
main()

View File

@@ -10,6 +10,8 @@ import argparse
from datetime import datetime
from network_scanner import NetworkScanner, NetworkSegment
from pfsense_scanner import PfSenseScanner
from dataclasses import asdict
from network_scanner import Device
logging.basicConfig(
level=logging.INFO,
@@ -33,11 +35,91 @@ class IntegratedNetworkScanner:
# Run base network scan
self.base_scanner.scan_all()
# Check for pfSense XML files and integrate them
self._integrate_pfsense_xml()
# Identify and enhance pfSense devices
self._scan_pfsense_devices()
logger.info("Integrated scan complete")
def _integrate_pfsense_xml(self):
"""Automatically integrate pfSense XML files if present"""
import glob
from pfsense_integrator import PfSenseIntegrator
# Look for XML files in current directory
xml_files = glob.glob("*.xml")
if not xml_files:
logger.info("No pfSense XML files found, skipping XML integration")
return
logger.info(f"Found {len(xml_files)} pfSense XML files, integrating...")
try:
integrator = PfSenseIntegrator(xml_files)
integrator.load_pfsense_configs()
# Create temporary scan file for integration
import tempfile
import os
temp_scan = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False)
try:
# Export current scan data
temp_data = {
'scan_timestamp': None,
'segments': [
{
'name': seg.name,
'cidr': seg.cidr,
'gateway': seg.gateway,
'is_vpn': seg.is_vpn,
'devices': [asdict(dev) for dev in seg.devices]
}
for seg in self.base_scanner.segments
]
}
json.dump(temp_data, temp_scan)
temp_scan.close()
# Integrate pfSense data
integrator.integrate_with_scan(temp_scan.name, temp_scan.name + '_enhanced')
# Load enhanced data back
with open(temp_scan.name + '_enhanced', 'r') as f:
enhanced_data = json.load(f)
# Update segments
self.base_scanner.segments = []
for seg_data in enhanced_data.get('segments', []):
segment = NetworkSegment(
name=seg_data['name'],
cidr=seg_data['cidr'],
gateway=seg_data['gateway'],
is_vpn=seg_data['is_vpn'],
devices=[]
)
for dev_data in seg_data['devices']:
device = Device(**dev_data)
segment.devices.append(device)
self.base_scanner.segments.append(segment)
logger.info(f"Integrated {len(integrator.pfsense_configs)} pfSense configurations")
finally:
# Clean up temp files
try:
os.unlink(temp_scan.name)
os.unlink(temp_scan.name + '_enhanced')
except:
pass
except Exception as e:
logger.error(f"Error integrating pfSense XML: {e}")
def _scan_pfsense_devices(self):
"""Find and deeply scan pfSense devices"""
logger.info("Looking for pfSense devices...")

14
network_report.md Normal file
View File

@@ -0,0 +1,14 @@
# Network Mapping Report
Generated on: Fr 10. Okt 11:14:30 CEST 2025
## Network Statistics
- Network Segments: 15
- pfSense Firewalls: 2
- WireGuard Networks: 3
- Static Routes: 3
- DHCP Static Mappings: 54
## Generated Files
- comprehensive_network.json - Complete network data
- comprehensive_network.svg - Network topology diagram
- network_report.md - This summary report

53
network_summary.md Normal file
View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardnünbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardnünbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardnünbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

380
pfsense_integrator.py Executable file
View File

@@ -0,0 +1,380 @@
#!/usr/bin/env python3
"""
pfSense XML Integration Script
Automatically processes pfSense XML backup files and integrates them into network scan results
"""
import json
import argparse
from pathlib import Path
import logging
from typing import Dict, List, Any
from pfsense_xml_parser import PfSenseXMLParser
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PfSenseIntegrator:
"""Integrates pfSense XML data into network scan results"""
def __init__(self, xml_files: List[str]):
self.xml_files = xml_files
self.pfsense_configs = {}
def load_pfsense_configs(self):
"""Load and parse all pfSense XML files"""
logger.info(f"Loading {len(self.xml_files)} pfSense configuration files...")
for xml_file in self.xml_files:
try:
parser = PfSenseXMLParser(xml_file)
if parser.load_xml():
config = parser.parse_all()
hostname = config.get('system', {}).get('hostname', 'unknown')
self.pfsense_configs[hostname] = config
logger.info(f"Loaded pfSense config: {hostname}")
else:
logger.error(f"Failed to load: {xml_file}")
except Exception as e:
logger.error(f"Error parsing {xml_file}: {e}")
def integrate_with_scan(self, scan_file: str, output_file: str):
"""Integrate pfSense data into network scan results"""
logger.info(f"Integrating pfSense data into {scan_file}")
# Load network scan
with open(scan_file, 'r') as f:
scan_data = json.load(f)
# Add pfSense configurations
scan_data['pfsense_configs'] = self.pfsense_configs
# Enhance network segments with pfSense data
self._enhance_segments_with_pfsense(scan_data)
# Save enhanced scan
with open(output_file, 'w') as f:
json.dump(scan_data, f, indent=2)
logger.info(f"Enhanced scan saved to {output_file}")
def _enhance_segments_with_pfsense(self, scan_data: Dict):
"""Enhance network segments with pfSense-specific information"""
segments = scan_data.get('segments', [])
for pfsense_name, pfsense_config in self.pfsense_configs.items():
logger.info(f"Processing pfSense: {pfsense_name}")
# Find or create segment for this pfSense
pfsense_segment = self._find_or_create_pfsense_segment(segments, pfsense_config)
# Add pfSense device if not already present
pfsense_device = self._add_pfsense_device(pfsense_segment, pfsense_config)
# Add networks discovered from pfSense config
self._add_pfsense_networks(segments, pfsense_config)
def _find_or_create_pfsense_segment(self, segments: List[Dict], pfsense_config: Dict) -> Dict:
"""Find existing segment or create new one for pfSense"""
interfaces = pfsense_config.get('interfaces', {})
# Look for LAN interface
lan_interface = interfaces.get('lan')
if lan_interface and lan_interface.get('ipaddr') and lan_interface.get('subnet'):
lan_network = f"{lan_interface['ipaddr']}/{lan_interface['subnet']}"
# Find existing segment
for segment in segments:
if segment.get('cidr') == lan_network:
return segment
# Create new segment
new_segment = {
'name': f"{pfsense_config['system']['hostname']}_LAN",
'cidr': lan_network,
'gateway': lan_interface.get('ipaddr'),
'is_vpn': False,
'devices': []
}
segments.append(new_segment)
return new_segment
# Fallback: create segment with hostname
hostname = pfsense_config['system']['hostname']
for segment in segments:
if segment.get('name', '').startswith(hostname):
return segment
new_segment = {
'name': f"{hostname}_networks",
'cidr': 'unknown',
'gateway': None,
'is_vpn': False,
'devices': []
}
segments.append(new_segment)
return new_segment
def _add_pfsense_device(self, segment: Dict, pfsense_config: Dict) -> Dict:
"""Add pfSense device to segment"""
hostname = pfsense_config['system']['hostname']
domain = pfsense_config['system'].get('domain', '')
# Find LAN interface for IP
interfaces = pfsense_config.get('interfaces', {})
lan_interface = interfaces.get('lan')
pfsense_ip = lan_interface.get('ipaddr') if lan_interface else 'unknown'
# Create pfSense device
device = {
'ip': pfsense_ip,
'hostname': f"{hostname}.{domain}" if domain else hostname,
'mac': None,
'manufacturer': None,
'os_type': 'pfSense (FreeBSD)',
'os_version': pfsense_config.get('version', 'unknown'),
'device_type': 'firewall',
'open_ports': [22, 80, 443], # Common pfSense ports
'ssh_accessible': False, # Assume not accessible for security
'services': ['pfSense Firewall', 'Web GUI', 'SSH'],
'routes': self._extract_routes(pfsense_config),
'interfaces': self._extract_interfaces(pfsense_config),
'pfsense_config': pfsense_config
}
# Check if device already exists
for existing_device in segment['devices']:
if existing_device.get('ip') == pfsense_ip:
# Update existing device with pfSense info
existing_device.update(device)
return existing_device
# Add new device
segment['devices'].append(device)
return device
def _extract_routes(self, pfsense_config: Dict) -> List[Dict]:
"""Extract routing information from pfSense config"""
routes = []
# Static routes
static_routes = pfsense_config.get('static_routes', [])
for route in static_routes:
routes.append({
'destination': route.get('network', ''),
'gateway': route.get('gateway', ''),
'interface': 'static',
'description': route.get('description', ''),
'type': 'static'
})
# Default route from gateways
gateways = pfsense_config.get('gateways', {})
for gw_name, gw_config in gateways.items():
if gw_config.get('defaultgw'):
routes.append({
'destination': 'default',
'gateway': gw_config.get('gateway', ''),
'interface': gw_config.get('interface', ''),
'description': f"Default gateway: {gw_name}",
'type': 'default'
})
return routes
def _extract_interfaces(self, pfsense_config: Dict) -> List[Dict]:
"""Extract interface information"""
interfaces = []
for iface_name, iface_config in pfsense_config.get('interfaces', {}).items():
interface = {
'name': iface_name,
'description': iface_config.get('description', ''),
'physical_interface': iface_config.get('interface', ''),
'ip_address': iface_config.get('ipaddr', ''),
'subnet': iface_config.get('subnet', ''),
'network_cidr': iface_config.get('network_cidr', ''),
'gateway': iface_config.get('gateway', ''),
'mtu': iface_config.get('mtu', ''),
'type': iface_config.get('type', 'physical')
}
interfaces.append(interface)
return interfaces
def _add_pfsense_networks(self, segments: List[Dict], pfsense_config: Dict):
"""Add networks discovered from pfSense configuration"""
interfaces = pfsense_config.get('interfaces', {})
for iface_name, iface_config in interfaces.items():
if iface_name == 'wan':
continue # Skip WAN for now
network_cidr = iface_config.get('network_cidr')
if network_cidr and network_cidr != 'unknown':
# Check if segment already exists
segment_exists = any(seg.get('cidr') == network_cidr for seg in segments)
if not segment_exists:
segment_name = f"{pfsense_config['system']['hostname']}_{iface_name.upper()}"
new_segment = {
'name': segment_name,
'cidr': network_cidr,
'gateway': iface_config.get('ipaddr'),
'is_vpn': iface_name.startswith('opt') and 'wireguard' in iface_config.get('description', '').lower(),
'devices': []
}
segments.append(new_segment)
logger.info(f"Added network segment: {network_cidr}")
# Add WireGuard networks
wireguard = pfsense_config.get('wireguard', {})
for peer in wireguard.get('peers', []):
for allowed_ip in peer.get('allowed_ips', []):
network = f"{allowed_ip['address']}/{allowed_ip['mask']}"
segment_exists = any(seg.get('cidr') == network for seg in segments)
if not segment_exists:
new_segment = {
'name': f"WireGuard_{peer.get('description', 'unknown').replace(' ', '_')}",
'cidr': network,
'gateway': None,
'is_vpn': True,
'devices': []
}
segments.append(new_segment)
logger.info(f"Added WireGuard network: {network}")
def generate_network_summary(self, output_file: str):
"""Generate a human-readable network summary"""
summary = []
summary.append("# Network Topology Summary")
summary.append("Generated from pfSense XML configurations\n")
for pfsense_name, config in self.pfsense_configs.items():
summary.append(f"## pfSense Firewall: {pfsense_name}")
summary.append(f"**Version:** {config.get('version', 'unknown')}")
summary.append(f"**Domain:** {config.get('system', {}).get('domain', 'unknown')}\n")
# Interfaces
interfaces = config.get('interfaces', {})
if interfaces:
summary.append("### Network Interfaces")
for iface_name, iface in interfaces.items():
ip = iface.get('ipaddr', 'DHCP')
subnet = iface.get('subnet', '')
network = iface.get('network_cidr', '')
desc = iface.get('description', iface_name.upper())
summary.append(f"- **{desc}** ({iface_name}): {ip}")
if network:
summary.append(f" - Network: {network}")
if iface.get('gateway'):
summary.append(f" - Gateway: {iface.get('gateway')}")
summary.append("")
# Static Routes
routes = config.get('static_routes', [])
if routes:
summary.append("### Static Routes")
for route in routes:
network = route.get('network', '')
gateway = route.get('gateway', '')
desc = route.get('description', '')
summary.append(f"- {network} via {gateway}")
if desc:
summary.append(f" *{desc}*")
summary.append("")
# WireGuard
wg = config.get('wireguard', {})
if wg.get('enabled') and wg.get('tunnels'):
summary.append("### WireGuard VPN")
for tunnel in wg.get('tunnels', []):
name = tunnel.get('name', 'unknown')
port = tunnel.get('listenport', 'unknown')
desc = tunnel.get('description', '')
summary.append(f"- **Tunnel {name}** (Port {port})")
if desc:
summary.append(f" *{desc}*")
for peer in wg.get('peers', []):
desc = peer.get('description', 'unknown')
allowed_ips = peer.get('allowed_ips', [])
if allowed_ips:
networks = [f"{ip['address']}/{ip['mask']}" for ip in allowed_ips]
summary.append(f" - Peer: {desc} - Networks: {', '.join(networks)}")
summary.append("")
# DHCP
dhcp = config.get('dhcp', {})
if dhcp:
summary.append("### DHCP Configuration")
for iface_name, dhcp_config in dhcp.items():
if dhcp_config.get('enabled', True):
range_from = dhcp_config.get('range_from', '')
range_to = dhcp_config.get('range_to', '')
if range_from and range_to:
summary.append(f"- **{iface_name.upper()}**: {range_from} - {range_to}")
static_maps = dhcp_config.get('static_maps', [])
if static_maps:
summary.append(" **Static Mappings:**")
for static in static_maps:
ip = static.get('ipaddr', '')
mac = static.get('mac', '')
hostname = static.get('hostname', '')
summary.append(f" - {ip} ({mac}) - {hostname}")
summary.append("")
# Write summary
with open(output_file, 'w') as f:
f.write('\n'.join(summary))
logger.info(f"Network summary saved to {output_file}")
def main():
"""Command line interface"""
parser = argparse.ArgumentParser(description='Integrate pfSense XML configs with network scan')
parser.add_argument('xml_files', nargs='+', help='pfSense XML configuration files')
parser.add_argument('-s', '--scan', help='Network scan JSON file to enhance')
parser.add_argument('-o', '--output', help='Output enhanced scan file')
parser.add_argument('--summary', help='Generate network summary markdown file')
parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Initialize integrator
integrator = PfSenseIntegrator(args.xml_files)
integrator.load_pfsense_configs()
if not integrator.pfsense_configs:
logger.error("No pfSense configurations loaded!")
return 1
# Generate summary if requested
if args.summary:
integrator.generate_network_summary(args.summary)
print(f"✅ Network summary generated: {args.summary}")
# Integrate with scan if provided
if args.scan:
output_file = args.output or args.scan.replace('.json', '_enhanced.json')
integrator.integrate_with_scan(args.scan, output_file)
print(f"✅ Enhanced scan saved: {output_file}")
# Show summary
print(f"\n📊 Loaded {len(integrator.pfsense_configs)} pfSense configurations:")
for name in integrator.pfsense_configs.keys():
print(f" - {name}")
return 0
if __name__ == '__main__':
exit(main())

503
pfsense_xml_parser.py Executable file
View File

@@ -0,0 +1,503 @@
#!/usr/bin/env python3
"""
pfSense XML Configuration Parser
Extracts comprehensive network information from pfSense backup XML files
"""
import xml.etree.ElementTree as ET
import json
import argparse
from typing import Dict, List, Optional, Any
from pathlib import Path
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class PfSenseXMLParser:
"""Parser for pfSense XML configuration files"""
def __init__(self, xml_file: str):
self.xml_file = xml_file
self.tree = None
self.root = None
self.hostname = None
def load_xml(self) -> bool:
"""Load and parse the XML file"""
try:
self.tree = ET.parse(self.xml_file)
self.root = self.tree.getroot()
# Extract hostname
hostname_elem = self.root.find('.//hostname')
if hostname_elem is not None:
self.hostname = hostname_elem.text
logger.info(f"Loaded pfSense config: {self.hostname or 'Unknown'}")
return True
except Exception as e:
logger.error(f"Error loading XML file: {e}")
return False
def get_text(self, element: Optional[ET.Element], default: str = "") -> str:
"""Safely get text from XML element"""
if element is not None and element.text:
return element.text
return default
def get_interfaces(self) -> Dict[str, Dict]:
"""Extract network interfaces configuration"""
interfaces = {}
if self.root is None:
return interfaces
interfaces_elem = self.root.find('interfaces')
if interfaces_elem is None:
return interfaces
for iface_elem in interfaces_elem:
iface_name = iface_elem.tag
if iface_name in ['wan', 'lan'] or iface_name.startswith('opt'):
interface = {
'name': iface_name,
'description': self.get_text(iface_elem.find('descr')),
'enabled': iface_elem.find('enable') is not None,
'interface': self.get_text(iface_elem.find('if')),
'ipaddr': self.get_text(iface_elem.find('ipaddr')),
'subnet': self.get_text(iface_elem.find('subnet')),
'gateway': self.get_text(iface_elem.find('gateway')),
'mtu': self.get_text(iface_elem.find('mtu')),
'mss': self.get_text(iface_elem.find('mss')),
'spoofmac': self.get_text(iface_elem.find('spoofmac')),
'type': 'physical'
}
# Determine interface type
if 'tun_wg' in interface['interface']:
interface['type'] = 'wireguard'
elif 'ovpn' in interface['interface']:
interface['type'] = 'openvpn'
elif 'ipsec' in interface['interface']:
interface['type'] = 'ipsec'
interfaces[iface_name] = interface
return interfaces
def get_static_routes(self) -> List[Dict]:
"""Extract static routes"""
routes = []
if self.root is None:
return routes
staticroutes_elem = self.root.find('staticroutes')
if staticroutes_elem is None:
return routes
for route_elem in staticroutes_elem:
if route_elem.tag == 'route':
route = {
'network': self.get_text(route_elem.find('network')),
'gateway': self.get_text(route_elem.find('gateway')),
'description': self.get_text(route_elem.find('descr'))
}
routes.append(route)
return routes
def get_gateways(self) -> Dict[str, Dict]:
"""Extract gateway configuration"""
gateways = {}
if self.root is None:
return gateways
gateways_elem = self.root.find('gateways')
if gateways_elem is None:
return gateways
gateway_item_elem = gateways_elem.find('gateway_item')
if gateway_item_elem is not None:
for gw_elem in gateway_item_elem:
if gw_elem.tag == 'item':
name = self.get_text(gw_elem.find('name'))
if name:
gateway = {
'name': name,
'interface': self.get_text(gw_elem.find('interface')),
'gateway': self.get_text(gw_elem.find('gateway')),
'monitor': self.get_text(gw_elem.find('monitor')),
'description': self.get_text(gw_elem.find('descr')),
'defaultgw': gw_elem.find('defaultgw') is not None
}
gateways[name] = gateway
return gateways
def get_dhcp_config(self) -> Dict[str, Dict]:
"""Extract DHCP server configuration"""
dhcp_config = {}
if self.root is None:
return dhcp_config
dhcpd_elem = self.root.find('dhcpd')
if dhcpd_elem is None:
return dhcp_config
for dhcp_item in dhcpd_elem:
iface_name = dhcp_item.tag
dhcp_elem = dhcp_item
config = {
'enabled': True,
'range': {},
'static_mappings': []
}
# DHCP range
range_elem = dhcp_elem.find('range')
if range_elem is not None:
config['range'] = {
'from': self.get_text(range_elem.find('from')),
'to': self.get_text(range_elem.find('to'))
}
# DHCP options
config.update({
'defaultleasetime': self.get_text(dhcp_elem.find('defaultleasetime')),
'maxleasetime': self.get_text(dhcp_elem.find('maxleasetime')),
'gateway': self.get_text(dhcp_elem.find('gateway')),
'domain': self.get_text(dhcp_elem.find('domain')),
'domainsearchlist': self.get_text(dhcp_elem.find('domainsearchlist')),
'ddnsdomain': self.get_text(dhcp_elem.find('ddnsdomain')),
'dns1': '',
'dns2': '',
'ntpserver': ''
})
# DNS servers
dns_servers = dhcp_elem.findall('dnsserver')
if dns_servers:
config['dns1'] = self.get_text(dns_servers[0])
if len(dns_servers) > 1:
config['dns2'] = self.get_text(dns_servers[1])
# NTP servers
ntp_servers = dhcp_elem.findall('ntpserver')
if ntp_servers:
config['ntpserver'] = self.get_text(ntp_servers[0])
# Static mappings
for staticmap_elem in dhcp_elem.findall('staticmap'):
mapping = {
'mac': self.get_text(staticmap_elem.find('mac')),
'ipaddr': self.get_text(staticmap_elem.find('ipaddr')),
'hostname': self.get_text(staticmap_elem.find('hostname')),
'description': self.get_text(staticmap_elem.find('descr'))
}
config['static_mappings'].append(mapping)
dhcp_config[iface_name] = config
return dhcp_config
def get_wireguard_config(self) -> Dict[str, Any]:
"""Extract WireGuard configuration"""
wg_config = {
'enabled': False,
'tunnels': [],
'peers': []
}
if self.root is None:
return wg_config
# Find WireGuard configuration
wg_elem = self.root.find('.//wireguard')
if wg_elem is None:
return wg_config
# Check if enabled
config_elem = wg_elem.find('config')
if config_elem is not None:
wg_config['enabled'] = self.get_text(config_elem.find('enable')) == 'on'
# Extract tunnels
tunnels_elem = wg_elem.find('tunnels')
if tunnels_elem is not None:
for item_elem in tunnels_elem.findall('item'):
tunnel = {
'name': self.get_text(item_elem.find('name')),
'enabled': self.get_text(item_elem.find('enabled')) == 'yes',
'description': self.get_text(item_elem.find('descr')),
'listenport': self.get_text(item_elem.find('listenport')),
'publickey': self.get_text(item_elem.find('publickey')),
'mtu': self.get_text(item_elem.find('mtu'))
}
wg_config['tunnels'].append(tunnel)
# Extract peers
peers_elem = wg_elem.find('peers')
if peers_elem is not None:
for item_elem in peers_elem.findall('item'):
peer = {
'enabled': self.get_text(item_elem.find('enabled')) == 'yes',
'tunnel': self.get_text(item_elem.find('tun')),
'description': self.get_text(item_elem.find('descr')),
'publickey': self.get_text(item_elem.find('publickey')),
'persistentkeepalive': self.get_text(item_elem.find('persistentkeepalive')),
'allowed_ips': []
}
# Extract allowed IPs
allowedips_elem = item_elem.find('allowedips')
if allowedips_elem is not None:
for row_elem in allowedips_elem.findall('row'):
ip_info = {
'address': self.get_text(row_elem.find('address')),
'mask': self.get_text(row_elem.find('mask')),
'description': self.get_text(row_elem.find('descr'))
}
peer['allowed_ips'].append(ip_info)
wg_config['peers'].append(peer)
return wg_config
def get_openvpn_config(self) -> Dict[str, Any]:
"""Extract OpenVPN configuration"""
ovpn_config = {
'servers': [],
'clients': []
}
if self.root is None:
return ovpn_config
# OpenVPN servers
ovpnserver_elem = self.root.find('ovpnserver')
if ovpnserver_elem is not None:
# This is a complex configuration, extract basic info
ovpn_config['servers'].append({
'configured': True,
'description': 'OpenVPN Server configured'
})
# OpenVPN clients
ovpnclient_elem = self.root.find('ovpnclient')
if ovpnclient_elem is not None:
for item_elem in ovpnclient_elem.findall('item'):
client = {
'enabled': True,
'description': self.get_text(item_elem.find('descr')),
'server_addr': self.get_text(item_elem.find('server_addr')),
'interface': self.get_text(item_elem.find('interface'))
}
ovpn_config['clients'].append(client)
return ovpn_config
def get_firewall_rules(self) -> List[Dict]:
"""Extract firewall rules"""
rules = []
if self.root is None:
return rules
filter_elem = self.root.find('filter')
if filter_elem is None:
return rules
for rule_elem in filter_elem.findall('rule'):
rule = {
'id': self.get_text(rule_elem.find('id')),
'tracker': self.get_text(rule_elem.find('tracker')),
'type': self.get_text(rule_elem.find('type')),
'interface': self.get_text(rule_elem.find('interface')),
'ipprotocol': self.get_text(rule_elem.find('ipprotocol')),
'protocol': self.get_text(rule_elem.find('protocol')),
'description': self.get_text(rule_elem.find('descr')),
'enabled': True,
'source': {},
'destination': {},
'log': rule_elem.find('log') is not None
}
# Source
source_elem = rule_elem.find('source')
if source_elem is not None:
rule['source'] = {
'address': self.get_text(source_elem.find('address')),
'port': self.get_text(source_elem.find('port')),
'any': source_elem.find('any') is not None
}
# Destination
dest_elem = rule_elem.find('destination')
if dest_elem is not None:
rule['destination'] = {
'address': self.get_text(dest_elem.find('address')),
'port': self.get_text(dest_elem.find('port')),
'any': dest_elem.find('any') is not None
}
rules.append(rule)
return rules
def get_nat_rules(self) -> List[Dict]:
"""Extract NAT rules"""
nat_rules = []
if self.root is None:
return nat_rules
nat_elem = self.root.find('nat')
if nat_elem is None:
return nat_rules
for rule_elem in nat_elem.findall('rule'):
rule = {
'description': self.get_text(rule_elem.find('descr')),
'interface': self.get_text(rule_elem.find('interface')),
'protocol': self.get_text(rule_elem.find('protocol')),
'source': {},
'destination': {},
'target': self.get_text(rule_elem.find('target')),
'local_port': self.get_text(rule_elem.find('local-port')),
'enabled': True
}
# Source
source_elem = rule_elem.find('source')
if source_elem is not None:
rule['source'] = {
'address': self.get_text(source_elem.find('address')),
'port': self.get_text(source_elem.find('port'))
}
# Destination
dest_elem = rule_elem.find('destination')
if dest_elem is not None:
rule['destination'] = {
'address': self.get_text(dest_elem.find('address')),
'port': self.get_text(dest_elem.find('port'))
}
nat_rules.append(rule)
return nat_rules
def get_dns_config(self) -> Dict[str, Any]:
"""Extract DNS configuration"""
dns_config = {
'servers': [],
'domain': '',
'search_domains': []
}
if self.root is None:
return dns_config
# DNS servers
system_elem = self.root.find('system')
if system_elem is not None:
for dns_elem in system_elem.findall('dnsserver'):
dns_config['servers'].append(self.get_text(dns_elem))
# Domain
domain_elem = system_elem.find('domain')
if domain_elem is not None and domain_elem.text:
dns_config['domain'] = domain_elem.text
return dns_config
def get_system_info(self) -> Dict[str, Any]:
"""Extract system information"""
system_info = {}
if self.root is None:
return system_info
system_elem = self.root.find('system')
if system_elem is not None:
system_info = {
'hostname': self.get_text(system_elem.find('hostname')),
'domain': self.get_text(system_elem.find('domain')),
'timezone': self.get_text(system_elem.find('timezone')),
'language': self.get_text(system_elem.find('language')),
'version': self.get_text(self.root.find('version'))
}
return system_info
def parse_all(self) -> Dict[str, Any]:
"""Parse all configuration and return comprehensive data"""
if not self.load_xml():
return {}
logger.info(f"Parsing pfSense configuration: {self.hostname}")
config_data = {
'hostname': self.hostname,
'system': self.get_system_info(),
'interfaces': self.get_interfaces(),
'static_routes': self.get_static_routes(),
'gateways': self.get_gateways(),
'dhcp': self.get_dhcp_config(),
'wireguard': self.get_wireguard_config(),
'openvpn': self.get_openvpn_config(),
'firewall_rules': self.get_firewall_rules(),
'nat_rules': self.get_nat_rules(),
'dns': self.get_dns_config()
}
return config_data
def export_json(self, output_file: str):
"""Export parsed data to JSON file"""
data = self.parse_all()
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
logger.info(f"Exported pfSense config to {output_file}")
def main():
"""Command line interface"""
parser = argparse.ArgumentParser(description='Parse pfSense XML configuration files')
parser.add_argument('xml_file', help='pfSense XML configuration file')
parser.add_argument('-o', '--output', help='Output JSON file')
parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Parse the XML file
parser = PfSenseXMLParser(args.xml_file)
data = parser.parse_all()
if not data:
logger.error("Failed to parse XML file")
return 1
# Output
if args.output:
parser.export_json(args.output)
print(f"✅ Parsed pfSense config and saved to {args.output}")
else:
print(json.dumps(data, indent=2, ensure_ascii=False))
return 0
if __name__ == '__main__':
exit(main())

214
run_network_mapping.sh Executable file
View File

@@ -0,0 +1,214 @@
#!/bin/bash
# Complete Network Mapping Workflow
# This script runs the full network discovery and diagram generation process
set -e # Exit on any error
echo "=========================================="
echo "COMPREHENSIVE NETWORK MAPPING WORKFLOW"
echo "=========================================="
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Function to print colored output
print_status() {
echo -e "${GREEN}[INFO]${NC} $1"
}
print_warning() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
print_step() {
echo -e "${BLUE}[STEP]${NC} $1"
}
# Check if required files exist
check_requirements() {
print_step "Checking requirements..."
if [ ! -f "network_scanner.py" ]; then
print_error "network_scanner.py not found!"
exit 1
fi
if [ ! -f "pfsense_xml_parser.py" ]; then
print_error "pfsense_xml_parser.py not found!"
exit 1
fi
if [ ! -f "comprehensive_mapper.py" ]; then
print_error "comprehensive_mapper.py not found!"
exit 1
fi
if [ ! -f "svg_generator.py" ]; then
print_error "svg_generator.py not found!"
exit 1
fi
print_status "All required scripts found"
}
# Find pfSense XML files
find_pfsense_files() {
print_step "Looking for pfSense XML configuration files..."
PFSENSE_FILES=$(ls config-*.xml 2>/dev/null || true)
if [ -z "$PFSENSE_FILES" ]; then
print_warning "No pfSense XML files found in current directory"
print_warning "Please place your pfSense backup XML files here"
echo "Expected format: config-hostname-timestamp.xml"
return 1
fi
print_status "Found pfSense XML files:"
echo "$PFSENSE_FILES" | while read -r file; do
echo " - $file"
done
# Export for use in other functions
export PFSENSE_FILES
return 0
}
# Run network scan (optional)
run_network_scan() {
print_step "Running network scan..."
if [ -f "config.json" ]; then
print_status "Using existing config.json for network scan"
python3 network_scanner.py -c config.json -o network_scan.json
else
print_warning "No config.json found - skipping network scan"
print_warning "Create config.json to enable live network scanning"
return 1
fi
}
# Run comprehensive mapping
run_comprehensive_mapping() {
print_step "Running comprehensive network mapping..."
# Build command with pfSense files
CMD="./comprehensive_mapper.py -o comprehensive_network.json --svg comprehensive_network.svg -v"
if [ -n "$PFSENSE_FILES" ]; then
CMD="$CMD -p $PFSENSE_FILES"
fi
if [ -f "network_scan.json" ]; then
CMD="$CMD -s network_scan.json"
fi
print_status "Executing: $CMD"
eval $CMD
}
# Generate summary report
generate_report() {
print_step "Generating summary report..."
if [ -f "comprehensive_network.json" ]; then
echo "# Network Mapping Report" > network_report.md
echo "Generated on: $(date)" >> network_report.md
echo "" >> network_report.md
# Extract key statistics
SEGMENTS=$(jq '.segments | length' comprehensive_network.json)
PFSENSE_COUNT=$(jq '.pfsense_firewalls | length' comprehensive_network.json)
WG_NETWORKS=$(jq '.wireguard_networks | length' comprehensive_network.json)
STATIC_ROUTES=$(jq '.routing_table | length' comprehensive_network.json)
DHCP_MAPPINGS=$(jq '.static_mappings | length' comprehensive_network.json)
echo "## Network Statistics" >> network_report.md
echo "- Network Segments: $SEGMENTS" >> network_report.md
echo "- pfSense Firewalls: $PFSENSE_COUNT" >> network_report.md
echo "- WireGuard Networks: $WG_NETWORKS" >> network_report.md
echo "- Static Routes: $STATIC_ROUTES" >> network_report.md
echo "- DHCP Static Mappings: $DHCP_MAPPINGS" >> network_report.md
echo "" >> network_report.md
echo "## Generated Files" >> network_report.md
echo "- comprehensive_network.json - Complete network data" >> network_report.md
echo "- comprehensive_network.svg - Network topology diagram" >> network_report.md
echo "- network_report.md - This summary report" >> network_report.md
print_status "Report generated: network_report.md"
else
print_error "No comprehensive network data found"
fi
}
# Main workflow
main() {
echo "Starting comprehensive network mapping workflow..."
echo ""
check_requirements
if ! find_pfsense_files; then
print_error "Cannot proceed without pfSense XML files"
exit 1
fi
# Optional network scan
if [ -f "config.json" ]; then
run_network_scan
fi
# Comprehensive mapping (required)
run_comprehensive_mapping
# Generate report
generate_report
echo ""
print_status "Workflow completed successfully!"
echo ""
echo "Generated files:"
echo " 📊 comprehensive_network.json - Complete network data"
echo " 🖼️ comprehensive_network.svg - Network topology diagram"
echo " 📋 network_report.md - Summary report"
echo ""
echo "Open comprehensive_network.svg in your browser to view the network diagram"
}
# Handle command line arguments
case "${1:-}" in
"scan-only")
check_requirements
run_network_scan
;;
"map-only")
check_requirements
find_pfsense_files
run_comprehensive_mapping
;;
"report-only")
generate_report
;;
"help"|"-h"|"--help")
echo "Usage: $0 [command]"
echo ""
echo "Commands:"
echo " (no command) - Run full workflow"
echo " scan-only - Run only network scan"
echo " map-only - Run only comprehensive mapping"
echo " report-only - Generate only summary report"
echo " help - Show this help"
;;
*)
main
;;
esac