1546 lines
67 KiB
Python
Executable File
1546 lines
67 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
Enhanced RDP Client with Professional GUI
|
||
A modern replacement for the zenity-based RDP script with better UX
|
||
"""
|
||
|
||
# Auto-detect and use virtual environment if available
|
||
import sys
|
||
import os
|
||
|
||
def setup_virtual_env():
    """Re-execute this script under the bundled virtual environment, if any.

    Looks for ``.venv/bin/python`` in the directory containing this file.
    When that interpreter exists and is not the one currently running, the
    process image is replaced via ``os.execv`` (which does not return).
    Otherwise the function returns ``None`` and execution simply continues
    under the current interpreter.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    venv_python = os.path.join(script_dir, '.venv', 'bin', 'python')

    # Comparing against sys.executable stops the re-exec from looping once
    # we are already running under the virtual environment's interpreter.
    if os.path.exists(venv_python) and sys.executable != venv_python:
        os.execv(venv_python, [venv_python] + sys.argv)
|
||
|
||
# Try to setup virtual environment first
|
||
setup_virtual_env()
|
||
|
||
import tkinter as tk
|
||
from tkinter import ttk, messagebox, simpledialog, filedialog
|
||
import json
|
||
import os
|
||
import subprocess
|
||
import base64
|
||
from cryptography.fernet import Fernet
|
||
from cryptography.hazmat.primitives import hashes
|
||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||
import threading
|
||
from datetime import datetime
|
||
import shutil
|
||
import logging
|
||
import socket
|
||
import sys
|
||
|
||
class RDPClient:
|
||
def __init__(self):
    """Create the main window, load persisted state and build the GUI.

    Steps, in order: create and size the Tk root, apply the theme, compute
    the paths under ``~/.config/rdp-client`` (creating the directory),
    configure logging and encryption, load connections / credentials /
    history, migrate legacy multi-monitor values, seed an empty history
    from the saved connections, then build and populate the widgets.
    """
    self.root = window = tk.Tk()
    window.title("RDP Client - Professional")
    window.geometry("1200x800")
    window.minsize(900, 650)

    # Theme first: it defines self.colors, which the widgets rely on.
    self._setup_modern_theme()

    # Every persisted file lives in one per-user configuration directory.
    config_dir = os.path.expanduser("~/.config/rdp-client")
    self.config_dir = config_dir
    self.connections_file = os.path.join(config_dir, "connections.json")
    self.credentials_file = os.path.join(config_dir, "credentials.json")
    self.history_file = os.path.join(config_dir, "history.json")
    self.log_file = os.path.join(config_dir, "rdp_client.log")
    os.makedirs(config_dir, exist_ok=True)

    self._setup_logging()
    self._init_encryption()

    # Persisted state.
    self.connections = self._load_connections()
    self.credentials = self._load_credentials()
    self.history = self._load_history()

    # Rewrite legacy "Yes" multi-monitor values to the newer options.
    self._migrate_multimon_settings()

    # First run (or migration): seed the history from saved connections.
    if self.connections and not self.history:
        for conn_name in self.connections.keys():
            self._add_to_history(conn_name)

    self._setup_gui()
    self._refresh_connections_list()

    # Refresh once more shortly afterwards, when the GUI is fully ready.
    window.after(100, self._refresh_connections_list)

    self.logger.info("RDP Client initialized successfully")
|
||
|
||
def _setup_modern_theme(self):
    """Apply the application's colour palette and ttk styles.

    Sets the root window background, switches ttk to the ``clam`` theme
    when it is available, stores the palette in ``self.colors`` and
    registers the ``Modern.*``, ``Card.*``, ``Title.*``, ``Success.*`` and
    ``Danger.*`` styles used by the widgets.
    """
    self.root.configure(bg='#f8f9fa')

    style = ttk.Style()

    try:
        style.theme_use('clam')  # More modern than default
    except tk.TclError:
        # The theme is not available in this Tk build; keep the default.
        # Only TclError is caught so unrelated failures are not hidden.
        pass

    # Colour palette shared by the styles below and by the widgets.
    self.colors = {
        'primary': '#0066cc',       # Modern blue
        'primary_dark': '#004499',  # Darker blue for hover
        'secondary': '#6c757d',     # Muted gray
        'success': '#28a745',       # Green
        'danger': '#dc3545',        # Red
        'warning': '#ffc107',       # Yellow
        'info': '#17a2b8',          # Cyan
        'light': '#f8f9fa',         # Light gray
        'dark': '#343a40',          # Dark gray
        'white': '#ffffff',
        'border': '#dee2e6',        # Light border
        'hover': '#e9ecef',         # Hover gray
    }
    colors = self.colors

    # Frames
    style.configure('Modern.TFrame',
                    background=colors['white'],
                    relief='flat',
                    borderwidth=1)
    style.configure('Card.TFrame',
                    background=colors['white'],
                    relief='solid',
                    borderwidth=1,
                    bordercolor=colors['border'])

    # Labels
    style.configure('Modern.TLabel',
                    background=colors['white'],
                    foreground=colors['dark'],
                    font=('Segoe UI', 10))
    style.configure('Title.TLabel',
                    background=colors['white'],
                    foreground=colors['dark'],
                    font=('Segoe UI', 12, 'bold'))

    # Buttons: the three variants differ only in their background colour.
    button_backgrounds = (
        ('Modern.TButton', 'primary'),
        ('Success.TButton', 'success'),
        ('Danger.TButton', 'danger'),
    )
    for style_name, color_key in button_backgrounds:
        style.configure(style_name,
                        background=colors[color_key],
                        foreground=colors['white'],
                        borderwidth=0,
                        focuscolor='none',
                        font=('Segoe UI', 9))

    # Only the primary button gets a darker hover / pressed colour.
    style.map('Modern.TButton',
              background=[('active', colors['primary_dark']),
                          ('pressed', colors['primary_dark'])])

    # Treeview
    style.configure('Modern.Treeview',
                    background=colors['white'],
                    foreground=colors['dark'],
                    fieldbackground=colors['white'],
                    borderwidth=1,
                    relief='solid')
    style.configure('Modern.Treeview.Heading',
                    background=colors['light'],
                    foreground=colors['dark'],
                    relief='flat',
                    font=('Segoe UI', 9, 'bold'))
|
||
|
||
def _setup_logging(self):
|
||
"""Setup logging configuration"""
|
||
self.logger = logging.getLogger('rdp_client')
|
||
self.logger.setLevel(logging.INFO)
|
||
|
||
# Create file handler
|
||
file_handler = logging.FileHandler(self.log_file)
|
||
file_handler.setLevel(logging.INFO)
|
||
|
||
# Create console handler for errors
|
||
console_handler = logging.StreamHandler(sys.stdout)
|
||
console_handler.setLevel(logging.ERROR)
|
||
|
||
# Create formatter
|
||
formatter = logging.Formatter(
|
||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||
)
|
||
file_handler.setFormatter(formatter)
|
||
console_handler.setFormatter(formatter)
|
||
|
||
# Add handlers
|
||
self.logger.addHandler(file_handler)
|
||
self.logger.addHandler(console_handler)
|
||
|
||
# Prevent duplicate logs
|
||
self.logger.propagate = False
|
||
|
||
def _init_encryption(self):
    """Create the Fernet cipher used for stored passwords.

    The key is derived with PBKDF2-HMAC-SHA256 (100000 iterations, 32
    bytes) from the string ``"<USER>@<hostname>"`` and a fixed salt, then
    url-safe base64 encoded and stored as ``self.cipher``.

    NOTE(review): both the key material and the salt are predictable, so
    this obfuscates rather than protects the credentials file. Changing
    either value would make already-stored passwords undecryptable.
    """
    identity = f"{os.getenv('USER')}@{os.uname().nodename}"
    fixed_salt = b'salt_1234567890'  # In production, use random salt
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=fixed_salt,
        iterations=100000,
    )
    raw_key = derivation.derive(identity.encode())
    self.cipher = Fernet(base64.urlsafe_b64encode(raw_key))
