Remove SVG diagram functionality

- Delete svg_generator.py and comprehensive_mapper.py
- Remove --generate-svg option from integrated_scanner.py
- Update complete_workflow.sh to remove SVG generation step
- Clean up documentation and examples
- Update test_system.py to remove SVG references
- Add missing files to repository (EXAMPLES.sh, quickstart.sh, etc.)
This commit is contained in:
mindesbunister
2025-10-10 17:08:31 +02:00
parent da5f1f2d0c
commit 0827b9a69d
14 changed files with 2416 additions and 750 deletions

6
.gitignore vendored
View File

@@ -1,9 +1,11 @@
# Network Scanner - Git Ignore
# Scan results and output
*.json
# Ignore archived results in results/ folder
results/
# Keep config.json.example
!config.json.example
*.svg
# Python
__pycache__/

260
EXAMPLES.sh Normal file
View File

@@ -0,0 +1,260 @@
#!/bin/bash
# Example usage scenarios for the network scanner
echo "=========================================="
echo "Network Scanner - Usage Examples"
echo "=========================================="
echo ""
cat << 'EOF'
# SCENARIO 1: Quick Network Overview
# -----------------------------------
# Scan your local network and get a basic overview
./network_scanner.py -v -o quick_scan.json
# SCENARIO 2: Complete Network Documentation
# -------------------------------------------
# Full scan with pfSense integration
./integrated_scanner.py -c config.json -o full_network.json -v
# SCENARIO 3: pfSense Deep Dive
# ------------------------------
# Detailed scan of a specific pfSense firewall
./pfsense_scanner.py 192.168.1.1 -u root -k ~/.ssh/id_rsa -o pfsense_main.json
# View the results:
cat pfsense_main.json | jq '.vpn' # Show VPN info
cat pfsense_main.json | jq '.routes' # Show routing table
# SCENARIO 4: Multi-Network Scan with VPN
# ----------------------------------------
# Create a config for multiple networks
cat > my_network_config.json << 'CONFIG'
{
"ssh_user": "root",
"ssh_key_path": "/home/user/.ssh/id_rsa",
"timeout": 3,
"additional_networks": [
"192.168.1.0/24", # Main network
"192.168.2.0/24", # Guest network
"10.8.0.0/24", # OpenVPN network
"10.0.0.0/24" # WireGuard VPN
],
"special_devices": {
"192.168.1.1": {
"name": "Main pfSense Firewall",
"type": "firewall",
"os": "pfSense"
},
"192.168.2.1": {
"name": "Guest Network Router",
"type": "router"
}
}
}
CONFIG
./integrated_scanner.py -c my_network_config.json -o multi_network.json
# SCENARIO 5: Scheduled Network Monitoring
# -----------------------------------------
# Add to crontab for daily network documentation
# Create wrapper script
cat > /usr/local/bin/network-scan-daily.sh << 'SCRIPT'
#!/bin/bash
DATE=$(date +%Y%m%d)
OUTPUT_DIR="/var/log/network-scans"
mkdir -p "$OUTPUT_DIR"
cd /path/to/network_scanner
./integrated_scanner.py \
-o "$OUTPUT_DIR/scan_$DATE.json"
# Keep only last 30 days
find "$OUTPUT_DIR" -name "scan_*.json" -mtime +30 -delete
SCRIPT
chmod +x /usr/local/bin/network-scan-daily.sh
# Add to crontab (run at 2 AM daily):
# 0 2 * * * /usr/local/bin/network-scan-daily.sh
# SCENARIO 6: Compare Network Changes
# ------------------------------------
# Scan and compare with previous results
# Initial scan
./integrated_scanner.py -o baseline.json
# After changes
./integrated_scanner.py -o current.json
# Compare device counts
echo "Baseline devices:"
cat baseline.json | jq '[.segments[].devices[].ip] | length'
echo "Current devices:"
cat current.json | jq '[.segments[].devices[].ip] | length'
# Find new devices
comm -13 \
<(cat baseline.json | jq -r '.segments[].devices[].ip' | sort) \
<(cat current.json | jq -r '.segments[].devices[].ip' | sort) \
| sed 's/^/NEW: /'
# Find removed devices
comm -23 \
<(cat baseline.json | jq -r '.segments[].devices[].ip' | sort) \
<(cat current.json | jq -r '.segments[].devices[].ip' | sort) \
| sed 's/^/REMOVED: /'
# SCENARIO 7: Extract Specific Information
# -----------------------------------------
# Use jq to extract specific data from scan results
# List all SSH-accessible devices
cat network_scan.json | jq -r '.segments[].devices[] | select(.ssh_accessible==true) | .ip'
# List all routers/firewalls
cat network_scan.json | jq -r '.segments[].devices[] | select(.device_type=="router" or .device_type=="firewall") | "\(.ip) - \(.hostname // "unknown")"'
# List all devices with their OS
cat network_scan.json | jq -r '.segments[].devices[] | "\(.ip)\t\(.os_type // "unknown")\t\(.hostname // "unknown")"'
# Export to CSV
echo "IP,Hostname,Type,OS" > devices.csv
cat network_scan.json | jq -r '.segments[].devices[] | "\(.ip),\(.hostname // ""),\(.device_type // ""),\(.os_type // "")"' >> devices.csv
# SCENARIO 8: Integration with Documentation
# -------------------------------------------
# Generate markdown documentation from scan
cat > generate_docs.py << 'PYTHON'
#!/usr/bin/env python3
import json
import sys
with open(sys.argv[1]) as f:
data = json.load(f)
print("# Network Documentation")
print(f"\nGenerated: {data.get('scan_timestamp', 'N/A')}")
print("\n## Network Segments\n")
for segment in data['segments']:
print(f"### {segment['name']}")
print(f"- CIDR: `{segment['cidr']}`")
print(f"- Devices: {len(segment['devices'])}")
if segment.get('is_vpn'):
print("- Type: VPN Network")
print("\n#### Devices\n")
print("| IP | Hostname | Type | OS |")
print("|---|---|---|---|")
for device in segment['devices']:
ip = device['ip']
hostname = device.get('hostname', '-')
dtype = device.get('device_type', '-')
os = device.get('os_type', '-')
print(f"| {ip} | {hostname} | {dtype} | {os} |")
print()
PYTHON
chmod +x generate_docs.py
./generate_docs.py network_scan.json > NETWORK_DOCS.md
# SCENARIO 9: Security Audit
# ---------------------------
# Check for common security issues
# Find devices with Telnet open
cat network_scan.json | jq -r '.segments[].devices[] | select(.open_ports[]? == 23) | "⚠️ Telnet open on \(.ip) (\(.hostname // "unknown"))"'
# Find devices without SSH access
cat network_scan.json | jq -r '.segments[].devices[] | select(.device_type=="router" or .device_type=="firewall") | select(.ssh_accessible==false) | "⚠️ No SSH access to \(.ip) (\(.hostname // "unknown"))"'
# List devices with many open ports
cat network_scan.json | jq -r '.segments[].devices[] | select((.open_ports | length) > 5) | " \(.ip) has \(.open_ports | length) open ports"'
# SCENARIO 10: WireGuard Topology Mapping
# ----------------------------------------
# Extract WireGuard tunnel information from pfSense
./pfsense_scanner.py 192.168.1.1 -o pfsense.json
# List all WireGuard peers
cat pfsense.json | jq -r '.vpn.wireguard[] | "Peer: \(.peer // "N/A") -> \(.allowed_ips // "N/A")"'
# Check tunnel status
cat pfsense.json | jq -r '.vpn.wireguard[] | select(.latest_handshake) | "Active tunnel to \(.endpoint) (handshake: \(.latest_handshake)s ago)"'
# SCENARIO 11: Network Capacity Planning
# ---------------------------------------
# Analyze network usage and plan capacity
# Count devices per segment
cat network_scan.json | jq -r '.segments[] | "\(.cidr): \(.devices | length) devices"'
# Calculate subnet utilization
cat network_scan.json | jq -r '.segments[] |
if .cidr | contains("/24") then
"\(.cidr): \(.devices | length)/254 = \((.devices | length) * 100 / 254 | floor)% utilized"
else
"\(.cidr): \(.devices | length) devices"
end'
# SCENARIO 12: Quick Health Check
# --------------------------------
# Create a health check script
cat > health_check.sh << 'HEALTH'
#!/bin/bash
SCAN_FILE="latest_scan.json"
echo "Network Health Check"
echo "===================="
echo ""
# Total devices
TOTAL=$(cat $SCAN_FILE | jq '[.segments[].devices[]] | length')
echo "Total devices: $TOTAL"
# SSH accessible
SSH_OK=$(cat $SCAN_FILE | jq '[.segments[].devices[] | select(.ssh_accessible==true)] | length')
echo "SSH accessible: $SSH_OK"
# By type
echo ""
echo "Device Types:"
cat $SCAN_FILE | jq -r '.segments[].devices[].device_type' | sort | uniq -c | sort -rn
# Segments
echo ""
echo "Network Segments:"
cat $SCAN_FILE | jq -r '.segments[] | " \(.name): \(.devices | length) devices"'
HEALTH
chmod +x health_check.sh
./integrated_scanner.py -o latest_scan.json
./health_check.sh
EOF
echo ""
echo "For more examples, see README.md"

