Files
werkzeuge/teamleader_test/app/services/topology_service.py
root cb073786b3 Initial commit: Werkzeuge-Sammlung
Enthält:
- rdp_client.py: RDP Client mit GUI und Monitor-Auswahl
- rdp.sh: Bash-basierter RDP Client
- teamleader_test/: Network Scanner Fullstack-App
- teamleader_test2/: Network Mapper CLI

Subdirectories mit eigenem Repo wurden ausgeschlossen.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 09:39:24 +01:00

257 lines
7.5 KiB
Python

"""Topology service for network graph generation."""
import logging
from typing import List, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import func
from app.models import Host, Service, Connection
from app.schemas import TopologyNode, TopologyEdge, TopologyResponse
logger = logging.getLogger(__name__)
class TopologyService:
"""Service for generating network topology graphs."""
# Node type colors
NODE_COLORS = {
'gateway': '#FF6B6B',
'server': '#4ECDC4',
'workstation': '#45B7D1',
'device': '#96CEB4',
'unknown': '#95A5A6'
}
def __init__(self, db: Session):
"""
Initialize topology service.
Args:
db: Database session
"""
self.db = db
def generate_topology(self, include_offline: bool = False) -> TopologyResponse:
"""
Generate network topology graph.
Args:
include_offline: Include offline hosts
Returns:
Topology response with nodes and edges
"""
logger.info("Generating network topology")
# Get hosts
query = self.db.query(Host)
if not include_offline:
query = query.filter(Host.status == 'online')
hosts = query.all()
# Generate nodes
nodes = []
for host in hosts:
node = self._create_node(host)
nodes.append(node)
# Generate edges from connections
edges = []
connections = self.db.query(Connection).all()
for conn in connections:
# Only include edges if both hosts are in the topology
source_in_topology = any(n.id == str(conn.source_host_id) for n in nodes)
target_in_topology = any(n.id == str(conn.target_host_id) for n in nodes)
if source_in_topology and target_in_topology:
edge = self._create_edge(conn)
edges.append(edge)
# Generate statistics
statistics = self._generate_statistics(hosts, connections)
logger.info(f"Generated topology with {len(nodes)} nodes and {len(edges)} edges")
return TopologyResponse(
nodes=nodes,
edges=edges,
statistics=statistics
)
def _create_node(self, host: Host) -> TopologyNode:
"""
Create a topology node from a host.
Args:
host: Host model
Returns:
TopologyNode
"""
# Determine device type
device_type = self._determine_device_type(host)
# Count connections
connections = self.db.query(Connection).filter(
(Connection.source_host_id == host.id) |
(Connection.target_host_id == host.id)
).count()
return TopologyNode(
id=str(host.id),
ip=host.ip_address,
hostname=host.hostname,
type=device_type,
status=host.status,
service_count=len(host.services),
connections=connections
)
def _determine_device_type(self, host: Host) -> str:
"""
Determine device type based on host information.
Args:
host: Host model
Returns:
Device type string
"""
# Check if explicitly set
if host.device_type:
return host.device_type
# Infer from services
service_names = [s.service_name for s in host.services if s.service_name]
# Check for gateway indicators
if any(s.port == 53 for s in host.services): # DNS server
return 'gateway'
# Check for server indicators
server_services = ['http', 'https', 'ssh', 'smtp', 'mysql', 'postgresql', 'ftp']
if any(svc in service_names for svc in server_services):
if len(host.services) > 5:
return 'server'
# Check for workstation indicators
if any(s.port == 3389 for s in host.services): # RDP
return 'workstation'
# Default to device
if len(host.services) > 0:
return 'device'
return 'unknown'
def _create_edge(self, connection: Connection) -> TopologyEdge:
"""
Create a topology edge from a connection.
Args:
connection: Connection model
Returns:
TopologyEdge
"""
return TopologyEdge(
source=str(connection.source_host_id),
target=str(connection.target_host_id),
type=connection.connection_type or 'default',
confidence=connection.confidence
)
def _generate_statistics(
self,
hosts: List[Host],
connections: List[Connection]
) -> Dict[str, Any]:
"""
Generate statistics about the topology.
Args:
hosts: List of hosts
connections: List of connections
Returns:
Statistics dictionary
"""
# Count isolated nodes (no connections)
isolated = 0
for host in hosts:
conn_count = self.db.query(Connection).filter(
(Connection.source_host_id == host.id) |
(Connection.target_host_id == host.id)
).count()
if conn_count == 0:
isolated += 1
# Calculate average connections
avg_connections = len(connections) / max(len(hosts), 1) if hosts else 0
return {
'total_nodes': len(hosts),
'total_edges': len(connections),
'isolated_nodes': isolated,
'avg_connections': round(avg_connections, 2)
}
def get_host_neighbors(self, host_id: int) -> List[Host]:
"""
Get all hosts connected to a specific host.
Args:
host_id: Host ID
Returns:
List of connected hosts
"""
# Get outgoing connections
outgoing = self.db.query(Connection).filter(
Connection.source_host_id == host_id
).all()
# Get incoming connections
incoming = self.db.query(Connection).filter(
Connection.target_host_id == host_id
).all()
# Collect unique neighbor IDs
neighbor_ids = set()
for conn in outgoing:
neighbor_ids.add(conn.target_host_id)
for conn in incoming:
neighbor_ids.add(conn.source_host_id)
# Get host objects
neighbors = self.db.query(Host).filter(
Host.id.in_(neighbor_ids)
).all()
return neighbors
def get_network_statistics(self) -> Dict[str, Any]:
"""
Get network statistics.
Returns:
Statistics dictionary
"""
total_hosts = self.db.query(func.count(Host.id)).scalar()
online_hosts = self.db.query(func.count(Host.id)).filter(
Host.status == 'online'
).scalar()
total_services = self.db.query(func.count(Service.id)).scalar()
return {
'total_hosts': total_hosts,
'online_hosts': online_hosts,
'offline_hosts': total_hosts - online_hosts,
'total_services': total_services,
'total_connections': self.db.query(func.count(Connection.id)).scalar()
}