605 lines
21 KiB
Python
Executable File
605 lines
21 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
Interactive Certificate Manager
|
||
Generates CSR on remote host and signs it with UCS CA
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
import subprocess
|
||
import getpass
|
||
import socket
|
||
import re
|
||
from pathlib import Path
|
||
|
||
# Get script directory and workspace root
|
||
SCRIPT_DIR = Path(__file__).parent.resolve()
|
||
WORKSPACE_ROOT = SCRIPT_DIR.parent
|
||
|
||
# Configuration
|
||
CONFIG_FILE = Path.home() / '.cert-manager-config.json'
|
||
DEFAULT_CONFIG = {
|
||
'country': 'DE',
|
||
'state': 'berlin',
|
||
'locality': 'berlin',
|
||
'organization': 'egonetix',
|
||
'organizational_unit': 'it',
|
||
'ca_server': '10.0.0.21',
|
||
'validity_days': '3650',
|
||
'key_bits': '4096',
|
||
'last_target_host': '',
|
||
'last_common_name': ''
|
||
}
|
||
|
||
SYSTEM_TYPES = {
|
||
'proxmox': {
|
||
'name': 'Proxmox VE',
|
||
'deploy_script': 'deploy-proxmox.sh',
|
||
'key_location': '/tmp/{short_name}.key',
|
||
'default_port': '8006',
|
||
'ssh_user': 'root',
|
||
'cert_dir': 'proxmox'
|
||
},
|
||
'homeassistant': {
|
||
'name': 'Home Assistant',
|
||
'deploy_script': 'deploy-homeassistant.sh',
|
||
'key_location': '/tmp/{short_name}.key',
|
||
'default_port': '8123',
|
||
'ssh_user': 'icke',
|
||
'uses_local_csr': True,
|
||
'cert_dir': 'homeassistant'
|
||
},
|
||
'pfsense': {
|
||
'name': 'pfSense',
|
||
'deploy_script': None, # Manual deployment via web interface
|
||
'key_location': '/tmp/{short_name}.key',
|
||
'default_port': '443',
|
||
'ssh_user': 'root',
|
||
'cert_dir': 'pfsense'
|
||
},
|
||
'truenas': {
|
||
'name': 'TrueNAS',
|
||
'deploy_script': None, # Manual deployment via web interface
|
||
'key_location': '/tmp/{short_name}.key',
|
||
'default_port': '443',
|
||
'ssh_user': 'root',
|
||
'cert_dir': 'truenas'
|
||
},
|
||
'ucs': {
|
||
'name': 'Univention Corporate Server',
|
||
'deploy_script': None,
|
||
'key_location': '/tmp/{short_name}.key',
|
||
'default_port': '443',
|
||
'ssh_user': 'root',
|
||
'cert_dir': 'ucs'
|
||
},
|
||
'unknown': {
|
||
'name': 'Unknown System',
|
||
'deploy_script': None,
|
||
'key_location': '/tmp/{short_name}.key',
|
||
'default_port': '443',
|
||
'ssh_user': 'root',
|
||
'cert_dir': 'other'
|
||
}
|
||
}
|
||
|
||
def load_config():
|
||
"""Load configuration from file or return defaults"""
|
||
if CONFIG_FILE.exists():
|
||
try:
|
||
with open(CONFIG_FILE, 'r') as f:
|
||
config = json.load(f)
|
||
return {**DEFAULT_CONFIG, **config}
|
||
except Exception as e:
|
||
print(f"Warning: Could not load config: {e}")
|
||
return DEFAULT_CONFIG.copy()
|
||
|
||
def save_config(config):
|
||
"""Save configuration to file"""
|
||
try:
|
||
with open(CONFIG_FILE, 'w') as f:
|
||
json.dump(config, f, indent=2)
|
||
except Exception as e:
|
||
print(f"Warning: Could not save config: {e}")
|
||
|
||
def prompt_with_default(prompt, default):
|
||
"""Prompt user with a default value"""
|
||
if default:
|
||
user_input = input(f"{prompt} [{default}]: ").strip()
|
||
return user_input if user_input else default
|
||
else:
|
||
return input(f"{prompt}: ").strip()
|
||
|
||
def yes_no_prompt(prompt, default=True):
|
||
"""Ask a yes/no question"""
|
||
default_str = "Y/n" if default else "y/N"
|
||
while True:
|
||
response = input(f"{prompt} [{default_str}]: ").strip().lower()
|
||
if not response:
|
||
return default
|
||
if response in ['y', 'yes']:
|
||
return True
|
||
if response in ['n', 'no']:
|
||
return False
|
||
print("Please answer 'y' or 'n'")
|
||
|
||
def detect_system_type(target_host, script_dir, ssh_user=None, ssh_password=None):
|
||
"""Detect the type of system on the target host"""
|
||
try:
|
||
detect_script = script_dir / 'detect-system.sh'
|
||
if not detect_script.exists():
|
||
print(f"Warning: detect-system.sh not found at {detect_script}")
|
||
return 'unknown', None
|
||
|
||
env = os.environ.copy()
|
||
if ssh_user:
|
||
env['SSH_USER'] = ssh_user
|
||
if ssh_password:
|
||
env['SSH_PASSWORD'] = ssh_password
|
||
|
||
result = subprocess.run(
|
||
[str(detect_script), target_host],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=15,
|
||
env=env
|
||
)
|
||
|
||
if result.returncode != 0:
|
||
print(f"Warning: Detection script returned error code {result.returncode}")
|
||
if result.stderr:
|
||
print(f"Error output: {result.stderr}")
|
||
return 'unknown', None
|
||
|
||
system_type = result.stdout.strip()
|
||
if not system_type:
|
||
print("Warning: Detection script returned empty output")
|
||
return 'unknown', None
|
||
|
||
# Extract SSH user if detected
|
||
detected_user = None
|
||
if ':' in system_type:
|
||
system_type, detected_user = system_type.split(':', 1)
|
||
|
||
return (system_type if system_type in SYSTEM_TYPES else 'unknown'), detected_user
|
||
except subprocess.TimeoutExpired:
|
||
print(f"Warning: System detection timed out for {target_host}")
|
||
return 'unknown', None
|
||
except Exception as e:
|
||
print(f"Warning: Could not detect system type: {e}")
|
||
return 'unknown', None
|
||
|
||
def check_dns_resolution(hostname):
|
||
"""Check if a hostname resolves in DNS"""
|
||
try:
|
||
socket.gethostbyname(hostname)
|
||
return True
|
||
except socket.gaierror:
|
||
return False
|
||
|
||
def extract_hostnames_from_cert(cert_file):
|
||
"""Extract all DNS names from a certificate"""
|
||
try:
|
||
result = subprocess.run(
|
||
['openssl', 'x509', '-in', cert_file, '-text', '-noout'],
|
||
capture_output=True,
|
||
text=True,
|
||
check=True
|
||
)
|
||
|
||
hostnames = []
|
||
|
||
# Extract CN from Subject (not from Issuer!)
|
||
subject_match = re.search(r'Subject:.*?CN\s*=\s*([^,\n]+)', result.stdout, re.DOTALL)
|
||
if subject_match:
|
||
cn = subject_match.group(1).strip()
|
||
# Only add if it looks like a hostname (contains letters and possibly dots/dashes)
|
||
if re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9\-\.]*[a-zA-Z0-9])?$', cn):
|
||
hostnames.append(cn)
|
||
|
||
# Extract SANs
|
||
san_match = re.search(r'X509v3 Subject Alternative Name:.*?DNS:([^\n]+)', result.stdout, re.DOTALL)
|
||
if san_match:
|
||
san_text = san_match.group(1)
|
||
# Extract all DNS entries
|
||
dns_entries = re.findall(r'DNS:([^,\s]+)', san_text)
|
||
hostnames.extend(dns_entries)
|
||
|
||
# Remove duplicates and filter out IP addresses
|
||
unique_hostnames = []
|
||
for h in hostnames:
|
||
h = h.strip()
|
||
# Skip if it looks like an IP address or contains special chars from CA names
|
||
if not re.match(r'^\d+\.\d+\.\d+\.\d+$', h) and '(' not in h and ')' not in h:
|
||
if h not in unique_hostnames and len(h) > 0:
|
||
unique_hostnames.append(h)
|
||
|
||
return unique_hostnames
|
||
except Exception as e:
|
||
print(f"Warning: Could not extract hostnames from certificate: {e}")
|
||
return []
|
||
|
||
def create_dns_record(hostname, target_ip, dns_server='10.0.0.21'):
|
||
"""Create a DNS record on UCS server"""
|
||
try:
|
||
# Split hostname into name and domain
|
||
parts = hostname.split('.', 1)
|
||
if len(parts) == 1:
|
||
# Short hostname without domain - skip it
|
||
return False
|
||
|
||
name = parts[0]
|
||
domain = parts[1]
|
||
|
||
# Construct the zone DN
|
||
zone_parts = domain.split('.')
|
||
zone_dn = f"zoneName={domain},cn=dns,dc={',dc='.join(zone_parts)}"
|
||
|
||
# Create DNS record via SSH to UCS server
|
||
cmd = [
|
||
'ssh', f'root@{dns_server}',
|
||
'univention-directory-manager', 'dns/host_record', 'create',
|
||
'--superordinate', zone_dn,
|
||
'--set', f'name={name}',
|
||
'--set', f'a={target_ip}'
|
||
]
|
||
|
||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||
|
||
if result.returncode == 0:
|
||
print(f" ✓ Created DNS record: {hostname} → {target_ip}")
|
||
return True
|
||
else:
|
||
if 'already exists' in result.stderr.lower():
|
||
print(f" ℹ DNS record already exists: {hostname}")
|
||
return True
|
||
else:
|
||
print(f" ✗ Failed to create DNS record for {hostname}: {result.stderr.strip()}")
|
||
return False
|
||
except Exception as e:
|
||
print(f" ✗ Error creating DNS record for {hostname}: {e}")
|
||
return False
|
||
|
||
def check_and_create_dns_records(cert_file, target_ip, dns_server='10.0.0.21'):
|
||
"""Check DNS records for certificate hostnames and offer to create missing ones"""
|
||
print("\n" + "=" * 60)
|
||
print("Step 4: Checking DNS Records")
|
||
print("=" * 60)
|
||
|
||
hostnames = extract_hostnames_from_cert(cert_file)
|
||
|
||
if not hostnames:
|
||
print("No hostnames found in certificate to check.")
|
||
return
|
||
|
||
print(f"\nChecking {len(hostnames)} hostname(s) from certificate...")
|
||
|
||
missing_hostnames = []
|
||
for hostname in hostnames:
|
||
if check_dns_resolution(hostname):
|
||
print(f" ✓ {hostname} - resolves")
|
||
else:
|
||
print(f" ✗ {hostname} - NOT found in DNS")
|
||
missing_hostnames.append(hostname)
|
||
|
||
if not missing_hostnames:
|
||
print("\n✓ All hostnames are resolvable in DNS!")
|
||
return
|
||
|
||
print(f"\n⚠ Found {len(missing_hostnames)} hostname(s) not in DNS:")
|
||
for hostname in missing_hostnames:
|
||
print(f" - {hostname}")
|
||
|
||
if yes_no_prompt("\nDo you want to create missing DNS records on UCS?", True):
|
||
print(f"\nCreating DNS records on {dns_server}...")
|
||
|
||
success_count = 0
|
||
for hostname in missing_hostnames:
|
||
if create_dns_record(hostname, target_ip, dns_server):
|
||
success_count += 1
|
||
|
||
if success_count > 0:
|
||
print(f"\n✓ Successfully created {success_count} DNS record(s)")
|
||
print("\nNote: DNS changes may take a few seconds to propagate.")
|
||
else:
|
||
print("\n⚠ No DNS records were created")
|
||
else:
|
||
print("\nSkipping DNS record creation.")
|
||
print("You may need to create these records manually for the certificate to work properly.")
|
||
|
||
|
||
def main():
|
||
print("=" * 60)
|
||
print("Interactive Certificate Manager")
|
||
print("=" * 60)
|
||
print()
|
||
|
||
# Load config
|
||
config = load_config()
|
||
|
||
# Ask if user wants to modify defaults
|
||
if CONFIG_FILE.exists():
|
||
if yes_no_prompt("Do you want to modify default values?", False):
|
||
print("\n--- Default Values Configuration ---")
|
||
config['country'] = prompt_with_default("Country (C)", config['country'])
|
||
config['state'] = prompt_with_default("State/Province (ST)", config['state'])
|
||
config['locality'] = prompt_with_default("Locality (L)", config['locality'])
|
||
config['organization'] = prompt_with_default("Organization (O)", config['organization'])
|
||
config['organizational_unit'] = prompt_with_default("Organizational Unit (OU)", config['organizational_unit'])
|
||
config['ca_server'] = prompt_with_default("CA Server", config['ca_server'])
|
||
config['validity_days'] = prompt_with_default("Validity (days)", config['validity_days'])
|
||
config['key_bits'] = prompt_with_default("Key Length (bits)", config['key_bits'])
|
||
print()
|
||
|
||
# Get certificate details
|
||
print("--- Certificate Details ---")
|
||
target_host = prompt_with_default("Target Host (IP or hostname)", config['last_target_host'])
|
||
|
||
if not target_host:
|
||
print("Error: Target host is required!")
|
||
sys.exit(1)
|
||
|
||
# Get script directory
|
||
script_dir = Path(__file__).parent.absolute()
|
||
|
||
# Detect system type (try key-based auth first)
|
||
print(f"\nDetecting system type on {target_host}...")
|
||
print("Trying SSH key-based authentication...")
|
||
system_type, detected_user = detect_system_type(target_host, script_dir)
|
||
|
||
# If detection failed, ask for credentials
|
||
ssh_user = None
|
||
ssh_password = None
|
||
|
||
if system_type == 'unknown':
|
||
print("\nKey-based authentication failed or system not detected.")
|
||
if yes_no_prompt("Do you want to try with username/password?", True):
|
||
ssh_user = input("SSH Username: ").strip()
|
||
import getpass
|
||
ssh_password = getpass.getpass("SSH Password: ")
|
||
print(f"\nRetrying detection with credentials...")
|
||
system_type, detected_user = detect_system_type(target_host, script_dir, ssh_user, ssh_password)
|
||
|
||
# Use detected user or ask for one
|
||
if detected_user:
|
||
ssh_user = detected_user
|
||
elif not ssh_user and system_type != 'unknown':
|
||
# Ask for SSH user if not detected
|
||
default_user = SYSTEM_TYPES[system_type].get('ssh_user', 'root')
|
||
ssh_user = prompt_with_default("SSH Username", default_user)
|
||
|
||
system_info = SYSTEM_TYPES[system_type]
|
||
print(f"✓ Detected: {system_info['name']}")
|
||
if ssh_user:
|
||
print(f" SSH User: {ssh_user}")
|
||
|
||
common_name = prompt_with_default("Common Name (FQDN)", config['last_common_name'])
|
||
|
||
# Ask for additional DNS names
|
||
print("\nAdditional DNS names (optional, comma-separated):")
|
||
print(" Example: firewall.domain.com,vpn.domain.com")
|
||
additional_dns = input("Additional DNS names [none]: ").strip()
|
||
|
||
if not common_name:
|
||
print("Error: Common name is required!")
|
||
sys.exit(1)
|
||
|
||
# Extract short name for filenames
|
||
short_name = common_name.split('.')[0]
|
||
|
||
# Ask for custom values for this certificate
|
||
print("\n--- Certificate Subject (press Enter to use defaults) ---")
|
||
country = prompt_with_default("Country (C)", config['country'])
|
||
state = prompt_with_default("State/Province (ST)", config['state'])
|
||
locality = prompt_with_default("Locality (L)", config['locality'])
|
||
organization = prompt_with_default("Organization (O)", config['organization'])
|
||
org_unit = prompt_with_default("Organizational Unit (OU)", config['organizational_unit'])
|
||
validity_days = prompt_with_default("Validity (days)", config['validity_days'])
|
||
key_bits = prompt_with_default("Key Length (bits)", config['key_bits'])
|
||
|
||
print("\n" + "=" * 60)
|
||
print("Summary:")
|
||
print("=" * 60)
|
||
print(f"System Type: {system_info['name']}")
|
||
print(f"Target Host: {target_host}")
|
||
print(f"Common Name: {common_name}")
|
||
if additional_dns:
|
||
print(f"Additional DNS: {additional_dns}")
|
||
print(f"Country: {country}")
|
||
print(f"State: {state}")
|
||
print(f"Locality: {locality}")
|
||
print(f"Organization: {organization}")
|
||
print(f"Org Unit: {org_unit}")
|
||
print(f"Key Length: {key_bits} bits")
|
||
print(f"Validity: {validity_days} days")
|
||
print(f"CA Server: {config['ca_server']}")
|
||
print(f"Output files: {short_name}.req, {short_name}-cert.pem")
|
||
print("=" * 60)
|
||
print()
|
||
|
||
if not yes_no_prompt("Proceed with certificate generation?", True):
|
||
print("Cancelled.")
|
||
sys.exit(0)
|
||
|
||
# Create output directory for this system type
|
||
cert_dir = WORKSPACE_ROOT / 'certs' / system_info.get('cert_dir', 'other')
|
||
cert_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# Change to cert output directory
|
||
original_dir = Path.cwd()
|
||
os.chdir(cert_dir)
|
||
print(f"\nOutput directory: {cert_dir}")
|
||
|
||
# Save config for next run
|
||
config['last_target_host'] = target_host
|
||
config['last_common_name'] = common_name
|
||
save_config(config)
|
||
|
||
print("\n" + "=" * 60)
|
||
print("Step 1: Generating CSR")
|
||
print("=" * 60)
|
||
|
||
# Get script directory
|
||
script_dir = SCRIPT_DIR
|
||
|
||
# Check if system requires local CSR generation
|
||
if system_info.get('uses_local_csr', False):
|
||
# Generate CSR locally for Home Assistant and similar systems
|
||
# Get target IP addresses
|
||
print()
|
||
target_ip = prompt_with_default("Target IP address(es) (comma-separated)", "172.20.70.10,172.20.20.10")
|
||
|
||
generate_cmd = [
|
||
str(script_dir / 'generate-csr-local.sh'),
|
||
common_name,
|
||
country,
|
||
state,
|
||
locality,
|
||
organization,
|
||
org_unit,
|
||
key_bits,
|
||
additional_dns,
|
||
target_ip
|
||
]
|
||
else:
|
||
# Set environment for remote SSH commands
|
||
env = os.environ.copy()
|
||
if ssh_user:
|
||
env['SSH_USER'] = ssh_user
|
||
if ssh_password:
|
||
env['SSH_PASSWORD'] = ssh_password
|
||
|
||
# Run generate-csr.sh for remote CSR generation
|
||
generate_cmd = [
|
||
str(script_dir / 'generate-csr.sh'),
|
||
target_host,
|
||
common_name,
|
||
country,
|
||
state,
|
||
locality,
|
||
organization,
|
||
org_unit,
|
||
key_bits,
|
||
additional_dns
|
||
]
|
||
|
||
try:
|
||
# Set environment variables for all subprocess calls
|
||
env = os.environ.copy()
|
||
if ssh_user:
|
||
env['SSH_USER'] = ssh_user
|
||
if ssh_password:
|
||
env['SSH_PASSWORD'] = ssh_password
|
||
|
||
result = subprocess.run(generate_cmd, check=True, env=env)
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"\nError: CSR generation failed with exit code {e.returncode}")
|
||
sys.exit(1)
|
||
except FileNotFoundError:
|
||
print(f"\nError: generate-csr.sh not found in {script_dir}")
|
||
sys.exit(1)
|
||
|
||
print("\n" + "=" * 60)
|
||
print("Step 2: Signing certificate with CA")
|
||
print("=" * 60)
|
||
|
||
# Run sign-cert.sh
|
||
req_file = f"{short_name}.req"
|
||
sign_cmd = [
|
||
str(script_dir / 'sign-cert.sh'),
|
||
req_file,
|
||
short_name,
|
||
validity_days
|
||
]
|
||
|
||
try:
|
||
result = subprocess.run(sign_cmd, check=True)
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"\nError: Certificate signing failed with exit code {e.returncode}")
|
||
sys.exit(1)
|
||
except FileNotFoundError:
|
||
print(f"\nError: sign-cert.sh not found in {script_dir}")
|
||
sys.exit(1)
|
||
|
||
# Certificate file path
|
||
cert_file = f"{short_name}-cert.pem"
|
||
|
||
# Check and create DNS records if needed
|
||
check_and_create_dns_records(cert_file, target_host, config['ca_server'])
|
||
|
||
print("\n" + "=" * 60)
|
||
print("Step 5: Deploying certificate to target host")
|
||
print("=" * 60)
|
||
|
||
# Use system-specific deployment if available
|
||
if system_info['deploy_script']:
|
||
if yes_no_prompt(f"Deploy certificate to {system_info['name']} automatically?", True):
|
||
deploy_script = script_dir / system_info['deploy_script']
|
||
|
||
# For local CSR generation, key file is local
|
||
if system_info.get('uses_local_csr', False):
|
||
key_file = f"{short_name}.key" # Key is local
|
||
else:
|
||
key_file = f"/tmp/{short_name}.key" # Key is on remote host
|
||
|
||
# Set SSH_USER and SSH_PASSWORD environment variables if needed
|
||
env = os.environ.copy()
|
||
if ssh_user:
|
||
env['SSH_USER'] = ssh_user
|
||
if ssh_password:
|
||
env['SSH_PASSWORD'] = ssh_password
|
||
|
||
deploy_cmd = [
|
||
str(deploy_script),
|
||
target_host,
|
||
cert_file,
|
||
key_file,
|
||
short_name
|
||
]
|
||
|
||
try:
|
||
subprocess.run(deploy_cmd, check=True, env=env)
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"\nError: Deployment failed with exit code {e.returncode}")
|
||
sys.exit(1)
|
||
except FileNotFoundError:
|
||
print(f"\nError: {system_info['deploy_script']} not found")
|
||
sys.exit(1)
|
||
else:
|
||
# Generic deployment - just copy files
|
||
if yes_no_prompt("Copy certificate back to the target host?", True):
|
||
try:
|
||
subprocess.run(['scp', cert_file, f'root@{target_host}:/tmp/{short_name}.crt'], check=True)
|
||
print(f"\n✓ Certificate copied to target host at /tmp/{short_name}.crt")
|
||
print(f" Private key is at /tmp/{short_name}.key")
|
||
print(f"\n⚠ Manual installation required for {system_info['name']}")
|
||
print(f" Please install the certificate through the web interface.")
|
||
except subprocess.CalledProcessError:
|
||
print("\nWarning: Failed to copy certificate to target host")
|
||
|
||
print("\n" + "=" * 60)
|
||
print("✓ Certificate Management Complete!")
|
||
print("=" * 60)
|
||
print(f"\nFiles created:")
|
||
print(f" - {req_file} (Certificate Request)")
|
||
print(f" - {cert_file} (Signed Certificate)")
|
||
print(f"\nOn target host ({target_host}):")
|
||
print(f" - /tmp/{short_name}.key (Private Key - {key_bits} bits)")
|
||
print(f" - /tmp/{short_name}.crt (Certificate)")
|
||
|
||
if system_type == 'proxmox':
|
||
print(f"\n✓ Access Proxmox at: https://{target_host}:{system_info['default_port']}")
|
||
|
||
print("\n")
|
||
|
||
if __name__ == '__main__':
|
||
try:
|
||
main()
|
||
except KeyboardInterrupt:
|
||
print("\n\nCancelled by user.")
|
||
sys.exit(1)
|
||
except Exception as e:
|
||
print(f"\nError: {e}")
|
||
sys.exit(1)
|