- Enhanced detect-system.sh with better SSH options:
  - Added BatchMode and StrictHostKeyChecking=no for automation
  - Increased timeout from 5 to 10 seconds
  - Explicit exit codes for clarity
- Improved cert-manager.py detection function:
  - Checks if detect script exists before running
  - Validates return code
  - Checks for empty output
  - Better timeout handling (15 seconds)
  - More detailed error messages
  - Handles TimeoutExpired exception separately
333 lines
11 KiB
Python
Executable File
333 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Interactive Certificate Manager
|
|
Generates CSR on remote host and signs it with UCS CA
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
# Configuration

# Per-user settings file, kept as a dotfile in the home directory.
CONFIG_FILE = Path.home().joinpath('.cert-manager-config.json')

# Baseline settings. Every value is a string, including the numeric ones,
# because they are shown as prompt defaults and later handed to shell
# scripts as command-line arguments.
DEFAULT_CONFIG = dict(
    # Certificate subject defaults
    country='DE',
    state='berlin',
    locality='berlin',
    organization='egonetix',
    organizational_unit='it',
    # Signing defaults
    ca_server='10.0.0.21',
    validity_days='3650',
    key_bits='4096',
    # Answers remembered from the previous run (empty on first use)
    last_target_host='',
    last_common_name='',
)
|
|
|
|
# Known target platforms, keyed by the identifier the detection step yields.
# Each entry carries:
#   name          - human-readable label shown to the user
#   deploy_script - helper script used for automatic deployment, or None
#                   when no automatic deployment is available
#   key_location  - template for the private key path on the target host;
#                   '{short_name}' is a placeholder to be filled in
#   default_port  - port of the system's web interface (string)
SYSTEM_TYPES = dict(
    proxmox=dict(
        name='Proxmox VE',
        deploy_script='deploy-proxmox.sh',
        key_location='/tmp/{short_name}.key',
        default_port='8006',
    ),
    pfsense=dict(
        name='pfSense',
        deploy_script=None,  # Manual deployment via web interface
        key_location='/tmp/{short_name}.key',
        default_port='443',
    ),
    truenas=dict(
        name='TrueNAS',
        deploy_script=None,  # Manual deployment via web interface
        key_location='/tmp/{short_name}.key',
        default_port='443',
    ),
    ucs=dict(
        name='Univention Corporate Server',
        deploy_script=None,
        key_location='/tmp/{short_name}.key',
        default_port='443',
    ),
    # Fallback entry used whenever detection fails or yields an
    # unrecognised identifier.
    unknown=dict(
        name='Unknown System',
        deploy_script=None,
        key_location='/tmp/{short_name}.key',
        default_port='443',
    ),
)
|
|
|
|
def load_config():
    """Return the effective settings: saved values layered over defaults.

    When the settings file does not exist, a fresh copy of the defaults is
    returned. When it exists but cannot be read or merged (I/O error,
    invalid JSON, top-level value that is not a mapping), a warning is
    printed and a fresh copy of the defaults is returned instead.
    """
    if not CONFIG_FILE.exists():
        return DEFAULT_CONFIG.copy()

    try:
        with open(CONFIG_FILE, 'r') as handle:
            saved = json.load(handle)
        # Saved keys win; keys missing from the file fall back to defaults.
        return {**DEFAULT_CONFIG, **saved}
    except Exception as err:
        print(f"Warning: Could not load config: {err}")

    return DEFAULT_CONFIG.copy()
|
|
|
|
def save_config(config):
    """Write *config* to the settings file as indented JSON.

    Saving is best-effort: any failure is reported as a warning on stdout
    and never raised to the caller.

    Args:
        config: Mapping of settings to persist; must be JSON-serializable.
    """
    try:
        # Serialize before opening the file: mode 'w' truncates on open, so
        # dumping straight into the handle would wipe the existing settings
        # if serialization failed part-way through.
        payload = json.dumps(config, indent=2)
        with open(CONFIG_FILE, 'w') as f:
            f.write(payload)
    except Exception as e:
        print(f"Warning: Could not save config: {e}")
