Files
werkzeuge/teamleader_test/app/scanner/service_detector.py
root cb073786b3 Initial commit: Werkzeuge-Sammlung
Enthält:
- rdp_client.py: RDP Client mit GUI und Monitor-Auswahl
- rdp.sh: Bash-basierter RDP Client
- teamleader_test/: Network Scanner Fullstack-App
- teamleader_test2/: Network Mapper CLI

Subdirectories mit eigenem Repo wurden ausgeschlossen.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 09:39:24 +01:00

251 lines
7.6 KiB
Python

"""Service detection and banner grabbing implementation."""
import socket
import logging
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class ServiceDetector:
"""Detector for identifying services running on open ports."""
def __init__(self, timeout: int = 3):
"""
Initialize service detector.
Args:
timeout: Socket timeout in seconds
"""
self.timeout = timeout
def detect_service(self, host: str, port: int) -> Dict[str, Any]:
"""
Detect service on a specific port.
Args:
host: Host IP or hostname
port: Port number
Returns:
Dictionary with service information
"""
service_info = {
'port': port,
'protocol': 'tcp',
'service_name': None,
'service_version': None,
'banner': None
}
# Try banner grabbing
banner = self.grab_banner(host, port)
if banner:
service_info['banner'] = banner
# Try to identify service from banner
service_name, version = self._identify_from_banner(banner, port)
if service_name:
service_info['service_name'] = service_name
if version:
service_info['service_version'] = version
# If no banner, use port-based guess
if not service_info['service_name']:
service_info['service_name'] = self._guess_service_from_port(port)
return service_info
def grab_banner(self, host: str, port: int) -> Optional[str]:
"""
Attempt to grab service banner.
Args:
host: Host IP or hostname
port: Port number
Returns:
Banner string or None
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect((host, port))
# Try to receive banner
try:
banner = sock.recv(1024)
banner_str = banner.decode('utf-8', errors='ignore').strip()
sock.close()
if banner_str:
logger.debug(f"Got banner from {host}:{port}: {banner_str[:100]}")
return banner_str
except socket.timeout:
# Try sending a probe for services that need it
banner_str = self._probe_service(sock, port)
sock.close()
return banner_str
except Exception as e:
logger.debug(f"Error grabbing banner from {host}:{port}: {e}")
return None
def _probe_service(self, sock: socket.socket, port: int) -> Optional[str]:
"""
Send service-specific probe to elicit response.
Args:
sock: Connected socket
port: Port number
Returns:
Response string or None
"""
probes = {
80: b"GET / HTTP/1.0\r\n\r\n",
443: b"GET / HTTP/1.0\r\n\r\n",
8080: b"GET / HTTP/1.0\r\n\r\n",
8443: b"GET / HTTP/1.0\r\n\r\n",
25: b"EHLO test\r\n",
110: b"USER test\r\n",
143: b"A001 CAPABILITY\r\n",
}
probe = probes.get(port, b"\r\n")
try:
sock.send(probe)
response = sock.recv(1024)
return response.decode('utf-8', errors='ignore').strip()
except:
return None
def _identify_from_banner(self, banner: str, port: int) -> tuple[Optional[str], Optional[str]]:
"""
Identify service and version from banner.
Args:
banner: Banner string
port: Port number
Returns:
Tuple of (service_name, version)
"""
banner_lower = banner.lower()
# HTTP servers
if 'http' in banner_lower or port in [80, 443, 8080, 8443]:
if 'apache' in banner_lower:
return self._extract_apache_version(banner)
elif 'nginx' in banner_lower:
return self._extract_nginx_version(banner)
elif 'iis' in banner_lower or 'microsoft' in banner_lower:
return 'IIS', None
else:
return 'HTTP', None
# SSH
if 'ssh' in banner_lower or port == 22:
if 'openssh' in banner_lower:
return self._extract_openssh_version(banner)
return 'SSH', None
# FTP
if 'ftp' in banner_lower or port in [20, 21]:
if 'filezilla' in banner_lower:
return 'FileZilla FTP', None
elif 'proftpd' in banner_lower:
return 'ProFTPD', None
return 'FTP', None
# SMTP
if 'smtp' in banner_lower or 'mail' in banner_lower or port == 25:
if 'postfix' in banner_lower:
return 'Postfix', None
elif 'exim' in banner_lower:
return 'Exim', None
return 'SMTP', None
# MySQL
if 'mysql' in banner_lower or port == 3306:
return 'MySQL', None
# PostgreSQL
if 'postgresql' in banner_lower or port == 5432:
return 'PostgreSQL', None
# Generic identification
if port == 22:
return 'SSH', None
elif port in [80, 8080]:
return 'HTTP', None
elif port in [443, 8443]:
return 'HTTPS', None
return None, None
def _extract_apache_version(self, banner: str) -> tuple[str, Optional[str]]:
"""Extract Apache version from banner."""
import re
match = re.search(r'Apache/?([\d.]+)?', banner, re.IGNORECASE)
if match:
version = match.group(1)
return 'Apache', version
return 'Apache', None
def _extract_nginx_version(self, banner: str) -> tuple[str, Optional[str]]:
"""Extract nginx version from banner."""
import re
match = re.search(r'nginx/?([\d.]+)?', banner, re.IGNORECASE)
if match:
version = match.group(1)
return 'nginx', version
return 'nginx', None
def _extract_openssh_version(self, banner: str) -> tuple[str, Optional[str]]:
"""Extract OpenSSH version from banner."""
import re
match = re.search(r'OpenSSH[_/]?([\d.]+\w*)?', banner, re.IGNORECASE)
if match:
version = match.group(1)
return 'OpenSSH', version
return 'OpenSSH', None
def _guess_service_from_port(self, port: int) -> Optional[str]:
"""
Guess service name from well-known port number.
Args:
port: Port number
Returns:
Service name or None
"""
common_services = {
20: 'ftp-data',
21: 'ftp',
22: 'ssh',
23: 'telnet',
25: 'smtp',
53: 'dns',
80: 'http',
110: 'pop3',
143: 'imap',
443: 'https',
445: 'smb',
993: 'imaps',
995: 'pop3s',
3306: 'mysql',
3389: 'rdp',
5432: 'postgresql',
5900: 'vnc',
6379: 'redis',
8080: 'http-alt',
8443: 'https-alt',
27017: 'mongodb',
}
return common_services.get(port)