Files
werkzeuge/teamleader_test/app/scanner/network_scanner.py
root cb073786b3 Initial commit: Werkzeuge-Sammlung
Enthält:
- rdp_client.py: RDP Client mit GUI und Monitor-Auswahl
- rdp.sh: Bash-basierter RDP Client
- teamleader_test/: Network Scanner Fullstack-App
- teamleader_test2/: Network Mapper CLI

Subdirectories mit eigenem Repo wurden ausgeschlossen.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 09:39:24 +01:00

243 lines
7.9 KiB
Python

"""Network scanner implementation for host discovery."""
import socket
import ipaddress
import asyncio
from typing import List, Set, Optional, Callable
from concurrent.futures import ThreadPoolExecutor
import logging
from app.config import settings
logger = logging.getLogger(__name__)
class NetworkScanner:
"""Scanner for discovering active hosts on a network."""
# Common ports for host discovery
DISCOVERY_PORTS = [21, 22, 23, 25, 80, 443, 445, 3389, 8080, 8443]
def __init__(
self,
timeout: int = None,
max_workers: int = None,
progress_callback: Optional[Callable[[str, float], None]] = None
):
"""
Initialize network scanner.
Args:
timeout: Socket connection timeout in seconds
max_workers: Maximum number of concurrent workers
progress_callback: Optional callback for progress updates
"""
self.timeout = timeout or settings.default_scan_timeout
self.max_workers = max_workers or settings.max_concurrent_scans
self.progress_callback = progress_callback
async def scan_network(self, network_range: str) -> List[str]:
"""
Scan a network range for active hosts.
Args:
network_range: Network in CIDR notation (e.g., '192.168.1.0/24')
Returns:
List of active IP addresses
"""
logger.info(f"Starting network scan of {network_range}")
try:
network = ipaddress.ip_network(network_range, strict=False)
# Validate private network if restriction enabled
if settings.scan_private_networks_only and not network.is_private:
raise ValueError(f"Network {network_range} is not a private network")
# Generate list of hosts to scan
hosts = [str(ip) for ip in network.hosts()]
total_hosts = len(hosts)
if total_hosts == 0:
# Single host network
hosts = [str(network.network_address)]
total_hosts = 1
logger.info(f"Scanning {total_hosts} hosts in {network_range}")
# Scan hosts concurrently
active_hosts = await self._scan_hosts_async(hosts)
logger.info(f"Scan completed. Found {len(active_hosts)} active hosts")
return active_hosts
except ValueError as e:
logger.error(f"Invalid network range: {e}")
raise
except Exception as e:
logger.error(f"Error during network scan: {e}")
raise
async def _scan_hosts_async(self, hosts: List[str]) -> List[str]:
"""
Scan multiple hosts asynchronously.
Args:
hosts: List of IP addresses to scan
Returns:
List of active hosts
"""
active_hosts: Set[str] = set()
total = len(hosts)
completed = 0
# Use ThreadPoolExecutor for socket operations
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = []
for host in hosts:
future = loop.run_in_executor(executor, self._check_host, host)
futures.append((host, future))
# Process results as they complete
for host, future in futures:
try:
is_active = await future
if is_active:
active_hosts.add(host)
logger.debug(f"Host {host} is active")
except Exception as e:
logger.debug(f"Error checking host {host}: {e}")
finally:
completed += 1
if self.progress_callback:
progress = completed / total
self.progress_callback(host, progress)
return sorted(list(active_hosts), key=lambda ip: ipaddress.ip_address(ip))
def _check_host(self, ip: str) -> bool:
"""
Check if a host is active by attempting TCP connections.
Args:
ip: IP address to check
Returns:
True if host responds on any discovery port
"""
for port in self.DISCOVERY_PORTS:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
result = sock.connect_ex((ip, port))
sock.close()
if result == 0:
return True
except socket.error:
continue
except Exception as e:
logger.debug(f"Error checking {ip}:{port}: {e}")
continue
return False
def get_local_network_range(self) -> Optional[str]:
"""
Detect local network range.
Returns:
Network range in CIDR notation or None
"""
try:
import netifaces
# Get default gateway interface
gateways = netifaces.gateways()
if 'default' not in gateways or netifaces.AF_INET not in gateways['default']:
return None
default_interface = gateways['default'][netifaces.AF_INET][1]
# Get interface addresses
addrs = netifaces.ifaddresses(default_interface)
if netifaces.AF_INET not in addrs:
return None
# Get IP and netmask
inet_info = addrs[netifaces.AF_INET][0]
ip = inet_info.get('addr')
netmask = inet_info.get('netmask')
if not ip or not netmask:
return None
# Calculate network address
network = ipaddress.ip_network(f"{ip}/{netmask}", strict=False)
return str(network)
except ImportError:
logger.warning("netifaces not available, cannot detect local network")
return None
except Exception as e:
logger.error(f"Error detecting local network: {e}")
return None
def resolve_hostname(self, ip: str) -> Optional[str]:
"""
Resolve IP address to hostname.
Args:
ip: IP address
Returns:
Hostname or None
"""
try:
hostname = socket.gethostbyaddr(ip)[0]
return hostname
except socket.herror:
return None
except Exception as e:
logger.debug(f"Error resolving {ip}: {e}")
return None
def get_mac_address(self, ip: str) -> Optional[str]:
"""
Get MAC address for an IP (requires ARP access).
Args:
ip: IP address
Returns:
MAC address or None
"""
try:
# Try to get MAC from ARP cache
import subprocess
import re
# Platform-specific ARP command
import platform
if platform.system() == 'Windows':
arp_output = subprocess.check_output(['arp', '-a', ip]).decode()
mac_pattern = r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})'
else:
arp_output = subprocess.check_output(['arp', '-n', ip]).decode()
mac_pattern = r'([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}'
match = re.search(mac_pattern, arp_output)
if match:
return match.group(0).upper()
return None
except Exception as e:
logger.debug(f"Error getting MAC for {ip}: {e}")
return None