#!/usr/bin/env python3 """ Simple LVM Backup GUI Just the basics: snapshot -> copy -> cleanup No complex logic, just simple reliable backups """ import tkinter as tk from tkinter import ttk, messagebox import subprocess import threading import os import time import json class SimpleBackupGUI: def __init__(self, root): self.root = root self.root.title("Simple LVM Backup") self.root.geometry("1200x800") # Made much bigger: was 900x600 # State tracking self.backup_running = False self.current_snapshot = None # Settings file for remembering preferences if 'SUDO_USER' in os.environ: user_home = f"/home/{os.environ['SUDO_USER']}" else: user_home = os.path.expanduser("~") self.settings_file = os.path.join(user_home, ".simple_backup_settings.json") self.settings = self.load_settings() self.setup_ui() def load_settings(self): """Load settings from file""" try: if os.path.exists(self.settings_file): with open(self.settings_file, 'r') as f: return json.load(f) except Exception as e: # Use print since log method isn't available yet print(f"Could not load settings: {e}") return {} def save_settings(self): """Save settings to file""" try: with open(self.settings_file, 'w') as f: json.dump(self.settings, f, indent=2) except Exception as e: if hasattr(self, 'log'): self.log(f"Could not save settings: {e}") else: print(f"Could not save settings: {e}") def update_borg_settings(self): """Update Borg settings when backup starts""" if hasattr(self, 'repo_path_var'): self.settings['last_borg_repo'] = self.repo_path_var.get() self.settings['last_encryption'] = getattr(self, 'encryption_var', tk.StringVar()).get() self.settings['last_snapshot_size'] = getattr(self, 'snapshot_size_var', tk.StringVar()).get() self.log(f"Saving settings: repo={self.settings.get('last_borg_repo', 'none')}") self.save_settings() else: self.log("repo_path_var not available for settings update") self.refresh_drives() # Initialize with correct widgets for default mode def setup_ui(self): # Main frame main_frame = 
ttk.Frame(self.root, padding="10") main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) # Backup mode selection ttk.Label(main_frame, text="Backup Mode:").grid(row=0, column=0, sticky=tk.W, pady=5) self.mode_var = tk.StringVar(value="lv_to_lv") mode_frame = ttk.Frame(main_frame) mode_frame.grid(row=0, column=1, columnspan=2, sticky=(tk.W, tk.E), pady=5) ttk.Radiobutton(mode_frame, text="LV → LV (update existing)", variable=self.mode_var, value="lv_to_lv", command=self.on_mode_change).pack(side=tk.LEFT, padx=5) ttk.Radiobutton(mode_frame, text="LV → Raw Device (fresh)", variable=self.mode_var, value="lv_to_raw", command=self.on_mode_change).pack(side=tk.LEFT, padx=5) ttk.Radiobutton(mode_frame, text="Entire VG → Device", variable=self.mode_var, value="vg_to_raw", command=self.on_mode_change).pack(side=tk.LEFT, padx=5) ttk.Radiobutton(mode_frame, text="LV → Borg (block)", variable=self.mode_var, value="lv_to_borg", command=self.on_mode_change).pack(side=tk.LEFT, padx=5) ttk.Radiobutton(mode_frame, text="VG → Borg (block)", variable=self.mode_var, value="vg_to_borg", command=self.on_mode_change).pack(side=tk.LEFT, padx=5) ttk.Radiobutton(mode_frame, text="Files → Borg", variable=self.mode_var, value="files_to_borg", command=self.on_mode_change).pack(side=tk.LEFT, padx=5) # Source selection ttk.Label(main_frame, text="Source:").grid(row=1, column=0, sticky=tk.W, pady=5) # Create a frame for source selection that can switch between combobox and listbox self.source_frame = ttk.Frame(main_frame) self.source_frame.grid(row=1, column=1, columnspan=2, sticky=(tk.W, tk.E), pady=5) # Single selection (combobox) - for most modes self.source_var = tk.StringVar() self.source_combo = ttk.Combobox(self.source_frame, textvariable=self.source_var, width=50) self.source_combo.grid(row=0, column=0, sticky=(tk.W, tk.E)) # Multi-selection (listbox) - for LV modes self.source_listbox_frame = ttk.Frame(self.source_frame) # Help text for multi-selection help_text = 
ttk.Label(self.source_listbox_frame, text="Hold Ctrl/Cmd to select multiple LVs", font=("TkDefaultFont", 8), foreground="gray") help_text.grid(row=0, column=0, columnspan=2, sticky=tk.W, pady=(0, 2)) self.source_listbox = tk.Listbox(self.source_listbox_frame, height=6, selectmode=tk.EXTENDED) source_scrollbar = ttk.Scrollbar(self.source_listbox_frame, orient="vertical", command=self.source_listbox.yview) self.source_listbox.configure(yscrollcommand=source_scrollbar.set) self.source_listbox.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) source_scrollbar.grid(row=1, column=1, sticky=(tk.N, tk.S)) self.source_listbox_frame.columnconfigure(0, weight=1) self.source_listbox_frame.rowconfigure(1, weight=1) # Configure source frame self.source_frame.columnconfigure(0, weight=1) # Target selection ttk.Label(main_frame, text="Target:").grid(row=2, column=0, sticky=tk.W, pady=5) self.target_var = tk.StringVar() self.target_combo = ttk.Combobox(main_frame, textvariable=self.target_var, width=50) self.target_combo.grid(row=2, column=1, columnspan=2, sticky=(tk.W, tk.E), pady=5) # Borg-specific settings (hidden initially) self.borg_frame = ttk.LabelFrame(main_frame, text="Borg Backup Settings", padding="5") self.borg_frame.grid(row=3, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=5) self.borg_frame.grid_remove() # Hide initially # Repo path ttk.Label(self.borg_frame, text="Repository Path:").grid(row=0, column=0, sticky=tk.W, pady=2) self.repo_path_var = tk.StringVar() # Load saved repository path if 'last_borg_repo' in self.settings: saved_repo = self.settings['last_borg_repo'] self.repo_path_var.set(saved_repo) self.log(f"Loaded saved repository: {saved_repo}") else: self.log("No saved repository found") repo_entry = ttk.Entry(self.borg_frame, textvariable=self.repo_path_var, width=40) repo_entry.grid(row=0, column=1, sticky=(tk.W, tk.E), pady=2) ttk.Button(self.borg_frame, text="Browse", command=self.browse_repo).grid(row=0, column=2, padx=5, pady=2) # Create new 
repo checkbox self.create_new_repo = tk.BooleanVar() ttk.Checkbutton(self.borg_frame, text="Create new repository", variable=self.create_new_repo).grid(row=1, column=0, columnspan=2, sticky=tk.W, pady=2) # Encryption settings ttk.Label(self.borg_frame, text="Encryption Mode:").grid(row=2, column=0, sticky=tk.W, pady=2) self.encryption_var = tk.StringVar(value="repokey") encryption_combo = ttk.Combobox(self.borg_frame, textvariable=self.encryption_var, values=["none", "repokey", "keyfile"], state="readonly", width=15) encryption_combo.grid(row=2, column=1, sticky=tk.W, pady=2) # Snapshot size option ttk.Label(self.borg_frame, text="Snapshot Size:").grid(row=3, column=0, sticky=tk.W, pady=2) self.snapshot_size_var = tk.StringVar(value="auto") snapshot_combo = ttk.Combobox(self.borg_frame, textvariable=self.snapshot_size_var, values=["auto", "conservative", "generous"], state="readonly", width=15) snapshot_combo.grid(row=3, column=1, sticky=tk.W, pady=2) # Help text for snapshot sizes help_label = ttk.Label(self.borg_frame, text="auto: smart sizing, conservative: 5%, generous: 25%", font=("TkDefaultFont", 8), foreground="gray") help_label.grid(row=3, column=2, sticky=tk.W, padx=(5, 0), pady=2) # Read-only mode for full filesystems self.readonly_mode = tk.BooleanVar() ttk.Checkbutton(self.borg_frame, text="Read-only mode (for very full filesystems)", variable=self.readonly_mode).grid(row=5, column=0, columnspan=3, sticky=tk.W, pady=2) # Passphrase ttk.Label(self.borg_frame, text="Passphrase:").grid(row=4, column=0, sticky=tk.W, pady=2) self.passphrase_var = tk.StringVar() passphrase_entry = ttk.Entry(self.borg_frame, textvariable=self.passphrase_var, show="*", width=40) passphrase_entry.grid(row=4, column=1, sticky=(tk.W, tk.E), pady=2) # Configure borg frame grid self.borg_frame.columnconfigure(1, weight=1) self.borg_frame.columnconfigure(2, weight=0) # Refresh button ttk.Button(main_frame, text="Refresh", command=self.refresh_drives).grid(row=4, column=0, pady=10) # 
Backup button self.backup_btn = ttk.Button(main_frame, text="Start Backup", command=self.start_backup, style="Accent.TButton") self.backup_btn.grid(row=4, column=1, pady=10) # Emergency stop self.stop_btn = ttk.Button(main_frame, text="Emergency Stop", command=self.emergency_stop, state="disabled") self.stop_btn.grid(row=4, column=2, pady=10) # Management buttons row mgmt_frame = ttk.Frame(main_frame) mgmt_frame.grid(row=5, column=0, columnspan=3, pady=10) ttk.Button(mgmt_frame, text="Manage Borg Repo", command=self.manage_borg_repo).pack(side=tk.LEFT, padx=5) ttk.Button(mgmt_frame, text="Manage LVM Snapshots", command=self.manage_snapshots).pack(side=tk.LEFT, padx=5) # Progress area ttk.Label(main_frame, text="Progress:").grid(row=6, column=0, sticky=tk.W, pady=(20, 5)) self.progress = ttk.Progressbar(main_frame, mode='indeterminate') self.progress.grid(row=7, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=5) # Log area ttk.Label(main_frame, text="Log:").grid(row=8, column=0, sticky=tk.W, pady=(10, 5)) log_frame = ttk.Frame(main_frame) log_frame.grid(row=9, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5) self.log_text = tk.Text(log_frame, height=15, width=70) scrollbar = ttk.Scrollbar(log_frame, orient="vertical", command=self.log_text.yview) self.log_text.configure(yscrollcommand=scrollbar.set) self.log_text.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S)) # Configure grid weights self.root.columnconfigure(0, weight=1) self.root.rowconfigure(0, weight=1) main_frame.columnconfigure(1, weight=1) main_frame.rowconfigure(9, weight=1) # Updated for new log row log_frame.columnconfigure(0, weight=1) log_frame.rowconfigure(0, weight=1) def browse_repo(self): """Browse for Borg repository path""" from tkinter import filedialog path = filedialog.askdirectory(title="Select Borg Repository Directory") if path: self.repo_path_var.set(path) def on_mode_change(self): """Handle backup mode change""" 
mode = self.mode_var.get() # Show/hide Borg settings if mode in ["lv_to_borg", "vg_to_borg", "files_to_borg"]: self.borg_frame.grid() else: self.borg_frame.grid_remove() # Always refresh drives when mode changes self.refresh_drives() def log(self, message): """Add message to log""" timestamp = time.strftime("%H:%M:%S") # Check if log_text widget exists yet if hasattr(self, 'log_text'): self.log_text.insert(tk.END, f"[{timestamp}] {message}\n") self.log_text.see(tk.END) self.root.update_idletasks() else: # Print to console if GUI log not available yet print(f"[{timestamp}] {message}") def calculate_snapshot_size(self, lv_size_bytes, backup_mode="block"): """Calculate appropriate snapshot size based on LV size, user preference, and backup mode""" mode = self.snapshot_size_var.get() # For file-level backups, use much smaller snapshots if backup_mode == "file": if mode == "conservative": # 2% of LV size, minimum 512MB, maximum 10GB snapshot_size_bytes = max(min(lv_size_bytes // 50, 10 * 1024**3), 512 * 1024**2) elif mode == "generous": # 5% of LV size, minimum 1GB, maximum 20GB snapshot_size_bytes = max(min(lv_size_bytes // 20, 20 * 1024**3), 1024**3) else: # auto # 3% of LV size, minimum 1GB, maximum 15GB snapshot_size_bytes = max(min(lv_size_bytes * 3 // 100, 15 * 1024**3), 1024**3) else: # Original logic for block-level backups if mode == "conservative": # 5% of LV size, minimum 1GB snapshot_size_bytes = max(lv_size_bytes // 20, 1024**3) elif mode == "generous": # 25% of LV size, minimum 2GB snapshot_size_bytes = max(lv_size_bytes // 4, 2 * 1024**3) else: # auto # 10% of LV size, minimum 1GB, but increase for very large LVs base_percentage = 10 if lv_size_bytes > 50 * 1024**3: # >50GB base_percentage = 15 # Use 15% for large, potentially active LVs snapshot_size_bytes = max(lv_size_bytes * base_percentage // 100, 1024**3) snapshot_size_gb = snapshot_size_bytes // (1024**3) return f"{snapshot_size_gb}G" def _parse_size_to_bytes(self, size_str): """Parse simple size 
strings like '10G' to bytes (supports G and M).""" try: s = size_str.strip().upper() if s.endswith('G'): return int(s[:-1]) * (1024 ** 3) if s.endswith('M'): return int(s[:-1]) * (1024 ** 2) # Fallback: raw int bytes return int(s) except Exception: return None def _vg_free_bytes(self, vg_name): """Return VG free space in bytes or None on failure.""" ok, out = self.run_command(f"vgs --noheadings -o vg_free --units b {vg_name}", show_output=False) if not ok or not out.strip(): return None try: return int(out.strip().replace('B', '').strip()) except Exception: return None def _ensure_vg_has_space(self, vg_name, snapshot_size_str): """Raise Exception if VG does not have enough free space for requested snapshot size.""" needed = self._parse_size_to_bytes(snapshot_size_str) if needed is None: return # cannot parse; skip strict check free_b = self._vg_free_bytes(vg_name) if free_b is None: return # unknown; skip strict check if free_b < needed: raise Exception(f"Insufficient VG free space: need {snapshot_size_str}, free {free_b // (1024**3)}G in VG {vg_name}") def run_command(self, cmd, show_output=True): """Run a command and return result""" try: if show_output: self.log(f"Running: {cmd}") result = subprocess.run(cmd, shell=True, capture_output=True, text=True) if result.returncode != 0: if show_output: self.log(f"ERROR: {result.stderr.strip()}") return False, result.stderr.strip() else: if show_output and result.stdout.strip(): self.log(f"Output: {result.stdout.strip()}") return True, result.stdout.strip() except Exception as e: if show_output: self.log(f"ERROR: {str(e)}") return False, str(e) def run_interactive_command(self, cmd): """Run an interactive command in a terminal window""" try: self.log(f"Running interactive: {cmd}") # Try different terminal emulators terminals = [ f"x-terminal-emulator -e bash -c '{cmd}; read -p \"Press Enter to continue...\"'", f"xterm -e bash -c '{cmd}; read -p \"Press Enter to continue...\"'", f"konsole -e bash -c '{cmd}; read -p 
\"Press Enter to continue...\"'", f"gnome-terminal --wait -- bash -c '{cmd}; read -p \"Press Enter to continue...\"'", f"xfce4-terminal -e 'bash -c \"{cmd}; read -p \\\"Press Enter to continue...\\\"\"; bash'" ] for terminal_cmd in terminals: try: result = subprocess.run(terminal_cmd, shell=True, timeout=300) # 5 minute timeout if result.returncode == 0: return True except (subprocess.TimeoutExpired, FileNotFoundError): continue # If no terminal worked, try a simpler approach self.log("No terminal emulator found, trying direct execution...") result = subprocess.run(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) return result.returncode == 0 except Exception as e: self.log(f"ERROR running interactive command: {str(e)}") return False def get_luks_passphrase(self): """Get LUKS passphrase via GUI dialog""" from tkinter import simpledialog try: passphrase = simpledialog.askstring( "LUKS Passphrase", "Enter LUKS passphrase for encrypted volume:", show='*' # Hide password input ) return passphrase except Exception as e: self.log(f"Error getting passphrase: {e}") return None def open_luks_device(self, device_path, luks_name, passphrase): """Open LUKS device with provided passphrase""" try: # Avoid exposing secrets via shell; pass via stdin result = subprocess.run( ["cryptsetup", "luksOpen", device_path, luks_name], input=(passphrase or "") + "\n", text=True, capture_output=True, ) if result.returncode == 0: self.log("LUKS device opened successfully") return True self.log(f"Failed to open LUKS device: {result.stderr.strip()}") return False except Exception as e: self.log(f"Error opening LUKS device: {e}") return False def _fs_mount_opts(self, device_path): """Return safe read-only mount options based on filesystem type.""" fs_ok, fs_type = self.run_command(f"blkid -o value -s TYPE {device_path}", show_output=False) fs_type = fs_type.strip().lower() if fs_ok and fs_type else "" if fs_type == "ext4" or fs_type == "ext3": 
return "ro,noload" if fs_type == "xfs": return "ro,norecovery" # default safe read-only return "ro" def refresh_drives(self): """Refresh available drives based on selected mode""" mode = self.mode_var.get() self.log(f"Refreshing drives for mode: {mode}") # Clear target combo for Borg modes if mode in ["lv_to_borg", "vg_to_borg", "files_to_borg"]: self.target_combo['values'] = [] self.target_var.set("") # Show appropriate source selection widget if mode in ["lv_to_borg", "files_to_borg"]: # Multi-selection for individual LV modes self.source_combo.grid_remove() self.source_listbox_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) self.log("Switched to multi-selection listbox for LV selection") else: # Single selection for VG modes and other modes self.source_listbox_frame.grid_remove() self.source_combo.grid(row=0, column=0, sticky=(tk.W, tk.E)) self.log("Switched to single-selection combobox") # Update the window to force widget refresh self.root.update_idletasks() # Source options if mode == "vg_to_raw" or mode == "vg_to_borg": # Show volume groups success, output = self.run_command("vgs --noheadings -o vg_name,vg_size,pv_count", show_output=False) if success: source_list = [] for line in output.strip().split('\n'): if line.strip(): parts = line.strip().split() if len(parts) >= 3: vg_name = parts[0] vg_size = parts[1] pv_count = parts[2] source_list.append(f"{vg_name} ({vg_size}) [{pv_count} PVs]") self.source_combo['values'] = source_list self.source_var.set("") # Clear selection self.log(f"Found {len(source_list)} volume groups") else: self.log("No volume groups found") self.source_combo['values'] = [] else: # Show logical volumes (all LV modes: lv_to_lv, lv_to_raw, lv_to_borg, files_to_borg) success, output = self.run_command("lvs --noheadings -o lv_path,lv_size,vg_name", show_output=False) if success: lv_list = [] for line in output.strip().split('\n'): if line.strip(): parts = line.strip().split() if len(parts) >= 3: lv_path = parts[0] lv_size = 
parts[1] vg_name = parts[2] lv_list.append(f"{lv_path} ({lv_size}) [VG: {vg_name}]") # Update the appropriate widget if mode in ["lv_to_borg", "files_to_borg"]: # Update listbox for multi-selection self.source_listbox.delete(0, tk.END) for lv in lv_list: self.source_listbox.insert(tk.END, lv) self.log(f"Loaded {len(lv_list)} logical volumes into listbox") else: # Update combobox for single selection self.source_combo['values'] = lv_list self.source_var.set("") # Clear selection self.log(f"Loaded {len(lv_list)} logical volumes into combobox") self.log(f"Found {len(lv_list)} logical volumes") else: self.log("No LVM volumes found") if mode in ["lv_to_borg", "files_to_borg"]: self.source_listbox.delete(0, tk.END) else: self.source_combo['values'] = [] # Target options if mode == "lv_to_lv": # Show existing logical volumes on external drives success, output = self.run_command("lvs --noheadings -o lv_path,lv_size,vg_name", show_output=False) if success: target_list = [] for line in output.strip().split('\n'): if line.strip(): parts = line.strip().split() if len(parts) >= 3: lv_path = parts[0] lv_size = parts[1] vg_name = parts[2] # Filter out internal VGs (you might want to customize this) if "migration" in vg_name or "external" in vg_name or "backup" in vg_name: target_list.append(f"{lv_path} ({lv_size}) [VG: {vg_name}]") self.target_combo['values'] = target_list self.log(f"Found {len(target_list)} target logical volumes") else: self.log("No target LVs found") self.target_combo['values'] = [] elif mode in ["lv_to_borg", "vg_to_borg", "files_to_borg"]: # No target combo for Borg modes - repository path is used self.target_combo['values'] = [] self.log("Using Borg repository path instead of target device") else: # Show raw block devices (lv_to_raw and vg_to_raw) success, output = self.run_command("lsblk -dno NAME,SIZE,MODEL | grep -E '^sd|^nvme'", show_output=False) if success: drive_list = [] for line in output.strip().split('\n'): if line.strip(): parts = 
line.strip().split(None, 2) if len(parts) >= 2: name = parts[0] size = parts[1] model = parts[2] if len(parts) > 2 else "Unknown" drive_list.append(f"/dev/{name} ({size}) {model}") self.target_combo['values'] = drive_list self.log(f"Found {len(drive_list)} target drives") else: self.log("No block devices found") self.target_combo['values'] = [] def start_backup(self): """Start the backup process""" mode = self.mode_var.get() # Get selected sources based on mode if mode in ["lv_to_borg", "files_to_borg"]: # Multi-selection mode selected_indices = self.source_listbox.curselection() if not selected_indices: messagebox.showerror("Error", "Please select at least one logical volume") return selected_sources = [] for index in selected_indices: selected_sources.append(self.source_listbox.get(index).split()[0]) if not self.repo_path_var.get(): messagebox.showerror("Error", "Please select repository path") return # Check if borg is installed success, _ = self.run_command("which borg", show_output=False) if not success: messagebox.showerror("Error", "Borg Backup is not installed. Please install it first:\nsudo apt install borgbackup") return repo_path = self.repo_path_var.get() # Build confirmation message for multiple LVs if mode == "lv_to_borg": msg = f"Borg backup of {len(selected_sources)} LVs (block-level):\n\n" else: # files_to_borg msg = f"Borg backup of {len(selected_sources)} LVs (file-level):\n\n" for source in selected_sources: msg += f"• {source}\n" msg += f"\nRepository: {repo_path}\n" if self.create_new_repo.get(): msg += "This will create a new Borg repository.\n" else: msg += "This will add to existing Borg repository.\n" msg += f"Encryption: {self.encryption_var.get()}\n\nContinue?" 
if not messagebox.askyesno("Confirm Backup", msg): return # Start multi-LV backup self.backup_running = True self.backup_btn.config(state="disabled") self.stop_btn.config(state="normal") self.progress.start() # Save settings before starting backup self.update_borg_settings() thread = threading.Thread(target=self.multi_lv_backup_worker, args=(mode, selected_sources, repo_path)) thread.daemon = True thread.start() return # Original single-selection logic for other modes # Validate inputs based on mode if mode in ["vg_to_borg"]: if not self.source_var.get() or not self.repo_path_var.get(): messagebox.showerror("Error", "Please select source and repository path") return # Check if borg is installed success, _ = self.run_command("which borg", show_output=False) if not success: messagebox.showerror("Error", "Borg Backup is not installed. Please install it first:\nsudo apt install borgbackup") return source = self.source_var.get().split()[0] repo_path = self.repo_path_var.get() # Build confirmation message for VG Borg msg = f"Borg backup of entire VG (block-level):\n\nSource VG: {source}\nRepository: {repo_path}\n\n" if self.create_new_repo.get(): msg += "This will create a new Borg repository.\n" else: msg += "This will add to existing Borg repository.\n" msg += f"Encryption: {self.encryption_var.get()}\n" msg += "This will backup ALL logical volumes in the VG.\n\nContinue?" target = repo_path else: # Original validation for non-Borg modes if not self.source_var.get() or not self.target_var.get(): messagebox.showerror("Error", "Please select both source and target") return source = self.source_var.get().split()[0] target = self.target_var.get().split()[0] # Build confirmation message based on mode if mode == "lv_to_lv": msg = f"Update existing LV backup:\n\nSource LV: {source}\nTarget LV: {target}\n\n" msg += "This will overwrite the target LV with current source data.\n\nContinue?" 
elif mode == "lv_to_raw": msg = f"Create fresh backup:\n\nSource LV: {source}\nTarget Device: {target}\n\n" msg += "WARNING: Target device will be completely overwritten!\n\nContinue?" elif mode == "vg_to_raw": msg = f"Clone entire Volume Group:\n\nSource VG: {source}\nTarget Device: {target}\n\n" msg += "WARNING: Target device will be completely overwritten!\n" msg += "This will clone ALL logical volumes in the VG.\n\nContinue?" if not messagebox.askyesno("Confirm Backup", msg): return # Start backup in thread self.backup_running = True self.backup_btn.config(state="disabled") self.stop_btn.config(state="normal") self.progress.start() # Save settings before starting backup self.update_borg_settings() thread = threading.Thread(target=self.backup_worker, args=(mode, source, target)) thread.daemon = True thread.start() def backup_worker(self, mode, source, target): """The actual backup work""" try: # Check if stop was requested if not self.backup_running: self.log("Backup stopped before starting") return self.log(f"=== Starting {mode} backup ===") if mode == "lv_to_lv": self.backup_lv_to_lv(source, target) elif mode == "lv_to_raw": self.backup_lv_to_raw(source, target) elif mode == "vg_to_raw": self.backup_vg_to_raw(source, target) elif mode == "lv_to_borg": self.backup_lv_to_borg(source, target) elif mode == "files_to_borg": self.backup_files_to_borg(source, target) elif mode == "vg_to_borg": self.backup_vg_to_borg(source, target) self.log("=== Backup completed successfully! 
===") self.root.after(0, lambda: messagebox.showinfo("Success", "Backup completed successfully!")) except Exception as e: self.log(f"ERROR: {str(e)}") self.cleanup_on_error() self.root.after(0, lambda: messagebox.showerror("Backup Failed", str(e))) finally: # Reset UI state self.root.after(0, self.reset_ui_state) def multi_lv_backup_worker(self, mode, selected_sources, repo_path): """Backup multiple LVs sequentially""" try: # Check if stop was requested if not self.backup_running: self.log("Multi-LV backup stopped before starting") return self.log(f"=== Starting multi-LV {mode} backup ===") self.log(f"Selected {len(selected_sources)} logical volumes") # Initialize repository once if needed if self.create_new_repo.get(): # Check for stop again before repo creation if not self.backup_running: self.log("Backup stopped during repository initialization") return borg_env = os.environ.copy() if self.passphrase_var.get(): borg_env['BORG_PASSPHRASE'] = self.passphrase_var.get() self.log(f"Creating new Borg repository: {repo_path}") encryption = self.encryption_var.get() if encryption == "none": init_cmd = f"borg init --encryption=none {repo_path}" else: init_cmd = f"borg init --encryption={encryption} {repo_path}" result = subprocess.run(init_cmd, shell=True, capture_output=True, text=True, env=borg_env) if result.returncode != 0: raise Exception(f"Failed to initialize Borg repository: {result.stderr}") self.log("Repository initialized successfully") # Backup each LV successful_backups = 0 failed_backups = [] for i, source_lv in enumerate(selected_sources, 1): self.log(f"--- Processing LV {i}/{len(selected_sources)}: {source_lv} ---") try: if mode == "lv_to_borg": self.backup_lv_to_borg(source_lv, repo_path) elif mode == "files_to_borg": self.backup_files_to_borg(source_lv, repo_path) successful_backups += 1 self.log(f"Successfully backed up {source_lv}") except Exception as e: self.log(f"Failed to backup {source_lv}: {str(e)}") failed_backups.append(source_lv) # Continue 
with next LV instead of stopping # Summary self.log(f"=== Multi-LV backup completed ===") self.log(f"Successful: {successful_backups}/{len(selected_sources)}") if failed_backups: self.log(f"Failed: {', '.join(failed_backups)}") # Show completion dialog if failed_backups: msg = f"Backup completed with some failures:\n\nSuccessful: {successful_backups}\nFailed: {len(failed_backups)}\n\nFailed LVs:\n" + '\n'.join(failed_backups) self.root.after(0, lambda: messagebox.showwarning("Backup Completed with Warnings", msg)) else: self.root.after(0, lambda: messagebox.showinfo("Success", f"All {successful_backups} LVs backed up successfully!")) except Exception as e: self.log(f"ERROR: {str(e)}") self.cleanup_on_error() self.root.after(0, lambda: messagebox.showerror("Backup Failed", str(e))) finally: # Reset UI state self.root.after(0, self.reset_ui_state) def backup_lv_to_lv(self, source_lv, target_lv): """Backup LV to existing LV""" self.log("Mode: LV to LV (updating existing backup)") # Create snapshot of source vg_name = source_lv.split('/')[2] lv_name = source_lv.split('/')[3] snapshot_name = f"{lv_name}_backup_snap_{int(time.time())}" self.current_snapshot = f"/dev/{vg_name}/{snapshot_name}" self.log(f"Creating snapshot: {snapshot_name}") # Pre-flight VG free check for 1G default try: self._ensure_vg_has_space(vg_name, "1G") except Exception as e: raise Exception(str(e)) success, output = self.run_command(f"lvcreate -L1G -s -n {snapshot_name} {source_lv}") if not success: raise Exception(f"Failed to create snapshot: {output}") # Copy snapshot to target LV self.log(f"Copying {self.current_snapshot} to {target_lv}") success, _ = self.run_command("which pv", show_output=False) if success: copy_cmd = f"pv {self.current_snapshot} | dd of={target_lv} bs=4M" else: copy_cmd = f"dd if={self.current_snapshot} of={target_lv} bs=4M status=progress" success, output = self.run_command(copy_cmd) if not success: raise Exception(f"Failed to copy data: {output}") # Cleanup 
self.log("Cleaning up snapshot") success, output = self.run_command(f"lvremove -f {self.current_snapshot}") if not success: self.log(f"Warning: Failed to remove snapshot: {output}") else: self.log("Snapshot cleaned up") self.current_snapshot = None def backup_lv_to_raw(self, source_lv, target_device): """Backup LV to raw device (fresh backup)""" self.log("Mode: LV to Raw Device (fresh backup)") # Create snapshot of source vg_name = source_lv.split('/')[2] lv_name = source_lv.split('/')[3] snapshot_name = f"{lv_name}_backup_snap_{int(time.time())}" self.current_snapshot = f"/dev/{vg_name}/{snapshot_name}" self.log(f"Creating snapshot: {snapshot_name}") # Pre-flight VG free check for 1G default try: self._ensure_vg_has_space(vg_name, "1G") except Exception as e: raise Exception(str(e)) success, output = self.run_command(f"lvcreate -L1G -s -n {snapshot_name} {source_lv}") if not success: raise Exception(f"Failed to create snapshot: {output}") # Copy snapshot to target device self.log(f"Copying {self.current_snapshot} to {target_device}") self.log("This will create a raw block-level copy") success, _ = self.run_command("which pv", show_output=False) if success: copy_cmd = f"pv {self.current_snapshot} | dd of={target_device} bs=4M" else: copy_cmd = f"dd if={self.current_snapshot} of={target_device} bs=4M status=progress" success, output = self.run_command(copy_cmd) if not success: raise Exception(f"Failed to copy data: {output}") # Cleanup self.log("Cleaning up snapshot") success, output = self.run_command(f"lvremove -f {self.current_snapshot}") if not success: self.log(f"Warning: Failed to remove snapshot: {output}") else: self.log("Snapshot cleaned up") self.current_snapshot = None def backup_vg_to_raw(self, source_vg, target_device): """Backup entire VG to raw device""" self.log("Mode: Entire VG to Raw Device") self.log("This will create a complete clone including LVM metadata") # Get the physical volume(s) that make up this VG success, output = 
def backup_lv_to_borg(self, source_lv, repo_path):
    """Back up one logical volume into a Borg repository as a raw image.

    Optionally initializes the repository, snapshots ``source_lv``, streams
    the snapshot through ``dd`` into ``borg create`` (stdin mode), and
    removes the snapshot again whether or not the backup succeeded.

    Args:
        source_lv: LV device path in the form ``/dev/<vg>/<lv>``.
        repo_path: Borg repository location.

    Raises:
        Exception: If ``source_lv`` is malformed, repository initialization
            fails, the snapshot cannot be created, or either ``dd`` or
            ``borg`` exits with a non-zero status.
    """
    self.log("Mode: LV to Borg Repository (block-level backup)")

    # Expect /dev/<vg>/<lv>; fail before touching the repository otherwise.
    path_parts = source_lv.split('/')
    if len(path_parts) < 4 or not path_parts[2] or not path_parts[3]:
        raise Exception(f"Invalid logical volume path: {source_lv}")
    vg_name, lv_name = path_parts[2], path_parts[3]

    # Set up environment for Borg
    borg_env = os.environ.copy()
    passphrase = self.passphrase_var.get()
    if passphrase:
        borg_env['BORG_PASSPHRASE'] = passphrase

    # Initialize repository if needed. An argument list (no shell) keeps
    # repository paths containing spaces or shell metacharacters intact.
    if self.create_new_repo.get():
        self.log(f"Creating new Borg repository: {repo_path}")
        encryption = self.encryption_var.get()
        result = subprocess.run(
            ["borg", "init", f"--encryption={encryption}", repo_path],
            capture_output=True, text=True, env=borg_env,
        )
        if result.returncode != 0:
            raise Exception(f"Failed to initialize Borg repository: {result.stderr}")
        self.log("Repository initialized successfully")

    snapshot_name = f"{lv_name}_borg_snap_{int(time.time())}"
    snapshot_path = f"/dev/{vg_name}/{snapshot_name}"

    # Size the snapshot from the LV size; fall back to 2G when the size
    # cannot be queried or parsed.
    snapshot_size = "2G"
    success, lv_size_output = self.run_command(
        f"lvs --noheadings -o lv_size --units b {source_lv}", show_output=False)
    if success:
        try:
            lv_size_bytes = int(lv_size_output.strip().replace('B', ''))
        except ValueError:
            self.log(f"Could not parse LV size '{lv_size_output.strip()}', using {snapshot_size}")
        else:
            snapshot_size = self.calculate_snapshot_size(lv_size_bytes)

    self.log(f"Creating snapshot: {snapshot_name} (size: {snapshot_size})")
    # Pre-flight VG free space
    self._ensure_vg_has_space(vg_name, snapshot_size)

    # Record the path before lvcreate runs so cleanup_on_error() can find
    # the snapshot if the backup is stopped while it is being created.
    self.current_snapshot = snapshot_path
    success, output = self.run_command(
        f"lvcreate -L{snapshot_size} -s -n {snapshot_name} {source_lv}")
    if not success:
        self.current_snapshot = None
        raise Exception(f"Failed to create snapshot: {output}")

    try:
        archive_name = f"lv_{lv_name}_{time.strftime('%Y%m%d_%H%M%S')}"
        self.log(f"Creating Borg archive (block-level): {archive_name}")
        self.log("Backing up raw snapshot block device to Borg...")
        self.log("This preserves exact block-level state including filesystem metadata")

        # Run dd and borg as two connected processes instead of a shell
        # pipeline: a shell reports only the last command's status, so a
        # failed read of the snapshot would otherwise go unnoticed.
        dd_proc = subprocess.Popen(
            ["dd", f"if={snapshot_path}", "bs=4M"],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        try:
            borg_proc = subprocess.Popen(
                ["borg", "create", "--stdin-name", f"{lv_name}.img",
                 "--progress", "--stats", f"{repo_path}::{archive_name}", "-"],
                stdin=dd_proc.stdout, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT, text=True, env=borg_env,
            )
        except OSError:
            # borg could not be started; do not leave dd blocked on the pipe.
            dd_proc.kill()
            dd_proc.communicate()
            raise
        # Drop our copy of the pipe so dd gets SIGPIPE if borg exits early.
        dd_proc.stdout.close()

        # Stream output
        for line in borg_proc.stdout:
            if line.strip():
                self.log(line.strip())
        borg_proc.wait()

        dd_messages = dd_proc.stderr.read().decode(errors="replace")
        dd_proc.stderr.close()
        dd_proc.wait()
        for line in dd_messages.splitlines():
            if line.strip():
                self.log(line.strip())

        if borg_proc.returncode != 0:
            raise Exception("Borg block-level backup failed")
        if dd_proc.returncode != 0:
            raise Exception(
                f"Failed to read snapshot {snapshot_path} "
                f"(dd exit status {dd_proc.returncode})")

        self.log("Block-level Borg backup completed successfully")
    finally:
        # Always drop the snapshot, even when the backup failed.
        self.log("Cleaning up snapshot")
        success, output = self.run_command(f"lvremove -f {snapshot_path}")
        if not success:
            self.log(f"Warning: Failed to remove snapshot: {output}")
        else:
            self.log("Snapshot cleaned up")
        self.current_snapshot = None
def backup_vg_to_borg(self, source_vg, repo_path):
    """Back up every logical volume of a VG into a Borg repository.

    Each LV is handled in turn: snapshot it, stream the snapshot through
    ``dd`` into its own Borg archive (stdin mode), then remove the snapshot
    before moving on, so only one snapshot exists at a time. An LV that
    cannot be snapshotted or backed up is logged and skipped.

    Args:
        source_vg: Name of the volume group.
        repo_path: Borg repository location.

    Raises:
        Exception: If repository initialization fails, the LVs cannot be
            listed, the VG has no LVs, or not a single LV could be backed up.
    """
    self.log("Mode: Entire VG to Borg Repository (block-level backup)")
    self.log("This will store all LV snapshots as raw block devices in Borg")

    # Set up environment for Borg
    borg_env = os.environ.copy()
    passphrase = self.passphrase_var.get()
    if passphrase:
        borg_env['BORG_PASSPHRASE'] = passphrase

    # Initialize repository if needed. An argument list (no shell) keeps
    # repository paths containing spaces or shell metacharacters intact.
    if self.create_new_repo.get():
        self.log(f"Creating new Borg repository: {repo_path}")
        encryption = self.encryption_var.get()
        result = subprocess.run(
            ["borg", "init", f"--encryption={encryption}", repo_path],
            capture_output=True, text=True, env=borg_env,
        )
        if result.returncode != 0:
            raise Exception(f"Failed to initialize Borg repository: {result.stderr}")
        self.log("Repository initialized successfully")

    # Get all LVs in the VG
    success, output = self.run_command(
        f"lvs --noheadings -o lv_name {source_vg}", show_output=False)
    if not success:
        raise Exception(f"Failed to get LVs for VG {source_vg}")
    lv_names = [name.strip() for name in output.splitlines() if name.strip()]
    if not lv_names:
        raise Exception(f"No logical volumes found in VG {source_vg}")
    self.log(f"Found {len(lv_names)} logical volumes: {', '.join(lv_names)}")

    backed_up = []
    failed = []
    leftover_snapshots = []  # snapshots whose first removal attempt failed

    try:
        # Process each LV one by one to avoid space issues
        for lv_name in lv_names:
            lv_path = f"/dev/{source_vg}/{lv_name}"
            snapshot_name = f"{lv_name}_borg_snap_{int(time.time())}"
            snapshot_path = f"/dev/{source_vg}/{snapshot_name}"

            # Size the snapshot from the LV size; fall back to 2G when the
            # size cannot be queried or parsed.
            snapshot_size = "2G"
            success, lv_size_output = self.run_command(
                f"lvs --noheadings -o lv_size --units b {lv_path}", show_output=False)
            if success:
                try:
                    lv_size_bytes = int(lv_size_output.strip().replace('B', ''))
                except ValueError:
                    self.log(f"Could not parse size of {lv_name}, using {snapshot_size}")
                else:
                    snapshot_size = self.calculate_snapshot_size(lv_size_bytes)

            self.log(f"Processing LV: {lv_name}")
            self.log(f"Creating snapshot: {snapshot_name} (size: {snapshot_size})")

            # Pre-flight VG free space; a failure skips only this LV.
            try:
                self._ensure_vg_has_space(source_vg, snapshot_size)
            except Exception as e:
                self.log(f"Skipping {lv_name}: {e}")
                failed.append(lv_name)
                continue

            # Expose the active snapshot so cleanup_on_error() can remove it
            # if the backup is stopped part-way through.
            self.current_snapshot = snapshot_path
            success, output = self.run_command(
                f"lvcreate -L{snapshot_size} -s -n {snapshot_name} {lv_path}")
            if not success:
                self.current_snapshot = None
                self.log(f"Warning: Failed to create snapshot for {lv_name}: {output}")
                failed.append(lv_name)
                continue

            backup_ok = False
            try:
                archive_name_lv = (
                    f"vg_{source_vg}_lv_{lv_name}_{time.strftime('%Y%m%d_%H%M%S')}")
                self.log(f"Backing up {lv_name} to archive: {archive_name_lv}")

                # Two connected processes instead of a shell pipeline, so a
                # failed read by dd is detected as well as a borg failure.
                dd_proc = subprocess.Popen(
                    ["dd", f"if={snapshot_path}", "bs=4M"],
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                )
                try:
                    borg_proc = subprocess.Popen(
                        ["borg", "create", "--stdin-name", f"{lv_name}.img",
                         "--progress", "--stats",
                         f"{repo_path}::{archive_name_lv}", "-"],
                        stdin=dd_proc.stdout, stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT, text=True, env=borg_env,
                    )
                except OSError:
                    # borg could not be started; do not leave dd blocked.
                    dd_proc.kill()
                    dd_proc.communicate()
                    raise
                # Drop our copy of the pipe so dd gets SIGPIPE if borg exits early.
                dd_proc.stdout.close()

                # Stream output
                for line in borg_proc.stdout:
                    if line.strip():
                        self.log(line.strip())
                borg_proc.wait()

                dd_messages = dd_proc.stderr.read().decode(errors="replace")
                dd_proc.stderr.close()
                dd_proc.wait()
                for line in dd_messages.splitlines():
                    if line.strip():
                        self.log(line.strip())

                backup_ok = borg_proc.returncode == 0 and dd_proc.returncode == 0
            except OSError as e:
                self.log(f"Warning: Could not run backup pipeline for {lv_name}: {e}")
            finally:
                # Clean up this snapshot immediately to save space
                self.log(f"Removing snapshot {snapshot_path}")
                success, output = self.run_command(f"lvremove -f {snapshot_path}")
                if not success:
                    self.log(f"Warning: Failed to remove snapshot {snapshot_path}: {output}")
                    leftover_snapshots.append(snapshot_path)
                self.current_snapshot = None

            if backup_ok:
                self.log(f"Successfully backed up {lv_name}")
                backed_up.append(lv_name)
            else:
                self.log(f"Warning: Backup of {lv_name} failed")
                failed.append(lv_name)
    finally:
        # Second attempt for snapshots that could not be removed right away.
        for snapshot_path in leftover_snapshots:
            self.log(f"Removing remaining snapshot {snapshot_path}")
            success, output = self.run_command(f"lvremove -f {snapshot_path}")
            if not success:
                self.log(f"Warning: Failed to remove snapshot {snapshot_path}: {output}")
        self.current_snapshot = None

    # Do not report success when nothing was actually stored.
    if not backed_up:
        raise Exception(f"No logical volumes in VG {source_vg} could be backed up")
    if failed:
        self.log(f"Warning: VG backup incomplete - failed or skipped: {', '.join(failed)}")
    else:
        self.log("Block-level VG Borg backup completed successfully")
    self.log(
        f"Created individual archives for {len(backed_up)} of {len(lv_names)} "
        f"LVs in VG {source_vg}")
snapshot_size = self.calculate_snapshot_size(lv_size_bytes, backup_mode="file") else: snapshot_size = "2G" # Default fallback self.log(f"Creating snapshot: {snapshot_name} (size: {snapshot_size})") # Pre-flight VG free space try: self._ensure_vg_has_space(vg_name, snapshot_size) except Exception as e: raise Exception(str(e)) success, output = self.run_command(f"lvcreate -L{snapshot_size} -s -n {snapshot_name} {source_lv}") if not success: raise Exception(f"Failed to create snapshot: {output}") # Create temporary mount point import tempfile temp_mount = tempfile.mkdtemp(prefix="borg_files_backup_") luks_device = None try: # Check if the snapshot is LUKS encrypted self.log("Checking if volume is LUKS encrypted...") # Resolve the real device path in case of symbolic links real_device_success, real_device_path = self.run_command(f"readlink -f {self.current_snapshot}", show_output=False) device_to_check = real_device_path.strip() if real_device_success else self.current_snapshot self.log(f"Checking device: {device_to_check}") check_success, check_output = self.run_command(f"file -s {device_to_check}", show_output=False) self.log(f"File command result: success={check_success}, output='{check_output}'") # Also try cryptsetup isLuks command which is more reliable luks_check_success, luks_check_output = self.run_command(f"cryptsetup isLuks {device_to_check}", show_output=False) self.log(f"cryptsetup isLuks result: success={luks_check_success}") is_luks = (check_success and "LUKS" in check_output) or luks_check_success self.log(f"Final LUKS detection result: {is_luks}") if is_luks: self.log("LUKS encryption detected - opening encrypted device") luks_name = f"borg_backup_{lv_name}_{int(time.time())}" luks_device = f"/dev/mapper/{luks_name}" # Open LUKS device (this will prompt for password) self.log("LUKS encryption detected - need passphrase") # Get passphrase via GUI dialog passphrase = self.get_luks_passphrase() if not passphrase: raise Exception("LUKS passphrase required 
but not provided") self.log("Opening LUKS device with provided passphrase...") success = self.open_luks_device(device_to_check, luks_name, passphrase) if not success: raise Exception(f"Failed to open LUKS device - check passphrase") # Mount the decrypted device self.log(f"Mounting decrypted device to {temp_mount}") mount_opts = self._fs_mount_opts(luks_device) success, output = self.run_command(f"mount -o {mount_opts} {luks_device} {temp_mount}") if not success: # Clean up LUKS device self.run_command(f"cryptsetup luksClose {luks_name}", show_output=False) raise Exception(f"Failed to mount decrypted device: {output}") else: # Mount the snapshot directly (non-encrypted) self.log(f"Mounting snapshot to {temp_mount}") mount_opts = self._fs_mount_opts(self.current_snapshot) success, output = self.run_command(f"mount -o {mount_opts} {self.current_snapshot} {temp_mount}") if not success: raise Exception(f"Failed to mount snapshot: {output}") # Create Borg backup of files archive_name = f"files_{lv_name}_{time.strftime('%Y%m%d_%H%M%S')}" self.log(f"Creating Borg archive (file-level): {archive_name}") self.log("Backing up files from mounted snapshot...") self.log("This is space-efficient and skips empty blocks") borg_cmd = f"borg create --progress --stats --compression auto,zstd {repo_path}::{archive_name} {temp_mount}" # Run borg with environment process = subprocess.Popen(borg_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=borg_env) # Stream output for line in process.stdout: self.log(line.strip()) process.wait() if process.returncode != 0: raise Exception("Borg file-level backup failed") self.log("File-level Borg backup completed successfully") finally: # Cleanup: unmount and remove temp directory self.log("Cleaning up mount point") self.run_command(f"umount {temp_mount}", show_output=False) # Close LUKS device if it was opened if luks_device: luks_name = luks_device.split('/')[-1] self.log("Closing LUKS device") 
def emergency_stop(self):
    """Abort a running backup: kill helper processes, clean up, reset the UI."""
    self.log("EMERGENCY STOP requested")

    # Set stop flag for backup workers to check
    self.backup_running = False

    # Kill various backup-related processes
    kill_commands = (
        "pkill -f 'dd.*if=.*dev'",          # dd processes
        "pkill -f 'pv.*dev'",               # pv processes
        "pkill -f 'borg create'",           # borg backup processes
        "pkill -f 'borg init'",             # borg init processes
        "pkill -f 'lvcreate.*snap'",        # snapshot creation
        "pkill -f 'cryptsetup luksOpen'",   # LUKS operations
    )
    for kill_cmd in kill_commands:
        self.run_command(kill_cmd, show_output=False)

    # Force cleanup
    self.log("Performing emergency cleanup...")
    self.cleanup_on_error()

    # Reset UI immediately
    self.reset_ui_state()

    messagebox.showwarning("Emergency Stop", "Backup process stopped!\n\n" +
                           "- All backup processes killed\n" +
                           "- Snapshots cleaned up\n" +
                           "- LUKS devices closed\n\n" +
                           "Check log for details.")

def cleanup_on_error(self):
    """Release resources left behind by a failed or stopped backup.

    This is the single definition of the method; the class previously
    defined it twice and the later definition silently replaced the first.

    Cleanup runs in dependency order: unmount the temporary mount points,
    close the LUKS mappings opened for a backup, remove the current
    snapshot, then delete the mount directories. A snapshot cannot be
    removed while something on top of it is still mounted or open.

    Errors are logged and never raised.
    """
    import glob
    import tempfile

    try:
        # Mount points created with tempfile.mkdtemp(prefix="borg_files_backup_")
        mount_dirs = glob.glob(
            os.path.join(tempfile.gettempdir(), "borg_files_backup_*"))
        for mount_dir in mount_dirs:
            self.run_command(f"umount {mount_dir} 2>/dev/null", show_output=False)

        # luksClose takes a mapping name, so look the names up under
        # /dev/mapper; a bare shell glob would expand against the current
        # directory and match nothing.
        for mapping in glob.glob("/dev/mapper/borg_backup_*"):
            mapping_name = os.path.basename(mapping)
            self.run_command(
                f"cryptsetup luksClose {mapping_name} 2>/dev/null", show_output=False)

        # Clean up any current snapshot
        snapshot = getattr(self, 'current_snapshot', None)
        if snapshot:
            self.log(f"Emergency cleanup: removing snapshot {snapshot}")
            success, output = self.run_command(
                f"lvremove -f {snapshot}", show_output=False)
            if success:
                self.log("Snapshot cleaned up")
            else:
                self.log(f"Failed to clean up snapshot: {output}")
            self.current_snapshot = None

        # Remove any temp directories
        for mount_dir in mount_dirs:
            self.run_command(f"rmdir {mount_dir} 2>/dev/null", show_output=False)
    except Exception as e:
        self.log(f"Error during cleanup: {e}")
def remove_all_snapshots(self):
    """Remove every backup snapshot after the user confirms.

    Only LVs that report an origin (i.e. really are snapshots) and whose
    LV name contains ``_snap`` or ``snap_`` are removed. Matching the raw
    ``lvs`` text instead would also hit ordinary volumes, or every volume
    of a VG, whose name merely contains "snap".
    """
    if not messagebox.askyesno("Confirm", "Remove ALL snapshots? This cannot be undone!"):
        return

    success, output = self.run_command(
        "lvs --noheadings -o lv_path,origin", show_output=False)
    if not success:
        self.log(f"Error removing snapshots: {output}")
        self.refresh_snapshots()
        return

    targets = []
    for line in output.splitlines():
        fields = line.split()
        # The origin column is empty for ordinary LVs, leaving one field.
        if len(fields) < 2:
            continue
        lv_path = fields[0]
        lv_name = lv_path.rsplit('/', 1)[-1]
        if '_snap' in lv_name or 'snap_' in lv_name:
            targets.append(lv_path)

    if not targets:
        self.log("No snapshots to remove")

    failures = 0
    for lv_path in targets:
        success, output = self.run_command(f"lvremove -f {lv_path}")
        if success:
            self.log(f"Removed snapshot: {lv_path}")
        else:
            failures += 1
            self.log(f"Failed to remove {lv_path}: {output}")

    if targets and not failures:
        self.log("All snapshots removed")

    self.refresh_snapshots()
def refresh_archives(self, repo_path):
    """Reload the archive listbox from ``borg list``.

    Args:
        repo_path: Borg repository location.

    The listbox shows one entry per non-empty output line, "No archives
    found" for an empty repository, or an "Error: ..." entry carrying
    borg's stderr. Any exception is reported in an error dialog.
    """
    try:
        self.archive_listbox.delete(0, tk.END)

        # Set up environment for Borg
        borg_env = os.environ.copy()
        if hasattr(self, 'passphrase_var') and self.passphrase_var.get():
            borg_env['BORG_PASSPHRASE'] = self.passphrase_var.get()

        # Argument list, no shell: the repository path is user-entered text
        # and must reach borg as one literal argument.
        result = subprocess.run(
            ["borg", "list", repo_path],
            capture_output=True, text=True, env=borg_env,
        )

        if result.returncode != 0:
            self.archive_listbox.insert(tk.END, f"Error: {result.stderr}")
            return

        entries = [entry.strip() for entry in result.stdout.splitlines() if entry.strip()]
        if not entries:
            self.archive_listbox.insert(tk.END, "No archives found")
            return
        for entry in entries:
            self.archive_listbox.insert(tk.END, entry)

    except Exception as e:
        messagebox.showerror("Error", f"Failed to list archives: {e}")
def delete_archives(self, repo_path):
    """Delete the archives selected in the archive listbox.

    Args:
        repo_path: Borg repository location.

    Placeholder rows ("No archives found", "Error: ...") are ignored. The
    user confirms once for the whole selection; each archive is then
    removed with its own ``borg delete`` call and the result is logged.
    The list is refreshed afterwards.
    """
    selected = self.archive_listbox.curselection()
    if not selected:
        messagebox.showwarning("Warning", "Please select archives to delete")
        return

    archive_names = []
    for idx in selected:
        line = self.archive_listbox.get(idx)
        if "No archives found" in line or "Error:" in line:
            continue
        fields = line.split()
        if fields:
            # The archive name is the first column of a `borg list` row.
            archive_names.append(fields[0])

    if not archive_names:
        return

    confirmed = messagebox.askyesno(
        "Confirm",
        f"Delete {len(archive_names)} archive(s)?\n\n" +
        "\n".join(archive_names) + "\n\nThis cannot be undone!")
    if not confirmed:
        return

    try:
        borg_env = os.environ.copy()
        if hasattr(self, 'passphrase_var') and self.passphrase_var.get():
            borg_env['BORG_PASSPHRASE'] = self.passphrase_var.get()

        for archive_name in archive_names:
            # Argument list, no shell: the repository path is user-entered
            # text and must reach borg as one literal argument.
            result = subprocess.run(
                ["borg", "delete", f"{repo_path}::{archive_name}"],
                capture_output=True, text=True, env=borg_env,
            )
            if result.returncode == 0:
                self.log(f"Deleted archive: {archive_name}")
            else:
                self.log(f"Failed to delete {archive_name}: {result.stderr}")

        self.refresh_archives(repo_path)

    except Exception as e:
        messagebox.showerror("Error", f"Failed to delete archives: {e}")
def main():
    """Start the backup GUI; print a hint and return when not run as root."""
    # LVM operations need root, so bail out early for any other user.
    running_as_root = os.geteuid() == 0
    if not running_as_root:
        for notice in (
            "This application needs to run as root for LVM operations.",
            "Please run: sudo python3 simple_backup_gui.py",
        ):
            print(notice)
        return

    root = tk.Tk()
    app = SimpleBackupGUI(root)
    root.mainloop()