|
|
|
|
def prompt_with_default(prompt, default):
    """Ask the user for a value, offering *default* when one is available.

    Args:
        prompt: Label shown to the user.
        default: Value returned when the user just presses Enter. If it is
            falsy, no default is displayed and the (possibly empty) answer
            is returned as typed.

    Returns:
        The answer with surrounding whitespace removed, or *default* when
        the answer is empty and a default was offered.
    """
    if not default:
        return input(f"{prompt}: ").strip()

    answer = input(f"{prompt} [{default}]: ").strip()
    return answer or default
|
|
|
|
def yes_no_prompt(prompt, default=True):
    """Ask a yes/no question until a recognisable answer is given.

    Args:
        prompt: Question shown to the user.
        default: Result returned when the user just presses Enter; also
            decides which letter is capitalised in the hint.

    Returns:
        True for 'y'/'yes', False for 'n'/'no' (case-insensitive), or
        *default* for an empty answer.
    """
    if default:
        hint = "Y/n"
    else:
        hint = "y/N"

    verdicts = {'y': True, 'yes': True, 'n': False, 'no': False}

    while True:
        reply = input(f"{prompt} [{hint}]: ").strip().lower()
        if reply == "":
            return default
        if reply in verdicts:
            return verdicts[reply]
        # Anything else: explain and ask again.
        print("Please answer 'y' or 'n'")
|
|
|
|
def detect_system_type(target_host, script_dir):
    """Identify the platform running on *target_host*.

    Runs the 'detect-system.sh' helper found in *script_dir* with the host
    as its only argument and interprets its standard output.

    Args:
        target_host: Host name or address handed to the helper script.
        script_dir: Directory (path object) containing 'detect-system.sh'.

    Returns:
        A key of SYSTEM_TYPES. 'unknown' is returned whenever the helper is
        missing, exits non-zero, prints nothing, exceeds the 15 second
        timeout, reports an unrecognised identifier, or any other error
        occurs; every failure is reported as a warning on stdout.
    """
    fallback = 'unknown'
    try:
        script_path = script_dir / 'detect-system.sh'
        if not script_path.exists():
            print(f"Warning: detect-system.sh not found at {script_path}")
            return fallback

        completed = subprocess.run(
            [str(script_path), target_host],
            capture_output=True,
            text=True,
            timeout=15,
        )

        exit_code = completed.returncode
        if exit_code != 0:
            print(f"Warning: Detection script returned error code {exit_code}")
            if completed.stderr:
                print(f"Error output: {completed.stderr}")
            return fallback

        detected = completed.stdout.strip()
        if not detected:
            print("Warning: Detection script returned empty output")
            return fallback

        # Only identifiers with an entry in the platform table are accepted.
        if detected in SYSTEM_TYPES:
            return detected
        return fallback
    except subprocess.TimeoutExpired:
        print(f"Warning: System detection timed out for {target_host}")
        return fallback
    except Exception as err:
        print(f"Warning: Could not detect system type: {err}")
        return fallback
|
|
|
|
# (config key, prompt label) for the free-text certificate subject fields,
# in the order they are asked and passed to generate-csr.sh.
_SUBJECT_FIELDS = (
    ('country', "Country (C)"),
    ('state', "State/Province (ST)"),
    ('locality', "Locality (L)"),
    ('organization', "Organization (O)"),
    ('organizational_unit', "Organizational Unit (OU)"),
)


def _print_banner(title, leading_blank=True):
    """Print *title* framed by two 60-character rules."""
    rule = "=" * 60
    print(("\n" if leading_blank else "") + rule)
    print(title)
    print(rule)


def _prompt_positive_int(prompt, default):
    """Prompt until a positive whole number is entered; return it as a string.

    The result is always a str so it can be passed as a command-line
    argument even when the stored default was a JSON number.
    """
    while True:
        raw = str(prompt_with_default(prompt, default)).strip()
        try:
            number = int(raw)
        except ValueError:
            number = 0
        if number > 0:
            return str(number)
        print(f"Please enter a positive whole number for {prompt}")


