Compare commits

..

5 Commits

Author SHA1 Message Date
mindesbunister
0827b9a69d Remove SVG diagram functionality
- Delete svg_generator.py and comprehensive_mapper.py
- Remove --generate-svg option from integrated_scanner.py
- Update complete_workflow.sh to remove SVG generation step
- Clean up documentation and examples
- Update test_system.py to remove SVG references
- Add missing files to repository (EXAMPLES.sh, quickstart.sh, etc.)
2025-10-10 17:08:31 +02:00
mindesbunister
da5f1f2d0c Reorganize project structure: move code to src/, docs to docs/, config to config/, scripts to scripts/, results to results/, tests to tests/. Keep only main script and latest scan results in root. 2025-10-10 15:39:59 +02:00
mindesbunister
b8e06617e8 Add pfSense XML integration and complete workflow automation
- Add pfsense_integrator.py for automatic XML parsing and integration
- Add complete_workflow.sh for one-command network discovery
- Enhance integrated_scanner.py to auto-integrate pfSense XML files
- Update README with pfSense XML features and workflow
- Generate comprehensive network summaries from XML configs
- Support for WireGuard, OpenVPN, IPsec, routing, DHCP, firewall rules
2025-10-10 11:23:09 +02:00
mindesbunister
afe8903454 Add comprehensive network mapper and workflow script
- comprehensive_mapper.py: Combines network scanning with pfSense XML parsing
- run_network_mapping.sh: Complete workflow script for network discovery
- Successfully tested with both pfSense XML files and live network scan
- Generates comprehensive JSON data and SVG network diagrams
- Includes WireGuard VPN topology, static routes, and DHCP mappings
2025-10-10 11:14:37 +02:00
mindesbunister
7621e1829d Add pfSense XML configuration parser
- Parse pfSense backup XML files to extract network configuration
- Extract interfaces, static routes, gateways, DHCP, WireGuard, OpenVPN
- Extract firewall rules, NAT rules, DNS configuration
- Generate structured JSON output for network diagram generation
- Tested on both pfSense configurations (gw-nue01, gw-st01)
2025-10-10 11:12:33 +02:00
34 changed files with 4681 additions and 719 deletions

6
.gitignore vendored
View File

@@ -1,9 +1,11 @@
# Network Scanner - Git Ignore # Network Scanner - Git Ignore
# Scan results and output # Scan results and output
*.json # Ignore archived results in results/ folder
results/
# Keep config.json.example
!config.json.example !config.json.example
*.svg
# Python # Python
__pycache__/ __pycache__/

13
EXAMPLES.sh Executable file → Normal file
View File

@@ -16,12 +16,9 @@ cat << 'EOF'
# SCENARIO 2: Complete Network Documentation # SCENARIO 2: Complete Network Documentation
# ------------------------------------------- # -------------------------------------------
# Full scan with pfSense integration and SVG generation # Full scan with pfSense integration
./integrated_scanner.py -c config.json -o full_network.json --generate-svg -v ./integrated_scanner.py -c config.json -o full_network.json -v
# View the diagram:
firefox full_network.svg
# SCENARIO 3: pfSense Deep Dive # SCENARIO 3: pfSense Deep Dive
@@ -64,7 +61,7 @@ cat > my_network_config.json << 'CONFIG'
} }
CONFIG CONFIG
./integrated_scanner.py -c my_network_config.json -o multi_network.json --generate-svg ./integrated_scanner.py -c my_network_config.json -o multi_network.json
# SCENARIO 5: Scheduled Network Monitoring # SCENARIO 5: Scheduled Network Monitoring
@@ -80,12 +77,10 @@ mkdir -p "$OUTPUT_DIR"
cd /path/to/network_scanner cd /path/to/network_scanner
./integrated_scanner.py \ ./integrated_scanner.py \
-o "$OUTPUT_DIR/scan_$DATE.json" \ -o "$OUTPUT_DIR/scan_$DATE.json"
--generate-svg
# Keep only last 30 days # Keep only last 30 days
find "$OUTPUT_DIR" -name "scan_*.json" -mtime +30 -delete find "$OUTPUT_DIR" -name "scan_*.json" -mtime +30 -delete
find "$OUTPUT_DIR" -name "scan_*.svg" -mtime +30 -delete
SCRIPT SCRIPT
chmod +x /usr/local/bin/network-scan-daily.sh chmod +x /usr/local/bin/network-scan-daily.sh

153
complete_workflow.sh Executable file
View File

@@ -0,0 +1,153 @@
#!/bin/bash
# Complete Network Discovery Workflow
# Automatically scans network, integrates pfSense XML, and generates diagrams
set -e
echo "=========================================="
echo "Complete Network Discovery Workflow"
echo "=========================================="
echo ""
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
log_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
log_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
log_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Move old results to results folder
log_info "Moving old results to results folder..."
mkdir -p results
mv network_scan_*.json server_details_*.json network_summary_*.md *_failed_ssh.json results/ 2>/dev/null || true
if [ $? -eq 0 ] && [ "$(ls results/ 2>/dev/null | wc -l)" -gt 0 ]; then
log_info "Moved old result files to results/ folder"
fi
# Check if we're in the right directory
if [ ! -f "src/integrated_scanner.py" ]; then
log_error "src/integrated_scanner.py not found. Please run this script from the network scanner directory."
exit 1
fi
# Check for pfSense XML files
XML_FILES=$(ls *.xml 2>/dev/null | wc -l)
if [ "$XML_FILES" -gt 0 ]; then
log_info "Found $XML_FILES pfSense XML configuration file(s)"
else
log_warning "No pfSense XML files found. Network scan will proceed without pfSense integration."
fi
# Step 1: Run system verification
log_info "Step 1: Verifying system requirements..."
if python3 src/test_system.py >/dev/null 2>&1; then
log_success "System verification passed"
else
log_error "System verification failed. Please check the output above."
exit 1
fi
# Step 2: Run integrated network scan
log_info "Step 2: Running integrated network scan..."
SCAN_OUTPUT="network_scan_$(date +%Y%m%d_%H%M%S).json"
if python3 src/integrated_scanner.py -o "$SCAN_OUTPUT" -v; then
log_success "Network scan completed: $SCAN_OUTPUT"
# Check for failed SSH hosts file
FAILED_SSH_OUTPUT="${SCAN_OUTPUT%.json}_failed_ssh.json"
if [ -f "$FAILED_SSH_OUTPUT" ]; then
FAILED_COUNT=$(jq '.total_failed' "$FAILED_SSH_OUTPUT" 2>/dev/null || echo "unknown")
log_warning "Found $FAILED_COUNT hosts with SSH port open but failed authentication: $FAILED_SSH_OUTPUT"
fi
else
log_error "Network scan failed"
exit 1
fi
# Step 3: Collect server information from hypervisors
log_info "Step 3: Collecting server information from hypervisors..."
SERVER_OUTPUT="server_details_$(date +%Y%m%d_%H%M%S).json"
if python3 src/server_info_collector.py -o "$SERVER_OUTPUT"; then
log_success "Server information collected: $SERVER_OUTPUT"
else
log_warning "Server information collection failed"
fi
# Step 5: Generate pfSense summary if XML files exist
if [ "$XML_FILES" -gt 0 ]; then
log_info "Step 5: Generating pfSense network summary..."
SUMMARY_OUTPUT="network_summary_$(date +%Y%m%d_%H%M%S).md"
if python3 src/pfsense_integrator.py *.xml --summary "$SUMMARY_OUTPUT"; then
log_success "Network summary generated: $SUMMARY_OUTPUT"
else
log_warning "Network summary generation failed"
fi
fi
# Step 6: Show results summary
echo ""
echo "=========================================="
log_success "Network Discovery Complete!"
echo "=========================================="
echo ""
echo "Generated files:"
echo " 📊 Network Scan: $SCAN_OUTPUT"
if [ -f "$SERVER_OUTPUT" ]; then
echo " 🖥️ Server Details: $SERVER_OUTPUT"
fi
if [ -f "$FAILED_SSH_OUTPUT" ]; then
echo " 🔐 Failed SSH Hosts: $FAILED_SSH_OUTPUT"
fi
if [ "$XML_FILES" -gt 0 ]; then
echo " 📋 Network Summary: $SUMMARY_OUTPUT"
fi
echo ""
# Show network statistics
if command -v jq >/dev/null 2>&1; then
echo "Network Statistics:"
TOTAL_SEGMENTS=$(jq '.segments | length' "$SCAN_OUTPUT")
TOTAL_DEVICES=$(jq '[.segments[].devices[]] | length' "$SCAN_OUTPUT")
PFSENSE_DEVICES=$(jq '[.segments[].devices[] | select(.device_type=="firewall")] | length' "$SCAN_OUTPUT")
echo " 📡 Network Segments: $TOTAL_SEGMENTS"
echo " 🖥️ Total Devices: $TOTAL_DEVICES"
echo " 🛡️ pfSense Firewalls: $PFSENSE_DEVICES"
echo ""
fi
echo "Next steps:"
if [ -f "$SERVER_OUTPUT" ]; then
echo " 1. Review $SERVER_OUTPUT for detailed server and VM information"
STEP_NUM=2
else
STEP_NUM=1
fi
if [ -f "$FAILED_SSH_OUTPUT" ]; then
echo " $STEP_NUM. Review $FAILED_SSH_OUTPUT for hosts needing SSH credential fixes"
STEP_NUM=$((STEP_NUM + 1))
fi
if [ "$XML_FILES" -gt 0 ]; then
echo " $STEP_NUM. Review $SUMMARY_OUTPUT for detailed pfSense configuration"
STEP_NUM=$((STEP_NUM + 1))
fi
echo " $STEP_NUM. Examine $SCAN_OUTPUT for complete network data (use jq for querying)"
echo ""
log_success "Workflow completed successfully! 🎉"

View File

@@ -17,5 +17,18 @@
"max_workers": 10, "max_workers": 10,
"ping_timeout": 2, "ping_timeout": 2,
"port_scan_timeout": 1 "port_scan_timeout": 1
} },
"hypervisors": [
{
"host": "srvhost04.egonetix.de",
"port": 2222,
"user": "root"
},
{
"host": "srv-wmw-host01",
"port": 22,
"user": "root"
}
],
"probe_ssh_on_discovered": true
} }

View File

@@ -12,6 +12,8 @@ A comprehensive network topology discovery tool that scans local and VPN-connect
- 📊 **SVG Diagram Generation**: Creates visual network topology diagrams - 📊 **SVG Diagram Generation**: Creates visual network topology diagrams
- 🔄 **Routing Analysis**: Extracts and analyzes routing tables from routers - 🔄 **Routing Analysis**: Extracts and analyzes routing tables from routers
- 📝 **JSON Export**: Structured data output for further processing - 📝 **JSON Export**: Structured data output for further processing
- 📄 **pfSense XML Parsing**: Automatically parses pfSense backup XML files for complete configuration analysis
- 🔗 **Automatic Integration**: Seamlessly integrates pfSense XML data into network scans
## Requirements ## Requirements
@@ -205,6 +207,23 @@ The scanner produces JSON with the following structure:
firefox network_topology.svg firefox network_topology.svg
``` ```
### Complete Automated Workflow
For the ultimate network discovery experience, use the automated workflow script:
```bash
./complete_workflow.sh
```
This script will:
1. ✅ Verify system requirements
2. 🔍 Run integrated network scan (including pfSense XML if present)
3. 🎨 Generate SVG network diagram
4. 📋 Create network summary (if pfSense XML files exist)
5. 📊 Display statistics and next steps
**One-command network discovery!**
## SSH Access Setup ## SSH Access Setup
For automated scanning, SSH key-based authentication is recommended: For automated scanning, SSH key-based authentication is recommended:
@@ -362,3 +381,27 @@ Created for comprehensive network topology discovery and visualization.
--- ---
**Note**: Always ensure you have proper authorization before scanning networks. This tool performs active network reconnaissance. **Note**: Always ensure you have proper authorization before scanning networks. This tool performs active network reconnaissance.
### pfSense XML Integration
The scanner can automatically parse pfSense backup XML files to extract comprehensive configuration data:
- **Network Interfaces**: All interface configurations, IP addresses, VLANs
- **Routing Tables**: Static routes, dynamic routing, gateway configurations
- **VPN Configurations**: WireGuard tunnels, OpenVPN servers/clients, IPsec
- **Firewall Rules**: NAT rules, port forwarding, access control lists
- **DHCP Services**: Server configurations, static mappings, lease pools
- **DNS Settings**: Resolvers, domain configurations
- **System Information**: Hostname, domain, version, services
**Automatic Integration**: Place pfSense XML backup files in the scanner directory, and they will be automatically parsed and integrated into network scans.
**Manual Parsing**: Use `pfsense_integrator.py` to work with XML files independently:
```bash
# Parse XML files and generate summary
./pfsense_integrator.py *.xml --summary network_summary.md
# Integrate with existing scan
./pfsense_integrator.py *.xml -s scan.json -o enhanced_scan.json
```

14
docs/network_report.md Normal file
View File

@@ -0,0 +1,14 @@
# Network Mapping Report
Generated on: Fr 10. Okt 11:14:30 CEST 2025
## Network Statistics
- Network Segments: 15
- pfSense Firewalls: 2
- WireGuard Networks: 3
- Static Routes: 3
- DHCP Static Mappings: 54
## Generated Files
- comprehensive_network.json - Complete network data
- comprehensive_network.svg - Network topology diagram
- network_report.md - This summary report

53
docs/network_summary.md Normal file
View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

View File

