test/server.js
2026-04-29 05:52:37 +00:00

424 lines
14 KiB
JavaScript
Executable File

const express = require('express');
const http = require('http');
const { Server } = require('socket.io');
const QRCode = require('qrcode');
const Anthropic = require('@anthropic-ai/sdk');
const { v4: uuidv4 } = require('uuid');
const path = require('path');
const app = express();
const server = http.createServer(app);
const io = new Server(server, { cors: { origin: '*' } });
app.use(express.json());
app.use(express.static(path.join(__dirname, 'public')));
const anthropic = process.env.ANTHROPIC_API_KEY
? new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY })
: null;
// ── State ────────────────────────────────────────────────────────────────────
let sessionState = 'disconnected';
let qrDataUrl = null;
let waClient = null;
const conversations = new Map(); // chatId -> ConversationObject
const inboundQueue = [];
const outboundQueue = [];
const seenMessageIds = new Set();
const processedIdempotencyKeys = new Set();
let outboundBusy = false;
const stats = {
queueDepth: 0,
outboundDepth: 0,
sendSuccess: 0,
sendFailed: 0,
reconnectCount: 0,
lastHeartbeat: null,
workerStatus: 'idle'
};
// ── Helpers ──────────────────────────────────────────────────────────────────
function broadcastState() {
io.emit('session_state', { state: sessionState, qrDataUrl, stats });
}
function broadcastInbox() {
const inbox = Array.from(conversations.values()).sort(
(a, b) => new Date(b.lastActivity) - new Date(a.lastActivity)
);
io.emit('inbox_update', inbox);
}
function addMessage(chatId, msg) {
if (!conversations.has(chatId)) {
conversations.set(chatId, {
id: chatId,
contact: msg.contact || chatId,
phone: chatId,
avatar: (msg.contact || chatId).charAt(0).toUpperCase(),
messages: [],
unreadCount: 0,
lastActivity: msg.timestamp,
lastPreview: msg.body
});
}
const conv = conversations.get(chatId);
conv.messages.push(msg);
conv.lastActivity = msg.timestamp;
conv.lastPreview = msg.body;
if (!msg.fromMe) conv.unreadCount++;
return conv;
}
// ── WhatsApp Client ───────────────────────────────────────────────────────────
function initWhatsApp() {
let Client, LocalAuth;
try {
({ Client, LocalAuth } = require('whatsapp-web.js'));
} catch {
console.warn('whatsapp-web.js not available, staying disconnected');
sessionState = 'degraded';
broadcastState();
return;
}
waClient = new Client({
authStrategy: new LocalAuth({ dataPath: './.wwebjs_auth' }),
puppeteer: {
args: [
'--no-sandbox', '--disable-setuid-sandbox',
'--disable-dev-shm-usage', '--disable-accelerated-2d-canvas',
'--no-first-run', '--no-zygote', '--single-process', '--disable-gpu'
]
}
});
waClient.on('qr', async (qr) => {
sessionState = 'awaiting_scan';
qrDataUrl = await QRCode.toDataURL(qr, { width: 280, margin: 2 });
broadcastState();
});
waClient.on('authenticated', () => {
sessionState = 'connecting';
broadcastState();
});
waClient.on('ready', () => {
sessionState = 'connected';
qrDataUrl = null;
stats.lastHeartbeat = new Date().toISOString();
broadcastState();
console.log('WhatsApp ready');
});
waClient.on('disconnected', (reason) => {
console.log('Disconnected:', reason);
sessionState = 'reconnecting';
stats.reconnectCount++;
broadcastState();
setTimeout(() => waClient.initialize().catch(console.error), 5000);
});
waClient.on('message', async (msg) => {
if (msg.fromMe) return;
const mid = msg.id._serialized;
if (seenMessageIds.has(mid)) return;
seenMessageIds.add(mid);
let contactName = msg.from;
try {
const contact = await msg.getContact();
contactName = contact.name || contact.pushname || msg.from;
} catch {}
const inboundMsg = {
id: mid,
body: msg.body,
fromMe: false,
contact: contactName,
timestamp: new Date(msg.timestamp * 1000).toISOString(),
state: 'received'
};
const conv = addMessage(msg.from, inboundMsg);
inboundQueue.push({ ...inboundMsg, chatId: msg.from });
stats.queueDepth = inboundQueue.length;
io.emit('new_message', { chatId: msg.from, conversation: conv, message: inboundMsg });
io.emit('stats_update', stats);
broadcastInbox();
});
waClient.initialize().catch((err) => {
console.error('WhatsApp init failed:', err.message);
sessionState = 'degraded';
broadcastState();
});
}
// ── Outbound Worker ───────────────────────────────────────────────────────────
async function processOutbound() {
if (outboundBusy || outboundQueue.length === 0) return;
outboundBusy = true;
stats.workerStatus = 'busy';
io.emit('stats_update', stats);
const job = outboundQueue.shift();
stats.outboundDepth = outboundQueue.length;
try {
job.state = 'sending';
io.emit('message_state', job);
if (sessionState !== 'connected' || !waClient) {
throw new Error('Session not connected');
}
await waClient.sendMessage(job.chatId, job.body);
job.state = 'sent';
job.sentAt = new Date().toISOString();
stats.sendSuccess++;
const sentMsg = {
id: job.idempotencyKey,
body: job.body,
fromMe: true,
timestamp: job.sentAt,
state: 'sent'
};
addMessage(job.chatId, sentMsg);
// Don't increment unread for own messages
const conv = conversations.get(job.chatId);
if (conv) { conv.unreadCount = Math.max(0, conv.unreadCount - 1); conv.unreadCount = 0; }
broadcastInbox();
} catch (err) {
job.retries = (job.retries || 0) + 1;
stats.sendFailed++;
if (job.retries < 3) {
job.state = 'retrying';
setTimeout(() => {
outboundQueue.unshift(job);
stats.outboundDepth = outboundQueue.length;
processOutbound();
}, Math.pow(2, job.retries) * 1500);
} else {
job.state = 'failed';
job.error = err.message;
}
}
io.emit('message_state', job);
io.emit('stats_update', stats);
outboundBusy = false;
stats.workerStatus = 'idle';
if (outboundQueue.length > 0) setTimeout(processOutbound, 400);
}
// ── Heartbeat ─────────────────────────────────────────────────────────────────
setInterval(() => {
if (sessionState === 'connected') {
stats.lastHeartbeat = new Date().toISOString();
io.emit('stats_update', stats);
}
}, 30000);
// ── REST API ──────────────────────────────────────────────────────────────────
app.get('/api/session', (req, res) => {
res.json({ state: sessionState, qrDataUrl, stats });
});
app.get('/api/inbox', (req, res) => {
const inbox = Array.from(conversations.values()).sort(
(a, b) => new Date(b.lastActivity) - new Date(a.lastActivity)
);
res.json(inbox);
});
app.get('/api/conversation/:chatId', (req, res) => {
const conv = conversations.get(decodeURIComponent(req.params.chatId));
if (!conv) return res.status(404).json({ error: 'Not found' });
res.json(conv);
});
app.post('/api/send', (req, res) => {
const { chatId, body, idempotencyKey } = req.body;
if (!chatId || !body) return res.status(400).json({ error: 'chatId and body required' });
const key = idempotencyKey || uuidv4();
if (processedIdempotencyKeys.has(key)) {
return res.json({ queued: false, duplicate: true, key });
}
processedIdempotencyKeys.add(key);
const job = {
id: uuidv4(),
idempotencyKey: key,
chatId,
body,
state: 'queued',
createdAt: new Date().toISOString(),
retries: 0
};
outboundQueue.push(job);
stats.outboundDepth = outboundQueue.length;
io.emit('stats_update', stats);
processOutbound();
res.json({ queued: true, job });
});
app.post('/api/ai-draft', async (req, res) => {
if (!anthropic) {
return res.status(503).json({ error: 'ANTHROPIC_API_KEY not configured' });
}
const { chatId, hint } = req.body;
const conv = conversations.get(chatId);
const recent = conv ? conv.messages.slice(-12) : [];
const transcript = recent
.map(m => `${m.fromMe ? 'Agent' : 'Customer'}: ${m.body}`)
.join('\n');
try {
const response = await anthropic.messages.create({
model: 'claude-sonnet-4-6',
max_tokens: 400,
system: [
{
type: 'text',
text: 'You are a professional customer service agent. Write a concise, friendly reply to the customer\'s latest message. Output only the reply text — no preamble, no quotation marks, no signature.',
cache_control: { type: 'ephemeral' }
}
],
messages: [
{
role: 'user',
content: `Conversation so far:\n${transcript}\n${hint ? `\nAdditional context: ${hint}` : ''}\n\nWrite a reply to the customer's last message.`
}
]
});
res.json({ draft: response.content[0].text });
} catch (err) {
res.status(500).json({ error: err.message });
}
});
app.post('/api/mark-read/:chatId', (req, res) => {
const conv = conversations.get(decodeURIComponent(req.params.chatId));
if (conv) { conv.unreadCount = 0; broadcastInbox(); }
res.json({ ok: true });
});
app.get('/api/stats', (req, res) => res.json(stats));
// Demo mode: inject simulated inbound message
app.post('/api/demo/inject', (req, res) => {
if (process.env.DEMO_MODE !== 'true') return res.status(403).json({ error: 'Demo mode off' });
const { chatId, body } = req.body;
const conv = conversations.get(chatId);
if (!conv) return res.status(404).json({ error: 'Chat not found' });
const msg = {
id: uuidv4(),
body: body || 'Hey, any update?',
fromMe: false,
contact: conv.contact,
timestamp: new Date().toISOString(),
state: 'received'
};
addMessage(chatId, msg);
io.emit('new_message', { chatId, conversation: conversations.get(chatId), message: msg });
broadcastInbox();
res.json({ ok: true });
});
// ── Socket.io ─────────────────────────────────────────────────────────────────
io.on('connection', (socket) => {
socket.emit('session_state', { state: sessionState, qrDataUrl, stats });
const inbox = Array.from(conversations.values()).sort(
(a, b) => new Date(b.lastActivity) - new Date(a.lastActivity)
);
socket.emit('inbox_update', inbox);
});
// ── Demo Mode ─────────────────────────────────────────────────────────────────
if (process.env.DEMO_MODE === 'true') {
sessionState = 'connected';
stats.lastHeartbeat = new Date().toISOString();
const demoChats = [
{ id: '[email protected]', contact: 'Rahul Sharma' },
{ id: '[email protected]', contact: 'Priya Patel' },
{ id: '[email protected]', contact: 'Amit Kumar' },
{ id: '[email protected]', contact: 'Sneha Joshi' }
];
const demoThreads = [
[
{ from: false, body: 'Hi, I placed an order #12345 but haven\'t received a confirmation yet.' },
{ from: true, body: 'Hello Rahul! Let me check that for you right away.' },
{ from: false, body: 'It\'s been 2 days already. Should I be worried?' },
{ from: false, body: 'Also, can I change the delivery address?' }
],
[
{ from: false, body: 'I want to return a product. The size doesn\'t fit.' },
{ from: true, body: 'Hi Priya! I\'d be happy to help with the return. Could you share your order ID?' },
{ from: false, body: 'It\'s ORDER-78923' }
],
[
{ from: false, body: 'Is COD available for my area - 411001?' },
{ from: true, body: 'Yes, COD is available in Pune 411001!' },
{ from: false, body: 'Great! What\'s the maximum COD order value?' }
],
[
{ from: false, body: 'My payment failed but amount was deducted. Please help!' }
]
];
demoChats.forEach((chat, i) => {
const thread = demoThreads[i];
conversations.set(chat.id, {
id: chat.id,
contact: chat.contact,
phone: chat.id,
avatar: chat.contact.charAt(0),
messages: thread.map((m, j) => ({
id: uuidv4(),
body: m.body,
fromMe: m.from,
contact: m.from ? 'Agent' : chat.contact,
timestamp: new Date(Date.now() - (thread.length - j) * 900000 - i * 3600000).toISOString(),
state: 'received'
})),
unreadCount: thread.filter(m => !m.from).length - Math.min(thread.filter(m => m.from).length, thread.filter(m => !m.from).length),
lastActivity: new Date(Date.now() - i * 3600000).toISOString(),
lastPreview: thread[thread.length - 1].body
});
// Fix unreadCount to just be trailing customer messages
const conv = conversations.get(chat.id);
let unread = 0;
for (let k = conv.messages.length - 1; k >= 0; k--) {
if (!conv.messages[k].fromMe) unread++;
else break;
}
conv.unreadCount = unread;
});
console.log('Running in DEMO MODE — no real WhatsApp connection');
} else {
initWhatsApp();
}
// ── Start ─────────────────────────────────────────────────────────────────────
const PORT = process.env.PORT || 8080;
server.listen(PORT, () => {
console.log(`WhatsApp AI Agent on port ${PORT} | Demo=${process.env.DEMO_MODE}`);
});