"""Nmap integration for advanced scanning capabilities."""

import asyncio
import ipaddress
import logging
import re
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Hostname syntax accepted by NmapScanner._validate_host: letters, digits,
# dots and hyphens only, and the first character must be alphanumeric so the
# target can never be parsed by nmap as a command-line option.
_HOSTNAME_RE = re.compile(r'[a-zA-Z0-9][a-zA-Z0-9.-]*')


class NmapScanner:
    """Wrapper for python-nmap with safe execution.

    The ``nmap`` package is imported lazily inside the methods that need it,
    so this module can be imported even when python-nmap is not installed.
    In that case ``nmap_available`` is False and every scan method returns
    an empty result instead of raising.
    """

    def __init__(self):
        """Initialize nmap scanner and record whether nmap is usable."""
        self.nmap_available = self._check_nmap_available()
        if not self.nmap_available:
            logger.warning("nmap is not available on this system")

    def _check_nmap_available(self) -> bool:
        """
        Check if nmap is available on the system.

        Returns:
            True if python-nmap can be imported and a PortScanner can be
            created and queried for its version, False otherwise.
        """
        try:
            import nmap

            scanner = nmap.PortScanner()
            scanner.nmap_version()
            return True
        except Exception as e:
            # Best-effort probe: any failure (missing package, missing
            # binary, ...) simply means "not available".
            logger.debug("nmap not available: %s", e)
            return False

    async def scan_host(
        self,
        host: str,
        arguments: str = '-sT -T4'
    ) -> Optional[Dict[str, Any]]:
        """
        Scan a host using nmap.

        The blocking scan is executed in the event loop's default executor
        so the loop itself is not blocked.

        Args:
            host: IP address or hostname
            arguments: Nmap arguments (default: TCP connect scan,
                aggressive timing)

        Returns:
            Scan results dictionary, or None when nmap is unavailable, the
            host is invalid, nothing was found, or an error occurred.
        """
        if not self.nmap_available:
            logger.warning("Attempted to use nmap but it's not available")
            return None

        try:
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(
                None,
                self._run_nmap_scan,
                host,
                arguments
            )
        except Exception as e:
            logger.error("Error running nmap scan on %s: %s", host, e)
            return None

    def _run_nmap_scan(self, host: str, arguments: str) -> Optional[Dict[str, Any]]:
        """
        Run nmap scan synchronously and return the result for one host.

        nmap reports results keyed by address, so a hostname target does not
        appear under its own name. When the target is not found verbatim but
        the scan produced exactly one host, that host is returned.

        Args:
            host: Host to scan
            arguments: Nmap arguments

        Returns:
            Scan results for the host, or None if the host is invalid, the
            scan failed, or no single matching host was reported.
        """
        results = self._run_nmap_scan_all(host, arguments)
        if not results:
            return None

        for entry in results:
            if entry['address'] == host:
                return entry

        if len(results) == 1:
            # Hostname target: the only reported host is the resolved one.
            return results[0]

        logger.debug("No results for %s", host)
        return None

    def _run_nmap_scan_all(self, target: str, arguments: str) -> List[Dict[str, Any]]:
        """
        Run nmap synchronously and return results for every reported host.

        Args:
            target: IP address, hostname or CIDR network to scan
            arguments: Nmap arguments

        Returns:
            One result dictionary per host reported by nmap; an empty list
            if the target is invalid, nothing was found or the scan failed.
        """
        try:
            import nmap

            # Validate before spawning anything: the target ends up on the
            # nmap command line.
            if not self._validate_host(target):
                logger.error("Invalid host: %s", target)
                return []

            scanner = nmap.PortScanner()
            logger.info("Running nmap scan: nmap %s %s", arguments, target)
            scanner.scan(hosts=target, arguments=arguments)

            addresses = scanner.all_hosts()
            if not addresses:
                logger.debug("No results for %s", target)
            return [
                self._build_host_result(address, scanner[address])
                for address in addresses
            ]
        except Exception as e:
            logger.error("Error in _run_nmap_scan for %s: %s", target, e)
            return []

    @staticmethod
    def _build_host_result(address: str, host_info: Any) -> Dict[str, Any]:
        """
        Convert one python-nmap host entry into a plain dictionary.

        Args:
            address: Key under which nmap reported the host
            host_info: The scanner's entry for that host

        Returns:
            Dictionary with 'address', 'hostname', 'state', 'protocols',
            'ports' and, when present in the scan data, 'os_matches'.
        """
        protocols = list(host_info.all_protocols())
        result: Dict[str, Any] = {
            'address': address,
            'hostname': host_info.hostname(),
            'state': host_info.state(),
            'protocols': protocols,
            'ports': [
                {
                    'port': port,
                    'protocol': proto,
                    'state': port_info['state'],
                    'service_name': port_info.get('name'),
                    'service_version': port_info.get('version'),
                    'service_product': port_info.get('product'),
                    'extrainfo': port_info.get('extrainfo')
                }
                for proto in protocols
                for port, port_info in host_info[proto].items()
            ]
        }

        # OS detection data is only present when it was requested/possible.
        if 'osmatch' in host_info:
            result['os_matches'] = [
                {'name': match['name'], 'accuracy': match['accuracy']}
                for match in host_info['osmatch']
            ]

        return result

    def _validate_host(self, host: str) -> bool:
        """
        Validate host input to prevent command injection.

        Accepts IP addresses, networks (CIDR) and hostnames made of
        letters, digits, dots and hyphens.

        Args:
            host: Host string to validate

        Returns:
            True if valid
        """
        if not isinstance(host, str) or not host:
            return False

        # A leading '-' would be read by nmap as an option, and whitespace
        # would split the target into several command-line tokens (an IPv6
        # zone id such as 'fe80::1% -oN' can otherwise pass ipaddress
        # parsing on newer Python versions).
        if host.startswith('-') or any(ch.isspace() for ch in host):
            return False

        # Try as IP address
        try:
            ipaddress.ip_address(host)
            return True
        except ValueError:
            pass

        # Try as network range
        try:
            ipaddress.ip_network(host, strict=False)
            return True
        except ValueError:
            pass

        # Try as hostname; fullmatch so a trailing newline is not accepted.
        return _HOSTNAME_RE.fullmatch(host) is not None

    def get_scan_arguments(
        self,
        scan_type: str,
        service_detection: bool = True,
        os_detection: bool = False,
        port_range: Optional[str] = None
    ) -> str:
        """
        Generate nmap arguments based on scan configuration.

        Args:
            scan_type: Type of scan ('quick', 'standard', 'deep'); any other
                value adds no port option and uses -T4 timing
            service_detection: Enable service/version detection
            os_detection: Enable OS detection (requires root)
            port_range: Custom port range (e.g., '1-1000' or '80,443,8080').
                Overrides the scan type's port selection. NOTE: inserted
                into the argument string as-is, so it must come from a
                trusted source.

        Returns:
            Nmap argument string
        """
        port_options = {
            'quick': '--top-ports 100',
            'standard': '--top-ports 1000',
            'deep': '-p-',  # All ports
        }
        timing_options = {
            'quick': '-T5',  # Insane
            'deep': '-T3',  # Normal
        }

        # Use TCP connect scan (no root required)
        args = ['-sT']

        # Port specification
        if port_range:
            args.append(f'-p {port_range}')
        elif scan_type in port_options:
            args.append(port_options[scan_type])

        # Only show open ports
        args.append('--open')

        # Timing (default: aggressive)
        args.append(timing_options.get(scan_type, '-T4'))

        # Service detection
        if service_detection:
            args.append('-sV')

        # OS detection (requires root)
        if os_detection:
            args.append('-O')
            logger.warning("OS detection requires root privileges")

        return ' '.join(args)

    async def scan_network_with_nmap(
        self,
        network: str,
        scan_type: str = 'quick'
    ) -> List[Dict[str, Any]]:
        """
        Scan entire network using nmap.

        Args:
            network: Network in CIDR notation
            scan_type: Type of scan

        Returns:
            List of host results, one per host reported by nmap; an empty
            list when nmap is unavailable or the scan fails.
        """
        if not self.nmap_available:
            return []

        try:
            arguments = self.get_scan_arguments(scan_type)
            loop = asyncio.get_event_loop()
            # Collect every reported host; looking the CIDR string up as a
            # single host would never match nmap's address-keyed results.
            return await loop.run_in_executor(
                None,
                self._run_nmap_scan_all,
                network,
                arguments
            )
        except Exception as e:
            logger.error("Error scanning network %s: %s", network, e)
            return []