#!/usr/bin/env python3
"""Tkinter front end for block-level backups between LVM volume groups.

The GUI lists volume groups via ``vgs``, estimates the amount of data via
``lvs``, and clones the ``root``, ``home`` and ``boot`` logical volumes of the
source group onto the target group by running a generated bash script through
``sudo``.
"""
import os
import re
import subprocess
import tempfile
import threading
import time
import tkinter as tk
from tkinter import messagebox, scrolledtext, ttk

# Bash script executed (via sudo) for one backup run.  ``{source_vg}`` and
# ``{target_vg}`` are filled in with str.format(); the script must therefore
# not contain any other curly braces.  Lines starting with "STAGE:" are
# parsed by LVMBackupGUI.run_backup to drive the progress bar.
_BACKUP_SCRIPT_TEMPLATE = '''#!/bin/bash
set -e

SOURCE_VG="{source_vg}"
TARGET_VG="{target_vg}"
SNAPSHOT_SIZE="2G"

echo "[$(date '+%H:%M:%S')] Checking system requirements..."

echo "[$(date '+%H:%M:%S')] Cleaning up any existing snapshots..."
lvremove -f "$SOURCE_VG/root-backup-snap" 2>/dev/null || true
lvremove -f "$SOURCE_VG/home-backup-snap" 2>/dev/null || true
lvremove -f "$SOURCE_VG/boot-backup-snap" 2>/dev/null || true

echo "[$(date '+%H:%M:%S')] Creating LVM snapshots..."
lvcreate -L "$SNAPSHOT_SIZE" -s -n root-backup-snap "$SOURCE_VG/root"
lvcreate -L "$SNAPSHOT_SIZE" -s -n home-backup-snap "$SOURCE_VG/home"
lvcreate -L 1G -s -n boot-backup-snap "$SOURCE_VG/boot"
echo "STAGE:Snapshots created"

echo "[$(date '+%H:%M:%S')] Unmounting target volumes..."
umount "/dev/$TARGET_VG/home" 2>/dev/null || true
umount "/dev/$TARGET_VG/root" 2>/dev/null || true
umount "/dev/$TARGET_VG/boot" 2>/dev/null || true

echo "[$(date '+%H:%M:%S')] Cloning root volume..."
echo "STAGE:Cloning root volume"
dd if="/dev/$SOURCE_VG/root-backup-snap" of="/dev/$TARGET_VG/root" bs=64M status=progress
echo "STAGE:Root volume completed"

echo "[$(date '+%H:%M:%S')] Cloning home volume..."
echo "STAGE:Cloning home volume"
dd if="/dev/$SOURCE_VG/home-backup-snap" of="/dev/$TARGET_VG/home" bs=64M status=progress
echo "STAGE:Home volume completed"

echo "[$(date '+%H:%M:%S')] Cloning boot volume..."
echo "STAGE:Cloning boot volume"
dd if="/dev/$SOURCE_VG/boot-backup-snap" of="/dev/$TARGET_VG/boot" bs=64M status=progress
echo "STAGE:Boot volume completed"

echo "[$(date '+%H:%M:%S')] Cleaning up snapshots..."
lvremove -f "$SOURCE_VG/root-backup-snap"
lvremove -f "$SOURCE_VG/home-backup-snap"
lvremove -f "$SOURCE_VG/boot-backup-snap"

echo "[$(date '+%H:%M:%S')] Verifying backup..."
echo "STAGE:Verifying backup"
if fsck -n "/dev/$TARGET_VG/root" 2>/dev/null; then
    echo "Root filesystem verified"
else
    echo "WARNING: root filesystem check reported problems"
fi
if fsck -n "/dev/$TARGET_VG/boot" 2>/dev/null; then
    echo "Boot filesystem verified"
else
    echo "WARNING: boot filesystem check reported problems"
fi

echo "STAGE:Backup completed successfully!"
echo "[$(date '+%H:%M:%S')] Backup completed successfully!"
'''


