Compare commits

...

2 Commits

Author SHA1 Message Date
mindesbunister
0827b9a69d Remove SVG diagram functionality
- Delete svg_generator.py and comprehensive_mapper.py
- Remove --generate-svg option from integrated_scanner.py
- Update complete_workflow.sh to remove SVG generation step
- Clean up documentation and examples
- Update test_system.py to remove SVG references
- Add missing files to repository (EXAMPLES.sh, quickstart.sh, etc.)
2025-10-10 17:08:31 +02:00
mindesbunister
da5f1f2d0c Reorganize project structure: move code to src/, docs to docs/, config to config/, scripts to scripts/, results to results/, tests to tests/. Keep only main script and latest scan results in root. 2025-10-10 15:39:59 +02:00
34 changed files with 3025 additions and 831 deletions

6
.gitignore vendored
View File

@@ -1,9 +1,11 @@
# Network Scanner - Git Ignore
# Scan results and output
*.json
# Ignore archived results in results/ folder
results/
# Keep config.json.example
!config.json.example
*.svg
# Python
__pycache__/

13
EXAMPLES.sh Executable file → Normal file
View File

@@ -16,12 +16,9 @@ cat << 'EOF'
# SCENARIO 2: Complete Network Documentation
# -------------------------------------------
# Full scan with pfSense integration and SVG generation
# Full scan with pfSense integration
./integrated_scanner.py -c config.json -o full_network.json --generate-svg -v
# View the diagram:
firefox full_network.svg
./integrated_scanner.py -c config.json -o full_network.json -v
# SCENARIO 3: pfSense Deep Dive
@@ -64,7 +61,7 @@ cat > my_network_config.json << 'CONFIG'
}
CONFIG
./integrated_scanner.py -c my_network_config.json -o multi_network.json --generate-svg
./integrated_scanner.py -c my_network_config.json -o multi_network.json
# SCENARIO 5: Scheduled Network Monitoring
@@ -80,12 +77,10 @@ mkdir -p "$OUTPUT_DIR"
cd /path/to/network_scanner
./integrated_scanner.py \
-o "$OUTPUT_DIR/scan_$DATE.json" \
--generate-svg
-o "$OUTPUT_DIR/scan_$DATE.json"
# Keep only last 30 days
find "$OUTPUT_DIR" -name "scan_*.json" -mtime +30 -delete
find "$OUTPUT_DIR" -name "scan_*.svg" -mtime +30 -delete
SCRIPT
chmod +x /usr/local/bin/network-scan-daily.sh

View File

@@ -32,9 +32,17 @@ log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Move old results to results folder
log_info "Moving old results to results folder..."
mkdir -p results
mv network_scan_*.json server_details_*.json network_summary_*.md *_failed_ssh.json results/ 2>/dev/null || true
if [ $? -eq 0 ] && [ "$(ls results/ 2>/dev/null | wc -l)" -gt 0 ]; then
log_info "Moved old result files to results/ folder"
fi
# Check if we're in the right directory
if [ ! -f "integrated_scanner.py" ]; then
log_error "integrated_scanner.py not found. Please run this script from the network scanner directory."
if [ ! -f "src/integrated_scanner.py" ]; then
log_error "src/integrated_scanner.py not found. Please run this script from the network scanner directory."
exit 1
fi
@@ -48,7 +56,7 @@ fi
# Step 1: Run system verification
log_info "Step 1: Verifying system requirements..."
if ./test_system.py >/dev/null 2>&1; then
if python3 src/test_system.py >/dev/null 2>&1; then
log_success "System verification passed"
else
log_error "System verification failed. Please check the output above."
@@ -58,35 +66,41 @@ fi
# Step 2: Run integrated network scan
log_info "Step 2: Running integrated network scan..."
SCAN_OUTPUT="network_scan_$(date +%Y%m%d_%H%M%S).json"
if ./integrated_scanner.py -o "$SCAN_OUTPUT" -v; then
if python3 src/integrated_scanner.py -o "$SCAN_OUTPUT" -v; then
log_success "Network scan completed: $SCAN_OUTPUT"
# Check for failed SSH hosts file
FAILED_SSH_OUTPUT="${SCAN_OUTPUT%.json}_failed_ssh.json"
if [ -f "$FAILED_SSH_OUTPUT" ]; then
FAILED_COUNT=$(jq '.total_failed' "$FAILED_SSH_OUTPUT" 2>/dev/null || echo "unknown")
log_warning "Found $FAILED_COUNT hosts with SSH port open but failed authentication: $FAILED_SSH_OUTPUT"
fi
else
log_error "Network scan failed"
exit 1
fi
# Step 3: Generate SVG diagram
log_info "Step 3: Generating network diagram..."
SVG_OUTPUT="${SCAN_OUTPUT%.json}.svg"
if ./svg_generator.py "$SCAN_OUTPUT" -o "$SVG_OUTPUT"; then
log_success "SVG diagram generated: $SVG_OUTPUT"
# Step 3: Collect server information from hypervisors
log_info "Step 3: Collecting server information from hypervisors..."
SERVER_OUTPUT="server_details_$(date +%Y%m%d_%H%M%S).json"
if python3 src/server_info_collector.py -o "$SERVER_OUTPUT"; then
log_success "Server information collected: $SERVER_OUTPUT"
else
log_error "SVG generation failed"
exit 1
log_warning "Server information collection failed"
fi
# Step 4: Generate pfSense summary if XML files exist
# Step 5: Generate pfSense summary if XML files exist
if [ "$XML_FILES" -gt 0 ]; then
log_info "Step 4: Generating pfSense network summary..."
log_info "Step 5: Generating pfSense network summary..."
SUMMARY_OUTPUT="network_summary_$(date +%Y%m%d_%H%M%S).md"
if ./pfsense_integrator.py *.xml --summary "$SUMMARY_OUTPUT"; then
if python3 src/pfsense_integrator.py *.xml --summary "$SUMMARY_OUTPUT"; then
log_success "Network summary generated: $SUMMARY_OUTPUT"
else
log_warning "Network summary generation failed"
fi
fi
# Step 5: Show results summary
# Step 6: Show results summary
echo ""
echo "=========================================="
log_success "Network Discovery Complete!"
@@ -94,7 +108,12 @@ echo "=========================================="
echo ""
echo "Generated files:"
echo " 📊 Network Scan: $SCAN_OUTPUT"
echo " 🎨 Network Diagram: $SVG_OUTPUT"
if [ -f "$SERVER_OUTPUT" ]; then
echo " 🖥️ Server Details: $SERVER_OUTPUT"
fi
if [ -f "$FAILED_SSH_OUTPUT" ]; then
echo " 🔐 Failed SSH Hosts: $FAILED_SSH_OUTPUT"
fi
if [ "$XML_FILES" -gt 0 ]; then
echo " 📋 Network Summary: $SUMMARY_OUTPUT"
fi
@@ -114,11 +133,21 @@ if command -v jq >/dev/null 2>&1; then
fi
echo "Next steps:"
echo " 1. Open $SVG_OUTPUT in your web browser to view the network diagram"
if [ "$XML_FILES" -gt 0 ]; then
echo " 2. Review $SUMMARY_OUTPUT for detailed pfSense configuration"
if [ -f "$SERVER_OUTPUT" ]; then
echo " 1. Review $SERVER_OUTPUT for detailed server and VM information"
STEP_NUM=2
else
STEP_NUM=1
fi
echo " 3. Examine $SCAN_OUTPUT for complete network data (use jq for querying)"
if [ -f "$FAILED_SSH_OUTPUT" ]; then
echo " $STEP_NUM. Review $FAILED_SSH_OUTPUT for hosts needing SSH credential fixes"
STEP_NUM=$((STEP_NUM + 1))
fi
if [ "$XML_FILES" -gt 0 ]; then
echo " $STEP_NUM. Review $SUMMARY_OUTPUT for detailed pfSense configuration"
STEP_NUM=$((STEP_NUM + 1))
fi
echo " $STEP_NUM. Examine $SCAN_OUTPUT for complete network data (use jq for querying)"
echo ""
log_success "Workflow completed successfully! 🎉"

View File

