Enthält: - rdp_client.py: RDP Client mit GUI und Monitor-Auswahl - rdp.sh: Bash-basierter RDP Client - teamleader_test/: Network Scanner Fullstack-App - teamleader_test2/: Network Mapper CLI Subdirectories mit eigenem Repo wurden ausgeschlossen. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
251 lines
7.6 KiB
Python
251 lines
7.6 KiB
Python
"""Service detection and banner grabbing implementation."""
|
|
|
|
import socket
|
|
import logging
|
|
from typing import Optional, Dict, Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ServiceDetector:
|
|
"""Detector for identifying services running on open ports."""
|
|
|
|
def __init__(self, timeout: int = 3):
|
|
"""
|
|
Initialize service detector.
|
|
|
|
Args:
|
|
timeout: Socket timeout in seconds
|
|
"""
|
|
self.timeout = timeout
|
|
|
|
def detect_service(self, host: str, port: int) -> Dict[str, Any]:
|
|
"""
|
|
Detect service on a specific port.
|
|
|
|
Args:
|
|
host: Host IP or hostname
|
|
port: Port number
|
|
|
|
Returns:
|
|
Dictionary with service information
|
|
"""
|
|
service_info = {
|
|
'port': port,
|
|
'protocol': 'tcp',
|
|
'service_name': None,
|
|
'service_version': None,
|
|
'banner': None
|
|
}
|
|
|
|
# Try banner grabbing
|
|
banner = self.grab_banner(host, port)
|
|
if banner:
|
|
service_info['banner'] = banner
|
|
|
|
# Try to identify service from banner
|
|
service_name, version = self._identify_from_banner(banner, port)
|
|
if service_name:
|
|
service_info['service_name'] = service_name
|
|
if version:
|
|
service_info['service_version'] = version
|
|
|
|
# If no banner, use port-based guess
|
|
if not service_info['service_name']:
|
|
service_info['service_name'] = self._guess_service_from_port(port)
|
|
|
|
return service_info
|
|
|
|
def grab_banner(self, host: str, port: int) -> Optional[str]:
|
|
"""
|
|
Attempt to grab service banner.
|
|
|
|
Args:
|
|
host: Host IP or hostname
|
|
port: Port number
|
|
|
|
Returns:
|
|
Banner string or None
|
|
"""
|
|
try:
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.settimeout(self.timeout)
|
|
sock.connect((host, port))
|
|
|
|
# Try to receive banner
|
|
try:
|
|
banner = sock.recv(1024)
|
|
banner_str = banner.decode('utf-8', errors='ignore').strip()
|
|
sock.close()
|
|
|
|
if banner_str:
|
|
logger.debug(f"Got banner from {host}:{port}: {banner_str[:100]}")
|
|
return banner_str
|
|
except socket.timeout:
|
|
# Try sending a probe for services that need it
|
|
banner_str = self._probe_service(sock, port)
|
|
sock.close()
|
|
return banner_str
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Error grabbing banner from {host}:{port}: {e}")
|
|
|
|
return None
|
|
|
|
def _probe_service(self, sock: socket.socket, port: int) -> Optional[str]:
|
|
"""
|
|
Send service-specific probe to elicit response.
|
|
|
|
Args:
|
|
sock: Connected socket
|
|
port: Port number
|
|
|
|
Returns:
|
|
Response string or None
|
|
"""
|
|
probes = {
|
|
80: b"GET / HTTP/1.0\r\n\r\n",
|
|
443: b"GET / HTTP/1.0\r\n\r\n",
|
|
8080: b"GET / HTTP/1.0\r\n\r\n",
|
|
8443: b"GET / HTTP/1.0\r\n\r\n",
|
|
25: b"EHLO test\r\n",
|
|
110: b"USER test\r\n",
|
|
143: b"A001 CAPABILITY\r\n",
|
|
}
|
|
|
|
probe = probes.get(port, b"\r\n")
|
|
|
|
try:
|
|
sock.send(probe)
|
|
response = sock.recv(1024)
|
|
return response.decode('utf-8', errors='ignore').strip()
|
|
except:
|
|
return None
|
|
|
|
def _identify_from_banner(self, banner: str, port: int) -> tuple[Optional[str], Optional[str]]:
|
|
"""
|
|
Identify service and version from banner.
|
|
|
|
Args:
|
|
banner: Banner string
|
|
port: Port number
|
|
|
|
Returns:
|
|
Tuple of (service_name, version)
|
|
"""
|
|
banner_lower = banner.lower()
|
|
|
|
# HTTP servers
|
|
if 'http' in banner_lower or port in [80, 443, 8080, 8443]:
|
|
if 'apache' in banner_lower:
|
|
return self._extract_apache_version(banner)
|
|
elif 'nginx' in banner_lower:
|
|
return self._extract_nginx_version(banner)
|
|
elif 'iis' in banner_lower or 'microsoft' in banner_lower:
|
|
return 'IIS', None
|
|
else:
|
|
return 'HTTP', None
|
|
|
|
# SSH
|
|
if 'ssh' in banner_lower or port == 22:
|
|
if 'openssh' in banner_lower:
|
|
return self._extract_openssh_version(banner)
|
|
return 'SSH', None
|
|
|
|
# FTP
|
|
if 'ftp' in banner_lower or port in [20, 21]:
|
|
if 'filezilla' in banner_lower:
|
|
return 'FileZilla FTP', None
|
|
elif 'proftpd' in banner_lower:
|
|
return 'ProFTPD', None
|
|
return 'FTP', None
|
|
|
|
# SMTP
|
|
if 'smtp' in banner_lower or 'mail' in banner_lower or port == 25:
|
|
if 'postfix' in banner_lower:
|
|
return 'Postfix', None
|
|
elif 'exim' in banner_lower:
|
|
return 'Exim', None
|
|
return 'SMTP', None
|
|
|
|
# MySQL
|
|
if 'mysql' in banner_lower or port == 3306:
|
|
return 'MySQL', None
|
|
|
|
# PostgreSQL
|
|
if 'postgresql' in banner_lower or port == 5432:
|
|
return 'PostgreSQL', None
|
|
|
|
# Generic identification
|
|
if port == 22:
|
|
return 'SSH', None
|
|
elif port in [80, 8080]:
|
|
return 'HTTP', None
|
|
elif port in [443, 8443]:
|
|
return 'HTTPS', None
|
|
|
|
return None, None
|
|
|
|
def _extract_apache_version(self, banner: str) -> tuple[str, Optional[str]]:
|
|
"""Extract Apache version from banner."""
|
|
import re
|
|
match = re.search(r'Apache/?([\d.]+)?', banner, re.IGNORECASE)
|
|
if match:
|
|
version = match.group(1)
|
|
return 'Apache', version
|
|
return 'Apache', None
|
|
|
|
def _extract_nginx_version(self, banner: str) -> tuple[str, Optional[str]]:
|
|
"""Extract nginx version from banner."""
|
|
import re
|
|
match = re.search(r'nginx/?([\d.]+)?', banner, re.IGNORECASE)
|
|
if match:
|
|
version = match.group(1)
|
|
return 'nginx', version
|
|
return 'nginx', None
|
|
|
|
def _extract_openssh_version(self, banner: str) -> tuple[str, Optional[str]]:
|
|
"""Extract OpenSSH version from banner."""
|
|
import re
|
|
match = re.search(r'OpenSSH[_/]?([\d.]+\w*)?', banner, re.IGNORECASE)
|
|
if match:
|
|
version = match.group(1)
|
|
return 'OpenSSH', version
|
|
return 'OpenSSH', None
|
|
|
|
def _guess_service_from_port(self, port: int) -> Optional[str]:
|
|
"""
|
|
Guess service name from well-known port number.
|
|
|
|
Args:
|
|
port: Port number
|
|
|
|
Returns:
|
|
Service name or None
|
|
"""
|
|
common_services = {
|
|
20: 'ftp-data',
|
|
21: 'ftp',
|
|
22: 'ssh',
|
|
23: 'telnet',
|
|
25: 'smtp',
|
|
53: 'dns',
|
|
80: 'http',
|
|
110: 'pop3',
|
|
143: 'imap',
|
|
443: 'https',
|
|
445: 'smb',
|
|
993: 'imaps',
|
|
995: 'pop3s',
|
|
3306: 'mysql',
|
|
3389: 'rdp',
|
|
5432: 'postgresql',
|
|
5900: 'vnc',
|
|
6379: 'redis',
|
|
8080: 'http-alt',
|
|
8443: 'https-alt',
|
|
27017: 'mongodb',
|
|
}
|
|
|
|
return common_services.get(port)
|