@@ -1,284 +0,0 @@
#!/usr/bin/env python3
"""
Complete Network Scanner with pfSense Integration
Combines network scanning with pfSense-specific features
"""
import json
import logging
import argparse
from datetime import datetime
from network_scanner import NetworkScanner, NetworkSegment
from pfsense_scanner import PfSenseScanner
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class IntegratedNetworkScanner:
"""Enhanced network scanner with pfSense integration"""
def __init__(self, config: dict):
self.config = config
self.base_scanner = NetworkScanner(config)
self.pfsense_devices = []
def scan_all(self):
"""Perform complete network scan including pfSense devices"""
logger.info("Starting integrated network scan...")
# Run base network scan
self.base_scanner.scan_all()
# Identify and enhance pfSense devices
self._scan_pfsense_devices()
logger.info("Integrated scan complete")
def _scan_pfsense_devices(self):
"""Find and deeply scan pfSense devices"""
logger.info("Looking for pfSense devices...")
# Check configured special devices
special_devices = self.config.get('special_devices', {})
for segment in self.base_scanner.segments:
for device in segment.devices:
is_pfsense = False
# Check if device is marked as pfSense in config
if device.ip in special_devices:
if special_devices[device.ip].get('type') == 'firewall' or \
special_devices[device.ip].get('os') == 'pfSense':
is_pfsense = True
# Check if hostname contains pfsense
if device.hostname and 'pfsense' in device.hostname.lower():
is_pfsense = True
# Check if device looks like a pfSense (has many routes and ports 80/443)
if device.routes and len(device.routes) > 3 and \
80 in device.open_ports and 443 in device.open_ports:
is_pfsense = True
if is_pfsense and device.ssh_accessible:
logger.info(f"Enhanced scanning pfSense device: {device.ip}")
self._enhance_pfsense_device(device)
def _enhance_pfsense_device(self, device):
"""Enhance device info with pfSense-specific data"""
try:
scanner = PfSenseScanner(
device.ip,
self.config.get('ssh_user', 'root'),
self.config.get('ssh_key_path')
)
pfsense_info = scanner.get_full_info()
# Merge pfSense-specific info into device
device.device_type = 'firewall'
device.os_type = 'pfSense (FreeBSD)'
# Store additional pfSense data
if not hasattr(device, 'pfsense_info'):
device.__dict__['pfsense_info'] = pfsense_info
# Add DHCP leases to known devices
dhcp_leases = pfsense_info.get('dhcp_leases', [])
if dhcp_leases:
logger.info(f"Found {len(dhcp_leases)} DHCP leases on {device.ip}")
# Add VPN info
vpn_info = pfsense_info.get('vpn', {})
wireguard = vpn_info.get('wireguard', [])
if wireguard:
logger.info(f"Found {len(wireguard)} WireGuard tunnels on {device.ip}")
self.pfsense_devices.append(device)
except Exception as e:
logger.error(f"Error enhancing pfSense device {device.ip}: {e}")
def export_json(self, filename: str):
"""Export enhanced results to JSON"""
data = {
'scan_timestamp': datetime.now().isoformat(),
'segments': []
}
for segment in self.base_scanner.segments:
segment_data = {
'name': segment.name,
'cidr': segment.cidr,
'gateway': segment.gateway,
'is_vpn': segment.is_vpn,
'devices': []
}
for device in segment.devices:
device_data = {
'ip': device.ip,
'hostname': device.hostname,
'mac': device.mac,
'manufacturer': device.manufacturer,
'os_type': device.os_type,
'os_version': device.os_version,
'device_type': device.device_type,
'open_ports': device.open_ports,
'ssh_accessible': device.ssh_accessible,
'services': device.services,
'routes': device.routes,
'interfaces': device.interfaces
}
# Add pfSense-specific info if available
if hasattr(device, 'pfsense_info'):
device_data['pfsense_info'] = device.__dict__['pfsense_info']
segment_data['devices'].append(device_data)
data['segments'].append(segment_data)
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Exported results to {filename}")
def print_summary(self):
"""Print enhanced summary"""
print("\n" + "="*80)
print("INTEGRATED NETWORK SCAN SUMMARY")
print("="*80)
total_devices = sum(len(seg.devices) for seg in self.base_scanner.segments)
print(f"\nTotal Segments: {len(self.base_scanner.segments)}")
print(f"Total Devices: {total_devices}")
print(f"pfSense Devices: {len(self.pfsense_devices)}")
for segment in self.base_scanner.segments:
print(f"\n{'='*80}")
print(f"📡 Network: {segment.name} ({segment.cidr})")
if segment.is_vpn:
print(" 🔐 VPN Network")
print(f" Devices: {len(segment.devices)}")
for device in segment.devices:
icon = self._get_device_icon(device.device_type)
print(f"\n {icon} {device.ip}")
if device.hostname:
print(f" Hostname: {device.hostname}")
if device.mac:
print(f" MAC: {device.mac}")
if device.device_type:
print(f" Type: {device.device_type}")
if device.os_type:
print(f" OS: {device.os_type} {device.os_version or ''}")
if device.open_ports:
print(f" Ports: {', '.join(map(str, device.open_ports))}")
if device.ssh_accessible:
print(f" ✓ SSH Accessible")
# Show pfSense-specific info
if hasattr(device, 'pfsense_info'):
pfsense = device.__dict__['pfsense_info']
print(f" 🛡️ pfSense Firewall:")
print(f" Interfaces: {len(pfsense.get('interfaces', []))}")
print(f" Routes: {len(pfsense.get('routes', []))}")
print(f" DHCP Leases: {len(pfsense.get('dhcp_leases', []))}")
vpn = pfsense.get('vpn', {})
wg_count = len(vpn.get('wireguard', []))
ovpn_count = len(vpn.get('openvpn', []))
if wg_count:
print(f" WireGuard Tunnels: {wg_count}")
if ovpn_count:
print(f" OpenVPN Connections: {ovpn_count}")
if device.routes and len(device.routes) > 0:
print(f" 📋 Routing Table ({len(device.routes)} routes):")
for route in device.routes[:3]: # Show first 3
dest = route.get('destination', 'unknown')
gw = route.get('gateway', 'direct')
print(f"{dest} via {gw}")
if len(device.routes) > 3:
print(f" ... and {len(device.routes) - 3} more")
print("\n" + "="*80)
def _get_device_icon(self, device_type):
"""Get emoji icon for device type"""
icons = {
'router': '🔀',
'firewall': '🛡️',
'switch': '🔌',
'server': '🖥️',
'linux_server': '🐧',
'windows_client': '💻',
'client': '💻'
}
return icons.get(device_type, '')
def main():
parser = argparse.ArgumentParser(
description='Integrated Network Scanner with pfSense Support'
)
parser.add_argument('-c', '--config', default='config.json',
help='Configuration file (default: config.json)')
parser.add_argument('-o', '--output', default='network_scan.json',
help='Output JSON file (default: network_scan.json)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose output')
parser.add_argument('--generate-svg', action='store_true',
help='Automatically generate SVG diagram after scan')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Load configuration
try:
with open(args.config, 'r') as f:
config = json.load(f)
logger.info(f"Loaded configuration from {args.config}")
except FileNotFoundError:
logger.warning(f"Config file {args.config} not found, using defaults")
config = {}
# Run integrated scanner
scanner = IntegratedNetworkScanner(config)
scanner.scan_all()
# Print summary
scanner.print_summary()
# Export results
scanner.export_json(args.output)
print(f"\n✓ Scan complete! Results saved to {args.output}")
# Generate SVG if requested
if args.generate_svg:
try:
from svg_generator import NetworkDiagramGenerator
with open(args.output, 'r') as f:
scan_data = json.load(f)
svg_file = args.output.replace('.json', '.svg')
generator = NetworkDiagramGenerator(scan_data)
generator.generate_svg(svg_file)
print(f"✓ SVG diagram generated: {svg_file}")
except Exception as e:
logger.error(f"Error generating SVG: {e}")
if __name__ == '__main__':
main()

28
quickstart.sh Executable file → Normal file
View File

@@ -77,10 +77,9 @@ echo "What would you like to do?"
echo "" echo ""
echo "1) Run a quick scan (current network only)" echo "1) Run a quick scan (current network only)"
echo "2) Run a full scan with pfSense integration" echo "2) Run a full scan with pfSense integration"
echo "3) Scan and generate SVG diagram" echo "3) Scan specific pfSense device"
echo "4) Scan specific pfSense device" echo "4) Show help"
echo "5) Show help" echo "5) Exit"
echo "6) Exit"
echo "" echo ""
read -p "Choose an option (1-6): " choice read -p "Choose an option (1-6): " choice
@@ -91,7 +90,6 @@ case $choice in
./network_scanner.py -o quick_scan.json -v ./network_scanner.py -o quick_scan.json -v
echo "" echo ""
echo "✓ Done! Results saved to: quick_scan.json" echo "✓ Done! Results saved to: quick_scan.json"
echo " Generate diagram with: ./svg_generator.py quick_scan.json"
;; ;;
2) 2)
echo "" echo ""
@@ -101,13 +99,6 @@ case $choice in
echo "✓ Done! Results saved to: full_scan.json" echo "✓ Done! Results saved to: full_scan.json"
;; ;;
3) 3)
echo ""
echo "🔍 Running scan and generating diagram..."
./integrated_scanner.py -o scan_with_diagram.json -v --generate-svg
echo ""
echo "✓ Done! Open scan_with_diagram.svg to view the network diagram"
;;
4)
echo "" echo ""
read -p "Enter pfSense IP address: " pfsense_ip read -p "Enter pfSense IP address: " pfsense_ip
echo "🔍 Scanning pfSense at $pfsense_ip..." echo "🔍 Scanning pfSense at $pfsense_ip..."
@@ -115,7 +106,7 @@ case $choice in
echo "" echo ""
echo "✓ Done! Results saved to: pfsense_${pfsense_ip}.json" echo "✓ Done! Results saved to: pfsense_${pfsense_ip}.json"
;; ;;
5) 4)
echo "" echo ""
cat << 'HELP' cat << 'HELP'
Network Scanner - Help Network Scanner - Help
@@ -134,11 +125,7 @@ Available Scripts:
3. integrated_scanner.py 3. integrated_scanner.py
Complete scanner with pfSense integration Complete scanner with pfSense integration
Usage: ./integrated_scanner.py [-c config.json] [-o output.json] [-v] [--generate-svg] Usage: ./integrated_scanner.py [-c config.json] [-o output.json] [-v]
4. svg_generator.py
Generate SVG diagram from scan results
Usage: ./svg_generator.py <input.json> [-o output.svg]
Configuration: Configuration:
------------- -------------
@@ -159,13 +146,10 @@ Examples:
# Scan pfSense # Scan pfSense
./pfsense_scanner.py 192.168.1.1 -u root -k ~/.ssh/id_rsa ./pfsense_scanner.py 192.168.1.1 -u root -k ~/.ssh/id_rsa
# Generate diagram from existing scan
./svg_generator.py network_scan.json -o my_network.svg
For more information, see README.md For more information, see README.md
HELP HELP
;; ;;
6) 5)
echo "Goodbye!" echo "Goodbye!"
exit 0 exit 0
;; ;;

View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

265
scripts/EXAMPLES.sh Executable file
View File

@@ -0,0 +1,265 @@
#!/bin/bash
# Example usage scenarios for the network scanner
echo "=========================================="
echo "Network Scanner - Usage Examples"
echo "=========================================="
echo ""
cat << 'EOF'
# SCENARIO 1: Quick Network Overview
# -----------------------------------
# Scan your local network and get a basic overview
./network_scanner.py -v -o quick_scan.json
# SCENARIO 2: Complete Network Documentation
# -------------------------------------------
# Full scan with pfSense integration and SVG generation
./integrated_scanner.py -c config.json -o full_network.json --generate-svg -v
# View the diagram:
firefox full_network.svg
# SCENARIO 3: pfSense Deep Dive
# ------------------------------
# Detailed scan of a specific pfSense firewall
./pfsense_scanner.py 192.168.1.1 -u root -k ~/.ssh/id_rsa -o pfsense_main.json
# View the results:
cat pfsense_main.json | jq '.vpn' # Show VPN info
cat pfsense_main.json | jq '.routes' # Show routing table
# SCENARIO 4: Multi-Network Scan with VPN
# ----------------------------------------
# Create a config for multiple networks
cat > my_network_config.json << 'CONFIG'
{
"ssh_user": "root",
"ssh_key_path": "/home/user/.ssh/id_rsa",
"timeout": 3,
"additional_networks": [
"192.168.1.0/24", # Main network
"192.168.2.0/24", # Guest network
"10.8.0.0/24", # OpenVPN network
"10.0.0.0/24" # WireGuard VPN
],
"special_devices": {
"192.168.1.1": {
"name": "Main pfSense Firewall",
"type": "firewall",
"os": "pfSense"
},
"192.168.2.1": {
"name": "Guest Network Router",
"type": "router"
}
}
}
CONFIG
./integrated_scanner.py -c my_network_config.json -o multi_network.json --generate-svg
# SCENARIO 5: Scheduled Network Monitoring
# -----------------------------------------
# Add to crontab for daily network documentation
# Create wrapper script
cat > /usr/local/bin/network-scan-daily.sh << 'SCRIPT'
#!/bin/bash
DATE=$(date +%Y%m%d)
OUTPUT_DIR="/var/log/network-scans"
mkdir -p "$OUTPUT_DIR"
cd /path/to/network_scanner
./integrated_scanner.py \
-o "$OUTPUT_DIR/scan_$DATE.json" \
--generate-svg
# Keep only last 30 days
find "$OUTPUT_DIR" -name "scan_*.json" -mtime +30 -delete
find "$OUTPUT_DIR" -name "scan_*.svg" -mtime +30 -delete
SCRIPT
chmod +x /usr/local/bin/network-scan-daily.sh
# Add to crontab (run at 2 AM daily):
# 0 2 * * * /usr/local/bin/network-scan-daily.sh
# SCENARIO 6: Compare Network Changes
# ------------------------------------
# Scan and compare with previous results
# Initial scan
./integrated_scanner.py -o baseline.json
# After changes
./integrated_scanner.py -o current.json
# Compare device counts
echo "Baseline devices:"
cat baseline.json | jq '[.segments[].devices[].ip] | length'
echo "Current devices:"
cat current.json | jq '[.segments[].devices[].ip] | length'
# Find new devices
comm -13 \
<(cat baseline.json | jq -r '.segments[].devices[].ip' | sort) \
<(cat current.json | jq -r '.segments[].devices[].ip' | sort) \
| sed 's/^/NEW: /'
# Find removed devices
comm -23 \
<(cat baseline.json | jq -r '.segments[].devices[].ip' | sort) \
<(cat current.json | jq -r '.segments[].devices[].ip' | sort) \
| sed 's/^/REMOVED: /'
# SCENARIO 7: Extract Specific Information
# -----------------------------------------
# Use jq to extract specific data from scan results
# List all SSH-accessible devices
cat network_scan.json | jq -r '.segments[].devices[] | select(.ssh_accessible==true) | .ip'
# List all routers/firewalls
cat network_scan.json | jq -r '.segments[].devices[] | select(.device_type=="router" or .device_type=="firewall") | "\(.ip) - \(.hostname // "unknown")"'
# List all devices with their OS
cat network_scan.json | jq -r '.segments[].devices[] | "\(.ip)\t\(.os_type // "unknown")\t\(.hostname // "unknown")"'
# Export to CSV
echo "IP,Hostname,Type,OS" > devices.csv
cat network_scan.json | jq -r '.segments[].devices[] | "\(.ip),\(.hostname // ""),\(.device_type // ""),\(.os_type // "")"' >> devices.csv
# SCENARIO 8: Integration with Documentation
# -------------------------------------------
# Generate markdown documentation from scan
cat > generate_docs.py << 'PYTHON'
#!/usr/bin/env python3
import json
import sys
with open(sys.argv[1]) as f:
data = json.load(f)
print("# Network Documentation")
print(f"\nGenerated: {data.get('scan_timestamp', 'N/A')}")
print("\n## Network Segments\n")
for segment in data['segments']:
print(f"### {segment['name']}")
print(f"- CIDR: `{segment['cidr']}`")
print(f"- Devices: {len(segment['devices'])}")
if segment.get('is_vpn'):
print("- Type: VPN Network")
print("\n#### Devices\n")
print("| IP | Hostname | Type | OS |")
print("|---|---|---|---|")
for device in segment['devices']:
ip = device['ip']
hostname = device.get('hostname', '-')
dtype = device.get('device_type', '-')
os = device.get('os_type', '-')
print(f"| {ip} | {hostname} | {dtype} | {os} |")
print()
PYTHON
chmod +x generate_docs.py
./generate_docs.py network_scan.json > NETWORK_DOCS.md
# SCENARIO 9: Security Audit
# ---------------------------
# Check for common security issues
# Find devices with Telnet open
cat network_scan.json | jq -r '.segments[].devices[] | select(.open_ports[]? == 23) | "⚠️ Telnet open on \(.ip) (\(.hostname // "unknown"))"'
# Find devices without SSH access
cat network_scan.json | jq -r '.segments[].devices[] | select(.device_type=="router" or .device_type=="firewall") | select(.ssh_accessible==false) | "⚠️ No SSH access to \(.ip) (\(.hostname // "unknown"))"'
# List devices with many open ports
cat network_scan.json | jq -r '.segments[].devices[] | select((.open_ports | length) > 5) | " \(.ip) has \(.open_ports | length) open ports"'
# SCENARIO 10: WireGuard Topology Mapping
# ----------------------------------------
# Extract WireGuard tunnel information from pfSense
./pfsense_scanner.py 192.168.1.1 -o pfsense.json
# List all WireGuard peers
cat pfsense.json | jq -r '.vpn.wireguard[] | "Peer: \(.peer // "N/A") -> \(.allowed_ips // "N/A")"'
# Check tunnel status
cat pfsense.json | jq -r '.vpn.wireguard[] | select(.latest_handshake) | "Active tunnel to \(.endpoint) (handshake: \(.latest_handshake)s ago)"'
# SCENARIO 11: Network Capacity Planning
# ---------------------------------------
# Analyze network usage and plan capacity
# Count devices per segment
cat network_scan.json | jq -r '.segments[] | "\(.cidr): \(.devices | length) devices"'
# Calculate subnet utilization
cat network_scan.json | jq -r '.segments[] |
if .cidr | contains("/24") then
"\(.cidr): \(.devices | length)/254 = \((.devices | length) * 100 / 254 | floor)% utilized"
else
"\(.cidr): \(.devices | length) devices"
end'
# SCENARIO 12: Quick Health Check
# --------------------------------
# Create a health check script
cat > health_check.sh << 'HEALTH'
#!/bin/bash
SCAN_FILE="latest_scan.json"
echo "Network Health Check"
echo "===================="
echo ""
# Total devices
TOTAL=$(cat $SCAN_FILE | jq '[.segments[].devices[]] | length')
echo "Total devices: $TOTAL"
# SSH accessible
SSH_OK=$(cat $SCAN_FILE | jq '[.segments[].devices[] | select(.ssh_accessible==true)] | length')
echo "SSH accessible: $SSH_OK"
# By type
echo ""
echo "Device Types:"
cat $SCAN_FILE | jq -r '.segments[].devices[].device_type' | sort | uniq -c | sort -rn
# Segments
echo ""
echo "Network Segments:"
cat $SCAN_FILE | jq -r '.segments[] | " \(.name): \(.devices | length) devices"'
HEALTH
chmod +x health_check.sh
./integrated_scanner.py -o latest_scan.json
./health_check.sh
EOF
echo ""
echo "For more examples, see README.md"