def _edit_defaults(config):
    """Interactively update the stored default values in *config* in place."""
    print("\n--- Default Values Configuration ---")
    for key, label in _SUBJECT_FIELDS:
        config[key] = prompt_with_default(label, config[key])
    config['ca_server'] = prompt_with_default("CA Server", config['ca_server'])
    config['validity_days'] = _prompt_positive_int("Validity (days)", config['validity_days'])
    config['key_bits'] = _prompt_positive_int("Key Length (bits)", config['key_bits'])
    print()


def _run_or_exit(cmd, failure_label, missing_message):
    """Run *cmd*; on a non-zero exit or a missing executable, report and exit 1."""
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as e:
        print(f"\nError: {failure_label} failed with exit code {e.returncode}")
        sys.exit(1)
    except FileNotFoundError:
        print(f"\nError: {missing_message}")
        sys.exit(1)


def _deploy_certificate(system_info, script_dir, target_host, cert_file,
                        short_name, remote_key):
    """Offer to put the signed certificate on the target host.

    Uses the system-specific deploy script when the platform entry names
    one, otherwise falls back to a plain scp into /tmp on the target.

    Returns:
        True when the deploy script or the copy succeeded, False when the
        user declined or the copy failed. A failing deploy script ends the
        program with exit status 1.
    """
    deploy_script_name = system_info['deploy_script']

    if deploy_script_name:
        if not yes_no_prompt(f"Deploy certificate to {system_info['name']} automatically?", True):
            return False
        deploy_cmd = [
            str(script_dir / deploy_script_name),
            target_host,
            cert_file,
            remote_key,  # Key is on remote host
            short_name,
        ]
        _run_or_exit(deploy_cmd, "Deployment", f"{deploy_script_name} not found")
        return True

    # Generic deployment - just copy files
    if not yes_no_prompt("Copy certificate back to the target host?", True):
        return False
    try:
        subprocess.run(['scp', cert_file, f'root@{target_host}:/tmp/{short_name}.crt'], check=True)
    except subprocess.CalledProcessError:
        # A failed copy is only a warning: the signed certificate still
        # exists locally and can be transferred by hand.
        print("\nWarning: Failed to copy certificate to target host")
        return False

    print(f"\n✓ Certificate copied to target host at /tmp/{short_name}.crt")
    print(f" Private key is at {remote_key}")
    print(f"\n⚠ Manual installation required for {system_info['name']}")
    print(" Please install the certificate through the web interface.")
    return True