|
||
|
||
def _encrypt_password(self, password):
|
||
"""Encrypt password for secure storage"""
|
||
return self.cipher.encrypt(password.encode()).decode()
|
||
|
||
def _decrypt_password(self, encrypted_password):
|
||
"""Decrypt password from storage"""
|
||
try:
|
||
return self.cipher.decrypt(encrypted_password.encode()).decode()
|
||
except:
|
||
return ""
|
||
|
||
def _load_connections(self):
|
||
"""Load saved connections from file"""
|
||
if os.path.exists(self.connections_file):
|
||
try:
|
||
with open(self.connections_file, 'r') as f:
|
||
return json.load(f)
|
||
except:
|
||
return {}
|
||
return {}
|
||
|
||
def _save_connections(self):
|
||
"""Save connections to file"""
|
||
with open(self.connections_file, 'w') as f:
|
||
json.dump(self.connections, f, indent=2)
|
||
|
||
def _load_credentials(self):
|
||
"""Load saved credentials from file"""
|
||
if os.path.exists(self.credentials_file):
|
||
try:
|
||
with open(self.credentials_file, 'r') as f:
|
||
return json.load(f)
|
||
except:
|
||
return {}
|
||
return {}
|
||
|
||
def _save_credentials(self):
|
||
"""Save credentials to file"""
|
||
with open(self.credentials_file, 'w') as f:
|
||
json.dump(self.credentials, f, indent=2)
|
||
|
||
def _load_history(self):
|
||
"""Load connection history from file"""
|
||
if os.path.exists(self.history_file):
|
||
try:
|
||
with open(self.history_file, 'r') as f:
|
||
return json.load(f)
|
||
except:
|
||
return []
|
||
return []
|
||
|
||
def _save_history(self):
|
||
"""Save connection history to file"""
|
||
with open(self.history_file, 'w') as f:
|
||
json.dump(self.history, f, indent=2)
|
||
|
||
def _add_to_history(self, connection_name):
|
||
"""Add a connection to history"""
|
||
# Remove existing entry if present
|
||
self.history = [h for h in self.history if h.get('name') != connection_name]
|
||
|
||
# Add new entry at the beginning
|
||
history_entry = {
|
||
'name': connection_name,
|
||
'timestamp': datetime.now().isoformat(),
|
||
'count': self._get_connection_count(connection_name) + 1
|
||
}
|
||
self.history.insert(0, history_entry)
|
||
|
||
# Keep only last 20 entries
|
||
self.history = self.history[:20]
|
||
|
||
self._save_history()
|
||
|
||
def _get_connection_count(self, connection_name):
|
||
"""Get the connection count for a specific connection"""
|
||
for entry in self.history:
|
||
if entry.get('name') == connection_name:
|
||
return entry.get('count', 0)
|
||
return 0
|
||
|
||
def _migrate_multimon_settings(self):
|
||
"""Migrate legacy 'Yes' multimon settings to new specific monitor options"""
|
||
changed = False
|
||
for conn_name, conn_data in self.connections.items():
|
||
if conn_data.get("multimon") == "Yes":
|
||
# Default to 2 monitors for legacy "Yes" settings
|
||
conn_data["multimon"] = "2 Monitors"
|
||
changed = True
|
||
self.logger.info(f"Migrated multimon setting for {conn_name} from 'Yes' to '2 Monitors'")
|
||
|
||
if changed:
|
||
self._save_connections()
|
||
self.logger.info("Completed multimon settings migration")
|
||
|
||
def _get_available_monitors_count(self):
|
||
"""Get the number of available monitors"""
|
||
try:
|
||
result = subprocess.run(['xrandr', '--listmonitors'],
|
||
capture_output=True, text=True, timeout=5)
|
||
if result.returncode == 0:
|
||
lines = result.stdout.strip().split('\n')
|
||
if lines and lines[0].startswith('Monitors:'):
|
||
return int(lines[0].split(':')[1].strip())
|
||
except:
|
||
pass
|
||
return 1 # Default to 1 monitor if detection fails
|
||
|
||
def _get_multimon_options(self):
|
||
"""Get available multi-monitor options based on system"""
|
||
monitor_count = self._get_available_monitors_count()
|
||
options = ["No"]
|
||
|
||
# Add options for available monitors
|
||
for i in range(2, min(monitor_count + 1, 5)): # Up to 4 monitors
|
||
options.append(f"{i} Monitors")
|
||
|
||
if monitor_count > 1:
|
||
options.extend(["All Monitors", "Span"])
|
||
|
||
return options
|
||
|
||
def _get_monitors_combined_resolution(self, monitor_list):
|
||
"""Get the combined resolution for selected monitors"""
|
||
try:
|
||
result = subprocess.run(['xfreerdp', '/monitor-list'],
|
||
capture_output=True, text=True, timeout=5)
|
||
if result.returncode == 0:
|
||
lines = result.stdout.strip().split('\n')
|
||
selected_monitors = [int(x.strip()) for x in monitor_list.split(',')]
|
||
|
||
min_x = float('inf')
|
||
max_x = 0
|
||
max_y = 0
|
||
|
||
for line in lines:
|
||
cleaned_line = line.strip().replace('*', '').strip()
|
||
if '[' in cleaned_line and ']' in cleaned_line:
|
||
parts = cleaned_line.split()
|
||
if len(parts) >= 3:
|
||
id_part = parts[0]
|
||
res_part = parts[1] # 1920x1080
|
||
pos_part = parts[2] # +3840+0
|
||
|
||
if '[' in id_part and ']' in id_part:
|
||
monitor_id = int(id_part.strip('[]'))
|
||
if monitor_id in selected_monitors:
|
||
# Parse resolution
|
||
width, height = map(int, res_part.split('x'))
|
||
# Parse position
|
||
pos_coords = pos_part.split('+')[1:] # ['3840', '0']
|
||
x_pos = int(pos_coords[0])
|
||
|
||
min_x = min(min_x, x_pos)
|
||
max_x = max(max_x, x_pos + width)
|
||
max_y = max(max_y, height)
|
||
|
||
if min_x != float('inf'):
|
||
total_width = max_x - min_x
|
||
total_height = max_y
|
||
return f"{total_width}x{total_height}"
|
||
except Exception as e:
|
||
self.logger.error(f"Error calculating combined resolution: {e}")
|
||
|
||
# Fallback to a reasonable default
|
||
return "3840x1080" # Assume dual 1920x1080 monitors
|
||
|
||
def _get_best_monitor_selection(self, count):
|
||
"""Get the best monitor selection based on layout"""
|
||
try:
|
||
result = subprocess.run(['xfreerdp', '/monitor-list'],
|
||
capture_output=True, text=True, timeout=5)
|
||
if result.returncode == 0:
|
||
lines = result.stdout.strip().split('\n')
|
||
monitors = []
|
||
primary_monitor = None
|
||
|
||
for line in lines:
|
||
# Clean up the line and split by whitespace/tabs
|
||
cleaned_line = line.strip()
|
||
is_primary = cleaned_line.startswith('*')
|
||
cleaned_line = cleaned_line.replace('*', '').strip()
|
||
|
||
if '[' in cleaned_line and ']' in cleaned_line and 'x' in cleaned_line and '+' in cleaned_line:
|
||
# Split by whitespace/tabs
|
||
parts = cleaned_line.split()
|
||
if len(parts) >= 3:
|
||
id_part = parts[0] # [0]
|
||
pos_part = parts[2] # +3840+0
|
||
if '[' in id_part and ']' in id_part:
|
||
monitor_id = int(id_part.strip('[]'))
|
||
x_pos = int(pos_part.split('+')[1])
|
||
monitor_info = (monitor_id, x_pos, is_primary)
|
||
monitors.append(monitor_info)
|
||
|
||
if is_primary:
|
||
primary_monitor = monitor_id
|
||
|
||
# Sort monitors by X position (left to right)
|
||
monitors.sort(key=lambda x: x[1])
|
||
|
||
# For debugging, log the monitor layout
|
||
self.logger.info(f"Monitor layout detected: {[(m[0], m[1], 'primary' if m[2] else 'secondary') for m in monitors]}")
|
||
|
||
# Return the leftmost monitors (excluding position and primary flag)
|
||
selected = [str(m[0]) for m in monitors[:count]]
|
||
selected_str = ','.join(selected)
|
||
|
||
self.logger.info(f"Selected {count} monitors: {selected_str}")
|
||
return selected_str
|
||
except Exception as e:
|
||
self.logger.error(f"Error detecting monitors: {e}")
|
||
|
||
# Fallback: simple sequential selection
|
||
fallback = ','.join([str(i) for i in range(count)])
|
||
self.logger.warning(f"Using fallback monitor selection: {fallback}")
|
||
return fallback
|
||
|
||
def _setup_gui(self):
    """Build every widget of the main window.

    Layout inside the padded main frame: a header (title and help
    button), a left card (connection list with Connect/Edit/Delete), a
    right card (quick actions and the read-only details grid) and a
    status bar with the Exit button. Sets ``self.connections_title``,
    ``self.connections_listbox``, ``self.details_labels`` and
    ``self.status_var``, and wires up selection, double-click and
    keyboard shortcuts.
    """
    fill_x = (tk.W, tk.E)
    fill_all = (tk.W, tk.E, tk.N, tk.S)
    palette = self.colors

    # --- Main container -------------------------------------------------
    main_frame = ttk.Frame(self.root, style='Modern.TFrame', padding="20")
    main_frame.grid(row=0, column=0, sticky=fill_all)

    self.root.columnconfigure(0, weight=1)
    self.root.rowconfigure(0, weight=1)
    main_frame.columnconfigure(1, weight=1)
    main_frame.rowconfigure(1, weight=1)

    # --- Header: title on the left, help button on the right -------------
    header_frame = ttk.Frame(main_frame, style='Modern.TFrame')
    header_frame.grid(row=0, column=0, columnspan=3, sticky=fill_x, pady=(0, 20))
    header_frame.columnconfigure(1, weight=1)

    title_label = ttk.Label(header_frame, text="🖥️ RDP Client Professional",
                            style='Title.TLabel', font=('Segoe UI', 18, 'bold'))
    title_label.grid(row=0, column=0, sticky=tk.W)

    help_btn = ttk.Button(header_frame, text="❓ Help (F1)",
                          command=self._show_help, style='Modern.TButton')
    help_btn.grid(row=0, column=2, sticky=tk.E)

    # --- Left card: saved connections -----------------------------------
    left_card = ttk.Frame(main_frame, style='Card.TFrame', padding="15")
    left_card.grid(row=1, column=0, sticky=fill_all, padx=(0, 15))
    left_card.rowconfigure(1, weight=1)
    left_card.columnconfigure(0, weight=1)

    connections_header = ttk.Frame(left_card, style='Modern.TFrame')
    connections_header.grid(row=0, column=0, sticky=fill_x, pady=(0, 15))
    connections_header.columnconfigure(0, weight=1)

    # Kept on self so the refresh can append the connection count.
    self.connections_title = ttk.Label(connections_header, text="📁 Saved Connections",
                                       style='Title.TLabel')
    self.connections_title.grid(row=0, column=0, sticky=tk.W)

    listbox_frame = ttk.Frame(left_card, style='Modern.TFrame')
    listbox_frame.grid(row=1, column=0, sticky=fill_all)
    listbox_frame.rowconfigure(0, weight=1)
    listbox_frame.columnconfigure(0, weight=1)

    self.connections_listbox = tk.Listbox(
        listbox_frame,
        font=('Segoe UI', 10),
        bg=palette['white'],
        fg=palette['dark'],
        selectbackground=palette['primary'],
        selectforeground=palette['white'],
        borderwidth=0,
        highlightthickness=1,
        highlightcolor=palette['primary'],
        relief='flat',
        activestyle='none'
    )

    scrollbar = ttk.Scrollbar(listbox_frame, orient="vertical",
                              command=self.connections_listbox.yview)
    self.connections_listbox.configure(yscrollcommand=scrollbar.set)

    self.connections_listbox.grid(row=0, column=0, sticky=fill_all, padx=(0, 2))
    scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))

    # Connect / Edit / Delete, sharing the width equally.
    buttons_frame = ttk.Frame(left_card, style='Modern.TFrame')
    buttons_frame.grid(row=2, column=0, pady=(15, 0), sticky=fill_x)
    for column in range(3):
        buttons_frame.columnconfigure(column, weight=1)

    list_buttons = (
        ("🔗 Connect", self._connect_selected, 'Success.TButton', (0, 5)),
        ("✏️ Edit", self._edit_selected, 'Modern.TButton', 2.5),
        ("🗑️ Delete", self._delete_selected, 'Danger.TButton', (5, 0)),
    )
    for column, (caption, handler, look, gap) in enumerate(list_buttons):
        ttk.Button(buttons_frame, text=caption, command=handler, style=look).grid(
            row=0, column=column, sticky=fill_x, padx=gap)

    # --- Right card: quick actions and details --------------------------
    right_card = ttk.Frame(main_frame, style='Card.TFrame', padding="15")
    right_card.grid(row=1, column=1, sticky=fill_all)
    right_card.rowconfigure(1, weight=1)
    right_card.columnconfigure(0, weight=1)

    actions_header = ttk.Label(right_card, text="⚡ Quick Actions", style='Title.TLabel')
    actions_header.grid(row=0, column=0, sticky=tk.W, pady=(0, 15))

    actions_frame = ttk.Frame(right_card, style='Modern.TFrame')
    actions_frame.grid(row=0, column=1, sticky=fill_x, pady=(0, 15))
    actions_frame.columnconfigure(0, weight=1)
    actions_frame.columnconfigure(1, weight=1)

    quick_actions = (
        ("➕ New Connection", self._new_connection, 'Modern.TButton',
         dict(row=0, column=0, padx=(0, 5))),
        ("🧪 Test Connection", self._test_selected_connection, 'Modern.TButton',
         dict(row=0, column=1, padx=(5, 0))),
        ("📥 Import", self._import_connections, 'Modern.TButton',
         dict(row=1, column=0, padx=(0, 5))),
        ("📤 Export", self._export_connections, 'Modern.TButton',
         dict(row=1, column=1, padx=(5, 0))),
        ("🔐 Clear Credentials", self._clear_credentials, 'Danger.TButton',
         dict(row=2, column=0, columnspan=2)),
    )
    for caption, handler, look, cell in quick_actions:
        ttk.Button(actions_frame, text=caption, command=handler, style=look).grid(
            sticky=fill_x, pady=3, **cell)

    details_label = ttk.Label(right_card, text="📋 Connection Details", style='Title.TLabel')
    details_label.grid(row=1, column=0, sticky=tk.W, pady=(20, 15))

    details_card = ttk.Frame(right_card, style='Card.TFrame', padding="10")
    details_card.grid(row=2, column=0, sticky=fill_all)
    details_card.columnconfigure(1, weight=1)

    # One caption/value row per field; value labels are kept by field key.
    self.details_labels = {}
    details_fields = [
        ("🖥️ Server:", "server"),
        ("👤 Username:", "username"),
        ("🏢 Domain:", "domain"),
        ("📺 Resolution:", "resolution"),
        ("🎨 Color Depth:", "color_depth"),
        ("🖼️ Multi-Monitor:", "multimon"),
        ("🔊 Sound:", "sound"),
        ("📋 Clipboard:", "clipboard"),
        ("💾 Drive Sharing:", "drives"),
        ("📅 Created:", "created")
    ]
    for row_index, (caption, field) in enumerate(details_fields):
        caption_widget = ttk.Label(details_card, text=caption, style='Modern.TLabel',
                                   font=('Segoe UI', 9, 'bold'))
        caption_widget.grid(row=row_index, column=0, sticky=tk.W, pady=4, padx=(0, 10))

        value_widget = ttk.Label(details_card, text="—", style='Modern.TLabel',
                                 font=('Segoe UI', 9))
        value_widget.grid(row=row_index, column=1, sticky=tk.W, pady=4)
        self.details_labels[field] = value_widget

    # --- Events -----------------------------------------------------------
    self.connections_listbox.bind('<<ListboxSelect>>', self._on_connection_select)
    self.connections_listbox.bind('<Double-Button-1>', self._connect_selected)
    self._setup_keyboard_shortcuts()

    # --- Status bar -------------------------------------------------------
    status_frame = ttk.Frame(main_frame, style='Modern.TFrame')
    status_frame.grid(row=2, column=0, columnspan=3, sticky=fill_x, pady=(20, 0))
    status_frame.columnconfigure(0, weight=1)

    self.status_var = tk.StringVar(value="✅ Ready")
    status_label = ttk.Label(status_frame, textvariable=self.status_var,
                             style='Modern.TLabel', font=('Segoe UI', 9))
    status_label.grid(row=0, column=0, sticky=tk.W)

    ttk.Button(status_frame, text="❌ Exit",
               command=self.root.quit, style='Danger.TButton').grid(
        row=0, column=1, sticky=tk.E)