View File

@@ -32,6 +32,14 @@ log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Move old results to results folder
log_info "Moving old results to results folder..."
mkdir -p results
mv network_scan_*.json server_details_*.json network_summary_*.md *_failed_ssh.json results/ 2>/dev/null || true
if [ $? -eq 0 ] && [ "$(ls results/ 2>/dev/null | wc -l)" -gt 0 ]; then
log_info "Moved old result files to results/ folder"
fi
# Check if we're in the right directory
if [ ! -f "src/integrated_scanner.py" ]; then
log_error "src/integrated_scanner.py not found. Please run this script from the network scanner directory."
@@ -60,24 +68,30 @@ log_info "Step 2: Running integrated network scan..."
SCAN_OUTPUT="network_scan_$(date +%Y%m%d_%H%M%S).json"
if python3 src/integrated_scanner.py -o "$SCAN_OUTPUT" -v; then
log_success "Network scan completed: $SCAN_OUTPUT"
# Check for failed SSH hosts file
FAILED_SSH_OUTPUT="${SCAN_OUTPUT%.json}_failed_ssh.json"
if [ -f "$FAILED_SSH_OUTPUT" ]; then
FAILED_COUNT=$(jq '.total_failed' "$FAILED_SSH_OUTPUT" 2>/dev/null || echo "unknown")
log_warning "Found $FAILED_COUNT hosts with SSH port open but failed authentication: $FAILED_SSH_OUTPUT"
fi
else
log_error "Network scan failed"
exit 1
fi
# Step 3: Generate SVG diagram
log_info "Step 3: Generating network diagram..."
SVG_OUTPUT="${SCAN_OUTPUT%.json}.svg"
if python3 src/svg_generator.py "$SCAN_OUTPUT" -o "$SVG_OUTPUT"; then
log_success "SVG diagram generated: $SVG_OUTPUT"
# Step 3: Collect server information from hypervisors
log_info "Step 3: Collecting server information from hypervisors..."
SERVER_OUTPUT="server_details_$(date +%Y%m%d_%H%M%S).json"
if python3 src/server_info_collector.py -o "$SERVER_OUTPUT"; then
log_success "Server information collected: $SERVER_OUTPUT"
else
log_error "SVG generation failed"
exit 1
log_warning "Server information collection failed"
fi
# Step 4: Generate pfSense summary if XML files exist
# Step 5: Generate pfSense summary if XML files exist
if [ "$XML_FILES" -gt 0 ]; then
log_info "Step 4: Generating pfSense network summary..."
log_info "Step 5: Generating pfSense network summary..."
SUMMARY_OUTPUT="network_summary_$(date +%Y%m%d_%H%M%S).md"
if python3 src/pfsense_integrator.py *.xml --summary "$SUMMARY_OUTPUT"; then
log_success "Network summary generated: $SUMMARY_OUTPUT"
@@ -86,7 +100,7 @@ if [ "$XML_FILES" -gt 0 ]; then
fi
fi
# Step 5: Show results summary
# Step 6: Show results summary
echo ""
echo "=========================================="
log_success "Network Discovery Complete!"
@@ -94,7 +108,12 @@ echo "=========================================="
echo ""
echo "Generated files:"
echo " 📊 Network Scan: $SCAN_OUTPUT"
echo " 🎨 Network Diagram: $SVG_OUTPUT"
if [ -f "$SERVER_OUTPUT" ]; then
echo " 🖥️ Server Details: $SERVER_OUTPUT"
fi
if [ -f "$FAILED_SSH_OUTPUT" ]; then
echo " 🔐 Failed SSH Hosts: $FAILED_SSH_OUTPUT"
fi
if [ "$XML_FILES" -gt 0 ]; then
echo " 📋 Network Summary: $SUMMARY_OUTPUT"
fi
@@ -114,11 +133,21 @@ if command -v jq >/dev/null 2>&1; then
fi
echo "Next steps:"
echo " 1. Open $SVG_OUTPUT in your web browser to view the network diagram"
if [ "$XML_FILES" -gt 0 ]; then
echo " 2. Review $SUMMARY_OUTPUT for detailed pfSense configuration"
if [ -f "$SERVER_OUTPUT" ]; then
echo " 1. Review $SERVER_OUTPUT for detailed server and VM information"
STEP_NUM=2
else
STEP_NUM=1
fi
echo " 3. Examine $SCAN_OUTPUT for complete network data (use jq for querying)"
if [ -f "$FAILED_SSH_OUTPUT" ]; then
echo " $STEP_NUM. Review $FAILED_SSH_OUTPUT for hosts needing SSH credential fixes"
STEP_NUM=$((STEP_NUM + 1))
fi
if [ "$XML_FILES" -gt 0 ]; then
echo " $STEP_NUM. Review $SUMMARY_OUTPUT for detailed pfSense configuration"
STEP_NUM=$((STEP_NUM + 1))
fi
echo " $STEP_NUM. Examine $SCAN_OUTPUT for complete network data (use jq for querying)"
echo ""
log_success "Workflow completed successfully! 🎉"

View File

@@ -17,5 +17,18 @@
"max_workers": 10,
"ping_timeout": 2,
"port_scan_timeout": 1
}
},
"hypervisors": [
{
"host": "srvhost04.egonetix.de",
"port": 2222,
"user": "root"
},
{
"host": "srv-wmw-host01",
"port": 22,
"user": "root"
}
],
"probe_ssh_on_discovered": true
}

165
quickstart.sh Normal file
View File

