Files
zertifizierung/cert-manager.py
root 296948f07e Add interactive credential prompting and fix Home Assistant SSH password authentication
- Added interactive username/password prompts to cert-manager.py
- Removed requirement for SSH_USER environment variable prefix
- Fixed password authentication in deploy-homeassistant.sh using SSHPASS environment variable
- Added SSH rate limiting delays throughout deployment script
- Improved error handling with SSH connection testing
- Prioritized SSH_USER in detect-system.sh to avoid unnecessary root attempts
- Added StrictHostKeyChecking=no for automated deployments

Tool now works fully interactively - just run ./cert-manager.py and answer prompts
2025-12-12 15:38:41 +01:00

438 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Interactive Certificate Manager
Generates CSR on remote host and signs it with UCS CA
"""
import os
import sys
import json
import subprocess
import getpass
from pathlib import Path
# Configuration
# Per-user settings file; it lives directly in the invoking user's home
# directory.
CONFIG_FILE = Path.home().joinpath('.cert-manager-config.json')

# Built-in fallback values. Every value is a string, including the numeric
# ones (validity, key length).
DEFAULT_CONFIG = dict(
    # Certificate subject defaults.
    country='DE',
    state='berlin',
    locality='berlin',
    organization='egonetix',
    organizational_unit='it',
    # Signing parameters.
    ca_server='10.0.0.21',
    validity_days='3650',
    key_bits='4096',
    # Remembered answers, offered as prompt defaults on a later run.
    last_target_host='',
    last_common_name='',
)
# Profiles for the system types this tool knows about, keyed by a short
# identifier. Fields of each profile:
#   name           - human-readable label
#   deploy_script  - file name of an automated deployment script, or None
#   key_location   - path template containing a {short_name} placeholder
#   default_port   - port number, stored as a string
#   ssh_user       - default SSH login name
#   uses_local_csr - optional flag; only set on profiles that need it
SYSTEM_TYPES = {
    'proxmox': dict(
        name='Proxmox VE',
        deploy_script='deploy-proxmox.sh',
        key_location='/tmp/{short_name}.key',
        default_port='8006',
        ssh_user='root',
    ),
    'homeassistant': dict(
        name='Home Assistant',
        deploy_script='deploy-homeassistant.sh',
        key_location='/tmp/{short_name}.key',
        default_port='8123',
        ssh_user='icke',
        uses_local_csr=True,
    ),
    'pfsense': dict(
        name='pfSense',
        deploy_script=None,  # Manual deployment via web interface
        key_location='/tmp/{short_name}.key',
        default_port='443',
        ssh_user='root',
    ),
    'truenas': dict(
        name='TrueNAS',
        deploy_script=None,  # Manual deployment via web interface
        key_location='/tmp/{short_name}.key',
        default_port='443',
        ssh_user='root',
    ),
    'ucs': dict(
        name='Univention Corporate Server',
        deploy_script=None,
        key_location='/tmp/{short_name}.key',
        default_port='443',
        ssh_user='root',
    ),
    # Fallback profile with no automated deployment.
    'unknown': dict(
        name='Unknown System',
        deploy_script=None,
        key_location='/tmp/{short_name}.key',
        default_port='443',
        ssh_user='root',
    ),
}
def load_config():
    """Return the saved configuration layered on top of the defaults.

    Values read from ``CONFIG_FILE`` override the matching keys of
    ``DEFAULT_CONFIG``. When the file is absent a copy of the defaults is
    returned silently; when reading or merging it fails, a warning is
    printed and a copy of the defaults is returned.
    """
    if not CONFIG_FILE.exists():
        return DEFAULT_CONFIG.copy()

    try:
        with CONFIG_FILE.open('r') as handle:
            stored = json.load(handle)
        # The merge stays inside the try so malformed content (e.g. a JSON
        # list instead of an object) also falls back to the defaults.
        return {**DEFAULT_CONFIG, **stored}
    except Exception as e:
        print(f"Warning: Could not load config: {e}")

    return DEFAULT_CONFIG.copy()
def save_config(config):
    """Persist ``config`` to ``CONFIG_FILE`` as JSON indented by two spaces.

    Saving is best-effort: a failure is reported as a warning on stdout and
    is not propagated to the caller.
    """
    try:
        with CONFIG_FILE.open('w') as handle:
            json.dump(config, handle, indent=2)
    except Exception as e:
        print(f"Warning: Could not save config: {e}")
def prompt_with_default(prompt, default):
    """Ask for a line of input, falling back to ``default`` on empty input.

    When ``default`` is truthy it is shown in square brackets after the
    prompt and returned if the user just presses Enter. When it is falsy the
    stripped input is returned as-is (possibly an empty string).
    """
    if not default:
        return input(f"{prompt}: ").strip()

    answer = input(f"{prompt} [{default}]: ").strip()
    return answer or default
def yes_no_prompt(prompt, default=True):
    """Ask a yes/no question until a recognisable answer is given.

    Empty input returns ``default``. ``y``/``yes`` return True and
    ``n``/``no`` return False (case-insensitive, surrounding whitespace
    ignored). Anything else prints a hint and asks again.
    """
    hint = "Y/n" if default else "y/N"
    recognised = {'y': True, 'yes': True, 'n': False, 'no': False}

    while True:
        reply = input(f"{prompt} [{hint}]: ").strip().lower()
        if not reply:
            return default
        if reply in recognised:
            return recognised[reply]
        print("Please answer 'y' or 'n'")
def detect_system_type(target_host, script_dir, ssh_user=None, ssh_password=None):
    """Run ``detect-system.sh`` against ``target_host`` and classify the result.

    The script's stdout is expected to be either ``<type>`` or
    ``<type>:<user>``. Credentials, when given, are handed to the script via
    the ``SSH_USER`` and ``SSH_PASSWORD`` environment variables.

    Returns a ``(system_type, detected_user)`` tuple. ``system_type`` is a key
    of ``SYSTEM_TYPES`` (``'unknown'`` for unrecognised output) and
    ``detected_user`` is the text after the first colon, or ``None`` when the
    output has no colon. Every failure (missing script, non-zero exit, empty
    output, timeout, any other exception) prints a warning and returns
    ``('unknown', None)``.
    """
    not_detected = ('unknown', None)
    try:
        detect_script = script_dir / 'detect-system.sh'
        if not detect_script.exists():
            print(f"Warning: detect-system.sh not found at {detect_script}")
            return not_detected

        # Start from the current environment and add the optional credentials.
        env = os.environ.copy()
        for variable, value in (('SSH_USER', ssh_user), ('SSH_PASSWORD', ssh_password)):
            if value:
                env[variable] = value

        result = subprocess.run(
            [str(detect_script), target_host],
            capture_output=True,
            text=True,
            timeout=15,
            env=env,
        )

        if result.returncode != 0:
            print(f"Warning: Detection script returned error code {result.returncode}")
            if result.stderr:
                print(f"Error output: {result.stderr}")
            return not_detected

        output = result.stdout.strip()
        if not output:
            print("Warning: Detection script returned empty output")
            return not_detected

        # Split "<type>:<user>" at the first colon; without a colon there is
        # no detected user.
        system_type, colon, user_part = output.partition(':')
        detected_user = user_part if colon else None

        if system_type not in SYSTEM_TYPES:
            system_type = 'unknown'
        return system_type, detected_user
    except subprocess.TimeoutExpired:
        print(f"Warning: System detection timed out for {target_host}")
        return not_detected
    except Exception as e:
        print(f"Warning: Could not detect system type: {e}")
        return not_detected
def _print_banner(title, leading_blank=True):
    """Print ``title`` framed by two 60-character '=' rules."""
    rule = "=" * 60
    print(("\n" if leading_blank else "") + rule)
    print(title)
    print(rule)


def _build_ssh_env(ssh_user, ssh_password):
    """Return a copy of the environment carrying the optional SSH credentials."""
    env = os.environ.copy()
    if ssh_user:
        env['SSH_USER'] = ssh_user
    if ssh_password:
        env['SSH_PASSWORD'] = ssh_password
    return env


def main():
    """Run the interactive certificate workflow from prompts to deployment.

    Sequence:
      1. Load the saved defaults and optionally let the user edit them.
      2. Ask for the target host and detect its system type, first without
         credentials and then, if that fails, optionally with a
         username/password.
      3. Ask for the certificate subject and show a summary for confirmation.
      4. Generate the CSR (``generate-csr-local.sh`` for systems flagged
         ``uses_local_csr``, otherwise ``generate-csr.sh``).
      5. Sign it with ``sign-cert.sh``.
      6. Deploy with the system-specific script, or offer a plain ``scp`` of
         the certificate when no such script is configured.

    Terminates the process with ``sys.exit(1)`` when a required answer is
    missing or a helper script fails or is not found, and with
    ``sys.exit(0)`` when the user declines at the confirmation prompt.
    """
    _print_banner("Interactive Certificate Manager", leading_blank=False)
    print()

    # Load config
    config = load_config()

    # Offer to edit the stored defaults only when a config file already exists.
    if CONFIG_FILE.exists() and yes_no_prompt("Do you want to modify default values?", False):
        print("\n--- Default Values Configuration ---")
        editable_defaults = (
            ('country', "Country (C)"),
            ('state', "State/Province (ST)"),
            ('locality', "Locality (L)"),
            ('organization', "Organization (O)"),
            ('organizational_unit', "Organizational Unit (OU)"),
            ('ca_server', "CA Server"),
            ('validity_days', "Validity (days)"),
            ('key_bits', "Key Length (bits)"),
        )
        for key, label in editable_defaults:
            config[key] = prompt_with_default(label, config[key])
        print()

    # Get certificate details
    print("--- Certificate Details ---")
    target_host = prompt_with_default("Target Host (IP or hostname)", config['last_target_host'])
    if not target_host:
        print("Error: Target host is required!")
        sys.exit(1)

    # The helper scripts are looked up next to this file.
    script_dir = Path(__file__).parent.absolute()

    # Detect system type (try key-based auth first)
    print(f"\nDetecting system type on {target_host}...")
    print("Trying SSH key-based authentication...")
    system_type, detected_user = detect_system_type(target_host, script_dir)

    # If detection failed, ask for credentials and retry once.
    ssh_user = None
    ssh_password = None
    if system_type == 'unknown':
        print("\nKey-based authentication failed or system not detected.")
        if yes_no_prompt("Do you want to try with username/password?", True):
            ssh_user = input("SSH Username: ").strip()
            ssh_password = getpass.getpass("SSH Password: ")
            print("\nRetrying detection with credentials...")
            system_type, detected_user = detect_system_type(
                target_host, script_dir, ssh_user, ssh_password)

    # A user reported by detection wins; otherwise ask, unless one was
    # already entered above or the system is still unknown.
    if detected_user:
        ssh_user = detected_user
    elif not ssh_user and system_type != 'unknown':
        default_user = SYSTEM_TYPES[system_type].get('ssh_user', 'root')
        ssh_user = prompt_with_default("SSH Username", default_user)

    system_info = SYSTEM_TYPES[system_type]
    uses_local_csr = system_info.get('uses_local_csr', False)
    print(f"✓ Detected: {system_info['name']}")
    if ssh_user:
        print(f" SSH User: {ssh_user}")

    # Validate the common name immediately so the user is not asked further
    # questions before being told that a required answer is missing.
    common_name = prompt_with_default("Common Name (FQDN)", config['last_common_name'])
    if not common_name:
        print("Error: Common name is required!")
        sys.exit(1)

    # Ask for additional DNS names
    print("\nAdditional DNS names (optional, comma-separated):")
    print(" Example: firewall.domain.com,vpn.domain.com")
    additional_dns = input("Additional DNS names [none]: ").strip()

    # The first DNS label of the common name is the base for all file names.
    short_name = common_name.split('.')[0]
    req_file = f"{short_name}.req"
    cert_file = f"{short_name}-cert.pem"

    # Ask for custom values for this certificate
    print("\n--- Certificate Subject (press Enter to use defaults) ---")
    country = prompt_with_default("Country (C)", config['country'])
    state = prompt_with_default("State/Province (ST)", config['state'])
    locality = prompt_with_default("Locality (L)", config['locality'])
    organization = prompt_with_default("Organization (O)", config['organization'])
    org_unit = prompt_with_default("Organizational Unit (OU)", config['organizational_unit'])
    validity_days = prompt_with_default("Validity (days)", config['validity_days'])
    key_bits = prompt_with_default("Key Length (bits)", config['key_bits'])

    _print_banner("Summary:")
    print(f"System Type: {system_info['name']}")
    print(f"Target Host: {target_host}")
    print(f"Common Name: {common_name}")
    if additional_dns:
        print(f"Additional DNS: {additional_dns}")
    print(f"Country: {country}")
    print(f"State: {state}")
    print(f"Locality: {locality}")
    print(f"Organization: {organization}")
    print(f"Org Unit: {org_unit}")
    print(f"Key Length: {key_bits} bits")
    print(f"Validity: {validity_days} days")
    print(f"CA Server: {config['ca_server']}")
    print(f"Output files: {short_name}.req, {short_name}-cert.pem")
    print("=" * 60)
    print()

    if not yes_no_prompt("Proceed with certificate generation?", True):
        print("Cancelled.")
        sys.exit(0)

    # Save config for next run
    config['last_target_host'] = target_host
    config['last_common_name'] = common_name
    save_config(config)

    # One environment, built once, is shared by the CSR and deployment scripts.
    ssh_env = _build_ssh_env(ssh_user, ssh_password)

    _print_banner("Step 1: Generating CSR")
    subject_args = [common_name, country, state, locality, organization, org_unit,
                    key_bits, additional_dns]
    if uses_local_csr:
        # Generate CSR locally for Home Assistant and similar systems; the
        # local script additionally takes the target IP address(es).
        print()
        target_ip = prompt_with_default(
            "Target IP address(es) (comma-separated)", "172.20.70.10,172.20.20.10")
        generate_script = script_dir / 'generate-csr-local.sh'
        generate_cmd = [str(generate_script), *subject_args, target_ip]
    else:
        # Remote CSR generation: the script takes the target host first.
        generate_script = script_dir / 'generate-csr.sh'
        generate_cmd = [str(generate_script), target_host, *subject_args]

    try:
        subprocess.run(generate_cmd, check=True, env=ssh_env)
    except subprocess.CalledProcessError as e:
        print(f"\nError: CSR generation failed with exit code {e.returncode}")
        sys.exit(1)
    except FileNotFoundError:
        # Name the script that was actually attempted (local or remote).
        print(f"\nError: {generate_script.name} not found in {script_dir}")
        sys.exit(1)

    _print_banner("Step 2: Signing certificate with CA")
    sign_cmd = [
        str(script_dir / 'sign-cert.sh'),
        req_file,
        short_name,
        validity_days,
    ]
    try:
        # Runs with the inherited environment, not the SSH credential one.
        subprocess.run(sign_cmd, check=True)
    except subprocess.CalledProcessError as e:
        print(f"\nError: Certificate signing failed with exit code {e.returncode}")
        sys.exit(1)
    except FileNotFoundError:
        print(f"\nError: sign-cert.sh not found in {script_dir}")
        sys.exit(1)

    _print_banner("Step 3: Deploying certificate to target host")
    # Use system-specific deployment if available
    if system_info['deploy_script']:
        if yes_no_prompt(f"Deploy certificate to {system_info['name']} automatically?", True):
            deploy_script = script_dir / system_info['deploy_script']
            # With local CSR generation the key is a local file; otherwise it
            # was created on the remote host under /tmp.
            key_file = f"{short_name}.key" if uses_local_csr else f"/tmp/{short_name}.key"
            deploy_cmd = [
                str(deploy_script),
                target_host,
                cert_file,
                key_file,
                short_name,
            ]
            try:
                subprocess.run(deploy_cmd, check=True, env=ssh_env)
            except subprocess.CalledProcessError as e:
                print(f"\nError: Deployment failed with exit code {e.returncode}")
                sys.exit(1)
            except FileNotFoundError:
                print(f"\nError: {system_info['deploy_script']} not found")
                sys.exit(1)
    else:
        # Generic deployment - just copy files
        if yes_no_prompt("Copy certificate back to the target host?", True):
            # Use the same SSH user as the rest of the workflow; fall back to
            # root only when none was detected or entered.
            scp_user = ssh_user or 'root'
            try:
                subprocess.run(
                    ['scp', cert_file, f'{scp_user}@{target_host}:/tmp/{short_name}.crt'],
                    check=True,
                )
                print(f"\n✓ Certificate copied to target host at /tmp/{short_name}.crt")
                print(f" Private key is at /tmp/{short_name}.key")
                print(f"\n⚠ Manual installation required for {system_info['name']}")
                print(" Please install the certificate through the web interface.")
            except (subprocess.CalledProcessError, FileNotFoundError):
                # Best-effort step: a failed copy (or a missing scp binary)
                # is only a warning, the signed certificate still exists locally.
                print("\nWarning: Failed to copy certificate to target host")

    _print_banner("✓ Certificate Management Complete!")
    print("\nFiles created:")
    print(f" - {req_file} (Certificate Request)")
    print(f" - {cert_file} (Signed Certificate)")
    if uses_local_csr:
        # The key was generated locally, so report it with the local files.
        print(f" - {short_name}.key (Private Key - {key_bits} bits)")
    print(f"\nOn target host ({target_host}):")
    if not uses_local_csr:
        print(f" - /tmp/{short_name}.key (Private Key - {key_bits} bits)")
    print(f" - /tmp/{short_name}.crt (Certificate)")
    if system_type == 'proxmox':
        print(f"\n✓ Access Proxmox at: https://{target_host}:{system_info['default_port']}")
    print("\n")
if __name__ == '__main__':
    # Script entry point: run the workflow and turn Ctrl-C or any unexpected
    # exception into a short message plus exit status 1. A SystemExit raised
    # inside main() is not intercepted and keeps its own exit status.
    failure_message = None
    try:
        main()
    except KeyboardInterrupt:
        failure_message = "\n\nCancelled by user."
    except Exception as e:
        failure_message = f"\nError: {e}"

    if failure_message is not None:
        print(failure_message)
        sys.exit(1)