536 lines
17 KiB
JavaScript
536 lines
17 KiB
JavaScript
const crypto = require('crypto');
|
|
const { Pool } = require('pg');
|
|
|
|
function normalizeText(value) {
|
|
return typeof value === 'string' ? value.trim() : '';
|
|
}
|
|
|
|
function normalizeInteger(value) {
|
|
return Number.isInteger(value) ? value : null;
|
|
}
|
|
|
|
function normalizeTextList(values = []) {
|
|
if (!Array.isArray(values)) return [];
|
|
|
|
return [...new Set(
|
|
values
|
|
.map((value) => normalizeText(value))
|
|
.filter(Boolean)
|
|
)];
|
|
}
|
|
|
|
function getConnectionString() {
|
|
return normalizeText(
|
|
process.env.FDK_STORAGE_CONNECTION_STRING
|
|
|| process.env.DATABASE_URL
|
|
|| process.env.POSTGRES_CONNECTION_STRING
|
|
);
|
|
}
|
|
|
|
let analyticsPool = null;
|
|
|
|
function getPool() {
|
|
if (analyticsPool) return analyticsPool;
|
|
|
|
const connectionString = getConnectionString();
|
|
if (!connectionString) {
|
|
throw new Error('Analytics database is not configured');
|
|
}
|
|
|
|
analyticsPool = new Pool({ connectionString });
|
|
return analyticsPool;
|
|
}
|
|
|
|
function sha256(value) {
|
|
return crypto.createHash('sha256').update(String(value || '')).digest('hex');
|
|
}
|
|
|
|
function buildPhoneMetadata(value) {
|
|
const digits = String(value || '').replace(/\D/g, '');
|
|
if (!digits) {
|
|
return {
|
|
toNumberHash: '',
|
|
toNumberLast4: '',
|
|
};
|
|
}
|
|
|
|
return {
|
|
toNumberHash: sha256(digits),
|
|
toNumberLast4: digits.slice(-4),
|
|
};
|
|
}
|
|
|
|
function buildSourceEventKey({ applicationId, shipmentId, orderId, eventSlug, payload = null }) {
|
|
const normalizedParts = {
|
|
applicationId: normalizeText(applicationId),
|
|
shipmentId: normalizeText(shipmentId),
|
|
orderId: normalizeText(orderId),
|
|
eventSlug: normalizeText(eventSlug),
|
|
};
|
|
|
|
const hasStableIdentifiers = normalizedParts.applicationId && normalizedParts.eventSlug
|
|
&& (normalizedParts.shipmentId || normalizedParts.orderId);
|
|
|
|
if (hasStableIdentifiers) {
|
|
return sha256(JSON.stringify(normalizedParts));
|
|
}
|
|
|
|
return sha256(JSON.stringify({
|
|
...normalizedParts,
|
|
payload,
|
|
}));
|
|
}
|
|
|
|
function buildStatusFingerprint(entry = {}) {
|
|
return sha256(JSON.stringify({
|
|
messageExecutionId: entry.messageExecutionId,
|
|
statusSource: normalizeText(entry.statusSource),
|
|
statusType: normalizeText(entry.statusType),
|
|
normalizedStatus: normalizeText(entry.normalizedStatus),
|
|
providerMessageId: normalizeText(entry.providerMessageId),
|
|
providerStatus: normalizeText(entry.providerStatus),
|
|
providerStatusCode: normalizeText(entry.providerStatusCode),
|
|
errorCode: normalizeText(entry.errorCode),
|
|
errorMessage: normalizeText(entry.errorMessage),
|
|
payload: entry.payload || null,
|
|
}));
|
|
}
|
|
|
|
function extractProviderMessageId(value, depth = 0) {
|
|
if (!value || depth > 4) return '';
|
|
|
|
if (Array.isArray(value)) {
|
|
for (const entry of value) {
|
|
const nestedMatch = extractProviderMessageId(entry, depth + 1);
|
|
if (nestedMatch) return nestedMatch;
|
|
}
|
|
return '';
|
|
}
|
|
|
|
if (typeof value === 'object') {
|
|
const candidates = [
|
|
value.message_id,
|
|
value.messageId,
|
|
value.msg_id,
|
|
value.msgid,
|
|
value.sms_id,
|
|
value.smsId,
|
|
value.request_id,
|
|
value.requestId,
|
|
value.id,
|
|
]
|
|
.map((entry) => normalizeText(entry))
|
|
.filter(Boolean);
|
|
|
|
if (candidates.length > 0) return candidates[0];
|
|
|
|
for (const nestedValue of Object.values(value)) {
|
|
const nestedMatch = extractProviderMessageId(nestedValue, depth + 1);
|
|
if (nestedMatch) return nestedMatch;
|
|
}
|
|
}
|
|
|
|
return '';
|
|
}
|
|
|
|
function buildExecutionFilters({ companyId, businessId, eventSlugs }) {
|
|
const values = [];
|
|
const conditions = [];
|
|
|
|
if (normalizeText(companyId)) {
|
|
values.push(normalizeText(companyId));
|
|
conditions.push(`company_id = $${values.length}`);
|
|
}
|
|
|
|
if (normalizeText(businessId)) {
|
|
values.push(normalizeText(businessId));
|
|
conditions.push(`business_id = $${values.length}`);
|
|
}
|
|
|
|
const normalizedEventSlugs = normalizeTextList(eventSlugs);
|
|
if (normalizedEventSlugs.length > 0) {
|
|
values.push(normalizedEventSlugs);
|
|
conditions.push(`event_slug = ANY($${values.length})`);
|
|
}
|
|
|
|
if (conditions.length === 0) {
|
|
throw new Error('Analytics queries require at least one scope filter');
|
|
}
|
|
|
|
return {
|
|
whereClause: conditions.join(' AND '),
|
|
values,
|
|
};
|
|
}
|
|
|
|
function parseCount(value) {
|
|
const parsed = Number.parseInt(value, 10);
|
|
return Number.isFinite(parsed) ? parsed : 0;
|
|
}
|
|
|
|
function computeFallbackRate({
|
|
deliveredCount = 0,
|
|
deliveryFailedCount = 0,
|
|
acceptedCount = 0,
|
|
sendFailedCount = 0,
|
|
}) {
|
|
const deliveryTerminalTotal = deliveredCount + deliveryFailedCount;
|
|
if (deliveryTerminalTotal > 0) {
|
|
return {
|
|
rate: deliveredCount / deliveryTerminalTotal,
|
|
mode: 'callback',
|
|
};
|
|
}
|
|
|
|
const sendTerminalTotal = acceptedCount + sendFailedCount;
|
|
if (sendTerminalTotal > 0) {
|
|
return {
|
|
rate: acceptedCount / sendTerminalTotal,
|
|
mode: 'send_fallback',
|
|
};
|
|
}
|
|
|
|
return {
|
|
rate: null,
|
|
mode: 'no_data',
|
|
};
|
|
}
|
|
|
|
async function createOrRefreshExecution(entry = {}) {
|
|
const pool = getPool();
|
|
const result = await pool.query(
|
|
`INSERT INTO sms_message_executions (
|
|
company_id,
|
|
business_id,
|
|
application_id,
|
|
source_type,
|
|
source_event_key,
|
|
event_slug,
|
|
event_label,
|
|
provider_name,
|
|
shipment_id,
|
|
order_id,
|
|
to_number_hash,
|
|
to_number_last4,
|
|
trigger_payload,
|
|
trigger_status,
|
|
send_status,
|
|
delivery_status,
|
|
triggered_at,
|
|
is_test
|
|
) VALUES (
|
|
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, COALESCE($17, NOW()), $18
|
|
)
|
|
ON CONFLICT (company_id, business_id, source_type, source_event_key)
|
|
DO UPDATE SET
|
|
event_label = COALESCE(EXCLUDED.event_label, sms_message_executions.event_label),
|
|
provider_name = COALESCE(EXCLUDED.provider_name, sms_message_executions.provider_name),
|
|
shipment_id = COALESCE(EXCLUDED.shipment_id, sms_message_executions.shipment_id),
|
|
order_id = COALESCE(EXCLUDED.order_id, sms_message_executions.order_id),
|
|
to_number_hash = COALESCE(EXCLUDED.to_number_hash, sms_message_executions.to_number_hash),
|
|
to_number_last4 = COALESCE(EXCLUDED.to_number_last4, sms_message_executions.to_number_last4),
|
|
trigger_payload = COALESCE(EXCLUDED.trigger_payload, sms_message_executions.trigger_payload),
|
|
triggered_at = COALESCE(sms_message_executions.triggered_at, EXCLUDED.triggered_at)
|
|
RETURNING *`,
|
|
[
|
|
normalizeText(entry.companyId),
|
|
normalizeText(entry.businessId),
|
|
normalizeText(entry.applicationId),
|
|
normalizeText(entry.sourceType) || 'fynd_webhook',
|
|
normalizeText(entry.sourceEventKey),
|
|
normalizeText(entry.eventSlug),
|
|
normalizeText(entry.eventLabel),
|
|
normalizeText(entry.providerName),
|
|
normalizeText(entry.shipmentId),
|
|
normalizeText(entry.orderId),
|
|
normalizeText(entry.toNumberHash),
|
|
normalizeText(entry.toNumberLast4),
|
|
entry.triggerPayload || null,
|
|
normalizeText(entry.triggerStatus) || 'processed',
|
|
normalizeText(entry.sendStatus) || 'not_attempted',
|
|
normalizeText(entry.deliveryStatus) || 'unknown',
|
|
entry.triggeredAt || null,
|
|
entry.isTest === true,
|
|
],
|
|
);
|
|
|
|
return result.rows[0] || null;
|
|
}
|
|
|
|
async function markExecutionAccepted(entry = {}) {
|
|
const pool = getPool();
|
|
const result = await pool.query(
|
|
`UPDATE sms_message_executions
|
|
SET event_label = COALESCE($2, event_label),
|
|
matched_template_event = COALESCE($3, matched_template_event),
|
|
template_slug = COALESCE($4, template_slug),
|
|
template_id = COALESCE($5, template_id),
|
|
curl_profile_id = COALESCE($6, curl_profile_id),
|
|
provider_name = COALESCE($7, provider_name),
|
|
provider_message_id = COALESCE($8, provider_message_id),
|
|
provider_response = COALESCE($9, provider_response),
|
|
provider_http_status = COALESCE($10, provider_http_status),
|
|
trigger_status = 'processed',
|
|
send_status = 'accepted',
|
|
delivery_status = CASE
|
|
WHEN delivery_status = 'delivered' THEN 'delivered'
|
|
WHEN delivery_status = 'failed' THEN 'failed'
|
|
ELSE 'pending'
|
|
END,
|
|
send_attempted_at = COALESCE($11, send_attempted_at, NOW()),
|
|
accepted_at = COALESCE($12, accepted_at, NOW())
|
|
WHERE id = $1
|
|
RETURNING *`,
|
|
[
|
|
entry.id,
|
|
normalizeText(entry.eventLabel),
|
|
normalizeText(entry.matchedTemplateEvent),
|
|
normalizeText(entry.templateSlug),
|
|
normalizeText(entry.templateId),
|
|
normalizeText(entry.curlProfileId),
|
|
normalizeText(entry.providerName),
|
|
normalizeText(entry.providerMessageId),
|
|
entry.providerResponse || null,
|
|
normalizeInteger(entry.providerHttpStatus),
|
|
entry.sendAttemptedAt || null,
|
|
entry.acceptedAt || null,
|
|
],
|
|
);
|
|
|
|
return result.rows[0] || null;
|
|
}
|
|
|
|
async function markExecutionIgnored(entry = {}) {
|
|
const pool = getPool();
|
|
const result = await pool.query(
|
|
`UPDATE sms_message_executions
|
|
SET event_label = COALESCE($2, event_label),
|
|
trigger_status = 'ignored',
|
|
send_status = CASE
|
|
WHEN send_status = 'accepted' THEN send_status
|
|
ELSE 'not_attempted'
|
|
END,
|
|
ignore_reason = COALESCE($3, ignore_reason),
|
|
failure_stage = COALESCE($4, failure_stage),
|
|
failure_code = COALESCE($5, failure_code),
|
|
failure_reason = COALESCE($6, failure_reason)
|
|
WHERE id = $1
|
|
RETURNING *`,
|
|
[
|
|
entry.id,
|
|
normalizeText(entry.eventLabel),
|
|
normalizeText(entry.ignoreReason),
|
|
normalizeText(entry.failureStage),
|
|
normalizeText(entry.failureCode),
|
|
normalizeText(entry.failureReason),
|
|
],
|
|
);
|
|
|
|
return result.rows[0] || null;
|
|
}
|
|
|
|
async function markExecutionFailed(entry = {}) {
|
|
const pool = getPool();
|
|
const result = await pool.query(
|
|
`UPDATE sms_message_executions
|
|
SET event_label = COALESCE($2, event_label),
|
|
matched_template_event = COALESCE($3, matched_template_event),
|
|
template_slug = COALESCE($4, template_slug),
|
|
template_id = COALESCE($5, template_id),
|
|
curl_profile_id = COALESCE($6, curl_profile_id),
|
|
provider_name = COALESCE($7, provider_name),
|
|
provider_message_id = COALESCE($8, provider_message_id),
|
|
provider_response = COALESCE($9, provider_response),
|
|
provider_http_status = COALESCE($10, provider_http_status),
|
|
trigger_status = 'processed',
|
|
send_status = 'send_failed',
|
|
delivery_status = 'failed',
|
|
failure_stage = COALESCE($11, failure_stage, 'send'),
|
|
failure_code = COALESCE($12, failure_code),
|
|
failure_reason = COALESCE($13, failure_reason),
|
|
send_attempted_at = COALESCE($14, send_attempted_at, NOW()),
|
|
failed_at = COALESCE($15, failed_at, NOW())
|
|
WHERE id = $1
|
|
RETURNING *`,
|
|
[
|
|
entry.id,
|
|
normalizeText(entry.eventLabel),
|
|
normalizeText(entry.matchedTemplateEvent),
|
|
normalizeText(entry.templateSlug),
|
|
normalizeText(entry.templateId),
|
|
normalizeText(entry.curlProfileId),
|
|
normalizeText(entry.providerName),
|
|
normalizeText(entry.providerMessageId),
|
|
entry.providerResponse || null,
|
|
normalizeInteger(entry.providerHttpStatus),
|
|
normalizeText(entry.failureStage),
|
|
normalizeText(entry.failureCode),
|
|
normalizeText(entry.failureReason),
|
|
entry.sendAttemptedAt || null,
|
|
entry.failedAt || null,
|
|
],
|
|
);
|
|
|
|
return result.rows[0] || null;
|
|
}
|
|
|
|
async function insertStatusHistory(entry = {}) {
|
|
const pool = getPool();
|
|
const statusFingerprint = normalizeText(entry.statusFingerprint) || buildStatusFingerprint(entry);
|
|
|
|
const result = await pool.query(
|
|
`INSERT INTO sms_message_status_history (
|
|
message_execution_id,
|
|
status_fingerprint,
|
|
status_source,
|
|
status_type,
|
|
normalized_status,
|
|
provider_name,
|
|
provider_message_id,
|
|
provider_status,
|
|
provider_status_code,
|
|
error_code,
|
|
error_message,
|
|
payload,
|
|
headers,
|
|
occurred_at
|
|
) VALUES (
|
|
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, COALESCE($14, NOW())
|
|
)
|
|
ON CONFLICT (status_fingerprint) DO NOTHING
|
|
RETURNING *`,
|
|
[
|
|
entry.messageExecutionId,
|
|
statusFingerprint,
|
|
normalizeText(entry.statusSource) || 'internal',
|
|
normalizeText(entry.statusType),
|
|
normalizeText(entry.normalizedStatus),
|
|
normalizeText(entry.providerName),
|
|
normalizeText(entry.providerMessageId),
|
|
normalizeText(entry.providerStatus),
|
|
normalizeText(entry.providerStatusCode),
|
|
normalizeText(entry.errorCode),
|
|
normalizeText(entry.errorMessage),
|
|
entry.payload || null,
|
|
entry.headers || null,
|
|
entry.occurredAt || null,
|
|
],
|
|
);
|
|
|
|
return result.rows[0] || null;
|
|
}
|
|
|
|
async function getOverviewMetrics(scope = {}) {
|
|
const pool = getPool();
|
|
const filters = buildExecutionFilters(scope);
|
|
|
|
const [summaryResult, chartResult] = await Promise.all([
|
|
pool.query(
|
|
`SELECT
|
|
COUNT(*)::int AS total_triggered,
|
|
COUNT(*) FILTER (WHERE COALESCE(triggered_at, created_at) >= CURRENT_DATE)::int AS triggered_today,
|
|
COUNT(*) FILTER (
|
|
WHERE COALESCE(failed_at, accepted_at, send_attempted_at, triggered_at, created_at) >= NOW() - INTERVAL '24 hours'
|
|
AND (send_status = 'send_failed' OR delivery_status = 'failed')
|
|
)::int AS failed_last_24_hours,
|
|
COUNT(*) FILTER (WHERE send_status = 'accepted')::int AS accepted_count,
|
|
COUNT(*) FILTER (WHERE send_status = 'send_failed')::int AS send_failed_count,
|
|
COUNT(*) FILTER (WHERE delivery_status = 'delivered')::int AS delivered_count,
|
|
COUNT(*) FILTER (WHERE delivery_status = 'failed')::int AS delivery_failed_count
|
|
FROM sms_message_executions
|
|
WHERE ${filters.whereClause}`,
|
|
filters.values,
|
|
),
|
|
pool.query(
|
|
`SELECT
|
|
DATE(COALESCE(triggered_at, created_at)) AS day,
|
|
COUNT(*)::int AS triggered_count,
|
|
COUNT(*) FILTER (WHERE send_status = 'send_failed' OR delivery_status = 'failed')::int AS failed_count
|
|
FROM sms_message_executions
|
|
WHERE ${filters.whereClause}
|
|
AND COALESCE(triggered_at, created_at) >= CURRENT_DATE - INTERVAL '29 days'
|
|
GROUP BY 1
|
|
ORDER BY 1 ASC`,
|
|
filters.values,
|
|
),
|
|
]);
|
|
|
|
const summaryRow = summaryResult.rows[0] || {};
|
|
const deliveryRate = computeFallbackRate({
|
|
deliveredCount: parseCount(summaryRow.delivered_count),
|
|
deliveryFailedCount: parseCount(summaryRow.delivery_failed_count),
|
|
acceptedCount: parseCount(summaryRow.accepted_count),
|
|
sendFailedCount: parseCount(summaryRow.send_failed_count),
|
|
});
|
|
|
|
return {
|
|
totalTriggered: parseCount(summaryRow.total_triggered),
|
|
triggeredToday: parseCount(summaryRow.triggered_today),
|
|
failedLast24Hours: parseCount(summaryRow.failed_last_24_hours),
|
|
acceptedCount: parseCount(summaryRow.accepted_count),
|
|
sendFailedCount: parseCount(summaryRow.send_failed_count),
|
|
deliveredCount: parseCount(summaryRow.delivered_count),
|
|
deliveryFailedCount: parseCount(summaryRow.delivery_failed_count),
|
|
deliveryRate,
|
|
chart: chartResult.rows.map((row) => ({
|
|
date: row.day instanceof Date ? row.day.toISOString().slice(0, 10) : String(row.day),
|
|
triggeredCount: parseCount(row.triggered_count),
|
|
failedCount: parseCount(row.failed_count),
|
|
})),
|
|
};
|
|
}
|
|
|
|
async function getEventMetrics(scope = {}) {
|
|
const pool = getPool();
|
|
const filters = buildExecutionFilters(scope);
|
|
const result = await pool.query(
|
|
`SELECT
|
|
event_slug,
|
|
MAX(NULLIF(event_label, '')) AS event_label,
|
|
COUNT(*)::int AS total_trigger_count,
|
|
COUNT(*) FILTER (WHERE COALESCE(triggered_at, created_at) >= CURRENT_DATE)::int AS triggered_today,
|
|
MAX(COALESCE(triggered_at, created_at)) AS last_triggered_at,
|
|
COUNT(*) FILTER (WHERE send_status = 'accepted')::int AS accepted_count,
|
|
COUNT(*) FILTER (WHERE send_status = 'send_failed')::int AS send_failed_count,
|
|
COUNT(*) FILTER (WHERE delivery_status = 'delivered')::int AS delivered_count,
|
|
COUNT(*) FILTER (WHERE delivery_status = 'failed')::int AS delivery_failed_count
|
|
FROM sms_message_executions
|
|
WHERE ${filters.whereClause}
|
|
GROUP BY event_slug
|
|
ORDER BY total_trigger_count DESC, event_slug ASC`,
|
|
filters.values,
|
|
);
|
|
|
|
return result.rows.map((row) => ({
|
|
eventSlug: normalizeText(row.event_slug),
|
|
eventLabel: normalizeText(row.event_label),
|
|
totalTriggerCount: parseCount(row.total_trigger_count),
|
|
triggeredToday: parseCount(row.triggered_today),
|
|
lastTriggeredAt: row.last_triggered_at || null,
|
|
acceptedCount: parseCount(row.accepted_count),
|
|
sendFailedCount: parseCount(row.send_failed_count),
|
|
deliveredCount: parseCount(row.delivered_count),
|
|
deliveryFailedCount: parseCount(row.delivery_failed_count),
|
|
deliveryRate: computeFallbackRate({
|
|
deliveredCount: parseCount(row.delivered_count),
|
|
deliveryFailedCount: parseCount(row.delivery_failed_count),
|
|
acceptedCount: parseCount(row.accepted_count),
|
|
sendFailedCount: parseCount(row.send_failed_count),
|
|
}),
|
|
}));
|
|
}
|
|
|
|
module.exports = {
|
|
buildPhoneMetadata,
|
|
buildSourceEventKey,
|
|
computeFallbackRate,
|
|
createOrRefreshExecution,
|
|
extractProviderMessageId,
|
|
getOverviewMetrics,
|
|
getEventMetrics,
|
|
insertStatusHistory,
|
|
markExecutionAccepted,
|
|
markExecutionFailed,
|
|
markExecutionIgnored,
|
|
};
|