@@ -0,0 +1,165 @@
#!/bin/bash
# Quick Start Script for Network Scanner
# This script helps you get started quickly
set -e
echo "================================"
echo "Network Scanner - Quick Start"
echo "================================"
echo ""
# Check for Python
if ! command -v python3 &> /dev/null; then
echo "❌ Error: Python 3 is not installed"
exit 1
fi
echo "✓ Python 3 found"
# Create config if it doesn't exist
if [ ! -f config.json ]; then
echo ""
echo "📝 Creating configuration file..."
# Try to detect default SSH key
SSH_KEY=""
if [ -f ~/.ssh/id_rsa ]; then
SSH_KEY="$HOME/.ssh/id_rsa"
elif [ -f ~/.ssh/id_ed25519 ]; then
SSH_KEY="$HOME/.ssh/id_ed25519"
fi
# Get current user
CURRENT_USER=$(whoami)
# Try to detect local network
LOCAL_NET=$(ip route | grep -oP 'src \K[\d.]+' | head -1)
if [ -n "$LOCAL_NET" ]; then
# Convert to /24 network
NET_PREFIX=$(echo $LOCAL_NET | cut -d. -f1-3)
LOCAL_NET="${NET_PREFIX}.0/24"
else
LOCAL_NET="192.168.1.0/24"
fi
cat > config.json << EOF
{
"ssh_user": "$CURRENT_USER",
"ssh_key_path": "$SSH_KEY",
"timeout": 2,
"additional_networks": [
"$LOCAL_NET"
],
"special_devices": {
},
"scan_options": {
"max_workers": 10,
"ping_timeout": 2,
"port_scan_timeout": 1
}
}
EOF
echo "✓ Created config.json"
echo " Local network detected: $LOCAL_NET"
[ -n "$SSH_KEY" ] && echo " SSH key detected: $SSH_KEY"
echo ""
echo " Please edit config.json to customize for your network!"
echo ""
else
echo "✓ config.json already exists"
fi
# Ask what to do
echo ""
echo "What would you like to do?"
echo ""
echo "1) Run a quick scan (current network only)"
echo "2) Run a full scan with pfSense integration"
echo "3) Scan specific pfSense device"
echo "4) Show help"
echo "5) Exit"
echo ""
read -p "Choose an option (1-6): " choice
case $choice in
1)
echo ""
echo "🔍 Running quick network scan..."
./network_scanner.py -o quick_scan.json -v
echo ""
echo "✓ Done! Results saved to: quick_scan.json"
;;
2)
echo ""
echo "🔍 Running full integrated scan..."
./integrated_scanner.py -o full_scan.json -v
echo ""
echo "✓ Done! Results saved to: full_scan.json"
;;
3)
echo ""
read -p "Enter pfSense IP address: " pfsense_ip
echo "🔍 Scanning pfSense at $pfsense_ip..."
./pfsense_scanner.py "$pfsense_ip" -o "pfsense_${pfsense_ip}.json"
echo ""
echo "✓ Done! Results saved to: pfsense_${pfsense_ip}.json"
;;
4)
echo ""
cat << 'HELP'
Network Scanner - Help
======================
Available Scripts:
-----------------
1. network_scanner.py
Basic network scanner that discovers devices and gathers info
Usage: ./network_scanner.py [-c config.json] [-o output.json] [-v]
2. pfsense_scanner.py
Specialized scanner for pfSense firewalls
Usage: ./pfsense_scanner.py <ip> [-u user] [-k keyfile] [-o output.json]
3. integrated_scanner.py
Complete scanner with pfSense integration
Usage: ./integrated_scanner.py [-c config.json] [-o output.json] [-v]
Configuration:
-------------
Edit config.json to customize:
- SSH credentials
- Network ranges to scan
- Special device definitions
- Scan timeouts
Examples:
--------
# Quick scan of current network
./network_scanner.py -v
# Full scan with diagram
./integrated_scanner.py --generate-svg
# Scan pfSense
./pfsense_scanner.py 192.168.1.1 -u root -k ~/.ssh/id_rsa
For more information, see README.md
HELP
;;
5)
echo "Goodbye!"
exit 0
;;
*)
echo "Invalid option"
exit 1
;;
esac
echo ""
echo "================================"
echo "Thanks for using Network Scanner!"
echo "================================"

View File