|
||
|
||
def _setup_keyboard_shortcuts(self):
|
||
"""Setup keyboard shortcuts for the application"""
|
||
# Global shortcuts
|
||
self.root.bind('<Control-n>', lambda e: self._new_connection())
|
||
self.root.bind('<Control-o>', lambda e: self._import_connections())
|
||
self.root.bind('<Control-s>', lambda e: self._export_connections())
|
||
self.root.bind('<Control-t>', lambda e: self._test_selected_connection())
|
||
self.root.bind('<F5>', lambda e: self._refresh_connections_list())
|
||
self.root.bind('<Control-q>', lambda e: self.root.quit())
|
||
self.root.bind('<Escape>', lambda e: self.root.quit())
|
||
self.root.bind('<F1>', lambda e: self._show_help())
|
||
|
||
# Listbox specific shortcuts
|
||
self.connections_listbox.bind('<Return>', lambda e: self._connect_selected())
|
||
self.connections_listbox.bind('<Delete>', lambda e: self._delete_selected())
|
||
self.connections_listbox.bind('<F2>', lambda e: self._edit_selected())
|
||
self.connections_listbox.bind('<Control-t>', lambda e: self._test_selected_connection())
|
||
|
||
# Set initial focus
|
||
self.connections_listbox.focus_set()
|
||
if self.connections_listbox.size() > 0:
|
||
self.connections_listbox.selection_set(0)
|
||
|
||
# Add tooltip information for shortcuts
|
||
self._add_keyboard_shortcuts_info()
|
||
|
||
# Add tooltip information for shortcuts
|
||
self._add_keyboard_shortcuts_info()
|
||
|
||
def _add_keyboard_shortcuts_info(self):
|
||
"""Add keyboard shortcuts information to the status bar or help"""
|
||
def show_shortcuts_hint(event):
|
||
self.status_var.set("⌨️ Shortcuts: Ctrl+N=New, Enter=Connect, Del=Delete, F2=Edit, Ctrl+T=Test, F5=Refresh, Ctrl+Q=Quit")
|
||
|
||
def clear_shortcuts_hint(event):
|
||
self.status_var.set("✅ Ready")
|
||
|
||
# Show shortcuts hint when connections listbox gets focus
|
||
self.connections_listbox.bind('<FocusIn>', show_shortcuts_hint)
|
||
self.connections_listbox.bind('<FocusOut>', clear_shortcuts_hint)
|
||
|
||
def _show_help(self):
    """Show the keyboard shortcut and multi-monitor help in an info dialog."""
    help_lines = (
        "RDP Client - Keyboard Shortcuts",
        "",
        "Global Shortcuts:",
        "• Ctrl+N - Create new connection",
        "• Ctrl+O - Import connections from file",
        "• Ctrl+S - Export connections to file",
        "• Ctrl+T - Test selected connection",
        "• F5 - Refresh connections list",
        "• Ctrl+Q - Quit application",
        "• Escape - Quit application",
        "• F1 - Show this help",
        "",
        "Connection List Shortcuts:",
        "• Enter - Connect to selected connection",
        "• Double-click - Connect to selected connection",
        "• Delete - Delete selected connection",
        "• F2 - Edit selected connection",
        "",
        "Navigation:",
        "• Arrow keys - Navigate within connections list",
        "",
        "Tips:",
        "• Connections are sorted alphabetically by name",
        "• Double-click any connection to connect quickly",
        "• Use Test Connection to verify server availability",
        "• Import/Export to backup or share connection profiles",
        "",
        "Multi-Monitor Support:",
        '• "2 Monitors" - Use first 2 monitors (0,1)',
        '• "3 Monitors" - Use first 3 monitors (0,1,2)',
        '• "4 Monitors" - Use first 4 monitors (0,1,2,3)',
        '• "All Monitors" - Use all available monitors',
        '• "Span" - Span desktop across monitors',
        '• "No" - Single monitor only',
    )

    messagebox.showinfo("Keyboard Shortcuts", "\n".join(help_lines))
