Error: 'RDPClient' object has no attribute '_get_best_monitor_selection'

Root cause: The method was accidentally removed during a previous code refactoring, when the _get_monitors_combined_resolution method was added.

Fix:
- Restored the complete _get_best_monitor_selection method with all functionality
- Cleaned up duplicate/corrupted code fragments
- Removed duplicate _setup_gui method definitions
- Added a test script to verify method accessibility

Verified:
- _get_best_monitor_selection(2) returns '1,2' ✅
- _get_monitors_combined_resolution('1,2') returns '3840x1080' ✅
- Both methods are properly integrated in the RDPClient class ✅

The '2 Monitors' selection should now work without an AttributeError.
1516 lines
66 KiB
Python
Executable File
1516 lines
66 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Enhanced RDP Client with Professional GUI
|
|
A modern replacement for the zenity-based RDP script with better UX
|
|
"""
|
|
|
|
# Auto-detect and use virtual environment if available
|
|
import sys
|
|
import os
|
|
|
|
def setup_virtual_env():
    """Re-execute this script with the project's ``.venv`` interpreter if needed.

    Looks for ``.venv/bin/python`` next to this file. When it exists and the
    running interpreter is a different executable, the current process is
    replaced (``os.execv``) by that interpreter running the same command line,
    so this function does not return in that case. Otherwise it returns None.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    venv_python = os.path.join(script_dir, '.venv', 'bin', 'python')

    # No project virtual environment available: keep the current interpreter.
    if not os.path.exists(venv_python):
        return

    # Already running under the virtual environment interpreter.
    if sys.executable == venv_python:
        return

    # Replace the current process; nothing after this line runs on success.
    os.execv(venv_python, [venv_python] + sys.argv)
|
|
|
|
# Try to setup virtual environment first
|
|
setup_virtual_env()
|
|
|
|
import tkinter as tk
|
|
from tkinter import ttk, messagebox, simpledialog, filedialog
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import base64
|
|
from cryptography.fernet import Fernet
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
import threading
|
|
from datetime import datetime
|
|
import shutil
|
|
import logging
|
|
import socket
|
|
import sys
|
|
|
|
class RDPClient:
|
|
def __init__(self):
    """Create the main window, load persisted state and build the GUI."""
    window = tk.Tk()
    window.title("RDP Client - Professional (Press F1 for shortcuts)")
    window.geometry("1000x700")
    window.minsize(800, 600)
    self.root = window

    # Every persistent file lives under one per-user configuration directory.
    config_dir = os.path.expanduser("~/.config/rdp-client")
    self.config_dir = config_dir
    self.connections_file = os.path.join(config_dir, "connections.json")
    self.credentials_file = os.path.join(config_dir, "credentials.json")
    self.history_file = os.path.join(config_dir, "history.json")
    self.log_file = os.path.join(config_dir, "rdp_client.log")
    os.makedirs(config_dir, exist_ok=True)

    # Logging must exist before anything that reports through self.logger.
    self._setup_logging()
    self._init_encryption()

    # Persisted state.
    self.connections = self._load_connections()
    self.credentials = self._load_credentials()
    self.history = self._load_history()

    # Rewrite legacy "Yes" multi-monitor values.
    self._migrate_multimon_settings()

    # First run (or migration): seed the history from the saved connections.
    if self.connections and not self.history:
        for conn_name in self.connections:
            self._add_to_history(conn_name)

    self._setup_gui()
    self._refresh_connections_list()

    # Refresh once more shortly afterwards, when the GUI is fully realised.
    self.root.after(100, self._refresh_connections_list)

    self.logger.info("RDP Client initialized successfully")
|
|
|
|
def _setup_logging(self):
|
|
"""Setup logging configuration"""
|
|
self.logger = logging.getLogger('rdp_client')
|
|
self.logger.setLevel(logging.INFO)
|
|
|
|
# Create file handler
|
|
file_handler = logging.FileHandler(self.log_file)
|
|
file_handler.setLevel(logging.INFO)
|
|
|
|
# Create console handler for errors
|
|
console_handler = logging.StreamHandler(sys.stdout)
|
|
console_handler.setLevel(logging.ERROR)
|
|
|
|
# Create formatter
|
|
formatter = logging.Formatter(
|
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
file_handler.setFormatter(formatter)
|
|
console_handler.setFormatter(formatter)
|
|
|
|
# Add handlers
|
|
self.logger.addHandler(file_handler)
|
|
self.logger.addHandler(console_handler)
|
|
|
|
# Prevent duplicate logs
|
|
self.logger.propagate = False
|
|
|
|
def _init_encryption(self):
    """Create the Fernet cipher used for password storage.

    The key is derived with PBKDF2-HMAC-SHA256 from the string
    ``"<USER>@<hostname>"`` and stored on ``self.cipher``.
    """
    # NOTE(review): both inputs to the key are non-secret and the salt is
    # fixed, so this is obfuscation rather than strong protection. Changing
    # the derivation would make previously stored passwords undecryptable.
    identity = f"{os.getenv('USER')}@{os.uname().nodename}"
    fixed_salt = b'salt_1234567890'  # In production, use random salt
    key_deriver = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=fixed_salt,
        iterations=100000,
    )
    raw_key = key_deriver.derive(identity.encode())
    self.cipher = Fernet(base64.urlsafe_b64encode(raw_key))
