Initial commit: Network scanner with pfSense integration and SVG diagram generation
This commit is contained in:
336
pfsense_scanner.py
Executable file
336
pfsense_scanner.py
Executable file
@@ -0,0 +1,336 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
pfSense specific scanner module
|
||||
Extracts routing tables, firewall rules, VPN configurations, and network topology
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import re
|
||||
import logging
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PfSenseScanner:
|
||||
"""Scanner specifically for pfSense firewalls"""
|
||||
|
||||
def __init__(self, ip: str, ssh_user: str = 'root', ssh_key: Optional[str] = None):
|
||||
self.ip = ip
|
||||
self.ssh_user = ssh_user
|
||||
self.ssh_key = ssh_key
|
||||
|
||||
def _ssh_exec(self, command: str) -> Optional[str]:
|
||||
"""Execute command on pfSense via SSH"""
|
||||
try:
|
||||
cmd = ['ssh', '-o', 'ConnectTimeout=5',
|
||||
'-o', 'StrictHostKeyChecking=no',
|
||||
'-o', 'BatchMode=yes']
|
||||
|
||||
if self.ssh_key:
|
||||
cmd.extend(['-i', self.ssh_key])
|
||||
|
||||
cmd.extend([f'{self.ssh_user}@{self.ip}', command])
|
||||
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
if result.returncode == 0:
|
||||
return result.stdout
|
||||
else:
|
||||
logger.error(f"SSH command failed: {result.stderr}")
|
||||
except Exception as e:
|
||||
logger.error(f"SSH exec error on {self.ip}: {e}")
|
||||
return None
|
||||
|
||||
def get_interfaces(self) -> List[Dict]:
|
||||
"""Get all network interfaces"""
|
||||
logger.info(f"Getting pfSense interfaces from {self.ip}")
|
||||
interfaces = []
|
||||
|
||||
output = self._ssh_exec('ifconfig -a')
|
||||
if not output:
|
||||
return interfaces
|
||||
|
||||
current_iface = None
|
||||
for line in output.splitlines():
|
||||
if not line.startswith((' ', '\t')):
|
||||
# New interface
|
||||
match = re.match(r'(\S+):\s+flags=', line)
|
||||
if match:
|
||||
if current_iface:
|
||||
interfaces.append(current_iface)
|
||||
current_iface = {
|
||||
'name': match.group(1),
|
||||
'addresses': [],
|
||||
'flags': line
|
||||
}
|
||||
elif current_iface:
|
||||
# Interface details
|
||||
if 'inet ' in line:
|
||||
match = re.search(r'inet (\S+)', line)
|
||||
if match:
|
||||
ip = match.group(1)
|
||||
netmask_match = re.search(r'netmask (\S+)', line)
|
||||
if netmask_match:
|
||||
current_iface['addresses'].append({
|
||||
'ip': ip,
|
||||
'netmask': netmask_match.group(1)
|
||||
})
|
||||
elif 'ether ' in line:
|
||||
match = re.search(r'ether (\S+)', line)
|
||||
if match:
|
||||
current_iface['mac'] = match.group(1)
|
||||
|
||||
if current_iface:
|
||||
interfaces.append(current_iface)
|
||||
|
||||
return interfaces
|
||||
|
||||
def get_routing_table(self) -> List[Dict]:
|
||||
"""Get routing table"""
|
||||
logger.info(f"Getting routing table from pfSense {self.ip}")
|
||||
routes = []
|
||||
|
||||
output = self._ssh_exec('netstat -rn -f inet')
|
||||
if not output:
|
||||
return routes
|
||||
|
||||
in_table = False
|
||||
for line in output.splitlines():
|
||||
if 'Destination' in line and 'Gateway' in line:
|
||||
in_table = True
|
||||
continue
|
||||
|
||||
if in_table and line.strip():
|
||||
parts = line.split()
|
||||
if len(parts) >= 4:
|
||||
route = {
|
||||
'destination': parts[0],
|
||||
'gateway': parts[1],
|
||||
'flags': parts[2],
|
||||
'interface': parts[3] if len(parts) > 3 else None
|
||||
}
|
||||
routes.append(route)
|
||||
|
||||
return routes
|
||||
|
||||
def get_vpn_connections(self) -> Dict:
|
||||
"""Get VPN (WireGuard, OpenVPN, IPsec) configurations and status"""
|
||||
logger.info(f"Getting VPN connections from pfSense {self.ip}")
|
||||
vpn_info = {
|
||||
'wireguard': self._get_wireguard_info(),
|
||||
'openvpn': self._get_openvpn_info(),
|
||||
'ipsec': self._get_ipsec_info()
|
||||
}
|
||||
return vpn_info
|
||||
|
||||
def _get_wireguard_info(self) -> List[Dict]:
|
||||
"""Get WireGuard tunnel information"""
|
||||
tunnels = []
|
||||
|
||||
# Check if WireGuard is running
|
||||
output = self._ssh_exec('wg show all dump 2>/dev/null')
|
||||
if not output:
|
||||
return tunnels
|
||||
|
||||
for line in output.splitlines():
|
||||
parts = line.split('\t')
|
||||
if len(parts) >= 5:
|
||||
tunnel = {
|
||||
'interface': parts[0],
|
||||
'peer': parts[1] if len(parts) > 1 else None,
|
||||
'endpoint': parts[3] if len(parts) > 3 else None,
|
||||
'allowed_ips': parts[4] if len(parts) > 4 else None,
|
||||
'latest_handshake': parts[5] if len(parts) > 5 else None,
|
||||
}
|
||||
tunnels.append(tunnel)
|
||||
|
||||
return tunnels
|
||||
|
||||
def _get_openvpn_info(self) -> List[Dict]:
|
||||
"""Get OpenVPN connection information"""
|
||||
connections = []
|
||||
|
||||
# Check OpenVPN status
|
||||
output = self._ssh_exec('ps aux | grep openvpn | grep -v grep')
|
||||
if output:
|
||||
for line in output.splitlines():
|
||||
if 'openvpn' in line:
|
||||
match = re.search(r'--config\s+(\S+)', line)
|
||||
if match:
|
||||
connections.append({
|
||||
'config': match.group(1),
|
||||
'status': 'running'
|
||||
})
|
||||
|
||||
return connections
|
||||
|
||||
def _get_ipsec_info(self) -> List[Dict]:
|
||||
"""Get IPsec tunnel information"""
|
||||
tunnels = []
|
||||
|
||||
output = self._ssh_exec('ipsec statusall 2>/dev/null || setkey -D 2>/dev/null')
|
||||
if output:
|
||||
# Parse IPsec status - this varies by version
|
||||
tunnels.append({
|
||||
'raw_status': output,
|
||||
'parsed': False # Would need specific parsing for your setup
|
||||
})
|
||||
|
||||
return tunnels
|
||||
|
||||
def get_firewall_rules(self) -> List[Dict]:
|
||||
"""Get active firewall rules"""
|
||||
logger.info(f"Getting firewall rules from pfSense {self.ip}")
|
||||
rules = []
|
||||
|
||||
output = self._ssh_exec('pfctl -sr -v')
|
||||
if not output:
|
||||
return rules
|
||||
|
||||
current_rule = None
|
||||
for line in output.splitlines():
|
||||
if line.startswith(('@', 'pass', 'block')):
|
||||
if current_rule:
|
||||
rules.append(current_rule)
|
||||
current_rule = {'rule': line.strip()}
|
||||
elif current_rule and line.strip():
|
||||
# Additional rule info
|
||||
if 'packets:' in line or 'bytes:' in line:
|
||||
current_rule['stats'] = line.strip()
|
||||
|
||||
if current_rule:
|
||||
rules.append(current_rule)
|
||||
|
||||
return rules
|
||||
|
||||
def get_dhcp_leases(self) -> List[Dict]:
|
||||
"""Get DHCP leases"""
|
||||
logger.info(f"Getting DHCP leases from pfSense {self.ip}")
|
||||
leases = []
|
||||
|
||||
output = self._ssh_exec('cat /var/dhcpd/var/db/dhcpd.leases 2>/dev/null')
|
||||
if not output:
|
||||
return leases
|
||||
|
||||
current_lease = None
|
||||
for line in output.splitlines():
|
||||
line = line.strip()
|
||||
if line.startswith('lease '):
|
||||
if current_lease:
|
||||
leases.append(current_lease)
|
||||
match = re.match(r'lease (\S+)', line)
|
||||
if match:
|
||||
current_lease = {'ip': match.group(1)}
|
||||
elif current_lease:
|
||||
if line.startswith('hardware ethernet'):
|
||||
match = re.search(r'hardware ethernet (\S+);', line)
|
||||
if match:
|
||||
current_lease['mac'] = match.group(1)
|
||||
elif line.startswith('client-hostname'):
|
||||
match = re.search(r'client-hostname "([^"]+)"', line)
|
||||
if match:
|
||||
current_lease['hostname'] = match.group(1)
|
||||
elif line.startswith('ends'):
|
||||
match = re.search(r'ends \d+ ([^;]+);', line)
|
||||
if match:
|
||||
current_lease['expires'] = match.group(1)
|
||||
|
||||
if current_lease:
|
||||
leases.append(current_lease)
|
||||
|
||||
return leases
|
||||
|
||||
def get_arp_table(self) -> List[Dict]:
|
||||
"""Get ARP table"""
|
||||
logger.info(f"Getting ARP table from pfSense {self.ip}")
|
||||
arp_entries = []
|
||||
|
||||
output = self._ssh_exec('arp -an')
|
||||
if not output:
|
||||
return arp_entries
|
||||
|
||||
for line in output.splitlines():
|
||||
# Format: ? (192.168.1.10) at 00:11:22:33:44:55 on em0 [ethernet]
|
||||
match = re.search(r'\((\d+\.\d+\.\d+\.\d+)\)\s+at\s+([0-9a-f:]+)\s+on\s+(\S+)', line, re.IGNORECASE)
|
||||
if match:
|
||||
arp_entries.append({
|
||||
'ip': match.group(1),
|
||||
'mac': match.group(2),
|
||||
'interface': match.group(3)
|
||||
})
|
||||
|
||||
return arp_entries
|
||||
|
||||
def get_gateway_status(self) -> List[Dict]:
|
||||
"""Get gateway status"""
|
||||
logger.info(f"Getting gateway status from pfSense {self.ip}")
|
||||
gateways = []
|
||||
|
||||
output = self._ssh_exec('route -n get default')
|
||||
if output:
|
||||
gateway_ip = None
|
||||
interface = None
|
||||
for line in output.splitlines():
|
||||
if 'gateway:' in line:
|
||||
match = re.search(r'gateway:\s*(\S+)', line)
|
||||
if match:
|
||||
gateway_ip = match.group(1)
|
||||
elif 'interface:' in line:
|
||||
match = re.search(r'interface:\s*(\S+)', line)
|
||||
if match:
|
||||
interface = match.group(1)
|
||||
|
||||
if gateway_ip:
|
||||
gateways.append({
|
||||
'ip': gateway_ip,
|
||||
'interface': interface,
|
||||
'type': 'default'
|
||||
})
|
||||
|
||||
return gateways
|
||||
|
||||
def get_full_info(self) -> Dict:
|
||||
"""Get all information from pfSense"""
|
||||
logger.info(f"Gathering complete pfSense info from {self.ip}")
|
||||
|
||||
info = {
|
||||
'ip': self.ip,
|
||||
'type': 'pfSense',
|
||||
'interfaces': self.get_interfaces(),
|
||||
'routes': self.get_routing_table(),
|
||||
'vpn': self.get_vpn_connections(),
|
||||
'firewall_rules': self.get_firewall_rules(),
|
||||
'dhcp_leases': self.get_dhcp_leases(),
|
||||
'arp_table': self.get_arp_table(),
|
||||
'gateways': self.get_gateway_status()
|
||||
}
|
||||
|
||||
return info
|
||||
|
||||
|
||||
def main():
|
||||
"""Test function"""
|
||||
import argparse
|
||||
import json
|
||||
|
||||
parser = argparse.ArgumentParser(description='pfSense Scanner')
|
||||
parser.add_argument('ip', help='pfSense IP address')
|
||||
parser.add_argument('-u', '--user', default='root', help='SSH user')
|
||||
parser.add_argument('-k', '--key', help='SSH key path')
|
||||
parser.add_argument('-o', '--output', help='Output JSON file')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
scanner = PfSenseScanner(args.ip, args.user, args.key)
|
||||
info = scanner.get_full_info()
|
||||
|
||||
if args.output:
|
||||
with open(args.output, 'w') as f:
|
||||
json.dump(info, f, indent=2)
|
||||
print(f"Saved to {args.output}")
|
||||
else:
|
||||
print(json.dumps(info, indent=2))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user