|
||
|
||
def _refresh_connections_list(self):
|
||
"""Refresh the connections listbox with sorted saved connections"""
|
||
self.connections_listbox.delete(0, tk.END)
|
||
# Sort connections by name (case-insensitive)
|
||
for name in sorted(self.connections.keys(), key=str.lower):
|
||
self.connections_listbox.insert(tk.END, name)
|
||
|
||
# Update connections count in title
|
||
count = len(self.connections)
|
||
self.connections_title.config(text=f"📁 Saved Connections ({count})")
|
||
|
||
# Clear details if no connections
|
||
if not self.connections:
|
||
for label in self.details_labels.values():
|
||
label.config(text="—")
|
||
|
||
def _update_details(self, conn):
|
||
"""Update the details panel with connection info"""
|
||
self.details_labels["server"].config(text=conn.get("server", ""))
|
||
self.details_labels["username"].config(text=conn.get("username", ""))
|
||
self.details_labels["domain"].config(text=conn.get("domain", "N/A"))
|
||
self.details_labels["resolution"].config(text=conn.get("resolution", "1920x1080"))
|
||
self.details_labels["color_depth"].config(text=f"{conn.get('color_depth', 32)}-bit")
|
||
self.details_labels["multimon"].config(text=conn.get("multimon", "No"))
|
||
self.details_labels["sound"].config(text=conn.get("sound", "Yes"))
|
||
self.details_labels["clipboard"].config(text=conn.get("clipboard", "Yes"))
|
||
self.details_labels["drives"].config(text=conn.get("drives", "No"))
|
||
self.details_labels["created"].config(text=conn.get("created", "Unknown"))
|
||
|
||
def _on_connection_select(self, event=None):
|
||
"""Handle connection selection"""
|
||
selection = self.connections_listbox.curselection()
|
||
if selection:
|
||
name = self.connections_listbox.get(selection[0])
|
||
conn = self.connections[name]
|
||
|
||
# Update details
|
||
self._update_details(conn)
|
||
|
||
def _new_connection(self):
    """Create a connection from the "New Connection" dialog.

    Returns without changes when the dialog yields no result, or when the
    chosen name already exists and the user declines to overwrite it.
    Otherwise the settings are stored with a ``created`` timestamp, a
    non-empty password is encrypted into the credentials store, the
    connection is added to the history, and the list and status bar are
    updated.
    """
    form = ConnectionDialog(self.root, "New Connection", rdp_client=self).result
    if not form:
        return

    name = form["name"]
    if name in self.connections and not messagebox.askyesno(
            "Overwrite", f"Connection '{name}' already exists. Overwrite?"):
        return

    # The name is the dictionary key and the password belongs in the
    # credentials store, so neither is kept in the connection settings.
    profile = form.copy()
    profile.pop("name")
    profile.pop("password")
    profile["created"] = datetime.now().strftime("%Y-%m-%d %H:%M")

    self.connections[name] = profile
    self._save_connections()

    # Save credentials if provided
    secret = form["password"]
    if secret:
        slot = self.credentials.setdefault(name, {})
        slot["password"] = self._encrypt_password(secret)
        self._save_credentials()

    self._add_to_history(name)

    self._refresh_connections_list()
    self.status_var.set(f"💾 Connection '{name}' saved successfully")
