sms-extension/server/services/analyticsStore.js

536 lines
17 KiB
JavaScript

const crypto = require('crypto');
const { Pool } = require('pg');
function normalizeText(value) {
return typeof value === 'string' ? value.trim() : '';
}
function normalizeInteger(value) {
return Number.isInteger(value) ? value : null;
}
function normalizeTextList(values = []) {
if (!Array.isArray(values)) return [];
return [...new Set(
values
.map((value) => normalizeText(value))
.filter(Boolean)
)];
}
function getConnectionString() {
return normalizeText(
process.env.FDK_STORAGE_CONNECTION_STRING
|| process.env.DATABASE_URL
|| process.env.POSTGRES_CONNECTION_STRING
);
}
let analyticsPool = null;
function getPool() {
if (analyticsPool) return analyticsPool;
const connectionString = getConnectionString();
if (!connectionString) {
throw new Error('Analytics database is not configured');
}
analyticsPool = new Pool({ connectionString });
return analyticsPool;
}
function sha256(value) {
return crypto.createHash('sha256').update(String(value || '')).digest('hex');
}
function buildPhoneMetadata(value) {
const digits = String(value || '').replace(/\D/g, '');
if (!digits) {
return {
toNumberHash: '',
toNumberLast4: '',
};
}
return {
toNumberHash: sha256(digits),
toNumberLast4: digits.slice(-4),
};
}
function buildSourceEventKey({ applicationId, shipmentId, orderId, eventSlug, payload = null }) {
const normalizedParts = {
applicationId: normalizeText(applicationId),
shipmentId: normalizeText(shipmentId),
orderId: normalizeText(orderId),
eventSlug: normalizeText(eventSlug),
};
const hasStableIdentifiers = normalizedParts.applicationId && normalizedParts.eventSlug
&& (normalizedParts.shipmentId || normalizedParts.orderId);
if (hasStableIdentifiers) {
return sha256(JSON.stringify(normalizedParts));
}
return sha256(JSON.stringify({
...normalizedParts,
payload,
}));
}
function buildStatusFingerprint(entry = {}) {
return sha256(JSON.stringify({
messageExecutionId: entry.messageExecutionId,
statusSource: normalizeText(entry.statusSource),
statusType: normalizeText(entry.statusType),
normalizedStatus: normalizeText(entry.normalizedStatus),
providerMessageId: normalizeText(entry.providerMessageId),
providerStatus: normalizeText(entry.providerStatus),
providerStatusCode: normalizeText(entry.providerStatusCode),
errorCode: normalizeText(entry.errorCode),
errorMessage: normalizeText(entry.errorMessage),
payload: entry.payload || null,
}));
}
function extractProviderMessageId(value, depth = 0) {
if (!value || depth > 4) return '';
if (Array.isArray(value)) {
for (const entry of value) {
const nestedMatch = extractProviderMessageId(entry, depth + 1);
if (nestedMatch) return nestedMatch;
}
return '';
}
if (typeof value === 'object') {
const candidates = [
value.message_id,
value.messageId,
value.msg_id,
value.msgid,
value.sms_id,
value.smsId,
value.request_id,
value.requestId,
value.id,
]
.map((entry) => normalizeText(entry))
.filter(Boolean);
if (candidates.length > 0) return candidates[0];
for (const nestedValue of Object.values(value)) {
const nestedMatch = extractProviderMessageId(nestedValue, depth + 1);
if (nestedMatch) return nestedMatch;
}
}
return '';
}
function buildExecutionFilters({ companyId, businessId, eventSlugs }) {
const values = [];
const conditions = [];
if (normalizeText(companyId)) {
values.push(normalizeText(companyId));
conditions.push(`company_id = $${values.length}`);
}
if (normalizeText(businessId)) {
values.push(normalizeText(businessId));
conditions.push(`business_id = $${values.length}`);
}
const normalizedEventSlugs = normalizeTextList(eventSlugs);
if (normalizedEventSlugs.length > 0) {
values.push(normalizedEventSlugs);
conditions.push(`event_slug = ANY($${values.length})`);
}
if (conditions.length === 0) {
throw new Error('Analytics queries require at least one scope filter');
}
return {
whereClause: conditions.join(' AND '),
values,
};
}
function parseCount(value) {
const parsed = Number.parseInt(value, 10);
return Number.isFinite(parsed) ? parsed : 0;
}
function computeFallbackRate({
deliveredCount = 0,
deliveryFailedCount = 0,
acceptedCount = 0,
sendFailedCount = 0,
}) {
const deliveryTerminalTotal = deliveredCount + deliveryFailedCount;
if (deliveryTerminalTotal > 0) {
return {
rate: deliveredCount / deliveryTerminalTotal,
mode: 'callback',
};
}
const sendTerminalTotal = acceptedCount + sendFailedCount;
if (sendTerminalTotal > 0) {
return {
rate: acceptedCount / sendTerminalTotal,
mode: 'send_fallback',
};
}
return {
rate: null,
mode: 'no_data',
};
}
async function createOrRefreshExecution(entry = {}) {
const pool = getPool();
const result = await pool.query(
`INSERT INTO sms_message_executions (
company_id,
business_id,
application_id,
source_type,
source_event_key,
event_slug,
event_label,
provider_name,
shipment_id,
order_id,
to_number_hash,
to_number_last4,
trigger_payload,
trigger_status,
send_status,
delivery_status,
triggered_at,
is_test
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, COALESCE($17, NOW()), $18
)
ON CONFLICT (company_id, business_id, source_type, source_event_key)
DO UPDATE SET
event_label = COALESCE(EXCLUDED.event_label, sms_message_executions.event_label),
provider_name = COALESCE(EXCLUDED.provider_name, sms_message_executions.provider_name),
shipment_id = COALESCE(EXCLUDED.shipment_id, sms_message_executions.shipment_id),
order_id = COALESCE(EXCLUDED.order_id, sms_message_executions.order_id),
to_number_hash = COALESCE(EXCLUDED.to_number_hash, sms_message_executions.to_number_hash),
to_number_last4 = COALESCE(EXCLUDED.to_number_last4, sms_message_executions.to_number_last4),
trigger_payload = COALESCE(EXCLUDED.trigger_payload, sms_message_executions.trigger_payload),
triggered_at = COALESCE(sms_message_executions.triggered_at, EXCLUDED.triggered_at)
RETURNING *`,
[
normalizeText(entry.companyId),
normalizeText(entry.businessId),
normalizeText(entry.applicationId),
normalizeText(entry.sourceType) || 'fynd_webhook',
normalizeText(entry.sourceEventKey),
normalizeText(entry.eventSlug),
normalizeText(entry.eventLabel),
normalizeText(entry.providerName),
normalizeText(entry.shipmentId),
normalizeText(entry.orderId),
normalizeText(entry.toNumberHash),
normalizeText(entry.toNumberLast4),
entry.triggerPayload || null,
normalizeText(entry.triggerStatus) || 'processed',
normalizeText(entry.sendStatus) || 'not_attempted',
normalizeText(entry.deliveryStatus) || 'unknown',
entry.triggeredAt || null,
entry.isTest === true,
],
);
return result.rows[0] || null;
}
async function markExecutionAccepted(entry = {}) {
const pool = getPool();
const result = await pool.query(
`UPDATE sms_message_executions
SET event_label = COALESCE($2, event_label),
matched_template_event = COALESCE($3, matched_template_event),
template_slug = COALESCE($4, template_slug),
template_id = COALESCE($5, template_id),
curl_profile_id = COALESCE($6, curl_profile_id),
provider_name = COALESCE($7, provider_name),
provider_message_id = COALESCE($8, provider_message_id),
provider_response = COALESCE($9, provider_response),
provider_http_status = COALESCE($10, provider_http_status),
trigger_status = 'processed',
send_status = 'accepted',
delivery_status = CASE
WHEN delivery_status = 'delivered' THEN 'delivered'
WHEN delivery_status = 'failed' THEN 'failed'
ELSE 'pending'
END,
send_attempted_at = COALESCE($11, send_attempted_at, NOW()),
accepted_at = COALESCE($12, accepted_at, NOW())
WHERE id = $1
RETURNING *`,
[
entry.id,
normalizeText(entry.eventLabel),
normalizeText(entry.matchedTemplateEvent),
normalizeText(entry.templateSlug),
normalizeText(entry.templateId),
normalizeText(entry.curlProfileId),
normalizeText(entry.providerName),
normalizeText(entry.providerMessageId),
entry.providerResponse || null,
normalizeInteger(entry.providerHttpStatus),
entry.sendAttemptedAt || null,
entry.acceptedAt || null,
],
);
return result.rows[0] || null;
}
async function markExecutionIgnored(entry = {}) {
const pool = getPool();
const result = await pool.query(
`UPDATE sms_message_executions
SET event_label = COALESCE($2, event_label),
trigger_status = 'ignored',
send_status = CASE
WHEN send_status = 'accepted' THEN send_status
ELSE 'not_attempted'
END,
ignore_reason = COALESCE($3, ignore_reason),
failure_stage = COALESCE($4, failure_stage),
failure_code = COALESCE($5, failure_code),
failure_reason = COALESCE($6, failure_reason)
WHERE id = $1
RETURNING *`,
[
entry.id,
normalizeText(entry.eventLabel),
normalizeText(entry.ignoreReason),
normalizeText(entry.failureStage),
normalizeText(entry.failureCode),
normalizeText(entry.failureReason),
],
);
return result.rows[0] || null;
}
async function markExecutionFailed(entry = {}) {
const pool = getPool();
const result = await pool.query(
`UPDATE sms_message_executions
SET event_label = COALESCE($2, event_label),
matched_template_event = COALESCE($3, matched_template_event),
template_slug = COALESCE($4, template_slug),
template_id = COALESCE($5, template_id),
curl_profile_id = COALESCE($6, curl_profile_id),
provider_name = COALESCE($7, provider_name),
provider_message_id = COALESCE($8, provider_message_id),
provider_response = COALESCE($9, provider_response),
provider_http_status = COALESCE($10, provider_http_status),
trigger_status = 'processed',
send_status = 'send_failed',
delivery_status = 'failed',
failure_stage = COALESCE($11, failure_stage, 'send'),
failure_code = COALESCE($12, failure_code),
failure_reason = COALESCE($13, failure_reason),
send_attempted_at = COALESCE($14, send_attempted_at, NOW()),
failed_at = COALESCE($15, failed_at, NOW())
WHERE id = $1
RETURNING *`,
[
entry.id,
normalizeText(entry.eventLabel),
normalizeText(entry.matchedTemplateEvent),
normalizeText(entry.templateSlug),
normalizeText(entry.templateId),
normalizeText(entry.curlProfileId),
normalizeText(entry.providerName),
normalizeText(entry.providerMessageId),
entry.providerResponse || null,
normalizeInteger(entry.providerHttpStatus),
normalizeText(entry.failureStage),
normalizeText(entry.failureCode),
normalizeText(entry.failureReason),
entry.sendAttemptedAt || null,
entry.failedAt || null,
],
);
return result.rows[0] || null;
}
async function insertStatusHistory(entry = {}) {
const pool = getPool();
const statusFingerprint = normalizeText(entry.statusFingerprint) || buildStatusFingerprint(entry);
const result = await pool.query(
`INSERT INTO sms_message_status_history (
message_execution_id,
status_fingerprint,
status_source,
status_type,
normalized_status,
provider_name,
provider_message_id,
provider_status,
provider_status_code,
error_code,
error_message,
payload,
headers,
occurred_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, COALESCE($14, NOW())
)
ON CONFLICT (status_fingerprint) DO NOTHING
RETURNING *`,
[
entry.messageExecutionId,
statusFingerprint,
normalizeText(entry.statusSource) || 'internal',
normalizeText(entry.statusType),
normalizeText(entry.normalizedStatus),
normalizeText(entry.providerName),
normalizeText(entry.providerMessageId),
normalizeText(entry.providerStatus),
normalizeText(entry.providerStatusCode),
normalizeText(entry.errorCode),
normalizeText(entry.errorMessage),
entry.payload || null,
entry.headers || null,
entry.occurredAt || null,
],
);
return result.rows[0] || null;
}
async function getOverviewMetrics(scope = {}) {
const pool = getPool();
const filters = buildExecutionFilters(scope);
const [summaryResult, chartResult] = await Promise.all([
pool.query(
`SELECT
COUNT(*)::int AS total_triggered,
COUNT(*) FILTER (WHERE COALESCE(triggered_at, created_at) >= CURRENT_DATE)::int AS triggered_today,
COUNT(*) FILTER (
WHERE COALESCE(failed_at, accepted_at, send_attempted_at, triggered_at, created_at) >= NOW() - INTERVAL '24 hours'
AND (send_status = 'send_failed' OR delivery_status = 'failed')
)::int AS failed_last_24_hours,
COUNT(*) FILTER (WHERE send_status = 'accepted')::int AS accepted_count,
COUNT(*) FILTER (WHERE send_status = 'send_failed')::int AS send_failed_count,
COUNT(*) FILTER (WHERE delivery_status = 'delivered')::int AS delivered_count,
COUNT(*) FILTER (WHERE delivery_status = 'failed')::int AS delivery_failed_count
FROM sms_message_executions
WHERE ${filters.whereClause}`,
filters.values,
),
pool.query(
`SELECT
DATE(COALESCE(triggered_at, created_at)) AS day,
COUNT(*)::int AS triggered_count,
COUNT(*) FILTER (WHERE send_status = 'send_failed' OR delivery_status = 'failed')::int AS failed_count
FROM sms_message_executions
WHERE ${filters.whereClause}
AND COALESCE(triggered_at, created_at) >= CURRENT_DATE - INTERVAL '29 days'
GROUP BY 1
ORDER BY 1 ASC`,
filters.values,
),
]);
const summaryRow = summaryResult.rows[0] || {};
const deliveryRate = computeFallbackRate({
deliveredCount: parseCount(summaryRow.delivered_count),
deliveryFailedCount: parseCount(summaryRow.delivery_failed_count),
acceptedCount: parseCount(summaryRow.accepted_count),
sendFailedCount: parseCount(summaryRow.send_failed_count),
});
return {
totalTriggered: parseCount(summaryRow.total_triggered),
triggeredToday: parseCount(summaryRow.triggered_today),
failedLast24Hours: parseCount(summaryRow.failed_last_24_hours),
acceptedCount: parseCount(summaryRow.accepted_count),
sendFailedCount: parseCount(summaryRow.send_failed_count),
deliveredCount: parseCount(summaryRow.delivered_count),
deliveryFailedCount: parseCount(summaryRow.delivery_failed_count),
deliveryRate,
chart: chartResult.rows.map((row) => ({
date: row.day instanceof Date ? row.day.toISOString().slice(0, 10) : String(row.day),
triggeredCount: parseCount(row.triggered_count),
failedCount: parseCount(row.failed_count),
})),
};
}
async function getEventMetrics(scope = {}) {
const pool = getPool();
const filters = buildExecutionFilters(scope);
const result = await pool.query(
`SELECT
event_slug,
MAX(NULLIF(event_label, '')) AS event_label,
COUNT(*)::int AS total_trigger_count,
COUNT(*) FILTER (WHERE COALESCE(triggered_at, created_at) >= CURRENT_DATE)::int AS triggered_today,
MAX(COALESCE(triggered_at, created_at)) AS last_triggered_at,
COUNT(*) FILTER (WHERE send_status = 'accepted')::int AS accepted_count,
COUNT(*) FILTER (WHERE send_status = 'send_failed')::int AS send_failed_count,
COUNT(*) FILTER (WHERE delivery_status = 'delivered')::int AS delivered_count,
COUNT(*) FILTER (WHERE delivery_status = 'failed')::int AS delivery_failed_count
FROM sms_message_executions
WHERE ${filters.whereClause}
GROUP BY event_slug
ORDER BY total_trigger_count DESC, event_slug ASC`,
filters.values,
);
return result.rows.map((row) => ({
eventSlug: normalizeText(row.event_slug),
eventLabel: normalizeText(row.event_label),
totalTriggerCount: parseCount(row.total_trigger_count),
triggeredToday: parseCount(row.triggered_today),
lastTriggeredAt: row.last_triggered_at || null,
acceptedCount: parseCount(row.accepted_count),
sendFailedCount: parseCount(row.send_failed_count),
deliveredCount: parseCount(row.delivered_count),
deliveryFailedCount: parseCount(row.delivery_failed_count),
deliveryRate: computeFallbackRate({
deliveredCount: parseCount(row.delivered_count),
deliveryFailedCount: parseCount(row.delivery_failed_count),
acceptedCount: parseCount(row.accepted_count),
sendFailedCount: parseCount(row.send_failed_count),
}),
}));
}
module.exports = {
buildPhoneMetadata,
buildSourceEventKey,
computeFallbackRate,
createOrRefreshExecution,
extractProviderMessageId,
getOverviewMetrics,
getEventMetrics,
insertStatusHistory,
markExecutionAccepted,
markExecutionFailed,
markExecutionIgnored,
};