#!/usr/bin/env node
/**
 * RSpade Realtime WebSocket Server
 *
 * Dumb relay: validates HMAC tokens, routes messages by site_id + topic + filter.
 * Zero business logic, zero database access. PHP is the authority.
 *
 * Usage: node system/bin/realtime-server.js
 *
 * Requires .env: APP_KEY, REDIS_HOST, REDIS_PORT, REDIS_PASSWORD,
 *                REALTIME_WS_PORT (default 6200)
 */

const { WebSocketServer } = require('ws');
const { createClient } = require('redis');
const crypto = require('crypto');
const fs = require('fs');
const path = require('path');

// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

/**
 * Load KEY=VALUE pairs from a .env file into process.env.
 *
 * Blank lines, lines starting with '#', and lines without '=' are skipped.
 * Whitespace around the key and the value is trimmed, and one pair of matching
 * surrounding quotes is stripped from the value. Variables that already have a
 * non-empty value in process.env are left untouched.
 *
 * @param {string} file_path - Path of the .env file; a missing file is ignored.
 */
function load_env_file(file_path) {
    if (!fs.existsSync(file_path)) return;

    const env_content = fs.readFileSync(file_path, 'utf8');
    for (const line of env_content.split('\n')) {
        const trimmed = line.trim();
        if (!trimmed || trimmed.startsWith('#')) continue;

        const eq = trimmed.indexOf('=');
        if (eq === -1) continue;

        // Trim both sides so "KEY = value" does not define a variable named "KEY "
        const key = trimmed.substring(0, eq).trim();
        if (!key) continue;
        let value = trimmed.substring(eq + 1).trim();

        // Strip surrounding quotes (needs at least two characters to be a pair)
        const is_quoted = value.length >= 2 && (
            (value.startsWith('"') && value.endsWith('"')) ||
            (value.startsWith("'") && value.endsWith("'"))
        );
        if (is_quoted) {
            value = value.slice(1, -1);
        }

        if (!process.env[key]) {
            process.env[key] = value;
        }
    }
}

// Load .env from project root
load_env_file(path.resolve(__dirname, '../../.env'));

const WS_PORT = parseInt(process.env.REALTIME_WS_PORT || '6200', 10);
const APP_KEY = process.env.APP_KEY || '';
const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1';
const REDIS_PORT = parseInt(process.env.REDIS_PORT || '6379', 10);
// The literal string 'null' in .env means "no password"
const REDIS_PASSWORD = process.env.REDIS_PASSWORD === 'null' ? undefined : process.env.REDIS_PASSWORD;
const REDIS_PREFIX = 'rsx_rt';

const HEARTBEAT_INTERVAL = 30000; // 30 seconds
// NOTE: not referenced below; unresponsive sockets are detected on the next heartbeat tick
const PONG_TIMEOUT = 10000; // 10 seconds to respond
const AUTH_TIMEOUT = 5000; // 5 seconds to authenticate after connect
const STATS_INTERVAL = 60000; // 60 seconds between stats log lines
const LAST_MESSAGES_MAX = 10000; // Upper bound on cached replay messages

if (!APP_KEY) {
    console.error('[realtime] APP_KEY not set in .env');
    process.exit(1);
}

// ---------------------------------------------------------------------------
// State
// ---------------------------------------------------------------------------

// ws → { user_id, site_id, session_id, authenticated, subscriptions: Map, alive }
const connections = new Map();

// "site_id:topic:filter_hash" → { topic, data, ts }
const last_messages = new Map();

/**
 * Store a message in the replay cache, evicting the oldest entry once the
 * cache exceeds LAST_MESSAGES_MAX so per-id keys cannot grow without bound.
 *
 * @param {string} key - Cache key of the form "site_id:topic:filter_hash".
 * @param {{topic: *, data: *, ts: *}} entry - Message to remember for replay.
 */