@@ -1,355 +0,0 @@
#!/usr/bin/env python3
"""
Comprehensive Network Diagram Generator
Combines network scanning with pfSense XML parsing for complete network topology
"""
import json
import argparse
import logging
from pathlib import Path
from typing import Dict, List, Any
from network_scanner import NetworkScanner
from pfsense_xml_parser import PfSenseXMLParser
from svg_generator import NetworkDiagramGenerator
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class ComprehensiveNetworkMapper:
"""Complete network mapping solution combining multiple data sources"""
def __init__(self, config: Dict):
self.config = config
self.network_data = {}
self.pfsense_data = {}
self.combined_data = {
'scan_timestamp': None,
'data_sources': [],
'segments': [],
'pfsense_firewalls': [],
'wireguard_networks': [],
'openvpn_networks': [],
'routing_table': [],
'dhcp_leases': [],
'static_mappings': []
}
def load_network_scan(self, scan_file: str):
"""Load network scan data"""
logger.info(f"Loading network scan data from {scan_file}")
try:
with open(scan_file, 'r') as f:
self.network_data = json.load(f)
self.combined_data['data_sources'].append('network_scan')
logger.info("Network scan data loaded successfully")
except Exception as e:
logger.error(f"Error loading network scan: {e}")
def load_pfsense_configs(self, xml_files: List[str]):
"""Load and parse pfSense XML configuration files"""
for xml_file in xml_files:
if Path(xml_file).exists():
logger.info(f"Parsing pfSense config: {xml_file}")
parser = PfSenseXMLParser(xml_file)
data = parser.parse_all()
if data:
hostname = data.get('hostname', f'pfsense_{len(self.pfsense_data)}')
self.pfsense_data[hostname] = data
self.combined_data['data_sources'].append(f'pfsense_{hostname}')
self.combined_data['pfsense_firewalls'].append(data)
logger.info(f"pfSense config {hostname} loaded successfully")
else:
logger.error(f"Failed to parse pfSense config: {xml_file}")
else:
logger.warning(f"pfSense config file not found: {xml_file}")
def merge_network_data(self):
"""Merge all data sources into comprehensive network map"""
logger.info("Merging network data sources...")
# Start with network scan data if available
if self.network_data:
self.combined_data.update({
'scan_timestamp': self.network_data.get('scan_timestamp'),
'segments': self.network_data.get('segments', [])
})
# Process pfSense data
self._process_pfsense_data()
# Extract additional network information
self._extract_network_topology()
logger.info("Network data merging complete")
def _process_pfsense_data(self):
"""Process and integrate pfSense configuration data"""
for hostname, pfsense in self.pfsense_data.items():
logger.info(f"Processing pfSense data for {hostname}")
# Add interfaces as network segments
interfaces = pfsense.get('interfaces', {})
for iface_name, iface_data in interfaces.items():
if iface_data.get('ipaddr') and iface_data.get('ipaddr') != 'dhcp':
try:
# Create network segment from interface
ip = iface_data['ipaddr']
subnet = iface_data.get('subnet', '24')
network_cidr = f"{ip.rsplit('.', 1)[0]}.0/{subnet}"
segment = {
'name': f"{hostname}_{iface_name}",
'cidr': network_cidr,
'gateway': iface_data.get('gateway', ip),
'interface': iface_data.get('interface'),
'pfsense_host': hostname,
'is_vpn': iface_data.get('type') in ['wireguard', 'openvpn'],
'devices': []
}
# Add gateway device
gateway_device = {
'ip': ip,
'hostname': hostname,
'device_type': 'firewall',
'os_type': 'pfSense',
'interface': iface_name,
'pfsense_config': True
}
segment['devices'].append(gateway_device)
self.combined_data['segments'].append(segment)
# Track VPN networks
if iface_data.get('type') == 'wireguard':
self.combined_data['wireguard_networks'].append({
'network': network_cidr,
'interface': iface_data.get('interface'),
'pfsense': hostname,
'peers': pfsense.get('wireguard', {}).get('peers', [])
})
elif iface_data.get('type') == 'openvpn':
self.combined_data['openvpn_networks'].append({
'network': network_cidr,
'interface': iface_data.get('interface'),
'pfsense': hostname
})
except Exception as e:
logger.debug(f"Error processing interface {iface_name}: {e}")
# Add static routes
static_routes = pfsense.get('static_routes', [])
for route in static_routes:
route_info = {
'network': route.get('network'),
'gateway': route.get('gateway'),
'description': route.get('description'),
'pfsense': hostname
}
self.combined_data['routing_table'].append(route_info)
# Add DHCP information
dhcp_config = pfsense.get('dhcp', {})
for iface_name, dhcp_data in dhcp_config.items():
if dhcp_data.get('static_mappings'):
for mapping in dhcp_data['static_mappings']:
mapping_info = {
'ip': mapping.get('ipaddr'),
'mac': mapping.get('mac'),
'hostname': mapping.get('hostname'),
'description': mapping.get('description'),
'interface': iface_name,
'pfsense': hostname
}
self.combined_data['static_mappings'].append(mapping_info)
def _extract_network_topology(self):
"""Extract network topology information from all sources"""
logger.info("Extracting network topology...")
# Process WireGuard peer information
for wg_net in self.combined_data['wireguard_networks']:
for peer in wg_net.get('peers', []):
for allowed_ip in peer.get('allowed_ips', []):
# Create segment for remote networks
if allowed_ip.get('mask') and allowed_ip['mask'] != '32':
remote_segment = {
'name': f"WG_Remote_{allowed_ip['address']}_{allowed_ip['mask']}",
'cidr': f"{allowed_ip['address']}/{allowed_ip['mask']}",
'gateway': wg_net.get('network', '').split('/')[0].rsplit('.', 1)[0] + '.1',
'is_vpn': True,
'vpn_type': 'wireguard',
'pfsense_host': wg_net.get('pfsense'),
'devices': []
}
self.combined_data['segments'].append(remote_segment)
def generate_svg_diagram(self, output_file: str):
"""Generate SVG network diagram"""
logger.info(f"Generating SVG diagram: {output_file}")
# Use the existing SVG generator
generator = NetworkDiagramGenerator(self.combined_data)
generator.generate_svg(output_file)
logger.info(f"SVG diagram generated: {output_file}")
def export_comprehensive_json(self, output_file: str):
"""Export comprehensive network data"""
logger.info(f"Exporting comprehensive data to {output_file}")
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(self.combined_data, f, indent=2, ensure_ascii=False)
logger.info(f"Comprehensive network data exported to {output_file}")
def print_summary(self):
"""Print comprehensive network summary"""
print("\n" + "="*80)
print("COMPREHENSIVE NETWORK MAPPING SUMMARY")
print("="*80)
print(f"\nData Sources: {', '.join(self.combined_data['data_sources'])}")
print(f"Network Segments: {len(self.combined_data['segments'])}")
print(f"pfSense Firewalls: {len(self.combined_data['pfsense_firewalls'])}")
print(f"WireGuard Networks: {len(self.combined_data['wireguard_networks'])}")
print(f"OpenVPN Networks: {len(self.combined_data['openvpn_networks'])}")
print(f"Static Routes: {len(self.combined_data['routing_table'])}")
print(f"DHCP Static Mappings: {len(self.combined_data['static_mappings'])}")
# Network segments summary
print(f"\n{'='*80}")
print("NETWORK SEGMENTS")
print(f"{'='*80}")
for segment in self.combined_data['segments']:
vpn_indicator = " (VPN)" if segment.get('is_vpn') else ""
pfsense_indicator = f" [{segment.get('pfsense_host')}]" if segment.get('pfsense_host') else ""
print(f"\n📡 {segment['name']}{vpn_indicator}{pfsense_indicator}")
print(f" CIDR: {segment['cidr']}")
print(f" Gateway: {segment.get('gateway', 'N/A')}")
print(f" Devices: {len(segment.get('devices', []))}")
# Show devices
for device in segment.get('devices', [])[:5]: # Show first 5
print(f"{device.get('ip', 'N/A')} - {device.get('hostname', 'N/A')} ({device.get('device_type', 'N/A')})")
if len(segment.get('devices', [])) > 5:
print(f" ... and {len(segment.get('devices', [])) - 5} more devices")
# pfSense summary
if self.combined_data['pfsense_firewalls']:
print(f"\n{'='*80}")
print("PFSENSE FIREWALLS")
print(f"{'='*80}")
for pfsense in self.combined_data['pfsense_firewalls']:
hostname = pfsense.get('hostname', 'Unknown')
print(f"\n🛡️ {hostname}")
print(f" Interfaces: {len(pfsense.get('interfaces', {}))}")
print(f" Static Routes: {len(pfsense.get('static_routes', []))}")
wg_config = pfsense.get('wireguard', {})
if wg_config.get('enabled'):
print(f" WireGuard: {len(wg_config.get('tunnels', []))} tunnels, {len(wg_config.get('peers', []))} peers")
dhcp_config = pfsense.get('dhcp', {})
total_mappings = sum(len(dhcp.get('static_mappings', [])) for dhcp in dhcp_config.values())
print(f" DHCP Static Mappings: {total_mappings}")
# VPN Networks
if self.combined_data['wireguard_networks'] or self.combined_data['openvpn_networks']:
print(f"\n{'='*80}")
print("VPN NETWORKS")
print(f"{'='*80}")
for wg_net in self.combined_data['wireguard_networks']:
print(f"\n🔐 WireGuard: {wg_net['network']} via {wg_net['pfsense']}")
print(f" Interface: {wg_net['interface']}")
print(f" Peers: {len(wg_net.get('peers', []))}")
for ovpn_net in self.combined_data['openvpn_networks']:
print(f"\n🔐 OpenVPN: {ovpn_net['network']} via {ovpn_net['pfsense']}")
print(f" Interface: {ovpn_net['interface']}")
print(f"\n{'='*80}")
print("NETWORK TOPOLOGY ANALYSIS COMPLETE")
print(f"{'='*80}")
def main():
"""Command line interface"""
parser = argparse.ArgumentParser(
description='Comprehensive Network Diagram Generator with pfSense Integration'
)
parser.add_argument('-c', '--config', default='config.json',
help='Network scanner configuration file')
parser.add_argument('-s', '--scan-data', help='Existing network scan JSON file')
parser.add_argument('-p', '--pfsense-xml', nargs='+',
help='pfSense XML configuration files')
parser.add_argument('-o', '--output', default='comprehensive_network.json',
help='Output comprehensive JSON file')
parser.add_argument('--svg', help='Generate SVG diagram')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose output')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Load configuration
config = {}
if Path(args.config).exists():
try:
with open(args.config, 'r') as f:
config = json.load(f)
except Exception as e:
logger.warning(f"Could not load config {args.config}: {e}")
# Initialize comprehensive mapper
mapper = ComprehensiveNetworkMapper(config)
# Load network scan data if provided
if args.scan_data and Path(args.scan_data).exists():
mapper.load_network_scan(args.scan_data)
else:
logger.info("No network scan data provided - will use pfSense data only")
# Load pfSense configurations
pfsense_files = args.pfsense_xml or []
# Auto-detect pfSense XML files if none specified
if not pfsense_files:
xml_files = list(Path('.').glob('config-*.xml'))
if xml_files:
pfsense_files = [str(f) for f in xml_files]
logger.info(f"Auto-detected pfSense XML files: {pfsense_files}")
if pfsense_files:
mapper.load_pfsense_configs(pfsense_files)
else:
logger.warning("No pfSense XML files found")
# Merge all data
mapper.merge_network_data()
# Print summary
mapper.print_summary()
# Export comprehensive data
mapper.export_comprehensive_json(args.output)
print(f"\n✅ Comprehensive network data saved to: {args.output}")
# Generate SVG if requested
if args.svg:
svg_file = args.svg
mapper.generate_svg_diagram(svg_file)
print(f"✅ SVG network diagram generated: {svg_file}")
if __name__ == '__main__':
main()

View File