@@ -1,355 +0,0 @@
#!/usr/bin/env python3
"""
Comprehensive Network Diagram Generator
Combines network scanning with pfSense XML parsing for complete network topology
"""
import json
import argparse
import logging
from pathlib import Path
from typing import Dict, List, Any
from network_scanner import NetworkScanner
from pfsense_xml_parser import PfSenseXMLParser
from svg_generator import NetworkDiagramGenerator
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class ComprehensiveNetworkMapper:
"""Complete network mapping solution combining multiple data sources"""
def __init__(self, config: Dict):
self.config = config
self.network_data = {}
self.pfsense_data = {}
self.combined_data = {
'scan_timestamp': None,
'data_sources': [],
'segments': [],
'pfsense_firewalls': [],
'wireguard_networks': [],
'openvpn_networks': [],
'routing_table': [],
'dhcp_leases': [],
'static_mappings': []
}
def load_network_scan(self, scan_file: str):
"""Load network scan data"""
logger.info(f"Loading network scan data from {scan_file}")
try:
with open(scan_file, 'r') as f:
self.network_data = json.load(f)
self.combined_data['data_sources'].append('network_scan')
logger.info("Network scan data loaded successfully")
except Exception as e:
logger.error(f"Error loading network scan: {e}")
def load_pfsense_configs(self, xml_files: List[str]):
"""Load and parse pfSense XML configuration files"""
for xml_file in xml_files:
if Path(xml_file).exists():
logger.info(f"Parsing pfSense config: {xml_file}")
parser = PfSenseXMLParser(xml_file)
data = parser.parse_all()
if data:
hostname = data.get('hostname', f'pfsense_{len(self.pfsense_data)}')
self.pfsense_data[hostname] = data
self.combined_data['data_sources'].append(f'pfsense_{hostname}')
self.combined_data['pfsense_firewalls'].append(data)
logger.info(f"pfSense config {hostname} loaded successfully")
else:
logger.error(f"Failed to parse pfSense config: {xml_file}")
else:
logger.warning(f"pfSense config file not found: {xml_file}")
def merge_network_data(self):
"""Merge all data sources into comprehensive network map"""
logger.info("Merging network data sources...")
# Start with network scan data if available
if self.network_data:
self.combined_data.update({
'scan_timestamp': self.network_data.get('scan_timestamp'),
'segments': self.network_data.get('segments', [])
})
# Process pfSense data
self._process_pfsense_data()
# Extract additional network information
self._extract_network_topology()
logger.info("Network data merging complete")
def _process_pfsense_data(self):
"""Process and integrate pfSense configuration data"""
for hostname, pfsense in self.pfsense_data.items():
logger.info(f"Processing pfSense data for {hostname}")
# Add interfaces as network segments
interfaces = pfsense.get('interfaces', {})
for iface_name, iface_data in interfaces.items():
if iface_data.get('ipaddr') and iface_data.get('ipaddr') != 'dhcp':
try:
# Create network segment from interface
ip = iface_data['ipaddr']
subnet = iface_data.get('subnet', '24')
network_cidr = f"{ip.rsplit('.', 1)[0]}.0/{subnet}"
segment = {
'name': f"{hostname}_{iface_name}",
'cidr': network_cidr,
'gateway': iface_data.get('gateway', ip),
'interface': iface_data.get('interface'),
'pfsense_host': hostname,
'is_vpn': iface_data.get('type') in ['wireguard', 'openvpn'],
'devices': []
}
# Add gateway device
gateway_device = {
'ip': ip,
'hostname': hostname,
'device_type': 'firewall',
'os_type': 'pfSense',
'interface': iface_name,
'pfsense_config': True
}
segment['devices'].append(gateway_device)
self.combined_data['segments'].append(segment)
# Track VPN networks
if iface_data.get('type') == 'wireguard':
self.combined_data['wireguard_networks'].append({
'network': network_cidr,
'interface': iface_data.get('interface'),
'pfsense': hostname,
'peers': pfsense.get('wireguard', {}).get('peers', [])
})
elif iface_data.get('type') == 'openvpn':
self.combined_data['openvpn_networks'].append({
'network': network_cidr,
'interface': iface_data.get('interface'),
'pfsense': hostname
})
except Exception as e:
logger.debug(f"Error processing interface {iface_name}: {e}")
# Add static routes
static_routes = pfsense.get('static_routes', [])
for route in static_routes:
route_info = {
'network': route.get('network'),
'gateway': route.get('gateway'),
'description': route.get('description'),
'pfsense': hostname
}
self.combined_data['routing_table'].append(route_info)
# Add DHCP information
dhcp_config = pfsense.get('dhcp', {})
for iface_name, dhcp_data in dhcp_config.items():
if dhcp_data.get('static_mappings'):
for mapping in dhcp_data['static_mappings']:
mapping_info = {
'ip': mapping.get('ipaddr'),
'mac': mapping.get('mac'),
'hostname': mapping.get('hostname'),
'description': mapping.get('description'),
'interface': iface_name,
'pfsense': hostname
}
self.combined_data['static_mappings'].append(mapping_info)
def _extract_network_topology(self):
"""Extract network topology information from all sources"""
logger.info("Extracting network topology...")
# Process WireGuard peer information
for wg_net in self.combined_data['wireguard_networks']:
for peer in wg_net.get('peers', []):
for allowed_ip in peer.get('allowed_ips', []):
# Create segment for remote networks
if allowed_ip.get('mask') and allowed_ip['mask'] != '32':
remote_segment = {
'name': f"WG_Remote_{allowed_ip['address']}_{allowed_ip['mask']}",
'cidr': f"{allowed_ip['address']}/{allowed_ip['mask']}",
'gateway': wg_net.get('network', '').split('/')[0].rsplit('.', 1)[0] + '.1',
'is_vpn': True,
'vpn_type': 'wireguard',
'pfsense_host': wg_net.get('pfsense'),
'devices': []
}
self.combined_data['segments'].append(remote_segment)
def generate_svg_diagram(self, output_file: str):
"""Generate SVG network diagram"""
logger.info(f"Generating SVG diagram: {output_file}")
# Use the existing SVG generator
generator = NetworkDiagramGenerator(self.combined_data)
generator.generate_svg(output_file)
logger.info(f"SVG diagram generated: {output_file}")
def export_comprehensive_json(self, output_file: str):
"""Export comprehensive network data"""
logger.info(f"Exporting comprehensive data to {output_file}")
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(self.combined_data, f, indent=2, ensure_ascii=False)
logger.info(f"Comprehensive network data exported to {output_file}")
def print_summary(self):
"""Print comprehensive network summary"""
print("\n" + "="*80)
print("COMPREHENSIVE NETWORK MAPPING SUMMARY")
print("="*80)
print(f"\nData Sources: {', '.join(self.combined_data['data_sources'])}")
print(f"Network Segments: {len(self.combined_data['segments'])}")
print(f"pfSense Firewalls: {len(self.combined_data['pfsense_firewalls'])}")
print(f"WireGuard Networks: {len(self.combined_data['wireguard_networks'])}")
print(f"OpenVPN Networks: {len(self.combined_data['openvpn_networks'])}")
print(f"Static Routes: {len(self.combined_data['routing_table'])}")
print(f"DHCP Static Mappings: {len(self.combined_data['static_mappings'])}")
# Network segments summary
print(f"\n{'='*80}")
print("NETWORK SEGMENTS")
print(f"{'='*80}")
for segment in self.combined_data['segments']:
vpn_indicator = " (VPN)" if segment.get('is_vpn') else ""
pfsense_indicator = f" [{segment.get('pfsense_host')}]" if segment.get('pfsense_host') else ""
print(f"\n📡 {segment['name']}{vpn_indicator}{pfsense_indicator}")
print(f" CIDR: {segment['cidr']}")
print(f" Gateway: {segment.get('gateway', 'N/A')}")
print(f" Devices: {len(segment.get('devices', []))}")
# Show devices
for device in segment.get('devices', [])[:5]: # Show first 5
print(f"{device.get('ip', 'N/A')} - {device.get('hostname', 'N/A')} ({device.get('device_type', 'N/A')})")
if len(segment.get('devices', [])) > 5:
print(f" ... and {len(segment.get('devices', [])) - 5} more devices")
# pfSense summary
if self.combined_data['pfsense_firewalls']:
print(f"\n{'='*80}")
print("PFSENSE FIREWALLS")
print(f"{'='*80}")
for pfsense in self.combined_data['pfsense_firewalls']:
hostname = pfsense.get('hostname', 'Unknown')
print(f"\n🛡️ {hostname}")
print(f" Interfaces: {len(pfsense.get('interfaces', {}))}")
print(f" Static Routes: {len(pfsense.get('static_routes', []))}")
wg_config = pfsense.get('wireguard', {})
if wg_config.get('enabled'):
print(f" WireGuard: {len(wg_config.get('tunnels', []))} tunnels, {len(wg_config.get('peers', []))} peers")
dhcp_config = pfsense.get('dhcp', {})
total_mappings = sum(len(dhcp.get('static_mappings', [])) for dhcp in dhcp_config.values())
print(f" DHCP Static Mappings: {total_mappings}")
# VPN Networks
if self.combined_data['wireguard_networks'] or self.combined_data['openvpn_networks']:
print(f"\n{'='*80}")
print("VPN NETWORKS")
print(f"{'='*80}")
for wg_net in self.combined_data['wireguard_networks']:
print(f"\n🔐 WireGuard: {wg_net['network']} via {wg_net['pfsense']}")
print(f" Interface: {wg_net['interface']}")
print(f" Peers: {len(wg_net.get('peers', []))}")
for ovpn_net in self.combined_data['openvpn_networks']:
print(f"\n🔐 OpenVPN: {ovpn_net['network']} via {ovpn_net['pfsense']}")
print(f" Interface: {ovpn_net['interface']}")
print(f"\n{'='*80}")
print("NETWORK TOPOLOGY ANALYSIS COMPLETE")
print(f"{'='*80}")
def main():
"""Command line interface"""
parser = argparse.ArgumentParser(
description='Comprehensive Network Diagram Generator with pfSense Integration'
)
parser.add_argument('-c', '--config', default='config.json',
help='Network scanner configuration file')
parser.add_argument('-s', '--scan-data', help='Existing network scan JSON file')
parser.add_argument('-p', '--pfsense-xml', nargs='+',
help='pfSense XML configuration files')
parser.add_argument('-o', '--output', default='comprehensive_network.json',
help='Output comprehensive JSON file')
parser.add_argument('--svg', help='Generate SVG diagram')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose output')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Load configuration
config = {}
if Path(args.config).exists():
try:
with open(args.config, 'r') as f:
config = json.load(f)
except Exception as e:
logger.warning(f"Could not load config {args.config}: {e}")
# Initialize comprehensive mapper
mapper = ComprehensiveNetworkMapper(config)
# Load network scan data if provided
if args.scan_data and Path(args.scan_data).exists():
mapper.load_network_scan(args.scan_data)
else:
logger.info("No network scan data provided - will use pfSense data only")
# Load pfSense configurations
pfsense_files = args.pfsense_xml or []
# Auto-detect pfSense XML files if none specified
if not pfsense_files:
xml_files = list(Path('.').glob('config-*.xml'))
if xml_files:
pfsense_files = [str(f) for f in xml_files]
logger.info(f"Auto-detected pfSense XML files: {pfsense_files}")
if pfsense_files:
mapper.load_pfsense_configs(pfsense_files)
else:
logger.warning("No pfSense XML files found")
# Merge all data
mapper.merge_network_data()
# Print summary
mapper.print_summary()
# Export comprehensive data
mapper.export_comprehensive_json(args.output)
print(f"\n✅ Comprehensive network data saved to: {args.output}")
# Generate SVG if requested
if args.svg:
svg_file = args.svg
mapper.generate_svg_diagram(svg_file)
print(f"✅ SVG network diagram generated: {svg_file}")
if __name__ == '__main__':
main()

