diff --git a/app/api/cluster/control/route.ts b/app/api/cluster/control/route.ts index b9b23aa..6991edf 100644 --- a/app/api/cluster/control/route.ts +++ b/app/api/cluster/control/route.ts @@ -2,6 +2,8 @@ import { NextRequest, NextResponse } from 'next/server' import { exec } from 'child_process' import { promisify } from 'util' import path from 'path' +import sqlite3 from 'sqlite3' +import { open } from 'sqlite' const execAsync = promisify(exec) @@ -33,9 +35,19 @@ export async function POST(request: NextRequest) { // Reset any stale "running" chunks to "pending" (orphaned from crashed coordinator) console.log('π§ Checking for stale database chunks...') - const resetCmd = `sqlite3 ${dbPath} "UPDATE chunks SET status='pending', assigned_worker=NULL, started_at=NULL WHERE status='running';"` - await execAsync(resetCmd) - console.log('β Database cleanup complete') + try { + const db = await open({ + filename: dbPath, + driver: sqlite3.Database + }) + // Take the reset count from the UPDATE's own result: RunResult.changes is only meaningful for INSERT/UPDATE/DELETE, not for a SELECT passed to db.run() + const { changes } = await db.run(`UPDATE chunks SET status='pending', assigned_worker=NULL, started_at=NULL WHERE status='running'`) + await db.close() + console.log(`β Database cleanup complete - ${changes || 0} chunks reset`) + } catch (dbErr) { + console.error('β οΈ Database cleanup failed:', dbErr) + // Continue anyway - don't block start if database issue + } // Start the coordinator const startCmd = 'cd /home/icke/traderv4/cluster && nohup python3 distributed_coordinator.py > coordinator.log 2>&1 &' @@ -69,38 +81,56 @@ export async function POST(request: NextRequest) { isRunning: true }) } else if (action === 'stop') { - // ENHANCED (Dec 1, 2025): Reset database state when stopping cluster - // Prevents stale "running" chunks after stop + // CRITICAL FIX (Dec 1, 2025): ALWAYS reset database state when stopping + // Issue: Coordinator may have already exited but left chunks in "running" state + // Solution: Reset database FIRST, then attempt to kill any remaining 
processes console.log('π Stopping cluster...') - // Stop coordinator and workers + // CRITICAL: Reset database state FIRST (even if coordinator already gone) + const dbPath = path.join(process.cwd(), 'cluster', 'exploration.db') + console.log('π§ Resetting database chunks to pending...') + try { + const db = await open({ + filename: dbPath, + driver: sqlite3.Database + }) + const result = await db.run(`UPDATE chunks SET status='pending', assigned_worker=NULL, started_at=NULL WHERE status='running'`) + const pendingCount = await db.get(`SELECT COUNT(*) as count FROM chunks WHERE status='pending'`) + await db.close() + console.log(`β Database cleanup complete - ${result.changes || 0} chunks reset to pending (total pending: ${pendingCount?.count || 0})`) + } catch (dbErr) { + console.error('β Database reset failed:', dbErr) + return NextResponse.json({ + success: false, + error: 'Failed to reset database state', + details: dbErr instanceof Error ? dbErr.message : 'Unknown error' + }, { status: 500 }) + } + + // THEN try to stop any running processes (may already be stopped) const stopCmd = 'pkill -9 -f distributed_coordinator; pkill -9 -f distributed_worker' try { await execAsync(stopCmd) + console.log('β Killed coordinator and worker processes') } catch (err) { // pkill returns error code if no processes found - this is OK console.log('π No processes to kill (already stopped)') } - // Wait a moment + // Wait a moment for cleanup await new Promise(resolve => setTimeout(resolve, 1000)) - // Reset any running chunks to pending (cleanup orphaned state) - const dbPath = path.join(process.cwd(), 'cluster', 'exploration.db') - const resetCmd = `sqlite3 ${dbPath} "UPDATE chunks SET status='pending', assigned_worker=NULL, started_at=NULL WHERE status='running';"` - await execAsync(resetCmd) - console.log('β Database cleanup complete') - - // Verify it's stopped + // Verify everything is stopped const checkCmd = 'ps aux | grep -E "(distributed_coordinator|distributed_worker)" 
| grep -v grep | wc -l' const { stdout } = await execAsync(checkCmd) const processCount = parseInt(stdout.trim()) return NextResponse.json({ success: true, - message: processCount === 0 ? 'Cluster stopped and database cleaned' : 'Stop signal sent', - isRunning: processCount > 0 + message: 'Cluster stopped and database reset to pending', + isRunning: processCount > 0, + note: processCount === 0 ? 'All processes stopped, chunks reset' : 'Some processes may still be cleaning up' }) } else if (action === 'status') { // Check if coordinator is running diff --git a/app/cluster/page.tsx b/app/cluster/page.tsx index 40743dc..ef9cb4e 100644 --- a/app/cluster/page.tsx +++ b/app/cluster/page.tsx @@ -254,6 +254,9 @@ export default function ClusterPage() { {status.exploration.chunks.running > 0 && ( ({status.exploration.chunks.running} running) )} + {status.exploration.chunks.pending > 0 && status.exploration.chunks.running === 0 && ( + ({status.exploration.chunks.pending} pending) + )}