"""Network scanner implementation for host discovery."""

import asyncio
import ipaddress
import logging
import platform
import re
import socket
import subprocess
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, Set

from app.config import settings

logger = logging.getLogger(__name__)


class NetworkScanner:
    """Scanner for discovering active hosts on a network.

    A host is reported as active when a TCP connection to at least one of
    ``DISCOVERY_PORTS`` succeeds within the configured timeout.
    """

    # Common ports for host discovery
    DISCOVERY_PORTS = [21, 22, 23, 25, 80, 443, 445, 3389, 8080, 8443]

    # MAC address patterns for ``arp`` output. Windows prints octets separated
    # by '-' (':' is accepted as well); other platforms use ':' only.
    _MAC_PATTERN_WINDOWS = re.compile(r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})')
    _MAC_PATTERN_DEFAULT = re.compile(r'([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}')

    def __init__(
        self,
        timeout: Optional[int] = None,
        max_workers: Optional[int] = None,
        progress_callback: Optional[Callable[[str, float], None]] = None
    ):
        """
        Initialize network scanner.

        Args:
            timeout: Socket connection timeout in seconds. A falsy value
                (``None`` or ``0``) falls back to
                ``settings.default_scan_timeout``.
            max_workers: Maximum number of concurrent workers. A falsy value
                falls back to ``settings.max_concurrent_scans``.
            progress_callback: Optional callback for progress updates, called
                as ``callback(host, fraction_done)`` after each host check.
        """
        self.timeout = timeout or settings.default_scan_timeout
        self.max_workers = max_workers or settings.max_concurrent_scans
        self.progress_callback = progress_callback

    async def scan_network(self, network_range: str) -> List[str]:
        """
        Scan a network range for active hosts.

        Args:
            network_range: Network in CIDR notation (e.g., '192.168.1.0/24')

        Returns:
            List of active IP addresses

        Raises:
            ValueError: If ``network_range`` cannot be parsed, or if it is not
                a private network while
                ``settings.scan_private_networks_only`` is enabled.
        """
        logger.info(f"Starting network scan of {network_range}")

        try:
            # strict=False accepts a host address with a mask, e.g. 10.0.0.5/24
            network = ipaddress.ip_network(network_range, strict=False)

            # Validate private network if restriction enabled
            if settings.scan_private_networks_only and not network.is_private:
                raise ValueError(f"Network {network_range} is not a private network")

            # Generate list of hosts to scan
            hosts = [str(ip) for ip in network.hosts()]
            if not hosts:
                # Single host network: hosts() yielded nothing, so scan the
                # network address itself
                hosts = [str(network.network_address)]
            total_hosts = len(hosts)

            logger.info(f"Scanning {total_hosts} hosts in {network_range}")

            # Scan hosts concurrently
            active_hosts = await self._scan_hosts_async(hosts)

            logger.info(f"Scan completed. Found {len(active_hosts)} active hosts")
            return active_hosts

        except ValueError as e:
            logger.error(f"Invalid network range: {e}")
            raise
        except Exception as e:
            logger.error(f"Error during network scan: {e}")
            raise

    async def _scan_hosts_async(self, hosts: List[str]) -> List[str]:
        """
        Scan multiple hosts asynchronously.

        Each host check runs in a thread pool; results are awaited in
        submission order and the progress callback (if any) is invoked after
        every host, whether the check succeeded, failed or raised.

        Args:
            hosts: List of IP addresses to scan

        Returns:
            List of active hosts, sorted numerically by address
        """
        active_hosts: Set[str] = set()
        total = len(hosts)
        completed = 0

        # Socket operations block, so they are pushed onto worker threads.
        # We are inside a coroutine, hence get_running_loop() is the correct
        # (non-deprecated) way to obtain the loop.
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                (host, loop.run_in_executor(executor, self._check_host, host))
                for host in hosts
            ]

            # Collect results in submission order
            for host, future in futures:
                try:
                    if await future:
                        active_hosts.add(host)
                        logger.debug(f"Host {host} is active")
                except Exception as e:
                    logger.debug(f"Error checking host {host}: {e}")
                finally:
                    completed += 1
                    if self.progress_callback:
                        self.progress_callback(host, completed / total)

        # Sort by numeric address value rather than lexicographically
        return sorted(active_hosts, key=ipaddress.ip_address)

    def _check_host(self, ip: str) -> bool:
        """
        Check if a host is active by attempting TCP connections.

        Args:
            ip: IP address to check (IPv4 or IPv6)

        Returns:
            True if host responds on any discovery port
        """
        # Choose the socket family from the address so IPv6 targets can be
        # probed too; anything that is not an IP literal keeps using AF_INET.
        try:
            is_ipv6 = ipaddress.ip_address(ip).version == 6
        except ValueError:
            is_ipv6 = False
        family = socket.AF_INET6 if is_ipv6 else socket.AF_INET

        for port in self.DISCOVERY_PORTS:
            try:
                # The context manager closes the socket even when
                # settimeout()/connect_ex() raises.
                with socket.socket(family, socket.SOCK_STREAM) as sock:
                    sock.settimeout(self.timeout)
                    # connect_ex returns 0 on success instead of raising
                    if sock.connect_ex((ip, port)) == 0:
                        return True
            except OSError:
                continue
            except Exception as e:
                logger.debug(f"Error checking {ip}:{port}: {e}")
                continue

        return False

    def get_local_network_range(self) -> Optional[str]:
        """
        Detect local network range.

        Uses the optional ``netifaces`` package to look up the interface of
        the default IPv4 gateway and derive the network from that interface's
        first IPv4 address and netmask.

        Returns:
            Network range in CIDR notation or None
        """
        try:
            # Optional dependency: imported lazily so the scanner still works
            # without it
            import netifaces

            # Get default gateway interface
            gateways = netifaces.gateways()
            default_gateways = gateways.get('default')
            if default_gateways is None or netifaces.AF_INET not in default_gateways:
                return None

            default_interface = default_gateways[netifaces.AF_INET][1]

            # Get interface addresses
            addrs = netifaces.ifaddresses(default_interface)
            if netifaces.AF_INET not in addrs:
                return None

            # Get IP and netmask
            inet_info = addrs[netifaces.AF_INET][0]
            ip = inet_info.get('addr')
            netmask = inet_info.get('netmask')
            if not ip or not netmask:
                return None

            # Calculate network address
            network = ipaddress.ip_network(f"{ip}/{netmask}", strict=False)
            return str(network)

        except ImportError:
            logger.warning("netifaces not available, cannot detect local network")
            return None
        except Exception as e:
            logger.error(f"Error detecting local network: {e}")
            return None

    def resolve_hostname(self, ip: str) -> Optional[str]:
        """
        Resolve IP address to hostname.

        Args:
            ip: IP address

        Returns:
            Hostname or None
        """
        try:
            return socket.gethostbyaddr(ip)[0]
        except socket.herror:
            # No reverse DNS entry for this address
            return None
        except Exception as e:
            logger.debug(f"Error resolving {ip}: {e}")
            return None

    def get_mac_address(self, ip: str) -> Optional[str]:
        """
        Get MAC address for an IP (requires ARP access).

        Runs the system ``arp`` command and extracts the first MAC address
        from its output. This is best-effort: any failure is logged at debug
        level and reported as None.

        Args:
            ip: IP address

        Returns:
            MAC address (upper-cased, separators as printed by ``arp``) or None
        """
        try:
            # Never let the argument be parsed by arp as a command-line option
            if not ip or ip.startswith('-'):
                raise ValueError(f"invalid address {ip!r}")

            # Platform-specific ARP command
            if platform.system() == 'Windows':
                command = ['arp', '-a', ip]
                mac_pattern = self._MAC_PATTERN_WINDOWS
            else:
                command = ['arp', '-n', ip]
                mac_pattern = self._MAC_PATTERN_DEFAULT

            # arp output may be in a localized, non-UTF-8 code page; the MAC
            # itself is ASCII, so undecodable bytes are replaced, not fatal.
            arp_output = subprocess.check_output(command).decode(errors='replace')

            match = mac_pattern.search(arp_output)
            if match:
                return match.group(0).upper()
            return None

        except Exception as e:
            logger.debug(f"Error getting MAC for {ip}: {e}")
            return None