View File

@@ -447,8 +447,6 @@ def main():
help='Output JSON file (default: network_scan.json)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose output')
parser.add_argument('--generate-svg', action='store_true',
help='Automatically generate SVG diagram after scan')
args = parser.parse_args()
@@ -474,23 +472,15 @@ def main():
# Export results
scanner.export_json(args.output)
# Save failed SSH hosts if any
if scanner.base_scanner.failed_ssh_hosts:
failed_ssh_file = args.output.replace('.json', '_failed_ssh.json')
scanner.base_scanner.save_failed_ssh_hosts(failed_ssh_file)
print(f"\n⚠️ {len(scanner.base_scanner.failed_ssh_hosts)} hosts have SSH port open but failed authentication.")
print(f" See {failed_ssh_file} for details.")
print(f"\n✓ Scan complete! Results saved to {args.output}")
# Generate SVG if requested
if args.generate_svg:
try:
from svg_generator import NetworkDiagramGenerator
with open(args.output, 'r') as f:
scan_data = json.load(f)
svg_file = args.output.replace('.json', '.svg')
generator = NetworkDiagramGenerator(scan_data)
generator.generate_svg(svg_file)
print(f"✓ SVG diagram generated: {svg_file}")
except Exception as e:
logger.error(f"Error generating SVG: {e}")
if __name__ == '__main__':

View File

@@ -16,6 +16,7 @@ from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict, field
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
@@ -40,6 +41,7 @@ class Device:
services: List[str] = field(default_factory=list)
routes: List[Dict] = field(default_factory=list)
interfaces: List[Dict] = field(default_factory=list)
ssh_info: Optional[Dict] = None
@dataclass
@@ -62,6 +64,7 @@ class NetworkScanner:
self.ssh_user = config.get('ssh_user', 'root')
self.ssh_key = config.get('ssh_key_path')
self.timeout = config.get('timeout', 2)
self.failed_ssh_hosts: List[Dict] = []
def discover_networks(self) -> List[str]:
"""Discover all network segments from local routing table"""
@@ -198,6 +201,13 @@ class NetworkScanner:
if device.ssh_accessible:
# Gather detailed info via SSH
self._gather_ssh_info(device)
else:
# Track hosts with SSH port open but failed SSH connection
self.failed_ssh_hosts.append({
'ip': ip,
'hostname': device.hostname,
'reason': 'SSH port open but authentication failed or connection refused'
})
# Identify device type based on ports and services
device.device_type = self._identify_device_type(device)
@@ -290,17 +300,41 @@ class NetworkScanner:
"""Gather detailed information via SSH"""
logger.info(f"Gathering SSH info from {device.ip}")
ssh_info = {}
# Get OS info
device.os_type, device.os_version = self._get_os_info(device.ip)
os_type, os_version = self._get_os_info(device.ip)
if os_type:
ssh_info['os_type'] = os_type
device.os_type = os_type
if os_version:
ssh_info['os_version'] = os_version
device.os_version = os_version
# Get network interfaces
device.interfaces = self._get_interfaces(device.ip)
interfaces = self._get_interfaces(device.ip)
if interfaces:
ssh_info['interfaces'] = interfaces
device.interfaces = interfaces
# Get routing table
device.routes = self._get_routes(device.ip)
routes = self._get_routes(device.ip)
if routes:
ssh_info['routes'] = routes
device.routes = routes
# Get running services
device.services = self._get_services(device.ip)
services = self._get_services(device.ip)
if services:
ssh_info['services'] = services
device.services = services
# Get system info
system_info = self._get_system_info(device.ip)
if system_info:
ssh_info['system'] = system_info
device.ssh_info = ssh_info
def _ssh_exec(self, ip: str, command: str) -> Optional[str]:
"""Execute command via SSH"""
@@ -409,6 +443,40 @@ class NetworkScanner:
return services[:20] # Limit to top 20
def _get_system_info(self, ip: str) -> Optional[Dict]:
"""Get basic system information"""
system_info = {}
# Get hostname
hostname = self._ssh_exec(ip, 'hostname')
if hostname:
system_info['hostname'] = hostname.strip()
# Get uptime
uptime = self._ssh_exec(ip, 'uptime -p 2>/dev/null || uptime')
if uptime:
system_info['uptime'] = uptime.strip()
# Get CPU info
cpu_info = self._ssh_exec(ip, 'nproc && cat /proc/cpuinfo | grep "model name" | head -1')
if cpu_info:
lines = cpu_info.strip().split('\n')
if len(lines) >= 2:
system_info['cpu_cores'] = int(lines[0])
system_info['cpu_model'] = lines[1].split(':')[1].strip()
# Get memory info
mem_info = self._ssh_exec(ip, 'free -h | grep Mem')
if mem_info:
system_info['memory'] = mem_info.strip()
# Get disk info
disk_info = self._ssh_exec(ip, 'df -h / | tail -1')
if disk_info:
system_info['disk'] = disk_info.strip()
return system_info if system_info else None
def _identify_device_type(self, device: Device) -> str:
"""Identify device type based on available info"""
if device.routes and len(device.routes) > 5:
@@ -505,6 +573,17 @@ class NetworkScanner:
logger.info(f"Exported results to {filename}")
def save_failed_ssh_hosts(self, filename: str):
"""Save list of hosts with SSH port open but failed authentication"""
if self.failed_ssh_hosts:
with open(filename, 'w') as f:
json.dump({
'failed_ssh_hosts': self.failed_ssh_hosts,
'total_failed': len(self.failed_ssh_hosts),
'scan_timestamp': str(datetime.now())
}, f, indent=2)
logger.info(f"Saved {len(self.failed_ssh_hosts)} failed SSH hosts to {filename}")
def print_summary(self):
"""Print a human-readable summary"""
print("\n" + "="*80)
@@ -567,6 +646,7 @@ def main():
# Export results
from datetime import datetime
scanner.export_json(args.output)
scanner.save_failed_ssh_hosts('failed_ssh_hosts.json')
print(f"\n✓ Scan complete! Results saved to {args.output}")

View File