|
|
|
|
def _encrypt_password(self, password):
|
|
"""Encrypt password for secure storage"""
|
|
return self.cipher.encrypt(password.encode()).decode()
|
|
|
|
def _decrypt_password(self, encrypted_password):
|
|
"""Decrypt password from storage"""
|
|
try:
|
|
return self.cipher.decrypt(encrypted_password.encode()).decode()
|
|
except:
|
|
return ""
|
|
|
|
def _load_connections(self):
|
|
"""Load saved connections from file"""
|
|
if os.path.exists(self.connections_file):
|
|
try:
|
|
with open(self.connections_file, 'r') as f:
|
|
return json.load(f)
|
|
except:
|
|
return {}
|
|
return {}
|
|
|
|
def _save_connections(self):
|
|
"""Save connections to file"""
|
|
with open(self.connections_file, 'w') as f:
|
|
json.dump(self.connections, f, indent=2)
|
|
|
|
def _load_credentials(self):
|
|
"""Load saved credentials from file"""
|
|
if os.path.exists(self.credentials_file):
|
|
try:
|
|
with open(self.credentials_file, 'r') as f:
|
|
return json.load(f)
|
|
except:
|
|
return {}
|
|
return {}
|
|
|
|
def _save_credentials(self):
|
|
"""Save credentials to file"""
|
|
with open(self.credentials_file, 'w') as f:
|
|
json.dump(self.credentials, f, indent=2)
|
|
|
|
def _load_history(self):
|
|
"""Load connection history from file"""
|
|
if os.path.exists(self.history_file):
|
|
try:
|
|
with open(self.history_file, 'r') as f:
|
|
return json.load(f)
|
|
except:
|
|
return []
|
|
return []
|
|
|
|
def _save_history(self):
|
|
"""Save connection history to file"""
|
|
with open(self.history_file, 'w') as f:
|
|
json.dump(self.history, f, indent=2)
|
|
|
|
def _add_to_history(self, connection_name):
|
|
"""Add a connection to history"""
|
|
# Remove existing entry if present
|
|
self.history = [h for h in self.history if h.get('name') != connection_name]
|
|
|
|
# Add new entry at the beginning
|
|
history_entry = {
|
|
'name': connection_name,
|
|
'timestamp': datetime.now().isoformat(),
|
|
'count': self._get_connection_count(connection_name) + 1
|
|
}
|
|
self.history.insert(0, history_entry)
|
|
|
|
# Keep only last 20 entries
|
|
self.history = self.history[:20]
|
|
|
|
self._save_history()
|
|
|
|
def _get_connection_count(self, connection_name):
|
|
"""Get the connection count for a specific connection"""
|
|
for entry in self.history:
|
|
if entry.get('name') == connection_name:
|
|
return entry.get('count', 0)
|
|
return 0
|
|
|
|
def _migrate_multimon_settings(self):
|
|
"""Migrate legacy 'Yes' multimon settings to new specific monitor options"""
|
|
changed = False
|
|
for conn_name, conn_data in self.connections.items():
|
|
if conn_data.get("multimon") == "Yes":
|
|
# Default to 2 monitors for legacy "Yes" settings
|
|
conn_data["multimon"] = "2 Monitors"
|
|
changed = True
|
|
self.logger.info(f"Migrated multimon setting for {conn_name} from 'Yes' to '2 Monitors'")
|
|
|
|
if changed:
|
|
self._save_connections()
|
|
self.logger.info("Completed multimon settings migration")
|
|
|
|
def _get_available_monitors_count(self):
|
|
"""Get the number of available monitors"""
|
|
try:
|
|
result = subprocess.run(['xrandr', '--listmonitors'],
|
|
capture_output=True, text=True, timeout=5)
|
|
if result.returncode == 0:
|
|
lines = result.stdout.strip().split('\n')
|
|
if lines and lines[0].startswith('Monitors:'):
|
|
return int(lines[0].split(':')[1].strip())
|
|
except:
|
|
pass
|
|
return 1 # Default to 1 monitor if detection fails
|
|
|
|
def _get_multimon_options(self):
|
|
"""Get available multi-monitor options based on system"""
|
|
monitor_count = self._get_available_monitors_count()
|
|
options = ["No"]
|
|
|
|
# Add options for available monitors
|
|
for i in range(2, min(monitor_count + 1, 5)): # Up to 4 monitors
|
|
options.append(f"{i} Monitors")
|
|
|
|
if monitor_count > 1:
|
|
options.extend(["All Monitors", "Span"])
|
|
|
|
return options
|
|
|
|
def _get_monitors_combined_resolution(self, monitor_list):
|
|
"""Get the combined resolution for selected monitors"""
|
|
try:
|
|
result = subprocess.run(['xfreerdp', '/monitor-list'],
|
|
capture_output=True, text=True, timeout=5)
|
|
if result.returncode == 0:
|
|
lines = result.stdout.strip().split('\n')
|
|
selected_monitors = [int(x.strip()) for x in monitor_list.split(',')]
|
|
|
|
min_x = float('inf')
|
|
max_x = 0
|
|
max_y = 0
|
|
|
|
for line in lines:
|
|
cleaned_line = line.strip().replace('*', '').strip()
|
|
if '[' in cleaned_line and ']' in cleaned_line:
|
|
parts = cleaned_line.split()
|
|
if len(parts) >= 3:
|
|
id_part = parts[0]
|
|
res_part = parts[1] # 1920x1080
|
|
pos_part = parts[2] # +3840+0
|
|
|
|
if '[' in id_part and ']' in id_part:
|
|
monitor_id = int(id_part.strip('[]'))
|
|
if monitor_id in selected_monitors:
|
|
# Parse resolution
|
|
width, height = map(int, res_part.split('x'))
|
|
# Parse position
|
|
pos_coords = pos_part.split('+')[1:] # ['3840', '0']
|
|
x_pos = int(pos_coords[0])
|
|
|
|
min_x = min(min_x, x_pos)
|
|
max_x = max(max_x, x_pos + width)
|
|
max_y = max(max_y, height)
|
|
|
|
if min_x != float('inf'):
|
|
total_width = max_x - min_x
|
|
total_height = max_y
|
|
return f"{total_width}x{total_height}"
|
|
except Exception as e:
|
|
self.logger.error(f"Error calculating combined resolution: {e}")
|
|
|
|
# Fallback to a reasonable default
|
|
return "3840x1080" # Assume dual 1920x1080 monitors
|
|
|
|
def _get_best_monitor_selection(self, count):
|
|
"""Get the best monitor selection based on layout"""
|
|
try:
|
|
result = subprocess.run(['xfreerdp', '/monitor-list'],
|
|
capture_output=True, text=True, timeout=5)
|
|
if result.returncode == 0:
|
|
lines = result.stdout.strip().split('\n')
|
|
monitors = []
|
|
primary_monitor = None
|
|
|
|
for line in lines:
|
|
# Clean up the line and split by whitespace/tabs
|
|
cleaned_line = line.strip()
|
|
is_primary = cleaned_line.startswith('*')
|
|
cleaned_line = cleaned_line.replace('*', '').strip()
|
|
|
|
if '[' in cleaned_line and ']' in cleaned_line and 'x' in cleaned_line and '+' in cleaned_line:
|
|
# Split by whitespace/tabs
|
|
parts = cleaned_line.split()
|
|
if len(parts) >= 3:
|
|
id_part = parts[0] # [0]
|
|
pos_part = parts[2] # +3840+0
|
|
if '[' in id_part and ']' in id_part:
|
|
monitor_id = int(id_part.strip('[]'))
|
|
x_pos = int(pos_part.split('+')[1])
|
|
monitor_info = (monitor_id, x_pos, is_primary)
|
|
monitors.append(monitor_info)
|
|
|
|
if is_primary:
|
|
primary_monitor = monitor_id
|
|
|
|
# Sort monitors by X position (left to right)
|
|
monitors.sort(key=lambda x: x[1])
|
|
|
|
# For debugging, log the monitor layout
|
|
self.logger.info(f"Monitor layout detected: {[(m[0], m[1], 'primary' if m[2] else 'secondary') for m in monitors]}")
|
|
|
|
# Return the leftmost monitors (excluding position and primary flag)
|
|
selected = [str(m[0]) for m in monitors[:count]]
|
|
selected_str = ','.join(selected)
|
|
|
|
self.logger.info(f"Selected {count} monitors: {selected_str}")
|
|
return selected_str
|
|
except Exception as e:
|
|
self.logger.error(f"Error detecting monitors: {e}")
|
|
|
|
# Fallback: simple sequential selection
|
|
fallback = ','.join([str(i) for i in range(count)])
|
|
self.logger.warning(f"Using fallback monitor selection: {fallback}")
|
|
return fallback
|
|
|
|
def _setup_gui(self):
    """Build the main window.

    Layout: a title row, a left panel with the "Recent" and "Saved"
    connection lists plus Connect/Edit/Delete buttons, a right panel with
    the action buttons and the read-only details grid, and a status bar with
    the Exit button at the bottom. Stores ``recent_listbox``,
    ``connections_listbox``, ``details_labels`` and ``status_var`` on self.
    """
    ttk.Style().theme_use('clam')

    fill_both = (tk.W, tk.E, tk.N, tk.S)
    fill_x = (tk.W, tk.E)
    fill_y = (tk.N, tk.S)
    heading_font = ('Arial', 10, 'bold')
    body_font = ('Arial', 9)

    # Main container
    container = ttk.Frame(self.root, padding="10")
    container.grid(row=0, column=0, sticky=fill_both)

    # Let the container, and its middle row / right column, absorb resizing.
    self.root.columnconfigure(0, weight=1)
    self.root.rowconfigure(0, weight=1)
    container.columnconfigure(1, weight=1)
    container.rowconfigure(1, weight=1)

    # Title
    ttk.Label(container, text="RDP Client - Professional",
              font=('Arial', 16, 'bold')).grid(
        row=0, column=0, columnspan=3, pady=(0, 20))

    # ---- Left panel: connection lists ----
    left_panel = ttk.Frame(container)
    left_panel.grid(row=1, column=0, sticky=fill_both, padx=(0, 10))
    left_panel.rowconfigure(1, weight=1)
    left_panel.rowconfigure(3, weight=1)
    left_panel.columnconfigure(0, weight=1)

    ttk.Label(left_panel, text="Recent Connections", font=heading_font).grid(
        row=0, column=0, sticky=tk.W, pady=(0, 5))

    # Recent connections: listbox with its scrollbar.
    recent_holder = ttk.Frame(left_panel)
    recent_holder.grid(row=1, column=0, sticky=fill_both, pady=(0, 10))
    recent_holder.rowconfigure(0, weight=1)
    recent_holder.columnconfigure(0, weight=1)

    self.recent_listbox = tk.Listbox(recent_holder, font=body_font, height=6, width=30)
    recent_scroll = ttk.Scrollbar(recent_holder, orient="vertical",
                                  command=self.recent_listbox.yview)
    self.recent_listbox.configure(yscrollcommand=recent_scroll.set)
    self.recent_listbox.grid(row=0, column=0, sticky=fill_both)
    recent_scroll.grid(row=0, column=1, sticky=fill_y)

    ttk.Label(left_panel, text="Saved Connections", font=heading_font).grid(
        row=2, column=0, sticky=tk.W, pady=(10, 5))

    # Saved connections: listbox with its scrollbar.
    saved_holder = ttk.Frame(left_panel)
    saved_holder.grid(row=3, column=0, sticky=fill_both)
    saved_holder.rowconfigure(0, weight=1)
    saved_holder.columnconfigure(0, weight=1)

    self.connections_listbox = tk.Listbox(saved_holder, font=body_font, width=30)
    saved_scroll = ttk.Scrollbar(saved_holder, orient="vertical",
                                 command=self.connections_listbox.yview)
    self.connections_listbox.configure(yscrollcommand=saved_scroll.set)
    self.connections_listbox.grid(row=0, column=0, sticky=fill_both)
    saved_scroll.grid(row=0, column=1, sticky=fill_y)

    # Connect / Edit / Delete, packed left to right.
    button_row = ttk.Frame(left_panel)
    button_row.grid(row=4, column=0, pady=(10, 0), sticky=fill_x)
    for caption, handler, spacing in (
        ("Connect", self._connect_selected, {"padx": (0, 5)}),
        ("Edit", self._edit_selected, {"padx": (0, 5)}),
        ("Delete", self._delete_selected, {}),
    ):
        ttk.Button(button_row, text=caption, command=handler).pack(
            side=tk.LEFT, **spacing)

    # ---- Right panel: actions and details ----
    right_panel = ttk.Frame(container)
    right_panel.grid(row=1, column=1, sticky=fill_both)
    right_panel.rowconfigure(1, weight=1)
    right_panel.columnconfigure(0, weight=1)

    actions_box = ttk.LabelFrame(right_panel, text="Actions", padding="10")
    actions_box.grid(row=0, column=0, sticky=fill_x, pady=(0, 10))
    for caption, handler in (
        ("New Connection", self._new_connection),
        ("Test Connection", self._test_selected_connection),
        ("Import Connections", self._import_connections),
        ("Export Connections", self._export_connections),
        ("Clear All Credentials", self._clear_credentials),
    ):
        ttk.Button(actions_box, text=caption, command=handler, width=20).pack(pady=2)

    details_box = ttk.LabelFrame(right_panel, text="Connection Details", padding="10")
    details_box.grid(row=1, column=0, sticky=fill_both)
    details_box.columnconfigure(1, weight=1)

    # One caption/value row per field; value labels are kept for later updates.
    self.details_labels = {}
    detail_rows = (
        ("Server:", "server"),
        ("Username:", "username"),
        ("Domain:", "domain"),
        ("Resolution:", "resolution"),
        ("Color Depth:", "color_depth"),
        ("Multi-Monitor:", "multimon"),
        ("Sound:", "sound"),
        ("Clipboard:", "clipboard"),
        ("Drive Sharing:", "drives"),
        ("Created:", "created"),
    )
    for row_index, (caption, field_key) in enumerate(detail_rows):
        ttk.Label(details_box, text=caption, font=('Arial', 9, 'bold')).grid(
            row=row_index, column=0, sticky=tk.W, pady=2)
        value_label = ttk.Label(details_box, text="", font=body_font)
        value_label.grid(row=row_index, column=1, sticky=tk.W, padx=(10, 0), pady=2)
        self.details_labels[field_key] = value_label

    # Selection and double-click handling for both lists.
    self.connections_listbox.bind('<<ListboxSelect>>', self._on_connection_select)
    self.connections_listbox.bind('<Double-Button-1>', self._connect_selected)
    self.recent_listbox.bind('<<ListboxSelect>>', self._on_recent_select)
    self.recent_listbox.bind('<Double-Button-1>', self._connect_recent)

    # Keyboard shortcuts
    self._setup_keyboard_shortcuts()

    # ---- Bottom: status bar ----
    status_bar = ttk.Frame(container)
    status_bar.grid(row=2, column=0, columnspan=3, sticky=fill_x, pady=(10, 0))
    status_bar.columnconfigure(0, weight=1)

    self.status_var = tk.StringVar(value="Ready")
    ttk.Label(status_bar, textvariable=self.status_var,
              font=body_font, foreground='gray').grid(row=0, column=0, sticky=tk.W)

    # Exit button
    ttk.Button(status_bar, text="Exit",
               command=self.root.quit).grid(row=0, column=1, sticky=tk.E)