@@ -17,5 +17,18 @@
"max_workers": 10,
"ping_timeout": 2,
"port_scan_timeout": 1
}
},
"hypervisors": [
{
"host": "srvhost04.egonetix.de",
"port": 2222,
"user": "root"
},
{
"host": "srv-wmw-host01",
"port": 22,
"user": "root"
}
],
"probe_ssh_on_discovered": true
}

28
quickstart.sh Executable file → Normal file
View File

@@ -77,10 +77,9 @@ echo "What would you like to do?"
echo ""
echo "1) Run a quick scan (current network only)"
echo "2) Run a full scan with pfSense integration"
echo "3) Scan and generate SVG diagram"
echo "4) Scan specific pfSense device"
echo "5) Show help"
echo "6) Exit"
echo "3) Scan specific pfSense device"
echo "4) Show help"
echo "5) Exit"
echo ""
read -p "Choose an option (1-6): " choice
@@ -91,7 +90,6 @@ case $choice in
./network_scanner.py -o quick_scan.json -v
echo ""
echo "✓ Done! Results saved to: quick_scan.json"
echo " Generate diagram with: ./svg_generator.py quick_scan.json"
;;
2)
echo ""
@@ -101,13 +99,6 @@ case $choice in
echo "✓ Done! Results saved to: full_scan.json"
;;
3)
echo ""
echo "🔍 Running scan and generating diagram..."
./integrated_scanner.py -o scan_with_diagram.json -v --generate-svg
echo ""
echo "✓ Done! Open scan_with_diagram.svg to view the network diagram"
;;
4)
echo ""
read -p "Enter pfSense IP address: " pfsense_ip
echo "🔍 Scanning pfSense at $pfsense_ip..."
@@ -115,7 +106,7 @@ case $choice in
echo ""
echo "✓ Done! Results saved to: pfsense_${pfsense_ip}.json"
;;
5)
4)
echo ""
cat << 'HELP'
Network Scanner - Help
@@ -134,11 +125,7 @@ Available Scripts:
3. integrated_scanner.py
Complete scanner with pfSense integration
Usage: ./integrated_scanner.py [-c config.json] [-o output.json] [-v] [--generate-svg]
4. svg_generator.py
Generate SVG diagram from scan results
Usage: ./svg_generator.py <input.json> [-o output.svg]
Usage: ./integrated_scanner.py [-c config.json] [-o output.json] [-v]
Configuration:
-------------
@@ -159,13 +146,10 @@ Examples:
# Scan pfSense
./pfsense_scanner.py 192.168.1.1 -u root -k ~/.ssh/id_rsa
# Generate diagram from existing scan
./svg_generator.py network_scan.json -o my_network.svg
For more information, see README.md
HELP
;;
6)
5)
echo "Goodbye!"
exit 0
;;

View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

View File

@@ -0,0 +1,53 @@
# Network Topology Summary
Generated from pfSense XML configurations
## pfSense Firewall: gw-nue01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): dhcp
- **LAN** (lan): 10.0.0.1
- **wireguardnachhause** (opt1): 10.69.69.1
- Gateway: WirusguardusGW
### Static Routes
- 172.20.0.0/16 via WirusguardusGW
*heyme*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*heyme*
- Peer: wireguardheyme - Networks: 172.20.0.0/16, 10.69.69.2/32
### DHCP Configuration
## pfSense Firewall: gw-st01
**Version:** unknown
**Domain:** egonetix.lan
### Network Interfaces
- **WAN** (wan): 192.168.178.3
- Gateway: WANGW
- **LAN** (lan): 172.20.20.1
- **wireguardnnbesch** (opt1): 10.69.69.2
- Gateway: wirenuenbesch
- **HomeAssistant** (opt2): 172.20.70.1
- **WireguardOpenvpn** (opt3): 10.5.0.2
- Gateway: WireguardOpenvpnGW
### Static Routes
- 10.0.0.0/24 via wirenuenbesch
*wireguardn&uuml;nbesch*
- 12.1.0.0/24 via wirenuenbesch
*openvpn nutzer*
### WireGuard VPN
- **Tunnel tun_wg0** (Port 51820)
*de1099.nordvpn.com*
- **Tunnel tun_wg1** (Port 51821)
*wireguardn&uuml;nbesch*
- Peer: de1099.nordvpn.com - Networks: 0.0.0.0/0
- Peer: wireguardn&uuml;nbesch - Networks: 10.0.0.0/24, 10.69.69.1/32, 12.1.0.0/24
### DHCP Configuration

265
scripts/EXAMPLES.sh Executable file
View File

@@ -0,0 +1,265 @@
#!/bin/bash
# Example usage scenarios for the network scanner
echo "=========================================="
echo "Network Scanner - Usage Examples"
echo "=========================================="
echo ""
cat << 'EOF'
# SCENARIO 1: Quick Network Overview
# -----------------------------------
# Scan your local network and get a basic overview
./network_scanner.py -v -o quick_scan.json
# SCENARIO 2: Complete Network Documentation
# -------------------------------------------
# Full scan with pfSense integration and SVG generation
./integrated_scanner.py -c config.json -o full_network.json --generate-svg -v
# View the diagram:
firefox full_network.svg
# SCENARIO 3: pfSense Deep Dive
# ------------------------------
# Detailed scan of a specific pfSense firewall
./pfsense_scanner.py 192.168.1.1 -u root -k ~/.ssh/id_rsa -o pfsense_main.json
# View the results:
cat pfsense_main.json | jq '.vpn' # Show VPN info
cat pfsense_main.json | jq '.routes' # Show routing table
# SCENARIO 4: Multi-Network Scan with VPN
# ----------------------------------------
# Create a config for multiple networks
cat > my_network_config.json << 'CONFIG'
{
"ssh_user": "root",
"ssh_key_path": "/home/user/.ssh/id_rsa",
"timeout": 3,
"additional_networks": [
"192.168.1.0/24", # Main network
"192.168.2.0/24", # Guest network
"10.8.0.0/24", # OpenVPN network
"10.0.0.0/24" # WireGuard VPN
],
"special_devices": {
"192.168.1.1": {
"name": "Main pfSense Firewall",
"type": "firewall",
"os": "pfSense"
},
"192.168.2.1": {
"name": "Guest Network Router",
"type": "router"
}
}
}
CONFIG
./integrated_scanner.py -c my_network_config.json -o multi_network.json --generate-svg
# SCENARIO 5: Scheduled Network Monitoring
# -----------------------------------------
# Add to crontab for daily network documentation
# Create wrapper script
cat > /usr/local/bin/network-scan-daily.sh << 'SCRIPT'
#!/bin/bash
DATE=$(date +%Y%m%d)
OUTPUT_DIR="/var/log/network-scans"
mkdir -p "$OUTPUT_DIR"
cd /path/to/network_scanner
./integrated_scanner.py \
-o "$OUTPUT_DIR/scan_$DATE.json" \
--generate-svg
# Keep only last 30 days
find "$OUTPUT_DIR" -name "scan_*.json" -mtime +30 -delete
find "$OUTPUT_DIR" -name "scan_*.svg" -mtime +30 -delete
SCRIPT
chmod +x /usr/local/bin/network-scan-daily.sh
# Add to crontab (run at 2 AM daily):
# 0 2 * * * /usr/local/bin/network-scan-daily.sh
# SCENARIO 6: Compare Network Changes
# ------------------------------------
# Scan and compare with previous results
# Initial scan
./integrated_scanner.py -o baseline.json
# After changes
./integrated_scanner.py -o current.json
# Compare device counts
echo "Baseline devices:"
cat baseline.json | jq '[.segments[].devices[].ip] | length'
echo "Current devices:"
cat current.json | jq '[.segments[].devices[].ip] | length'
# Find new devices
comm -13 \
<(cat baseline.json | jq -r '.segments[].devices[].ip' | sort) \
<(cat current.json | jq -r '.segments[].devices[].ip' | sort) \
| sed 's/^/NEW: /'
# Find removed devices
comm -23 \
<(cat baseline.json | jq -r '.segments[].devices[].ip' | sort) \
<(cat current.json | jq -r '.segments[].devices[].ip' | sort) \
| sed 's/^/REMOVED: /'
# SCENARIO 7: Extract Specific Information
# -----------------------------------------
# Use jq to extract specific data from scan results
# List all SSH-accessible devices
cat network_scan.json | jq -r '.segments[].devices[] | select(.ssh_accessible==true) | .ip'
# List all routers/firewalls
cat network_scan.json | jq -r '.segments[].devices[] | select(.device_type=="router" or .device_type=="firewall") | "\(.ip) - \(.hostname // "unknown")"'
# List all devices with their OS
cat network_scan.json | jq -r '.segments[].devices[] | "\(.ip)\t\(.os_type // "unknown")\t\(.hostname // "unknown")"'
# Export to CSV
echo "IP,Hostname,Type,OS" > devices.csv
cat network_scan.json | jq -r '.segments[].devices[] | "\(.ip),\(.hostname // ""),\(.device_type // ""),\(.os_type // "")"' >> devices.csv
# SCENARIO 8: Integration with Documentation
# -------------------------------------------
# Generate markdown documentation from scan
cat > generate_docs.py << 'PYTHON'
#!/usr/bin/env python3
import json
import sys
with open(sys.argv[1]) as f:
data = json.load(f)
print("# Network Documentation")
print(f"\nGenerated: {data.get('scan_timestamp', 'N/A')}")
print("\n## Network Segments\n")
for segment in data['segments']:
print(f"### {segment['name']}")
print(f"- CIDR: `{segment['cidr']}`")
print(f"- Devices: {len(segment['devices'])}")
if segment.get('is_vpn'):
print("- Type: VPN Network")
print("\n#### Devices\n")
print("| IP | Hostname | Type | OS |")
print("|---|---|---|---|")
for device in segment['devices']:
ip = device['ip']
hostname = device.get('hostname', '-')
dtype = device.get('device_type', '-')
os = device.get('os_type', '-')
print(f"| {ip} | {hostname} | {dtype} | {os} |")
print()
PYTHON
chmod +x generate_docs.py
./generate_docs.py network_scan.json > NETWORK_DOCS.md
# SCENARIO 9: Security Audit
# ---------------------------
# Check for common security issues
# Find devices with Telnet open
cat network_scan.json | jq -r '.segments[].devices[] | select(.open_ports[]? == 23) | "⚠️ Telnet open on \(.ip) (\(.hostname // "unknown"))"'
# Find devices without SSH access
cat network_scan.json | jq -r '.segments[].devices[] | select(.device_type=="router" or .device_type=="firewall") | select(.ssh_accessible==false) | "⚠️ No SSH access to \(.ip) (\(.hostname // "unknown"))"'
# List devices with many open ports
cat network_scan.json | jq -r '.segments[].devices[] | select((.open_ports | length) > 5) | " \(.ip) has \(.open_ports | length) open ports"'
# SCENARIO 10: WireGuard Topology Mapping
# ----------------------------------------
# Extract WireGuard tunnel information from pfSense
./pfsense_scanner.py 192.168.1.1 -o pfsense.json
# List all WireGuard peers
cat pfsense.json | jq -r '.vpn.wireguard[] | "Peer: \(.peer // "N/A") -> \(.allowed_ips // "N/A")"'
# Check tunnel status
cat pfsense.json | jq -r '.vpn.wireguard[] | select(.latest_handshake) | "Active tunnel to \(.endpoint) (handshake: \(.latest_handshake)s ago)"'
# SCENARIO 11: Network Capacity Planning
# ---------------------------------------
# Analyze network usage and plan capacity
# Count devices per segment
cat network_scan.json | jq -r '.segments[] | "\(.cidr): \(.devices | length) devices"'
# Calculate subnet utilization
cat network_scan.json | jq -r '.segments[] |
if .cidr | contains("/24") then
"\(.cidr): \(.devices | length)/254 = \((.devices | length) * 100 / 254 | floor)% utilized"
else
"\(.cidr): \(.devices | length) devices"
end'
# SCENARIO 12: Quick Health Check
# --------------------------------
# Create a health check script
cat > health_check.sh << 'HEALTH'
#!/bin/bash
SCAN_FILE="latest_scan.json"
echo "Network Health Check"
echo "===================="
echo ""
# Total devices
TOTAL=$(cat $SCAN_FILE | jq '[.segments[].devices[]] | length')
echo "Total devices: $TOTAL"
# SSH accessible
SSH_OK=$(cat $SCAN_FILE | jq '[.segments[].devices[] | select(.ssh_accessible==true)] | length')
echo "SSH accessible: $SSH_OK"
# By type
echo ""
echo "Device Types:"
cat $SCAN_FILE | jq -r '.segments[].devices[].device_type' | sort | uniq -c | sort -rn
# Segments
echo ""
echo "Network Segments:"
cat $SCAN_FILE | jq -r '.segments[] | " \(.name): \(.devices | length) devices"'
HEALTH
chmod +x health_check.sh
./integrated_scanner.py -o latest_scan.json
./health_check.sh
EOF
echo ""
echo "For more examples, see README.md"