@@ -0,0 +1,385 @@
#!/usr/bin/env python3
"""
Server Information Collector
This module collects detailed information about servers and VMs from hypervisors
via SSH. It connects to specified hypervisors and gathers system information,
VM details, resource usage, and network configurations.
"""
import json
import logging
import subprocess
import sys
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
@dataclass
class VMInfo:
"""Information about a virtual machine."""
vmid: str
name: str
status: str
cpu: float
memory_used: int
memory_total: int
disk_used: int
disk_total: int
uptime: str
ip_addresses: List[str]
vm_type: str = 'vm' # 'vm' or 'container'
@dataclass
class ServerInfo:
"""Information about a physical server/hypervisor."""
hostname: str
os: str
kernel: str
uptime: str
cpu_model: str
cpu_cores: int
memory_total: int
memory_free: int
disk_total: int
disk_free: int
load_average: str
network_interfaces: Dict[str, str]
vms: List[VMInfo]
containers: Optional[List[VMInfo]] = None
@dataclass
class HypervisorConfig:
"""Configuration for a hypervisor connection."""
host: str
port: int
user: str
class ServerInfoCollector:
"""Collects server and VM information from hypervisors via SSH."""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = logging.getLogger(__name__)
self.hypervisors = [
HypervisorConfig(**hv) for hv in config.get('hypervisors', [])
]
self.ssh_user = config.get('ssh_user', 'root')
self.ssh_key_path = config.get('ssh_key_path')
self.timeout = config.get('timeout', 10)
def run_ssh_command(self, host: str, port: int, user: str, command: str) -> Optional[str]:
"""Run a command via SSH on a remote host."""
ssh_cmd = [
'ssh',
'-o', 'ConnectTimeout=5',
'-o', 'StrictHostKeyChecking=no',
'-o', 'UserKnownHostsFile=/dev/null',
'-p', str(port),
f'{user}@{host}',
command
]
try:
result = subprocess.run(
ssh_cmd,
capture_output=True,
text=True,
timeout=self.timeout
)
if result.returncode == 0:
return result.stdout.strip()
else:
self.logger.warning(f"SSH command failed on {host}:{port}: {result.stderr}")
return None
except subprocess.TimeoutExpired:
self.logger.warning(f"SSH command timed out on {host}:{port}")
return None
except Exception as e:
self.logger.error(f"SSH error on {host}:{port}: {e}")
return None
def detect_hypervisor_type(self, host: str, port: int, user: str) -> str:
"""Detect the type of hypervisor (Proxmox, VMware, etc.)."""
# Check for Proxmox
if self.run_ssh_command(host, port, user, 'which pvesh'):
return 'proxmox'
# Check for VMware
if self.run_ssh_command(host, port, user, 'which vmware'):
return 'vmware'
# Check for KVM/libvirt
if self.run_ssh_command(host, port, user, 'which virsh'):
return 'kvm'
return 'unknown'
def collect_proxmox_info(self, host: str, port: int, user: str) -> ServerInfo:
"""Collect information from a Proxmox hypervisor."""
hostname = self.run_ssh_command(host, port, user, 'hostname') or host
# System info
os_info = self.run_ssh_command(host, port, user, 'cat /etc/os-release | grep PRETTY_NAME | cut -d\'"\' -f2') or 'Unknown'
kernel = self.run_ssh_command(host, port, user, 'uname -r') or 'Unknown'
uptime = self.run_ssh_command(host, port, user, 'uptime -p') or 'Unknown'
# CPU info
cpu_model = self.run_ssh_command(host, port, user, 'cat /proc/cpuinfo | grep "model name" | head -1 | cut -d: -f2 | xargs') or 'Unknown'
cpu_cores = int(self.run_ssh_command(host, port, user, 'nproc') or '0')
# Memory info
mem_info = self.run_ssh_command(host, port, user, 'free -b | grep Mem')
if mem_info:
parts = mem_info.split()
memory_total = int(parts[1])
memory_free = int(parts[3])
else:
memory_total = memory_free = 0
# Disk info
disk_info = self.run_ssh_command(host, port, user, 'df -B1 / | tail -1')
if disk_info:
parts = disk_info.split()
disk_total = int(parts[1])
disk_free = int(parts[3])
else:
disk_total = disk_free = 0
# Load average
load_average = self.run_ssh_command(host, port, user, 'uptime | cut -d: -f5') or 'Unknown'
# Network interfaces
network_interfaces = {}
net_info = self.run_ssh_command(host, port, user, 'ip -o addr show | grep -v lo')
if net_info:
for line in net_info.split('\n'):
if line.strip():
parts = line.split()
if len(parts) >= 4:
iface = parts[1]
ip = parts[3].split('/')[0]
network_interfaces[iface] = ip
# VMs
vms = []
vm_list = self.run_ssh_command(host, port, user, 'qm list')
if vm_list:
for line in vm_list.split('\n')[1:]: # Skip header
if line.strip():
parts = line.split()
if len(parts) >= 3:
vmid = parts[0]
name = parts[1]
status = parts[2]
# Get VM resource usage
cpu = memory_used = memory_total = disk_used = disk_total = 0
uptime = 'Unknown'
# Try to get current status
status_output = self.run_ssh_command(host, port, user, f'qm status {vmid} --verbose')
if status_output:
# Parse status output for resource usage using grep-like extraction
if 'mem:' in status_output:
try:
mem_line = [line for line in status_output.split('\n') if line.strip().startswith('mem:')][0]
memory_used = int(mem_line.split(':')[1].strip())
except:
pass
if 'maxmem:' in status_output:
try:
maxmem_line = [line for line in status_output.split('\n') if line.strip().startswith('maxmem:')][0]
memory_total = int(maxmem_line.split(':')[1].strip())
except:
pass
if 'maxdisk:' in status_output:
try:
maxdisk_line = [line for line in status_output.split('\n') if line.strip().startswith('maxdisk:')][0]
disk_total = int(maxdisk_line.split(':')[1].strip())
except:
pass
if 'uptime:' in status_output:
try:
uptime_line = [line for line in status_output.split('\n') if line.strip().startswith('uptime:')][0]
uptime = uptime_line.split(':')[1].strip()
except:
pass
# Get VM config for IP addresses
vm_config = self.run_ssh_command(host, port, user, f'qm config {vmid}')
ip_addresses = []
if vm_config:
for config_line in vm_config.split('\n'):
if 'ip=' in config_line:
ip = config_line.split('=')[1].strip()
ip_addresses.append(ip)
vm_info = VMInfo(
vmid=vmid,
name=name,
status=status,
cpu=cpu,
memory_used=memory_used,
memory_total=memory_total,
disk_used=disk_used,
disk_total=disk_total,
uptime=uptime,
ip_addresses=ip_addresses,
vm_type='vm'
)
vms.append(vm_info)
# Containers
containers = []
ct_list = self.run_ssh_command(host, port, user, 'pct list')
if ct_list:
for line in ct_list.split('\n')[1:]: # Skip header
if line.strip():
parts = line.split()
if len(parts) >= 3:
vmid = parts[0]
status = parts[1]
# Handle variable number of columns (Lock column may be empty)
if len(parts) == 4:
name = parts[3]
else:
name = parts[2]
# Get container resource usage
cpu = memory_used = memory_total = disk_used = disk_total = 0
uptime = 'Unknown'
# Try to get current status
status_output = self.run_ssh_command(host, port, user, f'pct status {vmid}')
if status_output:
# Parse status output for resource usage
lines = status_output.split('\n')
for line in lines:
line = line.strip()
if ':' in line:
key, value = line.split(':', 1)
key = key.strip()
value = value.strip()
if key == 'status':
# Status is already from pct list
pass
# Container resource info might be limited
# Could add more parsing here if available
# Get container config for IP addresses
ct_config = self.run_ssh_command(host, port, user, f'pct config {vmid}')
ip_addresses = []
if ct_config:
for config_line in ct_config.split('\n'):
if 'ip=' in config_line:
ip = config_line.split('=')[1].strip()
ip_addresses.append(ip)
container_info = VMInfo(
vmid=vmid,
name=name,
status=status,
cpu=cpu,
memory_used=memory_used,
memory_total=memory_total,
disk_used=disk_used,
disk_total=disk_total,
uptime=uptime,
ip_addresses=ip_addresses,
vm_type='container'
)
containers.append(container_info)
return ServerInfo(
hostname=hostname,
os=os_info,
kernel=kernel,
uptime=uptime,
cpu_model=cpu_model,
cpu_cores=cpu_cores,
memory_total=memory_total,
memory_free=memory_free,
disk_total=disk_total,
disk_free=disk_free,
load_average=load_average,
network_interfaces=network_interfaces,
vms=vms,
containers=containers
)
def collect_server_info(self, hypervisor: HypervisorConfig) -> Optional[ServerInfo]:
"""Collect information from a single hypervisor."""
self.logger.info(f"Collecting info from {hypervisor.host}:{hypervisor.port}")
hv_type = self.detect_hypervisor_type(hypervisor.host, hypervisor.port, hypervisor.user)
if hv_type == 'proxmox':
return self.collect_proxmox_info(hypervisor.host, hypervisor.port, hypervisor.user)
else:
self.logger.warning(f"Unsupported hypervisor type: {hv_type} on {hypervisor.host}")
return None
def collect_all_server_info(self) -> Dict[str, ServerInfo]:
"""Collect information from all configured hypervisors."""
server_info = {}
with ThreadPoolExecutor(max_workers=len(self.hypervisors)) as executor:
future_to_hv = {
executor.submit(self.collect_server_info, hv): hv
for hv in self.hypervisors
}
for future in as_completed(future_to_hv):
hv = future_to_hv[future]
try:
info = future.result()
if info:
server_info[f"{hv.host}:{hv.port}"] = info
except Exception as e:
self.logger.error(f"Failed to collect info from {hv.host}:{hv.port}: {e}")
return server_info
def save_to_file(self, server_info: Dict[str, ServerInfo], filename: str):
"""Save collected server information to a JSON file."""
data = {host: asdict(info) for host, info in server_info.items()}
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
self.logger.info(f"Server information saved to {filename}")
def main():
"""Main function for standalone execution."""
import argparse
parser = argparse.ArgumentParser(description='Collect server information from hypervisors')
parser.add_argument('-o', '--output', default='server_details.json', help='Output JSON file')
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
# Load config
try:
with open('config.json', 'r') as f:
config = json.load(f)
except FileNotFoundError:
print("config.json not found. Please create it from config.json.example")
sys.exit(1)
collector = ServerInfoCollector(config)
server_info = collector.collect_all_server_info()
if server_info:
collector.save_to_file(server_info, args.output)
print(f"Collected information from {len(server_info)} hypervisors")
else:
print("No server information collected")
if __name__ == '__main__':
main()