class LVMBackupGUI:
    """Main window of the LVM backup tool.

    The instance owns all widgets, the Tk variables they display, and the
    state of the (at most one) running backup: the ``sudo`` child process and
    the worker thread that reads its output.
    """

    # Ordered (marker, percent) pairs: the first marker found in a "STAGE:"
    # message decides the progress-bar value.  Order matters because the
    # markers are matched as substrings.
    _STAGE_PROGRESS = (
        ('Snapshots created', 10),
        ('Cloning root', 15),
        ('Root volume completed', 35),
        ('Cloning home', 40),
        ('Home volume completed', 80),
        ('Cloning boot', 85),
        ('Boot volume completed', 90),
        ('Verifying', 95),
        ('completed successfully', 100),
    )

    # Matches the throughput figure in a dd progress line, e.g. "358 MB/s".
    _SPEED_RE = re.compile(r'(\d+(?:\.\d+)?)\s*(MB|GB)/s')

    # Throughput assumed for the time estimate shown before a backup starts.
    _ASSUMED_BYTES_PER_SECOND = 250 * 1024 * 1024

    def __init__(self, root):
        """Build the window inside *root* and load the volume-group list."""
        self.root = root
        self.root.title("LVM Backup Manager")
        self.root.geometry("900x700")

        # Configure style
        style = ttk.Style()
        style.theme_use('clam')

        # State variables
        self.backup_process = None
        self.backup_running = False
        self.backup_thread = None
        # Set by stop_backup so the worker reports a stopped run as failed.
        self.stop_requested = False

        # Variables bound to widgets
        self.source_var = tk.StringVar()
        self.target_var = tk.StringVar()
        self.status_var = tk.StringVar(value="Ready")
        self.progress_var = tk.DoubleVar()
        self.stage_var = tk.StringVar(value="Waiting to start...")
        self.speed_var = tk.StringVar(value="0 MB/s")
        self.time_var = tk.StringVar(value="Not calculated")
        self.size_var = tk.StringVar(value="Not calculated")

        self.setup_ui()
        self.refresh_drives()

    # ------------------------------------------------------------------
    # Pure helpers (no widget access)
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_volume_groups(output):
        """Turn ``vgs --noheadings -o vg_name,vg_size,vg_free`` output into
        display strings of the form ``"name (size total, free free)"``.

        Lines with fewer than three whitespace-separated fields are skipped.
        """
        entries = []
        for line in output.splitlines():
            parts = line.split()
            if len(parts) >= 3:
                vg_name, vg_size, vg_free = parts[:3]
                entries.append(f"{vg_name} ({vg_size} total, {vg_free} free)")
        return entries

    @staticmethod
    def _sum_lv_bytes(output):
        """Sum the sizes in ``lvs --noheadings -o lv_size --units b`` output.

        Each line is expected to look like ``1073741824B``; lines that do not
        parse as an integer after stripping the unit suffix are ignored.
        """
        total_bytes = 0
        for line in output.splitlines():
            size_str = line.strip().rstrip('B')
            if not size_str:
                continue
            try:
                total_bytes += int(size_str)
            except ValueError:
                continue
        return total_bytes

    @staticmethod
    def _format_duration(seconds):
        """Format *seconds* as ``"Hh Mm"``, or ``"Mm"`` when under an hour."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        return f"{hours}h {minutes}m" if hours > 0 else f"{minutes}m"

    @classmethod
    def _progress_for_stage(cls, stage):
        """Return the progress percentage for a stage message, or ``None``
        when the message matches no known stage marker."""
        for marker, percent in cls._STAGE_PROGRESS:
            if marker in stage:
                return percent
        return None

    @classmethod
    def _parse_speed(cls, line):
        """Extract a throughput such as ``"358 MB/s"`` from an output line.

        Returns ``None`` when the line carries no MB/s or GB/s figure.
        """
        match = cls._SPEED_RE.search(line)
        if match is None:
            return None
        return f"{match.group(1)} {match.group(2)}/s"

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------

    def _build_vg_selector(self, parent, row, title, variable, on_select):
        """Create one labelled volume-group chooser with a refresh button.

        Returns the read-only combobox so the caller can keep a reference.
        """
        frame = ttk.LabelFrame(parent, text=title, padding="10")
        frame.grid(row=row, column=0, columnspan=2,
                   sticky=(tk.W, tk.E), pady=(0, 10))

        ttk.Label(frame, text="Volume Group:").grid(
            row=0, column=0, sticky=tk.W, padx=(0, 10))

        combo = ttk.Combobox(frame, textvariable=variable,
                             state="readonly", width=50)
        combo.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=(0, 10))
        # The combobox announces a user selection through this virtual event.
        combo.bind('<<ComboboxSelected>>', on_select)

        ttk.Button(frame, text="🔄 Refresh",
                   command=self.refresh_drives).grid(row=0, column=2)

        frame.columnconfigure(1, weight=1)
        return combo

    def setup_ui(self):
        """Setup the user interface"""
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        # Title
        title_frame = ttk.Frame(main_frame)
        title_frame.grid(row=0, column=0, columnspan=2,
                         sticky=(tk.W, tk.E), pady=(0, 20))
        title_label = ttk.Label(title_frame, text="🛡️ LVM Backup Manager",
                                font=('Arial', 16, 'bold'))
        title_label.pack()

        # Source Drive Selection
        self.source_combo = self._build_vg_selector(
            main_frame, 1, "Source Drive", self.source_var,
            self.on_source_selected)

        # Arrow
        arrow_frame = ttk.Frame(main_frame)
        arrow_frame.grid(row=2, column=0, columnspan=2, pady=10)
        ttk.Label(arrow_frame, text="⬇", font=('Arial', 24)).pack()

        # Target Drive Selection
        self.target_combo = self._build_vg_selector(
            main_frame, 3, "Target Drive", self.target_var,
            self.on_target_selected)

        # Backup Information
        info_frame = ttk.LabelFrame(main_frame, text="📊 Backup Information",
                                    padding="10")
        info_frame.grid(row=4, column=0, columnspan=2,
                        sticky=(tk.W, tk.E), pady=(0, 10))

        ttk.Label(info_frame, text="Total Size:").grid(
            row=0, column=0, sticky=tk.W, padx=(0, 20))
        ttk.Label(info_frame, textvariable=self.size_var).grid(
            row=0, column=1, sticky=tk.W, padx=(0, 40))
        ttk.Label(info_frame, text="Estimated Time:").grid(
            row=0, column=2, sticky=tk.W, padx=(0, 20))
        ttk.Label(info_frame, textvariable=self.time_var).grid(
            row=0, column=3, sticky=tk.W)
        ttk.Label(info_frame, text="Transfer Speed:").grid(
            row=1, column=0, sticky=tk.W, padx=(0, 20))
        ttk.Label(info_frame, textvariable=self.speed_var).grid(
            row=1, column=1, sticky=tk.W, padx=(0, 40))
        ttk.Label(info_frame, text="Status:").grid(
            row=1, column=2, sticky=tk.W, padx=(0, 20))
        ttk.Label(info_frame, textvariable=self.status_var).grid(
            row=1, column=3, sticky=tk.W)

        # Control Buttons
        button_frame = ttk.Frame(main_frame)
        button_frame.grid(row=5, column=0, columnspan=2, pady=20)

        self.start_button = ttk.Button(button_frame, text="🚀 Start Backup",
                                       command=self.start_backup,
                                       style='Accent.TButton')
        self.start_button.pack(side=tk.LEFT, padx=(0, 10))

        self.stop_button = ttk.Button(button_frame, text="⏹ Stop Backup",
                                      command=self.stop_backup,
                                      state='disabled')
        self.stop_button.pack(side=tk.LEFT, padx=(0, 10))

        self.verify_button = ttk.Button(button_frame, text="✅ Verify Backup",
                                        command=self.verify_backup)
        self.verify_button.pack(side=tk.LEFT)

        # Progress Section
        progress_frame = ttk.LabelFrame(main_frame, text="📈 Progress",
                                        padding="10")
        progress_frame.grid(row=6, column=0, columnspan=2,
                            sticky=(tk.W, tk.E), pady=(0, 10))

        ttk.Label(progress_frame, text="Overall Progress:").grid(
            row=0, column=0, sticky=tk.W, pady=(0, 5))
        self.progress_bar = ttk.Progressbar(progress_frame,
                                            variable=self.progress_var,
                                            length=400, mode='determinate')
        self.progress_bar.grid(row=1, column=0, columnspan=2,
                               sticky=(tk.W, tk.E), pady=(0, 10))

        # Current stage
        stage_frame = ttk.Frame(progress_frame)
        stage_frame.grid(row=2, column=0, columnspan=2, sticky=(tk.W, tk.E))
        ttk.Label(stage_frame, text="⚙️").pack(side=tk.LEFT, padx=(0, 5))
        ttk.Label(stage_frame, textvariable=self.stage_var).pack(side=tk.LEFT)

        # Log Output
        log_frame = ttk.LabelFrame(main_frame, text="📝 Log Output",
                                   padding="10")
        log_frame.grid(row=7, column=0, columnspan=2,
                       sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))
        self.log_text = scrolledtext.ScrolledText(log_frame, height=10,
                                                  width=80)
        self.log_text.pack(fill=tk.BOTH, expand=True)

        # Configure grid weights so the log area absorbs extra space
        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(7, weight=1)
        info_frame.columnconfigure(1, weight=1)
        info_frame.columnconfigure(3, weight=1)
        progress_frame.columnconfigure(0, weight=1)
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)

    # ------------------------------------------------------------------
    # Logging and drive discovery
    # ------------------------------------------------------------------

    def log(self, message):
        """Append a timestamped line to the log pane and scroll to it."""
        timestamp = time.strftime("%H:%M:%S")
        self.log_text.insert(tk.END, f"[{timestamp}] {message}\n")
        self.log_text.see(tk.END)
        self.root.update_idletasks()

    def refresh_drives(self):
        """Refresh the list of available LVM volume groups.

        Runs ``sudo vgs`` and fills both comboboxes with the result.  When at
        least two groups exist, groups whose entry contains ``internal-vg`` /
        ``migration-vg`` are pre-selected as source / target.
        """
        try:
            result = subprocess.run(
                ['sudo', 'vgs', '--noheadings', '-o',
                 'vg_name,vg_size,vg_free'],
                capture_output=True, text=True)
        except (OSError, subprocess.SubprocessError) as e:
            self.log(f"Error refreshing drives: {e}")
            return

        if result.returncode != 0:
            self.log("Failed to get volume groups - are you running as root?")
            return

        vgs = self._parse_volume_groups(result.stdout)
        self.source_combo['values'] = vgs
        self.target_combo['values'] = vgs

        # Auto-select the conventional source/target groups when present
        if len(vgs) >= 2:
            for vg in vgs:
                if 'internal-vg' in vg:
                    self.source_var.set(vg)
                elif 'migration-vg' in vg:
                    self.target_var.set(vg)

        self.log(f"Found {len(vgs)} volume groups")

        # Setting the variable programmatically does not fire the combobox
        # selection event, so refresh the estimate explicitly.
        self.calculate_backup_info()

    def on_source_selected(self, event=None):
        """Handle source drive selection"""
        self.calculate_backup_info()

    def on_target_selected(self, event=None):
        """Handle target drive selection"""
        self.calculate_backup_info()

    def calculate_backup_info(self):
        """Calculate backup size and time estimates for the selected source.

        Does nothing when no source is selected or ``lvs`` fails.  The time
        estimate assumes a fixed average throughput of 250 MB/s.
        """
        selection = self.source_var.get()
        if not selection:
            return

        # The combobox entry is "name (size total, free free)".
        source_vg = selection.split()[0]
        try:
            result = subprocess.run(
                ['sudo', 'lvs', source_vg, '--noheadings', '-o', 'lv_size',
                 '--units', 'b'],
                capture_output=True, text=True)
        except (OSError, subprocess.SubprocessError) as e:
            self.log(f"Error calculating backup info: {e}")
            return

        if result.returncode != 0:
            return

        total_bytes = self._sum_lv_bytes(result.stdout)
        total_gb = total_bytes / (1024 ** 3)
        est_seconds = total_bytes / self._ASSUMED_BYTES_PER_SECOND

        self.size_var.set(f"{total_gb:.1f} GB")
        self.time_var.set(self._format_duration(est_seconds))
        self.speed_var.set("~250 MB/s")

    # ------------------------------------------------------------------
    # Backup life cycle
    # ------------------------------------------------------------------

    def start_backup(self):
        """Validate the selection, ask for confirmation and start the worker
        thread that performs the backup."""
        if not self.source_var.get() or not self.target_var.get():
            messagebox.showerror(
                "Error", "Please select both source and target drives")
            return

        source_vg = self.source_var.get().split()[0]
        target_vg = self.target_var.get().split()[0]

        if source_vg == target_vg:
            messagebox.showerror("Error",
                                 "Source and target cannot be the same")
            return

        # Confirm
        if not messagebox.askyesno(
                "Confirm Backup",
                f"This will OVERWRITE all data on {target_vg}!\n\nContinue?"):
            return

        # Update UI
        self.backup_running = True
        self.stop_requested = False
        self.start_button.config(state='disabled')
        self.stop_button.config(state='normal')
        self.verify_button.config(state='disabled')
        self.status_var.set("Running...")
        self.progress_var.set(0)
        self.stage_var.set("Starting backup...")

        # Clear log
        self.log_text.delete('1.0', tk.END)
        self.log("🚀 Starting LVM block-level backup...")

        # Start backup thread
        self.backup_thread = threading.Thread(
            target=self.run_backup, args=(source_vg, target_vg))
        self.backup_thread.daemon = True
        self.backup_thread.start()

    def _handle_output_line(self, line):
        """Schedule the UI updates implied by one line of script output.

        Called from the worker thread; every widget update is handed to the
        Tk event loop through ``root.after``.
        """
        self.root.after(0, self.log, line)

        if line.startswith('STAGE:'):
            stage = line[len('STAGE:'):]
            self.root.after(0, self.stage_var.set, stage)
            percent = self._progress_for_stage(stage)
            if percent is not None:
                self.root.after(0, self.progress_var.set, percent)

        speed = self._parse_speed(line)
        if speed is not None:
            self.root.after(0, self.speed_var.set, speed)

    def run_backup(self, source_vg, target_vg):
        """Execute the backup process (worker-thread entry point).

        Writes the backup script to a private temporary file, runs it via
        ``sudo``, streams its output into the UI, and finally schedules
        exactly one ``backup_finished`` call on the Tk event loop.
        """
        script_path = None
        success = False
        try:
            # mkstemp creates the file atomically with an unpredictable name
            # and owner-only permissions, so no other local user can swap
            # the script before sudo executes it.
            fd, script_path = tempfile.mkstemp(prefix='gui_backup_',
                                               suffix='.sh')
            with os.fdopen(fd, 'w') as f:
                f.write(_BACKUP_SCRIPT_TEMPLATE.format(
                    source_vg=source_vg, target_vg=target_vg))
            os.chmod(script_path, 0o700)

            # Execute backup; text mode also splits dd's carriage-return
            # terminated progress updates into separate lines.
            self.backup_process = subprocess.Popen(
                ['sudo', script_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                bufsize=1,
            )

            # A stop request may have arrived before the process existed.
            if not self.backup_running:
                self._terminate_process()

            # Monitor output
            with self.backup_process.stdout as output:
                for raw_line in iter(output.readline, ''):
                    if not self.backup_running:
                        break
                    line = raw_line.strip()
                    if line:
                        self._handle_output_line(line)

            return_code = self.backup_process.wait()
            success = return_code == 0 and not self.stop_requested
        except Exception as e:
            # Format now: ``e`` is unbound once the except block ends, long
            # before the scheduled callback runs.
            self.root.after(0, self.log, f"❌ Error: {e}")
        finally:
            if script_path is not None:
                try:
                    os.remove(script_path)
                except OSError:
                    pass  # best effort; the file may already be gone
            self.root.after(0, self.backup_finished, success)

    def backup_finished(self, success):
        """Handle backup completion: restore the buttons and report the
        outcome in the status field, the log and a dialog."""
        self.backup_running = False
        self.start_button.config(state='normal')
        self.stop_button.config(state='disabled')
        self.verify_button.config(state='normal')

        if success:
            self.status_var.set("Completed")
            self.log("✅ Backup completed successfully!")
            messagebox.showinfo(
                "Success",
                "Backup completed successfully!\n\n"
                "Your external drive now contains a bootable copy.")
        else:
            self.status_var.set("Failed")
            self.log("❌ Backup failed!")
            messagebox.showerror(
                "Error", "Backup failed. Check the log for details.")

    def _terminate_process(self):
        """Ask the backup process to terminate, logging instead of raising
        when the signal cannot be delivered."""
        process = self.backup_process
        if process is None or process.poll() is not None:
            return
        try:
            process.terminate()
        except OSError as e:
            # e.g. PermissionError when the sudo-owned process may not be
            # signalled by the user running the GUI.
            self.root.after(0, self.log,
                            f"❌ Could not stop backup process: {e}")

    def stop_backup(self):
        """Stop the backup process.

        Only requests the stop; the worker thread notices it, waits for the
        process to exit and then calls ``backup_finished`` once.
        """
        if not self.backup_running:
            return
        self.stop_requested = True
        self.backup_running = False
        self.stop_button.config(state='disabled')
        self.status_var.set("Stopping...")
        self._terminate_process()
        self.log("⏹ Backup stopped by user")

    def verify_backup(self):
        """Verify the backup.

        This is only a basic check: it confirms that ``lvs`` can list the
        target volume group, not that the copied data is intact.
        """
        if not self.target_var.get():
            messagebox.showerror("Error",
                                 "Please select a target drive first")
            return

        target_vg = self.target_var.get().split()[0]
        self.log(f"🔍 Verifying backup on {target_vg}...")

        # Simple verification
        try:
            result = subprocess.run(['sudo', 'lvs', target_vg],
                                    capture_output=True, text=True)
        except (OSError, subprocess.SubprocessError) as e:
            self.log(f"❌ Verification error: {e}")
            messagebox.showerror("Error", f"Verification error: {e}")
            return

        if result.returncode == 0:
            self.log("✅ Target volumes exist and are accessible")
            messagebox.showinfo(
                "Verification",
                "Basic verification passed!\nTarget volumes are accessible.")
        else:
            self.log("❌ Verification failed - target volumes not accessible")
            messagebox.showerror("Verification", "Verification failed!")


def main():
    """Create the Tk root window and run the application."""
    root = tk.Tk()
    app = LVMBackupGUI(root)  # keep a reference for the window's lifetime
    root.mainloop()


if __name__ == "__main__":
    main()