Files
zertifizierung/scripts/cert-manager.py

605 lines
21 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Interactive Certificate Manager
Generates CSR on remote host and signs it with UCS CA
"""
import getpass
import ipaddress
import json
import os
import re
import shlex
import socket
import subprocess
import sys
from pathlib import Path
# Directory holding this script, and the workspace directory one level above it
SCRIPT_DIR = Path(__file__).parent.resolve()
WORKSPACE_ROOT = SCRIPT_DIR.parent

# Per-user settings file, plus the values used for any key it does not define
CONFIG_FILE = Path.home() / '.cert-manager-config.json'
DEFAULT_CONFIG = dict(
    country='DE',
    state='berlin',
    locality='berlin',
    organization='egonetix',
    organizational_unit='it',
    ca_server='10.0.0.21',
    validity_days='3650',
    key_bits='4096',
    last_target_host='',
    last_common_name='',
)
# Key path template shared by every system profile below
_TMP_KEY_TEMPLATE = '/tmp/{short_name}.key'

# System profiles keyed by the identifier that system detection reports.
# 'deploy_script' is None where no automatic deployment script is configured.
SYSTEM_TYPES = dict(
    proxmox=dict(
        name='Proxmox VE',
        deploy_script='deploy-proxmox.sh',
        key_location=_TMP_KEY_TEMPLATE,
        default_port='8006',
        ssh_user='root',
        cert_dir='proxmox',
    ),
    homeassistant=dict(
        name='Home Assistant',
        deploy_script='deploy-homeassistant.sh',
        key_location=_TMP_KEY_TEMPLATE,
        default_port='8123',
        ssh_user='icke',
        uses_local_csr=True,
        cert_dir='homeassistant',
    ),
    pfsense=dict(
        name='pfSense',
        deploy_script=None,  # Manual deployment via web interface
        key_location=_TMP_KEY_TEMPLATE,
        default_port='443',
        ssh_user='root',
        cert_dir='pfsense',
    ),
    truenas=dict(
        name='TrueNAS',
        deploy_script=None,  # Manual deployment via web interface
        key_location=_TMP_KEY_TEMPLATE,
        default_port='443',
        ssh_user='root',
        cert_dir='truenas',
    ),
    ucs=dict(
        name='Univention Corporate Server',
        deploy_script=None,
        key_location=_TMP_KEY_TEMPLATE,
        default_port='443',
        ssh_user='root',
        cert_dir='ucs',
    ),
    unknown=dict(
        name='Unknown System',
        deploy_script=None,
        key_location=_TMP_KEY_TEMPLATE,
        default_port='443',
        ssh_user='root',
        cert_dir='other',
    ),
)
def load_config():
    """Return the effective settings.

    Values read from CONFIG_FILE are layered over DEFAULT_CONFIG. When the
    file is absent, or reading/merging it fails (a warning is printed in
    that case), a fresh copy of DEFAULT_CONFIG is returned instead.
    """
    if not CONFIG_FILE.exists():
        return DEFAULT_CONFIG.copy()
    try:
        with open(CONFIG_FILE, 'r') as handle:
            stored = json.load(handle)
        # Saved values win over the built-in defaults
        return {**DEFAULT_CONFIG, **stored}
    except Exception as e:
        print(f"Warning: Could not load config: {e}")
    return DEFAULT_CONFIG.copy()
def save_config(config):
    """Write *config* to CONFIG_FILE as indented JSON.

    Failures are reported as a printed warning and are never raised.
    """
    try:
        with CONFIG_FILE.open('w') as handle:
            json.dump(config, handle, indent=2)
    except Exception as e:
        print(f"Warning: Could not save config: {e}")
def prompt_with_default(prompt, default):
    """Ask the user for a value, offering *default* when one is available.

    With a truthy *default* the prompt shows it in brackets and an empty
    answer returns the default. Without one, the stripped answer is returned
    as typed (possibly an empty string).
    """
    if not default:
        return input(f"{prompt}: ").strip()
    answer = input(f"{prompt} [{default}]: ").strip()
    return answer or default
def yes_no_prompt(prompt, default=True):
    """Ask a yes/no question until a recognisable answer is given.

    An empty answer returns *default*; 'y'/'yes' and 'n'/'no' are accepted
    case-insensitively. Anything else triggers a reminder and a re-prompt.
    """
    answers = {'y': True, 'yes': True, 'n': False, 'no': False}
    hint = "Y/n" if default else "y/N"
    while True:
        reply = input(f"{prompt} [{hint}]: ").strip().lower()
        if not reply:
            return default
        if reply in answers:
            return answers[reply]
        print("Please answer 'y' or 'n'")
def detect_system_type(target_host, script_dir, ssh_user=None, ssh_password=None):
    """Run detect-system.sh against *target_host* and classify the result.

    The script's stdout is expected to be ``<type>`` or ``<type>:<ssh_user>``.
    Optional credentials are handed to the script through the SSH_USER and
    SSH_PASSWORD environment variables.

    Returns a ``(system_type, detected_user)`` tuple. ``system_type`` is a key
    of SYSTEM_TYPES ('unknown' for unrecognised output); ``detected_user`` is
    None unless the script reported one. Any failure (missing script, non-zero
    exit, empty output, timeout, other error) prints a warning and yields
    ``('unknown', None)``.
    """
    not_detected = ('unknown', None)
    try:
        detect_script = script_dir / 'detect-system.sh'
        if not detect_script.exists():
            print(f"Warning: detect-system.sh not found at {detect_script}")
            return not_detected

        env = os.environ.copy()
        credentials = {'SSH_USER': ssh_user, 'SSH_PASSWORD': ssh_password}
        # Only export the credentials that were actually supplied
        env.update({key: value for key, value in credentials.items() if value})

        result = subprocess.run(
            [str(detect_script), target_host],
            capture_output=True,
            text=True,
            timeout=15,
            env=env,
        )
        if result.returncode != 0:
            print(f"Warning: Detection script returned error code {result.returncode}")
            if result.stderr:
                print(f"Error output: {result.stderr}")
            return not_detected

        output = result.stdout.strip()
        if not output:
            print("Warning: Detection script returned empty output")
            return not_detected

        # Everything after the first ':' is the SSH user reported by the script
        system_type, separator, reported_user = output.partition(':')
        detected_user = reported_user if separator else None
        if system_type not in SYSTEM_TYPES:
            system_type = 'unknown'
        return system_type, detected_user
    except subprocess.TimeoutExpired:
        print(f"Warning: System detection timed out for {target_host}")
        return not_detected
    except Exception as e:
        print(f"Warning: Could not detect system type: {e}")
        return not_detected
def check_dns_resolution(hostname):
    """Return True if *hostname* resolves to an IPv4 address, else False.

    Resolution uses ``socket.gethostbyname``. Names that cannot even be
    encoded for lookup (an empty label, or a label of 64+ characters) raise
    UnicodeError from the IDNA codec rather than a resolver error; they are
    reported as not resolvable instead of aborting the caller.
    """
    try:
        socket.gethostbyname(hostname)
    except (socket.gaierror, socket.herror, UnicodeError):
        return False
    return True
def _hostnames_from_cert_text(cert_text):
    """Parse `openssl x509 -text` output and return its unique DNS hostnames."""
    candidates = []

    # Common Name: look only at the Subject line itself so a CN belonging to
    # the Issuer or to another extension can never be picked up.
    subject_match = re.search(
        r'^[ \t]*Subject:[^\n]*?\bCN\s*=\s*([^,\n/+]+)',
        cert_text,
        re.MULTILINE,
    )
    if subject_match:
        cn = subject_match.group(1).strip()
        # Only keep it if it looks like a hostname (letters, digits, dots, dashes)
        if re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9\-\.]*[a-zA-Z0-9])?$', cn):
            candidates.append(cn)

    # Subject Alternative Names: the values sit on the line after the
    # extension header. Capture that whole line so the first DNS entry keeps
    # its "DNS:" prefix and is not lost.
    san_match = re.search(
        r'X509v3 Subject Alternative Name:[^\n]*\n[ \t]*([^\n]+)',
        cert_text,
    )
    if san_match:
        candidates.extend(re.findall(r'DNS:([^,\s]+)', san_match.group(1)))

    # De-duplicate in order; drop IPv4 literals and CA-style names with parentheses
    unique_hostnames = []
    for candidate in candidates:
        candidate = candidate.strip()
        if not candidate or candidate in unique_hostnames:
            continue
        if re.match(r'^\d+\.\d+\.\d+\.\d+$', candidate):
            continue
        if '(' in candidate or ')' in candidate:
            continue
        unique_hostnames.append(candidate)
    return unique_hostnames


def extract_hostnames_from_cert(cert_file):
    """Extract all DNS names from a certificate.

    Runs ``openssl x509 -text`` on *cert_file* and returns the Subject CN
    (when it looks like a hostname) followed by every DNS-type Subject
    Alternative Name, without duplicates and without IPv4 literals.

    Returns an empty list, after printing a warning, if openssl cannot be
    run or cannot read the certificate.
    """
    try:
        result = subprocess.run(
            ['openssl', 'x509', '-in', str(cert_file), '-text', '-noout'],
            capture_output=True,
            text=True,
            check=True,
        )
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        print(f"Warning: Could not extract hostnames from certificate: {e}")
        return []
    return _hostnames_from_cert_text(result.stdout)
def create_dns_record(hostname, target_ip, dns_server='10.0.0.21'):
    """Create a DNS host record for *hostname* on the UCS server.

    The record is created by running ``univention-directory-manager
    dns/host_record create`` over SSH as root on *dns_server*. The zone is
    taken to be everything after the first dot of *hostname*.

    Args:
        hostname: Fully qualified name to register. Names without a domain
            part are skipped.
        target_ip: IP address the record should point to. It must parse as
            an IP address; anything else is rejected before contacting the
            server.
        dns_server: Host that runs univention-directory-manager.

    Returns:
        True if the record was created or already exists, False otherwise.
    """
    name, _, domain = hostname.partition('.')
    if not name or not domain:
        # Short or malformed hostname: no zone can be derived from it
        return False

    try:
        address = ipaddress.ip_address(str(target_ip).strip())
    except ValueError:
        print(f" ✗ Failed to create DNS record for {hostname}: '{target_ip}' is not a valid IP address")
        return False

    # Construct the zone DN, e.g. zoneName=example.com,cn=dns,dc=example,dc=com
    zone_dn = f"zoneName={domain},cn=dns,dc={',dc='.join(domain.split('.'))}"

    # ssh hands the remote command to a shell on the server, so quote every
    # argument; certificate names such as '*.example.com' must arrive verbatim.
    remote_args = [
        'univention-directory-manager', 'dns/host_record', 'create',
        '--superordinate', zone_dn,
        '--set', f'name={name}',
        '--set', f'a={address}',
    ]
    remote_command = ' '.join(shlex.quote(arg) for arg in remote_args)

    try:
        result = subprocess.run(
            ['ssh', f'root@{dns_server}', remote_command],
            capture_output=True,
            text=True,
        )
    except (OSError, ValueError) as e:
        print(f" ✗ Error creating DNS record for {hostname}: {e}")
        return False

    if result.returncode == 0:
        print(f" ✓ Created DNS record: {hostname} -> {address}")
        return True

    # NOTE(review): UDM appears to report duplicates as "Object exists" and may
    # write it to stdout; both phrasings and both streams are checked - confirm.
    output = f"{result.stdout}\n{result.stderr}".lower()
    if 'already exists' in output or 'object exists' in output:
        print(f" DNS record already exists: {hostname}")
        return True

    detail = (result.stderr or result.stdout or '').strip()
    print(f" ✗ Failed to create DNS record for {hostname}: {detail}")
    return False
def check_and_create_dns_records(cert_file, target_ip, dns_server='10.0.0.21'):
    """Verify that every hostname in the certificate resolves; offer to fix gaps.

    Hostnames are read from *cert_file* and each one is checked in DNS. When
    some do not resolve, the user is asked whether records pointing at
    *target_ip* should be created on *dns_server*. Progress is printed;
    nothing is returned.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("Step 4: Checking DNS Records")
    print(rule)

    hostnames = extract_hostnames_from_cert(cert_file)
    if not hostnames:
        print("No hostnames found in certificate to check.")
        return

    print(f"\nChecking {len(hostnames)} hostname(s) from certificate...")
    missing_hostnames = []
    for hostname in hostnames:
        if not check_dns_resolution(hostname):
            print(f"{hostname} - NOT found in DNS")
            missing_hostnames.append(hostname)
            continue
        print(f"{hostname} - resolves")

    if not missing_hostnames:
        print("\n✓ All hostnames are resolvable in DNS!")
        return

    print(f"\n⚠ Found {len(missing_hostnames)} hostname(s) not in DNS:")
    for hostname in missing_hostnames:
        print(f" - {hostname}")

    if not yes_no_prompt("\nDo you want to create missing DNS records on UCS?", True):
        print("\nSkipping DNS record creation.")
        print("You may need to create these records manually for the certificate to work properly.")
        return

    print(f"\nCreating DNS records on {dns_server}...")
    # Attempt every missing name in order and count the ones that succeeded
    success_count = sum(
        1
        for hostname in missing_hostnames
        if create_dns_record(hostname, target_ip, dns_server)
    )
    if success_count > 0:
        print(f"\n✓ Successfully created {success_count} DNS record(s)")
        print("\nNote: DNS changes may take a few seconds to propagate.")
    else:
        print("\n⚠ No DNS records were created")