181
scripts/quickstart.sh Executable file
View File

@@ -0,0 +1,181 @@
#!/bin/bash
# Quick Start Script for Network Scanner
# This script helps you get started quickly
set -e
echo "================================"
echo "Network Scanner - Quick Start"
echo "================================"
echo ""
# Check for Python
if ! command -v python3 &> /dev/null; then
echo "❌ Error: Python 3 is not installed"
exit 1
fi
echo "✓ Python 3 found"
# Create config if it doesn't exist
if [ ! -f config.json ]; then
echo ""
echo "📝 Creating configuration file..."
# Try to detect default SSH key
SSH_KEY=""
if [ -f ~/.ssh/id_rsa ]; then
SSH_KEY="$HOME/.ssh/id_rsa"
elif [ -f ~/.ssh/id_ed25519 ]; then
SSH_KEY="$HOME/.ssh/id_ed25519"
fi
# Get current user
CURRENT_USER=$(whoami)
# Try to detect local network
LOCAL_NET=$(ip route | grep -oP 'src \K[\d.]+' | head -1)
if [ -n "$LOCAL_NET" ]; then
# Convert to /24 network
NET_PREFIX=$(echo $LOCAL_NET | cut -d. -f1-3)
LOCAL_NET="${NET_PREFIX}.0/24"
else
LOCAL_NET="192.168.1.0/24"
fi
cat > config.json << EOF
{
"ssh_user": "$CURRENT_USER",
"ssh_key_path": "$SSH_KEY",
"timeout": 2,
"additional_networks": [
"$LOCAL_NET"
],
"special_devices": {
},
"scan_options": {
"max_workers": 10,
"ping_timeout": 2,
"port_scan_timeout": 1
}
}
EOF
echo "✓ Created config.json"
echo " Local network detected: $LOCAL_NET"
[ -n "$SSH_KEY" ] && echo " SSH key detected: $SSH_KEY"
echo ""
echo " Please edit config.json to customize for your network!"
echo ""
else
echo "✓ config.json already exists"
fi
# Ask what to do
echo ""
echo "What would you like to do?"
echo ""
echo "1) Run a quick scan (current network only)"
echo "2) Run a full scan with pfSense integration"
echo "3) Scan and generate SVG diagram"
echo "4) Scan specific pfSense device"
echo "5) Show help"
echo "6) Exit"
echo ""
read -p "Choose an option (1-6): " choice
case $choice in
1)
echo ""
echo "🔍 Running quick network scan..."
./network_scanner.py -o quick_scan.json -v
echo ""
echo "✓ Done! Results saved to: quick_scan.json"
echo " Generate diagram with: ./svg_generator.py quick_scan.json"
;;
2)
echo ""
echo "🔍 Running full integrated scan..."
./integrated_scanner.py -o full_scan.json -v
echo ""
echo "✓ Done! Results saved to: full_scan.json"
;;
3)
echo ""
echo "🔍 Running scan and generating diagram..."
./integrated_scanner.py -o scan_with_diagram.json -v --generate-svg
echo ""
echo "✓ Done! Open scan_with_diagram.svg to view the network diagram"
;;
4)
echo ""
read -p "Enter pfSense IP address: " pfsense_ip
echo "🔍 Scanning pfSense at $pfsense_ip..."
./pfsense_scanner.py "$pfsense_ip" -o "pfsense_${pfsense_ip}.json"
echo ""
echo "✓ Done! Results saved to: pfsense_${pfsense_ip}.json"
;;
5)
echo ""
cat << 'HELP'
Network Scanner - Help
======================
Available Scripts:
-----------------
1. network_scanner.py
Basic network scanner that discovers devices and gathers info
Usage: ./network_scanner.py [-c config.json] [-o output.json] [-v]
2. pfsense_scanner.py
Specialized scanner for pfSense firewalls
Usage: ./pfsense_scanner.py <ip> [-u user] [-k keyfile] [-o output.json]
3. integrated_scanner.py
Complete scanner with pfSense integration
Usage: ./integrated_scanner.py [-c config.json] [-o output.json] [-v] [--generate-svg]
4. svg_generator.py
Generate SVG diagram from scan results
Usage: ./svg_generator.py <input.json> [-o output.svg]
Configuration:
-------------
Edit config.json to customize:
- SSH credentials
- Network ranges to scan
- Special device definitions
- Scan timeouts
Examples:
--------
# Quick scan of current network
./network_scanner.py -v
# Full scan with diagram
./integrated_scanner.py --generate-svg
# Scan pfSense
./pfsense_scanner.py 192.168.1.1 -u root -k ~/.ssh/id_rsa
# Generate diagram from existing scan
./svg_generator.py network_scan.json -o my_network.svg
For more information, see README.md
HELP
;;
6)
echo "Goodbye!"
exit 0
;;
*)
echo "Invalid option"
exit 1
;;
esac
echo ""
echo "================================"
echo "Thanks for using Network Scanner!"
echo "================================"

View File