def main():
    """Run the interactive certificate workflow from start to finish.

    Steps, in order:
      1. Load settings and optionally let the user edit the stored defaults
         (only offered when the settings file already exists).
      2. Ask for the target host, detect its system type, and ask for the
         common name and certificate subject values.
      3. Show a summary and ask for confirmation; on confirmation remember
         the host and common name for the next run.
      4. Run generate-csr.sh, then sign-cert.sh, then offer deployment.
      5. Print an overview of the files involved.

    Exits with status 1 on missing required input or when a helper script
    fails or is missing, and with status 0 when the user cancels at the
    confirmation prompt.
    """
    _print_banner("Interactive Certificate Manager", leading_blank=False)
    print()

    # Load config
    config = load_config()

    # Ask if user wants to modify defaults
    if CONFIG_FILE.exists() and yes_no_prompt("Do you want to modify default values?", False):
        _edit_defaults(config)

    # Get certificate details
    print("--- Certificate Details ---")
    target_host = prompt_with_default("Target Host (IP or hostname)", config['last_target_host'])
    if not target_host:
        print("Error: Target host is required!")
        sys.exit(1)

    # Helper scripts are expected next to this file.
    script_dir = Path(__file__).parent.absolute()

    # Detect system type
    print(f"\nDetecting system type on {target_host}...")
    system_type = detect_system_type(target_host, script_dir)
    system_info = SYSTEM_TYPES[system_type]
    print(f"✓ Detected: {system_info['name']}")

    common_name = prompt_with_default("Common Name (FQDN)", config['last_common_name'])
    if not common_name:
        print("Error: Common name is required!")
        sys.exit(1)

    # Extract short name for filenames
    short_name = common_name.split('.')[0]
    req_file = f"{short_name}.req"
    cert_file = f"{short_name}-cert.pem"
    # Take the remote key path from the platform table rather than
    # repeating the literal here.
    remote_key = system_info['key_location'].format(short_name=short_name)

    # Ask for custom values for this certificate. Everything is coerced to
    # str because the values become subprocess arguments; a hand-edited
    # settings file may contain JSON numbers.
    print("\n--- Certificate Subject (press Enter to use defaults) ---")
    country, state, locality, organization, org_unit = [
        str(prompt_with_default(label, config[key]))
        for key, label in _SUBJECT_FIELDS
    ]
    validity_days = _prompt_positive_int("Validity (days)", config['validity_days'])
    key_bits = _prompt_positive_int("Key Length (bits)", config['key_bits'])

    _print_banner("Summary:")
    print(f"System Type: {system_info['name']}")
    print(f"Target Host: {target_host}")
    print(f"Common Name: {common_name}")
    print(f"Country: {country}")
    print(f"State: {state}")
    print(f"Locality: {locality}")
    print(f"Organization: {organization}")
    print(f"Org Unit: {org_unit}")
    print(f"Key Length: {key_bits} bits")
    print(f"Validity: {validity_days} days")
    print(f"CA Server: {config['ca_server']}")
    print(f"Output files: {req_file}, {cert_file}")
    print("=" * 60)
    print()

    if not yes_no_prompt("Proceed with certificate generation?", True):
        print("Cancelled.")
        sys.exit(0)

    # Save config for next run
    config['last_target_host'] = target_host
    config['last_common_name'] = common_name
    save_config(config)

    _print_banner("Step 1: Generating CSR on target host")
    generate_cmd = [
        str(script_dir / 'generate-csr.sh'),
        target_host,
        common_name,
        country,
        state,
        locality,
        organization,
        org_unit,
        key_bits,
    ]
    _run_or_exit(generate_cmd, "CSR generation", f"generate-csr.sh not found in {script_dir}")

    _print_banner("Step 2: Signing certificate with CA")
    sign_cmd = [
        str(script_dir / 'sign-cert.sh'),
        req_file,
        short_name,
        validity_days,
    ]
    _run_or_exit(sign_cmd, "Certificate signing", f"sign-cert.sh not found in {script_dir}")

    _print_banner("Step 3: Deploying certificate to target host")
    cert_on_target = _deploy_certificate(
        system_info, script_dir, target_host, cert_file, short_name, remote_key
    )

    _print_banner("✓ Certificate Management Complete!")
    print("\nFiles created:")
    print(f" - {req_file} (Certificate Request)")
    print(f" - {cert_file} (Signed Certificate)")
    print(f"\nOn target host ({target_host}):")
    print(f" - {remote_key} (Private Key - {key_bits} bits)")
    if cert_on_target:
        print(f" - /tmp/{short_name}.crt (Certificate)")
    else:
        # Do not claim the certificate is on the host when the transfer
        # was declined or failed.
        print(" - Certificate was not copied to the target host")

    if system_type == 'proxmox':
        print(f"\n✓ Access Proxmox at: https://{target_host}:{system_info['default_port']}")

    print("\n")
|
|
|
|
if __name__ == '__main__':
    # Script entry point: run the workflow and turn Ctrl-C or any
    # unexpected error into a short message plus exit status 1.
    exit_status = 0
    try:
        main()
    except KeyboardInterrupt:
        print("\n\nCancelled by user.")
        exit_status = 1
    except Exception as err:
        print(f"\nError: {err}")
        exit_status = 1

    if exit_status != 0:
        sys.exit(exit_status)
|