|
|
|
|
def _setup_keyboard_shortcuts(self):
|
|
"""Setup keyboard shortcuts for the application"""
|
|
# Global shortcuts
|
|
self.root.bind('<Control-n>', lambda e: self._new_connection())
|
|
self.root.bind('<Control-o>', lambda e: self._import_connections())
|
|
self.root.bind('<Control-s>', lambda e: self._export_connections())
|
|
self.root.bind('<Control-t>', lambda e: self._test_selected_connection())
|
|
self.root.bind('<F5>', lambda e: self._refresh_connections_list())
|
|
self.root.bind('<Control-q>', lambda e: self.root.quit())
|
|
self.root.bind('<Escape>', lambda e: self.root.quit())
|
|
self.root.bind('<F1>', lambda e: self._show_help())
|
|
|
|
# Listbox specific shortcuts
|
|
self.connections_listbox.bind('<Return>', lambda e: self._connect_selected())
|
|
self.connections_listbox.bind('<Delete>', lambda e: self._delete_selected())
|
|
self.connections_listbox.bind('<F2>', lambda e: self._edit_selected())
|
|
self.connections_listbox.bind('<Control-t>', lambda e: self._test_selected_connection())
|
|
|
|
self.recent_listbox.bind('<Return>', lambda e: self._connect_selected())
|
|
self.recent_listbox.bind('<Delete>', lambda e: self._delete_selected())
|
|
self.recent_listbox.bind('<F2>', lambda e: self._edit_selected())
|
|
self.recent_listbox.bind('<Control-t>', lambda e: self._test_selected_connection())
|
|
|
|
# Set focus handling
|
|
self.root.bind('<Tab>', self._handle_tab_focus)
|
|
self.root.bind('<Shift-Tab>', self._handle_shift_tab_focus)
|
|
|
|
# Add tooltip information for shortcuts
|
|
self._add_keyboard_shortcuts_info()
|
|
|
|
def _handle_tab_focus(self, event):
|
|
"""Handle Tab key for focus navigation"""
|
|
current_focus = self.root.focus_get()
|
|
|
|
if current_focus == self.recent_listbox:
|
|
self.connections_listbox.focus_set()
|
|
if self.connections_listbox.size() > 0:
|
|
self.connections_listbox.selection_set(0)
|
|
elif current_focus == self.connections_listbox:
|
|
self.recent_listbox.focus_set()
|
|
if self.recent_listbox.size() > 0:
|
|
self.recent_listbox.selection_set(0)
|
|
else:
|
|
self.recent_listbox.focus_set()
|
|
if self.recent_listbox.size() > 0:
|
|
self.recent_listbox.selection_set(0)
|
|
|
|
return "break" # Prevent default Tab behavior
|
|
|
|
def _handle_shift_tab_focus(self, event):
|
|
"""Handle Shift+Tab key for reverse focus navigation"""
|
|
current_focus = self.root.focus_get()
|
|
|
|
if current_focus == self.connections_listbox:
|
|
self.recent_listbox.focus_set()
|
|
if self.recent_listbox.size() > 0:
|
|
self.recent_listbox.selection_set(0)
|
|
elif current_focus == self.recent_listbox:
|
|
self.connections_listbox.focus_set()
|
|
if self.connections_listbox.size() > 0:
|
|
self.connections_listbox.selection_set(0)
|
|
else:
|
|
self.connections_listbox.focus_set()
|
|
if self.connections_listbox.size() > 0:
|
|
self.connections_listbox.selection_set(0)
|
|
|
|
return "break" # Prevent default Shift+Tab behavior
|
|
|
|
def _add_keyboard_shortcuts_info(self):
|
|
"""Add keyboard shortcuts information to the status bar or help"""
|
|
# You could create a help dialog or status tooltip here
|
|
# For now, we'll add it to the window title when focused
|
|
def show_shortcuts_hint(event):
|
|
self.status_var.set("Shortcuts: Ctrl+N=New, Enter=Connect, Del=Delete, F2=Edit, Ctrl+T=Test, F5=Refresh, Ctrl+Q=Quit")
|
|
|
|
def clear_shortcuts_hint(event):
|
|
self.status_var.set("Ready")
|
|
|
|
# Show shortcuts hint when certain widgets get focus
|
|
self.connections_listbox.bind('<FocusIn>', show_shortcuts_hint)
|
|
self.recent_listbox.bind('<FocusIn>', show_shortcuts_hint)
|
|
self.connections_listbox.bind('<FocusOut>', clear_shortcuts_hint)
|
|
self.recent_listbox.bind('<FocusOut>', clear_shortcuts_hint)
|
|
|
|
def _show_help(self):
    """Show the keyboard shortcuts help dialog.

    The multi-monitor section describes the "N Monitors" options as using the
    N left-most monitors, which is how the monitors are actually picked
    (sorted by X position), instead of claiming fixed ids such as (0,1).
    """
    help_text = """RDP Client - Keyboard Shortcuts

Global Shortcuts:
• Ctrl+N - Create new connection
• Ctrl+O - Import connections from file
• Ctrl+S - Export connections to file
• Ctrl+T - Test selected connection
• F5 - Refresh connections list
• Ctrl+Q - Quit application
• Escape - Quit application
• F1 - Show this help

Connection List Shortcuts:
• Enter - Connect to selected connection
• Double-click - Connect to selected connection
• Delete - Delete selected connection
• F2 - Edit selected connection
• Tab - Switch between Recent and Saved connections

Navigation:
• Tab - Move between Recent and Saved connections
• Shift+Tab - Move between connections (reverse)
• Arrow keys - Navigate within lists

Tips:
• Recent connections show usage count (e.g., "Server (5x)")
• Double-click any connection to connect quickly
• Use Test Connection to verify server availability
• Import/Export to backup or share connection profiles

Multi-Monitor Support:
• "2 Monitors" - Use the 2 leftmost monitors
• "3 Monitors" - Use the 3 leftmost monitors
• "4 Monitors" - Use the 4 leftmost monitors
• "All Monitors" - Use all available monitors
• "Span" - Span desktop across monitors
• "No" - Single monitor only"""

    messagebox.showinfo("Keyboard Shortcuts", help_text)