@@ -7,7 +7,10 @@ Combines network scanning with pfSense-specific features
import json
import logging
import argparse
import subprocess
import re
from datetime import datetime
from typing import Dict
from network_scanner import NetworkScanner, NetworkSegment
from pfsense_scanner import PfSenseScanner
from dataclasses import asdict
@@ -38,11 +41,71 @@ class IntegratedNetworkScanner:
# Check for pfSense XML files and integrate them
self._integrate_pfsense_xml()
# Scan pfSense LAN networks
self._scan_pfsense_lan_networks()
# Identify and enhance pfSense devices
self._scan_pfsense_devices()
logger.info("Integrated scan complete")
def _scan_pfsense_lan_networks(self):
"""Scan LAN networks served by pfSense devices"""
logger.info("Scanning pfSense LAN networks...")
# Get pfSense configurations from the integrator
try:
from pfsense_integrator import PfSenseIntegrator
import glob
xml_files = glob.glob("*.xml")
if xml_files:
integrator = PfSenseIntegrator(xml_files)
integrator.load_pfsense_configs()
for pfsense_name, pfsense_config in integrator.pfsense_configs.items():
logger.info(f"Checking LAN networks for pfSense: {pfsense_name}")
interfaces = pfsense_config.get('interfaces', {})
for iface_name, iface_config in interfaces.items():
# Skip WAN interfaces and VPN interfaces
if iface_name.lower() in ['wan', 'wireguard', 'openvpn', 'ipsec']:
continue
# Get the network for this interface
ipaddr = iface_config.get('ipaddr')
subnet = iface_config.get('subnet')
if ipaddr and subnet:
try:
# Calculate the network from IP and subnet
import ipaddress
ip = ipaddress.IPv4Address(ipaddr)
net = ipaddress.IPv4Network(f"{ip}/{subnet}", strict=False)
network_cidr = str(net)
logger.info(f"Scanning pfSense LAN network: {network_cidr} (interface: {iface_name})")
# Check if this network is already scanned
existing_networks = [seg.cidr for seg in self.base_scanner.segments]
if network_cidr not in existing_networks:
# Scan this network
lan_segment = self.base_scanner.scan_network(
network_cidr,
f"{pfsense_name}_{iface_name.upper()}",
is_vpn=False
)
self.base_scanner.segments.append(lan_segment)
logger.info(f"Added LAN network {network_cidr} with {len(lan_segment.devices)} devices")
else:
logger.info(f"Network {network_cidr} already scanned")
except Exception as e:
logger.error(f"Error scanning pfSense LAN network {ipaddr}/{subnet}: {e}")
except Exception as e:
logger.error(f"Error scanning pfSense LAN networks: {e}")
def _integrate_pfsense_xml(self):
"""Automatically integrate pfSense XML files if present"""
import glob
@@ -91,7 +154,9 @@ class IntegratedNetworkScanner:
enhanced_data = json.load(f)
# Update segments
vpn_interfaces = self._check_vpn_interfaces()
self.base_scanner.segments = []
for seg_data in enhanced_data.get('segments', []):
segment = NetworkSegment(
name=seg_data['name'],
@@ -101,9 +166,48 @@ class IntegratedNetworkScanner:
devices=[]
)
for dev_data in seg_data['devices']:
device = Device(**dev_data)
segment.devices.append(device)
# Only scan VPN networks if VPN interfaces are active
if seg_data['is_vpn']:
if vpn_interfaces:
logger.info(f"Scanning VPN network {seg_data['cidr']} (VPN interfaces active)")
# Scan the VPN network for devices
vpn_segment = self.base_scanner.scan_network(
seg_data['cidr'],
seg_data['name'],
is_vpn=True
)
segment.devices = vpn_segment.devices
else:
logger.info(f"VPN network {seg_data['cidr']} (no active VPN interfaces)")
# Try to scan anyway if network might be reachable
try:
logger.info(f"Attempting to scan VPN network {seg_data['cidr']} anyway...")
vpn_segment = self.base_scanner.scan_network(
seg_data['cidr'],
seg_data['name'],
is_vpn=True
)
segment.devices = vpn_segment.devices
logger.info(f"Successfully scanned VPN network {seg_data['cidr']} with {len(vpn_segment.devices)} devices")
except Exception as e:
logger.warning(f"Could not scan VPN network {seg_data['cidr']}: {e}")
# Keep any devices that were already in the segment data
for dev_data in seg_data['devices']:
device_kwargs = {k: v for k, v in dev_data.items()
if k in ['ip', 'hostname', 'mac', 'manufacturer', 'os_type',
'os_version', 'device_type', 'open_ports', 'ssh_accessible',
'services', 'routes', 'interfaces']}
device = Device(**device_kwargs)
segment.devices.append(device)
else:
# For non-VPN networks, use the devices from the segment data
for dev_data in seg_data['devices']:
device_kwargs = {k: v for k, v in dev_data.items()
if k in ['ip', 'hostname', 'mac', 'manufacturer', 'os_type',
'os_version', 'device_type', 'open_ports', 'ssh_accessible',
'services', 'routes', 'interfaces']}
device = Device(**device_kwargs)
segment.devices.append(device)
self.base_scanner.segments.append(segment)
@@ -304,6 +408,33 @@ class IntegratedNetworkScanner:
'client': '💻'
}
return icons.get(device_type, '')
def _check_vpn_interfaces(self) -> dict:
"""Check which VPN interfaces are active"""
vpn_interfaces = {}
try:
# Check for WireGuard interfaces
result = subprocess.run(
['ip', 'link', 'show'],
capture_output=True,
text=True,
timeout=5
)
for line in result.stdout.splitlines():
if 'wg' in line.lower() or 'tun' in line.lower() or 'tap' in line.lower():
# Extract interface name (format: 123: wg0: <POINTOPOINT,NOARP,UP,LOWER_UP> ...)
match = re.search(r'\d+:\s+(\w+):', line)
if match:
iface = match.group(1)
vpn_interfaces[iface] = True
logger.info(f"Found active VPN interface: {iface}")
except Exception as e:
logger.warning(f"Error checking VPN interfaces: {e}")
return vpn_interfaces
def main():
@@ -316,8 +447,6 @@ def main():
help='Output JSON file (default: network_scan.json)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose output')
parser.add_argument('--generate-svg', action='store_true',
help='Automatically generate SVG diagram after scan')
args = parser.parse_args()
@@ -343,23 +472,15 @@ def main():
# Export results
scanner.export_json(args.output)
# Save failed SSH hosts if any
if scanner.base_scanner.failed_ssh_hosts:
failed_ssh_file = args.output.replace('.json', '_failed_ssh.json')
scanner.base_scanner.save_failed_ssh_hosts(failed_ssh_file)
print(f"\n⚠️ {len(scanner.base_scanner.failed_ssh_hosts)} hosts have SSH port open but failed authentication.")
print(f" See {failed_ssh_file} for details.")
print(f"\n✓ Scan complete! Results saved to {args.output}")
# Generate SVG if requested
if args.generate_svg:
try:
from svg_generator import NetworkDiagramGenerator
with open(args.output, 'r') as f:
scan_data = json.load(f)
svg_file = args.output.replace('.json', '.svg')
generator = NetworkDiagramGenerator(scan_data)
generator.generate_svg(svg_file)
print(f"✓ SVG diagram generated: {svg_file}")
except Exception as e:
logger.error(f"Error generating SVG: {e}")
if __name__ == '__main__':

View File

