Files
netzwerk_diagramm_scanner/pfsense_scanner.py

337 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
pfSense specific scanner module
Extracts routing tables, firewall rules, VPN configurations, and network topology
"""
import subprocess
import re
import logging
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class PfSenseScanner:
    """Collect network state from a pfSense firewall over SSH.

    Each public ``get_*`` method runs one shell command on the firewall via
    the local ``ssh`` client (see ``_ssh_exec``) and hands the text output to
    a private ``_parse_*`` static helper. The helpers are pure functions of
    the command output, so they can be exercised without a firewall.

    All collectors are best-effort: if the remote command fails or prints
    nothing, the collector returns an empty list instead of raising.
    """

    # With ``wg show all dump`` an interface line carries 5 tab-separated
    # fields (interface, private-key, public-key, listen-port, fwmark); peer
    # lines carry more (see wg(8)). Lines with at most this many fields are
    # therefore not peers.
    _WG_INTERFACE_FIELD_COUNT = 5

    def __init__(self, ip: str, ssh_user: str = 'root', ssh_key: Optional[str] = None):
        """Store the connection parameters; no connection is opened here.

        Args:
            ip: Address of the pfSense host, used as the SSH destination.
            ssh_user: Remote login name.
            ssh_key: Optional path to a private key passed to ``ssh -i``.
        """
        self.ip = ip
        self.ssh_user = ssh_user
        self.ssh_key = ssh_key

    def _ssh_exec(self, command: str) -> Optional[str]:
        """Execute ``command`` on the firewall via SSH.

        Args:
            command: Shell command line to run on the remote host.

        Returns:
            The command's stdout when ``ssh`` exits with status 0, otherwise
            ``None``. Failures are logged, never raised.
        """
        cmd = [
            'ssh',
            '-o', 'ConnectTimeout=5',
            # NOTE(review): host key verification is disabled, which leaves
            # the session open to man-in-the-middle attacks. Kept as-is to
            # preserve behaviour; consider pinning the host key instead.
            '-o', 'StrictHostKeyChecking=no',
            # BatchMode makes ssh fail instead of prompting for a password.
            '-o', 'BatchMode=yes',
        ]
        if self.ssh_key:
            cmd.extend(['-i', self.ssh_key])
        cmd.extend([f'{self.ssh_user}@{self.ip}', command])

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        except Exception as e:
            # Deliberately broad: a missing ssh binary, a timeout or
            # undecodable output must not abort the rest of the scan.
            logger.error(f"SSH exec error on {self.ip}: {e}")
            return None

        if result.returncode != 0:
            logger.error(f"SSH command failed: {result.stderr}")
            return None
        return result.stdout

    def get_interfaces(self) -> List[Dict]:
        """Get all network interfaces reported by ``ifconfig -a``.

        Returns:
            One dict per interface; see ``_parse_ifconfig`` for the keys.
        """
        logger.info(f"Getting pfSense interfaces from {self.ip}")
        output = self._ssh_exec('ifconfig -a')
        return self._parse_ifconfig(output) if output else []

    @staticmethod
    def _parse_ifconfig(output: str) -> List[Dict]:
        """Parse ``ifconfig -a`` output into interface dicts.

        Each dict has ``name``, ``addresses`` (a list of ``{'ip', 'netmask'}``
        dicts for IPv4 addresses, values exactly as printed by ifconfig) and
        ``flags`` (the complete header line). ``mac`` is present only when an
        ``ether`` line was seen for the interface.
        """
        interfaces: List[Dict] = []
        current: Optional[Dict] = None
        for line in output.splitlines():
            if not line.startswith((' ', '\t')):
                # Unindented line: an interface header, or something we ignore.
                header = re.match(r'(\S+):\s+flags=', line)
                if header:
                    current = {
                        'name': header.group(1),
                        'addresses': [],
                        'flags': line,
                    }
                    interfaces.append(current)
                continue
            if current is None:
                continue
            if 'inet ' in line:
                # The trailing space keeps "inet6" lines out of this branch.
                address = re.search(r'inet (\S+)', line)
                netmask = re.search(r'netmask (\S+)', line)
                if address and netmask:
                    current['addresses'].append({
                        'ip': address.group(1),
                        'netmask': netmask.group(1),
                    })
            elif 'ether ' in line:
                mac = re.search(r'ether (\S+)', line)
                if mac:
                    current['mac'] = mac.group(1)
        return interfaces

    def get_routing_table(self) -> List[Dict]:
        """Get the IPv4 routing table from ``netstat -rn -f inet``.

        Returns:
            One dict per route with ``destination``, ``gateway``, ``flags``
            and ``interface``.
        """
        logger.info(f"Getting routing table from pfSense {self.ip}")
        output = self._ssh_exec('netstat -rn -f inet')
        return self._parse_routes(output) if output else []

    @staticmethod
    def _parse_routes(output: str) -> List[Dict]:
        """Parse ``netstat -rn`` output; rows before the column header are skipped."""
        routes: List[Dict] = []
        in_table = False
        for line in output.splitlines():
            if 'Destination' in line and 'Gateway' in line:
                in_table = True
                continue
            if not in_table:
                continue
            parts = line.split()
            # Rows with fewer than four columns (blank lines, section titles)
            # are not routes.
            if len(parts) >= 4:
                routes.append({
                    'destination': parts[0],
                    'gateway': parts[1],
                    'flags': parts[2],
                    'interface': parts[3],
                })
        return routes

    def get_vpn_connections(self) -> Dict:
        """Get VPN (WireGuard, OpenVPN, IPsec) configurations and status.

        Returns:
            A dict with the keys ``wireguard``, ``openvpn`` and ``ipsec``,
            each holding the list returned by the matching private collector.
        """
        logger.info(f"Getting VPN connections from pfSense {self.ip}")
        return {
            'wireguard': self._get_wireguard_info(),
            'openvpn': self._get_openvpn_info(),
            'ipsec': self._get_ipsec_info(),
        }

    def _get_wireguard_info(self) -> List[Dict]:
        """Get WireGuard peer information from ``wg show all dump``.

        Returns:
            One dict per peer; empty when the command fails or prints nothing.
        """
        output = self._ssh_exec('wg show all dump 2>/dev/null')
        return self._parse_wireguard_dump(output) if output else []

    @classmethod
    def _parse_wireguard_dump(cls, output: str) -> List[Dict]:
        """Parse ``wg show all dump`` output into one dict per peer.

        Interface lines are skipped on purpose: their second field is the
        interface's *private key* and must not be reported as a peer.
        Field positions follow the peer-line layout documented in wg(8):
        interface, public-key, preshared-key, endpoint, allowed-ips,
        latest-handshake, ...
        """
        tunnels: List[Dict] = []
        for line in output.splitlines():
            fields = line.split('\t')
            if len(fields) <= cls._WG_INTERFACE_FIELD_COUNT:
                continue
            tunnels.append({
                'interface': fields[0],
                'peer': fields[1],
                'endpoint': fields[3],
                'allowed_ips': fields[4],
                'latest_handshake': fields[5],
            })
        return tunnels

    def _get_openvpn_info(self) -> List[Dict]:
        """Get running OpenVPN instances by inspecting the process list.

        Returns:
            One dict per ``openvpn`` process that was started with a
            ``--config`` argument.
        """
        output = self._ssh_exec('ps aux | grep openvpn | grep -v grep')
        return self._parse_openvpn_processes(output) if output else []

    @staticmethod
    def _parse_openvpn_processes(output: str) -> List[Dict]:
        """Extract the ``--config`` path from each ``openvpn`` line of ``ps`` output."""
        connections: List[Dict] = []
        for line in output.splitlines():
            if 'openvpn' not in line:
                continue
            config = re.search(r'--config\s+(\S+)', line)
            if config:
                connections.append({
                    'config': config.group(1),
                    'status': 'running',
                })
        return connections

    def _get_ipsec_info(self) -> List[Dict]:
        """Get IPsec tunnel information.

        Runs ``ipsec statusall`` and falls back to ``setkey -D``. The output
        is not parsed; it is returned verbatim.

        Returns:
            An empty list when nothing was printed, otherwise a single dict
            with ``raw_status`` (the text) and ``parsed`` set to ``False``.
        """
        output = self._ssh_exec('ipsec statusall 2>/dev/null || setkey -D 2>/dev/null')
        if not output:
            return []
        return [{
            'raw_status': output,
            'parsed': False,  # Would need specific parsing for your setup
        }]

    def get_firewall_rules(self) -> List[Dict]:
        """Get active firewall rules from ``pfctl -sr -v``.

        Returns:
            One dict per rule with ``rule`` (the rule text) and, when pfctl
            printed a counter line for it, ``stats``.
        """
        logger.info(f"Getting firewall rules from pfSense {self.ip}")
        output = self._ssh_exec('pfctl -sr -v')
        return self._parse_pf_rules(output) if output else []

    @staticmethod
    def _parse_pf_rules(output: str) -> List[Dict]:
        """Parse verbose pfctl rule output.

        Every unindented, non-blank line starts a new rule; indented lines
        belong to the rule above them. Starting a rule on *any* unindented
        line (not only ``pass``/``block``/``@N``) keeps the counters of
        ``scrub``, ``match`` or ``anchor`` rules from being attached to the
        wrong rule. The counter line is matched case-insensitively because
        pfctl prints ``Packets:`` / ``Bytes:`` capitalised.
        """
        rules: List[Dict] = []
        current: Optional[Dict] = None
        for line in output.splitlines():
            text = line.strip()
            if not text:
                continue
            if not line[0].isspace():
                current = {'rule': text}
                rules.append(current)
            elif current is not None:
                lowered = text.lower()
                if 'packets:' in lowered or 'bytes:' in lowered:
                    current['stats'] = text
        return rules

    def get_dhcp_leases(self) -> List[Dict]:
        """Get DHCP leases from the dhcpd lease database on the firewall.

        Returns:
            One dict per ``lease`` block in file order; see
            ``_parse_dhcp_leases`` for the keys.
        """
        logger.info(f"Getting DHCP leases from pfSense {self.ip}")
        output = self._ssh_exec('cat /var/dhcpd/var/db/dhcpd.leases 2>/dev/null')
        return self._parse_dhcp_leases(output) if output else []

    @staticmethod
    def _parse_dhcp_leases(output: str) -> List[Dict]:
        """Parse the text of a ``dhcpd.leases`` file.

        Each dict always has ``ip``; ``mac``, ``hostname`` and ``expires`` are
        present only when the matching statement appears in the block. Blocks
        are not de-duplicated, so an address can appear more than once.
        """
        leases: List[Dict] = []
        current: Optional[Dict] = None
        for raw_line in output.splitlines():
            line = raw_line.strip()
            if line.startswith('lease '):
                header = re.match(r'lease (\S+)', line)
                # Drop the reference to the previous lease even when the
                # header is malformed, so later statements cannot modify it
                # and it is never appended twice.
                current = {'ip': header.group(1)} if header else None
                if current is not None:
                    leases.append(current)
                continue
            if current is None:
                continue
            if line.startswith('hardware ethernet'):
                mac = re.search(r'hardware ethernet (\S+);', line)
                if mac:
                    current['mac'] = mac.group(1)
            elif line.startswith('client-hostname'):
                hostname = re.search(r'client-hostname "([^"]+)"', line)
                if hostname:
                    current['hostname'] = hostname.group(1)
            elif line.startswith('ends'):
                # Skips the leading weekday number; keeps the date and time.
                expiry = re.search(r'ends \d+ ([^;]+);', line)
                if expiry:
                    current['expires'] = expiry.group(1)
        return leases

    def get_arp_table(self) -> List[Dict]:
        """Get the ARP table from ``arp -an``.

        Returns:
            One dict per resolved entry with ``ip``, ``mac`` and ``interface``.
        """
        logger.info(f"Getting ARP table from pfSense {self.ip}")
        output = self._ssh_exec('arp -an')
        return self._parse_arp(output) if output else []

    @staticmethod
    def _parse_arp(output: str) -> List[Dict]:
        """Parse ``arp -an`` output; lines without a hex MAC address are skipped."""
        # Format: ? (192.168.1.10) at 00:11:22:33:44:55 on em0 [ethernet]
        entry_re = re.compile(
            r'\((\d+\.\d+\.\d+\.\d+)\)\s+at\s+([0-9a-f:]+)\s+on\s+(\S+)',
            re.IGNORECASE,
        )
        found = (entry_re.search(line) for line in output.splitlines())
        return [
            {
                'ip': entry.group(1),
                'mac': entry.group(2),
                'interface': entry.group(3),
            }
            for entry in found
            if entry
        ]

    def get_gateway_status(self) -> List[Dict]:
        """Get the default gateway from ``route -n get default``.

        Returns:
            A list with at most one dict (``ip``, ``interface``, ``type``);
            empty when no gateway line was found.
        """
        logger.info(f"Getting gateway status from pfSense {self.ip}")
        output = self._ssh_exec('route -n get default')
        return self._parse_default_route(output) if output else []

    @staticmethod
    def _parse_default_route(output: str) -> List[Dict]:
        """Parse ``route -n get default`` output into a zero- or one-element list."""
        gateway_ip = None
        interface = None
        for line in output.splitlines():
            if 'gateway:' in line:
                found = re.search(r'gateway:\s*(\S+)', line)
                if found:
                    gateway_ip = found.group(1)
            elif 'interface:' in line:
                found = re.search(r'interface:\s*(\S+)', line)
                if found:
                    interface = found.group(1)
        if not gateway_ip:
            return []
        return [{
            'ip': gateway_ip,
            'interface': interface,
            'type': 'default',
        }]

    def get_full_info(self) -> Dict:
        """Get all information from pfSense by running every collector once.

        Returns:
            A dict with ``ip``, ``type`` (always ``'pfSense'``) and one key
            per collector: ``interfaces``, ``routes``, ``vpn``,
            ``firewall_rules``, ``dhcp_leases``, ``arp_table``, ``gateways``.
        """
        logger.info(f"Gathering complete pfSense info from {self.ip}")
        return {
            'ip': self.ip,
            'type': 'pfSense',
            'interfaces': self.get_interfaces(),
            'routes': self.get_routing_table(),
            'vpn': self.get_vpn_connections(),
            'firewall_rules': self.get_firewall_rules(),
            'dhcp_leases': self.get_dhcp_leases(),
            'arp_table': self.get_arp_table(),
            'gateways': self.get_gateway_status(),
        }
def main():
    """Command-line entry point: scan one pfSense host and emit the result as JSON.

    Parses the host address plus optional SSH user, SSH key path and output
    file from the command line, runs ``PfSenseScanner.get_full_info`` and
    either writes the JSON to the output file or prints it to stdout.
    """
    import argparse
    import json

    cli = argparse.ArgumentParser(description='pfSense Scanner')
    cli.add_argument('ip', help='pfSense IP address')
    cli.add_argument('-u', '--user', default='root', help='SSH user')
    cli.add_argument('-k', '--key', help='SSH key path')
    cli.add_argument('-o', '--output', help='Output JSON file')
    opts = cli.parse_args()

    report = PfSenseScanner(opts.ip, opts.user, opts.key).get_full_info()

    # Without an output path the report simply goes to stdout.
    if not opts.output:
        print(json.dumps(report, indent=2))
        return

    with open(opts.output, 'w') as handle:
        json.dump(report, handle, indent=2)
    print(f"Saved to {opts.output}")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()