181
scripts/quickstart.sh Executable file
View File

@@ -0,0 +1,181 @@
#!/bin/bash
# Quick Start Script for Network Scanner
# This script helps you get started quickly
set -e
echo "================================"
echo "Network Scanner - Quick Start"
echo "================================"
echo ""
# Check for Python
if ! command -v python3 &> /dev/null; then
echo "❌ Error: Python 3 is not installed"
exit 1
fi
echo "✓ Python 3 found"
# Create config if it doesn't exist
if [ ! -f config.json ]; then
echo ""
echo "📝 Creating configuration file..."
# Try to detect default SSH key
SSH_KEY=""
if [ -f ~/.ssh/id_rsa ]; then
SSH_KEY="$HOME/.ssh/id_rsa"
elif [ -f ~/.ssh/id_ed25519 ]; then
SSH_KEY="$HOME/.ssh/id_ed25519"
fi
# Get current user
CURRENT_USER=$(whoami)
# Try to detect local network
LOCAL_NET=$(ip route | grep -oP 'src \K[\d.]+' | head -1)
if [ -n "$LOCAL_NET" ]; then
# Convert to /24 network
NET_PREFIX=$(echo $LOCAL_NET | cut -d. -f1-3)
LOCAL_NET="${NET_PREFIX}.0/24"
else
LOCAL_NET="192.168.1.0/24"
fi
cat > config.json << EOF
{
"ssh_user": "$CURRENT_USER",
"ssh_key_path": "$SSH_KEY",
"timeout": 2,
"additional_networks": [
"$LOCAL_NET"
],
"special_devices": {
},
"scan_options": {
"max_workers": 10,
"ping_timeout": 2,
"port_scan_timeout": 1
}
}
EOF
echo "✓ Created config.json"
echo " Local network detected: $LOCAL_NET"
[ -n "$SSH_KEY" ] && echo " SSH key detected: $SSH_KEY"
echo ""
echo " Please edit config.json to customize for your network!"
echo ""
else
echo "✓ config.json already exists"
fi
# Ask what to do
echo ""
echo "What would you like to do?"
echo ""
echo "1) Run a quick scan (current network only)"
echo "2) Run a full scan with pfSense integration"
echo "3) Scan and generate SVG diagram"
echo "4) Scan specific pfSense device"
echo "5) Show help"
echo "6) Exit"
echo ""
read -p "Choose an option (1-6): " choice
case $choice in
1)
echo ""
echo "🔍 Running quick network scan..."
./network_scanner.py -o quick_scan.json -v
echo ""
echo "✓ Done! Results saved to: quick_scan.json"
echo " Generate diagram with: ./svg_generator.py quick_scan.json"
;;
2)
echo ""
echo "🔍 Running full integrated scan..."
./integrated_scanner.py -o full_scan.json -v
echo ""
echo "✓ Done! Results saved to: full_scan.json"
;;
3)
echo ""
echo "🔍 Running scan and generating diagram..."
./integrated_scanner.py -o scan_with_diagram.json -v --generate-svg
echo ""
echo "✓ Done! Open scan_with_diagram.svg to view the network diagram"
;;
4)
echo ""
read -p "Enter pfSense IP address: " pfsense_ip
echo "🔍 Scanning pfSense at $pfsense_ip..."
./pfsense_scanner.py "$pfsense_ip" -o "pfsense_${pfsense_ip}.json"
echo ""
echo "✓ Done! Results saved to: pfsense_${pfsense_ip}.json"
;;
5)
echo ""
cat << 'HELP'
Network Scanner - Help
======================
Available Scripts:
-----------------
1. network_scanner.py
Basic network scanner that discovers devices and gathers info
Usage: ./network_scanner.py [-c config.json] [-o output.json] [-v]
2. pfsense_scanner.py
Specialized scanner for pfSense firewalls
Usage: ./pfsense_scanner.py <ip> [-u user] [-k keyfile] [-o output.json]
3. integrated_scanner.py
Complete scanner with pfSense integration
Usage: ./integrated_scanner.py [-c config.json] [-o output.json] [-v] [--generate-svg]
4. svg_generator.py
Generate SVG diagram from scan results
Usage: ./svg_generator.py <input.json> [-o output.svg]
Configuration:
-------------
Edit config.json to customize:
- SSH credentials
- Network ranges to scan
- Special device definitions
- Scan timeouts
Examples:
--------
# Quick scan of current network
./network_scanner.py -v
# Full scan with diagram
./integrated_scanner.py --generate-svg
# Scan pfSense
./pfsense_scanner.py 192.168.1.1 -u root -k ~/.ssh/id_rsa
# Generate diagram from existing scan
./svg_generator.py network_scan.json -o my_network.svg
For more information, see README.md
HELP
;;
6)
echo "Goodbye!"
exit 0
;;
*)
echo "Invalid option"
exit 1
;;
esac
echo ""
echo "================================"
echo "Thanks for using Network Scanner!"
echo "================================"

214
scripts/run_network_mapping.sh Executable file
View File

@@ -0,0 +1,214 @@
#!/bin/bash
# Complete Network Mapping Workflow
# This script runs the full network discovery and diagram generation process
set -e # Exit on any error
echo "=========================================="
echo "COMPREHENSIVE NETWORK MAPPING WORKFLOW"
echo "=========================================="
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Function to print colored output
print_status() {
echo -e "${GREEN}[INFO]${NC} $1"
}
print_warning() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
print_step() {
echo -e "${BLUE}[STEP]${NC} $1"
}
# Check if required files exist
check_requirements() {
print_step "Checking requirements..."
if [ ! -f "network_scanner.py" ]; then
print_error "network_scanner.py not found!"
exit 1
fi
if [ ! -f "pfsense_xml_parser.py" ]; then
print_error "pfsense_xml_parser.py not found!"
exit 1
fi
if [ ! -f "comprehensive_mapper.py" ]; then
print_error "comprehensive_mapper.py not found!"
exit 1
fi
if [ ! -f "svg_generator.py" ]; then
print_error "svg_generator.py not found!"
exit 1
fi
print_status "All required scripts found"
}
# Find pfSense XML files
find_pfsense_files() {
print_step "Looking for pfSense XML configuration files..."
PFSENSE_FILES=$(ls config-*.xml 2>/dev/null || true)
if [ -z "$PFSENSE_FILES" ]; then
print_warning "No pfSense XML files found in current directory"
print_warning "Please place your pfSense backup XML files here"
echo "Expected format: config-hostname-timestamp.xml"
return 1
fi
print_status "Found pfSense XML files:"
echo "$PFSENSE_FILES" | while read -r file; do
echo " - $file"
done
# Export for use in other functions
export PFSENSE_FILES
return 0
}
# Run network scan (optional)
run_network_scan() {
print_step "Running network scan..."
if [ -f "config.json" ]; then
print_status "Using existing config.json for network scan"
python3 network_scanner.py -c config.json -o network_scan.json
else
print_warning "No config.json found - skipping network scan"
print_warning "Create config.json to enable live network scanning"
return 1
fi
}
# Run comprehensive mapping
run_comprehensive_mapping() {
print_step "Running comprehensive network mapping..."
# Build command with pfSense files
CMD="./comprehensive_mapper.py -o comprehensive_network.json --svg comprehensive_network.svg -v"
if [ -n "$PFSENSE_FILES" ]; then
CMD="$CMD -p $PFSENSE_FILES"
fi
if [ -f "network_scan.json" ]; then
CMD="$CMD -s network_scan.json"
fi
print_status "Executing: $CMD"
eval $CMD
}
# Generate summary report
generate_report() {
print_step "Generating summary report..."
if [ -f "comprehensive_network.json" ]; then
echo "# Network Mapping Report" > network_report.md
echo "Generated on: $(date)" >> network_report.md
echo "" >> network_report.md
# Extract key statistics
SEGMENTS=$(jq '.segments | length' comprehensive_network.json)
PFSENSE_COUNT=$(jq '.pfsense_firewalls | length' comprehensive_network.json)
WG_NETWORKS=$(jq '.wireguard_networks | length' comprehensive_network.json)
STATIC_ROUTES=$(jq '.routing_table | length' comprehensive_network.json)
DHCP_MAPPINGS=$(jq '.static_mappings | length' comprehensive_network.json)
echo "## Network Statistics" >> network_report.md
echo "- Network Segments: $SEGMENTS" >> network_report.md
echo "- pfSense Firewalls: $PFSENSE_COUNT" >> network_report.md
echo "- WireGuard Networks: $WG_NETWORKS" >> network_report.md
echo "- Static Routes: $STATIC_ROUTES" >> network_report.md
echo "- DHCP Static Mappings: $DHCP_MAPPINGS" >> network_report.md
echo "" >> network_report.md
echo "## Generated Files" >> network_report.md
echo "- comprehensive_network.json - Complete network data" >> network_report.md
echo "- comprehensive_network.svg - Network topology diagram" >> network_report.md
echo "- network_report.md - This summary report" >> network_report.md
print_status "Report generated: network_report.md"
else
print_error "No comprehensive network data found"
fi
}
# Main workflow
main() {
echo "Starting comprehensive network mapping workflow..."
echo ""
check_requirements
if ! find_pfsense_files; then
print_error "Cannot proceed without pfSense XML files"
exit 1
fi
# Optional network scan
if [ -f "config.json" ]; then
run_network_scan
fi
# Comprehensive mapping (required)
run_comprehensive_mapping
# Generate report
generate_report
echo ""
print_status "Workflow completed successfully!"
echo ""
echo "Generated files:"
echo " 📊 comprehensive_network.json - Complete network data"
echo " 🖼️ comprehensive_network.svg - Network topology diagram"
echo " 📋 network_report.md - Summary report"
echo ""
echo "Open comprehensive_network.svg in your browser to view the network diagram"
}
# Handle command line arguments
case "${1:-}" in
"scan-only")
check_requirements
run_network_scan
;;
"map-only")
check_requirements
find_pfsense_files
run_comprehensive_mapping
;;
"report-only")
generate_report
;;
"help"|"-h"|"--help")
echo "Usage: $0 [command]"
echo ""
echo "Commands:"
echo " (no command) - Run full workflow"
echo " scan-only - Run only network scan"
echo " map-only - Run only comprehensive mapping"
echo " report-only - Generate only summary report"
echo " help - Show this help"
;;
*)
main
;;
esac

487
src/integrated_scanner.py Executable file
View File