@@ -11,10 +11,12 @@ import json
import re
import socket
import argparse
import multiprocessing
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from dataclasses import dataclass, asdict, field
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
@@ -34,21 +36,12 @@ class Device:
os_type: Optional[str] = None
os_version: Optional[str] = None
device_type: Optional[str] = None # router, switch, server, client, etc.
open_ports: List[int] = None
open_ports: List[int] = field(default_factory=list)
ssh_accessible: bool = False
services: List[str] = None
routes: List[Dict] = None
interfaces: List[Dict] = None
def __post_init__(self):
if self.open_ports is None:
self.open_ports = []
if self.services is None:
self.services = []
if self.routes is None:
self.routes = []
if self.interfaces is None:
self.interfaces = []
services: List[str] = field(default_factory=list)
routes: List[Dict] = field(default_factory=list)
interfaces: List[Dict] = field(default_factory=list)
ssh_info: Optional[Dict] = None
@dataclass
@@ -59,11 +52,7 @@ class NetworkSegment:
gateway: Optional[str] = None
vlan: Optional[int] = None
is_vpn: bool = False
devices: List[Device] = None
def __post_init__(self):
if self.devices is None:
self.devices = []
devices: List[Device] = field(default_factory=list)
class NetworkScanner:
@@ -75,6 +64,7 @@ class NetworkScanner:
self.ssh_user = config.get('ssh_user', 'root')
self.ssh_key = config.get('ssh_key_path')
self.timeout = config.get('timeout', 2)
self.failed_ssh_hosts: List[Dict] = []
def discover_networks(self) -> List[str]:
"""Discover all network segments from local routing table"""
@@ -114,6 +104,33 @@ class NetworkScanner:
return networks
def _check_vpn_interfaces(self) -> Dict[str, bool]:
"""Check which VPN interfaces are active"""
vpn_interfaces = {}
try:
# Check for WireGuard interfaces
result = subprocess.run(
['ip', 'link', 'show'],
capture_output=True,
text=True,
timeout=5
)
for line in result.stdout.splitlines():
if 'wg' in line.lower() or 'tun' in line.lower() or 'tap' in line.lower():
# Extract interface name (format: 123: wg0: <POINTOPOINT,NOARP,UP,LOWER_UP> ...)
match = re.search(r'\d+:\s+(\w+):', line)
if match:
iface = match.group(1)
vpn_interfaces[iface] = True
logger.info(f"Found active VPN interface: {iface}")
except Exception as e:
logger.warning(f"Error checking VPN interfaces: {e}")
return vpn_interfaces
def ping_sweep(self, network: str) -> List[str]:
"""Perform ping sweep to find live hosts"""
logger.info(f"Performing ping sweep on {network}")
@@ -128,7 +145,10 @@ class NetworkScanner:
logger.warning(f"Large network {network}, limiting scan")
hosts = hosts[:254]
with ThreadPoolExecutor(max_workers=50) as executor:
max_ping_workers = min(len(hosts), multiprocessing.cpu_count() * 8)
logger.info(f"Pinging {len(hosts)} hosts using {max_ping_workers} concurrent workers")
with ThreadPoolExecutor(max_workers=max_ping_workers) as executor:
future_to_ip = {
executor.submit(self._ping_host, str(ip)): str(ip)
for ip in hosts
@@ -181,6 +201,13 @@ class NetworkScanner:
if device.ssh_accessible:
# Gather detailed info via SSH
self._gather_ssh_info(device)
else:
# Track hosts with SSH port open but failed SSH connection
self.failed_ssh_hosts.append({
'ip': ip,
'hostname': device.hostname,
'reason': 'SSH port open but authentication failed or connection refused'
})
# Identify device type based on ports and services
device.device_type = self._identify_device_type(device)
@@ -222,10 +249,22 @@ class NetworkScanner:
common_ports = [22, 80, 443, 8080, 8443, 3389, 445, 139, 21, 23, 25, 53, 3306, 5432]
open_ports = []
for port in common_ports:
if self._check_port(ip, port):
open_ports.append(port)
logger.debug(f"{ip}:{port} - OPEN")
max_port_workers = min(len(common_ports), multiprocessing.cpu_count() * 2)
with ThreadPoolExecutor(max_workers=max_port_workers) as executor:
future_to_port = {
executor.submit(self._check_port, ip, port): port
for port in common_ports
}
for future in as_completed(future_to_port):
port = future_to_port[future]
try:
if future.result():
open_ports.append(port)
logger.debug(f"{ip}:{port} - OPEN")
except Exception as e:
logger.debug(f"Error checking {ip}:{port}: {e}")
return open_ports
@@ -261,17 +300,41 @@ class NetworkScanner:
"""Gather detailed information via SSH"""
logger.info(f"Gathering SSH info from {device.ip}")
ssh_info = {}
# Get OS info
device.os_type, device.os_version = self._get_os_info(device.ip)
os_type, os_version = self._get_os_info(device.ip)
if os_type:
ssh_info['os_type'] = os_type
device.os_type = os_type
if os_version:
ssh_info['os_version'] = os_version
device.os_version = os_version
# Get network interfaces
device.interfaces = self._get_interfaces(device.ip)
interfaces = self._get_interfaces(device.ip)
if interfaces:
ssh_info['interfaces'] = interfaces
device.interfaces = interfaces
# Get routing table
device.routes = self._get_routes(device.ip)
routes = self._get_routes(device.ip)
if routes:
ssh_info['routes'] = routes
device.routes = routes
# Get running services
device.services = self._get_services(device.ip)
services = self._get_services(device.ip)
if services:
ssh_info['services'] = services
device.services = services
# Get system info
system_info = self._get_system_info(device.ip)
if system_info:
ssh_info['system'] = system_info
device.ssh_info = ssh_info
def _ssh_exec(self, ip: str, command: str) -> Optional[str]:
"""Execute command via SSH"""
@@ -380,6 +443,40 @@ class NetworkScanner:
return services[:20] # Limit to top 20
def _get_system_info(self, ip: str) -> Optional[Dict]:
"""Get basic system information"""
system_info = {}
# Get hostname
hostname = self._ssh_exec(ip, 'hostname')
if hostname:
system_info['hostname'] = hostname.strip()
# Get uptime
uptime = self._ssh_exec(ip, 'uptime -p 2>/dev/null || uptime')
if uptime:
system_info['uptime'] = uptime.strip()
# Get CPU info
cpu_info = self._ssh_exec(ip, 'nproc && cat /proc/cpuinfo | grep "model name" | head -1')
if cpu_info:
lines = cpu_info.strip().split('\n')
if len(lines) >= 2:
system_info['cpu_cores'] = int(lines[0])
system_info['cpu_model'] = lines[1].split(':')[1].strip()
# Get memory info
mem_info = self._ssh_exec(ip, 'free -h | grep Mem')
if mem_info:
system_info['memory'] = mem_info.strip()
# Get disk info
disk_info = self._ssh_exec(ip, 'df -h / | tail -1')
if disk_info:
system_info['disk'] = disk_info.strip()
return system_info if system_info else None
def _identify_device_type(self, device: Device) -> str:
"""Identify device type based on available info"""
if device.routes and len(device.routes) > 5:
@@ -395,7 +492,7 @@ class NetworkScanner:
else:
return 'client'
def scan_network(self, network: str, name: str = None, is_vpn: bool = False) -> NetworkSegment:
def scan_network(self, network: str, name: Optional[str] = None, is_vpn: bool = False) -> NetworkSegment:
"""Scan a complete network segment"""
logger.info(f"Scanning network segment: {network}")
@@ -409,7 +506,10 @@ class NetworkScanner:
live_hosts = self.ping_sweep(network)
# Gather device info
with ThreadPoolExecutor(max_workers=10) as executor:
max_device_workers = min(len(live_hosts), multiprocessing.cpu_count() * 2)
logger.info(f"Gathering device info for {len(live_hosts)} hosts using {max_device_workers} concurrent workers")
with ThreadPoolExecutor(max_workers=max_device_workers) as executor:
future_to_ip = {
executor.submit(self.get_device_info, ip): ip
for ip in live_hosts
@@ -431,13 +531,24 @@ class NetworkScanner:
# Discover networks
networks = self.discover_networks()
# Scan each network
for network in networks:
try:
segment = self.scan_network(network)
self.segments.append(segment)
except Exception as e:
logger.error(f"Error scanning {network}: {e}")
# Scan networks concurrently
max_concurrent_networks = min(len(networks), multiprocessing.cpu_count())
logger.info(f"Scanning {len(networks)} networks using {max_concurrent_networks} concurrent processes")
with ThreadPoolExecutor(max_workers=max_concurrent_networks) as executor:
future_to_network = {
executor.submit(self.scan_network, network): network
for network in networks
}
for future in as_completed(future_to_network):
network = future_to_network[future]
try:
segment = future.result()
self.segments.append(segment)
logger.info(f"Completed scanning network: {network}")
except Exception as e:
logger.error(f"Error scanning {network}: {e}")
logger.info(f"Scan complete. Found {len(self.segments)} segments")
@@ -462,6 +573,17 @@ class NetworkScanner:
logger.info(f"Exported results to {filename}")
def save_failed_ssh_hosts(self, filename: str):
"""Save list of hosts with SSH port open but failed authentication"""
if self.failed_ssh_hosts:
with open(filename, 'w') as f:
json.dump({
'failed_ssh_hosts': self.failed_ssh_hosts,
'total_failed': len(self.failed_ssh_hosts),
'scan_timestamp': str(datetime.now())
}, f, indent=2)
logger.info(f"Saved {len(self.failed_ssh_hosts)} failed SSH hosts to {filename}")
def print_summary(self):
"""Print a human-readable summary"""
print("\n" + "="*80)
@@ -524,6 +646,7 @@ def main():
# Export results
from datetime import datetime
scanner.export_json(args.output)
scanner.save_failed_ssh_hosts('failed_ssh_hosts.json')
print(f"\n✓ Scan complete! Results saved to {args.output}")

View File

@@ -82,7 +82,12 @@ class PfSenseIntegrator:
# Look for LAN interface
lan_interface = interfaces.get('lan')
if lan_interface and lan_interface.get('ipaddr') and lan_interface.get('subnet'):
lan_network = f"{lan_interface['ipaddr']}/{lan_interface['subnet']}"
# Calculate the proper network address
import ipaddress
ip = ipaddress.IPv4Address(lan_interface['ipaddr'])
subnet_bits = int(lan_interface['subnet'])
network = ipaddress.IPv4Network(f"{ip}/{subnet_bits}", strict=False)
lan_network = str(network)
# Find existing segment
for segment in segments:
@@ -213,6 +218,11 @@ class PfSenseIntegrator:
network_cidr = iface_config.get('network_cidr')
if network_cidr and network_cidr != 'unknown':
# Filter out invalid networks
if self._is_invalid_network(network_cidr):
logger.warning(f"Skipping invalid interface network: {network_cidr}")
continue
# Check if segment already exists
segment_exists = any(seg.get('cidr') == network_cidr for seg in segments)
@@ -233,6 +243,12 @@ class PfSenseIntegrator:
for peer in wireguard.get('peers', []):
for allowed_ip in peer.get('allowed_ips', []):
network = f"{allowed_ip['address']}/{allowed_ip['mask']}"
# Filter out invalid networks
if self._is_invalid_network(network):
logger.warning(f"Skipping invalid WireGuard network: {network}")
continue
segment_exists = any(seg.get('cidr') == network for seg in segments)
if not segment_exists:
@@ -246,6 +262,26 @@ class PfSenseIntegrator:
segments.append(new_segment)
logger.info(f"Added WireGuard network: {network}")
def _is_invalid_network(self, network: str) -> bool:
"""Check if a network should not be scanned"""
try:
import ipaddress
net = ipaddress.ip_network(network, strict=False)
# Skip networks that are too large or invalid
if net.prefixlen == 0: # 0.0.0.0/0 - route all traffic
return True
if net.prefixlen < 8: # Very large networks
return True
if net.network_address.is_private and net.prefixlen < 16: # Large private networks
return True
if str(net.network_address) == '0.0.0.0': # Invalid network
return True
return False
except ValueError:
return True # Invalid network format
def generate_network_summary(self, output_file: str):
"""Generate a human-readable network summary"""
summary = []

View File

@@ -0,0 +1,385 @@
#!/usr/bin/env python3
"""
Server Information Collector
This module collects detailed information about servers and VMs from hypervisors
via SSH. It connects to specified hypervisors and gathers system information,
VM details, resource usage, and network configurations.
"""
import json
import logging
import subprocess
import sys
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
@dataclass
class VMInfo:
"""Information about a virtual machine."""
vmid: str
name: str
status: str
cpu: float
memory_used: int
memory_total: int
disk_used: int
disk_total: int
uptime: str
ip_addresses: List[str]
vm_type: str = 'vm' # 'vm' or 'container'
@dataclass
class ServerInfo:
"""Information about a physical server/hypervisor."""
hostname: str
os: str
kernel: str
uptime: str
cpu_model: str
cpu_cores: int
memory_total: int
memory_free: int
disk_total: int
disk_free: int
load_average: str
network_interfaces: Dict[str, str]
vms: List[VMInfo]
containers: Optional[List[VMInfo]] = None
@dataclass
class HypervisorConfig:
"""Configuration for a hypervisor connection."""
host: str
port: int
user: str
class ServerInfoCollector:
"""Collects server and VM information from hypervisors via SSH."""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = logging.getLogger(__name__)
self.hypervisors = [
HypervisorConfig(**hv) for hv in config.get('hypervisors', [])
]
self.ssh_user = config.get('ssh_user', 'root')
self.ssh_key_path = config.get('ssh_key_path')
self.timeout = config.get('timeout', 10)
def run_ssh_command(self, host: str, port: int, user: str, command: str) -> Optional[str]:
"""Run a command via SSH on a remote host."""
ssh_cmd = [
'ssh',
'-o', 'ConnectTimeout=5',
'-o', 'StrictHostKeyChecking=no',
'-o', 'UserKnownHostsFile=/dev/null',
'-p', str(port),
f'{user}@{host}',
command
]
try:
result = subprocess.run(
ssh_cmd,
capture_output=True,
text=True,
timeout=self.timeout
)
if result.returncode == 0:
return result.stdout.strip()
else:
self.logger.warning(f"SSH command failed on {host}:{port}: {result.stderr}")
return None
except subprocess.TimeoutExpired:
self.logger.warning(f"SSH command timed out on {host}:{port}")
return None
except Exception as e:
self.logger.error(f"SSH error on {host}:{port}: {e}")
return None
def detect_hypervisor_type(self, host: str, port: int, user: str) -> str:
"""Detect the type of hypervisor (Proxmox, VMware, etc.)."""
# Check for Proxmox
if self.run_ssh_command(host, port, user, 'which pvesh'):
return 'proxmox'
# Check for VMware
if self.run_ssh_command(host, port, user, 'which vmware'):
return 'vmware'
# Check for KVM/libvirt
if self.run_ssh_command(host, port, user, 'which virsh'):
return 'kvm'
return 'unknown'
def collect_proxmox_info(self, host: str, port: int, user: str) -> ServerInfo:
"""Collect information from a Proxmox hypervisor."""
hostname = self.run_ssh_command(host, port, user, 'hostname') or host
# System info
os_info = self.run_ssh_command(host, port, user, 'cat /etc/os-release | grep PRETTY_NAME | cut -d\'"\' -f2') or 'Unknown'
kernel = self.run_ssh_command(host, port, user, 'uname -r') or 'Unknown'
uptime = self.run_ssh_command(host, port, user, 'uptime -p') or 'Unknown'
# CPU info
cpu_model = self.run_ssh_command(host, port, user, 'cat /proc/cpuinfo | grep "model name" | head -1 | cut -d: -f2 | xargs') or 'Unknown'
cpu_cores = int(self.run_ssh_command(host, port, user, 'nproc') or '0')
# Memory info
mem_info = self.run_ssh_command(host, port, user, 'free -b | grep Mem')
if mem_info:
parts = mem_info.split()
memory_total = int(parts[1])
memory_free = int(parts[3])
else:
memory_total = memory_free = 0
# Disk info
disk_info = self.run_ssh_command(host, port, user, 'df -B1 / | tail -1')
if disk_info:
parts = disk_info.split()
disk_total = int(parts[1])
disk_free = int(parts[3])
else:
disk_total = disk_free = 0
# Load average
load_average = self.run_ssh_command(host, port, user, 'uptime | cut -d: -f5') or 'Unknown'
# Network interfaces
network_interfaces = {}
net_info = self.run_ssh_command(host, port, user, 'ip -o addr show | grep -v lo')
if net_info:
for line in net_info.split('\n'):
if line.strip():
parts = line.split()
if len(parts) >= 4:
iface = parts[1]
ip = parts[3].split('/')[0]
network_interfaces[iface] = ip
# VMs
vms = []
vm_list = self.run_ssh_command(host, port, user, 'qm list')
if vm_list:
for line in vm_list.split('\n')[1:]: # Skip header
if line.strip():
parts = line.split()
if len(parts) >= 3:
vmid = parts[0]
name = parts[1]
status = parts[2]
# Get VM resource usage
cpu = memory_used = memory_total = disk_used = disk_total = 0
uptime = 'Unknown'
# Try to get current status
status_output = self.run_ssh_command(host, port, user, f'qm status {vmid} --verbose')
if status_output:
# Parse status output for resource usage using grep-like extraction
if 'mem:' in status_output:
try:
mem_line = [line for line in status_output.split('\n') if line.strip().startswith('mem:')][0]
memory_used = int(mem_line.split(':')[1].strip())
except:
pass
if 'maxmem:' in status_output:
try:
maxmem_line = [line for line in status_output.split('\n') if line.strip().startswith('maxmem:')][0]
memory_total = int(maxmem_line.split(':')[1].strip())
except:
pass
if 'maxdisk:' in status_output:
try:
maxdisk_line = [line for line in status_output.split('\n') if line.strip().startswith('maxdisk:')][0]
disk_total = int(maxdisk_line.split(':')[1].strip())
except:
pass
if 'uptime:' in status_output:
try:
uptime_line = [line for line in status_output.split('\n') if line.strip().startswith('uptime:')][0]
uptime = uptime_line.split(':')[1].strip()
except:
pass
# Get VM config for IP addresses
vm_config = self.run_ssh_command(host, port, user, f'qm config {vmid}')
ip_addresses = []
if vm_config:
for config_line in vm_config.split('\n'):
if 'ip=' in config_line:
ip = config_line.split('=')[1].strip()
ip_addresses.append(ip)
vm_info = VMInfo(
vmid=vmid,
name=name,
status=status,
cpu=cpu,
memory_used=memory_used,
memory_total=memory_total,
disk_used=disk_used,
disk_total=disk_total,
uptime=uptime,
ip_addresses=ip_addresses,
vm_type='vm'
)
vms.append(vm_info)
# Containers
containers = []
ct_list = self.run_ssh_command(host, port, user, 'pct list')
if ct_list:
for line in ct_list.split('\n')[1:]: # Skip header
if line.strip():
parts = line.split()
if len(parts) >= 3:
vmid = parts[0]
status = parts[1]
# Handle variable number of columns (Lock column may be empty)
if len(parts) == 4:
name = parts[3]
else:
name = parts[2]
# Get container resource usage
cpu = memory_used = memory_total = disk_used = disk_total = 0
uptime = 'Unknown'
# Try to get current status
status_output = self.run_ssh_command(host, port, user, f'pct status {vmid}')
if status_output:
# Parse status output for resource usage
lines = status_output.split('\n')
for line in lines:
line = line.strip()
if ':' in line:
key, value = line.split(':', 1)
key = key.strip()
value = value.strip()
if key == 'status':
# Status is already from pct list
pass
# Container resource info might be limited
# Could add more parsing here if available
# Get container config for IP addresses
ct_config = self.run_ssh_command(host, port, user, f'pct config {vmid}')
ip_addresses = []
if ct_config:
for config_line in ct_config.split('\n'):
if 'ip=' in config_line:
ip = config_line.split('=')[1].strip()
ip_addresses.append(ip)
container_info = VMInfo(
vmid=vmid,
name=name,
status=status,
cpu=cpu,
memory_used=memory_used,
memory_total=memory_total,
disk_used=disk_used,
disk_total=disk_total,
uptime=uptime,
ip_addresses=ip_addresses,
vm_type='container'
)
containers.append(container_info)
return ServerInfo(
hostname=hostname,
os=os_info,
kernel=kernel,
uptime=uptime,
cpu_model=cpu_model,
cpu_cores=cpu_cores,
memory_total=memory_total,
memory_free=memory_free,
disk_total=disk_total,
disk_free=disk_free,
load_average=load_average,
network_interfaces=network_interfaces,
vms=vms,
containers=containers
)
def collect_server_info(self, hypervisor: HypervisorConfig) -> Optional[ServerInfo]:
"""Collect information from a single hypervisor."""
self.logger.info(f"Collecting info from {hypervisor.host}:{hypervisor.port}")
hv_type = self.detect_hypervisor_type(hypervisor.host, hypervisor.port, hypervisor.user)
if hv_type == 'proxmox':
return self.collect_proxmox_info(hypervisor.host, hypervisor.port, hypervisor.user)
else:
self.logger.warning(f"Unsupported hypervisor type: {hv_type} on {hypervisor.host}")
return None
def collect_all_server_info(self) -> Dict[str, ServerInfo]:
"""Collect information from all configured hypervisors."""
server_info = {}
with ThreadPoolExecutor(max_workers=len(self.hypervisors)) as executor:
future_to_hv = {
executor.submit(self.collect_server_info, hv): hv
for hv in self.hypervisors
}
for future in as_completed(future_to_hv):
hv = future_to_hv[future]
try:
info = future.result()
if info:
server_info[f"{hv.host}:{hv.port}"] = info
except Exception as e:
self.logger.error(f"Failed to collect info from {hv.host}:{hv.port}: {e}")
return server_info
def save_to_file(self, server_info: Dict[str, ServerInfo], filename: str):
"""Save collected server information to a JSON file."""
data = {host: asdict(info) for host, info in server_info.items()}
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
self.logger.info(f"Server information saved to {filename}")
def main():
"""Main function for standalone execution."""
import argparse
parser = argparse.ArgumentParser(description='Collect server information from hypervisors')
parser.add_argument('-o', '--output', default='server_details.json', help='Output JSON file')
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
# Load config
try:
with open('config.json', 'r') as f:
config = json.load(f)
except FileNotFoundError:
print("config.json not found. Please create it from config.json.example")
sys.exit(1)
collector = ServerInfoCollector(config)
server_info = collector.collect_all_server_info()
if server_info:
collector.save_to_file(server_info, args.output)
print(f"Collected information from {len(server_info)} hypervisors")
else:
print("No server information collected")
if __name__ == '__main__':
main()

View File

@@ -56,11 +56,10 @@ def test_commands():
def test_scripts_exist():
"""Check if all scripts exist"""
scripts = [
'network_scanner.py',
'pfsense_scanner.py',
'svg_generator.py',
'integrated_scanner.py',
'quickstart.sh'
'src/network_scanner.py',
'src/pfsense_scanner.py',
'src/integrated_scanner.py',
'scripts/quickstart.sh'
]
all_ok = True
@@ -77,10 +76,9 @@ def test_scripts_exist():
def test_script_syntax():
"""Test Python script syntax"""
scripts = [
'network_scanner.py',
'pfsense_scanner.py',
'svg_generator.py',
'integrated_scanner.py'
'src/network_scanner.py',
'src/pfsense_scanner.py',
'src/integrated_scanner.py'
]
all_ok = True
@@ -230,7 +228,6 @@ def main():
print("Next steps:")
print("1. Edit config.json with your network details")
print("2. Run: ./quickstart.sh")
print(" or: ./integrated_scanner.py --generate-svg")
else:
print("⚠️ Some tests failed. Please check the errors above.")
print()

View File

@@ -1,353 +0,0 @@
#!/usr/bin/env python3
"""
SVG Network Diagram Generator
Converts network scan results into an SVG network diagram
"""
import json
import math
from typing import Dict, List, Tuple
from xml.etree.ElementTree import Element, SubElement, tostring
from xml.dom import minidom
class NetworkDiagramGenerator:
"""Generate SVG diagrams from network scan data"""
# Device type styling
DEVICE_STYLES = {
'router': {'color': '#FF6B6B', 'icon': '🔀', 'shape': 'rect'},
'firewall': {'color': '#FF0000', 'icon': '🛡️', 'shape': 'rect'},
'switch': {'color': '#4ECDC4', 'icon': '🔌', 'shape': 'rect'},
'server': {'color': '#45B7D1', 'icon': '🖥️', 'shape': 'rect'},
'linux_server': {'color': '#45B7D1', 'icon': '🐧', 'shape': 'rect'},
'windows_client': {'color': '#95E1D3', 'icon': '💻', 'shape': 'rect'},
'client': {'color': '#A8E6CF', 'icon': '💻', 'shape': 'rect'},
'default': {'color': '#CCCCCC', 'icon': '', 'shape': 'rect'}
}
# Network segment colors
SEGMENT_COLORS = [
'#E3F2FD', '#F3E5F5', '#E8F5E9', '#FFF3E0', '#FCE4EC',
'#E0F2F1', '#F1F8E9', '#FFF9C4', '#FFE0B2', '#F8BBD0'
]
def __init__(self, scan_data: Dict):
self.scan_data = scan_data
self.width = 1600
self.height = 1200
self.margin = 50
self.device_width = 120
self.device_height = 80
self.segment_spacing = 200
def generate_svg(self, output_file: str):
"""Generate SVG diagram"""
svg = Element('svg', {
'width': str(self.width),
'height': str(self.height),
'xmlns': 'http://www.w3.org/2000/svg',
'version': '1.1'
})
# Add styles
self._add_styles(svg)
# Add title
self._add_title(svg)
# Calculate layout
segments = self.scan_data.get('segments', [])
layout = self._calculate_layout(segments)
# Draw network segments
for i, (segment, positions) in enumerate(zip(segments, layout)):
self._draw_segment(svg, segment, positions, i)
# Draw connections
self._draw_connections(svg, segments, layout)
# Add legend
self._add_legend(svg)
# Save to file
self._save_svg(svg, output_file)
def _add_styles(self, svg: Element):
"""Add CSS styles"""
style = SubElement(svg, 'style')
style.text = """
.device-box { stroke: #333; stroke-width: 2; }
.device-label { font-family: Arial, sans-serif; font-size: 12px; fill: #000; }
.device-icon { font-size: 24px; }
.segment-box { stroke: #666; stroke-width: 2; fill-opacity: 0.1; }
.segment-label { font-family: Arial, sans-serif; font-size: 14px; font-weight: bold; fill: #333; }
.connection { stroke: #999; stroke-width: 2; stroke-dasharray: 5,5; }
.title { font-family: Arial, sans-serif; font-size: 24px; font-weight: bold; fill: #333; }
.info-text { font-family: Arial, sans-serif; font-size: 10px; fill: #666; }
.legend-text { font-family: Arial, sans-serif; font-size: 11px; fill: #333; }
"""
def _add_title(self, svg: Element):
"""Add diagram title"""
title = SubElement(svg, 'text', {
'x': str(self.width // 2),
'y': '30',
'text-anchor': 'middle',
'class': 'title'
})
title.text = 'Network Topology Diagram'
def _calculate_layout(self, segments: List[Dict]) -> List[List[Tuple]]:
"""Calculate positions for all devices"""
layout = []
# Arrange segments horizontally
segment_width = (self.width - 2 * self.margin) / len(segments) if segments else self.width
for seg_idx, segment in enumerate(segments):
devices = segment.get('devices', [])
positions = []
# Calculate segment area
seg_x = self.margin + seg_idx * segment_width
seg_y = self.margin + 60
seg_w = segment_width - 20
seg_h = self.height - seg_y - self.margin
# Arrange devices in a grid within segment
cols = math.ceil(math.sqrt(len(devices)))
rows = math.ceil(len(devices) / cols) if cols > 0 else 1
device_spacing_x = seg_w / (cols + 1) if cols > 0 else seg_w / 2
device_spacing_y = seg_h / (rows + 1) if rows > 0 else seg_h / 2
for dev_idx, device in enumerate(devices):
col = dev_idx % cols
row = dev_idx // cols
x = seg_x + (col + 1) * device_spacing_x
y = seg_y + (row + 1) * device_spacing_y
positions.append((x, y, device))
layout.append(positions)
return layout
def _draw_segment(self, svg: Element, segment: Dict, positions: List[Tuple], seg_idx: int):
"""Draw a network segment and its devices"""
if not positions:
return
# Calculate segment bounds
xs = [pos[0] for pos in positions]
ys = [pos[1] for pos in positions]
min_x = min(xs) - self.device_width
max_x = max(xs) + self.device_width
min_y = min(ys) - self.device_height
max_y = max(ys) + self.device_height
# Draw segment background
color = self.SEGMENT_COLORS[seg_idx % len(self.SEGMENT_COLORS)]
SubElement(svg, 'rect', {
'x': str(min_x - 20),
'y': str(min_y - 40),
'width': str(max_x - min_x + 40),
'height': str(max_y - min_y + 60),
'fill': color,
'class': 'segment-box',
'rx': '10'
})
# Draw segment label
label = SubElement(svg, 'text', {
'x': str(min_x),
'y': str(min_y - 20),
'class': 'segment-label'
})
vpn_indicator = ' (VPN)' if segment.get('is_vpn') else ''
label.text = f"{segment.get('name', 'Unknown')} - {segment.get('cidr', '')}{vpn_indicator}"
# Draw devices
for x, y, device in positions:
self._draw_device(svg, device, x, y)
def _draw_device(self, svg: Element, device: Dict, x: float, y: float):
"""Draw a single device"""
device_type = device.get('device_type', 'default')
style = self.DEVICE_STYLES.get(device_type, self.DEVICE_STYLES['default'])
# Draw device box
box_x = x - self.device_width / 2
box_y = y - self.device_height / 2
SubElement(svg, 'rect', {
'x': str(box_x),
'y': str(box_y),
'width': str(self.device_width),
'height': str(self.device_height),
'fill': style['color'],
'class': 'device-box',
'rx': '5'
})
# Add icon
icon_text = SubElement(svg, 'text', {
'x': str(x),
'y': str(y - 15),
'text-anchor': 'middle',
'class': 'device-icon'
})
icon_text.text = style['icon']
# Add IP address
ip_text = SubElement(svg, 'text', {
'x': str(x),
'y': str(y + 5),
'text-anchor': 'middle',
'class': 'device-label',
'font-weight': 'bold'
})
ip_text.text = device.get('ip', 'Unknown')
# Add hostname
hostname = device.get('hostname', '')
if hostname:
hostname_text = SubElement(svg, 'text', {
'x': str(x),
'y': str(y + 20),
'text-anchor': 'middle',
'class': 'info-text'
})
# Truncate long hostnames
if len(hostname) > 20:
hostname = hostname[:17] + '...'
hostname_text.text = hostname
# Add additional info
info_y = y + 32
if device.get('ssh_accessible'):
ssh_text = SubElement(svg, 'text', {
'x': str(x),
'y': str(info_y),
'text-anchor': 'middle',
'class': 'info-text',
'fill': 'green'
})
ssh_text.text = '🔓 SSH'
def _draw_connections(self, svg: Element, segments: List[Dict], layout: List[List[Tuple]]):
"""Draw connections between devices based on routing info"""
# This is simplified - you'd analyze routing tables to determine actual connections
# For now, connect routers between segments
routers = []
for positions in layout:
for x, y, device in positions:
if device.get('device_type') in ['router', 'firewall']:
routers.append((x, y, device))
# Connect consecutive routers
for i in range(len(routers) - 1):
x1, y1, _ = routers[i]
x2, y2, _ = routers[i + 1]
SubElement(svg, 'line', {
'x1': str(x1),
'y1': str(y1),
'x2': str(x2),
'y2': str(y2),
'class': 'connection'
})
def _add_legend(self, svg: Element):
"""Add legend explaining device types"""
legend_x = self.width - 200
legend_y = self.height - 200
# Legend box
SubElement(svg, 'rect', {
'x': str(legend_x - 10),
'y': str(legend_y - 10),
'width': '190',
'height': str(len(self.DEVICE_STYLES) * 25 + 30),
'fill': 'white',
'stroke': '#333',
'stroke-width': '1',
'rx': '5'
})
# Legend title
title = SubElement(svg, 'text', {
'x': str(legend_x),
'y': str(legend_y + 5),
'class': 'legend-text',
'font-weight': 'bold'
})
title.text = 'Device Types'
# Legend items
y_offset = 25
for device_type, style in self.DEVICE_STYLES.items():
if device_type == 'default':
continue
# Color box
SubElement(svg, 'rect', {
'x': str(legend_x),
'y': str(legend_y + y_offset - 8),
'width': '15',
'height': '15',
'fill': style['color'],
'stroke': '#333'
})
# Label
label = SubElement(svg, 'text', {
'x': str(legend_x + 20),
'y': str(legend_y + y_offset + 4),
'class': 'legend-text'
})
label.text = f"{style['icon']} {device_type.replace('_', ' ').title()}"
y_offset += 25
def _save_svg(self, svg: Element, output_file: str):
"""Save SVG to file with pretty formatting"""
rough_string = tostring(svg, encoding='unicode')
reparsed = minidom.parseString(rough_string)
pretty_svg = reparsed.toprettyxml(indent=' ')
# Remove extra blank lines
pretty_svg = '\n'.join([line for line in pretty_svg.split('\n') if line.strip()])
with open(output_file, 'w', encoding='utf-8') as f:
f.write(pretty_svg)
def main():
"""Command line interface"""
import argparse
parser = argparse.ArgumentParser(description='Generate SVG network diagram from scan data')
parser.add_argument('input', help='Input JSON file from network scan')
parser.add_argument('-o', '--output', default='network_diagram.svg',
help='Output SVG file (default: network_diagram.svg)')
args = parser.parse_args()
# Load scan data
with open(args.input, 'r') as f:
scan_data = json.load(f)
# Generate diagram
generator = NetworkDiagramGenerator(scan_data)
generator.generate_svg(args.output)
print(f"✓ Network diagram generated: {args.output}")
if __name__ == '__main__':
main()

12
test_config.json Normal file
View File

@@ -0,0 +1,12 @@
{
"ssh_user": "root",
"ssh_key_path": null,
"timeout": 2,
"additional_networks": [],
"special_devices": {},
"scan_options": {
"max_workers": 10,
"ping_timeout": 2,
"port_scan_timeout": 1
}
}

12
tests/test_config.json Normal file
View File

@@ -0,0 +1,12 @@
{
"ssh_user": "root",
"ssh_key_path": null,
"timeout": 2,
"additional_networks": [],
"special_devices": {},
"scan_options": {
"max_workers": 10,
"ping_timeout": 2,
"port_scan_timeout": 1
}
}

1429
tests/test_output.json Normal file

File diff suppressed because it is too large Load Diff