|
||
|
||
def _edit_selected(self):
    """Edit the connection selected in the listbox.

    Opens the connection dialog pre-filled with the saved settings and
    the decrypted password (if one is stored). On confirmation the
    connection is stored under its possibly changed name, keeping the
    original ``created`` timestamp and setting ``modified`` to now.
    Renaming removes the old entry and its stored credentials; a
    non-empty password from the dialog is stored for the new name.

    Shows a warning when nothing is selected and an error when the
    selected name is not a saved connection. Returns without changes when
    the dialog is cancelled or an overwrite is declined.
    """
    saved_selection = self.connections_listbox.curselection()
    name = self.connections_listbox.get(saved_selection[0]) if saved_selection else None

    if not name:
        messagebox.showwarning("No Selection", "Please select a connection to edit.")
        return

    if name not in self.connections:
        messagebox.showerror("Error", f"Connection '{name}' not found.")
        return

    original = self.connections[name]
    conn = original.copy()

    # Get saved password if exists
    password = ""
    if name in self.credentials and "password" in self.credentials[name]:
        password = self._decrypt_password(self.credentials[name]["password"])

    conn["name"] = name
    conn["password"] = password

    dialog = ConnectionDialog(self.root, f"Edit Connection: {name}", conn, rdp_client=self)
    if not dialog.result:
        return

    new_name = dialog.result["name"]
    now = datetime.now().strftime("%Y-%m-%d %H:%M")

    # Take the creation time from the original entry up front. Reading
    # it from self.connections after a rename has deleted the old key
    # found nothing and reset the timestamp to the current time.
    created = original.get("created", now)

    credentials_changed = False

    # Handle name change
    if new_name != name:
        if new_name in self.connections:
            if not messagebox.askyesno("Overwrite", f"Connection '{new_name}' already exists. Overwrite?"):
                return
        # Remove old connection
        del self.connections[name]
        if name in self.credentials:
            del self.credentials[name]
            credentials_changed = True

    # Save updated connection
    conn_data = dialog.result.copy()
    del conn_data["name"]
    del conn_data["password"]
    conn_data["created"] = created
    conn_data["modified"] = now

    self.connections[new_name] = conn_data
    self._save_connections()

    # Save credentials
    if dialog.result["password"]:
        slot = self.credentials.setdefault(new_name, {})
        slot["password"] = self._encrypt_password(dialog.result["password"])
        credentials_changed = True

    # Persist whenever the store changed, including a rename without a
    # password, which previously left the removed entry in the file.
    if credentials_changed:
        self._save_credentials()

    # Update history for the new/updated connection
    self._add_to_history(new_name)

    self._refresh_connections_list()
    self.status_var.set(f"✏️ Connection '{new_name}' updated successfully")
|
||
|
||
def _delete_selected(self):
    """Delete the connection selected in the listbox, after confirmation.

    Removes the connection and any stored credentials for it, saves both
    files, refreshes the list, resets the details panel to placeholders
    and reports the deletion in the status bar.

    Shows a warning when nothing is selected and an error when the
    selected name is not a saved connection.
    """
    saved_selection = self.connections_listbox.curselection()
    name = self.connections_listbox.get(saved_selection[0]) if saved_selection else None

    if not name:
        messagebox.showwarning("No Selection", "Please select a connection to delete.")
        return

    if name not in self.connections:
        messagebox.showerror("Error", f"Connection '{name}' not found.")
        return

    if not messagebox.askyesno("Confirm Delete", f"Delete connection '{name}'?"):
        return

    del self.connections[name]
    self.credentials.pop(name, None)

    self._save_connections()
    self._save_credentials()
    self._refresh_connections_list()

    # Clear details with the same placeholder the panel starts with and
    # the list refresh uses, rather than leaving the values blank.
    for label in self.details_labels.values():
        label.config(text="—")

    self.status_var.set(f"🗑️ Connection '{name}' deleted")
|
||
|
||
def _connect_selected(self, event=None):
|
||
"""Connect to selected connection"""
|
||
# Check saved connections selection
|
||
saved_selection = self.connections_listbox.curselection()
|
||
|
||
if saved_selection:
|
||
name = self.connections_listbox.get(saved_selection[0])
|
||
self._connect_to(name)
|
||
else:
|
||
messagebox.showwarning("No Selection", "Please select a connection to connect.")
|
||
|
||
def _connect_to(self, name):
|
||
"""Connect to a specific connection"""
|
||
if name not in self.connections:
|
||
messagebox.showerror("Error", f"Connection '{name}' not found.")
|
||
return
|
||
|
||
conn = self.connections[name]
|
||
|
||
# Get password
|
||
password = ""
|
||
if name in self.credentials and "password" in self.credentials[name]:
|
||
password = self._decrypt_password(self.credentials[name]["password"])
|
||
else:
|
||
password = simpledialog.askstring("Password", f"Enter password for {conn['username']}@{conn['server']}:",
|
||
show='*')
|
||
if not password:
|
||
return
|
||
|
||
# Build and execute RDP command
|
||
self._add_to_history(name)
|
||
self._execute_rdp_connection(conn, password)
|
||
self._refresh_connections_list() # Refresh connections list
|
||
|
||
def _execute_rdp_connection(self, conn, password):
    """Run xfreerdp for ``conn`` on a background daemon thread.

    The worker first probes the server with
    ``self._test_server_connectivity``, then builds the xfreerdp argument
    list from the connection options and blocks in ``subprocess.run`` until
    the client exits. Every status-bar update and dialog is scheduled with
    ``self.root.after`` instead of being issued from the worker thread.

    Args:
        conn: Connection settings dict. ``server`` and ``username`` are
            required; all other keys fall back to the defaults used below.
        password: Plain-text password, passed to xfreerdp as ``/p:``.
    """
    monitor_counts = {"2 Monitors": 2, "3 Monitors": 3, "4 Monitors": 4}

    def notify(status, dialog=None, title="", detail=""):
        """Schedule a status update (and optional dialog) on the Tk loop."""
        def apply():
            self.status_var.set(status)
            if dialog is not None:
                dialog(title, detail)
        self.root.after(0, apply)

    def connect():
        try:
            server = conn['server']
            username = conn['username']

            self.logger.info(f"Attempting RDP connection to {server} as {username}")
            # Previously set directly from this worker thread; route it
            # through the Tk loop like every other UI update here.
            notify(f"🔗 Connecting to {server}...")

            # Test connectivity first
            if not self._test_server_connectivity(server):
                error_msg = f"Cannot reach server {server}. Please check the server address and your network connection."
                self.logger.error(f"Connectivity test failed for {server}")
                notify("Connection failed - Server unreachable",
                       messagebox.showerror, "Connection Error", error_msg)
                return

            # Build xfreerdp command
            cmd = ["/usr/bin/xfreerdp"]

            # Basic options
            # NOTE(review): /cert-ignore disables server certificate checks.
            cmd.extend(["+window-drag", "+smart-sizing", "/cert-ignore"])

            # Server and authentication
            # NOTE(review): the password is passed on the command line and
            # may be visible to other local users in the process list.
            cmd.append(f"/v:{server}")
            cmd.append(f"/u:{username}")
            cmd.append(f"/p:{password}")

            if conn.get("domain"):
                cmd.append(f"/d:{conn['domain']}")

            # Resolve the monitor selection before the resolution options,
            # matching the original call order.
            multimon = conn.get("multimon", "No")
            use_specific_monitors = multimon in monitor_counts
            monitor_list = None
            if use_specific_monitors:
                monitor_list = self._get_best_monitor_selection(monitor_counts[multimon])

            # Resolution
            resolution = conn.get("resolution", "1920x1080")
            if resolution == "Full Screen":
                cmd.append("/f")
                self.logger.info("Using fullscreen mode")
            else:
                cmd.append(f"/size:{resolution}")

            # Color depth
            color_depth = conn.get("color_depth", 32)
            cmd.append(f"/bpp:{color_depth}")

            # Multiple monitors
            if use_specific_monitors:
                cmd.append("/multimon")
                if monitor_list:
                    cmd.append(f"/monitors:{monitor_list}")
                    self.logger.info(f"Using /multimon /monitors:{monitor_list} for {multimon} (like working bash script)")
                else:
                    # Avoid emitting a literal "/monitors:None".
                    self.logger.warning(f"No monitor selection available for {multimon}; using /multimon without /monitors")
            elif multimon == "All Monitors":
                cmd.append("/multimon")
                self.logger.info("Using all available monitors")
            elif multimon == "Span":
                cmd.append("/span")
                self.logger.info("Using span mode across monitors")

            # Sound
            sound = conn.get("sound", "Yes")
            if sound == "Yes":
                cmd.append("/sound:sys")
            elif sound == "Remote":
                cmd.append("/sound:local")

            # Microphone
            if conn.get("microphone", "No") == "Yes":
                cmd.append("/microphone")

            # Clipboard
            if conn.get("clipboard", "Yes") == "Yes":
                cmd.append("/clipboard")

            # Drive sharing
            if conn.get("drives", "No") == "Yes":
                cmd.append(f"/drive:home,{os.path.expanduser('~')}")

            # Performance options
            if conn.get("compression", "Yes") == "Yes":
                cmd.append("/compression")
            else:
                cmd.append("-compression")

            # Compression level: the stored value looks like "1 (Medium)";
            # the first digit found (checked in 0, 1, 2 order) wins.
            comp_level = conn.get("compression_level", "1 (Medium)")
            for level in ("0", "1", "2"):
                if level in comp_level:
                    cmd.append(f"/compression-level:{level}")
                    break

            # Bitmap cache
            if conn.get("bitmap_cache", "Yes") == "Yes":
                cmd.append("+bitmap-cache")
            else:
                cmd.append("-bitmap-cache")

            # Offscreen cache
            if conn.get("offscreen_cache", "Yes") == "Yes":
                cmd.append("+offscreen-cache")
            else:
                cmd.append("-offscreen-cache")

            # Font smoothing
            if conn.get("fonts", "Yes") == "No":
                cmd.append("-fonts")

            # Desktop composition (Aero)
            if conn.get("aero", "No") == "Yes":
                cmd.append("+aero")
            else:
                cmd.append("-aero")

            # Wallpaper
            if conn.get("wallpaper", "No") == "No":
                cmd.append("-wallpaper")

            # Themes
            if conn.get("themes", "Yes") == "No":
                cmd.append("-themes")

            # Menu animations
            if conn.get("menu_anims", "No") == "No":
                cmd.append("-menu-anims")

            # Advanced options
            if conn.get("printer", "No") == "Yes":
                cmd.append("/printer")

            if conn.get("com_ports", "No") == "Yes":
                cmd.append("/serial")

            if conn.get("usb", "No") == "Yes":
                cmd.append("/usb")

            # Network options
            if conn.get("network_auto_detect", "Yes") == "No":
                cmd.append("-network-auto-detect")

            # Execute. Redact the password argument itself rather than
            # string-replacing inside the joined command line.
            redacted = ["/p:***" if arg.startswith("/p:") else arg for arg in cmd]
            cmd_str = ' '.join(redacted)
            self.logger.info(f"Executing RDP command: {cmd_str}")

            # Output is deliberately not captured so xfreerdp can interact
            # with the terminal; only the exit code is available afterwards.
            result = subprocess.run(cmd)

            if result.returncode == 0:
                self.logger.info(f"RDP connection to {server} completed successfully")
                notify("Connection completed successfully")
            else:
                error_msg = f"RDP connection failed with exit code {result.returncode}. Check credentials and server settings."
                self.logger.error(f"RDP connection failed with exit code: {result.returncode}")
                notify("Connection failed", messagebox.showerror, "Connection Error", error_msg)
        except FileNotFoundError:
            error_msg = "xfreerdp is not installed or not found in PATH. Please install the freerdp package."
            self.logger.error("xfreerdp not found")
            notify("xfreerdp not found", messagebox.showerror, "Missing Dependency", error_msg)
        except Exception as e:
            # Capture the text now: the name bound by "except ... as" is
            # unbound when this handler exits, so a deferred callback must
            # not reference it.
            detail = str(e)
            error_msg = f"Unexpected error during connection: {detail}"
            self.logger.error(f"Unexpected RDP connection error: {detail}", exc_info=True)
            notify(f"Error: {detail}", messagebox.showerror, "Connection Error", error_msg)

    # Start connection in separate thread
    thread = threading.Thread(target=connect, daemon=True)
    thread.start()
