#!/usr/bin/env python3 """ Enhanced RDP Client with Professional GUI A modern replacement for the zenity-based RDP script with better UX """ # Auto-detect and use virtual environment if available import sys import os def setup_virtual_env(): """Setup virtual environment if needed""" script_dir = os.path.dirname(os.path.abspath(__file__)) venv_python = os.path.join(script_dir, '.venv', 'bin', 'python') # If we're not already in the virtual environment and it exists, restart with venv python if os.path.exists(venv_python) and sys.executable != venv_python: import subprocess # Re-execute this script with the virtual environment Python os.execv(venv_python, [venv_python] + sys.argv) # Try to setup virtual environment first setup_virtual_env() import tkinter as tk from tkinter import ttk, messagebox, simpledialog, filedialog import json import os import subprocess import base64 from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import threading from datetime import datetime import shutil import logging import socket import sys class RDPClient: def __init__(self): self.root = tk.Tk() self.root.title("RDP Client - Professional (Press F1 for shortcuts)") self.root.geometry("1000x700") self.root.minsize(800, 600) # Configuration self.config_dir = os.path.expanduser("~/.config/rdp-client") self.connections_file = os.path.join(self.config_dir, "connections.json") self.credentials_file = os.path.join(self.config_dir, "credentials.json") self.history_file = os.path.join(self.config_dir, "history.json") self.log_file = os.path.join(self.config_dir, "rdp_client.log") # Create config directory os.makedirs(self.config_dir, exist_ok=True) # Setup logging self._setup_logging() # Initialize encryption self._init_encryption() # Load data self.connections = self._load_connections() self.credentials = self._load_credentials() self.history = self._load_history() # Migrate legacy multimon settings self._migrate_multimon_settings() # Add existing connections to history if history is empty (first run or migration) if not self.history and self.connections: for conn_name in self.connections.keys(): self._add_to_history(conn_name) # Setup GUI self._setup_gui() self._refresh_connections_list() # Force refresh after a short delay to ensure GUI is ready self.root.after(100, self._refresh_connections_list) self.logger.info("RDP Client initialized successfully") def _setup_logging(self): """Setup logging configuration""" self.logger = logging.getLogger('rdp_client') self.logger.setLevel(logging.INFO) # Create file handler file_handler = logging.FileHandler(self.log_file) file_handler.setLevel(logging.INFO) # Create console handler for errors console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.ERROR) # Create formatter formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) # Add handlers self.logger.addHandler(file_handler) self.logger.addHandler(console_handler) # Prevent duplicate logs self.logger.propagate = False def _init_encryption(self): """Initialize encryption for password storage""" password = f"{os.getenv('USER')}@{os.uname().nodename}".encode() salt = b'salt_1234567890' # In production, use random salt kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, ) key = base64.urlsafe_b64encode(kdf.derive(password)) self.cipher = Fernet(key) def _encrypt_password(self, password): """Encrypt password for secure storage""" return self.cipher.encrypt(password.encode()).decode() def _decrypt_password(self, encrypted_password): """Decrypt password from storage""" try: return self.cipher.decrypt(encrypted_password.encode()).decode() except: return "" def _load_connections(self): """Load saved connections from file""" if os.path.exists(self.connections_file): try: with open(self.connections_file, 'r') as f: return json.load(f) except: return {} return {} def _save_connections(self): """Save connections to file""" with open(self.connections_file, 'w') as f: json.dump(self.connections, f, indent=2) def _load_credentials(self): """Load saved credentials from file""" if os.path.exists(self.credentials_file): try: with open(self.credentials_file, 'r') as f: return json.load(f) except: return {} return {} def _save_credentials(self): """Save credentials to file""" with open(self.credentials_file, 'w') as f: json.dump(self.credentials, f, indent=2) def _load_history(self): """Load connection history from file""" if os.path.exists(self.history_file): try: with open(self.history_file, 'r') as f: return json.load(f) except: return [] return [] def _save_history(self): """Save connection history to file""" with open(self.history_file, 'w') as f: json.dump(self.history, f, indent=2) def _add_to_history(self, connection_name): """Add a connection to history""" # Remove existing entry if present self.history = [h for h in self.history if h.get('name') != connection_name] # Add new entry at the beginning history_entry = { 'name': connection_name, 'timestamp': datetime.now().isoformat(), 'count': self._get_connection_count(connection_name) + 1 } self.history.insert(0, history_entry) # Keep only last 20 entries self.history = self.history[:20] self._save_history() def _get_connection_count(self, connection_name): """Get the connection count for a specific connection""" for entry in self.history: if entry.get('name') == connection_name: return entry.get('count', 0) return 0 def _migrate_multimon_settings(self): """Migrate legacy 'Yes' multimon settings to new specific monitor options""" changed = False for conn_name, conn_data in self.connections.items(): if conn_data.get("multimon") == "Yes": # Default to 2 monitors for legacy "Yes" settings conn_data["multimon"] = "2 Monitors" changed = True self.logger.info(f"Migrated multimon setting for {conn_name} from 'Yes' to '2 Monitors'") if changed: self._save_connections() self.logger.info("Completed multimon settings migration") def _get_available_monitors_count(self): """Get the number of available monitors""" try: result = subprocess.run(['xrandr', '--listmonitors'], capture_output=True, text=True, timeout=5) if result.returncode == 0: lines = result.stdout.strip().split('\n') if lines and lines[0].startswith('Monitors:'): return int(lines[0].split(':')[1].strip()) except: pass return 1 # Default to 1 monitor if detection fails def _get_multimon_options(self): """Get available multi-monitor options based on system""" monitor_count = self._get_available_monitors_count() options = ["No"] # Add options for available monitors for i in range(2, min(monitor_count + 1, 5)): # Up to 4 monitors options.append(f"{i} Monitors") if monitor_count > 1: options.extend(["All Monitors", "Span"]) return options def _get_best_monitor_selection(self, count): """Get the best monitor selection based on layout""" try: result = subprocess.run(['xfreerdp', '/monitor-list'], capture_output=True, text=True, timeout=5) if result.returncode == 0: lines = result.stdout.strip().split('\n') monitors = [] for line in lines: # Clean up the line and split by whitespace/tabs cleaned_line = line.strip().replace('*', '').strip() if '[' in cleaned_line and ']' in cleaned_line and 'x' in cleaned_line and '+' in cleaned_line: # Split by whitespace/tabs parts = cleaned_line.split() if len(parts) >= 3: id_part = parts[0] # [0] pos_part = parts[2] # +3840+0 if '[' in id_part and ']' in id_part: monitor_id = int(id_part.strip('[]')) x_pos = int(pos_part.split('+')[1]) monitors.append((monitor_id, x_pos)) # Sort monitors by X position (left to right) monitors.sort(key=lambda x: x[1]) # Return the leftmost monitors selected = [str(m[0]) for m in monitors[:count]] return ','.join(selected) except: pass # Fallback: simple sequential selection return ','.join([str(i) for i in range(count)]) def _setup_gui(self): """Setup the main GUI""" # Configure style style = ttk.Style() style.theme_use('clam') # Main container main_frame = ttk.Frame(self.root, padding="10") main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) # Configure grid weights self.root.columnconfigure(0, weight=1) self.root.rowconfigure(0, weight=1) main_frame.columnconfigure(1, weight=1) main_frame.rowconfigure(1, weight=1) # Title title_label = ttk.Label(main_frame, text="RDP Client - Professional", font=('Arial', 16, 'bold')) title_label.grid(row=0, column=0, columnspan=3, pady=(0, 20)) # Left panel - Connections left_frame = ttk.Frame(main_frame) left_frame.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(0, 10)) left_frame.rowconfigure(1, weight=1) left_frame.rowconfigure(3, weight=1) left_frame.columnconfigure(0, weight=1) # Recent Connections recent_label = ttk.Label(left_frame, text="Recent Connections", font=('Arial', 10, 'bold')) recent_label.grid(row=0, column=0, sticky=tk.W, pady=(0, 5)) # Recent connections frame with listbox and scrollbar recent_frame = ttk.Frame(left_frame) recent_frame.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10)) recent_frame.rowconfigure(0, weight=1) recent_frame.columnconfigure(0, weight=1) self.recent_listbox = tk.Listbox(recent_frame, font=('Arial', 9), height=6, width=30) recent_scrollbar = ttk.Scrollbar(recent_frame, orient="vertical", command=self.recent_listbox.yview) self.recent_listbox.configure(yscrollcommand=recent_scrollbar.set) self.recent_listbox.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) recent_scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S)) # Saved Connections saved_label = ttk.Label(left_frame, text="Saved Connections", font=('Arial', 10, 'bold')) saved_label.grid(row=2, column=0, sticky=tk.W, pady=(10, 5)) # Saved connections frame with listbox and scrollbar saved_frame = ttk.Frame(left_frame) saved_frame.grid(row=3, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) saved_frame.rowconfigure(0, weight=1) saved_frame.columnconfigure(0, weight=1) self.connections_listbox = tk.Listbox(saved_frame, font=('Arial', 9), width=30) scrollbar = ttk.Scrollbar(saved_frame, orient="vertical", command=self.connections_listbox.yview) self.connections_listbox.configure(yscrollcommand=scrollbar.set) self.connections_listbox.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S)) # Connection buttons conn_buttons_frame = ttk.Frame(left_frame) conn_buttons_frame.grid(row=4, column=0, pady=(10, 0), sticky=(tk.W, tk.E)) ttk.Button(conn_buttons_frame, text="Connect", command=self._connect_selected).pack(side=tk.LEFT, padx=(0, 5)) ttk.Button(conn_buttons_frame, text="Edit", command=self._edit_selected).pack(side=tk.LEFT, padx=(0, 5)) ttk.Button(conn_buttons_frame, text="Delete", command=self._delete_selected).pack(side=tk.LEFT) # Right panel - Actions and Details right_frame = ttk.Frame(main_frame) right_frame.grid(row=1, column=1, sticky=(tk.W, tk.E, tk.N, tk.S)) right_frame.rowconfigure(1, weight=1) right_frame.columnconfigure(0, weight=1) # Actions frame actions_frame = ttk.LabelFrame(right_frame, text="Actions", padding="10") actions_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=(0, 10)) ttk.Button(actions_frame, text="New Connection", command=self._new_connection, width=20).pack(pady=2) ttk.Button(actions_frame, text="Test Connection", command=self._test_selected_connection, width=20).pack(pady=2) ttk.Button(actions_frame, text="Import Connections", command=self._import_connections, width=20).pack(pady=2) ttk.Button(actions_frame, text="Export Connections", command=self._export_connections, width=20).pack(pady=2) ttk.Button(actions_frame, text="Clear All Credentials", command=self._clear_credentials, width=20).pack(pady=2) # Details frame details_frame = ttk.LabelFrame(right_frame, text="Connection Details", padding="10") details_frame.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) details_frame.columnconfigure(1, weight=1) # Details labels self.details_labels = {} details_fields = [ ("Server:", "server"), ("Username:", "username"), ("Domain:", "domain"), ("Resolution:", "resolution"), ("Color Depth:", "color_depth"), ("Multi-Monitor:", "multimon"), ("Sound:", "sound"), ("Clipboard:", "clipboard"), ("Drive Sharing:", "drives"), ("Created:", "created") ] for i, (label, field) in enumerate(details_fields): ttk.Label(details_frame, text=label, font=('Arial', 9, 'bold')).grid( row=i, column=0, sticky=tk.W, pady=2) value_label = ttk.Label(details_frame, text="", font=('Arial', 9)) value_label.grid(row=i, column=1, sticky=tk.W, padx=(10, 0), pady=2) self.details_labels[field] = value_label # Bind listbox selection self.connections_listbox.bind('<>', self._on_connection_select) self.connections_listbox.bind('', self._connect_selected) self.recent_listbox.bind('<>', self._on_recent_select) self.recent_listbox.bind('', self._connect_recent) # Keyboard shortcuts self._setup_keyboard_shortcuts() # Bottom frame - Status status_frame = ttk.Frame(main_frame) status_frame.grid(row=2, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=(10, 0)) status_frame.columnconfigure(0, weight=1) self.status_var = tk.StringVar(value="Ready") status_label = ttk.Label(status_frame, textvariable=self.status_var, font=('Arial', 9), foreground='gray') status_label.grid(row=0, column=0, sticky=tk.W) # Exit button ttk.Button(status_frame, text="Exit", command=self.root.quit).grid(row=0, column=1, sticky=tk.E) def _setup_keyboard_shortcuts(self): """Setup keyboard shortcuts for the application""" # Global shortcuts self.root.bind('', lambda e: self._new_connection()) self.root.bind('', lambda e: self._import_connections()) self.root.bind('', lambda e: self._export_connections()) self.root.bind('', lambda e: self._test_selected_connection()) self.root.bind('', lambda e: self._refresh_connections_list()) self.root.bind('', lambda e: self.root.quit()) self.root.bind('', lambda e: self.root.quit()) self.root.bind('', lambda e: self._show_help()) # Listbox specific shortcuts self.connections_listbox.bind('', lambda e: self._connect_selected()) self.connections_listbox.bind('', lambda e: self._delete_selected()) self.connections_listbox.bind('', lambda e: self._edit_selected()) self.connections_listbox.bind('', lambda e: self._test_selected_connection()) self.recent_listbox.bind('', lambda e: self._connect_selected()) self.recent_listbox.bind('', lambda e: self._delete_selected()) self.recent_listbox.bind('', lambda e: self._edit_selected()) self.recent_listbox.bind('', lambda e: self._test_selected_connection()) # Set focus handling self.root.bind('', self._handle_tab_focus) self.root.bind('', self._handle_shift_tab_focus) # Add tooltip information for shortcuts self._add_keyboard_shortcuts_info() def _handle_tab_focus(self, event): """Handle Tab key for focus navigation""" current_focus = self.root.focus_get() if current_focus == self.recent_listbox: self.connections_listbox.focus_set() if self.connections_listbox.size() > 0: self.connections_listbox.selection_set(0) elif current_focus == self.connections_listbox: self.recent_listbox.focus_set() if self.recent_listbox.size() > 0: self.recent_listbox.selection_set(0) else: self.recent_listbox.focus_set() if self.recent_listbox.size() > 0: self.recent_listbox.selection_set(0) return "break" # Prevent default Tab behavior def _handle_shift_tab_focus(self, event): """Handle Shift+Tab key for reverse focus navigation""" current_focus = self.root.focus_get() if current_focus == self.connections_listbox: self.recent_listbox.focus_set() if self.recent_listbox.size() > 0: self.recent_listbox.selection_set(0) elif current_focus == self.recent_listbox: self.connections_listbox.focus_set() if self.connections_listbox.size() > 0: self.connections_listbox.selection_set(0) else: self.connections_listbox.focus_set() if self.connections_listbox.size() > 0: self.connections_listbox.selection_set(0) return "break" # Prevent default Shift+Tab behavior def _add_keyboard_shortcuts_info(self): """Add keyboard shortcuts information to the status bar or help""" # You could create a help dialog or status tooltip here # For now, we'll add it to the window title when focused def show_shortcuts_hint(event): self.status_var.set("Shortcuts: Ctrl+N=New, Enter=Connect, Del=Delete, F2=Edit, Ctrl+T=Test, F5=Refresh, Ctrl+Q=Quit") def clear_shortcuts_hint(event): self.status_var.set("Ready") # Show shortcuts hint when certain widgets get focus self.connections_listbox.bind('', show_shortcuts_hint) self.recent_listbox.bind('', show_shortcuts_hint) self.connections_listbox.bind('', clear_shortcuts_hint) self.recent_listbox.bind('', clear_shortcuts_hint) def _show_help(self): """Show keyboard shortcuts help dialog""" help_text = """RDP Client - Keyboard Shortcuts Global Shortcuts: • Ctrl+N - Create new connection • Ctrl+O - Import connections from file • Ctrl+S - Export connections to file • Ctrl+T - Test selected connection • F5 - Refresh connections list • Ctrl+Q - Quit application • Escape - Quit application • F1 - Show this help Connection List Shortcuts: • Enter - Connect to selected connection • Double-click - Connect to selected connection • Delete - Delete selected connection • F2 - Edit selected connection • Tab - Switch between Recent and Saved connections Navigation: • Tab - Move between Recent and Saved connections • Shift+Tab - Move between connections (reverse) • Arrow keys - Navigate within lists Tips: • Recent connections show usage count (e.g., "Server (5x)") • Double-click any connection to connect quickly • Use Test Connection to verify server availability • Import/Export to backup or share connection profiles Multi-Monitor Support: • "2 Monitors" - Use first 2 monitors (0,1) • "3 Monitors" - Use first 3 monitors (0,1,2) • "4 Monitors" - Use first 4 monitors (0,1,2,3) • "All Monitors" - Use all available monitors • "Span" - Span desktop across monitors • "No" - Single monitor only""" messagebox.showinfo("Keyboard Shortcuts", help_text) def _refresh_connections_list(self): """Refresh the connections listbox""" self.connections_listbox.delete(0, tk.END) for name in sorted(self.connections.keys()): self.connections_listbox.insert(tk.END, name) # Refresh recent connections self.recent_listbox.delete(0, tk.END) for entry in self.history[:10]: # Show last 10 recent connections name = entry.get('name', '') count = entry.get('count', 0) if name in self.connections: # Only show if connection still exists display_text = f"{name} ({count}x)" self.recent_listbox.insert(tk.END, display_text) # Clear details if no connections if not self.connections: for label in self.details_labels.values(): label.config(text="") def _on_recent_select(self, event=None): """Handle recent connection selection""" selection = self.recent_listbox.curselection() if selection: display_text = self.recent_listbox.get(selection[0]) # Extract connection name (everything before " (") name = display_text.split(' (')[0] if name in self.connections: # Clear saved connections selection self.connections_listbox.selection_clear(0, tk.END) # Update details conn = self.connections[name] self._update_details(conn) def _connect_recent(self, event=None): """Connect to selected recent connection""" selection = self.recent_listbox.curselection() if not selection: return display_text = self.recent_listbox.get(selection[0]) name = display_text.split(' (')[0] if name in self.connections: self._connect_to(name) def _update_details(self, conn): """Update the details panel with connection info""" self.details_labels["server"].config(text=conn.get("server", "")) self.details_labels["username"].config(text=conn.get("username", "")) self.details_labels["domain"].config(text=conn.get("domain", "N/A")) self.details_labels["resolution"].config(text=conn.get("resolution", "1920x1080")) self.details_labels["color_depth"].config(text=f"{conn.get('color_depth', 32)}-bit") self.details_labels["multimon"].config(text=conn.get("multimon", "No")) self.details_labels["sound"].config(text=conn.get("sound", "Yes")) self.details_labels["clipboard"].config(text=conn.get("clipboard", "Yes")) self.details_labels["drives"].config(text=conn.get("drives", "No")) self.details_labels["created"].config(text=conn.get("created", "Unknown")) def _on_connection_select(self, event=None): """Handle connection selection""" selection = self.connections_listbox.curselection() if selection: name = self.connections_listbox.get(selection[0]) conn = self.connections[name] # Clear recent connections selection self.recent_listbox.selection_clear(0, tk.END) # Update details self._update_details(conn) def _new_connection(self): """Create a new connection""" dialog = ConnectionDialog(self.root, "New Connection", rdp_client=self) if dialog.result: name = dialog.result["name"] if name in self.connections: if not messagebox.askyesno("Overwrite", f"Connection '{name}' already exists. Overwrite?"): return # Save connection conn_data = dialog.result.copy() del conn_data["name"] del conn_data["password"] conn_data["created"] = datetime.now().strftime("%Y-%m-%d %H:%M") self.connections[name] = conn_data self._save_connections() # Save credentials if provided if dialog.result["password"]: if name not in self.credentials: self.credentials[name] = {} self.credentials[name]["password"] = self._encrypt_password(dialog.result["password"]) self._save_credentials() # Add new connection to history so it appears in recent connections self._add_to_history(name) self._refresh_connections_list() self.status_var.set(f"Connection '{name}' saved successfully") def _edit_selected(self): """Edit selected connection""" # Check which listbox has a selection saved_selection = self.connections_listbox.curselection() recent_selection = self.recent_listbox.curselection() name = None if saved_selection: name = self.connections_listbox.get(saved_selection[0]) elif recent_selection: display_text = self.recent_listbox.get(recent_selection[0]) name = display_text.split(' (')[0] if not name: messagebox.showwarning("No Selection", "Please select a connection to edit.") return if name not in self.connections: messagebox.showerror("Error", f"Connection '{name}' not found.") return conn = self.connections[name].copy() # Get saved password if exists password = "" if name in self.credentials and "password" in self.credentials[name]: password = self._decrypt_password(self.credentials[name]["password"]) conn["name"] = name conn["password"] = password dialog = ConnectionDialog(self.root, f"Edit Connection: {name}", conn, rdp_client=self) if dialog.result: new_name = dialog.result["name"] # Handle name change if new_name != name: if new_name in self.connections: if not messagebox.askyesno("Overwrite", f"Connection '{new_name}' already exists. Overwrite?"): return # Remove old connection del self.connections[name] if name in self.credentials: del self.credentials[name] # Save updated connection conn_data = dialog.result.copy() del conn_data["name"] del conn_data["password"] conn_data["created"] = self.connections.get(name, {}).get("created", datetime.now().strftime("%Y-%m-%d %H:%M")) conn_data["modified"] = datetime.now().strftime("%Y-%m-%d %H:%M") self.connections[new_name] = conn_data self._save_connections() # Save credentials if dialog.result["password"]: if new_name not in self.credentials: self.credentials[new_name] = {} self.credentials[new_name]["password"] = self._encrypt_password(dialog.result["password"]) self._save_credentials() # Update history for the new/updated connection self._add_to_history(new_name) self._refresh_connections_list() self.status_var.set(f"Connection '{new_name}' updated successfully") def _delete_selected(self): """Delete selected connection""" # Check which listbox has a selection saved_selection = self.connections_listbox.curselection() recent_selection = self.recent_listbox.curselection() name = None if saved_selection: name = self.connections_listbox.get(saved_selection[0]) elif recent_selection: display_text = self.recent_listbox.get(recent_selection[0]) name = display_text.split(' (')[0] if not name: messagebox.showwarning("No Selection", "Please select a connection to delete.") return if name not in self.connections: messagebox.showerror("Error", f"Connection '{name}' not found.") return if messagebox.askyesno("Confirm Delete", f"Delete connection '{name}'?"): del self.connections[name] if name in self.credentials: del self.credentials[name] self._save_connections() self._save_credentials() self._refresh_connections_list() # Clear details for label in self.details_labels.values(): label.config(text="") self.status_var.set(f"Connection '{name}' deleted") def _connect_selected(self, event=None): """Connect to selected connection""" # Check which listbox has a selection saved_selection = self.connections_listbox.curselection() recent_selection = self.recent_listbox.curselection() if saved_selection: name = self.connections_listbox.get(saved_selection[0]) self._connect_to(name) elif recent_selection: display_text = self.recent_listbox.get(recent_selection[0]) name = display_text.split(' (')[0] if name in self.connections: self._connect_to(name) else: messagebox.showwarning("No Selection", "Please select a connection to connect.") def _connect_to(self, name): """Connect to a specific connection""" if name not in self.connections: messagebox.showerror("Error", f"Connection '{name}' not found.") return conn = self.connections[name] # Get password password = "" if name in self.credentials and "password" in self.credentials[name]: password = self._decrypt_password(self.credentials[name]["password"]) else: password = simpledialog.askstring("Password", f"Enter password for {conn['username']}@{conn['server']}:", show='*') if not password: return # Build and execute RDP command self._add_to_history(name) self._execute_rdp_connection(conn, password) self._refresh_connections_list() # Refresh to update recent connections def _execute_rdp_connection(self, conn, password): """Execute the RDP connection in a separate thread""" def connect(): try: server = conn['server'] username = conn['username'] self.logger.info(f"Attempting RDP connection to {server} as {username}") self.status_var.set(f"Connecting to {server}...") # Test connectivity first if not self._test_server_connectivity(server): error_msg = f"Cannot reach server {server}. Please check the server address and your network connection." self.logger.error(f"Connectivity test failed for {server}") self.root.after(0, lambda: self.status_var.set("Connection failed - Server unreachable")) self.root.after(0, lambda: messagebox.showerror("Connection Error", error_msg)) return # Build xfreerdp command cmd = ["/usr/bin/xfreerdp"] # Basic options cmd.extend(["+window-drag", "+smart-sizing", "/cert-ignore"]) # Server and authentication cmd.append(f"/v:{server}") cmd.append(f"/u:{username}") cmd.append(f"/p:{password}") if conn.get("domain"): cmd.append(f"/d:{conn['domain']}") # Resolution resolution = conn.get("resolution", "1920x1080") if resolution == "Full Screen": cmd.append("/f") else: cmd.append(f"/size:{resolution}") # Color depth color_depth = conn.get("color_depth", 32) cmd.append(f"/bpp:{color_depth}") # Multiple monitors multimon = conn.get("multimon", "No") if multimon == "2 Monitors": monitor_list = self._get_best_monitor_selection(2) cmd.append(f"/monitors:{monitor_list}") elif multimon == "3 Monitors": monitor_list = self._get_best_monitor_selection(3) cmd.append(f"/monitors:{monitor_list}") elif multimon == "4 Monitors": monitor_list = self._get_best_monitor_selection(4) cmd.append(f"/monitors:{monitor_list}") elif multimon == "All Monitors": cmd.append("/multimon") elif multimon == "Span": cmd.append("/span") # Sound sound = conn.get("sound", "Yes") if sound == "Yes": cmd.append("/sound:sys") elif sound == "Remote": cmd.append("/sound:local") # Microphone if conn.get("microphone", "No") == "Yes": cmd.append("/microphone") # Clipboard if conn.get("clipboard", "Yes") == "Yes": cmd.append("/clipboard") # Drive sharing if conn.get("drives", "No") == "Yes": cmd.append(f"/drive:home,{os.path.expanduser('~')}") # Performance options if conn.get("compression", "Yes") == "Yes": cmd.append("/compression") else: cmd.append("-compression") # Compression level comp_level = conn.get("compression_level", "1 (Medium)") if "0" in comp_level: cmd.append("/compression-level:0") elif "1" in comp_level: cmd.append("/compression-level:1") elif "2" in comp_level: cmd.append("/compression-level:2") # Bitmap cache if conn.get("bitmap_cache", "Yes") == "Yes": cmd.append("+bitmap-cache") else: cmd.append("-bitmap-cache") # Offscreen cache if conn.get("offscreen_cache", "Yes") == "Yes": cmd.append("+offscreen-cache") else: cmd.append("-offscreen-cache") # Font smoothing if conn.get("fonts", "Yes") == "No": cmd.append("-fonts") # Desktop composition (Aero) if conn.get("aero", "No") == "Yes": cmd.append("+aero") else: cmd.append("-aero") # Wallpaper if conn.get("wallpaper", "No") == "No": cmd.append("-wallpaper") # Themes if conn.get("themes", "Yes") == "No": cmd.append("-themes") # Menu animations if conn.get("menu_anims", "No") == "No": cmd.append("-menu-anims") # Advanced options if conn.get("printer", "No") == "Yes": cmd.append("/printer") if conn.get("com_ports", "No") == "Yes": cmd.append("/serial") if conn.get("usb", "No") == "Yes": cmd.append("/usb") # Network options if conn.get("network_auto_detect", "Yes") == "No": cmd.append("-network-auto-detect") # Execute cmd_str = ' '.join(cmd).replace(f"/p:{password}", "/p:***") # Hide password in logs self.logger.info(f"Executing RDP command: {cmd_str}") # Run xfreerdp without capturing output to allow interactive authentication # This matches how the bash script runs xfreerdp with eval result = subprocess.run(cmd) # Since we're not capturing output, we can't get detailed error info # but this allows xfreerdp to run interactively like the bash script if result.returncode == 0: self.logger.info(f"RDP connection to {server} completed successfully") self.root.after(0, lambda: self.status_var.set("Connection completed successfully")) else: error_msg = f"RDP connection failed with exit code {result.returncode}. Check credentials and server settings." self.logger.error(f"RDP connection failed with exit code: {result.returncode}") self.root.after(0, lambda: self.status_var.set("Connection failed")) self.root.after(0, lambda: messagebox.showerror("Connection Error", error_msg)) except FileNotFoundError: error_msg = "xfreerdp is not installed or not found in PATH. Please install the freerdp package." self.logger.error("xfreerdp not found") self.root.after(0, lambda: self.status_var.set("xfreerdp not found")) self.root.after(0, lambda: messagebox.showerror("Missing Dependency", error_msg)) except Exception as e: error_msg = f"Unexpected error during connection: {str(e)}" self.logger.error(f"Unexpected RDP connection error: {str(e)}", exc_info=True) self.root.after(0, lambda: self.status_var.set(f"Error: {str(e)}")) self.root.after(0, lambda: messagebox.showerror("Connection Error", error_msg)) # Start connection in separate thread thread = threading.Thread(target=connect, daemon=True) thread.start() def _test_server_connectivity(self, server): """Test if server is reachable""" try: # Extract hostname/IP from server (remove port if present) host = server.split(':')[0] # Try to establish a TCP connection to port 3389 (RDP port) port = 3389 if ':' in server: try: port = int(server.split(':')[1]) except ValueError: port = 3389 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) # 5 second timeout result = sock.connect_ex((host, port)) sock.close() return result == 0 except Exception as e: self.logger.warning(f"Connectivity test failed for {server}: {str(e)}") return False def _parse_rdp_error(self, stderr, returncode): """Parse RDP error messages and provide user-friendly descriptions""" error_messages = { 1: "General connection error. Please check your credentials and server address.", 2: "Invalid command line arguments or configuration.", 3: "Authentication failed. Please check your username and password.", 4: "Connection refused. The server may not allow RDP connections.", 5: "Connection timeout. The server may be offline or unreachable.", 131: "Authentication failed. Please verify your credentials.", 132: "Connection was refused by the server.", 133: "Logon failure. Please check your username, password, and domain." } # Check for specific error patterns in stderr stderr_lower = stderr.lower() if "authentication" in stderr_lower or "logon" in stderr_lower: return "Authentication failed. Please verify your username, password, and domain settings." elif "connection" in stderr_lower and "refused" in stderr_lower: return "Connection was refused by the server. The server may not allow RDP connections or may be configured to use different settings." elif "certificate" in stderr_lower: return "Certificate verification failed. The connection is still secure, but the server's certificate couldn't be verified." elif "timeout" in stderr_lower: return "Connection timed out. The server may be offline or unreachable." elif "resolution" in stderr_lower: return "Display resolution error. Try using a different resolution setting." elif returncode in error_messages: return error_messages[returncode] else: return f"Connection failed (Error {returncode}). Please check your connection settings and try again.\n\nTechnical details: {stderr}" def _import_connections(self): """Import connections from file""" file_path = filedialog.askopenfilename( title="Import Connections", filetypes=[("JSON files", "*.json"), ("All files", "*.*")], defaultextension=".json" ) if not file_path: return try: with open(file_path, 'r') as f: import_data = json.load(f) # Validate import data structure if not isinstance(import_data, dict): messagebox.showerror("Import Error", "Invalid file format. Expected JSON object.") return connections_data = import_data.get("connections", {}) credentials_data = import_data.get("credentials", {}) if not isinstance(connections_data, dict): messagebox.showerror("Import Error", "Invalid connections format in file.") return # Ask user about merge strategy if self.connections: choice = messagebox.askyesnocancel( "Import Strategy", "You have existing connections. Choose import strategy:\n\n" "Yes - Merge (keep existing, add new)\n" "No - Replace (delete existing, import new)\n" "Cancel - Cancel import" ) if choice is None: # Cancel return elif choice is False: # Replace self.connections.clear() self.credentials.clear() # Import connections imported_count = 0 for name, conn_data in connections_data.items(): if name in self.connections: if not messagebox.askyesno("Overwrite", f"Connection '{name}' exists. Overwrite?"): continue self.connections[name] = conn_data imported_count += 1 # Import credentials if available and user agrees if credentials_data and messagebox.askyesno( "Import Credentials", "Import saved credentials as well? (Passwords will be re-encrypted with your key)" ): for name, cred_data in credentials_data.items(): if name in self.connections: # Only import credentials for existing connections self.credentials[name] = cred_data # Save the changes self._save_connections() self._save_credentials() self._refresh_connections_list() self.status_var.set(f"Successfully imported {imported_count} connections") messagebox.showinfo("Import Complete", f"Successfully imported {imported_count} connections.") except FileNotFoundError: messagebox.showerror("Import Error", "File not found.") except json.JSONDecodeError: messagebox.showerror("Import Error", "Invalid JSON file format.") except Exception as e: messagebox.showerror("Import Error", f"Error importing connections: {str(e)}") def _export_connections(self): """Export connections to file""" if not self.connections: messagebox.showwarning("Export Warning", "No connections to export.") return file_path = filedialog.asksaveasfilename( title="Export Connections", filetypes=[("JSON files", "*.json"), ("All files", "*.*")], defaultextension=".json", initialname=f"rdp_connections_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" ) if not file_path: return try: # Ask if user wants to include credentials include_credentials = messagebox.askyesno( "Export Credentials", "Include saved credentials in export?\n\n" "Warning: Credentials will be encrypted but should be treated as sensitive data." ) export_data = { "connections": self.connections, "export_date": datetime.now().isoformat(), "exported_by": f"{os.getenv('USER')}@{os.uname().nodename}", "version": "1.0" } if include_credentials: export_data["credentials"] = self.credentials with open(file_path, 'w') as f: json.dump(export_data, f, indent=2) self.status_var.set(f"Connections exported to {os.path.basename(file_path)}") messagebox.showinfo("Export Complete", f"Successfully exported {len(self.connections)} connections to:\n{file_path}") except Exception as e: messagebox.showerror("Export Error", f"Error exporting connections: {str(e)}") def _clear_credentials(self): """Clear all saved credentials""" if messagebox.askyesno("Confirm", "Clear all saved passwords?"): self.credentials.clear() self._save_credentials() self.status_var.set("All credentials cleared") self.logger.info("All credentials cleared by user") def _test_selected_connection(self): """Test the selected connection for reachability""" # Check which listbox has a selection saved_selection = self.connections_listbox.curselection() recent_selection = self.recent_listbox.curselection() name = None if saved_selection: name = self.connections_listbox.get(saved_selection[0]) elif recent_selection: display_text = self.recent_listbox.get(recent_selection[0]) name = display_text.split(' (')[0] if not name: messagebox.showwarning("No Selection", "Please select a connection to test.") return if name not in self.connections: messagebox.showerror("Error", f"Connection '{name}' not found.") return conn = self.connections[name] self._test_connection_async(name, conn) def _test_connection_async(self, name, conn): """Test connection in a separate thread""" def test(): try: server = conn['server'] self.logger.info(f"Testing connection to {server}") # Update status self.root.after(0, lambda: self.status_var.set(f"Testing connection to {server}...")) # Test basic connectivity is_reachable = self._test_server_connectivity(server) if is_reachable: # Test RDP service specifically rdp_available = self._test_rdp_service(server) if rdp_available: message = f"✓ Connection test successful!\n\nServer: {server}\nRDP Port: Accessible\nStatus: Ready for connection" self.logger.info(f"Connection test successful for {server}") self.root.after(0, lambda: self.status_var.set("Connection test passed")) self.root.after(0, lambda: messagebox.showinfo("Connection Test", message)) else: message = f"⚠ Server is reachable but RDP service may not be available.\n\nServer: {server}\nStatus: Network accessible but RDP port (3389) not responding\n\nThis could mean:\n• RDP is disabled on the server\n• A firewall is blocking RDP\n• RDP is running on a different port" self.logger.warning(f"Server {server} reachable but RDP service not available") self.root.after(0, lambda: self.status_var.set("RDP service not available")) self.root.after(0, lambda: messagebox.showwarning("Connection Test", message)) else: message = f"✗ Connection test failed!\n\nServer: {server}\nStatus: Not reachable\n\nPossible causes:\n• Server is offline\n• Incorrect server address\n• Network connectivity issues\n• Firewall blocking access" self.logger.error(f"Connection test failed for {server} - server not reachable") self.root.after(0, lambda: self.status_var.set("Connection test failed")) self.root.after(0, lambda: messagebox.showerror("Connection Test", message)) except Exception as e: error_msg = f"Error during connection test: {str(e)}" self.logger.error(f"Connection test error for {name}: {str(e)}", exc_info=True) self.root.after(0, lambda: self.status_var.set("Connection test error")) self.root.after(0, lambda: messagebox.showerror("Test Error", error_msg)) # Start test in separate thread thread = threading.Thread(target=test, daemon=True) thread.start() def _test_rdp_service(self, server): """Test if RDP service is specifically available""" try: # Extract hostname/IP and port host = server.split(':')[0] port = 3389 if ':' in server: try: port = int(server.split(':')[1]) except ValueError: port = 3389 # Try to connect to RDP port sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(10) # Longer timeout for RDP test result = sock.connect_ex((host, port)) sock.close() return result == 0 except Exception as e: self.logger.warning(f"RDP service test failed for {server}: {str(e)}") return False def run(self): """Start the application""" self.root.mainloop() class ConnectionDialog: def __init__(self, parent, title, initial_data=None, rdp_client=None): self.result = None self.rdp_client = rdp_client # Create dialog self.dialog = tk.Toplevel(parent) self.dialog.title(title) self.dialog.geometry("600x700") self.dialog.resizable(False, False) self.dialog.transient(parent) self.dialog.grab_set() # Center dialog self.dialog.update_idletasks() x = (self.dialog.winfo_screenwidth() // 2) - (600 // 2) y = (self.dialog.winfo_screenheight() // 2) - (700 // 2) self.dialog.geometry(f"600x700+{x}+{y}") self.initial_data = initial_data or {} self._setup_dialog() # Wait for dialog to close self.dialog.wait_window() def _setup_dialog(self): """Setup the connection dialog""" main_frame = ttk.Frame(self.dialog, padding="20") main_frame.pack(fill=tk.BOTH, expand=True) # Create notebook for tabs notebook = ttk.Notebook(main_frame) notebook.pack(fill=tk.BOTH, expand=True, pady=(0, 20)) # Basic tab basic_frame = ttk.Frame(notebook) notebook.add(basic_frame, text="Basic") # Advanced tab advanced_frame = ttk.Frame(notebook) notebook.add(advanced_frame, text="Advanced") # Advanced tab description adv_desc = ttk.Label(advanced_frame, text="Advanced connection and device sharing options", font=("TkDefaultFont", 9, "italic")) adv_desc.grid(row=0, column=0, columnspan=2, pady=(10, 15), padx=10, sticky=tk.W) # Performance tab performance_frame = ttk.Frame(notebook) notebook.add(performance_frame, text="Performance") # Performance tab description perf_desc = ttk.Label(performance_frame, text="Performance optimization and visual quality settings", font=("TkDefaultFont", 9, "italic")) perf_desc.grid(row=0, column=0, columnspan=2, pady=(10, 15), padx=10, sticky=tk.W) # Store field variables self.fields = {} # Basic fields basic_fields = [ ("Connection Name:", "name", "entry"), ("Server/IP:", "server", "entry"), ("Username:", "username", "entry"), ("Password:", "password", "password"), ("Domain:", "domain", "entry"), ] for i, (label, field, widget_type) in enumerate(basic_fields): ttk.Label(basic_frame, text=label).grid(row=i, column=0, sticky=tk.W, pady=5, padx=(10, 5)) if widget_type == "entry": var = tk.StringVar(value=self.initial_data.get(field, "")) widget = ttk.Entry(basic_frame, textvariable=var, width=40) elif widget_type == "password": var = tk.StringVar(value=self.initial_data.get(field, "")) widget = ttk.Entry(basic_frame, textvariable=var, show="*", width=40) widget.grid(row=i, column=1, sticky=tk.W, pady=5, padx=(5, 10)) self.fields[field] = var # Advanced fields multimon_options = self.rdp_client._get_multimon_options() if self.rdp_client else ["No", "2 Monitors", "3 Monitors", "All Monitors", "Span"] advanced_fields = [ ("Resolution:", "resolution", "combo", ["1920x1080", "2560x1440", "1366x768", "1280x1024", "1024x768", "Full Screen"]), ("Color Depth:", "color_depth", "combo", ["32", "24", "16", "15"]), ("Multiple Monitors:", "multimon", "combo", multimon_options), ("Sound:", "sound", "combo", ["Yes", "No", "Remote"]), ("Microphone:", "microphone", "combo", ["No", "Yes"]), ("Clipboard:", "clipboard", "combo", ["Yes", "No"]), ("Share Home Drive:", "drives", "combo", ["No", "Yes"]), ("Printer Sharing:", "printer", "combo", ["No", "Yes"]), ("COM Ports:", "com_ports", "combo", ["No", "Yes"]), ("USB Devices:", "usb", "combo", ["No", "Yes"]), ("Gateway Mode:", "gateway", "combo", ["Auto", "RPC", "HTTP"]), ("Network Detection:", "network_auto_detect", "combo", ["Yes", "No"]), ] for i, (label, field, widget_type, values) in enumerate(advanced_fields): row = i + 1 # Offset by 1 for description label ttk.Label(advanced_frame, text=label).grid(row=row, column=0, sticky=tk.W, pady=5, padx=(10, 5)) var = tk.StringVar(value=self.initial_data.get(field, values[0])) widget = ttk.Combobox(advanced_frame, textvariable=var, values=values, width=37, state="readonly") widget.grid(row=row, column=1, sticky=tk.W, pady=5, padx=(5, 10)) self.fields[field] = var # Performance fields performance_fields = [ ("Compression:", "compression", "combo", ["Yes", "No"]), ("Compression Level:", "compression_level", "combo", ["0 (None)", "1 (Medium)", "2 (High)"]), ("Bitmap Cache:", "bitmap_cache", "combo", ["Yes", "No"]), ("Offscreen Cache:", "offscreen_cache", "combo", ["Yes", "No"]), ("Font Smoothing:", "fonts", "combo", ["Yes", "No"]), ("Desktop Composition:", "aero", "combo", ["No", "Yes"]), ("Wallpaper:", "wallpaper", "combo", ["No", "Yes"]), ("Themes:", "themes", "combo", ["Yes", "No"]), ("Menu Animations:", "menu_anims", "combo", ["No", "Yes"]), ] for i, (label, field, widget_type, values) in enumerate(performance_fields): row = i + 1 # Offset by 1 for description label ttk.Label(performance_frame, text=label).grid(row=row, column=0, sticky=tk.W, pady=5, padx=(10, 5)) var = tk.StringVar(value=self.initial_data.get(field, values[0])) widget = ttk.Combobox(performance_frame, textvariable=var, values=values, width=37, state="readonly") widget.grid(row=row, column=1, sticky=tk.W, pady=5, padx=(5, 10)) self.fields[field] = var # Buttons button_frame = ttk.Frame(main_frame) button_frame.pack(fill=tk.X) ttk.Button(button_frame, text="Cancel", command=self._cancel).pack(side=tk.RIGHT, padx=(10, 0)) ttk.Button(button_frame, text="Save", command=self._save).pack(side=tk.RIGHT) def _save(self): """Save the connection""" # Validate required fields if not self.fields["name"].get().strip(): messagebox.showerror("Error", "Connection name is required.") return if not self.fields["server"].get().strip(): messagebox.showerror("Error", "Server/IP is required.") return if not self.fields["username"].get().strip(): messagebox.showerror("Error", "Username is required.") return # Collect data self.result = {} for field, var in self.fields.items(): self.result[field] = var.get().strip() self.dialog.destroy() def _cancel(self): """Cancel the dialog""" self.dialog.destroy() def main(): # Check dependencies dependencies = ["/usr/bin/xfreerdp"] missing = [] for dep in dependencies: if not os.path.exists(dep): missing.append(dep) if missing: print("Missing dependencies:") for dep in missing: print(f" - {dep}") print("\nPlease install freerdp package:") print(" sudo apt install freerdp2-x11 # Ubuntu/Debian") print(" sudo dnf install freerdp # Fedora") return 1 try: app = RDPClient() app.run() return 0 except KeyboardInterrupt: print("\nExiting...") return 0 except Exception as e: print(f"Error: {e}") return 1 if __name__ == "__main__": exit(main())