#!/usr/bin/env python3 """ Interactive Certificate Manager Generates CSR on remote host and signs it with UCS CA """ import os import sys import json import subprocess import getpass import socket import re from pathlib import Path # Get script directory and workspace root SCRIPT_DIR = Path(__file__).parent.resolve() WORKSPACE_ROOT = SCRIPT_DIR.parent # Configuration CONFIG_FILE = Path.home() / '.cert-manager-config.json' DEFAULT_CONFIG = { 'country': 'DE', 'state': 'berlin', 'locality': 'berlin', 'organization': 'egonetix', 'organizational_unit': 'it', 'ca_server': '10.0.0.21', 'validity_days': '3650', 'key_bits': '4096', 'last_target_host': '', 'last_common_name': '' } SYSTEM_TYPES = { 'proxmox': { 'name': 'Proxmox VE', 'deploy_script': 'deploy-proxmox.sh', 'key_location': '/tmp/{short_name}.key', 'default_port': '8006', 'ssh_user': 'root', 'cert_dir': 'proxmox' }, 'homeassistant': { 'name': 'Home Assistant', 'deploy_script': 'deploy-homeassistant.sh', 'key_location': '/tmp/{short_name}.key', 'default_port': '8123', 'ssh_user': 'icke', 'uses_local_csr': True, 'cert_dir': 'homeassistant' }, 'pfsense': { 'name': 'pfSense', 'deploy_script': None, # Manual deployment via web interface 'key_location': '/tmp/{short_name}.key', 'default_port': '443', 'ssh_user': 'root', 'cert_dir': 'pfsense' }, 'truenas': { 'name': 'TrueNAS', 'deploy_script': None, # Manual deployment via web interface 'key_location': '/tmp/{short_name}.key', 'default_port': '443', 'ssh_user': 'root', 'cert_dir': 'truenas' }, 'ucs': { 'name': 'Univention Corporate Server', 'deploy_script': None, 'key_location': '/tmp/{short_name}.key', 'default_port': '443', 'ssh_user': 'root', 'cert_dir': 'ucs' }, 'unknown': { 'name': 'Unknown System', 'deploy_script': None, 'key_location': '/tmp/{short_name}.key', 'default_port': '443', 'ssh_user': 'root', 'cert_dir': 'other' } } def load_config(): """Load configuration from file or return defaults""" if CONFIG_FILE.exists(): try: with open(CONFIG_FILE, 'r') as f: config = json.load(f) return {**DEFAULT_CONFIG, **config} except Exception as e: print(f"Warning: Could not load config: {e}") return DEFAULT_CONFIG.copy() def save_config(config): """Save configuration to file""" try: with open(CONFIG_FILE, 'w') as f: json.dump(config, f, indent=2) except Exception as e: print(f"Warning: Could not save config: {e}") def prompt_with_default(prompt, default): """Prompt user with a default value""" if default: user_input = input(f"{prompt} [{default}]: ").strip() return user_input if user_input else default else: return input(f"{prompt}: ").strip() def yes_no_prompt(prompt, default=True): """Ask a yes/no question""" default_str = "Y/n" if default else "y/N" while True: response = input(f"{prompt} [{default_str}]: ").strip().lower() if not response: return default if response in ['y', 'yes']: return True if response in ['n', 'no']: return False print("Please answer 'y' or 'n'") def detect_system_type(target_host, script_dir, ssh_user=None, ssh_password=None): """Detect the type of system on the target host""" try: detect_script = script_dir / 'detect-system.sh' if not detect_script.exists(): print(f"Warning: detect-system.sh not found at {detect_script}") return 'unknown', None env = os.environ.copy() if ssh_user: env['SSH_USER'] = ssh_user if ssh_password: env['SSH_PASSWORD'] = ssh_password result = subprocess.run( [str(detect_script), target_host], capture_output=True, text=True, timeout=15, env=env ) if result.returncode != 0: print(f"Warning: Detection script returned error code {result.returncode}") if result.stderr: print(f"Error output: {result.stderr}") return 'unknown', None system_type = result.stdout.strip() if not system_type: print("Warning: Detection script returned empty output") return 'unknown', None # Extract SSH user if detected detected_user = None if ':' in system_type: system_type, detected_user = system_type.split(':', 1) return (system_type if system_type in SYSTEM_TYPES else 'unknown'), detected_user except subprocess.TimeoutExpired: print(f"Warning: System detection timed out for {target_host}") return 'unknown', None except Exception as e: print(f"Warning: Could not detect system type: {e}") return 'unknown', None def check_dns_resolution(hostname): """Check if a hostname resolves in DNS""" try: socket.gethostbyname(hostname) return True except socket.gaierror: return False def extract_hostnames_from_cert(cert_file): """Extract all DNS names from a certificate""" try: result = subprocess.run( ['openssl', 'x509', '-in', cert_file, '-text', '-noout'], capture_output=True, text=True, check=True ) hostnames = [] # Extract CN from Subject (not from Issuer!) subject_match = re.search(r'Subject:.*?CN\s*=\s*([^,\n]+)', result.stdout, re.DOTALL) if subject_match: cn = subject_match.group(1).strip() # Only add if it looks like a hostname (contains letters and possibly dots/dashes) if re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9\-\.]*[a-zA-Z0-9])?$', cn): hostnames.append(cn) # Extract SANs san_match = re.search(r'X509v3 Subject Alternative Name:.*?DNS:([^\n]+)', result.stdout, re.DOTALL) if san_match: san_text = san_match.group(1) # Extract all DNS entries dns_entries = re.findall(r'DNS:([^,\s]+)', san_text) hostnames.extend(dns_entries) # Remove duplicates and filter out IP addresses unique_hostnames = [] for h in hostnames: h = h.strip() # Skip if it looks like an IP address or contains special chars from CA names if not re.match(r'^\d+\.\d+\.\d+\.\d+$', h) and '(' not in h and ')' not in h: if h not in unique_hostnames and len(h) > 0: unique_hostnames.append(h) return unique_hostnames except Exception as e: print(f"Warning: Could not extract hostnames from certificate: {e}") return [] def create_dns_record(hostname, target_ip, dns_server='10.0.0.21'): """Create a DNS record on UCS server""" try: # Split hostname into name and domain parts = hostname.split('.', 1) if len(parts) == 1: # Short hostname without domain - skip it return False name = parts[0] domain = parts[1] # Construct the zone DN zone_parts = domain.split('.') zone_dn = f"zoneName={domain},cn=dns,dc={',dc='.join(zone_parts)}" # Create DNS record via SSH to UCS server cmd = [ 'ssh', f'root@{dns_server}', 'univention-directory-manager', 'dns/host_record', 'create', '--superordinate', zone_dn, '--set', f'name={name}', '--set', f'a={target_ip}' ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: print(f" ✓ Created DNS record: {hostname} → {target_ip}") return True else: if 'already exists' in result.stderr.lower(): print(f" ℹ DNS record already exists: {hostname}") return True else: print(f" ✗ Failed to create DNS record for {hostname}: {result.stderr.strip()}") return False except Exception as e: print(f" ✗ Error creating DNS record for {hostname}: {e}") return False def check_and_create_dns_records(cert_file, target_ip, dns_server='10.0.0.21'): """Check DNS records for certificate hostnames and offer to create missing ones""" print("\n" + "=" * 60) print("Step 4: Checking DNS Records") print("=" * 60) hostnames = extract_hostnames_from_cert(cert_file) if not hostnames: print("No hostnames found in certificate to check.") return print(f"\nChecking {len(hostnames)} hostname(s) from certificate...") missing_hostnames = [] for hostname in hostnames: if check_dns_resolution(hostname): print(f" ✓ {hostname} - resolves") else: print(f" ✗ {hostname} - NOT found in DNS") missing_hostnames.append(hostname) if not missing_hostnames: print("\n✓ All hostnames are resolvable in DNS!") return print(f"\n⚠ Found {len(missing_hostnames)} hostname(s) not in DNS:") for hostname in missing_hostnames: print(f" - {hostname}") if yes_no_prompt("\nDo you want to create missing DNS records on UCS?", True): print(f"\nCreating DNS records on {dns_server}...") success_count = 0 for hostname in missing_hostnames: if create_dns_record(hostname, target_ip, dns_server): success_count += 1 if success_count > 0: print(f"\n✓ Successfully created {success_count} DNS record(s)") print("\nNote: DNS changes may take a few seconds to propagate.") else: print("\n⚠ No DNS records were created") else: print("\nSkipping DNS record creation.") print("You may need to create these records manually for the certificate to work properly.") def main(): print("=" * 60) print("Interactive Certificate Manager") print("=" * 60) print() # Load config config = load_config() # Ask if user wants to modify defaults if CONFIG_FILE.exists(): if yes_no_prompt("Do you want to modify default values?", False): print("\n--- Default Values Configuration ---") config['country'] = prompt_with_default("Country (C)", config['country']) config['state'] = prompt_with_default("State/Province (ST)", config['state']) config['locality'] = prompt_with_default("Locality (L)", config['locality']) config['organization'] = prompt_with_default("Organization (O)", config['organization']) config['organizational_unit'] = prompt_with_default("Organizational Unit (OU)", config['organizational_unit']) config['ca_server'] = prompt_with_default("CA Server", config['ca_server']) config['validity_days'] = prompt_with_default("Validity (days)", config['validity_days']) config['key_bits'] = prompt_with_default("Key Length (bits)", config['key_bits']) print() # Get certificate details print("--- Certificate Details ---") target_host = prompt_with_default("Target Host (IP or hostname)", config['last_target_host']) if not target_host: print("Error: Target host is required!") sys.exit(1) # Get script directory script_dir = Path(__file__).parent.absolute() # Detect system type (try key-based auth first) print(f"\nDetecting system type on {target_host}...") print("Trying SSH key-based authentication...") system_type, detected_user = detect_system_type(target_host, script_dir) # If detection failed, ask for credentials ssh_user = None ssh_password = None if system_type == 'unknown': print("\nKey-based authentication failed or system not detected.") if yes_no_prompt("Do you want to try with username/password?", True): ssh_user = input("SSH Username: ").strip() import getpass ssh_password = getpass.getpass("SSH Password: ") print(f"\nRetrying detection with credentials...") system_type, detected_user = detect_system_type(target_host, script_dir, ssh_user, ssh_password) # Use detected user or ask for one if detected_user: ssh_user = detected_user elif not ssh_user and system_type != 'unknown': # Ask for SSH user if not detected default_user = SYSTEM_TYPES[system_type].get('ssh_user', 'root') ssh_user = prompt_with_default("SSH Username", default_user) system_info = SYSTEM_TYPES[system_type] print(f"✓ Detected: {system_info['name']}") if ssh_user: print(f" SSH User: {ssh_user}") common_name = prompt_with_default("Common Name (FQDN)", config['last_common_name']) # Ask for additional DNS names print("\nAdditional DNS names (optional, comma-separated):") print(" Example: firewall.domain.com,vpn.domain.com") additional_dns = input("Additional DNS names [none]: ").strip() if not common_name: print("Error: Common name is required!") sys.exit(1) # Extract short name for filenames short_name = common_name.split('.')[0] # Ask for custom values for this certificate print("\n--- Certificate Subject (press Enter to use defaults) ---") country = prompt_with_default("Country (C)", config['country']) state = prompt_with_default("State/Province (ST)", config['state']) locality = prompt_with_default("Locality (L)", config['locality']) organization = prompt_with_default("Organization (O)", config['organization']) org_unit = prompt_with_default("Organizational Unit (OU)", config['organizational_unit']) validity_days = prompt_with_default("Validity (days)", config['validity_days']) key_bits = prompt_with_default("Key Length (bits)", config['key_bits']) print("\n" + "=" * 60) print("Summary:") print("=" * 60) print(f"System Type: {system_info['name']}") print(f"Target Host: {target_host}") print(f"Common Name: {common_name}") if additional_dns: print(f"Additional DNS: {additional_dns}") print(f"Country: {country}") print(f"State: {state}") print(f"Locality: {locality}") print(f"Organization: {organization}") print(f"Org Unit: {org_unit}") print(f"Key Length: {key_bits} bits") print(f"Validity: {validity_days} days") print(f"CA Server: {config['ca_server']}") print(f"Output files: {short_name}.req, {short_name}-cert.pem") print("=" * 60) print() if not yes_no_prompt("Proceed with certificate generation?", True): print("Cancelled.") sys.exit(0) # Create output directory for this system type cert_dir = WORKSPACE_ROOT / 'certs' / system_info.get('cert_dir', 'other') cert_dir.mkdir(parents=True, exist_ok=True) # Change to cert output directory original_dir = Path.cwd() os.chdir(cert_dir) print(f"\nOutput directory: {cert_dir}") # Save config for next run config['last_target_host'] = target_host config['last_common_name'] = common_name save_config(config) print("\n" + "=" * 60) print("Step 1: Generating CSR") print("=" * 60) # Get script directory script_dir = SCRIPT_DIR # Check if system requires local CSR generation if system_info.get('uses_local_csr', False): # Generate CSR locally for Home Assistant and similar systems # Get target IP addresses print() target_ip = prompt_with_default("Target IP address(es) (comma-separated)", "172.20.70.10,172.20.20.10") generate_cmd = [ str(script_dir / 'generate-csr-local.sh'), common_name, country, state, locality, organization, org_unit, key_bits, additional_dns, target_ip ] else: # Set environment for remote SSH commands env = os.environ.copy() if ssh_user: env['SSH_USER'] = ssh_user if ssh_password: env['SSH_PASSWORD'] = ssh_password # Run generate-csr.sh for remote CSR generation generate_cmd = [ str(script_dir / 'generate-csr.sh'), target_host, common_name, country, state, locality, organization, org_unit, key_bits, additional_dns ] try: # Set environment variables for all subprocess calls env = os.environ.copy() if ssh_user: env['SSH_USER'] = ssh_user if ssh_password: env['SSH_PASSWORD'] = ssh_password result = subprocess.run(generate_cmd, check=True, env=env) except subprocess.CalledProcessError as e: print(f"\nError: CSR generation failed with exit code {e.returncode}") sys.exit(1) except FileNotFoundError: print(f"\nError: generate-csr.sh not found in {script_dir}") sys.exit(1) print("\n" + "=" * 60) print("Step 2: Signing certificate with CA") print("=" * 60) # Run sign-cert.sh req_file = f"{short_name}.req" sign_cmd = [ str(script_dir / 'sign-cert.sh'), req_file, short_name, validity_days ] try: result = subprocess.run(sign_cmd, check=True) except subprocess.CalledProcessError as e: print(f"\nError: Certificate signing failed with exit code {e.returncode}") sys.exit(1) except FileNotFoundError: print(f"\nError: sign-cert.sh not found in {script_dir}") sys.exit(1) # Certificate file path cert_file = f"{short_name}-cert.pem" # Check and create DNS records if needed check_and_create_dns_records(cert_file, target_host, config['ca_server']) print("\n" + "=" * 60) print("Step 5: Deploying certificate to target host") print("=" * 60) # Use system-specific deployment if available if system_info['deploy_script']: if yes_no_prompt(f"Deploy certificate to {system_info['name']} automatically?", True): deploy_script = script_dir / system_info['deploy_script'] # For local CSR generation, key file is local if system_info.get('uses_local_csr', False): key_file = f"{short_name}.key" # Key is local else: key_file = f"/tmp/{short_name}.key" # Key is on remote host # Set SSH_USER and SSH_PASSWORD environment variables if needed env = os.environ.copy() if ssh_user: env['SSH_USER'] = ssh_user if ssh_password: env['SSH_PASSWORD'] = ssh_password deploy_cmd = [ str(deploy_script), target_host, cert_file, key_file, short_name ] try: subprocess.run(deploy_cmd, check=True, env=env) except subprocess.CalledProcessError as e: print(f"\nError: Deployment failed with exit code {e.returncode}") sys.exit(1) except FileNotFoundError: print(f"\nError: {system_info['deploy_script']} not found") sys.exit(1) else: # Generic deployment - just copy files if yes_no_prompt("Copy certificate back to the target host?", True): try: subprocess.run(['scp', cert_file, f'root@{target_host}:/tmp/{short_name}.crt'], check=True) print(f"\n✓ Certificate copied to target host at /tmp/{short_name}.crt") print(f" Private key is at /tmp/{short_name}.key") print(f"\n⚠ Manual installation required for {system_info['name']}") print(f" Please install the certificate through the web interface.") except subprocess.CalledProcessError: print("\nWarning: Failed to copy certificate to target host") print("\n" + "=" * 60) print("✓ Certificate Management Complete!") print("=" * 60) print(f"\nFiles created:") print(f" - {req_file} (Certificate Request)") print(f" - {cert_file} (Signed Certificate)") print(f"\nOn target host ({target_host}):") print(f" - /tmp/{short_name}.key (Private Key - {key_bits} bits)") print(f" - /tmp/{short_name}.crt (Certificate)") if system_type == 'proxmox': print(f"\n✓ Access Proxmox at: https://{target_host}:{system_info['default_port']}") print("\n") if __name__ == '__main__': try: main() except KeyboardInterrupt: print("\n\nCancelled by user.") sys.exit(1) except Exception as e: print(f"\nError: {e}") sys.exit(1)