const crypto = require('crypto'); const { Pool } = require('pg'); function normalizeText(value) { return typeof value === 'string' ? value.trim() : ''; } function normalizeInteger(value) { return Number.isInteger(value) ? value : null; } function normalizeTextList(values = []) { if (!Array.isArray(values)) return []; return [...new Set( values .map((value) => normalizeText(value)) .filter(Boolean) )]; } function getConnectionString() { return normalizeText( process.env.FDK_STORAGE_CONNECTION_STRING || process.env.DATABASE_URL || process.env.POSTGRES_CONNECTION_STRING ); } let analyticsPool = null; function getPool() { if (analyticsPool) return analyticsPool; const connectionString = getConnectionString(); if (!connectionString) { throw new Error('Analytics database is not configured'); } analyticsPool = new Pool({ connectionString }); return analyticsPool; } function sha256(value) { return crypto.createHash('sha256').update(String(value || '')).digest('hex'); } function buildPhoneMetadata(value) { const digits = String(value || '').replace(/\D/g, ''); if (!digits) { return { toNumberHash: '', toNumberLast4: '', }; } return { toNumberHash: sha256(digits), toNumberLast4: digits.slice(-4), }; } function buildSourceEventKey({ applicationId, shipmentId, orderId, eventSlug, payload = null }) { const normalizedParts = { applicationId: normalizeText(applicationId), shipmentId: normalizeText(shipmentId), orderId: normalizeText(orderId), eventSlug: normalizeText(eventSlug), }; const hasStableIdentifiers = normalizedParts.applicationId && normalizedParts.eventSlug && (normalizedParts.shipmentId || normalizedParts.orderId); if (hasStableIdentifiers) { return sha256(JSON.stringify(normalizedParts)); } return sha256(JSON.stringify({ ...normalizedParts, payload, })); } function buildStatusFingerprint(entry = {}) { return sha256(JSON.stringify({ messageExecutionId: entry.messageExecutionId, statusSource: normalizeText(entry.statusSource), statusType: normalizeText(entry.statusType), normalizedStatus: normalizeText(entry.normalizedStatus), providerMessageId: normalizeText(entry.providerMessageId), providerStatus: normalizeText(entry.providerStatus), providerStatusCode: normalizeText(entry.providerStatusCode), errorCode: normalizeText(entry.errorCode), errorMessage: normalizeText(entry.errorMessage), payload: entry.payload || null, })); } function extractProviderMessageId(value, depth = 0) { if (!value || depth > 4) return ''; if (Array.isArray(value)) { for (const entry of value) { const nestedMatch = extractProviderMessageId(entry, depth + 1); if (nestedMatch) return nestedMatch; } return ''; } if (typeof value === 'object') { const candidates = [ value.message_id, value.messageId, value.msg_id, value.msgid, value.sms_id, value.smsId, value.request_id, value.requestId, value.id, ] .map((entry) => normalizeText(entry)) .filter(Boolean); if (candidates.length > 0) return candidates[0]; for (const nestedValue of Object.values(value)) { const nestedMatch = extractProviderMessageId(nestedValue, depth + 1); if (nestedMatch) return nestedMatch; } } return ''; } function buildExecutionFilters({ companyId, businessId, eventSlugs }) { const values = []; const conditions = []; if (normalizeText(companyId)) { values.push(normalizeText(companyId)); conditions.push(`company_id = $${values.length}`); } if (normalizeText(businessId)) { values.push(normalizeText(businessId)); conditions.push(`business_id = $${values.length}`); } const normalizedEventSlugs = normalizeTextList(eventSlugs); if (normalizedEventSlugs.length > 0) { values.push(normalizedEventSlugs); conditions.push(`event_slug = ANY($${values.length})`); } if (conditions.length === 0) { throw new Error('Analytics queries require at least one scope filter'); } return { whereClause: conditions.join(' AND '), values, }; } function parseCount(value) { const parsed = Number.parseInt(value, 10); return Number.isFinite(parsed) ? parsed : 0; } function computeFallbackRate({ deliveredCount = 0, deliveryFailedCount = 0, acceptedCount = 0, sendFailedCount = 0, }) { const deliveryTerminalTotal = deliveredCount + deliveryFailedCount; if (deliveryTerminalTotal > 0) { return { rate: deliveredCount / deliveryTerminalTotal, mode: 'callback', }; } const sendTerminalTotal = acceptedCount + sendFailedCount; if (sendTerminalTotal > 0) { return { rate: acceptedCount / sendTerminalTotal, mode: 'send_fallback', }; } return { rate: null, mode: 'no_data', }; } async function createOrRefreshExecution(entry = {}) { const pool = getPool(); const result = await pool.query( `INSERT INTO sms_message_executions ( company_id, business_id, application_id, source_type, source_event_key, event_slug, event_label, provider_name, shipment_id, order_id, to_number_hash, to_number_last4, trigger_payload, trigger_status, send_status, delivery_status, triggered_at, is_test ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, COALESCE($17, NOW()), $18 ) ON CONFLICT (company_id, business_id, source_type, source_event_key) DO UPDATE SET event_label = COALESCE(EXCLUDED.event_label, sms_message_executions.event_label), provider_name = COALESCE(EXCLUDED.provider_name, sms_message_executions.provider_name), shipment_id = COALESCE(EXCLUDED.shipment_id, sms_message_executions.shipment_id), order_id = COALESCE(EXCLUDED.order_id, sms_message_executions.order_id), to_number_hash = COALESCE(EXCLUDED.to_number_hash, sms_message_executions.to_number_hash), to_number_last4 = COALESCE(EXCLUDED.to_number_last4, sms_message_executions.to_number_last4), trigger_payload = COALESCE(EXCLUDED.trigger_payload, sms_message_executions.trigger_payload), triggered_at = COALESCE(sms_message_executions.triggered_at, EXCLUDED.triggered_at) RETURNING *`, [ normalizeText(entry.companyId), normalizeText(entry.businessId), normalizeText(entry.applicationId), normalizeText(entry.sourceType) || 'fynd_webhook', normalizeText(entry.sourceEventKey), normalizeText(entry.eventSlug), normalizeText(entry.eventLabel), normalizeText(entry.providerName), normalizeText(entry.shipmentId), normalizeText(entry.orderId), normalizeText(entry.toNumberHash), normalizeText(entry.toNumberLast4), entry.triggerPayload || null, normalizeText(entry.triggerStatus) || 'processed', normalizeText(entry.sendStatus) || 'not_attempted', normalizeText(entry.deliveryStatus) || 'unknown', entry.triggeredAt || null, entry.isTest === true, ], ); return result.rows[0] || null; } async function markExecutionAccepted(entry = {}) { const pool = getPool(); const result = await pool.query( `UPDATE sms_message_executions SET event_label = COALESCE($2, event_label), matched_template_event = COALESCE($3, matched_template_event), template_slug = COALESCE($4, template_slug), template_id = COALESCE($5, template_id), curl_profile_id = COALESCE($6, curl_profile_id), provider_name = COALESCE($7, provider_name), provider_message_id = COALESCE($8, provider_message_id), provider_response = COALESCE($9, provider_response), provider_http_status = COALESCE($10, provider_http_status), trigger_status = 'processed', send_status = 'accepted', delivery_status = CASE WHEN delivery_status = 'delivered' THEN 'delivered' WHEN delivery_status = 'failed' THEN 'failed' ELSE 'pending' END, send_attempted_at = COALESCE($11, send_attempted_at, NOW()), accepted_at = COALESCE($12, accepted_at, NOW()) WHERE id = $1 RETURNING *`, [ entry.id, normalizeText(entry.eventLabel), normalizeText(entry.matchedTemplateEvent), normalizeText(entry.templateSlug), normalizeText(entry.templateId), normalizeText(entry.curlProfileId), normalizeText(entry.providerName), normalizeText(entry.providerMessageId), entry.providerResponse || null, normalizeInteger(entry.providerHttpStatus), entry.sendAttemptedAt || null, entry.acceptedAt || null, ], ); return result.rows[0] || null; } async function markExecutionIgnored(entry = {}) { const pool = getPool(); const result = await pool.query( `UPDATE sms_message_executions SET event_label = COALESCE($2, event_label), trigger_status = 'ignored', send_status = CASE WHEN send_status = 'accepted' THEN send_status ELSE 'not_attempted' END, ignore_reason = COALESCE($3, ignore_reason), failure_stage = COALESCE($4, failure_stage), failure_code = COALESCE($5, failure_code), failure_reason = COALESCE($6, failure_reason) WHERE id = $1 RETURNING *`, [ entry.id, normalizeText(entry.eventLabel), normalizeText(entry.ignoreReason), normalizeText(entry.failureStage), normalizeText(entry.failureCode), normalizeText(entry.failureReason), ], ); return result.rows[0] || null; } async function markExecutionFailed(entry = {}) { const pool = getPool(); const result = await pool.query( `UPDATE sms_message_executions SET event_label = COALESCE($2, event_label), matched_template_event = COALESCE($3, matched_template_event), template_slug = COALESCE($4, template_slug), template_id = COALESCE($5, template_id), curl_profile_id = COALESCE($6, curl_profile_id), provider_name = COALESCE($7, provider_name), provider_message_id = COALESCE($8, provider_message_id), provider_response = COALESCE($9, provider_response), provider_http_status = COALESCE($10, provider_http_status), trigger_status = 'processed', send_status = 'send_failed', delivery_status = 'failed', failure_stage = COALESCE($11, failure_stage, 'send'), failure_code = COALESCE($12, failure_code), failure_reason = COALESCE($13, failure_reason), send_attempted_at = COALESCE($14, send_attempted_at, NOW()), failed_at = COALESCE($15, failed_at, NOW()) WHERE id = $1 RETURNING *`, [ entry.id, normalizeText(entry.eventLabel), normalizeText(entry.matchedTemplateEvent), normalizeText(entry.templateSlug), normalizeText(entry.templateId), normalizeText(entry.curlProfileId), normalizeText(entry.providerName), normalizeText(entry.providerMessageId), entry.providerResponse || null, normalizeInteger(entry.providerHttpStatus), normalizeText(entry.failureStage), normalizeText(entry.failureCode), normalizeText(entry.failureReason), entry.sendAttemptedAt || null, entry.failedAt || null, ], ); return result.rows[0] || null; } async function insertStatusHistory(entry = {}) { const pool = getPool(); const statusFingerprint = normalizeText(entry.statusFingerprint) || buildStatusFingerprint(entry); const result = await pool.query( `INSERT INTO sms_message_status_history ( message_execution_id, status_fingerprint, status_source, status_type, normalized_status, provider_name, provider_message_id, provider_status, provider_status_code, error_code, error_message, payload, headers, occurred_at ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, COALESCE($14, NOW()) ) ON CONFLICT (status_fingerprint) DO NOTHING RETURNING *`, [ entry.messageExecutionId, statusFingerprint, normalizeText(entry.statusSource) || 'internal', normalizeText(entry.statusType), normalizeText(entry.normalizedStatus), normalizeText(entry.providerName), normalizeText(entry.providerMessageId), normalizeText(entry.providerStatus), normalizeText(entry.providerStatusCode), normalizeText(entry.errorCode), normalizeText(entry.errorMessage), entry.payload || null, entry.headers || null, entry.occurredAt || null, ], ); return result.rows[0] || null; } async function getOverviewMetrics(scope = {}) { const pool = getPool(); const filters = buildExecutionFilters(scope); const [summaryResult, chartResult] = await Promise.all([ pool.query( `SELECT COUNT(*)::int AS total_triggered, COUNT(*) FILTER (WHERE COALESCE(triggered_at, created_at) >= CURRENT_DATE)::int AS triggered_today, COUNT(*) FILTER ( WHERE COALESCE(failed_at, accepted_at, send_attempted_at, triggered_at, created_at) >= NOW() - INTERVAL '24 hours' AND (send_status = 'send_failed' OR delivery_status = 'failed') )::int AS failed_last_24_hours, COUNT(*) FILTER (WHERE send_status = 'accepted')::int AS accepted_count, COUNT(*) FILTER (WHERE send_status = 'send_failed')::int AS send_failed_count, COUNT(*) FILTER (WHERE delivery_status = 'delivered')::int AS delivered_count, COUNT(*) FILTER (WHERE delivery_status = 'failed')::int AS delivery_failed_count FROM sms_message_executions WHERE ${filters.whereClause}`, filters.values, ), pool.query( `SELECT DATE(COALESCE(triggered_at, created_at)) AS day, COUNT(*)::int AS triggered_count, COUNT(*) FILTER (WHERE send_status = 'send_failed' OR delivery_status = 'failed')::int AS failed_count FROM sms_message_executions WHERE ${filters.whereClause} AND COALESCE(triggered_at, created_at) >= CURRENT_DATE - INTERVAL '29 days' GROUP BY 1 ORDER BY 1 ASC`, filters.values, ), ]); const summaryRow = summaryResult.rows[0] || {}; const deliveryRate = computeFallbackRate({ deliveredCount: parseCount(summaryRow.delivered_count), deliveryFailedCount: parseCount(summaryRow.delivery_failed_count), acceptedCount: parseCount(summaryRow.accepted_count), sendFailedCount: parseCount(summaryRow.send_failed_count), }); return { totalTriggered: parseCount(summaryRow.total_triggered), triggeredToday: parseCount(summaryRow.triggered_today), failedLast24Hours: parseCount(summaryRow.failed_last_24_hours), acceptedCount: parseCount(summaryRow.accepted_count), sendFailedCount: parseCount(summaryRow.send_failed_count), deliveredCount: parseCount(summaryRow.delivered_count), deliveryFailedCount: parseCount(summaryRow.delivery_failed_count), deliveryRate, chart: chartResult.rows.map((row) => ({ date: row.day instanceof Date ? row.day.toISOString().slice(0, 10) : String(row.day), triggeredCount: parseCount(row.triggered_count), failedCount: parseCount(row.failed_count), })), }; } async function getEventMetrics(scope = {}) { const pool = getPool(); const filters = buildExecutionFilters(scope); const result = await pool.query( `SELECT event_slug, MAX(NULLIF(event_label, '')) AS event_label, COUNT(*)::int AS total_trigger_count, COUNT(*) FILTER (WHERE COALESCE(triggered_at, created_at) >= CURRENT_DATE)::int AS triggered_today, MAX(COALESCE(triggered_at, created_at)) AS last_triggered_at, COUNT(*) FILTER (WHERE send_status = 'accepted')::int AS accepted_count, COUNT(*) FILTER (WHERE send_status = 'send_failed')::int AS send_failed_count, COUNT(*) FILTER (WHERE delivery_status = 'delivered')::int AS delivered_count, COUNT(*) FILTER (WHERE delivery_status = 'failed')::int AS delivery_failed_count FROM sms_message_executions WHERE ${filters.whereClause} GROUP BY event_slug ORDER BY total_trigger_count DESC, event_slug ASC`, filters.values, ); return result.rows.map((row) => ({ eventSlug: normalizeText(row.event_slug), eventLabel: normalizeText(row.event_label), totalTriggerCount: parseCount(row.total_trigger_count), triggeredToday: parseCount(row.triggered_today), lastTriggeredAt: row.last_triggered_at || null, acceptedCount: parseCount(row.accepted_count), sendFailedCount: parseCount(row.send_failed_count), deliveredCount: parseCount(row.delivered_count), deliveryFailedCount: parseCount(row.delivery_failed_count), deliveryRate: computeFallbackRate({ deliveredCount: parseCount(row.delivered_count), deliveryFailedCount: parseCount(row.delivery_failed_count), acceptedCount: parseCount(row.accepted_count), sendFailedCount: parseCount(row.send_failed_count), }), })); } module.exports = { buildPhoneMetadata, buildSourceEventKey, computeFallbackRate, createOrRefreshExecution, extractProviderMessageId, getOverviewMetrics, getEventMetrics, insertStatusHistory, markExecutionAccepted, markExecutionFailed, markExecutionIgnored, };