function remember_last_message(key, entry) {
    // Delete first so the re-inserted key becomes the newest in Map order
    last_messages.delete(key);
    last_messages.set(key, entry);

    if (last_messages.size > LAST_MESSAGES_MAX) {
        const oldest_key = last_messages.keys().next().value;
        last_messages.delete(oldest_key);
    }
}

// ---------------------------------------------------------------------------
// Token validation
// ---------------------------------------------------------------------------

/**
 * Validate a "<base64 payload>.<hex HMAC-SHA256>" token signed with APP_KEY.
 *
 * The HMAC is computed over the base64-decoded payload text. Tokens whose
 * payload carries a truthy `exp` (unix seconds) in the past are rejected.
 *
 * @param {*} token_string - Token as received from the client.
 * @returns {object|null} The decoded payload object, or null when the token is
 *     malformed, has a bad signature, is not a JSON object, or has expired.
 */
function validate_token(token_string) {
    // Client input is untrusted: a non-string token must not reach indexOf()
    if (typeof token_string !== 'string') return null;

    const dot = token_string.indexOf('.');
    if (dot === -1) return null;

    const payload_b64 = token_string.substring(0, dot);
    const signature = token_string.substring(dot + 1);

    const json = Buffer.from(payload_b64, 'base64').toString('utf8');

    // Verify HMAC. timingSafeEqual throws when the lengths differ, so a short
    // or non-hex signature has to be rejected before the comparison.
    const expected = crypto.createHmac('sha256', APP_KEY).update(json).digest();
    const provided = Buffer.from(signature, 'hex');
    if (provided.length !== expected.length) return null;
    if (!crypto.timingSafeEqual(provided, expected)) return null;

    let payload;
    try {
        payload = JSON.parse(json);
    } catch {
        return null;
    }

    // JSON such as `null` or `5` parses fine but is not a usable payload
    if (payload === null || typeof payload !== 'object') return null;

    // Check expiry
    if (payload.exp && payload.exp < Math.floor(Date.now() / 1000)) {
        return null;
    }

    return payload;
}

// ---------------------------------------------------------------------------
// Filter matching
// ---------------------------------------------------------------------------

/**
 * Check whether a message satisfies a subscription filter.
 *
 * Every key in the filter must be present in the message data with an equal
 * value after both sides are converted with String(). An empty or missing
 * filter matches everything.
 *
 * @param {object|null|undefined} subscription_filter - Key/value constraints.
 * @param {object} message_data - The `data` object of the published message.
 * @returns {boolean} True when the message matches.
 */
function matches_filter(subscription_filter, message_data) {
    if (!subscription_filter) return true;

    const keys = Object.keys(subscription_filter);
    if (keys.length === 0) return true;

    return keys.every((key) => String(message_data[key]) === String(subscription_filter[key]));
}

/**
 * Build the short cache-key fragment that identifies a filter.
 *
 * @param {object|null|undefined} filter - Subscription filter.
 * @returns {string} '_' for an empty or missing filter, otherwise the first 12
 *     hex characters of the MD5 of the sorted "key=value&..." string.
 */
function filter_hash(filter) {
    if (!filter || Object.keys(filter).length === 0) return '_';

    const sorted = Object.keys(filter).sort().map((k) => k + '=' + filter[k]).join('&');
    return crypto.createHash('md5').update(sorted).digest('hex').substring(0, 12);
}

// ---------------------------------------------------------------------------
// WebSocket Server
// ---------------------------------------------------------------------------

const wss = new WebSocketServer({ port: WS_PORT });

wss.on('listening', () => {
    console.log(`[realtime] WebSocket server listening on port ${WS_PORT}`);
});

// Without a listener, a bind failure would surface as an unhandled 'error' event
wss.on('error', (err) => {
    console.error('[realtime] WebSocket server error:', err.message);
    process.exit(1);
});