def _build_ssh_env(ssh_user, ssh_password):
    """Return a copy of the environment with SSH_USER/SSH_PASSWORD set when given."""
    env = os.environ.copy()
    if ssh_user:
        env['SSH_USER'] = ssh_user
    if ssh_password:
        env['SSH_PASSWORD'] = ssh_password
    return env


def _resolve_target_address(target_host):
    """Return the IPv4 address of *target_host*, or *target_host* itself if it does not resolve."""
    try:
        return socket.gethostbyname(target_host)
    except (socket.gaierror, socket.herror, UnicodeError):
        return target_host


def main():
    """Run the interactive certificate workflow.

    Steps: load (and optionally edit) the saved defaults, detect the target
    system type, collect the certificate subject, generate a CSR, have it
    signed, check/create DNS records for the certificate names, and finally
    deploy or copy the certificate to the target host.

    Exits the process with status 1 on missing required input or when a
    helper script fails, and with status 0 if the user cancels at the summary.
    """
    print("=" * 60)
    print("Interactive Certificate Manager")
    print("=" * 60)
    print()

    # Load config
    config = load_config()

    # Offer to edit the defaults only once a config file has been saved
    if CONFIG_FILE.exists():
        if yes_no_prompt("Do you want to modify default values?", False):
            print("\n--- Default Values Configuration ---")
            config['country'] = prompt_with_default("Country (C)", config['country'])
            config['state'] = prompt_with_default("State/Province (ST)", config['state'])
            config['locality'] = prompt_with_default("Locality (L)", config['locality'])
            config['organization'] = prompt_with_default("Organization (O)", config['organization'])
            config['organizational_unit'] = prompt_with_default("Organizational Unit (OU)", config['organizational_unit'])
            config['ca_server'] = prompt_with_default("CA Server", config['ca_server'])
            config['validity_days'] = prompt_with_default("Validity (days)", config['validity_days'])
            config['key_bits'] = prompt_with_default("Key Length (bits)", config['key_bits'])
            print()

    # Get certificate details
    print("--- Certificate Details ---")
    target_host = prompt_with_default("Target Host (IP or hostname)", config['last_target_host'])
    if not target_host:
        print("Error: Target host is required!")
        sys.exit(1)

    script_dir = SCRIPT_DIR

    # Detect system type (try key-based auth first)
    print(f"\nDetecting system type on {target_host}...")
    print("Trying SSH key-based authentication...")
    system_type, detected_user = detect_system_type(target_host, script_dir)

    # If detection failed, ask for credentials
    ssh_user = None
    ssh_password = None
    if system_type == 'unknown':
        print("\nKey-based authentication failed or system not detected.")
        if yes_no_prompt("Do you want to try with username/password?", True):
            ssh_user = input("SSH Username: ").strip()
            ssh_password = getpass.getpass("SSH Password: ")
            print(f"\nRetrying detection with credentials...")
            system_type, detected_user = detect_system_type(target_host, script_dir, ssh_user, ssh_password)

    # Use detected user or ask for one
    if detected_user:
        ssh_user = detected_user
    elif not ssh_user and system_type != 'unknown':
        # Ask for SSH user if not detected
        default_user = SYSTEM_TYPES[system_type].get('ssh_user', 'root')
        ssh_user = prompt_with_default("SSH Username", default_user)

    system_info = SYSTEM_TYPES[system_type]
    uses_local_csr = system_info.get('uses_local_csr', False)
    print(f"✓ Detected: {system_info['name']}")
    if ssh_user:
        print(f" SSH User: {ssh_user}")

    common_name = prompt_with_default("Common Name (FQDN)", config['last_common_name'])
    # Validate immediately so the user is not asked further questions first
    if not common_name:
        print("Error: Common name is required!")
        sys.exit(1)

    # Ask for additional DNS names
    print("\nAdditional DNS names (optional, comma-separated):")
    print(" Example: firewall.domain.com,vpn.domain.com")
    additional_dns = input("Additional DNS names [none]: ").strip()

    # Extract short name for filenames
    short_name = common_name.split('.')[0]

    # Ask for custom values for this certificate
    print("\n--- Certificate Subject (press Enter to use defaults) ---")
    country = prompt_with_default("Country (C)", config['country'])
    state = prompt_with_default("State/Province (ST)", config['state'])
    locality = prompt_with_default("Locality (L)", config['locality'])
    organization = prompt_with_default("Organization (O)", config['organization'])
    org_unit = prompt_with_default("Organizational Unit (OU)", config['organizational_unit'])
    validity_days = prompt_with_default("Validity (days)", config['validity_days'])
    key_bits = prompt_with_default("Key Length (bits)", config['key_bits'])

    print("\n" + "=" * 60)
    print("Summary:")
    print("=" * 60)
    print(f"System Type: {system_info['name']}")
    print(f"Target Host: {target_host}")
    print(f"Common Name: {common_name}")
    if additional_dns:
        print(f"Additional DNS: {additional_dns}")
    print(f"Country: {country}")
    print(f"State: {state}")
    print(f"Locality: {locality}")
    print(f"Organization: {organization}")
    print(f"Org Unit: {org_unit}")
    print(f"Key Length: {key_bits} bits")
    print(f"Validity: {validity_days} days")
    print(f"CA Server: {config['ca_server']}")
    print(f"Output files: {short_name}.req, {short_name}-cert.pem")
    print("=" * 60)
    print()

    if not yes_no_prompt("Proceed with certificate generation?", True):
        print("Cancelled.")
        sys.exit(0)

    # Create output directory for this system type
    cert_dir = WORKSPACE_ROOT / 'certs' / system_info.get('cert_dir', 'other')
    cert_dir.mkdir(parents=True, exist_ok=True)

    # Work inside the output directory: the request, certificate and (for
    # local CSRs) key files below are referenced by relative name.
    os.chdir(cert_dir)
    print(f"\nOutput directory: {cert_dir}")

    # Save config for next run
    config['last_target_host'] = target_host
    config['last_common_name'] = common_name
    save_config(config)

    print("\n" + "=" * 60)
    print("Step 1: Generating CSR")
    print("=" * 60)

    # Check if system requires local CSR generation
    if uses_local_csr:
        # Generate CSR locally for Home Assistant and similar systems
        print()
        target_ip = prompt_with_default("Target IP address(es) (comma-separated)", "172.20.70.10,172.20.20.10")
        csr_script = script_dir / 'generate-csr-local.sh'
        generate_cmd = [
            str(csr_script),
            common_name,
            country,
            state,
            locality,
            organization,
            org_unit,
            key_bits,
            additional_dns,
            target_ip,
        ]
    else:
        # Run generate-csr.sh for remote CSR generation
        csr_script = script_dir / 'generate-csr.sh'
        generate_cmd = [
            str(csr_script),
            target_host,
            common_name,
            country,
            state,
            locality,
            organization,
            org_unit,
            key_bits,
            additional_dns,
        ]

    try:
        # SSH credentials reach the helper scripts via environment variables
        subprocess.run(generate_cmd, check=True, env=_build_ssh_env(ssh_user, ssh_password))
    except subprocess.CalledProcessError as e:
        print(f"\nError: CSR generation failed with exit code {e.returncode}")
        sys.exit(1)
    except FileNotFoundError:
        # Name the script that was actually invoked (local or remote variant)
        print(f"\nError: {csr_script.name} not found in {script_dir}")
        sys.exit(1)

    print("\n" + "=" * 60)
    print("Step 2: Signing certificate with CA")
    print("=" * 60)

    # Run sign-cert.sh
    req_file = f"{short_name}.req"
    sign_cmd = [
        str(script_dir / 'sign-cert.sh'),
        req_file,
        short_name,
        validity_days,
    ]
    try:
        subprocess.run(sign_cmd, check=True)
    except subprocess.CalledProcessError as e:
        print(f"\nError: Certificate signing failed with exit code {e.returncode}")
        sys.exit(1)
    except FileNotFoundError:
        print(f"\nError: sign-cert.sh not found in {script_dir}")
        sys.exit(1)

    # Certificate file path
    cert_file = f"{short_name}-cert.pem"

    # Check and create DNS records if needed. New records need an IP address,
    # so a target given by hostname is resolved first (left as-is if that fails).
    check_and_create_dns_records(cert_file, _resolve_target_address(target_host), config['ca_server'])

    print("\n" + "=" * 60)
    print("Step 5: Deploying certificate to target host")
    print("=" * 60)

    # Use system-specific deployment if available
    if system_info['deploy_script']:
        if yes_no_prompt(f"Deploy certificate to {system_info['name']} automatically?", True):
            deploy_script = script_dir / system_info['deploy_script']
            if uses_local_csr:
                key_file = f"{short_name}.key"  # Key is local
            else:
                key_file = f"/tmp/{short_name}.key"  # Key is on remote host
            deploy_cmd = [
                str(deploy_script),
                target_host,
                cert_file,
                key_file,
                short_name,
            ]
            try:
                subprocess.run(deploy_cmd, check=True, env=_build_ssh_env(ssh_user, ssh_password))
            except subprocess.CalledProcessError as e:
                print(f"\nError: Deployment failed with exit code {e.returncode}")
                sys.exit(1)
            except FileNotFoundError:
                print(f"\nError: {system_info['deploy_script']} not found")
                sys.exit(1)
    else:
        # Generic deployment - just copy files
        if yes_no_prompt("Copy certificate back to the target host?", True):
            # Copy as the SSH user chosen above, not unconditionally as root
            remote_user = ssh_user or system_info.get('ssh_user', 'root')
            try:
                subprocess.run(['scp', cert_file, f'{remote_user}@{target_host}:/tmp/{short_name}.crt'], check=True)
                print(f"\n✓ Certificate copied to target host at /tmp/{short_name}.crt")
                print(f" Private key is at /tmp/{short_name}.key")
                print(f"\n⚠ Manual installation required for {system_info['name']}")
                print(f" Please install the certificate through the web interface.")
            except (subprocess.CalledProcessError, OSError):
                print("\nWarning: Failed to copy certificate to target host")

    print("\n" + "=" * 60)
    print("✓ Certificate Management Complete!")
    print("=" * 60)
    print(f"\nFiles created:")
    print(f" - {req_file} (Certificate Request)")
    print(f" - {cert_file} (Signed Certificate)")
    if uses_local_csr:
        # Locally generated keys stay in the output directory
        print(f" - {short_name}.key (Private Key - {key_bits} bits)")
    print(f"\nOn target host ({target_host}):")
    if not uses_local_csr:
        print(f" - /tmp/{short_name}.key (Private Key - {key_bits} bits)")
    print(f" - /tmp/{short_name}.crt (Certificate)")
    if system_type == 'proxmox':
        print(f"\n✓ Access Proxmox at: https://{target_host}:{system_info['default_port']}")
    print("\n")
if __name__ == '__main__':
    # Script entry point: run the workflow and turn Ctrl-C or any unexpected
    # error into a short message plus exit status 1.
    failure_message = None
    try:
        main()
    except KeyboardInterrupt:
        failure_message = "\n\nCancelled by user."
    except Exception as e:
        failure_message = f"\nError: {e}"
    if failure_message is not None:
        print(failure_message)
        sys.exit(1)