Files
backup_to_external_m.2/simple_backup_gui.py

1657 lines
75 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Simple LVM Backup GUI
Just the basics: snapshot -> copy -> cleanup
No complex logic, just simple reliable backups
"""
import tkinter as tk
from tkinter import ttk, messagebox
import subprocess
import threading
import os
import time
import json
class SimpleBackupGUI:
def __init__(self, root):
self.root = root
self.root.title("Simple LVM Backup")
self.root.geometry("1200x800") # Made much bigger: was 900x600
# State tracking
self.backup_running = False
self.current_snapshot = None
# Settings file for remembering preferences
if 'SUDO_USER' in os.environ:
user_home = f"/home/{os.environ['SUDO_USER']}"
else:
user_home = os.path.expanduser("~")
self.settings_file = os.path.join(user_home, ".simple_backup_settings.json")
self.settings = self.load_settings()
self.setup_ui()
def load_settings(self):
"""Load settings from file"""
try:
if os.path.exists(self.settings_file):
with open(self.settings_file, 'r') as f:
return json.load(f)
except Exception as e:
# Use print since log method isn't available yet
print(f"Could not load settings: {e}")
return {}
def save_settings(self):
"""Save settings to file"""
try:
with open(self.settings_file, 'w') as f:
json.dump(self.settings, f, indent=2)
except Exception as e:
if hasattr(self, 'log'):
self.log(f"Could not save settings: {e}")
else:
print(f"Could not save settings: {e}")
def update_borg_settings(self):
"""Update Borg settings when backup starts"""
if hasattr(self, 'repo_path_var'):
self.settings['last_borg_repo'] = self.repo_path_var.get()
self.settings['last_encryption'] = getattr(self, 'encryption_var', tk.StringVar()).get()
self.settings['last_snapshot_size'] = getattr(self, 'snapshot_size_var', tk.StringVar()).get()
self.log(f"Saving settings: repo={self.settings.get('last_borg_repo', 'none')}")
self.save_settings()
else:
self.log("repo_path_var not available for settings update")
self.refresh_drives() # Initialize with correct widgets for default mode
def setup_ui(self):
# Main frame
main_frame = ttk.Frame(self.root, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# Backup mode selection
ttk.Label(main_frame, text="Backup Mode:").grid(row=0, column=0, sticky=tk.W, pady=5)
self.mode_var = tk.StringVar(value="lv_to_lv")
mode_frame = ttk.Frame(main_frame)
mode_frame.grid(row=0, column=1, columnspan=2, sticky=(tk.W, tk.E), pady=5)
ttk.Radiobutton(mode_frame, text="LV → LV (update existing)",
variable=self.mode_var, value="lv_to_lv",
command=self.on_mode_change).pack(side=tk.LEFT, padx=5)
ttk.Radiobutton(mode_frame, text="LV → Raw Device (fresh)",
variable=self.mode_var, value="lv_to_raw",
command=self.on_mode_change).pack(side=tk.LEFT, padx=5)
ttk.Radiobutton(mode_frame, text="Entire VG → Device",
variable=self.mode_var, value="vg_to_raw",
command=self.on_mode_change).pack(side=tk.LEFT, padx=5)
ttk.Radiobutton(mode_frame, text="LV → Borg (block)",
variable=self.mode_var, value="lv_to_borg",
command=self.on_mode_change).pack(side=tk.LEFT, padx=5)
ttk.Radiobutton(mode_frame, text="VG → Borg (block)",
variable=self.mode_var, value="vg_to_borg",
command=self.on_mode_change).pack(side=tk.LEFT, padx=5)
ttk.Radiobutton(mode_frame, text="Files → Borg",
variable=self.mode_var, value="files_to_borg",
command=self.on_mode_change).pack(side=tk.LEFT, padx=5)
# Source selection
ttk.Label(main_frame, text="Source:").grid(row=1, column=0, sticky=tk.W, pady=5)
# Create a frame for source selection that can switch between combobox and listbox
self.source_frame = ttk.Frame(main_frame)
self.source_frame.grid(row=1, column=1, columnspan=2, sticky=(tk.W, tk.E), pady=5)
# Single selection (combobox) - for most modes
self.source_var = tk.StringVar()
self.source_combo = ttk.Combobox(self.source_frame, textvariable=self.source_var, width=50)
self.source_combo.grid(row=0, column=0, sticky=(tk.W, tk.E))
# Multi-selection (listbox) - for LV modes
self.source_listbox_frame = ttk.Frame(self.source_frame)
# Help text for multi-selection
help_text = ttk.Label(self.source_listbox_frame, text="Hold Ctrl/Cmd to select multiple LVs",
font=("TkDefaultFont", 8), foreground="gray")
help_text.grid(row=0, column=0, columnspan=2, sticky=tk.W, pady=(0, 2))
self.source_listbox = tk.Listbox(self.source_listbox_frame, height=6, selectmode=tk.EXTENDED)
source_scrollbar = ttk.Scrollbar(self.source_listbox_frame, orient="vertical", command=self.source_listbox.yview)
self.source_listbox.configure(yscrollcommand=source_scrollbar.set)
self.source_listbox.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
source_scrollbar.grid(row=1, column=1, sticky=(tk.N, tk.S))
self.source_listbox_frame.columnconfigure(0, weight=1)
self.source_listbox_frame.rowconfigure(1, weight=1)
# Configure source frame
self.source_frame.columnconfigure(0, weight=1)
# Target selection
ttk.Label(main_frame, text="Target:").grid(row=2, column=0, sticky=tk.W, pady=5)
self.target_var = tk.StringVar()
self.target_combo = ttk.Combobox(main_frame, textvariable=self.target_var, width=50)
self.target_combo.grid(row=2, column=1, columnspan=2, sticky=(tk.W, tk.E), pady=5)
# Borg-specific settings (hidden initially)
self.borg_frame = ttk.LabelFrame(main_frame, text="Borg Backup Settings", padding="5")
self.borg_frame.grid(row=3, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=5)
self.borg_frame.grid_remove() # Hide initially
# Repo path
ttk.Label(self.borg_frame, text="Repository Path:").grid(row=0, column=0, sticky=tk.W, pady=2)
self.repo_path_var = tk.StringVar()
# Load saved repository path
if 'last_borg_repo' in self.settings:
saved_repo = self.settings['last_borg_repo']
self.repo_path_var.set(saved_repo)
self.log(f"Loaded saved repository: {saved_repo}")
else:
self.log("No saved repository found")
repo_entry = ttk.Entry(self.borg_frame, textvariable=self.repo_path_var, width=40)
repo_entry.grid(row=0, column=1, sticky=(tk.W, tk.E), pady=2)
ttk.Button(self.borg_frame, text="Browse", command=self.browse_repo).grid(row=0, column=2, padx=5, pady=2)
# Create new repo checkbox
self.create_new_repo = tk.BooleanVar()
ttk.Checkbutton(self.borg_frame, text="Create new repository",
variable=self.create_new_repo).grid(row=1, column=0, columnspan=2, sticky=tk.W, pady=2)
# Encryption settings
ttk.Label(self.borg_frame, text="Encryption Mode:").grid(row=2, column=0, sticky=tk.W, pady=2)
self.encryption_var = tk.StringVar(value="repokey")
encryption_combo = ttk.Combobox(self.borg_frame, textvariable=self.encryption_var,
values=["none", "repokey", "keyfile"], state="readonly", width=15)
encryption_combo.grid(row=2, column=1, sticky=tk.W, pady=2)
# Snapshot size option
ttk.Label(self.borg_frame, text="Snapshot Size:").grid(row=3, column=0, sticky=tk.W, pady=2)
self.snapshot_size_var = tk.StringVar(value="auto")
snapshot_combo = ttk.Combobox(self.borg_frame, textvariable=self.snapshot_size_var,
values=["auto", "conservative", "generous"], state="readonly", width=15)
snapshot_combo.grid(row=3, column=1, sticky=tk.W, pady=2)
# Help text for snapshot sizes
help_label = ttk.Label(self.borg_frame, text="auto: smart sizing, conservative: 5%, generous: 25%",
font=("TkDefaultFont", 8), foreground="gray")
help_label.grid(row=3, column=2, sticky=tk.W, padx=(5, 0), pady=2)
# Read-only mode for full filesystems
self.readonly_mode = tk.BooleanVar()
ttk.Checkbutton(self.borg_frame, text="Read-only mode (for very full filesystems)",
variable=self.readonly_mode).grid(row=5, column=0, columnspan=3, sticky=tk.W, pady=2)
# Passphrase
ttk.Label(self.borg_frame, text="Passphrase:").grid(row=4, column=0, sticky=tk.W, pady=2)
self.passphrase_var = tk.StringVar()
passphrase_entry = ttk.Entry(self.borg_frame, textvariable=self.passphrase_var, show="*", width=40)
passphrase_entry.grid(row=4, column=1, sticky=(tk.W, tk.E), pady=2)
# Configure borg frame grid
self.borg_frame.columnconfigure(1, weight=1)
self.borg_frame.columnconfigure(2, weight=0)
# Refresh button
ttk.Button(main_frame, text="Refresh", command=self.refresh_drives).grid(row=4, column=0, pady=10)
# Backup button
self.backup_btn = ttk.Button(main_frame, text="Start Backup",
command=self.start_backup, style="Accent.TButton")
self.backup_btn.grid(row=4, column=1, pady=10)
# Emergency stop
self.stop_btn = ttk.Button(main_frame, text="Emergency Stop",
command=self.emergency_stop, state="disabled")
self.stop_btn.grid(row=4, column=2, pady=10)
# Management buttons row
mgmt_frame = ttk.Frame(main_frame)
mgmt_frame.grid(row=5, column=0, columnspan=3, pady=10)
ttk.Button(mgmt_frame, text="Manage Borg Repo",
command=self.manage_borg_repo).pack(side=tk.LEFT, padx=5)
ttk.Button(mgmt_frame, text="Manage LVM Snapshots",
command=self.manage_snapshots).pack(side=tk.LEFT, padx=5)
# Progress area
ttk.Label(main_frame, text="Progress:").grid(row=6, column=0, sticky=tk.W, pady=(20, 5))
self.progress = ttk.Progressbar(main_frame, mode='indeterminate')
self.progress.grid(row=7, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=5)
# Log area
ttk.Label(main_frame, text="Log:").grid(row=8, column=0, sticky=tk.W, pady=(10, 5))
log_frame = ttk.Frame(main_frame)
log_frame.grid(row=9, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5)
self.log_text = tk.Text(log_frame, height=15, width=70)
scrollbar = ttk.Scrollbar(log_frame, orient="vertical", command=self.log_text.yview)
self.log_text.configure(yscrollcommand=scrollbar.set)
self.log_text.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
# Configure grid weights
self.root.columnconfigure(0, weight=1)
self.root.rowconfigure(0, weight=1)
main_frame.columnconfigure(1, weight=1)
main_frame.rowconfigure(9, weight=1) # Updated for new log row
log_frame.columnconfigure(0, weight=1)
log_frame.rowconfigure(0, weight=1)
def browse_repo(self):
"""Browse for Borg repository path"""
from tkinter import filedialog
path = filedialog.askdirectory(title="Select Borg Repository Directory")
if path:
self.repo_path_var.set(path)
def on_mode_change(self):
"""Handle backup mode change"""
mode = self.mode_var.get()
# Show/hide Borg settings
if mode in ["lv_to_borg", "vg_to_borg", "files_to_borg"]:
self.borg_frame.grid()
else:
self.borg_frame.grid_remove()
# Always refresh drives when mode changes
self.refresh_drives()
def log(self, message):
"""Add message to log"""
timestamp = time.strftime("%H:%M:%S")
# Check if log_text widget exists yet
if hasattr(self, 'log_text'):
self.log_text.insert(tk.END, f"[{timestamp}] {message}\n")
self.log_text.see(tk.END)
self.root.update_idletasks()
else:
# Print to console if GUI log not available yet
print(f"[{timestamp}] {message}")
def calculate_snapshot_size(self, lv_size_bytes, backup_mode="block"):
"""Calculate appropriate snapshot size based on LV size, user preference, and backup mode"""
mode = self.snapshot_size_var.get()
# For file-level backups, use much smaller snapshots
if backup_mode == "file":
if mode == "conservative":
# 2% of LV size, minimum 512MB, maximum 10GB
snapshot_size_bytes = max(min(lv_size_bytes // 50, 10 * 1024**3), 512 * 1024**2)
elif mode == "generous":
# 5% of LV size, minimum 1GB, maximum 20GB
snapshot_size_bytes = max(min(lv_size_bytes // 20, 20 * 1024**3), 1024**3)
else: # auto
# 3% of LV size, minimum 1GB, maximum 15GB
snapshot_size_bytes = max(min(lv_size_bytes * 3 // 100, 15 * 1024**3), 1024**3)
else:
# Original logic for block-level backups
if mode == "conservative":
# 5% of LV size, minimum 1GB
snapshot_size_bytes = max(lv_size_bytes // 20, 1024**3)
elif mode == "generous":
# 25% of LV size, minimum 2GB
snapshot_size_bytes = max(lv_size_bytes // 4, 2 * 1024**3)
else: # auto
# 10% of LV size, minimum 1GB, but increase for very large LVs
base_percentage = 10
if lv_size_bytes > 50 * 1024**3: # >50GB
base_percentage = 15 # Use 15% for large, potentially active LVs
snapshot_size_bytes = max(lv_size_bytes * base_percentage // 100, 1024**3)
snapshot_size_gb = snapshot_size_bytes // (1024**3)
return f"{snapshot_size_gb}G"
def _parse_size_to_bytes(self, size_str):
"""Parse simple size strings like '10G' to bytes (supports G and M)."""
try:
s = size_str.strip().upper()
if s.endswith('G'):
return int(s[:-1]) * (1024 ** 3)
if s.endswith('M'):
return int(s[:-1]) * (1024 ** 2)
# Fallback: raw int bytes
return int(s)
except Exception:
return None
def _vg_free_bytes(self, vg_name):
"""Return VG free space in bytes or None on failure."""
ok, out = self.run_command(f"vgs --noheadings -o vg_free --units b {vg_name}", show_output=False)
if not ok or not out.strip():
return None
try:
return int(out.strip().replace('B', '').strip())
except Exception:
return None
def _ensure_vg_has_space(self, vg_name, snapshot_size_str):
"""Raise Exception if VG does not have enough free space for requested snapshot size."""
needed = self._parse_size_to_bytes(snapshot_size_str)
if needed is None:
return # cannot parse; skip strict check
free_b = self._vg_free_bytes(vg_name)
if free_b is None:
return # unknown; skip strict check
if free_b < needed:
raise Exception(f"Insufficient VG free space: need {snapshot_size_str}, free {free_b // (1024**3)}G in VG {vg_name}")
def run_command(self, cmd, show_output=True):
"""Run a command and return result"""
try:
if show_output:
self.log(f"Running: {cmd}")
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
if show_output:
self.log(f"ERROR: {result.stderr.strip()}")
return False, result.stderr.strip()
else:
if show_output and result.stdout.strip():
self.log(f"Output: {result.stdout.strip()}")
return True, result.stdout.strip()
except Exception as e:
if show_output:
self.log(f"ERROR: {str(e)}")
return False, str(e)
def run_interactive_command(self, cmd):
"""Run an interactive command in a terminal window"""
try:
self.log(f"Running interactive: {cmd}")
# Try different terminal emulators
terminals = [
f"x-terminal-emulator -e bash -c '{cmd}; read -p \"Press Enter to continue...\"'",
f"xterm -e bash -c '{cmd}; read -p \"Press Enter to continue...\"'",
f"konsole -e bash -c '{cmd}; read -p \"Press Enter to continue...\"'",
f"gnome-terminal --wait -- bash -c '{cmd}; read -p \"Press Enter to continue...\"'",
f"xfce4-terminal -e 'bash -c \"{cmd}; read -p \\\"Press Enter to continue...\\\"\"; bash'"
]
for terminal_cmd in terminals:
try:
result = subprocess.run(terminal_cmd, shell=True, timeout=300) # 5 minute timeout
if result.returncode == 0:
return True
except (subprocess.TimeoutExpired, FileNotFoundError):
continue
# If no terminal worked, try a simpler approach
self.log("No terminal emulator found, trying direct execution...")
result = subprocess.run(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
return result.returncode == 0
except Exception as e:
self.log(f"ERROR running interactive command: {str(e)}")
return False
def get_luks_passphrase(self):
"""Get LUKS passphrase via GUI dialog"""
from tkinter import simpledialog
try:
passphrase = simpledialog.askstring(
"LUKS Passphrase",
"Enter LUKS passphrase for encrypted volume:",
show='*' # Hide password input
)
return passphrase
except Exception as e:
self.log(f"Error getting passphrase: {e}")
return None
def open_luks_device(self, device_path, luks_name, passphrase):
"""Open LUKS device with provided passphrase"""
try:
# Avoid exposing secrets via shell; pass via stdin
result = subprocess.run(
["cryptsetup", "luksOpen", device_path, luks_name],
input=(passphrase or "") + "\n",
text=True,
capture_output=True,
)
if result.returncode == 0:
self.log("LUKS device opened successfully")
return True
self.log(f"Failed to open LUKS device: {result.stderr.strip()}")
return False
except Exception as e:
self.log(f"Error opening LUKS device: {e}")
return False
def _fs_mount_opts(self, device_path):
"""Return safe read-only mount options based on filesystem type."""
fs_ok, fs_type = self.run_command(f"blkid -o value -s TYPE {device_path}", show_output=False)
fs_type = fs_type.strip().lower() if fs_ok and fs_type else ""
if fs_type == "ext4" or fs_type == "ext3":
return "ro,noload"
if fs_type == "xfs":
return "ro,norecovery"
# default safe read-only
return "ro"
def refresh_drives(self):
"""Refresh available drives based on selected mode"""
mode = self.mode_var.get()
self.log(f"Refreshing drives for mode: {mode}")
# Clear target combo for Borg modes
if mode in ["lv_to_borg", "vg_to_borg", "files_to_borg"]:
self.target_combo['values'] = []
self.target_var.set("")
# Show appropriate source selection widget
if mode in ["lv_to_borg", "files_to_borg"]:
# Multi-selection for individual LV modes
self.source_combo.grid_remove()
self.source_listbox_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
self.log("Switched to multi-selection listbox for LV selection")
else:
# Single selection for VG modes and other modes
self.source_listbox_frame.grid_remove()
self.source_combo.grid(row=0, column=0, sticky=(tk.W, tk.E))
self.log("Switched to single-selection combobox")
# Update the window to force widget refresh
self.root.update_idletasks()
# Source options
if mode == "vg_to_raw" or mode == "vg_to_borg":
# Show volume groups
success, output = self.run_command("vgs --noheadings -o vg_name,vg_size,pv_count", show_output=False)
if success:
source_list = []
for line in output.strip().split('\n'):
if line.strip():
parts = line.strip().split()
if len(parts) >= 3:
vg_name = parts[0]
vg_size = parts[1]
pv_count = parts[2]
source_list.append(f"{vg_name} ({vg_size}) [{pv_count} PVs]")
self.source_combo['values'] = source_list
self.source_var.set("") # Clear selection
self.log(f"Found {len(source_list)} volume groups")
else:
self.log("No volume groups found")
self.source_combo['values'] = []
else:
# Show logical volumes (all LV modes: lv_to_lv, lv_to_raw, lv_to_borg, files_to_borg)
success, output = self.run_command("lvs --noheadings -o lv_path,lv_size,vg_name", show_output=False)
if success:
lv_list = []
for line in output.strip().split('\n'):
if line.strip():
parts = line.strip().split()
if len(parts) >= 3:
lv_path = parts[0]
lv_size = parts[1]
vg_name = parts[2]
lv_list.append(f"{lv_path} ({lv_size}) [VG: {vg_name}]")
# Update the appropriate widget
if mode in ["lv_to_borg", "files_to_borg"]:
# Update listbox for multi-selection
self.source_listbox.delete(0, tk.END)
for lv in lv_list:
self.source_listbox.insert(tk.END, lv)
self.log(f"Loaded {len(lv_list)} logical volumes into listbox")
else:
# Update combobox for single selection
self.source_combo['values'] = lv_list
self.source_var.set("") # Clear selection
self.log(f"Loaded {len(lv_list)} logical volumes into combobox")
self.log(f"Found {len(lv_list)} logical volumes")
else:
self.log("No LVM volumes found")
if mode in ["lv_to_borg", "files_to_borg"]:
self.source_listbox.delete(0, tk.END)
else:
self.source_combo['values'] = []
# Target options
if mode == "lv_to_lv":
# Show existing logical volumes on external drives
success, output = self.run_command("lvs --noheadings -o lv_path,lv_size,vg_name", show_output=False)
if success:
target_list = []
for line in output.strip().split('\n'):
if line.strip():
parts = line.strip().split()
if len(parts) >= 3:
lv_path = parts[0]
lv_size = parts[1]
vg_name = parts[2]
# Filter out internal VGs (you might want to customize this)
if "migration" in vg_name or "external" in vg_name or "backup" in vg_name:
target_list.append(f"{lv_path} ({lv_size}) [VG: {vg_name}]")
self.target_combo['values'] = target_list
self.log(f"Found {len(target_list)} target logical volumes")
else:
self.log("No target LVs found")
self.target_combo['values'] = []
elif mode in ["lv_to_borg", "vg_to_borg", "files_to_borg"]:
# No target combo for Borg modes - repository path is used
self.target_combo['values'] = []
self.log("Using Borg repository path instead of target device")
else:
# Show raw block devices (lv_to_raw and vg_to_raw)
success, output = self.run_command("lsblk -dno NAME,SIZE,MODEL | grep -E '^sd|^nvme'", show_output=False)
if success:
drive_list = []
for line in output.strip().split('\n'):
if line.strip():
parts = line.strip().split(None, 2)
if len(parts) >= 2:
name = parts[0]
size = parts[1]
model = parts[2] if len(parts) > 2 else "Unknown"
drive_list.append(f"/dev/{name} ({size}) {model}")
self.target_combo['values'] = drive_list
self.log(f"Found {len(drive_list)} target drives")
else:
self.log("No block devices found")
self.target_combo['values'] = []
def start_backup(self):
"""Start the backup process"""
mode = self.mode_var.get()
# Get selected sources based on mode
if mode in ["lv_to_borg", "files_to_borg"]:
# Multi-selection mode
selected_indices = self.source_listbox.curselection()
if not selected_indices:
messagebox.showerror("Error", "Please select at least one logical volume")
return
selected_sources = []
for index in selected_indices:
selected_sources.append(self.source_listbox.get(index).split()[0])
if not self.repo_path_var.get():
messagebox.showerror("Error", "Please select repository path")
return
# Check if borg is installed
success, _ = self.run_command("which borg", show_output=False)
if not success:
messagebox.showerror("Error", "Borg Backup is not installed. Please install it first:\nsudo apt install borgbackup")
return
repo_path = self.repo_path_var.get()
# Build confirmation message for multiple LVs
if mode == "lv_to_borg":
msg = f"Borg backup of {len(selected_sources)} LVs (block-level):\n\n"
else: # files_to_borg
msg = f"Borg backup of {len(selected_sources)} LVs (file-level):\n\n"
for source in selected_sources:
msg += f"{source}\n"
msg += f"\nRepository: {repo_path}\n"
if self.create_new_repo.get():
msg += "This will create a new Borg repository.\n"
else:
msg += "This will add to existing Borg repository.\n"
msg += f"Encryption: {self.encryption_var.get()}\n\nContinue?"
if not messagebox.askyesno("Confirm Backup", msg):
return
# Start multi-LV backup
self.backup_running = True
self.backup_btn.config(state="disabled")
self.stop_btn.config(state="normal")
self.progress.start()
# Save settings before starting backup
self.update_borg_settings()
thread = threading.Thread(target=self.multi_lv_backup_worker, args=(mode, selected_sources, repo_path))
thread.daemon = True
thread.start()
return
# Original single-selection logic for other modes
# Validate inputs based on mode
if mode in ["vg_to_borg"]:
if not self.source_var.get() or not self.repo_path_var.get():
messagebox.showerror("Error", "Please select source and repository path")
return
# Check if borg is installed
success, _ = self.run_command("which borg", show_output=False)
if not success:
messagebox.showerror("Error", "Borg Backup is not installed. Please install it first:\nsudo apt install borgbackup")
return
source = self.source_var.get().split()[0]
repo_path = self.repo_path_var.get()
# Build confirmation message for VG Borg
msg = f"Borg backup of entire VG (block-level):\n\nSource VG: {source}\nRepository: {repo_path}\n\n"
if self.create_new_repo.get():
msg += "This will create a new Borg repository.\n"
else:
msg += "This will add to existing Borg repository.\n"
msg += f"Encryption: {self.encryption_var.get()}\n"
msg += "This will backup ALL logical volumes in the VG.\n\nContinue?"
target = repo_path
else:
# Original validation for non-Borg modes
if not self.source_var.get() or not self.target_var.get():
messagebox.showerror("Error", "Please select both source and target")
return
source = self.source_var.get().split()[0]
target = self.target_var.get().split()[0]
# Build confirmation message based on mode
if mode == "lv_to_lv":
msg = f"Update existing LV backup:\n\nSource LV: {source}\nTarget LV: {target}\n\n"
msg += "This will overwrite the target LV with current source data.\n\nContinue?"
elif mode == "lv_to_raw":
msg = f"Create fresh backup:\n\nSource LV: {source}\nTarget Device: {target}\n\n"
msg += "WARNING: Target device will be completely overwritten!\n\nContinue?"
elif mode == "vg_to_raw":
msg = f"Clone entire Volume Group:\n\nSource VG: {source}\nTarget Device: {target}\n\n"
msg += "WARNING: Target device will be completely overwritten!\n"
msg += "This will clone ALL logical volumes in the VG.\n\nContinue?"
if not messagebox.askyesno("Confirm Backup", msg):
return
# Start backup in thread
self.backup_running = True
self.backup_btn.config(state="disabled")
self.stop_btn.config(state="normal")
self.progress.start()
# Save settings before starting backup
self.update_borg_settings()
thread = threading.Thread(target=self.backup_worker, args=(mode, source, target))
thread.daemon = True
thread.start()
def backup_worker(self, mode, source, target):
"""The actual backup work"""
try:
# Check if stop was requested
if not self.backup_running:
self.log("Backup stopped before starting")
return
self.log(f"=== Starting {mode} backup ===")
if mode == "lv_to_lv":
self.backup_lv_to_lv(source, target)
elif mode == "lv_to_raw":
self.backup_lv_to_raw(source, target)
elif mode == "vg_to_raw":
self.backup_vg_to_raw(source, target)
elif mode == "lv_to_borg":
self.backup_lv_to_borg(source, target)
elif mode == "files_to_borg":
self.backup_files_to_borg(source, target)
elif mode == "vg_to_borg":
self.backup_vg_to_borg(source, target)
self.log("=== Backup completed successfully! ===")
self.root.after(0, lambda: messagebox.showinfo("Success", "Backup completed successfully!"))
except Exception as e:
self.log(f"ERROR: {str(e)}")
self.cleanup_on_error()
self.root.after(0, lambda: messagebox.showerror("Backup Failed", str(e)))
finally:
# Reset UI state
self.root.after(0, self.reset_ui_state)
def multi_lv_backup_worker(self, mode, selected_sources, repo_path):
"""Backup multiple LVs sequentially"""
try:
# Check if stop was requested
if not self.backup_running:
self.log("Multi-LV backup stopped before starting")
return
self.log(f"=== Starting multi-LV {mode} backup ===")
self.log(f"Selected {len(selected_sources)} logical volumes")
# Initialize repository once if needed
if self.create_new_repo.get():
# Check for stop again before repo creation
if not self.backup_running:
self.log("Backup stopped during repository initialization")
return
borg_env = os.environ.copy()
if self.passphrase_var.get():
borg_env['BORG_PASSPHRASE'] = self.passphrase_var.get()
self.log(f"Creating new Borg repository: {repo_path}")
encryption = self.encryption_var.get()
if encryption == "none":
init_cmd = f"borg init --encryption=none {repo_path}"
else:
init_cmd = f"borg init --encryption={encryption} {repo_path}"
result = subprocess.run(init_cmd, shell=True, capture_output=True, text=True, env=borg_env)
if result.returncode != 0:
raise Exception(f"Failed to initialize Borg repository: {result.stderr}")
self.log("Repository initialized successfully")
# Backup each LV
successful_backups = 0
failed_backups = []
for i, source_lv in enumerate(selected_sources, 1):
self.log(f"--- Processing LV {i}/{len(selected_sources)}: {source_lv} ---")
try:
if mode == "lv_to_borg":
self.backup_lv_to_borg(source_lv, repo_path)
elif mode == "files_to_borg":
self.backup_files_to_borg(source_lv, repo_path)
successful_backups += 1
self.log(f"Successfully backed up {source_lv}")
except Exception as e:
self.log(f"Failed to backup {source_lv}: {str(e)}")
failed_backups.append(source_lv)
# Continue with next LV instead of stopping
# Summary
self.log(f"=== Multi-LV backup completed ===")
self.log(f"Successful: {successful_backups}/{len(selected_sources)}")
if failed_backups:
self.log(f"Failed: {', '.join(failed_backups)}")
# Show completion dialog
if failed_backups:
msg = f"Backup completed with some failures:\n\nSuccessful: {successful_backups}\nFailed: {len(failed_backups)}\n\nFailed LVs:\n" + '\n'.join(failed_backups)
self.root.after(0, lambda: messagebox.showwarning("Backup Completed with Warnings", msg))
else:
self.root.after(0, lambda: messagebox.showinfo("Success", f"All {successful_backups} LVs backed up successfully!"))
except Exception as e:
self.log(f"ERROR: {str(e)}")
self.cleanup_on_error()
self.root.after(0, lambda: messagebox.showerror("Backup Failed", str(e)))
finally:
# Reset UI state
self.root.after(0, self.reset_ui_state)
def backup_lv_to_lv(self, source_lv, target_lv):
"""Backup LV to existing LV"""
self.log("Mode: LV to LV (updating existing backup)")
# Create snapshot of source
vg_name = source_lv.split('/')[2]
lv_name = source_lv.split('/')[3]
snapshot_name = f"{lv_name}_backup_snap_{int(time.time())}"
self.current_snapshot = f"/dev/{vg_name}/{snapshot_name}"
self.log(f"Creating snapshot: {snapshot_name}")
# Pre-flight VG free check for 1G default
try:
self._ensure_vg_has_space(vg_name, "1G")
except Exception as e:
raise Exception(str(e))
success, output = self.run_command(f"lvcreate -L1G -s -n {snapshot_name} {source_lv}")
if not success:
raise Exception(f"Failed to create snapshot: {output}")
# Copy snapshot to target LV
self.log(f"Copying {self.current_snapshot} to {target_lv}")
success, _ = self.run_command("which pv", show_output=False)
if success:
copy_cmd = f"pv {self.current_snapshot} | dd of={target_lv} bs=4M"
else:
copy_cmd = f"dd if={self.current_snapshot} of={target_lv} bs=4M status=progress"
success, output = self.run_command(copy_cmd)
if not success:
raise Exception(f"Failed to copy data: {output}")
# Cleanup
self.log("Cleaning up snapshot")
success, output = self.run_command(f"lvremove -f {self.current_snapshot}")
if not success:
self.log(f"Warning: Failed to remove snapshot: {output}")
else:
self.log("Snapshot cleaned up")
self.current_snapshot = None
def backup_lv_to_raw(self, source_lv, target_device):
"""Backup LV to raw device (fresh backup)"""
self.log("Mode: LV to Raw Device (fresh backup)")
# Create snapshot of source
vg_name = source_lv.split('/')[2]
lv_name = source_lv.split('/')[3]
snapshot_name = f"{lv_name}_backup_snap_{int(time.time())}"
self.current_snapshot = f"/dev/{vg_name}/{snapshot_name}"
self.log(f"Creating snapshot: {snapshot_name}")
# Pre-flight VG free check for 1G default
try:
self._ensure_vg_has_space(vg_name, "1G")
except Exception as e:
raise Exception(str(e))
success, output = self.run_command(f"lvcreate -L1G -s -n {snapshot_name} {source_lv}")
if not success:
raise Exception(f"Failed to create snapshot: {output}")
# Copy snapshot to target device
self.log(f"Copying {self.current_snapshot} to {target_device}")
self.log("This will create a raw block-level copy")
success, _ = self.run_command("which pv", show_output=False)
if success:
copy_cmd = f"pv {self.current_snapshot} | dd of={target_device} bs=4M"
else:
copy_cmd = f"dd if={self.current_snapshot} of={target_device} bs=4M status=progress"
success, output = self.run_command(copy_cmd)
if not success:
raise Exception(f"Failed to copy data: {output}")
# Cleanup
self.log("Cleaning up snapshot")
success, output = self.run_command(f"lvremove -f {self.current_snapshot}")
if not success:
self.log(f"Warning: Failed to remove snapshot: {output}")
else:
self.log("Snapshot cleaned up")
self.current_snapshot = None
def backup_vg_to_raw(self, source_vg, target_device):
"""Backup entire VG to raw device"""
self.log("Mode: Entire VG to Raw Device")
self.log("This will create a complete clone including LVM metadata")
# Get the physical volume(s) that make up this VG
success, output = self.run_command(f"vgs --noheadings -o pv_name {source_vg}", show_output=False)
if not success:
raise Exception(f"Failed to get PV info for VG {source_vg}")
# For simplicity, we'll use the first PV as the source
# In a real implementation, you might want to handle multiple PVs
pv_list = output.strip().split()
if not pv_list:
raise Exception(f"No physical volumes found for VG {source_vg}")
source_pv = pv_list[0]
self.log(f"Source PV: {source_pv}")
# Copy the entire physical volume
self.log(f"Copying entire PV {source_pv} to {target_device}")
self.log("This preserves LVM metadata and all logical volumes")
success, _ = self.run_command("which pv", show_output=False)
if success:
copy_cmd = f"pv {source_pv} | dd of={target_device} bs=4M"
else:
copy_cmd = f"dd if={source_pv} of={target_device} bs=4M status=progress"
success, output = self.run_command(copy_cmd)
if not success:
raise Exception(f"Failed to copy PV: {output}")
self.log("VG copy completed - target device now contains complete LVM structure")
def backup_lv_to_borg(self, source_lv, repo_path):
"""Backup LV to Borg repository (block-level)"""
self.log("Mode: LV to Borg Repository (block-level backup)")
# Set up environment for Borg
borg_env = os.environ.copy()
if self.passphrase_var.get():
borg_env['BORG_PASSPHRASE'] = self.passphrase_var.get()
# Initialize repository if needed
if self.create_new_repo.get():
self.log(f"Creating new Borg repository: {repo_path}")
encryption = self.encryption_var.get()
if encryption == "none":
init_cmd = f"borg init --encryption=none {repo_path}"
else:
init_cmd = f"borg init --encryption={encryption} {repo_path}"
result = subprocess.run(init_cmd, shell=True, capture_output=True, text=True, env=borg_env)
if result.returncode != 0:
raise Exception(f"Failed to initialize Borg repository: {result.stderr}")
self.log("Repository initialized successfully")
# Create snapshot
vg_name = source_lv.split('/')[2]
lv_name = source_lv.split('/')[3]
snapshot_name = f"{lv_name}_borg_snap_{int(time.time())}"
self.current_snapshot = f"/dev/{vg_name}/{snapshot_name}"
# Get LV size to determine appropriate snapshot size
success, lv_size_output = self.run_command(f"lvs --noheadings -o lv_size --units b {source_lv}", show_output=False)
if success:
lv_size_bytes = int(lv_size_output.strip().replace('B', ''))
snapshot_size = self.calculate_snapshot_size(lv_size_bytes)
else:
snapshot_size = "2G" # Default fallback
self.log(f"Creating snapshot: {snapshot_name} (size: {snapshot_size})")
# Pre-flight VG free space
try:
self._ensure_vg_has_space(vg_name, snapshot_size)
except Exception as e:
raise Exception(str(e))
success, output = self.run_command(f"lvcreate -L{snapshot_size} -s -n {snapshot_name} {source_lv}")
if not success:
raise Exception(f"Failed to create snapshot: {output}")
# Create Borg backup of the raw block device
archive_name = f"lv_{lv_name}_{time.strftime('%Y%m%d_%H%M%S')}"
self.log(f"Creating Borg archive (block-level): {archive_name}")
self.log("Backing up raw snapshot block device to Borg...")
self.log("This preserves exact block-level state including filesystem metadata")
# Use stdin mode to pipe the block device into borg
borg_cmd = f"dd if={self.current_snapshot} bs=4M | borg create --stdin-name '{lv_name}.img' --progress --stats {repo_path}::{archive_name} -"
# Run command with Borg environment
process = subprocess.Popen(borg_cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, text=True, env=borg_env)
# Stream output
for line in process.stdout:
if line.strip():
self.log(line.strip())
process.wait()
if process.returncode != 0:
raise Exception("Borg block-level backup failed")
self.log("Block-level Borg backup completed successfully")
# Cleanup snapshot
self.log("Cleaning up snapshot")
success, output = self.run_command(f"lvremove -f {self.current_snapshot}")
if not success:
self.log(f"Warning: Failed to remove snapshot: {output}")
else:
self.log("Snapshot cleaned up")
self.current_snapshot = None
def backup_vg_to_borg(self, source_vg, repo_path):
"""Backup entire VG to Borg repository (block-level)"""
self.log("Mode: Entire VG to Borg Repository (block-level backup)")
self.log("This will store all LV snapshots as raw block devices in Borg")
# Set up environment for Borg
borg_env = os.environ.copy()
if self.passphrase_var.get():
borg_env['BORG_PASSPHRASE'] = self.passphrase_var.get()
# Initialize repository if needed
if self.create_new_repo.get():
self.log(f"Creating new Borg repository: {repo_path}")
encryption = self.encryption_var.get()
if encryption == "none":
init_cmd = f"borg init --encryption=none {repo_path}"
else:
init_cmd = f"borg init --encryption={encryption} {repo_path}"
result = subprocess.run(init_cmd, shell=True, capture_output=True, text=True, env=borg_env)
if result.returncode != 0:
raise Exception(f"Failed to initialize Borg repository: {result.stderr}")
self.log("Repository initialized successfully")
# Get all LVs in the VG
success, output = self.run_command(f"lvs --noheadings -o lv_name {source_vg}", show_output=False)
if not success:
raise Exception(f"Failed to get LVs for VG {source_vg}")
lv_names = [lv.strip() for lv in output.strip().split('\n') if lv.strip()]
if not lv_names:
raise Exception(f"No logical volumes found in VG {source_vg}")
self.log(f"Found {len(lv_names)} logical volumes: {', '.join(lv_names)}")
# Create one archive with all LVs
archive_name = f"vg_{source_vg}_{time.strftime('%Y%m%d_%H%M%S')}"
self.log(f"Creating Borg archive (block-level): {archive_name}")
# Create temporary directory for organizing the LV images
import tempfile
temp_dir = tempfile.mkdtemp(prefix="borg_vg_backup_")
snapshots_created = []
try:
# Process each LV one by one to avoid space issues
for lv_name in lv_names:
snapshot_name = f"{lv_name}_borg_snap_{int(time.time())}"
snapshot_path = f"/dev/{source_vg}/{snapshot_name}"
lv_path = f"/dev/{source_vg}/{lv_name}"
# Get LV size to determine appropriate snapshot size
success, lv_size_output = self.run_command(f"lvs --noheadings -o lv_size --units b {lv_path}", show_output=False)
if success:
lv_size_bytes = int(lv_size_output.strip().replace('B', ''))
snapshot_size = self.calculate_snapshot_size(lv_size_bytes)
else:
snapshot_size = "2G" # Default fallback
self.log(f"Processing LV: {lv_name}")
self.log(f"Creating snapshot: {snapshot_name} (size: {snapshot_size})")
# Pre-flight VG free space
try:
self._ensure_vg_has_space(source_vg, snapshot_size)
except Exception as e:
self.log(f"Skipping {lv_name}: {e}")
continue
success, output = self.run_command(f"lvcreate -L{snapshot_size} -s -n {snapshot_name} {lv_path}")
if not success:
self.log(f"Warning: Failed to create snapshot for {lv_name}: {output}")
continue
snapshots_created.append(snapshot_path)
# Stream this LV directly to Borg using append mode (if supported)
# or create individual archives
archive_name_lv = f"vg_{source_vg}_lv_{lv_name}_{time.strftime('%Y%m%d_%H%M%S')}"
self.log(f"Backing up {lv_name} to archive: {archive_name_lv}")
# Use stdin mode to stream the block device
borg_cmd = f"dd if={snapshot_path} bs=4M | borg create --stdin-name '{lv_name}.img' --progress --stats {repo_path}::{archive_name_lv} -"
# Run command with Borg environment
process = subprocess.Popen(borg_cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, text=True, env=borg_env)
# Stream output
for line in process.stdout:
if line.strip():
self.log(line.strip())
process.wait()
if process.returncode != 0:
self.log(f"Warning: Backup of {lv_name} failed")
else:
self.log(f"Successfully backed up {lv_name}")
# Clean up this snapshot immediately to save space
self.log(f"Removing snapshot {snapshot_path}")
success, output = self.run_command(f"lvremove -f {snapshot_path}")
if success:
snapshots_created.remove(snapshot_path)
else:
self.log(f"Warning: Failed to remove snapshot {snapshot_path}: {output}")
self.log("Block-level VG Borg backup completed successfully")
self.log(f"Created individual archives for each LV in VG {source_vg}")
finally:
# Remove temporary directory
import shutil
try:
shutil.rmtree(temp_dir)
self.log("Temporary directory cleaned up")
except:
self.log(f"Warning: Could not remove temp directory {temp_dir}")
# Remove any remaining snapshots
for snapshot_path in snapshots_created:
self.log(f"Removing remaining snapshot {snapshot_path}")
success, output = self.run_command(f"lvremove -f {snapshot_path}")
if not success:
self.log(f"Warning: Failed to remove snapshot {snapshot_path}: {output}")
self.current_snapshot = None
def backup_files_to_borg(self, source_lv, repo_path):
"""Backup LV files to Borg repository (file-level via snapshot mount)"""
self.log("Mode: LV to Borg Repository (file-level backup)")
self.log("This will mount an LV snapshot and backup files (space-efficient)")
# Set up environment for Borg
borg_env = os.environ.copy()
if self.passphrase_var.get():
borg_env['BORG_PASSPHRASE'] = self.passphrase_var.get()
# Initialize repository if needed
if self.create_new_repo.get():
self.log(f"Creating new Borg repository: {repo_path}")
encryption = self.encryption_var.get()
if encryption == "none":
init_cmd = f"borg init --encryption=none {repo_path}"
else:
init_cmd = f"borg init --encryption={encryption} {repo_path}"
result = subprocess.run(init_cmd, shell=True, capture_output=True, text=True, env=borg_env)
if result.returncode != 0:
raise Exception(f"Failed to initialize Borg repository: {result.stderr}")
self.log("Repository initialized successfully")
# Create snapshot
vg_name = source_lv.split('/')[2]
lv_name = source_lv.split('/')[3]
# Use timestamp to ensure unique snapshot names
timestamp = int(time.time())
snapshot_name = f"{lv_name}_files_snap_{timestamp}"
self.current_snapshot = f"/dev/{vg_name}/{snapshot_name}"
# Get LV size to determine appropriate snapshot size
success, lv_size_output = self.run_command(f"lvs --noheadings -o lv_size --units b {source_lv}", show_output=False)
if success:
lv_size_bytes = int(lv_size_output.strip().replace('B', ''))
snapshot_size = self.calculate_snapshot_size(lv_size_bytes, backup_mode="file")
else:
snapshot_size = "2G" # Default fallback
self.log(f"Creating snapshot: {snapshot_name} (size: {snapshot_size})")
# Pre-flight VG free space
try:
self._ensure_vg_has_space(vg_name, snapshot_size)
except Exception as e:
raise Exception(str(e))
success, output = self.run_command(f"lvcreate -L{snapshot_size} -s -n {snapshot_name} {source_lv}")
if not success:
raise Exception(f"Failed to create snapshot: {output}")
# Create temporary mount point
import tempfile
temp_mount = tempfile.mkdtemp(prefix="borg_files_backup_")
luks_device = None
try:
# Check if the snapshot is LUKS encrypted
self.log("Checking if volume is LUKS encrypted...")
# Resolve the real device path in case of symbolic links
real_device_success, real_device_path = self.run_command(f"readlink -f {self.current_snapshot}", show_output=False)
device_to_check = real_device_path.strip() if real_device_success else self.current_snapshot
self.log(f"Checking device: {device_to_check}")
check_success, check_output = self.run_command(f"file -s {device_to_check}", show_output=False)
self.log(f"File command result: success={check_success}, output='{check_output}'")
# Also try cryptsetup isLuks command which is more reliable
luks_check_success, luks_check_output = self.run_command(f"cryptsetup isLuks {device_to_check}", show_output=False)
self.log(f"cryptsetup isLuks result: success={luks_check_success}")
is_luks = (check_success and "LUKS" in check_output) or luks_check_success
self.log(f"Final LUKS detection result: {is_luks}")
if is_luks:
self.log("LUKS encryption detected - opening encrypted device")
luks_name = f"borg_backup_{lv_name}_{int(time.time())}"
luks_device = f"/dev/mapper/{luks_name}"
# Open LUKS device (this will prompt for password)
self.log("LUKS encryption detected - need passphrase")
# Get passphrase via GUI dialog
passphrase = self.get_luks_passphrase()
if not passphrase:
raise Exception("LUKS passphrase required but not provided")
self.log("Opening LUKS device with provided passphrase...")
success = self.open_luks_device(device_to_check, luks_name, passphrase)
if not success:
raise Exception(f"Failed to open LUKS device - check passphrase")
# Mount the decrypted device
self.log(f"Mounting decrypted device to {temp_mount}")
mount_opts = self._fs_mount_opts(luks_device)
success, output = self.run_command(f"mount -o {mount_opts} {luks_device} {temp_mount}")
if not success:
# Clean up LUKS device
self.run_command(f"cryptsetup luksClose {luks_name}", show_output=False)
raise Exception(f"Failed to mount decrypted device: {output}")
else:
# Mount the snapshot directly (non-encrypted)
self.log(f"Mounting snapshot to {temp_mount}")
mount_opts = self._fs_mount_opts(self.current_snapshot)
success, output = self.run_command(f"mount -o {mount_opts} {self.current_snapshot} {temp_mount}")
if not success:
raise Exception(f"Failed to mount snapshot: {output}")
# Create Borg backup of files
archive_name = f"files_{lv_name}_{time.strftime('%Y%m%d_%H%M%S')}"
self.log(f"Creating Borg archive (file-level): {archive_name}")
self.log("Backing up files from mounted snapshot...")
self.log("This is space-efficient and skips empty blocks")
borg_cmd = f"borg create --progress --stats --compression auto,zstd {repo_path}::{archive_name} {temp_mount}"
# Run borg with environment
process = subprocess.Popen(borg_cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, text=True, env=borg_env)
# Stream output
for line in process.stdout:
self.log(line.strip())
process.wait()
if process.returncode != 0:
raise Exception("Borg file-level backup failed")
self.log("File-level Borg backup completed successfully")
finally:
# Cleanup: unmount and remove temp directory
self.log("Cleaning up mount point")
self.run_command(f"umount {temp_mount}", show_output=False)
# Close LUKS device if it was opened
if luks_device:
luks_name = luks_device.split('/')[-1]
self.log("Closing LUKS device")
self.run_command(f"cryptsetup luksClose {luks_name}", show_output=False)
os.rmdir(temp_mount)
# Remove snapshot
self.log("Cleaning up snapshot")
success, output = self.run_command(f"lvremove -f {self.current_snapshot}")
if not success:
self.log(f"Warning: Failed to remove snapshot: {output}")
else:
self.log("Snapshot cleaned up")
self.current_snapshot = None
def cleanup_on_error(self):
"""Clean up on error"""
if self.current_snapshot:
self.log("Attempting to clean up snapshot after error")
success, output = self.run_command(f"lvremove -f {self.current_snapshot}")
if success:
self.log("Snapshot cleaned up")
else:
self.log(f"Failed to clean up snapshot: {output}")
self.current_snapshot = None
def emergency_stop(self):
"""Emergency stop - kill any running processes"""
self.log("EMERGENCY STOP requested")
# Set stop flag for backup workers to check
self.backup_running = False
# Kill various backup-related processes
processes_to_kill = [
"pkill -f 'dd.*if=.*dev'", # dd processes
"pkill -f 'pv.*dev'", # pv processes
"pkill -f 'borg create'", # borg backup processes
"pkill -f 'borg init'", # borg init processes
"pkill -f 'lvcreate.*snap'", # snapshot creation
"pkill -f 'cryptsetup luksOpen'", # LUKS operations
]
for cmd in processes_to_kill:
self.run_command(cmd, show_output=False)
# Force cleanup
self.log("Performing emergency cleanup...")
self.cleanup_on_error()
# Reset UI immediately
self.reset_ui_state()
messagebox.showwarning("Emergency Stop",
"Backup process stopped!\n\n" +
"- All backup processes killed\n" +
"- Snapshots cleaned up\n" +
"- LUKS devices closed\n\n" +
"Check log for details.")
def cleanup_on_error(self):
"""Clean up resources on error or stop"""
try:
# Clean up any current snapshot
if hasattr(self, 'current_snapshot') and self.current_snapshot:
self.log(f"Emergency cleanup: removing snapshot {self.current_snapshot}")
self.run_command(f"lvremove -f {self.current_snapshot}", show_output=False)
self.current_snapshot = None
# Clean up any mount points
self.run_command("umount /tmp/borg_files_backup_* 2>/dev/null", show_output=False)
# Close any LUKS devices we might have opened
self.run_command("cryptsetup luksClose borg_backup_* 2>/dev/null", show_output=False)
# Remove any temp directories
self.run_command("rmdir /tmp/borg_files_backup_* 2>/dev/null", show_output=False)
except Exception as e:
self.log(f"Error during cleanup: {e}")
def manage_snapshots(self):
"""Open LVM snapshot management window"""
snapshot_window = tk.Toplevel(self.root)
snapshot_window.title("LVM Snapshot Manager")
snapshot_window.geometry("800x500")
# Main frame
frame = ttk.Frame(snapshot_window, padding="10")
frame.pack(fill=tk.BOTH, expand=True)
ttk.Label(frame, text="LVM Snapshots:").pack(anchor=tk.W, pady=(0, 5))
# Listbox for snapshots
list_frame = ttk.Frame(frame)
list_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
self.snapshot_listbox = tk.Listbox(list_frame, selectmode=tk.EXTENDED)
snap_scrollbar = ttk.Scrollbar(list_frame, orient="vertical", command=self.snapshot_listbox.yview)
self.snapshot_listbox.configure(yscrollcommand=snap_scrollbar.set)
self.snapshot_listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
snap_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# Buttons
btn_frame = ttk.Frame(frame)
btn_frame.pack(fill=tk.X, pady=(10, 0))
ttk.Button(btn_frame, text="Refresh", command=self.refresh_snapshots).pack(side=tk.LEFT, padx=(0, 5))
ttk.Button(btn_frame, text="Remove Selected", command=self.remove_snapshots).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="Remove All", command=self.remove_all_snapshots).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="Close", command=snapshot_window.destroy).pack(side=tk.RIGHT)
# Load snapshots
self.refresh_snapshots()
def refresh_snapshots(self):
"""Refresh the snapshot list"""
try:
self.snapshot_listbox.delete(0, tk.END)
success, output = self.run_command("lvs --noheadings -o lv_name,vg_name,lv_size,origin | grep -E '_snap|snap_'", show_output=False)
if success and output.strip():
for line in output.strip().split('\n'):
if line.strip():
parts = line.strip().split()
if len(parts) >= 4:
lv_name = parts[0]
vg_name = parts[1]
lv_size = parts[2]
origin = parts[3]
display_text = f"/dev/{vg_name}/{lv_name} ({lv_size}) -> {origin}"
self.snapshot_listbox.insert(tk.END, display_text)
else:
self.snapshot_listbox.insert(tk.END, "No snapshots found")
except Exception as e:
messagebox.showerror("Error", f"Failed to list snapshots: {e}")
def remove_snapshots(self):
"""Remove selected snapshots"""
selected = self.snapshot_listbox.curselection()
if not selected:
messagebox.showwarning("Warning", "Please select snapshots to remove")
return
snapshot_paths = []
for idx in selected:
line = self.snapshot_listbox.get(idx)
if "No snapshots found" in line:
continue
# Extract /dev/vg/lv path from display text
path = line.split(' ')[0]
snapshot_paths.append(path)
if not snapshot_paths:
return
if messagebox.askyesno("Confirm", f"Remove {len(snapshot_paths)} snapshot(s)?\n\n" + "\n".join(snapshot_paths)):
for path in snapshot_paths:
success, output = self.run_command(f"lvremove -f {path}")
if success:
self.log(f"Removed snapshot: {path}")
else:
self.log(f"Failed to remove {path}: {output}")
self.refresh_snapshots()
def remove_all_snapshots(self):
"""Remove all snapshots after confirmation"""
if messagebox.askyesno("Confirm", "Remove ALL snapshots? This cannot be undone!"):
success, output = self.run_command("lvs --noheadings -o lv_path | grep -E '_snap|snap_' | xargs -r lvremove -f")
if success:
self.log("All snapshots removed")
else:
self.log(f"Error removing snapshots: {output}")
self.refresh_snapshots()
def manage_borg_repo(self):
"""Open Borg repository management window"""
# Get repo path
if not hasattr(self, 'repo_path_var') or not self.repo_path_var.get().strip():
messagebox.showerror("Error", "Please set a Borg repository path first")
return
repo_path = self.repo_path_var.get().strip()
# Check if repo exists
if not os.path.exists(repo_path):
messagebox.showerror("Error", f"Repository path does not exist: {repo_path}")
return
borg_window = tk.Toplevel(self.root)
borg_window.title(f"Borg Repository Manager - {repo_path}")
borg_window.geometry("900x600")
# Main frame
frame = ttk.Frame(borg_window, padding="10")
frame.pack(fill=tk.BOTH, expand=True)
# Repository info
info_frame = ttk.LabelFrame(frame, text="Repository Information", padding="5")
info_frame.pack(fill=tk.X, pady=(0, 10))
ttk.Label(info_frame, text=f"Path: {repo_path}").pack(anchor=tk.W)
# Archives list
ttk.Label(frame, text="Archives:").pack(anchor=tk.W, pady=(0, 5))
list_frame = ttk.Frame(frame)
list_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
self.archive_listbox = tk.Listbox(list_frame, selectmode=tk.EXTENDED)
arch_scrollbar = ttk.Scrollbar(list_frame, orient="vertical", command=self.archive_listbox.yview)
self.archive_listbox.configure(yscrollcommand=arch_scrollbar.set)
self.archive_listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
arch_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# Buttons
btn_frame = ttk.Frame(frame)
btn_frame.pack(fill=tk.X, pady=(10, 0))
ttk.Button(btn_frame, text="Refresh", command=lambda: self.refresh_archives(repo_path)).pack(side=tk.LEFT, padx=(0, 5))
ttk.Button(btn_frame, text="Mount Archive", command=lambda: self.mount_archive(repo_path)).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="Delete Selected", command=lambda: self.delete_archives(repo_path)).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="Repository Info", command=lambda: self.show_repo_info(repo_path)).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="Close", command=borg_window.destroy).pack(side=tk.RIGHT)
# Store reference for other methods
self.current_repo_path = repo_path
# Load archives
self.refresh_archives(repo_path)
def reset_ui_state(self):
"""Reset UI to normal state"""
self.backup_running = False
self.backup_btn.config(state="normal")
self.stop_btn.config(state="disabled")
self.progress.stop()
def refresh_archives(self, repo_path):
"""Refresh the archive list"""
try:
self.archive_listbox.delete(0, tk.END)
# Set up environment for Borg
borg_env = os.environ.copy()
if hasattr(self, 'passphrase_var') and self.passphrase_var.get():
borg_env['BORG_PASSPHRASE'] = self.passphrase_var.get()
cmd = f"borg list {repo_path}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, env=borg_env)
if result.returncode == 0:
if result.stdout.strip():
for line in result.stdout.strip().split('\n'):
if line.strip():
self.archive_listbox.insert(tk.END, line.strip())
else:
self.archive_listbox.insert(tk.END, "No archives found")
else:
self.archive_listbox.insert(tk.END, f"Error: {result.stderr}")
except Exception as e:
messagebox.showerror("Error", f"Failed to list archives: {e}")
def mount_archive(self, repo_path):
"""Mount selected archive"""
selected = self.archive_listbox.curselection()
if not selected:
messagebox.showwarning("Warning", "Please select an archive to mount")
return
archive_line = self.archive_listbox.get(selected[0])
if "No archives found" in archive_line or "Error:" in archive_line:
return
# Extract archive name (first word)
archive_name = archive_line.split()[0]
# Ask for mount point
from tkinter import filedialog
mount_point = filedialog.askdirectory(title="Select mount point directory")
if not mount_point:
return
try:
# Set up environment
borg_env = os.environ.copy()
if hasattr(self, 'passphrase_var') and self.passphrase_var.get():
borg_env['BORG_PASSPHRASE'] = self.passphrase_var.get()
cmd = f"borg mount {repo_path}::{archive_name} {mount_point}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, env=borg_env)
if result.returncode == 0:
messagebox.showinfo("Success", f"Archive mounted at: {mount_point}\n\nTo unmount: fusermount -u {mount_point}")
self.log(f"Mounted archive {archive_name} at {mount_point}")
else:
messagebox.showerror("Error", f"Failed to mount archive: {result.stderr}")
except Exception as e:
messagebox.showerror("Error", f"Failed to mount archive: {e}")
def delete_archives(self, repo_path):
"""Delete selected archives"""
selected = self.archive_listbox.curselection()
if not selected:
messagebox.showwarning("Warning", "Please select archives to delete")
return
archive_names = []
for idx in selected:
line = self.archive_listbox.get(idx)
if "No archives found" in line or "Error:" in line:
continue
archive_names.append(line.split()[0])
if not archive_names:
return
if messagebox.askyesno("Confirm", f"Delete {len(archive_names)} archive(s)?\n\n" + "\n".join(archive_names) + "\n\nThis cannot be undone!"):
try:
borg_env = os.environ.copy()
if hasattr(self, 'passphrase_var') and self.passphrase_var.get():
borg_env['BORG_PASSPHRASE'] = self.passphrase_var.get()
for archive_name in archive_names:
cmd = f"borg delete {repo_path}::{archive_name}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, env=borg_env)
if result.returncode == 0:
self.log(f"Deleted archive: {archive_name}")
else:
self.log(f"Failed to delete {archive_name}: {result.stderr}")
self.refresh_archives(repo_path)
except Exception as e:
messagebox.showerror("Error", f"Failed to delete archives: {e}")
def show_repo_info(self, repo_path):
"""Show repository information"""
try:
borg_env = os.environ.copy()
if hasattr(self, 'passphrase_var') and self.passphrase_var.get():
borg_env['BORG_PASSPHRASE'] = self.passphrase_var.get()
cmd = f"borg info {repo_path}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, env=borg_env)
if result.returncode == 0:
info_window = tk.Toplevel(self.root)
info_window.title("Repository Information")
info_window.geometry("600x400")
text_widget = tk.Text(info_window, wrap=tk.WORD)
scrollbar = ttk.Scrollbar(info_window, orient="vertical", command=text_widget.yview)
text_widget.configure(yscrollcommand=scrollbar.set)
text_widget.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
text_widget.insert(tk.END, result.stdout)
text_widget.config(state=tk.DISABLED)
else:
messagebox.showerror("Error", f"Failed to get repository info: {result.stderr}")
except Exception as e:
messagebox.showerror("Error", f"Failed to get repository info: {e}")
def main():
# Check if running as root
if os.geteuid() != 0:
print("This application needs to run as root for LVM operations.")
print("Please run: sudo python3 simple_backup_gui.py")
return
root = tk.Tk()
app = SimpleBackupGUI(root)
root.mainloop()
if __name__ == "__main__":
main()