wss.on('connection', (ws) => {
    const conn = {
        user_id: null,
        site_id: null,
        session_id: null,
        authenticated: false,
        subscriptions: new Map(),
        alive: true,
    };

    connections.set(ws, conn);

    // Require authentication within timeout
    const auth_timer = setTimeout(() => {
        if (!conn.authenticated) {
            ws.close(4001, 'Authentication timeout');
        }
    }, AUTH_TIMEOUT);

    const forget_connection = () => {
        clearTimeout(auth_timer);
        connections.delete(ws);
    };

    ws.on('pong', () => {
        conn.alive = true;
    });

    ws.on('message', (raw) => {
        let msg;
        try {
            msg = JSON.parse(raw.toString());
        } catch {
            return;
        }

        // Valid JSON that is not an object (null, numbers, strings) carries no command
        if (msg === null || typeof msg !== 'object') return;

        if (msg.type === 'auth') {
            handle_auth(ws, conn, msg, auth_timer);
        } else if (!conn.authenticated) {
            ws.close(4001, 'Not authenticated');
        } else if (msg.type === 'subscribe') {
            handle_subscribe(ws, conn, msg);
        } else if (msg.type === 'unsubscribe') {
            handle_unsubscribe(conn, msg);
        }
    });

    ws.on('close', forget_connection);
    ws.on('error', forget_connection);
});

/**
 * Handle an 'auth' message: validate the token and bind identity to the connection.
 *
 * Closes the socket with code 4003 when the token is invalid or lacks
 * user_id / site_id. Repeated auth on an authenticated connection is ignored.
 *
 * @param {object} ws - The client socket.
 * @param {object} conn - Connection state stored in `connections`.
 * @param {object} msg - Parsed client message; `msg.token` holds the auth token.
 * @param {*} auth_timer - Timer handle of the pending authentication timeout.
 */
function handle_auth(ws, conn, msg, auth_timer) {
    if (conn.authenticated) return;

    const payload = validate_token(msg.token || '');
    if (!payload || !payload.user_id || payload.site_id === undefined) {
        ws.close(4003, 'Invalid token');
        return;
    }

    conn.user_id = payload.user_id;
    conn.site_id = payload.site_id;
    conn.session_id = payload.session_id;
    conn.authenticated = true;

    clearTimeout(auth_timer);
    ws.send(JSON.stringify({ type: 'auth_ok' }));
}

/**
 * Handle a 'subscribe' message: validate the subscribe token, register the
 * subscription under `msg.sub_id`, and replay the cached last message if any.
 *
 * @param {object} ws - The client socket.
 * @param {object} conn - Authenticated connection state.
 * @param {object} msg - Parsed client message with `token` and `sub_id`.
 */
function handle_subscribe(ws, conn, msg) {
    const payload = validate_token(msg.token || '');
    if (!payload || !payload.topic) {
        ws.send(JSON.stringify({ type: 'error', message: 'Invalid subscribe token' }));
        return;
    }

    // Site ID must match connection
    if (payload.site_id !== conn.site_id) {
        ws.send(JSON.stringify({ type: 'error', message: 'Site mismatch' }));
        return;
    }

    const sub_id = msg.sub_id;
    const topic = payload.topic;
    const filter = payload.filter || {};

    conn.subscriptions.set(sub_id, { topic, filter });
    ws.send(JSON.stringify({ type: 'subscribed', sub_id }));

    // Replay last message if available
    const lm_key = `${conn.site_id}:${topic}:${filter_hash(filter)}`;
    const last = last_messages.get(lm_key);
    if (last) {
        ws.send(JSON.stringify({
            type: 'message',
            topic: last.topic,
            data: last.data,
            ts: last.ts,
            replay: true,
        }));
    }
}

/**
 * Handle an 'unsubscribe' message by dropping the subscription `msg.sub_id`.
 *
 * @param {object} conn - Authenticated connection state.
 * @param {object} msg - Parsed client message with `sub_id`.
 */