|
|
|
|
def _refresh_connections_list(self):
|
|
"""Refresh the connections listbox"""
|
|
self.connections_listbox.delete(0, tk.END)
|
|
for name in sorted(self.connections.keys()):
|
|
self.connections_listbox.insert(tk.END, name)
|
|
|
|
# Refresh recent connections
|
|
self.recent_listbox.delete(0, tk.END)
|
|
for entry in self.history[:10]: # Show last 10 recent connections
|
|
name = entry.get('name', '')
|
|
count = entry.get('count', 0)
|
|
if name in self.connections: # Only show if connection still exists
|
|
display_text = f"{name} ({count}x)"
|
|
self.recent_listbox.insert(tk.END, display_text)
|
|
|
|
# Clear details if no connections
|
|
if not self.connections:
|
|
for label in self.details_labels.values():
|
|
label.config(text="")
|
|
|
|
def _on_recent_select(self, event=None):
|
|
"""Handle recent connection selection"""
|
|
selection = self.recent_listbox.curselection()
|
|
if selection:
|
|
display_text = self.recent_listbox.get(selection[0])
|
|
# Extract connection name (everything before " (")
|
|
name = display_text.split(' (')[0]
|
|
if name in self.connections:
|
|
# Clear saved connections selection
|
|
self.connections_listbox.selection_clear(0, tk.END)
|
|
# Update details
|
|
conn = self.connections[name]
|
|
self._update_details(conn)
|
|
|
|
def _connect_recent(self, event=None):
|
|
"""Connect to selected recent connection"""
|
|
selection = self.recent_listbox.curselection()
|
|
if not selection:
|
|
return
|
|
|
|
display_text = self.recent_listbox.get(selection[0])
|
|
name = display_text.split(' (')[0]
|
|
if name in self.connections:
|
|
self._connect_to(name)
|
|
|
|
def _update_details(self, conn):
|
|
"""Update the details panel with connection info"""
|
|
self.details_labels["server"].config(text=conn.get("server", ""))
|
|
self.details_labels["username"].config(text=conn.get("username", ""))
|
|
self.details_labels["domain"].config(text=conn.get("domain", "N/A"))
|
|
self.details_labels["resolution"].config(text=conn.get("resolution", "1920x1080"))
|
|
self.details_labels["color_depth"].config(text=f"{conn.get('color_depth', 32)}-bit")
|
|
self.details_labels["multimon"].config(text=conn.get("multimon", "No"))
|
|
self.details_labels["sound"].config(text=conn.get("sound", "Yes"))
|
|
self.details_labels["clipboard"].config(text=conn.get("clipboard", "Yes"))
|
|
self.details_labels["drives"].config(text=conn.get("drives", "No"))
|
|
self.details_labels["created"].config(text=conn.get("created", "Unknown"))
|
|
|
|
def _on_connection_select(self, event=None):
|
|
"""Handle connection selection"""
|
|
selection = self.connections_listbox.curselection()
|
|
if selection:
|
|
name = self.connections_listbox.get(selection[0])
|
|
conn = self.connections[name]
|
|
|
|
# Clear recent connections selection
|
|
self.recent_listbox.selection_clear(0, tk.END)
|
|
# Update details
|
|
self._update_details(conn)
|
|
|
|
def _new_connection(self):
    """Create a connection from the "New Connection" dialog.

    Does nothing when the dialog produces no result or when the user declines
    to overwrite an existing connection of the same name. Otherwise the
    settings are saved, the password (if one was entered) is stored
    encrypted, the connection is added to the history and the lists are
    refreshed.
    """
    dialog = ConnectionDialog(self.root, "New Connection", rdp_client=self)
    entered = dialog.result
    if not entered:
        return

    name = entered["name"]
    if name in self.connections and not messagebox.askyesno(
            "Overwrite", f"Connection '{name}' already exists. Overwrite?"):
        return

    # Persist the settings without the name (it is the key) and without the
    # password (it is stored separately, encrypted).
    settings = entered.copy()
    settings.pop("name")
    settings.pop("password")
    settings["created"] = datetime.now().strftime("%Y-%m-%d %H:%M")

    self.connections[name] = settings
    self._save_connections()

    # Save credentials if provided
    if entered["password"]:
        credential_entry = self.credentials.setdefault(name, {})
        credential_entry["password"] = self._encrypt_password(entered["password"])
        self._save_credentials()

    # Make the new connection show up under recent connections.
    self._add_to_history(name)

    self._refresh_connections_list()
    self.status_var.set(f"Connection '{name}' saved successfully")