@@ -0,0 +1,487 @@
#!/usr/bin/env python3
"""
Complete Network Scanner with pfSense Integration
Combines network scanning with pfSense-specific features
"""
import json
import logging
import argparse
import subprocess
import re
from datetime import datetime
from typing import Dict
from network_scanner import NetworkScanner, NetworkSegment
from pfsense_scanner import PfSenseScanner
from dataclasses import asdict
from network_scanner import Device
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class IntegratedNetworkScanner:
"""Enhanced network scanner with pfSense integration"""
def __init__(self, config: dict):
self.config = config
self.base_scanner = NetworkScanner(config)
self.pfsense_devices = []
def scan_all(self):
"""Perform complete network scan including pfSense devices"""
logger.info("Starting integrated network scan...")
# Run base network scan
self.base_scanner.scan_all()
# Check for pfSense XML files and integrate them
self._integrate_pfsense_xml()
# Scan pfSense LAN networks
self._scan_pfsense_lan_networks()
# Identify and enhance pfSense devices
self._scan_pfsense_devices()
logger.info("Integrated scan complete")
def _scan_pfsense_lan_networks(self):
"""Scan LAN networks served by pfSense devices"""
logger.info("Scanning pfSense LAN networks...")
# Get pfSense configurations from the integrator
try:
from pfsense_integrator import PfSenseIntegrator
import glob
xml_files = glob.glob("*.xml")
if xml_files:
integrator = PfSenseIntegrator(xml_files)
integrator.load_pfsense_configs()
for pfsense_name, pfsense_config in integrator.pfsense_configs.items():
logger.info(f"Checking LAN networks for pfSense: {pfsense_name}")
interfaces = pfsense_config.get('interfaces', {})
for iface_name, iface_config in interfaces.items():
# Skip WAN interfaces and VPN interfaces
if iface_name.lower() in ['wan', 'wireguard', 'openvpn', 'ipsec']:
continue
# Get the network for this interface
ipaddr = iface_config.get('ipaddr')
subnet = iface_config.get('subnet')
if ipaddr and subnet:
try:
# Calculate the network from IP and subnet
import ipaddress
ip = ipaddress.IPv4Address(ipaddr)
net = ipaddress.IPv4Network(f"{ip}/{subnet}", strict=False)
network_cidr = str(net)
logger.info(f"Scanning pfSense LAN network: {network_cidr} (interface: {iface_name})")
# Check if this network is already scanned
existing_networks = [seg.cidr for seg in self.base_scanner.segments]
if network_cidr not in existing_networks:
# Scan this network
lan_segment = self.base_scanner.scan_network(
network_cidr,
f"{pfsense_name}_{iface_name.upper()}",
is_vpn=False
)
self.base_scanner.segments.append(lan_segment)
logger.info(f"Added LAN network {network_cidr} with {len(lan_segment.devices)} devices")
else:
logger.info(f"Network {network_cidr} already scanned")
except Exception as e:
logger.error(f"Error scanning pfSense LAN network {ipaddr}/{subnet}: {e}")
except Exception as e:
logger.error(f"Error scanning pfSense LAN networks: {e}")
def _integrate_pfsense_xml(self):
"""Automatically integrate pfSense XML files if present"""
import glob
from pfsense_integrator import PfSenseIntegrator
# Look for XML files in current directory
xml_files = glob.glob("*.xml")
if not xml_files:
logger.info("No pfSense XML files found, skipping XML integration")
return
logger.info(f"Found {len(xml_files)} pfSense XML files, integrating...")
try:
integrator = PfSenseIntegrator(xml_files)
integrator.load_pfsense_configs()
# Create temporary scan file for integration
import tempfile
import os
temp_scan = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False)
try:
# Export current scan data
temp_data = {
'scan_timestamp': None,
'segments': [
{
'name': seg.name,
'cidr': seg.cidr,
'gateway': seg.gateway,
'is_vpn': seg.is_vpn,
'devices': [asdict(dev) for dev in seg.devices]
}
for seg in self.base_scanner.segments
]
}
json.dump(temp_data, temp_scan)
temp_scan.close()
# Integrate pfSense data
integrator.integrate_with_scan(temp_scan.name, temp_scan.name + '_enhanced')
# Load enhanced data back
with open(temp_scan.name + '_enhanced', 'r') as f:
enhanced_data = json.load(f)
# Update segments
vpn_interfaces = self._check_vpn_interfaces()
self.base_scanner.segments = []
for seg_data in enhanced_data.get('segments', []):
segment = NetworkSegment(
name=seg_data['name'],
cidr=seg_data['cidr'],
gateway=seg_data['gateway'],
is_vpn=seg_data['is_vpn'],
devices=[]
)
# Only scan VPN networks if VPN interfaces are active
if seg_data['is_vpn']:
if vpn_interfaces:
logger.info(f"Scanning VPN network {seg_data['cidr']} (VPN interfaces active)")
# Scan the VPN network for devices
vpn_segment = self.base_scanner.scan_network(
seg_data['cidr'],
seg_data['name'],
is_vpn=True
)
segment.devices = vpn_segment.devices
else:
logger.info(f"VPN network {seg_data['cidr']} (no active VPN interfaces)")
# Try to scan anyway if network might be reachable
try:
logger.info(f"Attempting to scan VPN network {seg_data['cidr']} anyway...")
vpn_segment = self.base_scanner.scan_network(
seg_data['cidr'],
seg_data['name'],
is_vpn=True
)
segment.devices = vpn_segment.devices
logger.info(f"Successfully scanned VPN network {seg_data['cidr']} with {len(vpn_segment.devices)} devices")
except Exception as e:
logger.warning(f"Could not scan VPN network {seg_data['cidr']}: {e}")
# Keep any devices that were already in the segment data
for dev_data in seg_data['devices']:
device_kwargs = {k: v for k, v in dev_data.items()
if k in ['ip', 'hostname', 'mac', 'manufacturer', 'os_type',
'os_version', 'device_type', 'open_ports', 'ssh_accessible',
'services', 'routes', 'interfaces']}
device = Device(**device_kwargs)
segment.devices.append(device)
else:
# For non-VPN networks, use the devices from the segment data
for dev_data in seg_data['devices']:
device_kwargs = {k: v for k, v in dev_data.items()
if k in ['ip', 'hostname', 'mac', 'manufacturer', 'os_type',
'os_version', 'device_type', 'open_ports', 'ssh_accessible',
'services', 'routes', 'interfaces']}
device = Device(**device_kwargs)
segment.devices.append(device)
self.base_scanner.segments.append(segment)
logger.info(f"Integrated {len(integrator.pfsense_configs)} pfSense configurations")
finally:
# Clean up temp files
try:
os.unlink(temp_scan.name)
os.unlink(temp_scan.name + '_enhanced')
except:
pass
except Exception as e:
logger.error(f"Error integrating pfSense XML: {e}")
def _scan_pfsense_devices(self):
"""Find and deeply scan pfSense devices"""
logger.info("Looking for pfSense devices...")
# Check configured special devices
special_devices = self.config.get('special_devices', {})
for segment in self.base_scanner.segments:
for device in segment.devices:
is_pfsense = False
# Check if device is marked as pfSense in config
if device.ip in special_devices:
if special_devices[device.ip].get('type') == 'firewall' or \
special_devices[device.ip].get('os') == 'pfSense':
is_pfsense = True
# Check if hostname contains pfsense
if device.hostname and 'pfsense' in device.hostname.lower():
is_pfsense = True
# Check if device looks like a pfSense (has many routes and ports 80/443)
if device.routes and len(device.routes) > 3 and \
80 in device.open_ports and 443 in device.open_ports:
is_pfsense = True
if is_pfsense and device.ssh_accessible:
logger.info(f"Enhanced scanning pfSense device: {device.ip}")
self._enhance_pfsense_device(device)
def _enhance_pfsense_device(self, device):
"""Enhance device info with pfSense-specific data"""
try:
scanner = PfSenseScanner(
device.ip,
self.config.get('ssh_user', 'root'),
self.config.get('ssh_key_path')
)
pfsense_info = scanner.get_full_info()
# Merge pfSense-specific info into device
device.device_type = 'firewall'
device.os_type = 'pfSense (FreeBSD)'
# Store additional pfSense data
if not hasattr(device, 'pfsense_info'):
device.__dict__['pfsense_info'] = pfsense_info
# Add DHCP leases to known devices
dhcp_leases = pfsense_info.get('dhcp_leases', [])
if dhcp_leases:
logger.info(f"Found {len(dhcp_leases)} DHCP leases on {device.ip}")
# Add VPN info
vpn_info = pfsense_info.get('vpn', {})
wireguard = vpn_info.get('wireguard', [])
if wireguard:
logger.info(f"Found {len(wireguard)} WireGuard tunnels on {device.ip}")
self.pfsense_devices.append(device)
except Exception as e:
logger.error(f"Error enhancing pfSense device {device.ip}: {e}")
def export_json(self, filename: str):
"""Export enhanced results to JSON"""
data = {
'scan_timestamp': datetime.now().isoformat(),
'segments': []
}
for segment in self.base_scanner.segments:
segment_data = {
'name': segment.name,
'cidr': segment.cidr,
'gateway': segment.gateway,
'is_vpn': segment.is_vpn,
'devices': []
}
for device in segment.devices:
device_data = {
'ip': device.ip,
'hostname': device.hostname,
'mac': device.mac,
'manufacturer': device.manufacturer,
'os_type': device.os_type,
'os_version': device.os_version,
'device_type': device.device_type,
'open_ports': device.open_ports,
'ssh_accessible': device.ssh_accessible,
'services': device.services,
'routes': device.routes,
'interfaces': device.interfaces
}
# Add pfSense-specific info if available
if hasattr(device, 'pfsense_info'):
device_data['pfsense_info'] = device.__dict__['pfsense_info']
segment_data['devices'].append(device_data)
data['segments'].append(segment_data)
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Exported results to {filename}")
def print_summary(self):
"""Print enhanced summary"""
print("\n" + "="*80)
print("INTEGRATED NETWORK SCAN SUMMARY")
print("="*80)
total_devices = sum(len(seg.devices) for seg in self.base_scanner.segments)
print(f"\nTotal Segments: {len(self.base_scanner.segments)}")
print(f"Total Devices: {total_devices}")
print(f"pfSense Devices: {len(self.pfsense_devices)}")
for segment in self.base_scanner.segments:
print(f"\n{'='*80}")
print(f"📡 Network: {segment.name} ({segment.cidr})")
if segment.is_vpn:
print(" 🔐 VPN Network")
print(f" Devices: {len(segment.devices)}")
for device in segment.devices:
icon = self._get_device_icon(device.device_type)
print(f"\n {icon} {device.ip}")
if device.hostname:
print(f" Hostname: {device.hostname}")
if device.mac:
print(f" MAC: {device.mac}")
if device.device_type:
print(f" Type: {device.device_type}")
if device.os_type:
print(f" OS: {device.os_type} {device.os_version or ''}")
if device.open_ports:
print(f" Ports: {', '.join(map(str, device.open_ports))}")
if device.ssh_accessible:
print(f" ✓ SSH Accessible")
# Show pfSense-specific info
if hasattr(device, 'pfsense_info'):
pfsense = device.__dict__['pfsense_info']
print(f" 🛡️ pfSense Firewall:")
print(f" Interfaces: {len(pfsense.get('interfaces', []))}")
print(f" Routes: {len(pfsense.get('routes', []))}")
print(f" DHCP Leases: {len(pfsense.get('dhcp_leases', []))}")
vpn = pfsense.get('vpn', {})
wg_count = len(vpn.get('wireguard', []))
ovpn_count = len(vpn.get('openvpn', []))
if wg_count:
print(f" WireGuard Tunnels: {wg_count}")
if ovpn_count:
print(f" OpenVPN Connections: {ovpn_count}")
if device.routes and len(device.routes) > 0:
print(f" 📋 Routing Table ({len(device.routes)} routes):")
for route in device.routes[:3]: # Show first 3
dest = route.get('destination', 'unknown')
gw = route.get('gateway', 'direct')
print(f"{dest} via {gw}")
if len(device.routes) > 3:
print(f" ... and {len(device.routes) - 3} more")
print("\n" + "="*80)
def _get_device_icon(self, device_type):
"""Get emoji icon for device type"""
icons = {
'router': '🔀',
'firewall': '🛡️',
'switch': '🔌',
'server': '🖥️',
'linux_server': '🐧',
'windows_client': '💻',
'client': '💻'
}
return icons.get(device_type, '')
def _check_vpn_interfaces(self) -> dict:
"""Check which VPN interfaces are active"""
vpn_interfaces = {}
try:
# Check for WireGuard interfaces
result = subprocess.run(
['ip', 'link', 'show'],
capture_output=True,
text=True,
timeout=5
)
for line in result.stdout.splitlines():
if 'wg' in line.lower() or 'tun' in line.lower() or 'tap' in line.lower():
# Extract interface name (format: 123: wg0: <POINTOPOINT,NOARP,UP,LOWER_UP> ...)
match = re.search(r'\d+:\s+(\w+):', line)
if match:
iface = match.group(1)
vpn_interfaces[iface] = True
logger.info(f"Found active VPN interface: {iface}")
except Exception as e:
logger.warning(f"Error checking VPN interfaces: {e}")
return vpn_interfaces
def main():
parser = argparse.ArgumentParser(
description='Integrated Network Scanner with pfSense Support'
)
parser.add_argument('-c', '--config', default='config.json',
help='Configuration file (default: config.json)')
parser.add_argument('-o', '--output', default='network_scan.json',
help='Output JSON file (default: network_scan.json)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose output')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Load configuration
try:
with open(args.config, 'r') as f:
config = json.load(f)
logger.info(f"Loaded configuration from {args.config}")
except FileNotFoundError:
logger.warning(f"Config file {args.config} not found, using defaults")
config = {}
# Run integrated scanner
scanner = IntegratedNetworkScanner(config)
scanner.scan_all()
# Print summary
scanner.print_summary()
# Export results
scanner.export_json(args.output)
# Save failed SSH hosts if any
if scanner.base_scanner.failed_ssh_hosts:
failed_ssh_file = args.output.replace('.json', '_failed_ssh.json')
scanner.base_scanner.save_failed_ssh_hosts(failed_ssh_file)
print(f"\n⚠️ {len(scanner.base_scanner.failed_ssh_hosts)} hosts have SSH port open but failed authentication.")
print(f" See {failed_ssh_file} for details.")
print(f"\n✓ Scan complete! Results saved to {args.output}")
if __name__ == '__main__':
main()

View File