function handle_unsubscribe(conn, msg) {
    conn.subscriptions.delete(msg.sub_id);
}

// ---------------------------------------------------------------------------
// Heartbeat - ping every 30s, kill unresponsive connections
// ---------------------------------------------------------------------------

const heartbeat = setInterval(() => {
    for (const [ws, conn] of connections) {
        // No pong since the previous tick: treat the peer as dead
        if (!conn.alive) {
            ws.terminate();
            connections.delete(ws);
            continue;
        }
        conn.alive = false;
        ws.ping();
    }
}, HEARTBEAT_INTERVAL);

wss.on('close', () => {
    clearInterval(heartbeat);
});

// ---------------------------------------------------------------------------
// Redis subscriber - receive messages from PHP
// ---------------------------------------------------------------------------

/**
 * Connect to Redis, pattern-subscribe to `rsx_rt:*`, and relay every published
 * message to the authenticated connections whose site and subscriptions match.
 *
 * @returns {Promise<object>} The connected Redis subscriber client.
 */
async function start_redis() {
    const subscriber = createClient({
        socket: {
            host: REDIS_HOST,
            port: REDIS_PORT,
        },
        password: REDIS_PASSWORD,
    });

    subscriber.on('error', (err) => {
        console.error('[realtime] Redis error:', err.message);
    });

    await subscriber.connect();
    console.log(`[realtime] Connected to Redis at ${REDIS_HOST}:${REDIS_PORT}`);

    // Subscribe to pattern rsx_rt:*
    await subscriber.pSubscribe(`${REDIS_PREFIX}:*`, (message, channel) => {
        let msg;
        try {
            msg = JSON.parse(message);
        } catch {
            return;
        }

        // Ignore payloads that are valid JSON but not an object (e.g. `null`)
        if (msg === null || typeof msg !== 'object') return;

        const topic = msg.topic;
        const data = msg.data || {};
        const site_id = msg.site_id;
        const ts = msg.ts || Math.floor(Date.now() / 1000);

        // Store as last message for replay
        // Store both with filter hash and without (wildcard subscribers get unfiltered last msg)
        remember_last_message(`${site_id}:${topic}:_`, { topic, data, ts });

        // Also store with data-derived filter hashes for common filter patterns
        // (Subscribers with specific filters will match against their own filter hash)
        if (data.id !== undefined) {
            const lm_key_id = `${site_id}:${topic}:${filter_hash({ id: data.id })}`;
            remember_last_message(lm_key_id, { topic, data, ts });
        }

        // Route to matching WebSocket connections
        for (const [ws, conn] of connections) {
            if (!conn.authenticated) continue;
            if (conn.site_id !== site_id) continue;

            for (const [sub_id, sub] of conn.subscriptions) {
                if (sub.topic !== topic) continue;
                if (!matches_filter(sub.filter, data)) continue;

                ws.send(JSON.stringify({
                    type: 'message',
                    topic,
                    data,
                    ts,
                }));
                break; // Only send once per connection even if multiple subs match
            }
        }
    });

    return subscriber;
}

// ---------------------------------------------------------------------------
// Startup
// ---------------------------------------------------------------------------

start_redis().catch((err) => {
    console.error('[realtime] Failed to connect to Redis:', err.message);
    process.exit(1);
});

/**
 * Graceful shutdown: stop accepting connections and exit with code 0.
 */
function shutdown() {
    console.log('[realtime] Shutting down...');
    wss.close();
    process.exit(0);
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

// Log stats periodically
setInterval(() => {
    const conn_count = connections.size;
    let sub_count = 0;
    for (const conn of connections.values()) {
        sub_count += conn.subscriptions.size;
    }
    if (conn_count > 0) {
        console.log(`[realtime] Connections: ${conn_count}, Subscriptions: ${sub_count}, Cached messages: ${last_messages.size}`);
    }
}, STATS_INTERVAL);