|
|
|
|
def _edit_selected(self):
    """Edit the connection selected in the saved or recent list.

    The saved list takes precedence when both lists have a selection. The
    current settings (and stored password, if any) are offered in the edit
    dialog; on confirmation the connection is saved, optionally under a new
    name.

    Fixes over the previous behaviour:
    * the original "created" timestamp is kept when the connection is
      renamed (it used to be looked up after the old entry was deleted);
    * removing the old name's credentials on rename is written to disk even
      when no new password is entered;
    * names containing " (" are resolved correctly from the recent list.
    """
    # Check which listbox has a selection
    saved_selection = self.connections_listbox.curselection()
    recent_selection = self.recent_listbox.curselection()

    name = None
    if saved_selection:
        name = self.connections_listbox.get(saved_selection[0])
    elif recent_selection:
        display_text = self.recent_listbox.get(recent_selection[0])
        # Rows look like "name (Nx)"; strip only the trailing counter.
        name = display_text.rsplit(' (', 1)[0]

    if not name:
        messagebox.showwarning("No Selection", "Please select a connection to edit.")
        return

    if name not in self.connections:
        messagebox.showerror("Error", f"Connection '{name}' not found.")
        return

    conn = self.connections[name].copy()
    # Remember the creation time now; the entry may be deleted on rename.
    original_created = conn.get("created")

    # Get saved password if exists
    password = ""
    if name in self.credentials and "password" in self.credentials[name]:
        password = self._decrypt_password(self.credentials[name]["password"])

    conn["name"] = name
    conn["password"] = password

    dialog = ConnectionDialog(self.root, f"Edit Connection: {name}", conn, rdp_client=self)
    if not dialog.result:
        return

    new_name = dialog.result["name"]
    credentials_changed = False

    # Handle name change
    if new_name != name:
        if new_name in self.connections:
            if not messagebox.askyesno("Overwrite", f"Connection '{new_name}' already exists. Overwrite?"):
                return
        # Remove old connection
        del self.connections[name]
        if name in self.credentials:
            del self.credentials[name]
            credentials_changed = True

    # Save updated connection
    conn_data = dialog.result.copy()
    del conn_data["name"]
    del conn_data["password"]
    if original_created is None:
        original_created = datetime.now().strftime("%Y-%m-%d %H:%M")
    conn_data["created"] = original_created
    conn_data["modified"] = datetime.now().strftime("%Y-%m-%d %H:%M")

    self.connections[new_name] = conn_data
    self._save_connections()

    # Save credentials
    if dialog.result["password"]:
        if new_name not in self.credentials:
            self.credentials[new_name] = {}
        self.credentials[new_name]["password"] = self._encrypt_password(dialog.result["password"])
        credentials_changed = True

    if credentials_changed:
        self._save_credentials()

    # Update history for the new/updated connection
    self._add_to_history(new_name)

    self._refresh_connections_list()
    self.status_var.set(f"Connection '{new_name}' updated successfully")
|
|
|
|
def _delete_selected(self):
    """Delete the connection selected in the saved or recent list.

    The saved list takes precedence when both lists have a selection. After
    confirmation the connection and its stored credentials are removed, both
    files are rewritten, the lists are refreshed and the details panel is
    blanked.
    """
    # Check which listbox has a selection
    saved_selection = self.connections_listbox.curselection()
    recent_selection = self.recent_listbox.curselection()

    name = None
    if saved_selection:
        name = self.connections_listbox.get(saved_selection[0])
    elif recent_selection:
        display_text = self.recent_listbox.get(recent_selection[0])
        # Rows look like "name (Nx)". Strip only the LAST " (" suffix so
        # that names which themselves contain " (" are not truncated.
        name = display_text.rsplit(' (', 1)[0]

    if not name:
        messagebox.showwarning("No Selection", "Please select a connection to delete.")
        return

    if name not in self.connections:
        messagebox.showerror("Error", f"Connection '{name}' not found.")
        return

    if not messagebox.askyesno("Confirm Delete", f"Delete connection '{name}'?"):
        return

    del self.connections[name]
    if name in self.credentials:
        del self.credentials[name]

    self._save_connections()
    self._save_credentials()
    self._refresh_connections_list()

    # Clear details
    for label in self.details_labels.values():
        label.config(text="")

    self.status_var.set(f"Connection '{name}' deleted")
|
|
|
|
def _connect_selected(self, event=None):
|
|
"""Connect to selected connection"""
|
|
# Check which listbox has a selection
|
|
saved_selection = self.connections_listbox.curselection()
|
|
recent_selection = self.recent_listbox.curselection()
|
|
|
|
if saved_selection:
|
|
name = self.connections_listbox.get(saved_selection[0])
|
|
self._connect_to(name)
|
|
elif recent_selection:
|
|
display_text = self.recent_listbox.get(recent_selection[0])
|
|
name = display_text.split(' (')[0]
|
|
if name in self.connections:
|
|
self._connect_to(name)
|
|
else:
|
|
messagebox.showwarning("No Selection", "Please select a connection to connect.")
|
|
|
|
def _connect_to(self, name):
|
|
"""Connect to a specific connection"""
|
|
if name not in self.connections:
|
|
messagebox.showerror("Error", f"Connection '{name}' not found.")
|
|
return
|
|
|
|
conn = self.connections[name]
|
|
|
|
# Get password
|
|
password = ""
|
|
if name in self.credentials and "password" in self.credentials[name]:
|
|
password = self._decrypt_password(self.credentials[name]["password"])
|
|
else:
|
|
password = simpledialog.askstring("Password", f"Enter password for {conn['username']}@{conn['server']}:",
|
|
show='*')
|
|
if not password:
|
|
return
|
|
|
|
# Build and execute RDP command
|
|
self._add_to_history(name)
|
|
self._execute_rdp_connection(conn, password)
|
|
self._refresh_connections_list() # Refresh to update recent connections
|
|
|
|
def _execute_rdp_connection(self, conn, password):
    """Run xfreerdp for ``conn`` on a daemon thread and report the outcome.

    The worker first probes the server with ``_test_server_connectivity``,
    then assembles the xfreerdp argument list from the connection's saved
    options and runs it with ``subprocess.run`` (output is not captured, so
    xfreerdp can interact with the user). Status-bar updates and error
    dialogs are queued on the Tk event loop through ``self.root.after``.

    Args:
        conn: Connection settings mapping. ``server`` and ``username`` are
            required; every other option falls back to a default.
        password: Plain-text password placed in the ``/p:`` argument.

    Returns:
        None. The method returns as soon as the worker thread is started.
    """

    def post_status(text):
        """Queue a status-bar update on the Tk event loop."""
        self.root.after(0, lambda: self.status_var.set(text))

    def post_error(title, text):
        """Queue an error dialog on the Tk event loop."""
        self.root.after(0, lambda: messagebox.showerror(title, text))

    def add_flags(cmd, rows):
        """Append one flag per row, depending on the stored option value.

        Each row is ``(key, default, match, flag_if_match, flag_otherwise)``;
        ``flag_otherwise`` may be None when nothing is emitted for a miss.
        """
        for key, default, match, hit_flag, miss_flag in rows:
            if conn.get(key, default) == match:
                cmd.append(hit_flag)
            elif miss_flag is not None:
                cmd.append(miss_flag)

    def build_command(server, username):
        """Translate the connection settings into an xfreerdp argv list."""
        cmd = ["/usr/bin/xfreerdp", "+window-drag", "+smart-sizing", "/cert-ignore"]

        # NOTE(review): the password is passed on the command line and is
        # therefore visible in the process list while xfreerdp runs.
        cmd.extend([f"/v:{server}", f"/u:{username}", f"/p:{password}"])
        if conn.get("domain"):
            cmd.append(f"/d:{conn['domain']}")

        # Monitor selection is resolved first because it changes how a
        # "Full Screen" resolution is expressed.
        multimon = conn.get("multimon", "No")
        monitor_counts = {"2 Monitors": 2, "3 Monitors": 3, "4 Monitors": 4}
        use_specific_monitors = isinstance(multimon, str) and multimon in monitor_counts
        monitor_list = None
        if use_specific_monitors:
            monitor_list = self._get_best_monitor_selection(monitor_counts[multimon])

        resolution = conn.get("resolution", "1920x1080")
        if resolution != "Full Screen":
            cmd.append(f"/size:{resolution}")
        elif not use_specific_monitors:
            cmd.append("/f")
        else:
            # Full screen on a subset of monitors: pass their combined size.
            combined_res = self._get_monitors_combined_resolution(monitor_list)
            cmd.append(f"/size:{combined_res}")
            self.logger.info(f"Using calculated resolution {combined_res} for {multimon}: {monitor_list}")

        cmd.append(f"/bpp:{conn.get('color_depth', 32)}")

        if use_specific_monitors:
            cmd.append(f"/monitors:{monitor_list}")
            self.logger.info(f"Using specific monitors for {multimon}: {monitor_list}")
        elif multimon == "All Monitors":
            cmd.append("/multimon")
            self.logger.info("Using all available monitors")
        elif multimon == "Span":
            cmd.append("/span")
            self.logger.info("Using span mode across monitors")

        sound = conn.get("sound", "Yes")
        if sound == "Yes":
            cmd.append("/sound:sys")
        elif sound == "Remote":
            cmd.append("/sound:local")

        add_flags(cmd, (
            ("microphone", "No", "Yes", "/microphone", None),
            ("clipboard", "Yes", "Yes", "/clipboard", None),
        ))

        if conn.get("drives", "No") == "Yes":
            cmd.append(f"/drive:home,{os.path.expanduser('~')}")

        add_flags(cmd, (("compression", "Yes", "Yes", "/compression", "-compression"),))

        # The stored value looks like "1 (Medium)"; str() guards against a
        # numeric value loaded from JSON.
        comp_level = str(conn.get("compression_level", "1 (Medium)"))
        for digit in ("0", "1", "2"):
            if digit in comp_level:
                cmd.append(f"/compression-level:{digit}")
                break

        add_flags(cmd, (
            ("bitmap_cache", "Yes", "Yes", "+bitmap-cache", "-bitmap-cache"),
            ("offscreen_cache", "Yes", "Yes", "+offscreen-cache", "-offscreen-cache"),
            ("fonts", "Yes", "No", "-fonts", None),
            ("aero", "No", "Yes", "+aero", "-aero"),
            ("wallpaper", "No", "No", "-wallpaper", None),
            ("themes", "Yes", "No", "-themes", None),
            ("menu_anims", "No", "No", "-menu-anims", None),
            ("printer", "No", "Yes", "/printer", None),
            ("com_ports", "No", "Yes", "/serial", None),
            ("usb", "No", "Yes", "/usb", None),
            ("network_auto_detect", "Yes", "No", "-network-auto-detect", None),
        ))
        return cmd

    def connect():
        try:
            server = conn['server']
            username = conn['username']

            self.logger.info(f"Attempting RDP connection to {server} as {username}")
            # Routed through root.after like every other UI update made
            # from this worker thread.
            post_status(f"Connecting to {server}...")

            if not self._test_server_connectivity(server):
                error_msg = f"Cannot reach server {server}. Please check the server address and your network connection."
                self.logger.error(f"Connectivity test failed for {server}")
                post_status("Connection failed - Server unreachable")
                post_error("Connection Error", error_msg)
                return

            cmd = build_command(server, username)

            # Never write the password to the log.
            cmd_str = ' '.join("/p:***" if arg.startswith("/p:") else arg for arg in cmd)
            self.logger.info(f"Executing RDP command: {cmd_str}")

            # Output is deliberately not captured so xfreerdp can prompt
            # interactively; only the exit code is available afterwards.
            result = subprocess.run(cmd)

            if result.returncode == 0:
                self.logger.info(f"RDP connection to {server} completed successfully")
                post_status("Connection completed successfully")
            else:
                error_msg = f"RDP connection failed with exit code {result.returncode}. Check credentials and server settings."
                self.logger.error(f"RDP connection failed with exit code: {result.returncode}")
                post_status("Connection failed")
                post_error("Connection Error", error_msg)
        except FileNotFoundError:
            error_msg = "xfreerdp is not installed or not found in PATH. Please install the freerdp package."
            self.logger.error("xfreerdp not found")
            post_status("xfreerdp not found")
            post_error("Missing Dependency", error_msg)
        except Exception as e:
            # Copy the text out now: `e` is unbound when this clause ends,
            # so a deferred callback must not reference it.
            detail = str(e)
            error_msg = f"Unexpected error during connection: {detail}"
            self.logger.error(f"Unexpected RDP connection error: {detail}", exc_info=True)
            post_status(f"Error: {detail}")
            post_error("Connection Error", error_msg)

    # Run in the background so the GUI stays responsive while xfreerdp runs.
    thread = threading.Thread(target=connect, daemon=True)
    thread.start()