|
||
|
||
def _test_server_connectivity(self, server):
|
||
"""Test if server is reachable"""
|
||
try:
|
||
# Extract hostname/IP from server (remove port if present)
|
||
host = server.split(':')[0]
|
||
|
||
# Try to establish a TCP connection to port 3389 (RDP port)
|
||
port = 3389
|
||
if ':' in server:
|
||
try:
|
||
port = int(server.split(':')[1])
|
||
except ValueError:
|
||
port = 3389
|
||
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.settimeout(5) # 5 second timeout
|
||
result = sock.connect_ex((host, port))
|
||
sock.close()
|
||
|
||
return result == 0
|
||
except Exception as e:
|
||
self.logger.warning(f"Connectivity test failed for {server}: {str(e)}")
|
||
return False
|
||
|
||
def _parse_rdp_error(self, stderr, returncode):
|
||
"""Parse RDP error messages and provide user-friendly descriptions"""
|
||
error_messages = {
|
||
1: "General connection error. Please check your credentials and server address.",
|
||
2: "Invalid command line arguments or configuration.",
|
||
3: "Authentication failed. Please check your username and password.",
|
||
4: "Connection refused. The server may not allow RDP connections.",
|
||
5: "Connection timeout. The server may be offline or unreachable.",
|
||
131: "Authentication failed. Please verify your credentials.",
|
||
132: "Connection was refused by the server.",
|
||
133: "Logon failure. Please check your username, password, and domain."
|
||
}
|
||
|
||
# Check for specific error patterns in stderr
|
||
stderr_lower = stderr.lower()
|
||
if "authentication" in stderr_lower or "logon" in stderr_lower:
|
||
return "Authentication failed. Please verify your username, password, and domain settings."
|
||
elif "connection" in stderr_lower and "refused" in stderr_lower:
|
||
return "Connection was refused by the server. The server may not allow RDP connections or may be configured to use different settings."
|
||
elif "certificate" in stderr_lower:
|
||
return "Certificate verification failed. The connection is still secure, but the server's certificate couldn't be verified."
|
||
elif "timeout" in stderr_lower:
|
||
return "Connection timed out. The server may be offline or unreachable."
|
||
elif "resolution" in stderr_lower:
|
||
return "Display resolution error. Try using a different resolution setting."
|
||
elif returncode in error_messages:
|
||
return error_messages[returncode]
|
||
else:
|
||
return f"Connection failed (Error {returncode}). Please check your connection settings and try again.\n\nTechnical details: {stderr}"
|
||
|
||
def _import_connections(self):
    """Import connections (and optionally credentials) from a JSON file.

    The file must hold a JSON object with a ``connections`` mapping and an
    optional ``credentials`` mapping. The whole structure is validated
    before any in-memory state is changed, so a malformed file can no
    longer leave the connection list partially replaced. When connections
    already exist the user chooses between merging and replacing, and is
    asked before each individual overwrite.
    """
    file_path = filedialog.askopenfilename(
        title="Import Connections",
        filetypes=[("JSON files", "*.json"), ("All files", "*.*")],
        defaultextension=".json"
    )

    if not file_path:
        return

    try:
        # JSON interchange files are UTF-8; do not depend on the locale.
        with open(file_path, 'r', encoding='utf-8') as f:
            import_data = json.load(f)

        # Validate import data structure
        if not isinstance(import_data, dict):
            messagebox.showerror("Import Error", "Invalid file format. Expected JSON object.")
            return

        connections_data = import_data.get("connections", {})
        credentials_data = import_data.get("credentials", {})

        if not isinstance(connections_data, dict):
            messagebox.showerror("Import Error", "Invalid connections format in file.")
            return

        # Reject malformed credentials up front, before anything is cleared
        # or overwritten in memory.
        if not isinstance(credentials_data, dict):
            messagebox.showerror("Import Error", "Invalid credentials format in file.")
            return

        # Only dict entries can be used as connections; skip anything else.
        usable = {
            name: conn_data
            for name, conn_data in connections_data.items()
            if isinstance(conn_data, dict)
        }
        skipped = len(connections_data) - len(usable)
        if skipped:
            self.logger.warning(f"Skipped {skipped} malformed connection entries during import")

        # Ask user about merge strategy
        if self.connections:
            choice = messagebox.askyesnocancel(
                "Import Strategy",
                "You have existing connections. Choose import strategy:\n\n"
                "Yes - Merge (keep existing, add new)\n"
                "No - Replace (delete existing, import new)\n"
                "Cancel - Cancel import"
            )

            if choice is None:  # Cancel
                return
            elif choice is False:  # Replace
                self.connections.clear()
                self.credentials.clear()

        # Import connections
        imported_count = 0
        for name, conn_data in usable.items():
            if name in self.connections:
                if not messagebox.askyesno("Overwrite", f"Connection '{name}' exists. Overwrite?"):
                    continue

            self.connections[name] = conn_data
            imported_count += 1

        # Import credentials if available and user agrees
        # NOTE(review): the prompt says passwords are re-encrypted, but the
        # stored values are copied as-is below; confirm the intended flow.
        if credentials_data and messagebox.askyesno(
            "Import Credentials",
            "Import saved credentials as well? (Passwords will be re-encrypted with your key)"
        ):
            for name, cred_data in credentials_data.items():
                if name in self.connections:  # Only for known connections
                    self.credentials[name] = cred_data

        # Save the changes
        self._save_connections()
        self._save_credentials()
        self._refresh_connections_list()

        self.status_var.set(f"📥 Successfully imported {imported_count} connections")
        messagebox.showinfo("Import Complete", f"Successfully imported {imported_count} connections.")

    except FileNotFoundError:
        messagebox.showerror("Import Error", "File not found.")
    except json.JSONDecodeError:
        messagebox.showerror("Import Error", "Invalid JSON file format.")
    except Exception as e:
        messagebox.showerror("Import Error", f"Error importing connections: {str(e)}")
