#!/usr/bin/env python3
"""
Linux System Backup Tool with GUI

A tool for creating full system backups to external M.2 SSD with reboot
functionality.
"""

import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
import contextlib
import subprocess
import threading
import os
import re
import shlex
import sys
import tempfile
import time
from pathlib import Path


class BackupManager:
    """Tkinter front end for cloning, syncing and restoring whole drives.

    The window lets the user pick a source and a target block device and
    then either delegates to ``backup_script.sh`` (analyze / sync / backup /
    restore) or runs ``dd`` / ``rsync`` directly.

    NOTE(review): ``run_sync_operation`` and ``run_operation`` are meant to
    run on worker threads, yet they call ``self.log`` and ``messagebox``
    directly.  Tk is not guaranteed to be thread-safe; confirm this is
    acceptable or marshal those calls through ``root.after``.
    """

    # Whole-disk device names that legitimately end in a digit.
    _WHOLE_DISK_RE = re.compile(r'/dev/(?:nvme\d+n\d+|mmcblk\d+|loop\d+|md\d+)')
    # Partition names of the form "<disk ending in a digit>p<N>".
    _P_PARTITION_RE = re.compile(r'(.*\d)p\d+')

    def __init__(self):
        """Create the main window, build the widgets and scan for drives."""
        self.root = tk.Tk()
        self.root.title("System Backup Manager")
        self.root.geometry("600x500")
        self.root.resizable(True, True)

        # Variables
        self.source_drive = tk.StringVar()  # Will be auto-detected
        self.target_drive = tk.StringVar()
        self.operation_running = False
        self.operation_type = "backup"  # "backup" or "restore"
        self.sync_mode = "full"  # "full", "sync", or "auto"

        self.setup_ui()
        self.detect_drives()

    # ------------------------------------------------------------------
    # UI construction and small UI helpers
    # ------------------------------------------------------------------

    def setup_ui(self):
        """Setup the user interface"""
        # Main frame
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        # Configure grid weights; row 4 holds the log so it absorbs resizes.
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(1, weight=1)
        main_frame.rowconfigure(4, weight=1)

        # Title
        title_label = ttk.Label(main_frame, text="System Backup Manager",
                                font=("Arial", 16, "bold"))
        title_label.grid(row=0, column=0, columnspan=2, pady=(0, 20))

        # Source drive selection
        ttk.Label(main_frame, text="Source Drive (Internal):").grid(
            row=1, column=0, sticky=tk.W, pady=5)
        source_combo = ttk.Combobox(main_frame, textvariable=self.source_drive,
                                    width=40)
        source_combo.grid(row=1, column=1, sticky=(tk.W, tk.E), pady=5,
                          padx=(10, 0))

        # Target drive selection
        ttk.Label(main_frame, text="Target Drive (External M.2):").grid(
            row=2, column=0, sticky=tk.W, pady=5)
        target_combo = ttk.Combobox(main_frame, textvariable=self.target_drive,
                                    width=40)
        target_combo.grid(row=2, column=1, sticky=(tk.W, tk.E), pady=5,
                          padx=(10, 0))

        # Refresh drives button
        refresh_btn = ttk.Button(main_frame, text="Refresh Drives",
                                 command=self.detect_drives)
        refresh_btn.grid(row=3, column=1, sticky=tk.E, pady=10, padx=(10, 0))

        # Status frame
        status_frame = ttk.LabelFrame(main_frame, text="Status", padding="10")
        status_frame.grid(row=4, column=0, columnspan=2,
                          sticky=(tk.W, tk.E, tk.N, tk.S), pady=10)
        status_frame.columnconfigure(0, weight=1)
        status_frame.rowconfigure(0, weight=1)

        # Log area
        self.log_text = scrolledtext.ScrolledText(status_frame, height=15,
                                                  width=60)
        self.log_text.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        # Button frame
        button_frame = ttk.Frame(main_frame)
        button_frame.grid(row=5, column=0, columnspan=2, pady=20)

        # Backup buttons
        backup_frame = ttk.LabelFrame(button_frame, text="Backup Operations",
                                      padding="10")
        backup_frame.pack(side=tk.LEFT, padx=5)

        self.sync_backup_btn = ttk.Button(backup_frame,
                                          text="Smart Sync Backup",
                                          command=self.smart_sync_backup,
                                          style="Accent.TButton")
        self.sync_backup_btn.pack(side=tk.TOP, pady=2)

        self.backup_btn = ttk.Button(backup_frame, text="Full Clone Backup",
                                     command=self.start_backup)
        self.backup_btn.pack(side=tk.TOP, pady=2)

        self.reboot_backup_btn = ttk.Button(backup_frame,
                                            text="Reboot & Full Clone",
                                            command=self.reboot_and_backup)
        self.reboot_backup_btn.pack(side=tk.TOP, pady=2)

        # Restore buttons
        restore_frame = ttk.LabelFrame(button_frame, text="Restore Operations",
                                       padding="10")
        restore_frame.pack(side=tk.LEFT, padx=5)

        self.restore_btn = ttk.Button(restore_frame,
                                      text="Restore from External",
                                      command=self.start_restore)
        self.restore_btn.pack(side=tk.TOP, pady=2)

        self.reboot_restore_btn = ttk.Button(restore_frame,
                                             text="Reboot & Restore",
                                             command=self.reboot_and_restore)
        self.reboot_restore_btn.pack(side=tk.TOP, pady=2)

        # Control buttons
        control_frame = ttk.Frame(button_frame)
        control_frame.pack(side=tk.LEFT, padx=5)

        self.stop_btn = ttk.Button(control_frame, text="Stop",
                                   command=self.stop_operation,
                                   state="disabled")
        self.stop_btn.pack(side=tk.TOP, pady=2)

        self.swap_btn = ttk.Button(control_frame, text="Swap Source↔Target",
                                   command=self.swap_drives)
        self.swap_btn.pack(side=tk.TOP, pady=2)

        self.analyze_btn = ttk.Button(control_frame, text="Analyze Changes",
                                      command=self.analyze_changes)
        self.analyze_btn.pack(side=tk.TOP, pady=2)

        # Progress bar
        self.progress = ttk.Progressbar(main_frame, mode='indeterminate')
        self.progress.grid(row=6, column=0, columnspan=2,
                           sticky=(tk.W, tk.E), pady=10)

        # Store combo references for updating
        self.source_combo = source_combo
        self.target_combo = target_combo

        # Add initial log message
        self.log("System Backup Manager initialized")
        self.log("Select source and target drives, then click "
                 "'Full Clone Backup' or 'Reboot & Full Clone'")

    def log(self, message):
        """Add message to log with timestamp"""
        timestamp = time.strftime("%H:%M:%S")
        self.log_text.insert(tk.END, f"[{timestamp}] {message}\n")
        self.log_text.see(tk.END)
        self.root.update_idletasks()

    def _set_busy(self, busy):
        """Flip the UI between the idle state and the operation-running state.

        While busy, every button that starts an operation is disabled (this
        also prevents re-entrant starts while the event loop is pumped from
        inside a running operation) and only Stop is enabled.
        """
        self.operation_running = busy
        action_state = "disabled" if busy else "normal"
        for button in (self.sync_backup_btn, self.backup_btn,
                       self.reboot_backup_btn, self.restore_btn,
                       self.reboot_restore_btn, self.analyze_btn):
            button.config(state=action_state)
        self.stop_btn.config(state="normal" if busy else "disabled")
        if busy:
            self.progress.start()
        else:
            self.progress.stop()

    def _selected_devices(self):
        """Return ``(source, target)`` device paths from the combo boxes.

        Combo entries look like ``"/dev/sda (500G) [SYSTEM]"``; only the
        first whitespace-separated token is the device path.  An empty or
        blank selection yields ``""``.
        """
        source_tokens = self.source_drive.get().split()
        target_tokens = self.target_drive.get().split()
        return (source_tokens[0] if source_tokens else "",
                target_tokens[0] if target_tokens else "")

    # ------------------------------------------------------------------
    # Drive detection
    # ------------------------------------------------------------------

    @staticmethod
    def _base_device(device):
        """Map a partition path to its whole-disk path.

        ``/dev/sda2`` -> ``/dev/sda``; ``/dev/nvme0n1p1`` -> ``/dev/nvme0n1``.
        Whole-disk names that end in a digit (``/dev/nvme0n1``,
        ``/dev/mmcblk0``) are returned unchanged.
        """
        p_style = BackupManager._P_PARTITION_RE.fullmatch(device)
        if p_style:
            return p_style.group(1)
        if BackupManager._WHOLE_DISK_RE.fullmatch(device):
            return device
        return re.sub(r'\d+$', '', device)

    @staticmethod
    def _parse_lsblk_line(line):
        """Parse one ``lsblk -d -n -o NAME,SIZE,TYPE,TRAN,HOTPLUG`` row.

        Returns ``(device_path, size, transport, hotplug)`` for rows whose
        TYPE is ``disk``, otherwise ``None``.  lsblk leaves TRAN blank for
        some devices, which shifts HOTPLUG into the fourth column; a lone
        ``0``/``1`` there is therefore read as HOTPLUG, not as a transport.
        """
        parts = line.split()
        if len(parts) < 3 or parts[2] != 'disk':
            return None
        extra = parts[3:]
        if len(extra) >= 2:
            transport, hotplug = extra[0], extra[1]
        elif len(extra) == 1 and extra[0] in ('0', '1'):
            transport, hotplug = "", extra[0]
        elif extra:
            transport, hotplug = extra[0], "0"
        else:
            transport, hotplug = "", "0"
        return f"/dev/{parts[0]}", parts[1], transport, hotplug

    def get_root_drive(self):
        """Get the drive containing the root filesystem

        Returns the whole-disk device path reported by ``df /`` with its
        partition suffix removed, or ``None`` when it cannot be determined.
        """
        try:
            # Find the device containing the root filesystem
            result = subprocess.run(['df', '/'], capture_output=True,
                                    text=True)
            lines = result.stdout.strip().split('\n')
            if len(lines) > 1:
                device = lines[1].split()[0]
                return self._base_device(device)
        except Exception as e:
            self.log(f"Error detecting root drive: {e}")
        return None

    def detect_drives(self):
        """Detect available drives"""
        try:
            self.log("Detecting available drives...")

            # First, detect the root filesystem drive
            root_drive = self.get_root_drive()
            if root_drive:
                self.log(f"Detected root filesystem on: {root_drive}")

            # Get block devices with more information
            result = subprocess.run(
                ['lsblk', '-d', '-n', '-o', 'NAME,SIZE,TYPE,TRAN,HOTPLUG'],
                capture_output=True, text=True)

            internal_drives = []
            external_drives = []
            root_drive_info = None

            for line in result.stdout.strip().split('\n'):
                parsed = self._parse_lsblk_line(line)
                if parsed is None:
                    continue
                name, size, transport, hotplug = parsed
                drive_info = f"{name} ({size})"

                # Check if this is the root drive and mark it
                if root_drive and name == root_drive:
                    drive_info = f"{name} ({size}) [SYSTEM]"
                    root_drive_info = drive_info
                    self.log(f"Root drive found: {drive_info}")

                # Classify drives
                if transport in ('usb', 'uas') or hotplug == "1":
                    external_drives.append(drive_info)
                    self.log(f"External drive detected: {drive_info}")
                else:
                    internal_drives.append(drive_info)
                    self.log(f"Internal drive detected: {drive_info}")

            # Auto-select root drive as source if found, otherwise first
            # internal
            if root_drive_info:
                self.source_drive.set(root_drive_info)
                self.log(f"Auto-selected root drive as source: "
                         f"{root_drive_info}")
            elif internal_drives:
                self.source_drive.set(internal_drives[0])
                self.log(f"Auto-selected internal drive as source: "
                         f"{internal_drives[0]}")

            # Update combo boxes - put internal drives first for source
            self.source_combo['values'] = internal_drives + external_drives
            # Prefer external for target
            self.target_combo['values'] = external_drives + internal_drives

            # If there's an external drive, auto-select it as target
            if external_drives:
                self.target_drive.set(external_drives[0])
                self.log(f"Auto-selected external drive as target: "
                         f"{external_drives[0]}")

            self.log(f"Found {len(internal_drives)} internal and "
                     f"{len(external_drives)} external drives")

        except Exception as e:
            self.log(f"Error detecting drives: {e}")

    def validate_selection(self):
        """Validate drive selection

        Shows an error dialog and returns ``False`` when either drive is
        missing, both are the same, or a device path does not exist.
        """
        source, target = self._selected_devices()

        if not source:
            messagebox.showerror("Error", "Please select a source drive")
            return False

        if not target:
            messagebox.showerror("Error", "Please select a target drive")
            return False

        if source == target:
            messagebox.showerror("Error",
                                 "Source and target drives cannot be the same")
            return False

        # Check if drives exist
        if not os.path.exists(source):
            messagebox.showerror("Error",
                                 f"Source drive {source} does not exist")
            return False

        if not os.path.exists(target):
            messagebox.showerror("Error",
                                 f"Target drive {target} does not exist")
            return False

        return True

    # ------------------------------------------------------------------
    # Analysis
    # ------------------------------------------------------------------

    def analyze_changes(self):
        """Analyze changes between source and target drives"""
        if not self.validate_selection():
            return

        source, target = self._selected_devices()
        self.run_backup_script("analyze", source, target)

    def run_change_analysis(self, source, target):
        """Run change analysis in background"""
        try:
            # Check if target has existing backup
            backup_info = self.check_existing_backup(target)

            if not backup_info['has_backup']:
                self.log("No existing backup found. Full clone required.")
                return

            self.log(f"Found existing backup from: "
                     f"{backup_info['backup_date']}")

            # Mount both filesystems to compare
            changes = self.compare_filesystems(source, target)

            self.log("Analysis complete:")
            self.log(f"  Files changed: {changes['files_changed']}")
            self.log(f"  Files added: {changes['files_added']}")
            self.log(f"  Files deleted: {changes['files_deleted']}")
            self.log(f"  Total size changed: "
                     f"{changes['size_changed_mb']:.1f} MB")
            self.log(f"  Recommended action: {changes['recommendation']}")

            # Show recommendation
            if changes['recommendation'] == 'sync':
                messagebox.showinfo(
                    "Analysis Complete",
                    f"Smart Sync Recommended\n\n"
                    f"Changes detected: {changes['files_changed']} files\n"
                    f"Size to sync: {changes['size_changed_mb']:.1f} MB\n"
                    f"Estimated time: "
                    f"{changes['estimated_time_min']:.1f} minutes\n\n"
                    f"This is much faster than full clone!")
            else:
                messagebox.showinfo(
                    "Analysis Complete",
                    f"Full Clone Recommended\n\n"
                    f"Reason: {changes['reason']}\n"
                    f"Use 'Full Clone Backup' for best results.")

        except Exception as e:
            self.log(f"Error during analysis: {e}")
            messagebox.showerror("Analysis Error",
                                 f"Could not analyze changes: {e}")

    # ------------------------------------------------------------------
    # backup_script.sh driven operations
    # ------------------------------------------------------------------

    def smart_sync_backup(self):
        """Start smart sync backup operation"""
        if not self.validate_selection():
            return

        source, target = self._selected_devices()

        # Confirm operation
        result = messagebox.askyesno(
            "Confirm Smart Sync Backup",
            f"Perform smart sync backup?\n\n"
            f"Source: {source}\n"
            f"Target: {target}\n\n"
            f"This will quickly update the target drive with changes from "
            f"the source.\n"
            f"The operation is much faster than a full backup but requires "
            f"an existing backup on the target."
        )

        if result:
            self.run_backup_script("sync", source, target)

    def run_backup_script(self, mode, source, target):
        """Run the backup script with specified mode

        ``mode`` is one of ``"analyze"``, ``"sync"``, ``"backup"`` or
        ``"restore"``.  The script's output is streamed into the log widget.
        This runs on the calling (GUI) thread and pumps the event loop after
        every output line, so the Stop button stays responsive between
        lines; pressing it terminates the script.
        """
        # mode -> (extra script flags, start message, success log, dialog)
        modes = {
            "analyze": (['--analyze'],
                        "🔍 Analyzing changes between drives...",
                        "✅ Analysis completed successfully!",
                        ("Analysis Complete",
                         "Drive analysis completed. Check the output for "
                         "recommendations.")),
            "sync": (['--sync'],
                     "⚡ Starting smart sync backup...",
                     "✅ Smart sync completed successfully!",
                     ("Success",
                      "Smart sync backup completed successfully!")),
            "backup": ([],
                       "🔄 Starting full backup...",
                       "✅ Backup completed successfully!",
                       ("Success", "Full backup completed successfully!")),
            "restore": (['--restore'],
                        "🔧 Starting restore operation...",
                        "✅ Restore completed successfully!",
                        ("Success",
                         "System restore completed successfully!")),
        }

        if self.operation_running:
            self.log("Operation already running!")
            return

        try:
            if mode not in modes:
                raise ValueError(f"Unknown mode: {mode}")
            flags, start_msg, success_msg, success_dialog = modes[mode]

            # Clear previous output
            self.log_text.delete("1.0", tk.END)

            cmd = (['sudo', './backup_script.sh'] + flags
                   + ['--source', source, '--target', target])
            self.log(start_msg)

            # The script is resolved relative to this file's directory.
            script_dir = os.path.dirname(os.path.abspath(__file__))

            self._set_busy(True)
            with subprocess.Popen(
                cmd,
                cwd=script_dir,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                bufsize=1
            ) as process:
                # Monitor progress in real-time
                for output in process.stdout:
                    if not self.operation_running:
                        process.terminate()
                        break
                    self.log_text.insert(tk.END, output)
                    self.log_text.see(tk.END)
                    self.root.update()

            # Leaving the ``with`` block has waited for the process.
            return_code = process.returncode
            if not self.operation_running:
                self.log(f"{mode.title()} operation was cancelled")
            elif return_code == 0:
                self.log(success_msg)
                messagebox.showinfo(*success_dialog)
            else:
                self.log(f"❌ {mode.title()} operation failed!")
                messagebox.showerror(
                    "Error",
                    f"{mode.title()} operation failed. Check the output "
                    f"for details.")

        except Exception as e:
            error_msg = f"Error running {mode} operation: {str(e)}"
            self.log(f"❌ {error_msg}")
            messagebox.showerror("Error", error_msg)
        finally:
            self._set_busy(False)

    # ------------------------------------------------------------------
    # Filesystem inspection helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _first_partition(drive):
        """Return the device path of the first partition on ``drive``.

        Uses ``lsblk -l`` so names come back as a flat list; the default
        tree layout decorates child names with box-drawing characters.
        Returns ``None`` when lsblk lists nothing besides the disk itself.
        """
        listing = subprocess.run(
            ['lsblk', '-l', '-n', '-o', 'NAME', drive],
            capture_output=True, text=True).stdout
        disk_name = os.path.basename(drive)
        for name in listing.split():
            if name != disk_name:
                return f"/dev/{name}"
        return None

    @staticmethod
    @contextlib.contextmanager
    def _mounted(partition, prefix, read_only=True):
        """Mount ``partition`` on a fresh temp directory for the ``with`` body.

        Yields the mount point.  Raises ``subprocess.CalledProcessError`` if
        ``sudo mount`` fails.  On exit the partition is unmounted (only if
        the mount succeeded) and the directory removed, both best-effort so
        cleanup never masks the body's own result or exception.
        """
        mount_dir = tempfile.mkdtemp(prefix=prefix)
        mounted = False
        try:
            cmd = ['sudo', 'mount']
            if read_only:
                cmd += ['-o', 'ro']
            cmd += [partition, mount_dir]
            subprocess.run(cmd, check=True, capture_output=True)
            mounted = True
            yield mount_dir
        finally:
            if mounted:
                subprocess.run(['sudo', 'umount', mount_dir],
                               capture_output=True)
            try:
                os.rmdir(mount_dir)
            except OSError:
                # umount failed or the directory is busy; leaving an empty
                # temp directory behind is harmless.
                pass

    def check_existing_backup(self, target_drive):
        """Check if target drive has existing backup and get info

        Returns a dict that always has ``has_backup``.  On success it also
        has ``backup_date`` and ``main_partition``; on failure it has
        ``reason`` instead.
        """
        try:
            # Find the main partition (the first one lsblk lists)
            main_partition = self._first_partition(target_drive)
            if not main_partition:
                return {'has_backup': False, 'reason': 'No partitions found'}

            with self._mounted(main_partition, "backup_check_") as mount_dir:
                # Check if it looks like a Linux system
                has_backup = all(
                    os.path.exists(os.path.join(mount_dir, entry))
                    for entry in ('etc', 'home', 'usr'))

                backup_date = "Unknown"
                if has_backup:
                    # Use the modification time of /etc as the backup date
                    try:
                        etc_stat = os.stat(os.path.join(mount_dir, 'etc'))
                        backup_date = time.strftime(
                            '%Y-%m-%d %H:%M',
                            time.localtime(etc_stat.st_mtime))
                    except OSError:
                        pass  # keep "Unknown"

                return {
                    'has_backup': has_backup,
                    'backup_date': backup_date,
                    'main_partition': main_partition
                }

        except Exception as e:
            return {'has_backup': False, 'reason': f'Mount error: {e}'}

    def compare_filesystems(self, source_drive, target_drive):
        """Compare filesystems to determine sync requirements

        This is a size-based heuristic, not a file-by-file comparison: the
        file counts and times in the returned dict are estimates derived
        from the difference in used space.
        """
        try:
            # Check filesystem sizes
            source_size = self.get_filesystem_usage(source_drive)
            target_info = self.check_existing_backup(target_drive)

            if not target_info['has_backup']:
                return {
                    'recommendation': 'full',
                    'reason': 'No existing backup',
                    'files_changed': 0,
                    'files_added': 0,
                    'files_deleted': 0,
                    'size_changed_mb': 0,
                    'estimated_time_min': 0,
                    # Rough estimate
                    'full_clone_time_min': source_size['total_gb'] * 2
                }

            target_size = self.get_filesystem_usage(target_drive)

            # Simple heuristic based on size difference
            size_diff_gb = abs(source_size['used_gb'] - target_size['used_gb'])
            # max(..., 0.1) guards against division by zero on empty disks
            size_change_percent = (
                size_diff_gb / max(source_size['used_gb'], 0.1)) * 100

            # Estimate file changes (rough approximation)
            # Assume 1MB per file average
            estimated_files_changed = int(size_diff_gb * 1000)
            # 1.5 minutes per GB for sync
            estimated_sync_time = size_diff_gb * 1.5
            # 2 minutes per GB for full clone
            estimated_full_time = source_size['total_gb'] * 2

            # Decision logic
            if size_change_percent < 5 and size_diff_gb < 2:
                recommendation = 'sync'
                reason = 'Minor changes detected'
            elif size_change_percent < 15 and size_diff_gb < 10:
                recommendation = 'sync'
                reason = 'Moderate changes, sync beneficial'
            else:
                recommendation = 'full'
                reason = 'Major changes detected, full clone safer'

            return {
                'recommendation': recommendation,
                'reason': reason,
                'files_changed': estimated_files_changed,
                'files_added': max(0, estimated_files_changed // 2),
                'files_deleted': max(0, estimated_files_changed // 4),
                'size_changed_mb': size_diff_gb * 1024,
                'estimated_time_min': estimated_sync_time,
                'full_clone_time_min': estimated_full_time
            }

        except Exception as e:
            return {
                'recommendation': 'full',
                'reason': f'Analysis failed: {e}',
                'files_changed': 0,
                'files_added': 0,
                'files_deleted': 0,
                'size_changed_mb': 0,
                'estimated_time_min': 0,
                'full_clone_time_min': 60
            }

    def get_filesystem_usage(self, drive):
        """Get filesystem usage information

        Returns ``{'total_gb', 'used_gb', 'free_gb'}`` for the first
        partition of ``drive``.  If the partition cannot be mounted, falls
        back to the raw device size with an assumed 70% usage, and finally
        to fixed default estimates.
        """
        try:
            main_partition = self._first_partition(drive)
            if not main_partition:
                return {'total_gb': 0, 'used_gb': 0, 'free_gb': 0}

            # Mount temporarily and get usage
            with self._mounted(main_partition, "fs_check_") as mount_dir:
                stats = os.statvfs(mount_dir)
                total_bytes = stats.f_frsize * stats.f_blocks
                # f_bavail: blocks available to unprivileged users
                free_bytes = stats.f_frsize * stats.f_bavail
                used_bytes = total_bytes - free_bytes

                return {
                    'total_gb': total_bytes / (1024**3),
                    'used_gb': used_bytes / (1024**3),
                    'free_gb': free_bytes / (1024**3)
                }

        except Exception:
            # Fallback to drive size
            try:
                size_bytes = int(subprocess.run(
                    ['blockdev', '--getsize64', drive],
                    capture_output=True, text=True).stdout.strip())
                total_gb = size_bytes / (1024**3)
                return {'total_gb': total_gb,
                        'used_gb': total_gb * 0.7,
                        'free_gb': total_gb * 0.3}
            except (ValueError, OSError, subprocess.SubprocessError):
                # Default estimates
                return {'total_gb': 500, 'used_gb': 350, 'free_gb': 150}

    # ------------------------------------------------------------------
    # rsync based smart sync
    # ------------------------------------------------------------------

    def _preserve_backup_tools(self, target):
        """Run ``restore_tools_after_backup.sh`` against ``target`` if present.

        Returns ``True`` when the script was found and run, ``False`` when
        it is missing or could not be run.  Never raises.
        """
        try:
            self.log("Preserving backup tools on external drive...")
            restore_script = os.path.join(os.path.dirname(__file__),
                                          "restore_tools_after_backup.sh")
            if not os.path.exists(restore_script):
                self.log("Warning: Tool restoration script not found")
                return False
            subprocess.run([restore_script, target], check=False, timeout=60)
            self.log("Backup tools preserved on external drive")
            return True
        except Exception as e:
            self.log(f"Warning: Could not preserve tools on external drive: "
                     f"{e}")
            return False

    def start_sync_operation(self, source, target, changes):
        """Start smart sync operation"""
        self._set_busy(True)

        sync_thread = threading.Thread(target=self.run_sync_operation,
                                       args=(source, target, changes))
        sync_thread.daemon = True
        sync_thread.start()

    def run_sync_operation(self, source, target, changes):
        """Run smart filesystem sync operation

        Mounts the first partition of ``source`` read-only and of ``target``
        read-write, then mirrors source onto target with ``rsync --delete``.
        ``changes`` is the dict produced by ``compare_filesystems``; only
        ``size_changed_mb`` is read.
        """
        try:
            self.log("Starting smart sync operation...")
            self.log(f"Syncing {changes['size_changed_mb']:.1f} MB of "
                     f"changes...")

            # Find main partitions
            source_partition = self._first_partition(source)
            target_partition = self._first_partition(target)
            if not source_partition or not target_partition:
                raise RuntimeError(
                    "Could not find a partition on the source or target "
                    "drive")

            # Mount both filesystems
            with self._mounted(source_partition,
                               "sync_source_") as source_mount, \
                    self._mounted(target_partition, "sync_target_",
                                  read_only=False) as target_mount:

                self.log("Filesystems mounted, starting rsync...")

                # Use rsync for efficient synchronization
                rsync_cmd = [
                    'sudo', 'rsync', '-avHAXS', '--numeric-ids',
                    '--delete', '--progress',
                    '--exclude=/proc/*', '--exclude=/sys/*',
                    '--exclude=/dev/*', '--exclude=/tmp/*',
                    '--exclude=/run/*', '--exclude=/mnt/*',
                    '--exclude=/media/*', '--exclude=/lost+found',
                    f'{source_mount}/', f'{target_mount}/'
                ]

                self.log("Running rsync command")

                with subprocess.Popen(rsync_cmd, stdout=subprocess.PIPE,
                                      stderr=subprocess.STDOUT, text=True,
                                      bufsize=1) as process:
                    # Read output line by line
                    for line in process.stdout:
                        if not self.operation_running:
                            process.terminate()
                            break
                        line = line.strip()
                        # Skip rsync's trailing summary lines
                        if line and not line.startswith(('sent ',
                                                         'total size')):
                            self.log(f"Sync: {line}")

                if process.returncode == 0 and self.operation_running:
                    self.log("Smart sync completed successfully!")

                    # Preserve backup tools
                    self._preserve_backup_tools(target)

                    messagebox.showinfo(
                        "Success",
                        f"Smart Sync completed successfully!\n\n"
                        f"Synced: {changes['size_changed_mb']:.1f} MB\n"
                        f"Much faster than full clone!")
                elif not self.operation_running:
                    self.log("Sync operation was cancelled")
                else:
                    self.log(f"Sync failed with return code: "
                             f"{process.returncode}")
                    messagebox.showerror(
                        "Error",
                        "Smart sync failed! Consider using full clone "
                        "backup.")

        except Exception as e:
            self.log(f"Error during smart sync: {e}")
            messagebox.showerror("Error", f"Smart sync failed: {e}")
        finally:
            self._set_busy(False)

    def swap_drives(self):
        """Swap source and target drives"""
        source = self.source_drive.get()
        target = self.target_drive.get()
        self.source_drive.set(target)
        self.target_drive.set(source)
        self.log("Swapped source and target drives")

    # ------------------------------------------------------------------
    # Restore / backup entry points
    # ------------------------------------------------------------------

    def start_restore(self):
        """Start the restore process"""
        if not self.validate_selection():
            return

        if self.operation_running:
            self.log("Operation already running!")
            return

        # Confirm restore
        source, target = self._selected_devices()
        result = messagebox.askyesno(
            "⚠️ CONFIRM RESTORE ⚠️",
            f"This will RESTORE from {source} to {target}.\n\n"
            f"🚨 CRITICAL WARNING 🚨\n"
            f"This will COMPLETELY OVERWRITE {target}!\n"
            f"ALL DATA on {target} will be DESTROYED!\n\n"
            f"This should typically restore FROM external TO internal.\n"
            f"Make sure you have the drives selected correctly!\n\n"
            f"Are you absolutely sure you want to continue?")

        if not result:
            return

        # Second confirmation for restore
        result2 = messagebox.askyesno(
            "FINAL CONFIRMATION",
            f"LAST CHANCE TO CANCEL!\n\n"
            f"Restoring from: {source}\n"
            f"Overwriting: {target}\n\n"
            f"Click Yes to continue or No to cancel.")

        if not result2:
            return

        self.run_backup_script("restore", source, target)

    def _schedule_reboot_operation(self, operation_type):
        """Write the post-reboot script and schedule a reboot in one minute.

        NOTE(review): nothing here registers the script to run at boot, and
        /tmp is commonly cleared on reboot, so the "will start automatically"
        message is not backed by this code — confirm how the script is
        expected to be launched.
        """
        try:
            # Create the script to run after reboot
            script_content = self.create_reboot_operation_script(
                operation_type)

            # Save script
            script_path = f"/tmp/{operation_type}_after_reboot.sh"
            with open(script_path, 'w', encoding='utf-8') as f:
                f.write(script_content)

            os.chmod(script_path, 0o755)

            self.log(f"Reboot {operation_type} script created")
            # "shutdown -r +1" reboots after one minute
            self.log("System will reboot in 1 minute...")

            # Schedule reboot
            subprocess.run(['sudo', 'shutdown', '-r', '+1'], check=True)

            self.log(f"Reboot scheduled. {operation_type.capitalize()} will "
                     f"start automatically after reboot.")

        except Exception as e:
            self.log(f"Error scheduling reboot: {e}")
            messagebox.showerror("Error", f"Failed to schedule reboot: {e}")

    def reboot_and_restore(self):
        """Schedule reboot and restore"""
        if not self.validate_selection():
            return

        result = messagebox.askyesno(
            "⚠️ CONFIRM REBOOT & RESTORE ⚠️",
            "This will:\n"
            "1. Save current session\n"
            "2. Reboot the system\n"
            "3. Start RESTORE after reboot\n\n"
            "🚨 WARNING: This will OVERWRITE your internal drive! 🚨\n\n"
            "Continue?")

        if not result:
            return

        self._schedule_reboot_operation("restore")

    def start_backup(self):
        """Start the backup process"""
        if not self.validate_selection():
            return

        # Confirm backup
        source, target = self._selected_devices()
        result = messagebox.askyesno(
            "Confirm Backup",
            f"This will clone {source} to {target}.\n\n"
            f"WARNING: All data on {target} will be destroyed!\n\n"
            f"Are you sure you want to continue?")

        if result:
            self.run_backup_script("backup", source, target)

    # ------------------------------------------------------------------
    # dd based clone (threaded)
    # ------------------------------------------------------------------

    def start_operation(self, source, target):
        """Start backup or restore operation"""
        # Start operation in thread
        self._set_busy(True)

        operation_thread = threading.Thread(
            target=self.run_operation,
            args=(source, target, self.operation_type))
        operation_thread.daemon = True
        operation_thread.start()

    def run_operation(self, source, target, operation_type):
        """Run the actual backup or restore process

        Clones ``source`` onto ``target`` with ``dd``.  ``operation_type``
        (``"backup"`` or anything else, treated as restore) only selects the
        messages and whether backup tools are re-installed afterwards.
        """
        try:
            if operation_type == "backup":
                self.log(f"Starting backup from {source} to {target}")
            else:
                self.log(f"Starting restore from {source} to {target}")
            self.log("This may take a while depending on drive size...")

            # Use dd for cloning
            cmd = [
                'sudo', 'dd',
                f'if={source}',
                f'of={target}',
                'bs=4M',
                'status=progress',
                'conv=fdatasync'
            ]

            self.log(f"Running command: {' '.join(cmd)}")

            with subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT,
                                  text=True) as process:
                # Read output
                for line in process.stdout:
                    # Stop was pressed: abort the copy
                    if not self.operation_running:
                        process.terminate()
                        break
                    self.log(line.strip())

            if process.returncode == 0 and self.operation_running:
                if operation_type == "backup":
                    self.log("Backup completed successfully!")

                    # Restore backup tools to external drive
                    if self._preserve_backup_tools(target):
                        messagebox.showinfo(
                            "Success",
                            "Backup completed successfully!\n\n"
                            "Backup tools have been preserved on the "
                            "external drive.")
                    else:
                        messagebox.showinfo(
                            "Success",
                            "Backup completed successfully!\n\n"
                            "Backup tools could not be preserved on the "
                            "external drive; see the log.")
                else:
                    self.log("Restore completed successfully!")
                    messagebox.showinfo("Success",
                                        "Restore completed successfully!")
            elif not self.operation_running:
                self.log(f"{operation_type.capitalize()} was cancelled by "
                         f"user")
            else:
                self.log(f"{operation_type.capitalize()} failed with return "
                         f"code: {process.returncode}")
                messagebox.showerror(
                    "Error",
                    f"{operation_type.capitalize()} failed! Check the log "
                    f"for details.")

        except Exception as e:
            self.log(f"Error during {operation_type}: {e}")
            messagebox.showerror(
                "Error", f"{operation_type.capitalize()} failed: {e}")
        finally:
            self._set_busy(False)

    def stop_operation(self):
        """Stop the current operation"""
        # The running loops poll this flag and terminate their subprocess.
        self.operation_running = False
        self.log("Stopping operation...")

    def reboot_and_backup(self):
        """Schedule reboot and backup"""
        if not self.validate_selection():
            return

        result = messagebox.askyesno(
            "Confirm Reboot & Backup",
            "This will:\n"
            "1. Save current session\n"
            "2. Reboot the system\n"
            "3. Start backup after reboot\n\n"
            "Continue?")

        if not result:
            return

        self._schedule_reboot_operation("backup")

    def create_reboot_operation_script(self, operation_type):
        """Create script to run operation after reboot

        Returns the text of a bash script that ``dd``-clones the currently
        selected source onto the selected target.  Any ``operation_type``
        other than ``"backup"`` is treated as a restore.
        """
        source, target = self._selected_devices()

        if operation_type == "backup":
            action_desc = "backup"
            success_msg = "Backup Complete"
            fail_msg = "Backup Failed"
        else:
            action_desc = "restore"
            success_msg = "Restore Complete"
            fail_msg = "Restore Failed"

        # The combo boxes are editable, so quote the paths for the shell.
        dd_source = shlex.quote(f"if={source}")
        dd_target = shlex.quote(f"of={target}")

        script = f"""#!/bin/bash
# Auto-generated {action_desc} script
echo "Starting {action_desc} after reboot..."
echo "Source: {source}"
echo "Target: {target}"

# Wait for system to fully boot
sleep 30

# Run {action_desc}
sudo dd {dd_source} {dd_target} bs=4M status=progress conv=fdatasync

if [ $? -eq 0 ]; then
    echo "{action_desc.capitalize()} completed successfully!"
    notify-send "{success_msg}" "System {action_desc} finished successfully"
else
    echo "{action_desc.capitalize()} failed!"
    notify-send "{fail_msg}" "System {action_desc} encountered an error"
fi

# Clean up
rm -f /tmp/{action_desc}_after_reboot.sh
"""
        return script

    def run(self):
        """Start the GUI application"""
        self.root.mainloop()


def main():
    """Print a privilege hint when not root, then launch the GUI."""
    # Check if running as root for certain operations
    if os.geteuid() != 0:
        print("Note: Some operations may require sudo privileges")

    app = BackupManager()
    app.run()


if __name__ == "__main__":
    main()