|
|
|
|
def _test_server_connectivity(self, server):
|
|
"""Test if server is reachable"""
|
|
try:
|
|
# Extract hostname/IP from server (remove port if present)
|
|
host = server.split(':')[0]
|
|
|
|
# Try to establish a TCP connection to port 3389 (RDP port)
|
|
port = 3389
|
|
if ':' in server:
|
|
try:
|
|
port = int(server.split(':')[1])
|
|
except ValueError:
|
|
port = 3389
|
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.settimeout(5) # 5 second timeout
|
|
result = sock.connect_ex((host, port))
|
|
sock.close()
|
|
|
|
return result == 0
|
|
except Exception as e:
|
|
self.logger.warning(f"Connectivity test failed for {server}: {str(e)}")
|
|
return False
|
|
|
|
def _parse_rdp_error(self, stderr, returncode):
|
|
"""Parse RDP error messages and provide user-friendly descriptions"""
|
|
error_messages = {
|
|
1: "General connection error. Please check your credentials and server address.",
|
|
2: "Invalid command line arguments or configuration.",
|
|
3: "Authentication failed. Please check your username and password.",
|
|
4: "Connection refused. The server may not allow RDP connections.",
|
|
5: "Connection timeout. The server may be offline or unreachable.",
|
|
131: "Authentication failed. Please verify your credentials.",
|
|
132: "Connection was refused by the server.",
|
|
133: "Logon failure. Please check your username, password, and domain."
|
|
}
|
|
|
|
# Check for specific error patterns in stderr
|
|
stderr_lower = stderr.lower()
|
|
if "authentication" in stderr_lower or "logon" in stderr_lower:
|
|
return "Authentication failed. Please verify your username, password, and domain settings."
|
|
elif "connection" in stderr_lower and "refused" in stderr_lower:
|
|
return "Connection was refused by the server. The server may not allow RDP connections or may be configured to use different settings."
|
|
elif "certificate" in stderr_lower:
|
|
return "Certificate verification failed. The connection is still secure, but the server's certificate couldn't be verified."
|
|
elif "timeout" in stderr_lower:
|
|
return "Connection timed out. The server may be offline or unreachable."
|
|
elif "resolution" in stderr_lower:
|
|
return "Display resolution error. Try using a different resolution setting."
|
|
elif returncode in error_messages:
|
|
return error_messages[returncode]
|
|
else:
|
|
return f"Connection failed (Error {returncode}). Please check your connection settings and try again.\n\nTechnical details: {stderr}"
|
|
|
|
def _import_connections(self):
    """Import connections (and optionally credentials) from a JSON file.

    The file must contain a JSON object with a ``connections`` object and,
    optionally, a ``credentials`` object. The whole structure is validated
    before any in-memory state is touched, so a malformed file cannot leave
    the current connections half replaced. When connections already exist
    the user chooses between merging and replacing; name clashes during a
    merge are confirmed one by one.
    """
    file_path = filedialog.askopenfilename(
        title="Import Connections",
        filetypes=[("JSON files", "*.json"), ("All files", "*.*")],
        defaultextension=".json"
    )

    if not file_path:
        return

    try:
        with open(file_path, 'r', encoding="utf-8") as f:
            import_data = json.load(f)

        # Validate import data structure before modifying anything.
        if not isinstance(import_data, dict):
            messagebox.showerror("Import Error", "Invalid file format. Expected JSON object.")
            return

        connections_data = import_data.get("connections", {})
        # A missing or empty credentials section means "nothing to import".
        credentials_data = import_data.get("credentials") or {}

        if not isinstance(connections_data, dict):
            messagebox.showerror("Import Error", "Invalid connections format in file.")
            return

        if not isinstance(credentials_data, dict):
            messagebox.showerror("Import Error", "Invalid credentials format in file.")
            return

        # Ask user about merge strategy
        if self.connections:
            choice = messagebox.askyesnocancel(
                "Import Strategy",
                "You have existing connections. Choose import strategy:\n\n"
                "Yes - Merge (keep existing, add new)\n"
                "No - Replace (delete existing, import new)\n"
                "Cancel - Cancel import"
            )

            if choice is None:  # Cancel
                return
            if choice is False:  # Replace
                self.connections.clear()
                self.credentials.clear()

        imported_count = 0
        for name, conn_data in connections_data.items():
            if not isinstance(conn_data, dict):
                # Other methods index connection settings by key, so an
                # entry that is not an object cannot be used.
                self.logger.warning(f"Skipping malformed connection entry '{name}' during import")
                continue

            if name in self.connections:
                if not messagebox.askyesno("Overwrite", f"Connection '{name}' exists. Overwrite?"):
                    continue

            self.connections[name] = conn_data
            imported_count += 1

        # NOTE(review): credential entries are copied verbatim; this method
        # does not re-encrypt them despite what the prompt says.
        if credentials_data and messagebox.askyesno(
            "Import Credentials",
            "Import saved credentials as well? (Passwords will be re-encrypted with your key)"
        ):
            for name, cred_data in credentials_data.items():
                if name in self.connections:  # Only for connections that exist
                    self.credentials[name] = cred_data

        self._save_connections()
        self._save_credentials()
        self._refresh_connections_list()

        self.status_var.set(f"Successfully imported {imported_count} connections")
        messagebox.showinfo("Import Complete", f"Successfully imported {imported_count} connections.")

    except FileNotFoundError:
        messagebox.showerror("Import Error", "File not found.")
    except json.JSONDecodeError:
        messagebox.showerror("Import Error", "Invalid JSON file format.")
    except Exception as e:
        messagebox.showerror("Import Error", f"Error importing connections: {str(e)}")