|
||
|
||
def _export_connections(self):
    """Export all saved connections to a JSON file chosen by the user.

    Warns and returns when there is nothing to export. The user is asked
    whether the stored (encrypted) credentials should be included. The
    output also records the export date, the exporting user and host, and
    a format version.
    """
    if not self.connections:
        messagebox.showwarning("Export Warning", "No connections to export.")
        return

    # The Tk save dialog takes "initialfile" for the suggested file name;
    # "initialname" is not a recognised option and made the dialog raise.
    file_path = filedialog.asksaveasfilename(
        title="Export Connections",
        filetypes=[("JSON files", "*.json"), ("All files", "*.*")],
        defaultextension=".json",
        initialfile=f"rdp_connections_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    )

    if not file_path:
        return

    try:
        # Ask if user wants to include credentials
        include_credentials = messagebox.askyesno(
            "Export Credentials",
            "Include saved credentials in export?\n\n"
            "Warning: Credentials will be encrypted but should be treated as sensitive data."
        )

        export_data = {
            "connections": self.connections,
            "export_date": datetime.now().isoformat(),
            "exported_by": f"{os.getenv('USER')}@{os.uname().nodename}",
            "version": "1.0"
        }

        if include_credentials:
            export_data["credentials"] = self.credentials

        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2)

        self.status_var.set(f"📤 Connections exported to {os.path.basename(file_path)}")
        messagebox.showinfo("Export Complete", f"Successfully exported {len(self.connections)} connections to:\n{file_path}")

    except Exception as e:
        messagebox.showerror("Export Error", f"Error exporting connections: {str(e)}")
|
||
|
||
def _clear_credentials(self):
    """Wipe every saved password after the user confirms.

    Does nothing when the confirmation is declined. Otherwise the
    credential store is emptied and saved, the status bar is updated and
    the action is logged.
    """
    confirmed = messagebox.askyesno("Confirm", "Clear all saved passwords?")
    if not confirmed:
        return

    self.credentials.clear()
    self._save_credentials()
    self.status_var.set("🔐 All credentials cleared")
    self.logger.info("All credentials cleared by user")
|
||
|
||
def _test_selected_connection(self):
|
||
"""Test the selected connection for reachability"""
|
||
# Check saved connections selection
|
||
saved_selection = self.connections_listbox.curselection()
|
||
|
||
name = None
|
||
if saved_selection:
|
||
name = self.connections_listbox.get(saved_selection[0])
|
||
|
||
if not name:
|
||
messagebox.showwarning("No Selection", "Please select a connection to test.")
|
||
return
|
||
|
||
if name not in self.connections:
|
||
messagebox.showerror("Error", f"Connection '{name}' not found.")
|
||
return
|
||
|
||
conn = self.connections[name]
|
||
self._test_connection_async(name, conn)
|
||
|
||
def _test_connection_async(self, name, conn):
    """Probe ``conn`` on a daemon thread and report the outcome in the UI.

    The worker calls ``self._test_server_connectivity`` and, if that
    succeeds, ``self._test_rdp_service``. The result is logged, and the
    status-bar text and a message box are scheduled through
    ``self.root.after``.

    Args:
        name: Connection name, used only in the error log entry.
        conn: Connection settings dict; only ``server`` is read.
    """
    def publish(status_text, dialog, title, body):
        # Two separate callbacks, status first and dialog second.
        self.root.after(0, lambda: self.status_var.set(status_text))
        self.root.after(0, lambda: dialog(title, body))

    def worker():
        try:
            server = conn['server']
            self.logger.info(f"Testing connection to {server}")

            # Show progress in the status bar.
            self.root.after(0, lambda: self.status_var.set(f"Testing connection to {server}..."))

            if not self._test_server_connectivity(server):
                body = f"✗ Connection test failed!\n\nServer: {server}\nStatus: Not reachable\n\nPossible causes:\n• Server is offline\n• Incorrect server address\n• Network connectivity issues\n• Firewall blocking access"
                self.logger.error(f"Connection test failed for {server} - server not reachable")
                publish("Connection test failed", messagebox.showerror, "Connection Test", body)
            elif self._test_rdp_service(server):
                body = f"✓ Connection test successful!\n\nServer: {server}\nRDP Port: Accessible\nStatus: Ready for connection"
                self.logger.info(f"Connection test successful for {server}")
                publish("Connection test passed", messagebox.showinfo, "Connection Test", body)
            else:
                body = f"⚠ Server is reachable but RDP service may not be available.\n\nServer: {server}\nStatus: Network accessible but RDP port (3389) not responding\n\nThis could mean:\n• RDP is disabled on the server\n• A firewall is blocking RDP\n• RDP is running on a different port"
                self.logger.warning(f"Server {server} reachable but RDP service not available")
                publish("RDP service not available", messagebox.showwarning, "Connection Test", body)

        except Exception as exc:
            failure = f"Error during connection test: {str(exc)}"
            self.logger.error(f"Connection test error for {name}: {str(exc)}", exc_info=True)
            publish("Connection test error", messagebox.showerror, "Test Error", failure)

    # Run the probe off the UI thread.
    threading.Thread(target=worker, daemon=True).start()
|
||
|
||
def _test_rdp_service(self, server):
|
||
"""Test if RDP service is specifically available"""
|
||
try:
|
||
# Extract hostname/IP and port
|
||
host = server.split(':')[0]
|
||
port = 3389
|
||
if ':' in server:
|
||
try:
|
||
port = int(server.split(':')[1])
|
||
except ValueError:
|
||
port = 3389
|
||
|
||
# Try to connect to RDP port
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.settimeout(10) # Longer timeout for RDP test
|
||
result = sock.connect_ex((host, port))
|
||
sock.close()
|
||
|
||
return result == 0
|
||
|
||
except Exception as e:
|
||
self.logger.warning(f"RDP service test failed for {server}: {str(e)}")
|
||
return False
|
||
|
||
def run(self):
    """Enter the Tk main loop of the root window."""
    window = self.root
    window.mainloop()
