- GUI and CLI backup/restore functionality
- Auto-detection of internal system drive
- Smart drive classification (internal vs external)
- Reboot integration for clean backups/restores
- Portable tools that survive cloning operations
- Tool preservation system for external M.2 SSD
- Complete disaster recovery workflow
- Safety features and multiple confirmations
- Desktop integration and launcher scripts
- Comprehensive documentation
536 lines
22 KiB
Python
Executable File
536 lines
22 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Linux System Backup Tool with GUI
|
|
A tool for creating full system backups to external M.2 SSD with reboot functionality.
|
|
"""
|
|
|
|
import tkinter as tk
|
|
from tkinter import ttk, messagebox, scrolledtext
|
|
import subprocess
|
|
import threading
|
|
import os
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
class BackupManager:
    """Tkinter GUI for cloning one whole drive onto another with ``dd``.

    A "backup" and a "restore" both run the same raw ``dd`` copy from the
    selected source drive to the selected target drive; they differ only in
    the confirmation dialogs and log wording.  Either operation can be run
    immediately (in a worker thread) or written to a shell script in /tmp
    ahead of a scheduled reboot.
    """

    # Columns requested from lsblk by detect_drives().
    _LSBLK_COLUMNS = "NAME,SIZE,TYPE,TRAN,HOTPLUG"
    # Delay handed to `shutdown -r +N`, in minutes.
    _REBOOT_DELAY_MINUTES = 1

    def __init__(self):
        """Create the main window, build the widgets and scan for drives."""
        self.root = tk.Tk()
        self.root.title("System Backup Manager")
        self.root.geometry("600x500")
        self.root.resizable(True, True)

        # Variables
        self.source_drive = tk.StringVar()  # Will be auto-detected
        self.target_drive = tk.StringVar()
        self.operation_running = False
        self.operation_type = "backup"  # "backup" or "restore"
        # Popen handle of the running dd, so stop_operation() can end it.
        self.current_process = None

        self.setup_ui()
        self.detect_drives()

    def setup_ui(self):
        """Build every widget of the main window."""
        fill = (tk.W, tk.E, tk.N, tk.S)

        # Main frame
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=fill)

        # Configure grid weights
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(1, weight=1)
        # Let the status/log row absorb extra height when the window grows.
        main_frame.rowconfigure(4, weight=1)

        # Title
        title_label = ttk.Label(main_frame, text="System Backup Manager",
                                font=("Arial", 16, "bold"))
        title_label.grid(row=0, column=0, columnspan=2, pady=(0, 20))

        # Source drive selection
        ttk.Label(main_frame, text="Source Drive (Internal):").grid(
            row=1, column=0, sticky=tk.W, pady=5)
        self.source_combo = ttk.Combobox(
            main_frame, textvariable=self.source_drive, width=40)
        self.source_combo.grid(row=1, column=1, sticky=(tk.W, tk.E),
                               pady=5, padx=(10, 0))

        # Target drive selection
        ttk.Label(main_frame, text="Target Drive (External M.2):").grid(
            row=2, column=0, sticky=tk.W, pady=5)
        self.target_combo = ttk.Combobox(
            main_frame, textvariable=self.target_drive, width=40)
        self.target_combo.grid(row=2, column=1, sticky=(tk.W, tk.E),
                               pady=5, padx=(10, 0))

        # Refresh drives button
        refresh_btn = ttk.Button(main_frame, text="Refresh Drives",
                                 command=self.detect_drives)
        refresh_btn.grid(row=3, column=1, sticky=tk.E, pady=10, padx=(10, 0))

        # Status frame
        status_frame = ttk.LabelFrame(main_frame, text="Status", padding="10")
        status_frame.grid(row=4, column=0, columnspan=2, sticky=fill, pady=10)
        status_frame.columnconfigure(0, weight=1)
        status_frame.rowconfigure(0, weight=1)

        # Log area
        self.log_text = scrolledtext.ScrolledText(status_frame, height=15, width=60)
        self.log_text.grid(row=0, column=0, sticky=fill)

        # Button frame
        button_frame = ttk.Frame(main_frame)
        button_frame.grid(row=5, column=0, columnspan=2, pady=20)

        # Backup buttons
        backup_frame = ttk.LabelFrame(button_frame, text="Backup Operations",
                                      padding="10")
        backup_frame.pack(side=tk.LEFT, padx=5)

        self.backup_btn = ttk.Button(backup_frame, text="Start Backup",
                                     command=self.start_backup,
                                     style="Accent.TButton")
        self.backup_btn.pack(side=tk.TOP, pady=2)

        self.reboot_backup_btn = ttk.Button(backup_frame, text="Reboot & Backup",
                                            command=self.reboot_and_backup)
        self.reboot_backup_btn.pack(side=tk.TOP, pady=2)

        # Restore buttons
        restore_frame = ttk.LabelFrame(button_frame, text="Restore Operations",
                                       padding="10")
        restore_frame.pack(side=tk.LEFT, padx=5)

        self.restore_btn = ttk.Button(restore_frame, text="Restore from External",
                                      command=self.start_restore)
        self.restore_btn.pack(side=tk.TOP, pady=2)

        self.reboot_restore_btn = ttk.Button(restore_frame, text="Reboot & Restore",
                                             command=self.reboot_and_restore)
        self.reboot_restore_btn.pack(side=tk.TOP, pady=2)

        # Control buttons
        control_frame = ttk.Frame(button_frame)
        control_frame.pack(side=tk.LEFT, padx=5)

        self.stop_btn = ttk.Button(control_frame, text="Stop",
                                   command=self.stop_operation, state="disabled")
        self.stop_btn.pack(side=tk.TOP, pady=2)

        self.swap_btn = ttk.Button(control_frame, text="Swap Source↔Target",
                                   command=self.swap_drives)
        self.swap_btn.pack(side=tk.TOP, pady=2)

        # Progress bar
        self.progress = ttk.Progressbar(main_frame, mode='indeterminate')
        self.progress.grid(row=6, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=10)

        # Add initial log message
        self.log("System Backup Manager initialized")
        self.log("Select source and target drives, then click 'Start Backup' or 'Reboot & Backup'")

    def log(self, message):
        """Append ``message`` to the log pane, prefixed with HH:MM:SS."""
        timestamp = time.strftime("%H:%M:%S")
        self.log_text.insert(tk.END, f"[{timestamp}] {message}\n")
        self.log_text.see(tk.END)
        self.root.update_idletasks()

    @staticmethod
    def _base_device(device):
        """Return the whole-disk device node for a partition device node.

        Disks whose own name ends in a digit (nvme0n1, mmcblk0, ...) number
        their partitions as ``p<N>``; letter-named disks (sda, vdb, ...)
        append the number directly.  Anything else (e.g. device-mapper
        nodes) is returned unchanged rather than guessed at.
        """
        import re

        digit_named = re.match(
            r'^(/dev/(?:nvme\d+n\d+|mmcblk\d+|loop\d+|md\d+|nbd\d+))(?:p\d+)?$',
            device)
        if digit_named:
            return digit_named.group(1)

        letter_named = re.match(r'^(/dev/(?:sd|hd|vd|xvd)[a-z]+)\d*$', device)
        if letter_named:
            return letter_named.group(1)

        return device

    @staticmethod
    def _device_path(selection):
        """Return the device node from a combobox entry, or "" if there is none.

        Entries look like ``/dev/sda (500G) [SYSTEM]``; only the first
        whitespace-separated token is the device path.
        """
        tokens = selection.split()
        return tokens[0] if tokens else ""

    @staticmethod
    def _parse_lsblk(output):
        """Parse ``lsblk -P`` output into a list of ``{COLUMN: value}`` dicts.

        The key="value" format keeps empty columns in place, which plain
        whitespace splitting of the default table output does not.
        """
        import shlex

        rows = []
        for line in output.splitlines():
            line = line.strip()
            if not line:
                continue
            row = {}
            for token in shlex.split(line):
                key, sep, value = token.partition('=')
                if sep:
                    row[key] = value
            rows.append(row)
        return rows

    @staticmethod
    def _is_external(transport, hotplug):
        """Return True when lsblk's TRAN/HOTPLUG values indicate an external drive."""
        return transport in ('usb', 'uas') or hotplug == "1"

    def get_root_drive(self):
        """Return the disk holding the root filesystem, or None if unknown."""
        try:
            # Second line of `df /` starts with the device mounted at /.
            result = subprocess.run(['df', '/'], capture_output=True, text=True)
            lines = result.stdout.strip().split('\n')
            if len(lines) > 1:
                fields = lines[1].split()
                if fields:
                    return self._base_device(fields[0])
        except Exception as e:
            # Best effort: detection failure must not stop the GUI from starting.
            self.log(f"Error detecting root drive: {e}")
        return None

    def detect_drives(self):
        """Scan block devices and refresh both drive comboboxes.

        The disk holding the root filesystem is tagged ``[SYSTEM]`` and
        pre-selected as source; the first external disk that is not the
        source is pre-selected as target.
        """
        try:
            self.log("Detecting available drives...")

            # First, detect the root filesystem drive
            root_drive = self.get_root_drive()
            if root_drive:
                self.log(f"Detected root filesystem on: {root_drive}")

            # -P prints KEY="value" pairs so an empty TRAN column cannot
            # shift HOTPLUG into its place.
            result = subprocess.run(
                ['lsblk', '-d', '-n', '-P', '-o', self._LSBLK_COLUMNS],
                capture_output=True, text=True)

            internal_drives = []
            external_drives = []
            root_drive_info = None

            for row in self._parse_lsblk(result.stdout):
                if row.get('TYPE') != 'disk' or not row.get('NAME'):
                    continue

                name = f"/dev/{row['NAME']}"
                size = row.get('SIZE', '')
                drive_info = f"{name} ({size})"

                # Check if this is the root drive and mark it
                if root_drive and name == root_drive:
                    drive_info = f"{name} ({size}) [SYSTEM]"
                    root_drive_info = drive_info
                    self.log(f"Root drive found: {drive_info}")

                # Classify drives
                if self._is_external(row.get('TRAN', ''), row.get('HOTPLUG', '0')):
                    external_drives.append(drive_info)
                    self.log(f"External drive detected: {drive_info}")
                else:
                    internal_drives.append(drive_info)
                    self.log(f"Internal drive detected: {drive_info}")

            # Auto-select root drive as source if found, otherwise first internal
            if root_drive_info:
                self.source_drive.set(root_drive_info)
                self.log(f"Auto-selected root drive as source: {root_drive_info}")
            elif internal_drives:
                self.source_drive.set(internal_drives[0])
                self.log(f"Auto-selected internal drive as source: {internal_drives[0]}")

            # Update combo boxes - put internal drives first for source
            self.source_combo['values'] = internal_drives + external_drives
            self.target_combo['values'] = external_drives + internal_drives  # Prefer external for target

            # Auto-select an external drive as target, but never the drive
            # that was just chosen as source.
            chosen_source = self.source_drive.get()
            target_choice = next(
                (drive for drive in external_drives if drive != chosen_source), None)
            if target_choice:
                self.target_drive.set(target_choice)
                self.log(f"Auto-selected external drive as target: {target_choice}")

            self.log(f"Found {len(internal_drives)} internal and {len(external_drives)} external drives")

        except Exception as e:
            # Best effort: this also runs from __init__, so log and carry on.
            self.log(f"Error detecting drives: {e}")

    def validate_selection(self):
        """Return True when source and target are set, distinct and exist.

        Shows an error dialog and returns False on the first failed check.
        """
        source = self._device_path(self.source_drive.get())
        target = self._device_path(self.target_drive.get())

        if not source:
            messagebox.showerror("Error", "Please select a source drive")
            return False

        if not target:
            messagebox.showerror("Error", "Please select a target drive")
            return False

        if source == target:
            messagebox.showerror("Error", "Source and target drives cannot be the same")
            return False

        # Check if drives exist
        if not os.path.exists(source):
            messagebox.showerror("Error", f"Source drive {source} does not exist")
            return False

        if not os.path.exists(target):
            messagebox.showerror("Error", f"Target drive {target} does not exist")
            return False

        return True

    def swap_drives(self):
        """Exchange the current source and target selections."""
        source = self.source_drive.get()
        target = self.target_drive.get()

        self.source_drive.set(target)
        self.target_drive.set(source)

        self.log("Swapped source and target drives")

    def start_restore(self):
        """Confirm twice, then start a restore (source -> target) immediately."""
        if not self.validate_selection():
            return

        if self.operation_running:
            self.log("Operation already running!")
            return

        # Confirm restore
        source = self._device_path(self.source_drive.get())
        target = self._device_path(self.target_drive.get())

        result = messagebox.askyesno("⚠️ CONFIRM RESTORE ⚠️",
                                     f"This will RESTORE from {source} to {target}.\n\n"
                                     f"🚨 CRITICAL WARNING 🚨\n"
                                     f"This will COMPLETELY OVERWRITE {target}!\n"
                                     f"ALL DATA on {target} will be DESTROYED!\n\n"
                                     f"This should typically restore FROM external TO internal.\n"
                                     f"Make sure you have the drives selected correctly!\n\n"
                                     f"Are you absolutely sure you want to continue?")
        if not result:
            return

        # Second confirmation for restore.  The dialog only offers Yes/No
        # buttons, so the prompt asks for a click rather than typed input.
        result2 = messagebox.askyesno("FINAL CONFIRMATION",
                                      f"LAST CHANCE TO CANCEL!\n\n"
                                      f"Restoring from: {source}\n"
                                      f"Overwriting: {target}\n\n"
                                      f"Click Yes to continue or No to cancel.")
        if not result2:
            return

        self.operation_type = "restore"
        self.start_operation(source, target)

    def reboot_and_restore(self):
        """Write the restore script and schedule a reboot."""
        self._schedule_reboot_operation(
            "restore",
            "⚠️ CONFIRM REBOOT & RESTORE ⚠️",
            "This will:\n"
            "1. Save current session\n"
            "2. Reboot the system\n"
            "3. Start RESTORE after reboot\n\n"
            "🚨 WARNING: This will OVERWRITE your internal drive! 🚨\n\n"
            "Continue?")

    def start_backup(self):
        """Confirm, then start a backup (source -> target) immediately."""
        if not self.validate_selection():
            return

        if self.operation_running:
            self.log("Operation already running!")
            return

        # Confirm backup
        source = self._device_path(self.source_drive.get())
        target = self._device_path(self.target_drive.get())

        result = messagebox.askyesno("Confirm Backup",
                                     f"This will clone {source} to {target}.\n\n"
                                     f"WARNING: All data on {target} will be destroyed!\n\n"
                                     f"Are you sure you want to continue?")
        if not result:
            return

        self.operation_type = "backup"
        self.start_operation(source, target)

    def start_operation(self, source, target):
        """Lock the UI and run the current operation type in a daemon thread."""
        self.operation_running = True
        self.backup_btn.config(state="disabled")
        self.reboot_backup_btn.config(state="disabled")
        self.restore_btn.config(state="disabled")
        self.reboot_restore_btn.config(state="disabled")
        self.stop_btn.config(state="normal")
        self.progress.start()

        operation_thread = threading.Thread(
            target=self.run_operation,
            args=(source, target, self.operation_type))
        operation_thread.daemon = True
        operation_thread.start()

    def _terminate_process(self, process):
        """Send SIGTERM to ``process`` if it is still running; log on failure."""
        if process is None or process.poll() is not None:
            return
        try:
            process.terminate()
        except OSError as e:
            # e.g. not permitted to signal the sudo process.
            self.log(f"Could not stop process: {e}")

    def _preserve_tools(self, target):
        """Run restore_tools_after_backup.sh (next to this file) on ``target``.

        Returns True only when the script exists and exits with status 0.
        """
        try:
            self.log("Preserving backup tools on external drive...")
            restore_script = os.path.join(os.path.dirname(__file__),
                                          "restore_tools_after_backup.sh")
            if not os.path.exists(restore_script):
                self.log("Warning: Tool restoration script not found")
                return False

            completed = subprocess.run([restore_script, target],
                                       check=False, timeout=60)
            if completed.returncode != 0:
                self.log("Warning: Tool restoration script exited with "
                         f"code {completed.returncode}")
                return False

            self.log("Backup tools preserved on external drive")
            return True
        except Exception as e:
            # Tool preservation is optional; never fail the backup over it.
            self.log(f"Warning: Could not preserve tools on external drive: {e}")
            return False

    def run_operation(self, source, target, operation_type):
        """Clone ``source`` onto ``target`` with ``sudo dd`` and report the result.

        Runs in the worker thread started by start_operation().
        NOTE(review): this method updates Tk widgets and opens message boxes
        from that worker thread; confirm this is safe on the target Tk build.
        """
        label = operation_type.capitalize()
        try:
            if operation_type == "backup":
                self.log(f"Starting backup from {source} to {target}")
            else:
                self.log(f"Starting restore from {source} to {target}")

            self.log("This may take a while depending on drive size...")

            # Use dd for cloning
            cmd = [
                'sudo', 'dd',
                f'if={source}',
                f'of={target}',
                'bs=4M',
                'status=progress',
                'conv=fdatasync',
            ]
            self.log(f"Running command: {' '.join(cmd)}")

            process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                       stderr=subprocess.STDOUT, text=True)
            self.current_process = process

            # Leaving the with-block closes the pipe and waits for dd to exit.
            with process:
                for line in process.stdout:
                    if not self.operation_running:  # Stop was requested
                        self._terminate_process(process)
                        break
                    text = line.strip()
                    if text:
                        self.log(text)

            if process.returncode == 0 and self.operation_running:
                if operation_type == "backup":
                    self.log("Backup completed successfully!")
                    if self._preserve_tools(target):
                        messagebox.showinfo(
                            "Success",
                            "Backup completed successfully!\n\n"
                            "Backup tools have been preserved on the external drive.")
                    else:
                        messagebox.showinfo(
                            "Success",
                            "Backup completed successfully!\n\n"
                            "Warning: backup tools could not be preserved on the "
                            "external drive. Check the log for details.")
                else:
                    self.log("Restore completed successfully!")
                    messagebox.showinfo("Success", "Restore completed successfully!")
            elif not self.operation_running:
                self.log(f"{label} was cancelled by user")
            else:
                self.log(f"{label} failed with return code: {process.returncode}")
                messagebox.showerror("Error", f"{label} failed! Check the log for details.")

        except Exception as e:
            # Thread boundary: report everything instead of dying silently.
            self.log(f"Error during {operation_type}: {e}")
            messagebox.showerror("Error", f"{label} failed: {e}")

        finally:
            self.current_process = None
            self.operation_running = False
            self.backup_btn.config(state="normal")
            self.reboot_backup_btn.config(state="normal")
            self.restore_btn.config(state="normal")
            self.reboot_restore_btn.config(state="normal")
            self.stop_btn.config(state="disabled")
            self.progress.stop()

    def stop_operation(self):
        """Request cancellation and terminate the running dd, if any."""
        self.operation_running = False
        self.log("Stopping operation...")
        # Terminate directly so the stop does not depend on dd printing
        # another line for the worker loop to notice the flag.
        self._terminate_process(getattr(self, "current_process", None))

    def reboot_and_backup(self):
        """Write the backup script and schedule a reboot."""
        self._schedule_reboot_operation(
            "backup",
            "Confirm Reboot & Backup",
            "This will:\n"
            "1. Save current session\n"
            "2. Reboot the system\n"
            "3. Start backup after reboot\n\n"
            "Continue?")

    def _schedule_reboot_operation(self, operation_type, title, prompt):
        """Shared body of reboot_and_backup() and reboot_and_restore().

        Validates the selection, asks for confirmation, writes
        /tmp/<operation_type>_after_reboot.sh and schedules a reboot with
        ``sudo shutdown -r``.
        """
        if not self.validate_selection():
            return

        if not messagebox.askyesno(title, prompt):
            return

        minutes = self._REBOOT_DELAY_MINUTES
        try:
            script_content = self.create_reboot_operation_script(operation_type)

            # NOTE(review): nothing visible in this file registers the script
            # to run at boot, and /tmp is often cleared on reboot - confirm
            # how the script is expected to be launched.
            script_path = f"/tmp/{operation_type}_after_reboot.sh"
            with open(script_path, 'w') as f:
                f.write(script_content)

            os.chmod(script_path, 0o755)

            self.log(f"Reboot {operation_type} script created")
            self.log(f"System will reboot in {minutes} minute(s)...")

            # Schedule reboot
            subprocess.run(['sudo', 'shutdown', '-r', f'+{minutes}'], check=True)

            self.log(f"Reboot scheduled. {operation_type.capitalize()} "
                     "will start automatically after reboot.")

        except Exception as e:
            self.log(f"Error scheduling reboot: {e}")
            messagebox.showerror("Error", f"Failed to schedule reboot: {e}")

    @staticmethod
    def _render_reboot_script(operation_type, source, target):
        """Return the bash script text that clones ``source`` onto ``target``.

        Device paths are shell-quoted because the comboboxes are editable
        and their text ends up inside a script.
        """
        import shlex

        if operation_type == "backup":
            action_desc = "backup"
            success_msg = "Backup Complete"
            fail_msg = "Backup Failed"
        else:
            action_desc = "restore"
            success_msg = "Restore Complete"
            fail_msg = "Restore Failed"

        src = shlex.quote(source)
        dst = shlex.quote(target)
        title = action_desc.capitalize()

        lines = [
            "#!/bin/bash",
            f"# Auto-generated {action_desc} script",
            "",
            f'echo "Starting {action_desc} after reboot..."',
            f'echo "Source:" {src}',
            f'echo "Target:" {dst}',
            "",
            "# Wait for system to fully boot",
            "sleep 30",
            "",
            f"# Run {action_desc}",
            f"sudo dd if={src} of={dst} bs=4M status=progress conv=fdatasync",
            "",
            "if [ $? -eq 0 ]; then",
            f'    echo "{title} completed successfully!"',
            f'    notify-send "{success_msg}" "System {action_desc} finished successfully"',
            "else",
            f'    echo "{title} failed!"',
            f'    notify-send "{fail_msg}" "System {action_desc} encountered an error"',
            "fi",
            "",
            "# Clean up",
            f"rm -f /tmp/{action_desc}_after_reboot.sh",
        ]
        return "\n".join(lines) + "\n"

    def create_reboot_operation_script(self, operation_type):
        """Return the post-reboot script for the currently selected drives."""
        source = self._device_path(self.source_drive.get())
        target = self._device_path(self.target_drive.get())
        return self._render_reboot_script(operation_type, source, target)

    def run(self):
        """Start the GUI application"""
        self.root.mainloop()
|
|
|
|
if __name__ == "__main__":
    # The GUI itself starts without root; the clone and reboot commands are
    # run through sudo, so only a hint is printed for unprivileged users.
    running_as_root = os.geteuid() == 0
    if not running_as_root:
        print("Note: Some operations may require sudo privileges")

    # Build the window and hand control to the Tk event loop.
    BackupManager().run()
|