|
|
|
|
def _export_connections(self):
    """Export all connections, and optionally credentials, to a JSON file.

    The user picks the destination in a save dialog that suggests a
    timestamped file name, then decides whether the stored credentials are
    included. The export also records the export date, the exporting
    user/host and a format version.
    """
    if not self.connections:
        messagebox.showwarning("Export Warning", "No connections to export.")
        return

    file_path = filedialog.asksaveasfilename(
        title="Export Connections",
        filetypes=[("JSON files", "*.json"), ("All files", "*.*")],
        defaultextension=".json",
        # Tk's save dialog takes the suggested file name as "initialfile".
        initialfile=f"rdp_connections_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    )

    if not file_path:
        return

    try:
        # Ask if user wants to include credentials
        include_credentials = messagebox.askyesno(
            "Export Credentials",
            "Include saved credentials in export?\n\n"
            "Warning: Credentials will be encrypted but should be treated as sensitive data."
        )

        export_data = {
            "connections": self.connections,
            "export_date": datetime.now().isoformat(),
            "exported_by": f"{os.getenv('USER')}@{os.uname().nodename}",
            "version": "1.0"
        }

        if include_credentials:
            export_data["credentials"] = self.credentials

        with open(file_path, 'w', encoding="utf-8") as f:
            json.dump(export_data, f, indent=2)

        self.status_var.set(f"Connections exported to {os.path.basename(file_path)}")
        messagebox.showinfo("Export Complete", f"Successfully exported {len(self.connections)} connections to:\n{file_path}")

    except Exception as e:
        messagebox.showerror("Export Error", f"Error exporting connections: {str(e)}")
|
|
|
|
def _clear_credentials(self):
    """Delete every stored password after the user confirms.

    On confirmation the credentials mapping is emptied and saved, the
    status bar is updated and the action is logged. Declining the prompt
    leaves everything untouched.
    """
    confirmed = messagebox.askyesno("Confirm", "Clear all saved passwords?")
    if not confirmed:
        return

    self.credentials.clear()
    self._save_credentials()
    self.status_var.set("All credentials cleared")
    self.logger.info("All credentials cleared by user")
|
|
|
|
def _test_selected_connection(self):
|
|
"""Test the selected connection for reachability"""
|
|
# Check which listbox has a selection
|
|
saved_selection = self.connections_listbox.curselection()
|
|
recent_selection = self.recent_listbox.curselection()
|
|
|
|
name = None
|
|
if saved_selection:
|
|
name = self.connections_listbox.get(saved_selection[0])
|
|
elif recent_selection:
|
|
display_text = self.recent_listbox.get(recent_selection[0])
|
|
name = display_text.split(' (')[0]
|
|
|
|
if not name:
|
|
messagebox.showwarning("No Selection", "Please select a connection to test.")
|
|
return
|
|
|
|
if name not in self.connections:
|
|
messagebox.showerror("Error", f"Connection '{name}' not found.")
|
|
return
|
|
|
|
conn = self.connections[name]
|
|
self._test_connection_async(name, conn)
|
|
|
|
def _test_connection_async(self, name, conn):
    """Check a connection's reachability on a daemon thread.

    The worker calls ``_test_server_connectivity`` and, if that succeeds,
    ``_test_rdp_service``. The result is logged, and a status-bar update
    plus a dialog are queued on the Tk event loop via ``self.root.after``.

    Args:
        name: Connection name, used only in the error log message.
        conn: Connection settings mapping; only ``server`` is read.
    """

    def queue_ui(status_text, dialog, title, body):
        """Queue the status-bar text, then the result dialog."""
        self.root.after(0, lambda: self.status_var.set(status_text))
        self.root.after(0, lambda: dialog(title, body))

    def worker():
        try:
            host = conn['server']
            self.logger.info(f"Testing connection to {host}")
            self.root.after(0, lambda: self.status_var.set(f"Testing connection to {host}..."))

            if not self._test_server_connectivity(host):
                text = f"✗ Connection test failed!\n\nServer: {host}\nStatus: Not reachable\n\nPossible causes:\n• Server is offline\n• Incorrect server address\n• Network connectivity issues\n• Firewall blocking access"
                self.logger.error(f"Connection test failed for {host} - server not reachable")
                queue_ui("Connection test failed", messagebox.showerror, "Connection Test", text)
            elif self._test_rdp_service(host):
                text = f"✓ Connection test successful!\n\nServer: {host}\nRDP Port: Accessible\nStatus: Ready for connection"
                self.logger.info(f"Connection test successful for {host}")
                queue_ui("Connection test passed", messagebox.showinfo, "Connection Test", text)
            else:
                text = f"⚠ Server is reachable but RDP service may not be available.\n\nServer: {host}\nStatus: Network accessible but RDP port (3389) not responding\n\nThis could mean:\n• RDP is disabled on the server\n• A firewall is blocking RDP\n• RDP is running on a different port"
                self.logger.warning(f"Server {host} reachable but RDP service not available")
                queue_ui("RDP service not available", messagebox.showwarning, "Connection Test", text)

        except Exception as exc:
            failure = f"Error during connection test: {str(exc)}"
            self.logger.error(f"Connection test error for {name}: {str(exc)}", exc_info=True)
            queue_ui("Connection test error", messagebox.showerror, "Test Error", failure)

    # Background thread keeps the GUI responsive during the socket probes.
    threading.Thread(target=worker, daemon=True).start()
|
|
|
|
def _test_rdp_service(self, server):
|
|
"""Test if RDP service is specifically available"""
|
|
try:
|
|
# Extract hostname/IP and port
|
|
host = server.split(':')[0]
|
|
port = 3389
|
|
if ':' in server:
|
|
try:
|
|
port = int(server.split(':')[1])
|
|
except ValueError:
|
|
port = 3389
|
|
|
|
# Try to connect to RDP port
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.settimeout(10) # Longer timeout for RDP test
|
|
result = sock.connect_ex((host, port))
|
|
sock.close()
|
|
|
|
return result == 0
|
|
|
|
except Exception as e:
|
|
self.logger.warning(f"RDP service test failed for {server}: {str(e)}")
|
|
return False
|
|
|
|
def run(self):
    """Hand control to the Tk main loop of the application's root window."""
    main_window = self.root
    main_window.mainloop()
|
|
|
|
|
|
class ConnectionDialog:
    """Modal dialog for creating or editing one connection's settings.

    The constructor builds the window and waits until it is closed.
    Afterwards ``result`` holds a dict of field name -> entered value when
    the user pressed Save, or ``None`` when the dialog was cancelled.
    """

    # Fixed window size in pixels, also used to centre the dialog.
    _WIDTH = 600
    _HEIGHT = 700

    def __init__(self, parent, title, initial_data=None, rdp_client=None):
        """Build the dialog and wait until the user closes it.

        Args:
            parent: Parent Tk window the dialog is attached to.
            title: Window title.
            initial_data: Optional mapping of field name -> initial value.
            rdp_client: Optional client object; when given, its
                ``_get_multimon_options()`` supplies the monitor choices.
        """
        self.result = None
        self.rdp_client = rdp_client

        # Create dialog
        self.dialog = tk.Toplevel(parent)
        self.dialog.title(title)
        self.dialog.geometry(f"{self._WIDTH}x{self._HEIGHT}")
        self.dialog.resizable(False, False)
        self.dialog.transient(parent)
        self.dialog.grab_set()

        # Center dialog on the screen
        self.dialog.update_idletasks()
        x = (self.dialog.winfo_screenwidth() // 2) - (self._WIDTH // 2)
        y = (self.dialog.winfo_screenheight() // 2) - (self._HEIGHT // 2)
        self.dialog.geometry(f"{self._WIDTH}x{self._HEIGHT}+{x}+{y}")

        self.initial_data = initial_data or {}
        self._setup_dialog()

        # Wait for dialog to close
        self.dialog.wait_window()

    def _setup_dialog(self):
        """Create the Basic/Advanced/Performance tabs and the buttons."""
        main_frame = ttk.Frame(self.dialog, padding="20")
        main_frame.pack(fill=tk.BOTH, expand=True)

        # Create notebook for tabs
        notebook = ttk.Notebook(main_frame)
        notebook.pack(fill=tk.BOTH, expand=True, pady=(0, 20))

        basic_frame = ttk.Frame(notebook)
        notebook.add(basic_frame, text="Basic")

        advanced_frame = ttk.Frame(notebook)
        notebook.add(advanced_frame, text="Advanced")
        adv_desc = ttk.Label(advanced_frame, text="Advanced connection and device sharing options",
                             font=("TkDefaultFont", 9, "italic"))
        adv_desc.grid(row=0, column=0, columnspan=2, pady=(10, 15), padx=10, sticky=tk.W)

        performance_frame = ttk.Frame(notebook)
        notebook.add(performance_frame, text="Performance")
        perf_desc = ttk.Label(performance_frame, text="Performance optimization and visual quality settings",
                              font=("TkDefaultFont", 9, "italic"))
        perf_desc.grid(row=0, column=0, columnspan=2, pady=(10, 15), padx=10, sticky=tk.W)

        # Store field variables: field name -> tk.StringVar
        self.fields = {}

        # Basic fields
        basic_fields = [
            ("Connection Name:", "name", "entry"),
            ("Server/IP:", "server", "entry"),
            ("Username:", "username", "entry"),
            ("Password:", "password", "password"),
            ("Domain:", "domain", "entry"),
        ]

        for i, (label, field, widget_type) in enumerate(basic_fields):
            ttk.Label(basic_frame, text=label).grid(row=i, column=0, sticky=tk.W, pady=5, padx=(10, 5))

            var = tk.StringVar(value=self.initial_data.get(field, ""))
            if widget_type == "password":
                # Mask the characters typed into the password entry.
                widget = ttk.Entry(basic_frame, textvariable=var, show="*", width=40)
            else:
                widget = ttk.Entry(basic_frame, textvariable=var, width=40)

            widget.grid(row=i, column=1, sticky=tk.W, pady=5, padx=(5, 10))
            self.fields[field] = var

        # Advanced fields
        multimon_options = self.rdp_client._get_multimon_options() if self.rdp_client else ["No", "2 Monitors", "3 Monitors", "All Monitors", "Span"]
        advanced_fields = [
            ("Resolution:", "resolution", "combo", ["1920x1080", "2560x1440", "1366x768", "1280x1024", "1024x768", "Full Screen"]),
            ("Color Depth:", "color_depth", "combo", ["32", "24", "16", "15"]),
            ("Multiple Monitors:", "multimon", "combo", multimon_options),
            ("Sound:", "sound", "combo", ["Yes", "No", "Remote"]),
            ("Microphone:", "microphone", "combo", ["No", "Yes"]),
            ("Clipboard:", "clipboard", "combo", ["Yes", "No"]),
            ("Share Home Drive:", "drives", "combo", ["No", "Yes"]),
            ("Printer Sharing:", "printer", "combo", ["No", "Yes"]),
            ("COM Ports:", "com_ports", "combo", ["No", "Yes"]),
            ("USB Devices:", "usb", "combo", ["No", "Yes"]),
            ("Gateway Mode:", "gateway", "combo", ["Auto", "RPC", "HTTP"]),
            ("Network Detection:", "network_auto_detect", "combo", ["Yes", "No"]),
        ]
        self._add_combo_rows(advanced_frame, advanced_fields)

        # Performance fields
        performance_fields = [
            ("Compression:", "compression", "combo", ["Yes", "No"]),
            ("Compression Level:", "compression_level", "combo", ["0 (None)", "1 (Medium)", "2 (High)"]),
            ("Bitmap Cache:", "bitmap_cache", "combo", ["Yes", "No"]),
            ("Offscreen Cache:", "offscreen_cache", "combo", ["Yes", "No"]),
            ("Font Smoothing:", "fonts", "combo", ["Yes", "No"]),
            ("Desktop Composition:", "aero", "combo", ["No", "Yes"]),
            ("Wallpaper:", "wallpaper", "combo", ["No", "Yes"]),
            ("Themes:", "themes", "combo", ["Yes", "No"]),
            ("Menu Animations:", "menu_anims", "combo", ["No", "Yes"]),
        ]
        self._add_combo_rows(performance_frame, performance_fields)

        # Buttons
        button_frame = ttk.Frame(main_frame)
        button_frame.pack(fill=tk.X)

        ttk.Button(button_frame, text="Cancel", command=self._cancel).pack(side=tk.RIGHT, padx=(10, 0))
        ttk.Button(button_frame, text="Save", command=self._save).pack(side=tk.RIGHT)

    def _add_combo_rows(self, frame, rows):
        """Add one labelled read-only combobox per row to ``frame``.

        Rows start at grid row 1 because row 0 holds the tab description.
        Each field defaults to its first listed value unless
        ``initial_data`` provides one.
        """
        for row, (label, field, _widget_type, values) in enumerate(rows, start=1):
            ttk.Label(frame, text=label).grid(row=row, column=0, sticky=tk.W, pady=5, padx=(10, 5))

            var = tk.StringVar(value=self.initial_data.get(field, values[0]))
            widget = ttk.Combobox(frame, textvariable=var, values=values, width=37, state="readonly")
            widget.grid(row=row, column=1, sticky=tk.W, pady=5, padx=(5, 10))
            self.fields[field] = var

    def _save(self):
        """Validate the required fields, store the result and close.

        Every value is whitespace-stripped except the password, which is
        kept exactly as typed because surrounding spaces may be part of it.
        """
        required = (
            ("name", "Connection name is required."),
            ("server", "Server/IP is required."),
            ("username", "Username is required."),
        )
        for field, message in required:
            if not self.fields[field].get().strip():
                messagebox.showerror("Error", message)
                return

        # Collect data
        self.result = {
            field: var.get() if field == "password" else var.get().strip()
            for field, var in self.fields.items()
        }

        self.dialog.destroy()

    def _cancel(self):
        """Close the dialog without storing a result."""
        self.dialog.destroy()
|
|
|
|
|
|
def main():
    """Check that xfreerdp is installed, then run the GUI.

    Returns:
        int: 0 after a normal run or a keyboard interrupt; 1 when a
        dependency is missing or an unexpected error occurs.
    """
    required_paths = ["/usr/bin/xfreerdp"]
    absent = [path for path in required_paths if not os.path.exists(path)]

    if absent:
        report = ["Missing dependencies:"]
        report.extend(f" - {path}" for path in absent)
        report.append("\nPlease install freerdp package:")
        report.append(" sudo apt install freerdp2-x11 # Ubuntu/Debian")
        report.append(" sudo dnf install freerdp # Fedora")
        print("\n".join(report))
        return 1

    try:
        RDPClient().run()
    except KeyboardInterrupt:
        print("\nExiting...")
        return 0
    except Exception as e:
        print(f"Error: {e}")
        return 1
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use sys.exit rather than the bare exit() helper: exit() is added by the
    # site module for interactive sessions and is not guaranteed to exist.
    sys.exit(main())