View File

@@ -1,353 +0,0 @@
#!/usr/bin/env python3
"""
SVG Network Diagram Generator
Converts network scan results into an SVG network diagram
"""
import json
import math
from typing import Dict, List, Tuple
from xml.etree.ElementTree import Element, SubElement, tostring
from xml.dom import minidom
class NetworkDiagramGenerator:
"""Generate SVG diagrams from network scan data"""
# Device type styling
DEVICE_STYLES = {
'router': {'color': '#FF6B6B', 'icon': '🔀', 'shape': 'rect'},
'firewall': {'color': '#FF0000', 'icon': '🛡️', 'shape': 'rect'},
'switch': {'color': '#4ECDC4', 'icon': '🔌', 'shape': 'rect'},
'server': {'color': '#45B7D1', 'icon': '🖥️', 'shape': 'rect'},
'linux_server': {'color': '#45B7D1', 'icon': '🐧', 'shape': 'rect'},
'windows_client': {'color': '#95E1D3', 'icon': '💻', 'shape': 'rect'},
'client': {'color': '#A8E6CF', 'icon': '💻', 'shape': 'rect'},
'default': {'color': '#CCCCCC', 'icon': '', 'shape': 'rect'}
}
# Network segment colors
SEGMENT_COLORS = [
'#E3F2FD', '#F3E5F5', '#E8F5E9', '#FFF3E0', '#FCE4EC',
'#E0F2F1', '#F1F8E9', '#FFF9C4', '#FFE0B2', '#F8BBD0'
]
def __init__(self, scan_data: Dict):
self.scan_data = scan_data
self.width = 1600
self.height = 1200
self.margin = 50
self.device_width = 120
self.device_height = 80
self.segment_spacing = 200
def generate_svg(self, output_file: str):
"""Generate SVG diagram"""
svg = Element('svg', {
'width': str(self.width),
'height': str(self.height),
'xmlns': 'http://www.w3.org/2000/svg',
'version': '1.1'
})
# Add styles
self._add_styles(svg)
# Add title
self._add_title(svg)
# Calculate layout
segments = self.scan_data.get('segments', [])
layout = self._calculate_layout(segments)
# Draw network segments
for i, (segment, positions) in enumerate(zip(segments, layout)):
self._draw_segment(svg, segment, positions, i)
# Draw connections
self._draw_connections(svg, segments, layout)
# Add legend
self._add_legend(svg)
# Save to file
self._save_svg(svg, output_file)
def _add_styles(self, svg: Element):
"""Add CSS styles"""
style = SubElement(svg, 'style')
style.text = """
.device-box { stroke: #333; stroke-width: 2; }
.device-label { font-family: Arial, sans-serif; font-size: 12px; fill: #000; }
.device-icon { font-size: 24px; }
.segment-box { stroke: #666; stroke-width: 2; fill-opacity: 0.1; }
.segment-label { font-family: Arial, sans-serif; font-size: 14px; font-weight: bold; fill: #333; }
.connection { stroke: #999; stroke-width: 2; stroke-dasharray: 5,5; }
.title { font-family: Arial, sans-serif; font-size: 24px; font-weight: bold; fill: #333; }
.info-text { font-family: Arial, sans-serif; font-size: 10px; fill: #666; }
.legend-text { font-family: Arial, sans-serif; font-size: 11px; fill: #333; }
"""
def _add_title(self, svg: Element):
"""Add diagram title"""
title = SubElement(svg, 'text', {
'x': str(self.width // 2),
'y': '30',
'text-anchor': 'middle',
'class': 'title'
})
title.text = 'Network Topology Diagram'
def _calculate_layout(self, segments: List[Dict]) -> List[List[Tuple]]:
"""Calculate positions for all devices"""
layout = []
# Arrange segments horizontally
segment_width = (self.width - 2 * self.margin) / len(segments) if segments else self.width
for seg_idx, segment in enumerate(segments):
devices = segment.get('devices', [])
positions = []
# Calculate segment area
seg_x = self.margin + seg_idx * segment_width
seg_y = self.margin + 60
seg_w = segment_width - 20
seg_h = self.height - seg_y - self.margin
# Arrange devices in a grid within segment
cols = math.ceil(math.sqrt(len(devices)))
rows = math.ceil(len(devices) / cols) if cols > 0 else 1
device_spacing_x = seg_w / (cols + 1) if cols > 0 else seg_w / 2
device_spacing_y = seg_h / (rows + 1) if rows > 0 else seg_h / 2
for dev_idx, device in enumerate(devices):
col = dev_idx % cols
row = dev_idx // cols
x = seg_x + (col + 1) * device_spacing_x
y = seg_y + (row + 1) * device_spacing_y
positions.append((x, y, device))
layout.append(positions)
return layout
def _draw_segment(self, svg: Element, segment: Dict, positions: List[Tuple], seg_idx: int):
"""Draw a network segment and its devices"""
if not positions:
return
# Calculate segment bounds
xs = [pos[0] for pos in positions]
ys = [pos[1] for pos in positions]
min_x = min(xs) - self.device_width
max_x = max(xs) + self.device_width
min_y = min(ys) - self.device_height
max_y = max(ys) + self.device_height
# Draw segment background
color = self.SEGMENT_COLORS[seg_idx % len(self.SEGMENT_COLORS)]
SubElement(svg, 'rect', {
'x': str(min_x - 20),
'y': str(min_y - 40),
'width': str(max_x - min_x + 40),
'height': str(max_y - min_y + 60),
'fill': color,
'class': 'segment-box',
'rx': '10'
})
# Draw segment label
label = SubElement(svg, 'text', {
'x': str(min_x),
'y': str(min_y - 20),
'class': 'segment-label'
})
vpn_indicator = ' (VPN)' if segment.get('is_vpn') else ''
label.text = f"{segment.get('name', 'Unknown')} - {segment.get('cidr', '')}{vpn_indicator}"
# Draw devices
for x, y, device in positions:
self._draw_device(svg, device, x, y)
def _draw_device(self, svg: Element, device: Dict, x: float, y: float):
"""Draw a single device"""
device_type = device.get('device_type', 'default')
style = self.DEVICE_STYLES.get(device_type, self.DEVICE_STYLES['default'])
# Draw device box
box_x = x - self.device_width / 2
box_y = y - self.device_height / 2
SubElement(svg, 'rect', {
'x': str(box_x),
'y': str(box_y),
'width': str(self.device_width),
'height': str(self.device_height),
'fill': style['color'],
'class': 'device-box',
'rx': '5'
})
# Add icon
icon_text = SubElement(svg, 'text', {
'x': str(x),
'y': str(y - 15),
'text-anchor': 'middle',
'class': 'device-icon'
})
icon_text.text = style['icon']
# Add IP address
ip_text = SubElement(svg, 'text', {
'x': str(x),
'y': str(y + 5),
'text-anchor': 'middle',
'class': 'device-label',
'font-weight': 'bold'
})
ip_text.text = device.get('ip', 'Unknown')
# Add hostname
hostname = device.get('hostname', '')
if hostname:
hostname_text = SubElement(svg, 'text', {
'x': str(x),
'y': str(y + 20),
'text-anchor': 'middle',
'class': 'info-text'
})
# Truncate long hostnames
if len(hostname) > 20:
hostname = hostname[:17] + '...'
hostname_text.text = hostname
# Add additional info
info_y = y + 32
if device.get('ssh_accessible'):
ssh_text = SubElement(svg, 'text', {
'x': str(x),
'y': str(info_y),
'text-anchor': 'middle',
'class': 'info-text',
'fill': 'green'
})
ssh_text.text = '🔓 SSH'
def _draw_connections(self, svg: Element, segments: List[Dict], layout: List[List[Tuple]]):
"""Draw connections between devices based on routing info"""
# This is simplified - you'd analyze routing tables to determine actual connections
# For now, connect routers between segments
routers = []
for positions in layout:
for x, y, device in positions:
if device.get('device_type') in ['router', 'firewall']:
routers.append((x, y, device))
# Connect consecutive routers
for i in range(len(routers) - 1):
x1, y1, _ = routers[i]
x2, y2, _ = routers[i + 1]
SubElement(svg, 'line', {
'x1': str(x1),
'y1': str(y1),
'x2': str(x2),
'y2': str(y2),
'class': 'connection'
})
def _add_legend(self, svg: Element):
"""Add legend explaining device types"""
legend_x = self.width - 200
legend_y = self.height - 200
# Legend box
SubElement(svg, 'rect', {
'x': str(legend_x - 10),
'y': str(legend_y - 10),
'width': '190',
'height': str(len(self.DEVICE_STYLES) * 25 + 30),
'fill': 'white',
'stroke': '#333',
'stroke-width': '1',
'rx': '5'
})
# Legend title
title = SubElement(svg, 'text', {
'x': str(legend_x),
'y': str(legend_y + 5),
'class': 'legend-text',
'font-weight': 'bold'
})
title.text = 'Device Types'
# Legend items
y_offset = 25
for device_type, style in self.DEVICE_STYLES.items():
if device_type == 'default':
continue
# Color box
SubElement(svg, 'rect', {
'x': str(legend_x),
'y': str(legend_y + y_offset - 8),
'width': '15',
'height': '15',
'fill': style['color'],
'stroke': '#333'
})
# Label
label = SubElement(svg, 'text', {
'x': str(legend_x + 20),
'y': str(legend_y + y_offset + 4),
'class': 'legend-text'
})
label.text = f"{style['icon']} {device_type.replace('_', ' ').title()}"
y_offset += 25
def _save_svg(self, svg: Element, output_file: str):
"""Save SVG to file with pretty formatting"""
rough_string = tostring(svg, encoding='unicode')
reparsed = minidom.parseString(rough_string)
pretty_svg = reparsed.toprettyxml(indent=' ')
# Remove extra blank lines
pretty_svg = '\n'.join([line for line in pretty_svg.split('\n') if line.strip()])
with open(output_file, 'w', encoding='utf-8') as f:
f.write(pretty_svg)
def main():
"""Command line interface"""
import argparse
parser = argparse.ArgumentParser(description='Generate SVG network diagram from scan data')
parser.add_argument('input', help='Input JSON file from network scan')
parser.add_argument('-o', '--output', default='network_diagram.svg',
help='Output SVG file (default: network_diagram.svg)')
args = parser.parse_args()
# Load scan data
with open(args.input, 'r') as f:
scan_data = json.load(f)
# Generate diagram
generator = NetworkDiagramGenerator(scan_data)
generator.generate_svg(args.output)
print(f"✓ Network diagram generated: {args.output}")
if __name__ == '__main__':
main()

View File

@@ -58,7 +58,6 @@ def test_scripts_exist():
scripts = [
'src/network_scanner.py',
'src/pfsense_scanner.py',
'src/svg_generator.py',
'src/integrated_scanner.py',
'scripts/quickstart.sh'
]
@@ -79,7 +78,6 @@ def test_script_syntax():
scripts = [
'src/network_scanner.py',
'src/pfsense_scanner.py',
'src/svg_generator.py',
'src/integrated_scanner.py'
]
@@ -230,7 +228,6 @@ def main():
print("Next steps:")
print("1. Edit config.json with your network details")
print("2. Run: ./quickstart.sh")
print(" or: ./integrated_scanner.py --generate-svg")
else:
print("⚠️ Some tests failed. Please check the errors above.")
print()

12
test_config.json Normal file
View File

@@ -0,0 +1,12 @@
{
"ssh_user": "root",
"ssh_key_path": null,
"timeout": 2,
"additional_networks": [],
"special_devices": {},
"scan_options": {
"max_workers": 10,
"ping_timeout": 2,
"port_scan_timeout": 1
}
}

12
tests/test_config.json Normal file
View File

@@ -0,0 +1,12 @@
{
"ssh_user": "root",
"ssh_key_path": null,
"timeout": 2,
"additional_networks": [],
"special_devices": {},
"scan_options": {
"max_workers": 10,
"ping_timeout": 2,
"port_scan_timeout": 1
}
}

1429
tests/test_output.json Normal file

File diff suppressed because it is too large Load Diff