|
||
|
||
|
||
class ConnectionDialog:
    """Modal, tabbed dialog for creating or editing one RDP connection.

    The constructor builds the window and blocks in ``wait_window`` until
    it is closed. Afterwards ``result`` is a dict mapping field names to
    the entered values when the user pressed Save, or None otherwise.
    """

    def __init__(self, parent, title, initial_data=None, rdp_client=None):
        """Create the dialog, show it and wait for it to close.

        Args:
            parent: Parent Tk window; the dialog is transient to it.
            title: Window title.
            initial_data: Optional dict used to pre-fill the fields.
            rdp_client: Optional client object; when given, its
                ``_get_multimon_options()`` supplies the monitor choices.
        """
        self.result = None
        self.rdp_client = rdp_client

        # Create dialog
        self.dialog = tk.Toplevel(parent)
        self.dialog.title(title)
        self.dialog.geometry("600x700")
        self.dialog.resizable(False, False)
        self.dialog.transient(parent)
        self.dialog.grab_set()

        # Center dialog on the screen
        self.dialog.update_idletasks()
        x = (self.dialog.winfo_screenwidth() // 2) - (600 // 2)
        y = (self.dialog.winfo_screenheight() // 2) - (700 // 2)
        self.dialog.geometry(f"600x700+{x}+{y}")

        self.initial_data = initial_data or {}
        self._setup_dialog()

        # Wait for dialog to close
        self.dialog.wait_window()

    def _add_combo_rows(self, frame, specs):
        """Add one labelled read-only combobox per spec to ``frame``.

        Rows start at 1 because row 0 holds the tab description. Each
        variable is registered in ``self.fields`` under its field name and
        defaults to the first listed value when no initial data exists.
        """
        for row, (label, field, _widget_type, values) in enumerate(specs, start=1):
            ttk.Label(frame, text=label).grid(row=row, column=0, sticky=tk.W, pady=5, padx=(10, 5))

            var = tk.StringVar(value=self.initial_data.get(field, values[0]))
            widget = ttk.Combobox(frame, textvariable=var, values=values, width=37, state="readonly")
            widget.grid(row=row, column=1, sticky=tk.W, pady=5, padx=(5, 10))
            self.fields[field] = var

    def _setup_dialog(self):
        """Build the Basic, Advanced and Performance tabs and the buttons."""
        main_frame = ttk.Frame(self.dialog, padding="20")
        main_frame.pack(fill=tk.BOTH, expand=True)

        # Create notebook for tabs
        notebook = ttk.Notebook(main_frame)
        notebook.pack(fill=tk.BOTH, expand=True, pady=(0, 20))

        # Basic tab
        basic_frame = ttk.Frame(notebook)
        notebook.add(basic_frame, text="Basic")

        # Advanced tab
        advanced_frame = ttk.Frame(notebook)
        notebook.add(advanced_frame, text="Advanced")

        # Advanced tab description
        adv_desc = ttk.Label(advanced_frame, text="Advanced connection and device sharing options",
                             font=("TkDefaultFont", 9, "italic"))
        adv_desc.grid(row=0, column=0, columnspan=2, pady=(10, 15), padx=10, sticky=tk.W)

        # Performance tab
        performance_frame = ttk.Frame(notebook)
        notebook.add(performance_frame, text="Performance")

        # Performance tab description
        perf_desc = ttk.Label(performance_frame, text="Performance optimization and visual quality settings",
                              font=("TkDefaultFont", 9, "italic"))
        perf_desc.grid(row=0, column=0, columnspan=2, pady=(10, 15), padx=10, sticky=tk.W)

        # Field name -> Tk variable, in creation order
        self.fields = {}

        # Basic fields
        basic_fields = [
            ("Connection Name:", "name", "entry"),
            ("Server/IP:", "server", "entry"),
            ("Username:", "username", "entry"),
            ("Password:", "password", "password"),
            ("Domain:", "domain", "entry"),
        ]

        for i, (label, field, widget_type) in enumerate(basic_fields):
            ttk.Label(basic_frame, text=label).grid(row=i, column=0, sticky=tk.W, pady=5, padx=(10, 5))

            var = tk.StringVar(value=self.initial_data.get(field, ""))
            if widget_type == "password":
                # Mask the password as it is typed.
                widget = ttk.Entry(basic_frame, textvariable=var, show="*", width=40)
            else:
                widget = ttk.Entry(basic_frame, textvariable=var, width=40)

            widget.grid(row=i, column=1, sticky=tk.W, pady=5, padx=(5, 10))
            self.fields[field] = var

        # Advanced fields
        multimon_options = self.rdp_client._get_multimon_options() if self.rdp_client else ["No", "2 Monitors", "3 Monitors", "All Monitors", "Span"]
        advanced_fields = [
            ("Resolution:", "resolution", "combo", ["1920x1080", "2560x1440", "1366x768", "1280x1024", "1024x768", "Full Screen"]),
            ("Color Depth:", "color_depth", "combo", ["32", "24", "16", "15"]),
            ("Multiple Monitors:", "multimon", "combo", multimon_options),
            ("Sound:", "sound", "combo", ["Yes", "No", "Remote"]),
            ("Microphone:", "microphone", "combo", ["No", "Yes"]),
            ("Clipboard:", "clipboard", "combo", ["Yes", "No"]),
            ("Share Home Drive:", "drives", "combo", ["No", "Yes"]),
            ("Printer Sharing:", "printer", "combo", ["No", "Yes"]),
            ("COM Ports:", "com_ports", "combo", ["No", "Yes"]),
            ("USB Devices:", "usb", "combo", ["No", "Yes"]),
            ("Gateway Mode:", "gateway", "combo", ["Auto", "RPC", "HTTP"]),
            ("Network Detection:", "network_auto_detect", "combo", ["Yes", "No"]),
        ]
        self._add_combo_rows(advanced_frame, advanced_fields)

        # Performance fields
        performance_fields = [
            ("Compression:", "compression", "combo", ["Yes", "No"]),
            ("Compression Level:", "compression_level", "combo", ["0 (None)", "1 (Medium)", "2 (High)"]),
            ("Bitmap Cache:", "bitmap_cache", "combo", ["Yes", "No"]),
            ("Offscreen Cache:", "offscreen_cache", "combo", ["Yes", "No"]),
            ("Font Smoothing:", "fonts", "combo", ["Yes", "No"]),
            ("Desktop Composition:", "aero", "combo", ["No", "Yes"]),
            ("Wallpaper:", "wallpaper", "combo", ["No", "Yes"]),
            ("Themes:", "themes", "combo", ["Yes", "No"]),
            ("Menu Animations:", "menu_anims", "combo", ["No", "Yes"]),
        ]
        self._add_combo_rows(performance_frame, performance_fields)

        # Buttons
        button_frame = ttk.Frame(main_frame)
        button_frame.pack(fill=tk.X)

        ttk.Button(button_frame, text="Cancel", command=self._cancel).pack(side=tk.RIGHT, padx=(10, 0))
        ttk.Button(button_frame, text="Save", command=self._save).pack(side=tk.RIGHT)

    def _save(self):
        """Validate the required fields, store the result and close.

        Name, server and username must be non-blank. All values are
        whitespace-stripped except the password, which is kept exactly as
        typed so that leading or trailing spaces in a secret are not lost.
        """
        required = (
            ("name", "Connection name is required."),
            ("server", "Server/IP is required."),
            ("username", "Username is required."),
        )
        for field, message in required:
            if not self.fields[field].get().strip():
                messagebox.showerror("Error", message)
                return

        # Collect data
        self.result = {}
        for field, var in self.fields.items():
            value = var.get()
            self.result[field] = value if field == "password" else value.strip()

        self.dialog.destroy()

    def _cancel(self):
        """Close the dialog, leaving ``result`` as None."""
        self.dialog.destroy()
|
||
|
||
|
||
def main():
    """Check for xfreerdp, then run the GUI.

    Returns:
        0 after a normal run or a keyboard interrupt; 1 when a required
        executable is missing or the application raised an exception.
    """
    # Check dependencies
    required = ("/usr/bin/xfreerdp",)
    absent = [path for path in required if not os.path.exists(path)]

    if absent:
        report = ["Missing dependencies:"]
        report.extend(f" - {path}" for path in absent)
        report.append("\nPlease install freerdp package:")
        report.append(" sudo apt install freerdp2-x11 # Ubuntu/Debian")
        report.append(" sudo dnf install freerdp # Fedora")
        for line in report:
            print(line)
        return 1

    try:
        RDPClient().run()
    except KeyboardInterrupt:
        print("\nExiting...")
        return 0
    except Exception as exc:
        print(f"Error: {exc}")
        return 1
    return 0
|
||
|
||
|
||
if __name__ == "__main__":
    # Use sys.exit rather than the bare exit() helper: exit() is injected by
    # the site module for interactive use and is not guaranteed to exist.
    sys.exit(main())