@@ -11,10 +11,12 @@ import json
import re import re
import socket import socket
import argparse import argparse
import multiprocessing
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict, field
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
import logging import logging
from datetime import datetime
# Configure logging # Configure logging
logging.basicConfig( logging.basicConfig(
@@ -34,21 +36,12 @@ class Device:
os_type: Optional[str] = None os_type: Optional[str] = None
os_version: Optional[str] = None os_version: Optional[str] = None
device_type: Optional[str] = None # router, switch, server, client, etc. device_type: Optional[str] = None # router, switch, server, client, etc.
open_ports: List[int] = None open_ports: List[int] = field(default_factory=list)
ssh_accessible: bool = False ssh_accessible: bool = False
services: List[str] = None services: List[str] = field(default_factory=list)
routes: List[Dict] = None routes: List[Dict] = field(default_factory=list)
interfaces: List[Dict] = None interfaces: List[Dict] = field(default_factory=list)
ssh_info: Optional[Dict] = None
def __post_init__(self):
if self.open_ports is None:
self.open_ports = []
if self.services is None:
self.services = []
if self.routes is None:
self.routes = []
if self.interfaces is None:
self.interfaces = []
@dataclass @dataclass
@@ -59,11 +52,7 @@ class NetworkSegment:
gateway: Optional[str] = None gateway: Optional[str] = None
vlan: Optional[int] = None vlan: Optional[int] = None
is_vpn: bool = False is_vpn: bool = False
devices: List[Device] = None devices: List[Device] = field(default_factory=list)
def __post_init__(self):
if self.devices is None:
self.devices = []
class NetworkScanner: class NetworkScanner:
@@ -75,6 +64,7 @@ class NetworkScanner:
self.ssh_user = config.get('ssh_user', 'root') self.ssh_user = config.get('ssh_user', 'root')
self.ssh_key = config.get('ssh_key_path') self.ssh_key = config.get('ssh_key_path')
self.timeout = config.get('timeout', 2) self.timeout = config.get('timeout', 2)
self.failed_ssh_hosts: List[Dict] = []
def discover_networks(self) -> List[str]: def discover_networks(self) -> List[str]:
"""Discover all network segments from local routing table""" """Discover all network segments from local routing table"""
@@ -114,6 +104,33 @@ class NetworkScanner:
return networks return networks
def _check_vpn_interfaces(self) -> Dict[str, bool]:
"""Check which VPN interfaces are active"""
vpn_interfaces = {}
try:
# Check for WireGuard interfaces
result = subprocess.run(
['ip', 'link', 'show'],
capture_output=True,
text=True,
timeout=5
)
for line in result.stdout.splitlines():
if 'wg' in line.lower() or 'tun' in line.lower() or 'tap' in line.lower():
# Extract interface name (format: 123: wg0: <POINTOPOINT,NOARP,UP,LOWER_UP> ...)
match = re.search(r'\d+:\s+(\w+):', line)
if match:
iface = match.group(1)
vpn_interfaces[iface] = True
logger.info(f"Found active VPN interface: {iface}")
except Exception as e:
logger.warning(f"Error checking VPN interfaces: {e}")
return vpn_interfaces
def ping_sweep(self, network: str) -> List[str]: def ping_sweep(self, network: str) -> List[str]:
"""Perform ping sweep to find live hosts""" """Perform ping sweep to find live hosts"""
logger.info(f"Performing ping sweep on {network}") logger.info(f"Performing ping sweep on {network}")
@@ -128,7 +145,10 @@ class NetworkScanner:
logger.warning(f"Large network {network}, limiting scan") logger.warning(f"Large network {network}, limiting scan")
hosts = hosts[:254] hosts = hosts[:254]
with ThreadPoolExecutor(max_workers=50) as executor: max_ping_workers = min(len(hosts), multiprocessing.cpu_count() * 8)
logger.info(f"Pinging {len(hosts)} hosts using {max_ping_workers} concurrent workers")
with ThreadPoolExecutor(max_workers=max_ping_workers) as executor:
future_to_ip = { future_to_ip = {
executor.submit(self._ping_host, str(ip)): str(ip) executor.submit(self._ping_host, str(ip)): str(ip)
for ip in hosts for ip in hosts
@@ -181,6 +201,13 @@ class NetworkScanner:
if device.ssh_accessible: if device.ssh_accessible:
# Gather detailed info via SSH # Gather detailed info via SSH
self._gather_ssh_info(device) self._gather_ssh_info(device)
else:
# Track hosts with SSH port open but failed SSH connection
self.failed_ssh_hosts.append({
'ip': ip,
'hostname': device.hostname,
'reason': 'SSH port open but authentication failed or connection refused'
})
# Identify device type based on ports and services # Identify device type based on ports and services
device.device_type = self._identify_device_type(device) device.device_type = self._identify_device_type(device)
@@ -222,10 +249,22 @@ class NetworkScanner:
common_ports = [22, 80, 443, 8080, 8443, 3389, 445, 139, 21, 23, 25, 53, 3306, 5432] common_ports = [22, 80, 443, 8080, 8443, 3389, 445, 139, 21, 23, 25, 53, 3306, 5432]
open_ports = [] open_ports = []
for port in common_ports: max_port_workers = min(len(common_ports), multiprocessing.cpu_count() * 2)
if self._check_port(ip, port):
open_ports.append(port) with ThreadPoolExecutor(max_workers=max_port_workers) as executor:
logger.debug(f"{ip}:{port} - OPEN") future_to_port = {
executor.submit(self._check_port, ip, port): port
for port in common_ports
}
for future in as_completed(future_to_port):
port = future_to_port[future]
try:
if future.result():
open_ports.append(port)
logger.debug(f"{ip}:{port} - OPEN")
except Exception as e:
logger.debug(f"Error checking {ip}:{port}: {e}")
return open_ports return open_ports
@@ -261,17 +300,41 @@ class NetworkScanner:
"""Gather detailed information via SSH""" """Gather detailed information via SSH"""
logger.info(f"Gathering SSH info from {device.ip}") logger.info(f"Gathering SSH info from {device.ip}")
ssh_info = {}
# Get OS info # Get OS info
device.os_type, device.os_version = self._get_os_info(device.ip) os_type, os_version = self._get_os_info(device.ip)
if os_type:
ssh_info['os_type'] = os_type
device.os_type = os_type
if os_version:
ssh_info['os_version'] = os_version
device.os_version = os_version
# Get network interfaces # Get network interfaces
device.interfaces = self._get_interfaces(device.ip) interfaces = self._get_interfaces(device.ip)
if interfaces:
ssh_info['interfaces'] = interfaces
device.interfaces = interfaces
# Get routing table # Get routing table
device.routes = self._get_routes(device.ip) routes = self._get_routes(device.ip)
if routes:
ssh_info['routes'] = routes
device.routes = routes
# Get running services # Get running services
device.services = self._get_services(device.ip) services = self._get_services(device.ip)
if services:
ssh_info['services'] = services
device.services = services
# Get system info
system_info = self._get_system_info(device.ip)
if system_info:
ssh_info['system'] = system_info
device.ssh_info = ssh_info
def _ssh_exec(self, ip: str, command: str) -> Optional[str]: def _ssh_exec(self, ip: str, command: str) -> Optional[str]:
"""Execute command via SSH""" """Execute command via SSH"""
@@ -380,6 +443,40 @@ class NetworkScanner:
return services[:20] # Limit to top 20 return services[:20] # Limit to top 20
def _get_system_info(self, ip: str) -> Optional[Dict]:
"""Get basic system information"""
system_info = {}
# Get hostname
hostname = self._ssh_exec(ip, 'hostname')
if hostname:
system_info['hostname'] = hostname.strip()
# Get uptime
uptime = self._ssh_exec(ip, 'uptime -p 2>/dev/null || uptime')
if uptime:
system_info['uptime'] = uptime.strip()
# Get CPU info
cpu_info = self._ssh_exec(ip, 'nproc && cat /proc/cpuinfo | grep "model name" | head -1')
if cpu_info:
lines = cpu_info.strip().split('\n')
if len(lines) >= 2:
system_info['cpu_cores'] = int(lines[0])
system_info['cpu_model'] = lines[1].split(':')[1].strip()
# Get memory info
mem_info = self._ssh_exec(ip, 'free -h | grep Mem')
if mem_info:
system_info['memory'] = mem_info.strip()
# Get disk info
disk_info = self._ssh_exec(ip, 'df -h / | tail -1')
if disk_info:
system_info['disk'] = disk_info.strip()
return system_info if system_info else None
def _identify_device_type(self, device: Device) -> str: def _identify_device_type(self, device: Device) -> str:
"""Identify device type based on available info""" """Identify device type based on available info"""
if device.routes and len(device.routes) > 5: if device.routes and len(device.routes) > 5:
@@ -395,7 +492,7 @@ class NetworkScanner:
else: else:
return 'client' return 'client'
def scan_network(self, network: str, name: str = None, is_vpn: bool = False) -> NetworkSegment: def scan_network(self, network: str, name: Optional[str] = None, is_vpn: bool = False) -> NetworkSegment:
"""Scan a complete network segment""" """Scan a complete network segment"""
logger.info(f"Scanning network segment: {network}") logger.info(f"Scanning network segment: {network}")
@@ -409,7 +506,10 @@ class NetworkScanner:
live_hosts = self.ping_sweep(network) live_hosts = self.ping_sweep(network)
# Gather device info # Gather device info
with ThreadPoolExecutor(max_workers=10) as executor: max_device_workers = min(len(live_hosts), multiprocessing.cpu_count() * 2)
logger.info(f"Gathering device info for {len(live_hosts)} hosts using {max_device_workers} concurrent workers")
with ThreadPoolExecutor(max_workers=max_device_workers) as executor:
future_to_ip = { future_to_ip = {
executor.submit(self.get_device_info, ip): ip executor.submit(self.get_device_info, ip): ip
for ip in live_hosts for ip in live_hosts
@@ -431,13 +531,24 @@ class NetworkScanner:
# Discover networks # Discover networks
networks = self.discover_networks() networks = self.discover_networks()
# Scan each network # Scan networks concurrently
for network in networks: max_concurrent_networks = min(len(networks), multiprocessing.cpu_count())
try: logger.info(f"Scanning {len(networks)} networks using {max_concurrent_networks} concurrent processes")
segment = self.scan_network(network)
self.segments.append(segment) with ThreadPoolExecutor(max_workers=max_concurrent_networks) as executor:
except Exception as e: future_to_network = {
logger.error(f"Error scanning {network}: {e}") executor.submit(self.scan_network, network): network
for network in networks
}
for future in as_completed(future_to_network):
network = future_to_network[future]
try:
segment = future.result()
self.segments.append(segment)
logger.info(f"Completed scanning network: {network}")
except Exception as e:
logger.error(f"Error scanning {network}: {e}")
logger.info(f"Scan complete. Found {len(self.segments)} segments") logger.info(f"Scan complete. Found {len(self.segments)} segments")
@@ -462,6 +573,17 @@ class NetworkScanner:
logger.info(f"Exported results to {filename}") logger.info(f"Exported results to {filename}")
def save_failed_ssh_hosts(self, filename: str):
"""Save list of hosts with SSH port open but failed authentication"""
if self.failed_ssh_hosts:
with open(filename, 'w') as f:
json.dump({
'failed_ssh_hosts': self.failed_ssh_hosts,
'total_failed': len(self.failed_ssh_hosts),
'scan_timestamp': str(datetime.now())
}, f, indent=2)
logger.info(f"Saved {len(self.failed_ssh_hosts)} failed SSH hosts to {filename}")
def print_summary(self): def print_summary(self):
"""Print a human-readable summary""" """Print a human-readable summary"""
print("\n" + "="*80) print("\n" + "="*80)
@@ -524,6 +646,7 @@ def main():
# Export results # Export results
from datetime import datetime from datetime import datetime
scanner.export_json(args.output) scanner.export_json(args.output)
scanner.save_failed_ssh_hosts('failed_ssh_hosts.json')
print(f"\n✓ Scan complete! Results saved to {args.output}") print(f"\n✓ Scan complete! Results saved to {args.output}")

416
src/pfsense_integrator.py Executable file
View File

@@ -0,0 +1,416 @@
#!/usr/bin/env python3
"""
pfSense XML Integration Script
Automatically processes pfSense XML backup files and integrates them into network scan results
"""
import json
import argparse
from pathlib import Path
import logging
from typing import Dict, List, Any
from pfsense_xml_parser import PfSenseXMLParser
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PfSenseIntegrator:
"""Integrates pfSense XML data into network scan results"""
def __init__(self, xml_files: List[str]):
self.xml_files = xml_files
self.pfsense_configs = {}
def load_pfsense_configs(self):
"""Load and parse all pfSense XML files"""
logger.info(f"Loading {len(self.xml_files)} pfSense configuration files...")
for xml_file in self.xml_files:
try:
parser = PfSenseXMLParser(xml_file)
if parser.load_xml():
config = parser.parse_all()
hostname = config.get('system', {}).get('hostname', 'unknown')
self.pfsense_configs[hostname] = config
logger.info(f"Loaded pfSense config: {hostname}")
else:
logger.error(f"Failed to load: {xml_file}")
except Exception as e:
logger.error(f"Error parsing {xml_file}: {e}")
def integrate_with_scan(self, scan_file: str, output_file: str):
"""Integrate pfSense data into network scan results"""
logger.info(f"Integrating pfSense data into {scan_file}")
# Load network scan
with open(scan_file, 'r') as f:
scan_data = json.load(f)
# Add pfSense configurations
scan_data['pfsense_configs'] = self.pfsense_configs
# Enhance network segments with pfSense data
self._enhance_segments_with_pfsense(scan_data)
# Save enhanced scan
with open(output_file, 'w') as f:
json.dump(scan_data, f, indent=2)
logger.info(f"Enhanced scan saved to {output_file}")
def _enhance_segments_with_pfsense(self, scan_data: Dict):
"""Enhance network segments with pfSense-specific information"""
segments = scan_data.get('segments', [])
for pfsense_name, pfsense_config in self.pfsense_configs.items():
logger.info(f"Processing pfSense: {pfsense_name}")
# Find or create segment for this pfSense
pfsense_segment = self._find_or_create_pfsense_segment(segments, pfsense_config)
# Add pfSense device if not already present
pfsense_device = self._add_pfsense_device(pfsense_segment, pfsense_config)
# Add networks discovered from pfSense config
self._add_pfsense_networks(segments, pfsense_config)
def _find_or_create_pfsense_segment(self, segments: List[Dict], pfsense_config: Dict) -> Dict:
"""Find existing segment or create new one for pfSense"""
interfaces = pfsense_config.get('interfaces', {})
# Look for LAN interface
lan_interface = interfaces.get('lan')
if lan_interface and lan_interface.get('ipaddr') and lan_interface.get('subnet'):
# Calculate the proper network address
import ipaddress
ip = ipaddress.IPv4Address(lan_interface['ipaddr'])
subnet_bits = int(lan_interface['subnet'])
network = ipaddress.IPv4Network(f"{ip}/{subnet_bits}", strict=False)
lan_network = str(network)
# Find existing segment
for segment in segments:
if segment.get('cidr') == lan_network:
return segment
# Create new segment
new_segment = {
'name': f"{pfsense_config['system']['hostname']}_LAN",
'cidr': lan_network,
'gateway': lan_interface.get('ipaddr'),
'is_vpn': False,
'devices': []
}
segments.append(new_segment)
return new_segment
# Fallback: create segment with hostname
hostname = pfsense_config['system']['hostname']
for segment in segments:
if segment.get('name', '').startswith(hostname):
return segment
new_segment = {
'name': f"{hostname}_networks",
'cidr': 'unknown',
'gateway': None,
'is_vpn': False,
'devices': []
}
segments.append(new_segment)
return new_segment
def _add_pfsense_device(self, segment: Dict, pfsense_config: Dict) -> Dict:
"""Add pfSense device to segment"""
hostname = pfsense_config['system']['hostname']
domain = pfsense_config['system'].get('domain', '')
# Find LAN interface for IP
interfaces = pfsense_config.get('interfaces', {})
lan_interface = interfaces.get('lan')
pfsense_ip = lan_interface.get('ipaddr') if lan_interface else 'unknown'
# Create pfSense device
device = {
'ip': pfsense_ip,
'hostname': f"{hostname}.{domain}" if domain else hostname,
'mac': None,
'manufacturer': None,
'os_type': 'pfSense (FreeBSD)',
'os_version': pfsense_config.get('version', 'unknown'),
'device_type': 'firewall',
'open_ports': [22, 80, 443], # Common pfSense ports
'ssh_accessible': False, # Assume not accessible for security
'services': ['pfSense Firewall', 'Web GUI', 'SSH'],
'routes': self._extract_routes(pfsense_config),
'interfaces': self._extract_interfaces(pfsense_config),
'pfsense_config': pfsense_config
}
# Check if device already exists
for existing_device in segment['devices']:
if existing_device.get('ip') == pfsense_ip:
# Update existing device with pfSense info
existing_device.update(device)
return existing_device
# Add new device
segment['devices'].append(device)
return device
def _extract_routes(self, pfsense_config: Dict) -> List[Dict]:
"""Extract routing information from pfSense config"""
routes = []
# Static routes
static_routes = pfsense_config.get('static_routes', [])
for route in static_routes:
routes.append({
'destination': route.get('network', ''),
'gateway': route.get('gateway', ''),
'interface': 'static',
'description': route.get('description', ''),
'type': 'static'
})
# Default route from gateways
gateways = pfsense_config.get('gateways', {})
for gw_name, gw_config in gateways.items():
if gw_config.get('defaultgw'):
routes.append({
'destination': 'default',
'gateway': gw_config.get('gateway', ''),
'interface': gw_config.get('interface', ''),
'description': f"Default gateway: {gw_name}",
'type': 'default'
})
return routes
def _extract_interfaces(self, pfsense_config: Dict) -> List[Dict]:
"""Extract interface information"""
interfaces = []
for iface_name, iface_config in pfsense_config.get('interfaces', {}).items():
interface = {
'name': iface_name,
'description': iface_config.get('description', ''),
'physical_interface': iface_config.get('interface', ''),
'ip_address': iface_config.get('ipaddr', ''),
'subnet': iface_config.get('subnet', ''),
'network_cidr': iface_config.get('network_cidr', ''),
'gateway': iface_config.get('gateway', ''),
'mtu': iface_config.get('mtu', ''),
'type': iface_config.get('type', 'physical')
}
interfaces.append(interface)
return interfaces
def _add_pfsense_networks(self, segments: List[Dict], pfsense_config: Dict):
"""Add networks discovered from pfSense configuration"""
interfaces = pfsense_config.get('interfaces', {})
for iface_name, iface_config in interfaces.items():
if iface_name == 'wan':
continue # Skip WAN for now
network_cidr = iface_config.get('network_cidr')
if network_cidr and network_cidr != 'unknown':
# Filter out invalid networks
if self._is_invalid_network(network_cidr):
logger.warning(f"Skipping invalid interface network: {network_cidr}")
continue
# Check if segment already exists
segment_exists = any(seg.get('cidr') == network_cidr for seg in segments)
if not segment_exists:
segment_name = f"{pfsense_config['system']['hostname']}_{iface_name.upper()}"
new_segment = {
'name': segment_name,
'cidr': network_cidr,
'gateway': iface_config.get('ipaddr'),
'is_vpn': iface_name.startswith('opt') and 'wireguard' in iface_config.get('description', '').lower(),
'devices': []
}
segments.append(new_segment)
logger.info(f"Added network segment: {network_cidr}")
# Add WireGuard networks
wireguard = pfsense_config.get('wireguard', {})
for peer in wireguard.get('peers', []):
for allowed_ip in peer.get('allowed_ips', []):
network = f"{allowed_ip['address']}/{allowed_ip['mask']}"
# Filter out invalid networks
if self._is_invalid_network(network):
logger.warning(f"Skipping invalid WireGuard network: {network}")
continue
segment_exists = any(seg.get('cidr') == network for seg in segments)
if not segment_exists:
new_segment = {
'name': f"WireGuard_{peer.get('description', 'unknown').replace(' ', '_')}",
'cidr': network,
'gateway': None,
'is_vpn': True,
'devices': []
}
segments.append(new_segment)
logger.info(f"Added WireGuard network: {network}")
def _is_invalid_network(self, network: str) -> bool:
"""Check if a network should not be scanned"""
try:
import ipaddress
net = ipaddress.ip_network(network, strict=False)
# Skip networks that are too large or invalid
if net.prefixlen == 0: # 0.0.0.0/0 - route all traffic
return True
if net.prefixlen < 8: # Very large networks
return True
if net.network_address.is_private and net.prefixlen < 16: # Large private networks
return True
if str(net.network_address) == '0.0.0.0': # Invalid network
return True
return False
except ValueError:
return True # Invalid network format
def generate_network_summary(self, output_file: str):
"""Generate a human-readable network summary"""
summary = []
summary.append("# Network Topology Summary")
summary.append("Generated from pfSense XML configurations\n")
for pfsense_name, config in self.pfsense_configs.items():
summary.append(f"## pfSense Firewall: {pfsense_name}")
summary.append(f"**Version:** {config.get('version', 'unknown')}")
summary.append(f"**Domain:** {config.get('system', {}).get('domain', 'unknown')}\n")
# Interfaces
interfaces = config.get('interfaces', {})
if interfaces:
summary.append("### Network Interfaces")
for iface_name, iface in interfaces.items():
ip = iface.get('ipaddr', 'DHCP')
subnet = iface.get('subnet', '')
network = iface.get('network_cidr', '')
desc = iface.get('description', iface_name.upper())
summary.append(f"- **{desc}** ({iface_name}): {ip}")
if network:
summary.append(f" - Network: {network}")
if iface.get('gateway'):
summary.append(f" - Gateway: {iface.get('gateway')}")
summary.append("")
# Static Routes
routes = config.get('static_routes', [])
if routes:
summary.append("### Static Routes")
for route in routes:
network = route.get('network', '')
gateway = route.get('gateway', '')
desc = route.get('description', '')
summary.append(f"- {network} via {gateway}")
if desc:
summary.append(f" *{desc}*")
summary.append("")
# WireGuard
wg = config.get('wireguard', {})
if wg.get('enabled') and wg.get('tunnels'):
summary.append("### WireGuard VPN")
for tunnel in wg.get('tunnels', []):
name = tunnel.get('name', 'unknown')
port = tunnel.get('listenport', 'unknown')
desc = tunnel.get('description', '')
summary.append(f"- **Tunnel {name}** (Port {port})")
if desc:
summary.append(f" *{desc}*")
for peer in wg.get('peers', []):
desc = peer.get('description', 'unknown')
allowed_ips = peer.get('allowed_ips', [])
if allowed_ips:
networks = [f"{ip['address']}/{ip['mask']}" for ip in allowed_ips]
summary.append(f" - Peer: {desc} - Networks: {', '.join(networks)}")
summary.append("")
# DHCP
dhcp = config.get('dhcp', {})
if dhcp:
summary.append("### DHCP Configuration")
for iface_name, dhcp_config in dhcp.items():
if dhcp_config.get('enabled', True):
range_from = dhcp_config.get('range_from', '')
range_to = dhcp_config.get('range_to', '')
if range_from and range_to:
summary.append(f"- **{iface_name.upper()}**: {range_from} - {range_to}")
static_maps = dhcp_config.get('static_maps', [])
if static_maps:
summary.append(" **Static Mappings:**")
for static in static_maps:
ip = static.get('ipaddr', '')
mac = static.get('mac', '')
hostname = static.get('hostname', '')
summary.append(f" - {ip} ({mac}) - {hostname}")
summary.append("")
# Write summary
with open(output_file, 'w') as f:
f.write('\n'.join(summary))
logger.info(f"Network summary saved to {output_file}")
def main():
"""Command line interface"""
parser = argparse.ArgumentParser(description='Integrate pfSense XML configs with network scan')
parser.add_argument('xml_files', nargs='+', help='pfSense XML configuration files')
parser.add_argument('-s', '--scan', help='Network scan JSON file to enhance')
parser.add_argument('-o', '--output', help='Output enhanced scan file')
parser.add_argument('--summary', help='Generate network summary markdown file')
parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Initialize integrator
integrator = PfSenseIntegrator(args.xml_files)
integrator.load_pfsense_configs()
if not integrator.pfsense_configs:
logger.error("No pfSense configurations loaded!")
return 1
# Generate summary if requested
if args.summary:
integrator.generate_network_summary(args.summary)
print(f"✅ Network summary generated: {args.summary}")
# Integrate with scan if provided
if args.scan:
output_file = args.output or args.scan.replace('.json', '_enhanced.json')
integrator.integrate_with_scan(args.scan, output_file)
print(f"✅ Enhanced scan saved: {output_file}")
# Show summary
print(f"\n📊 Loaded {len(integrator.pfsense_configs)} pfSense configurations:")
for name in integrator.pfsense_configs.keys():
print(f" - {name}")
return 0
if __name__ == '__main__':
exit(main())

503
src/pfsense_xml_parser.py Executable file
View File

@@ -0,0 +1,503 @@
#!/usr/bin/env python3
"""
pfSense XML Configuration Parser
Extracts comprehensive network information from pfSense backup XML files
"""
import xml.etree.ElementTree as ET
import json
import argparse
from typing import Dict, List, Optional, Any
from pathlib import Path
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class PfSenseXMLParser:
"""Parser for pfSense XML configuration files"""
def __init__(self, xml_file: str):
self.xml_file = xml_file
self.tree = None
self.root = None
self.hostname = None
def load_xml(self) -> bool:
"""Load and parse the XML file"""
try:
self.tree = ET.parse(self.xml_file)
self.root = self.tree.getroot()
# Extract hostname
hostname_elem = self.root.find('.//hostname')
if hostname_elem is not None:
self.hostname = hostname_elem.text
logger.info(f"Loaded pfSense config: {self.hostname or 'Unknown'}")
return True
except Exception as e:
logger.error(f"Error loading XML file: {e}")
return False
def get_text(self, element: Optional[ET.Element], default: str = "") -> str:
"""Safely get text from XML element"""
if element is not None and element.text:
return element.text
return default
def get_interfaces(self) -> Dict[str, Dict]:
"""Extract network interfaces configuration"""
interfaces = {}
if self.root is None:
return interfaces
interfaces_elem = self.root.find('interfaces')
if interfaces_elem is None:
return interfaces
for iface_elem in interfaces_elem:
iface_name = iface_elem.tag
if iface_name in ['wan', 'lan'] or iface_name.startswith('opt'):
interface = {
'name': iface_name,
'description': self.get_text(iface_elem.find('descr')),
'enabled': iface_elem.find('enable') is not None,
'interface': self.get_text(iface_elem.find('if')),
'ipaddr': self.get_text(iface_elem.find('ipaddr')),
'subnet': self.get_text(iface_elem.find('subnet')),
'gateway': self.get_text(iface_elem.find('gateway')),
'mtu': self.get_text(iface_elem.find('mtu')),
'mss': self.get_text(iface_elem.find('mss')),
'spoofmac': self.get_text(iface_elem.find('spoofmac')),
'type': 'physical'
}
# Determine interface type
if 'tun_wg' in interface['interface']:
interface['type'] = 'wireguard'
elif 'ovpn' in interface['interface']:
interface['type'] = 'openvpn'
elif 'ipsec' in interface['interface']:
interface['type'] = 'ipsec'
interfaces[iface_name] = interface
return interfaces
def get_static_routes(self) -> List[Dict]:
"""Extract static routes"""
routes = []
if self.root is None:
return routes
staticroutes_elem = self.root.find('staticroutes')
if staticroutes_elem is None:
return routes
for route_elem in staticroutes_elem:
if route_elem.tag == 'route':
route = {
'network': self.get_text(route_elem.find('network')),
'gateway': self.get_text(route_elem.find('gateway')),
'description': self.get_text(route_elem.find('descr'))
}
routes.append(route)
return routes
def get_gateways(self) -> Dict[str, Dict]:
"""Extract gateway configuration"""
gateways = {}
if self.root is None:
return gateways
gateways_elem = self.root.find('gateways')
if gateways_elem is None:
return gateways
gateway_item_elem = gateways_elem.find('gateway_item')
if gateway_item_elem is not None:
for gw_elem in gateway_item_elem:
if gw_elem.tag == 'item':
name = self.get_text(gw_elem.find('name'))
if name:
gateway = {
'name': name,
'interface': self.get_text(gw_elem.find('interface')),
'gateway': self.get_text(gw_elem.find('gateway')),
'monitor': self.get_text(gw_elem.find('monitor')),
'description': self.get_text(gw_elem.find('descr')),
'defaultgw': gw_elem.find('defaultgw') is not None
}
gateways[name] = gateway
return gateways
def get_dhcp_config(self) -> Dict[str, Dict]:
"""Extract DHCP server configuration"""
dhcp_config = {}
if self.root is None:
return dhcp_config
dhcpd_elem = self.root.find('dhcpd')
if dhcpd_elem is None:
return dhcp_config
for dhcp_item in dhcpd_elem:
iface_name = dhcp_item.tag
dhcp_elem = dhcp_item
config = {
'enabled': True,
'range': {},
'static_mappings': []
}
# DHCP range
range_elem = dhcp_elem.find('range')
if range_elem is not None:
config['range'] = {
'from': self.get_text(range_elem.find('from')),
'to': self.get_text(range_elem.find('to'))
}
# DHCP options
config.update({
'defaultleasetime': self.get_text(dhcp_elem.find('defaultleasetime')),
'maxleasetime': self.get_text(dhcp_elem.find('maxleasetime')),
'gateway': self.get_text(dhcp_elem.find('gateway')),
'domain': self.get_text(dhcp_elem.find('domain')),
'domainsearchlist': self.get_text(dhcp_elem.find('domainsearchlist')),
'ddnsdomain': self.get_text(dhcp_elem.find('ddnsdomain')),
'dns1': '',
'dns2': '',
'ntpserver': ''
})
# DNS servers
dns_servers = dhcp_elem.findall('dnsserver')
if dns_servers:
config['dns1'] = self.get_text(dns_servers[0])
if len(dns_servers) > 1:
config['dns2'] = self.get_text(dns_servers[1])
# NTP servers
ntp_servers = dhcp_elem.findall('ntpserver')
if ntp_servers:
config['ntpserver'] = self.get_text(ntp_servers[0])
# Static mappings
for staticmap_elem in dhcp_elem.findall('staticmap'):
mapping = {
'mac': self.get_text(staticmap_elem.find('mac')),
'ipaddr': self.get_text(staticmap_elem.find('ipaddr')),
'hostname': self.get_text(staticmap_elem.find('hostname')),
'description': self.get_text(staticmap_elem.find('descr'))
}
config['static_mappings'].append(mapping)
dhcp_config[iface_name] = config
return dhcp_config
def get_wireguard_config(self) -> Dict[str, Any]:
"""Extract WireGuard configuration"""
wg_config = {
'enabled': False,
'tunnels': [],
'peers': []
}
if self.root is None:
return wg_config
# Find WireGuard configuration
wg_elem = self.root.find('.//wireguard')
if wg_elem is None:
return wg_config
# Check if enabled
config_elem = wg_elem.find('config')
if config_elem is not None:
wg_config['enabled'] = self.get_text(config_elem.find('enable')) == 'on'
# Extract tunnels
tunnels_elem = wg_elem.find('tunnels')
if tunnels_elem is not None:
for item_elem in tunnels_elem.findall('item'):
tunnel = {
'name': self.get_text(item_elem.find('name')),
'enabled': self.get_text(item_elem.find('enabled')) == 'yes',
'description': self.get_text(item_elem.find('descr')),
'listenport': self.get_text(item_elem.find('listenport')),
'publickey': self.get_text(item_elem.find('publickey')),
'mtu': self.get_text(item_elem.find('mtu'))
}
wg_config['tunnels'].append(tunnel)
# Extract peers
peers_elem = wg_elem.find('peers')
if peers_elem is not None:
for item_elem in peers_elem.findall('item'):
peer = {
'enabled': self.get_text(item_elem.find('enabled')) == 'yes',
'tunnel': self.get_text(item_elem.find('tun')),
'description': self.get_text(item_elem.find('descr')),
'publickey': self.get_text(item_elem.find('publickey')),
'persistentkeepalive': self.get_text(item_elem.find('persistentkeepalive')),
'allowed_ips': []
}
# Extract allowed IPs
allowedips_elem = item_elem.find('allowedips')
if allowedips_elem is not None:
for row_elem in allowedips_elem.findall('row'):
ip_info = {
'address': self.get_text(row_elem.find('address')),
'mask': self.get_text(row_elem.find('mask')),
'description': self.get_text(row_elem.find('descr'))
}
peer['allowed_ips'].append(ip_info)
wg_config['peers'].append(peer)
return wg_config
def get_openvpn_config(self) -> Dict[str, Any]:
"""Extract OpenVPN configuration"""
ovpn_config = {
'servers': [],
'clients': []
}
if self.root is None:
return ovpn_config
# OpenVPN servers
ovpnserver_elem = self.root.find('ovpnserver')
if ovpnserver_elem is not None:
# This is a complex configuration, extract basic info
ovpn_config['servers'].append({
'configured': True,
'description': 'OpenVPN Server configured'
})
# OpenVPN clients
ovpnclient_elem = self.root.find('ovpnclient')
if ovpnclient_elem is not None:
for item_elem in ovpnclient_elem.findall('item'):
client = {
'enabled': True,
'description': self.get_text(item_elem.find('descr')),
'server_addr': self.get_text(item_elem.find('server_addr')),
'interface': self.get_text(item_elem.find('interface'))
}
ovpn_config['clients'].append(client)
return ovpn_config
def get_firewall_rules(self) -> List[Dict]:
"""Extract firewall rules"""
rules = []
if self.root is None:
return rules
filter_elem = self.root.find('filter')
if filter_elem is None:
return rules
for rule_elem in filter_elem.findall('rule'):
rule = {
'id': self.get_text(rule_elem.find('id')),
'tracker': self.get_text(rule_elem.find('tracker')),
'type': self.get_text(rule_elem.find('type')),
'interface': self.get_text(rule_elem.find('interface')),
'ipprotocol': self.get_text(rule_elem.find('ipprotocol')),
'protocol': self.get_text(rule_elem.find('protocol')),
'description': self.get_text(rule_elem.find('descr')),
'enabled': True,
'source': {},
'destination': {},
'log': rule_elem.find('log') is not None
}
# Source
source_elem = rule_elem.find('source')
if source_elem is not None:
rule['source'] = {
'address': self.get_text(source_elem.find('address')),
'port': self.get_text(source_elem.find('port')),
'any': source_elem.find('any') is not None
}
# Destination
dest_elem = rule_elem.find('destination')
if dest_elem is not None:
rule['destination'] = {
'address': self.get_text(dest_elem.find('address')),
'port': self.get_text(dest_elem.find('port')),
'any': dest_elem.find('any') is not None
}
rules.append(rule)
return rules
def get_nat_rules(self) -> List[Dict]:
"""Extract NAT rules"""
nat_rules = []
if self.root is None:
return nat_rules
nat_elem = self.root.find('nat')
if nat_elem is None:
return nat_rules
for rule_elem in nat_elem.findall('rule'):
rule = {
'description': self.get_text(rule_elem.find('descr')),
'interface': self.get_text(rule_elem.find('interface')),
'protocol': self.get_text(rule_elem.find('protocol')),
'source': {},
'destination': {},
'target': self.get_text(rule_elem.find('target')),
'local_port': self.get_text(rule_elem.find('local-port')),
'enabled': True
}
# Source
source_elem = rule_elem.find('source')
if source_elem is not None:
rule['source'] = {
'address': self.get_text(source_elem.find('address')),
'port': self.get_text(source_elem.find('port'))
}
# Destination
dest_elem = rule_elem.find('destination')
if dest_elem is not None:
rule['destination'] = {
'address': self.get_text(dest_elem.find('address')),
'port': self.get_text(dest_elem.find('port'))
}
nat_rules.append(rule)
return nat_rules
def get_dns_config(self) -> Dict[str, Any]:
"""Extract DNS configuration"""
dns_config = {
'servers': [],
'domain': '',
'search_domains': []
}
if self.root is None:
return dns_config
# DNS servers
system_elem = self.root.find('system')
if system_elem is not None:
for dns_elem in system_elem.findall('dnsserver'):
dns_config['servers'].append(self.get_text(dns_elem))
# Domain
domain_elem = system_elem.find('domain')
if domain_elem is not None and domain_elem.text:
dns_config['domain'] = domain_elem.text
return dns_config
def get_system_info(self) -> Dict[str, Any]:
"""Extract system information"""
system_info = {}
if self.root is None:
return system_info
system_elem = self.root.find('system')
if system_elem is not None:
system_info = {
'hostname': self.get_text(system_elem.find('hostname')),
'domain': self.get_text(system_elem.find('domain')),
'timezone': self.get_text(system_elem.find('timezone')),
'language': self.get_text(system_elem.find('language')),
'version': self.get_text(self.root.find('version'))
}
return system_info
def parse_all(self) -> Dict[str, Any]:
"""Parse all configuration and return comprehensive data"""
if not self.load_xml():
return {}
logger.info(f"Parsing pfSense configuration: {self.hostname}")
config_data = {
'hostname': self.hostname,
'system': self.get_system_info(),
'interfaces': self.get_interfaces(),
'static_routes': self.get_static_routes(),
'gateways': self.get_gateways(),
'dhcp': self.get_dhcp_config(),
'wireguard': self.get_wireguard_config(),
'openvpn': self.get_openvpn_config(),
'firewall_rules': self.get_firewall_rules(),
'nat_rules': self.get_nat_rules(),
'dns': self.get_dns_config()
}
return config_data
def export_json(self, output_file: str):
"""Export parsed data to JSON file"""
data = self.parse_all()
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
logger.info(f"Exported pfSense config to {output_file}")
def main():
"""Command line interface"""
parser = argparse.ArgumentParser(description='Parse pfSense XML configuration files')
parser.add_argument('xml_file', help='pfSense XML configuration file')
parser.add_argument('-o', '--output', help='Output JSON file')
parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Parse the XML file
parser = PfSenseXMLParser(args.xml_file)
data = parser.parse_all()
if not data:
logger.error("Failed to parse XML file")
return 1
# Output
if args.output:
parser.export_json(args.output)
print(f"✅ Parsed pfSense config and saved to {args.output}")
else:
print(json.dumps(data, indent=2, ensure_ascii=False))
return 0
if __name__ == '__main__':
exit(main())

View File

@@ -0,0 +1,385 @@
#!/usr/bin/env python3
"""
Server Information Collector
This module collects detailed information about servers and VMs from hypervisors
via SSH. It connects to specified hypervisors and gathers system information,
VM details, resource usage, and network configurations.
"""
import json
import logging
import subprocess
import sys
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
@dataclass
class VMInfo:
"""Information about a virtual machine."""
vmid: str
name: str
status: str
cpu: float
memory_used: int
memory_total: int
disk_used: int
disk_total: int
uptime: str
ip_addresses: List[str]
vm_type: str = 'vm' # 'vm' or 'container'
@dataclass
class ServerInfo:
"""Information about a physical server/hypervisor."""
hostname: str
os: str
kernel: str
uptime: str
cpu_model: str
cpu_cores: int
memory_total: int
memory_free: int
disk_total: int
disk_free: int
load_average: str
network_interfaces: Dict[str, str]
vms: List[VMInfo]
containers: Optional[List[VMInfo]] = None
@dataclass
class HypervisorConfig:
"""Configuration for a hypervisor connection."""
host: str
port: int
user: str
class ServerInfoCollector:
"""Collects server and VM information from hypervisors via SSH."""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = logging.getLogger(__name__)
self.hypervisors = [
HypervisorConfig(**hv) for hv in config.get('hypervisors', [])
]
self.ssh_user = config.get('ssh_user', 'root')
self.ssh_key_path = config.get('ssh_key_path')
self.timeout = config.get('timeout', 10)
def run_ssh_command(self, host: str, port: int, user: str, command: str) -> Optional[str]:
"""Run a command via SSH on a remote host."""
ssh_cmd = [
'ssh',
'-o', 'ConnectTimeout=5',
'-o', 'StrictHostKeyChecking=no',
'-o', 'UserKnownHostsFile=/dev/null',
'-p', str(port),
f'{user}@{host}',
command
]
try:
result = subprocess.run(
ssh_cmd,
capture_output=True,
text=True,
timeout=self.timeout
)
if result.returncode == 0:
return result.stdout.strip()
else:
self.logger.warning(f"SSH command failed on {host}:{port}: {result.stderr}")
return None
except subprocess.TimeoutExpired:
self.logger.warning(f"SSH command timed out on {host}:{port}")
return None
except Exception as e:
self.logger.error(f"SSH error on {host}:{port}: {e}")
return None
def detect_hypervisor_type(self, host: str, port: int, user: str) -> str:
"""Detect the type of hypervisor (Proxmox, VMware, etc.)."""
# Check for Proxmox
if self.run_ssh_command(host, port, user, 'which pvesh'):
return 'proxmox'
# Check for VMware
if self.run_ssh_command(host, port, user, 'which vmware'):
return 'vmware'
# Check for KVM/libvirt
if self.run_ssh_command(host, port, user, 'which virsh'):
return 'kvm'
return 'unknown'
def collect_proxmox_info(self, host: str, port: int, user: str) -> ServerInfo:
"""Collect information from a Proxmox hypervisor."""
hostname = self.run_ssh_command(host, port, user, 'hostname') or host
# System info
os_info = self.run_ssh_command(host, port, user, 'cat /etc/os-release | grep PRETTY_NAME | cut -d\'"\' -f2') or 'Unknown'
kernel = self.run_ssh_command(host, port, user, 'uname -r') or 'Unknown'
uptime = self.run_ssh_command(host, port, user, 'uptime -p') or 'Unknown'
# CPU info
cpu_model = self.run_ssh_command(host, port, user, 'cat /proc/cpuinfo | grep "model name" | head -1 | cut -d: -f2 | xargs') or 'Unknown'
cpu_cores = int(self.run_ssh_command(host, port, user, 'nproc') or '0')
# Memory info
mem_info = self.run_ssh_command(host, port, user, 'free -b | grep Mem')
if mem_info:
parts = mem_info.split()
memory_total = int(parts[1])
memory_free = int(parts[3])
else:
memory_total = memory_free = 0
# Disk info
disk_info = self.run_ssh_command(host, port, user, 'df -B1 / | tail -1')
if disk_info:
parts = disk_info.split()
disk_total = int(parts[1])
disk_free = int(parts[3])
else:
disk_total = disk_free = 0
# Load average
load_average = self.run_ssh_command(host, port, user, 'uptime | cut -d: -f5') or 'Unknown'
# Network interfaces
network_interfaces = {}
net_info = self.run_ssh_command(host, port, user, 'ip -o addr show | grep -v lo')
if net_info:
for line in net_info.split('\n'):
if line.strip():
parts = line.split()
if len(parts) >= 4:
iface = parts[1]
ip = parts[3].split('/')[0]
network_interfaces[iface] = ip
# VMs
vms = []
vm_list = self.run_ssh_command(host, port, user, 'qm list')
if vm_list:
for line in vm_list.split('\n')[1:]: # Skip header
if line.strip():
parts = line.split()
if len(parts) >= 3:
vmid = parts[0]
name = parts[1]
status = parts[2]
# Get VM resource usage
cpu = memory_used = memory_total = disk_used = disk_total = 0
uptime = 'Unknown'
# Try to get current status
status_output = self.run_ssh_command(host, port, user, f'qm status {vmid} --verbose')
if status_output:
# Parse status output for resource usage using grep-like extraction
if 'mem:' in status_output:
try:
mem_line = [line for line in status_output.split('\n') if line.strip().startswith('mem:')][0]
memory_used = int(mem_line.split(':')[1].strip())
except:
pass
if 'maxmem:' in status_output:
try:
maxmem_line = [line for line in status_output.split('\n') if line.strip().startswith('maxmem:')][0]
memory_total = int(maxmem_line.split(':')[1].strip())
except:
pass
if 'maxdisk:' in status_output:
try:
maxdisk_line = [line for line in status_output.split('\n') if line.strip().startswith('maxdisk:')][0]
disk_total = int(maxdisk_line.split(':')[1].strip())
except:
pass
if 'uptime:' in status_output:
try:
uptime_line = [line for line in status_output.split('\n') if line.strip().startswith('uptime:')][0]
uptime = uptime_line.split(':')[1].strip()
except:
pass
# Get VM config for IP addresses
vm_config = self.run_ssh_command(host, port, user, f'qm config {vmid}')
ip_addresses = []
if vm_config:
for config_line in vm_config.split('\n'):
if 'ip=' in config_line:
ip = config_line.split('=')[1].strip()
ip_addresses.append(ip)
vm_info = VMInfo(
vmid=vmid,
name=name,
status=status,
cpu=cpu,
memory_used=memory_used,
memory_total=memory_total,
disk_used=disk_used,
disk_total=disk_total,
uptime=uptime,
ip_addresses=ip_addresses,
vm_type='vm'
)
vms.append(vm_info)
# Containers
containers = []
ct_list = self.run_ssh_command(host, port, user, 'pct list')
if ct_list:
for line in ct_list.split('\n')[1:]: # Skip header
if line.strip():
parts = line.split()
if len(parts) >= 3:
vmid = parts[0]
status = parts[1]
# Handle variable number of columns (Lock column may be empty)
if len(parts) == 4:
name = parts[3]
else:
name = parts[2]
# Get container resource usage
cpu = memory_used = memory_total = disk_used = disk_total = 0
uptime = 'Unknown'
# Try to get current status
status_output = self.run_ssh_command(host, port, user, f'pct status {vmid}')
if status_output:
# Parse status output for resource usage
lines = status_output.split('\n')
for line in lines:
line = line.strip()
if ':' in line:
key, value = line.split(':', 1)
key = key.strip()
value = value.strip()
if key == 'status':
# Status is already from pct list
pass
# Container resource info might be limited
# Could add more parsing here if available
# Get container config for IP addresses
ct_config = self.run_ssh_command(host, port, user, f'pct config {vmid}')
ip_addresses = []
if ct_config:
for config_line in ct_config.split('\n'):
if 'ip=' in config_line:
ip = config_line.split('=')[1].strip()
ip_addresses.append(ip)
container_info = VMInfo(
vmid=vmid,
name=name,
status=status,
cpu=cpu,
memory_used=memory_used,
memory_total=memory_total,
disk_used=disk_used,
disk_total=disk_total,
uptime=uptime,
ip_addresses=ip_addresses,
vm_type='container'
)
containers.append(container_info)
return ServerInfo(
hostname=hostname,
os=os_info,
kernel=kernel,
uptime=uptime,
cpu_model=cpu_model,
cpu_cores=cpu_cores,
memory_total=memory_total,
memory_free=memory_free,
disk_total=disk_total,
disk_free=disk_free,
load_average=load_average,
network_interfaces=network_interfaces,
vms=vms,
containers=containers
)
def collect_server_info(self, hypervisor: HypervisorConfig) -> Optional[ServerInfo]:
"""Collect information from a single hypervisor."""
self.logger.info(f"Collecting info from {hypervisor.host}:{hypervisor.port}")
hv_type = self.detect_hypervisor_type(hypervisor.host, hypervisor.port, hypervisor.user)
if hv_type == 'proxmox':
return self.collect_proxmox_info(hypervisor.host, hypervisor.port, hypervisor.user)
else:
self.logger.warning(f"Unsupported hypervisor type: {hv_type} on {hypervisor.host}")
return None
def collect_all_server_info(self) -> Dict[str, ServerInfo]:
"""Collect information from all configured hypervisors."""
server_info = {}
with ThreadPoolExecutor(max_workers=len(self.hypervisors)) as executor:
future_to_hv = {
executor.submit(self.collect_server_info, hv): hv
for hv in self.hypervisors
}
for future in as_completed(future_to_hv):
hv = future_to_hv[future]
try:
info = future.result()
if info:
server_info[f"{hv.host}:{hv.port}"] = info
except Exception as e:
self.logger.error(f"Failed to collect info from {hv.host}:{hv.port}: {e}")
return server_info
def save_to_file(self, server_info: Dict[str, ServerInfo], filename: str):
"""Save collected server information to a JSON file."""
data = {host: asdict(info) for host, info in server_info.items()}
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
self.logger.info(f"Server information saved to {filename}")
def main():
"""Main function for standalone execution."""
import argparse
parser = argparse.ArgumentParser(description='Collect server information from hypervisors')
parser.add_argument('-o', '--output', default='server_details.json', help='Output JSON file')
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
# Load config
try:
with open('config.json', 'r') as f:
config = json.load(f)
except FileNotFoundError:
print("config.json not found. Please create it from config.json.example")
sys.exit(1)
collector = ServerInfoCollector(config)
server_info = collector.collect_all_server_info()
if server_info:
collector.save_to_file(server_info, args.output)
print(f"Collected information from {len(server_info)} hypervisors")
else:
print("No server information collected")
if __name__ == '__main__':
main()

View File

@@ -56,11 +56,10 @@ def test_commands():
def test_scripts_exist(): def test_scripts_exist():
"""Check if all scripts exist""" """Check if all scripts exist"""
scripts = [ scripts = [
'network_scanner.py', 'src/network_scanner.py',
'pfsense_scanner.py', 'src/pfsense_scanner.py',
'svg_generator.py', 'src/integrated_scanner.py',
'integrated_scanner.py', 'scripts/quickstart.sh'
'quickstart.sh'
] ]
all_ok = True all_ok = True
@@ -77,10 +76,9 @@ def test_scripts_exist():
def test_script_syntax(): def test_script_syntax():
"""Test Python script syntax""" """Test Python script syntax"""
scripts = [ scripts = [
'network_scanner.py', 'src/network_scanner.py',
'pfsense_scanner.py', 'src/pfsense_scanner.py',
'svg_generator.py', 'src/integrated_scanner.py'
'integrated_scanner.py'
] ]
all_ok = True all_ok = True
@@ -230,7 +228,6 @@ def main():
print("Next steps:") print("Next steps:")
print("1. Edit config.json with your network details") print("1. Edit config.json with your network details")
print("2. Run: ./quickstart.sh") print("2. Run: ./quickstart.sh")
print(" or: ./integrated_scanner.py --generate-svg")
else: else:
print("⚠️ Some tests failed. Please check the errors above.") print("⚠️ Some tests failed. Please check the errors above.")
print() print()

View File

@@ -1,353 +0,0 @@
#!/usr/bin/env python3
"""
SVG Network Diagram Generator
Converts network scan results into an SVG network diagram
"""
import json
import math
from typing import Dict, List, Tuple
from xml.etree.ElementTree import Element, SubElement, tostring
from xml.dom import minidom
class NetworkDiagramGenerator:
"""Generate SVG diagrams from network scan data"""
# Device type styling
DEVICE_STYLES = {
'router': {'color': '#FF6B6B', 'icon': '🔀', 'shape': 'rect'},
'firewall': {'color': '#FF0000', 'icon': '🛡️', 'shape': 'rect'},
'switch': {'color': '#4ECDC4', 'icon': '🔌', 'shape': 'rect'},
'server': {'color': '#45B7D1', 'icon': '🖥️', 'shape': 'rect'},
'linux_server': {'color': '#45B7D1', 'icon': '🐧', 'shape': 'rect'},
'windows_client': {'color': '#95E1D3', 'icon': '💻', 'shape': 'rect'},
'client': {'color': '#A8E6CF', 'icon': '💻', 'shape': 'rect'},
'default': {'color': '#CCCCCC', 'icon': '', 'shape': 'rect'}
}
# Network segment colors
SEGMENT_COLORS = [
'#E3F2FD', '#F3E5F5', '#E8F5E9', '#FFF3E0', '#FCE4EC',
'#E0F2F1', '#F1F8E9', '#FFF9C4', '#FFE0B2', '#F8BBD0'
]
def __init__(self, scan_data: Dict):
self.scan_data = scan_data
self.width = 1600
self.height = 1200
self.margin = 50
self.device_width = 120
self.device_height = 80
self.segment_spacing = 200
def generate_svg(self, output_file: str):
"""Generate SVG diagram"""
svg = Element('svg', {
'width': str(self.width),
'height': str(self.height),
'xmlns': 'http://www.w3.org/2000/svg',
'version': '1.1'
})
# Add styles
self._add_styles(svg)
# Add title
self._add_title(svg)
# Calculate layout
segments = self.scan_data.get('segments', [])
layout = self._calculate_layout(segments)
# Draw network segments
for i, (segment, positions) in enumerate(zip(segments, layout)):
self._draw_segment(svg, segment, positions, i)
# Draw connections
self._draw_connections(svg, segments, layout)
# Add legend
self._add_legend(svg)
# Save to file
self._save_svg(svg, output_file)
def _add_styles(self, svg: Element):
"""Add CSS styles"""
style = SubElement(svg, 'style')
style.text = """
.device-box { stroke: #333; stroke-width: 2; }
.device-label { font-family: Arial, sans-serif; font-size: 12px; fill: #000; }
.device-icon { font-size: 24px; }
.segment-box { stroke: #666; stroke-width: 2; fill-opacity: 0.1; }
.segment-label { font-family: Arial, sans-serif; font-size: 14px; font-weight: bold; fill: #333; }
.connection { stroke: #999; stroke-width: 2; stroke-dasharray: 5,5; }
.title { font-family: Arial, sans-serif; font-size: 24px; font-weight: bold; fill: #333; }
.info-text { font-family: Arial, sans-serif; font-size: 10px; fill: #666; }
.legend-text { font-family: Arial, sans-serif; font-size: 11px; fill: #333; }
"""
def _add_title(self, svg: Element):
"""Add diagram title"""
title = SubElement(svg, 'text', {
'x': str(self.width // 2),
'y': '30',
'text-anchor': 'middle',
'class': 'title'
})
title.text = 'Network Topology Diagram'
def _calculate_layout(self, segments: List[Dict]) -> List[List[Tuple]]:
"""Calculate positions for all devices"""
layout = []
# Arrange segments horizontally
segment_width = (self.width - 2 * self.margin) / len(segments) if segments else self.width
for seg_idx, segment in enumerate(segments):
devices = segment.get('devices', [])
positions = []
# Calculate segment area
seg_x = self.margin + seg_idx * segment_width
seg_y = self.margin + 60
seg_w = segment_width - 20
seg_h = self.height - seg_y - self.margin
# Arrange devices in a grid within segment
cols = math.ceil(math.sqrt(len(devices)))
rows = math.ceil(len(devices) / cols) if cols > 0 else 1
device_spacing_x = seg_w / (cols + 1) if cols > 0 else seg_w / 2
device_spacing_y = seg_h / (rows + 1) if rows > 0 else seg_h / 2
for dev_idx, device in enumerate(devices):
col = dev_idx % cols
row = dev_idx // cols
x = seg_x + (col + 1) * device_spacing_x
y = seg_y + (row + 1) * device_spacing_y
positions.append((x, y, device))
layout.append(positions)
return layout
def _draw_segment(self, svg: Element, segment: Dict, positions: List[Tuple], seg_idx: int):
"""Draw a network segment and its devices"""
if not positions:
return
# Calculate segment bounds
xs = [pos[0] for pos in positions]
ys = [pos[1] for pos in positions]
min_x = min(xs) - self.device_width
max_x = max(xs) + self.device_width
min_y = min(ys) - self.device_height
max_y = max(ys) + self.device_height
# Draw segment background
color = self.SEGMENT_COLORS[seg_idx % len(self.SEGMENT_COLORS)]
SubElement(svg, 'rect', {
'x': str(min_x - 20),
'y': str(min_y - 40),
'width': str(max_x - min_x + 40),
'height': str(max_y - min_y + 60),
'fill': color,
'class': 'segment-box',
'rx': '10'
})
# Draw segment label
label = SubElement(svg, 'text', {
'x': str(min_x),
'y': str(min_y - 20),
'class': 'segment-label'
})
vpn_indicator = ' (VPN)' if segment.get('is_vpn') else ''
label.text = f"{segment.get('name', 'Unknown')} - {segment.get('cidr', '')}{vpn_indicator}"
# Draw devices
for x, y, device in positions:
self._draw_device(svg, device, x, y)
def _draw_device(self, svg: Element, device: Dict, x: float, y: float):
"""Draw a single device"""
device_type = device.get('device_type', 'default')
style = self.DEVICE_STYLES.get(device_type, self.DEVICE_STYLES['default'])
# Draw device box
box_x = x - self.device_width / 2
box_y = y - self.device_height / 2
SubElement(svg, 'rect', {
'x': str(box_x),
'y': str(box_y),
'width': str(self.device_width),
'height': str(self.device_height),
'fill': style['color'],
'class': 'device-box',
'rx': '5'
})
# Add icon
icon_text = SubElement(svg, 'text', {
'x': str(x),
'y': str(y - 15),
'text-anchor': 'middle',
'class': 'device-icon'
})
icon_text.text = style['icon']
# Add IP address
ip_text = SubElement(svg, 'text', {
'x': str(x),
'y': str(y + 5),
'text-anchor': 'middle',
'class': 'device-label',
'font-weight': 'bold'
})
ip_text.text = device.get('ip', 'Unknown')
# Add hostname
hostname = device.get('hostname', '')
if hostname:
hostname_text = SubElement(svg, 'text', {
'x': str(x),
'y': str(y + 20),
'text-anchor': 'middle',
'class': 'info-text'
})
# Truncate long hostnames
if len(hostname) > 20:
hostname = hostname[:17] + '...'
hostname_text.text = hostname
# Add additional info
info_y = y + 32
if device.get('ssh_accessible'):
ssh_text = SubElement(svg, 'text', {
'x': str(x),
'y': str(info_y),
'text-anchor': 'middle',
'class': 'info-text',
'fill': 'green'
})
ssh_text.text = '🔓 SSH'
def _draw_connections(self, svg: Element, segments: List[Dict], layout: List[List[Tuple]]):
"""Draw connections between devices based on routing info"""
# This is simplified - you'd analyze routing tables to determine actual connections
# For now, connect routers between segments
routers = []
for positions in layout:
for x, y, device in positions:
if device.get('device_type') in ['router', 'firewall']:
routers.append((x, y, device))
# Connect consecutive routers
for i in range(len(routers) - 1):
x1, y1, _ = routers[i]
x2, y2, _ = routers[i + 1]
SubElement(svg, 'line', {
'x1': str(x1),
'y1': str(y1),
'x2': str(x2),
'y2': str(y2),
'class': 'connection'
})
def _add_legend(self, svg: Element):
"""Add legend explaining device types"""
legend_x = self.width - 200
legend_y = self.height - 200
# Legend box
SubElement(svg, 'rect', {
'x': str(legend_x - 10),
'y': str(legend_y - 10),
'width': '190',
'height': str(len(self.DEVICE_STYLES) * 25 + 30),
'fill': 'white',
'stroke': '#333',
'stroke-width': '1',
'rx': '5'
})
# Legend title
title = SubElement(svg, 'text', {
'x': str(legend_x),
'y': str(legend_y + 5),
'class': 'legend-text',
'font-weight': 'bold'
})
title.text = 'Device Types'
# Legend items
y_offset = 25
for device_type, style in self.DEVICE_STYLES.items():
if device_type == 'default':
continue
# Color box
SubElement(svg, 'rect', {
'x': str(legend_x),
'y': str(legend_y + y_offset - 8),
'width': '15',
'height': '15',
'fill': style['color'],
'stroke': '#333'
})
# Label
label = SubElement(svg, 'text', {
'x': str(legend_x + 20),
'y': str(legend_y + y_offset + 4),
'class': 'legend-text'
})
label.text = f"{style['icon']} {device_type.replace('_', ' ').title()}"
y_offset += 25
def _save_svg(self, svg: Element, output_file: str):
"""Save SVG to file with pretty formatting"""
rough_string = tostring(svg, encoding='unicode')
reparsed = minidom.parseString(rough_string)
pretty_svg = reparsed.toprettyxml(indent=' ')
# Remove extra blank lines
pretty_svg = '\n'.join([line for line in pretty_svg.split('\n') if line.strip()])
with open(output_file, 'w', encoding='utf-8') as f:
f.write(pretty_svg)
def main():
"""Command line interface"""
import argparse
parser = argparse.ArgumentParser(description='Generate SVG network diagram from scan data')
parser.add_argument('input', help='Input JSON file from network scan')
parser.add_argument('-o', '--output', default='network_diagram.svg',
help='Output SVG file (default: network_diagram.svg)')
args = parser.parse_args()
# Load scan data
with open(args.input, 'r') as f:
scan_data = json.load(f)
# Generate diagram
generator = NetworkDiagramGenerator(scan_data)
generator.generate_svg(args.output)
print(f"✓ Network diagram generated: {args.output}")
if __name__ == '__main__':
main()

12
test_config.json Normal file
View File

@@ -0,0 +1,12 @@
{
"ssh_user": "root",
"ssh_key_path": null,
"timeout": 2,
"additional_networks": [],
"special_devices": {},
"scan_options": {
"max_workers": 10,
"ping_timeout": 2,
"port_scan_timeout": 1
}
}

12
tests/test_config.json Normal file
View File

@@ -0,0 +1,12 @@
{
"ssh_user": "root",
"ssh_key_path": null,
"timeout": 2,
"additional_networks": [],
"special_devices": {},
"scan_options": {
"max_workers": 10,
"ping_timeout": 2,
"port_scan_timeout": 1
}
}

1429
tests/test_output.json Normal file

File diff suppressed because it is too large Load Diff