bigqueryexport/bqexport.js

98 lines
3.1 KiB
JavaScript
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { BigQuery } from "@google-cloud/bigquery";
import { Storage } from "@google-cloud/storage";
// Cross-project setup (as described by the original author):
// - BigQuery data source: fynd-jio-commerceml-prod
// - GCS destination: fynd-boltic-prod
// NOTE(review): both clients below resolve to the same PROJECT_ID, so the GCS
// client is not actually configured with "fynd-boltic-prod" — confirm whether
// the storage client should use a separate project id.

/**
 * Turns the raw GCS_CREDENTIALS environment value into a credentials object.
 *
 * Environment variables are always strings, while the Storage client's
 * `credentials` option expects a parsed service-account object, so the value
 * has to be JSON-decoded before it is handed to the client.
 *
 * @param {string | undefined} raw - Raw value of GCS_CREDENTIALS.
 * @returns {object | undefined} The parsed credentials, or `undefined` when
 *   the variable is unset/blank (the client then uses its default auth flow).
 * @throws {Error} When the value is set but is not a JSON object.
 */
function parseGcsCredentials(raw) {
  if (typeof raw !== "string" || raw.trim() === "") {
    return undefined;
  }

  let parsed;
  try {
    parsed = JSON.parse(raw);
  } catch (err) {
    throw new Error("GCS_CREDENTIALS must contain valid JSON", { cause: err });
  }

  if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed)) {
    throw new Error("GCS_CREDENTIALS must be a JSON object");
  }
  return parsed;
}

const bigquery = new BigQuery({
  projectId: process.env.PROJECT_ID || "fynd-jio-commerceml-prod",
});

const gcsCredentials = parseGcsCredentials(process.env.GCS_CREDENTIALS);

const storage = new Storage({
  projectId: process.env.PROJECT_ID || "fynd-jio-commerceml-prod",
  // Only pass `credentials` when something was actually configured: an empty
  // object is truthy and would be treated as (invalid) explicit credentials.
  ...(gcsCredentials && { credentials: gcsCredentials }),
});
/**
 * Exports the merged JSON catalog from BigQuery to GCS and returns a signed
 * read URL for the first exported shard.
 *
 * The export runs a BigQuery `EXPORT DATA` statement that concatenates the
 * JSON arrays stored in the `data` column of every row of the source table
 * and writes the result under
 * `gs://<bucket>/<companyId>/<applicationId>/GET_ALL_PRODUCTS/catalog-*.json`.
 * NOTE(review): the query has no WHERE clause, so it is not restricted to the
 * requested company/application — confirm this is intended.
 *
 * @param {object} req - Request whose `body` carries `companyId` and
 *   `applicationId` (string or integer; letters, digits, "_" and "-" only).
 * @param {object} res - Unused; kept for the handler signature.
 * @returns {Promise<{signedUrl: string[], success: boolean, message: string,
 *   companyId: (string|number), applicationId: (string|number),
 *   destination: string}>} Export summary. `signedUrl` is the array resolved
 *   by `getSignedUrl` (the URL is its first element).
 * @throws {Error} "Export failed: ..." for invalid input or any failure in
 *   the export / signing calls; the original error is attached as `cause`.
 */
export const exportMergedJson = async (req, res) => {
  try {
    const companyIdToDelete = req.body?.companyId;
    const applicationIdToDelete = req.body?.applicationId;

    // Both ids are interpolated into a SQL string literal and a GCS object
    // path below, so reject anything outside a conservative character set
    // (this also catches missing ids, which used to export to "undefined/").
    const safeIdPattern = /^[A-Za-z0-9_-]+$/;
    const idsToCheck = {
      companyId: companyIdToDelete,
      applicationId: applicationIdToDelete,
    };
    for (const [field, value] of Object.entries(idsToCheck)) {
      const isScalarId = typeof value === "string" || Number.isInteger(value);
      if (!isScalarId || !safeIdPattern.test(String(value))) {
        throw new Error(
          `${field} is required and may only contain letters, digits, "_" or "-"`,
        );
      }
    }

    const datasetId = process.env.DATASET || "temp_zenith_data";
    const sourceTable = process.env.SOURCE_TABLE || "pr_training_data";
    const bucketName = process.env.BUCKET_NAME || "pr_dataset_storage";
    const objectPath = `${companyIdToDelete}/${applicationIdToDelete}/GET_ALL_PRODUCTS/catalog-*.json`;
    const uri = `gs://${bucketName}/${objectPath}`;

    // Step 1: export directly from the source table to GCS.
    const sourceTablePath = `${bigquery.projectId}.${datasetId}.${sourceTable}`;
    const exportSql = `
      EXPORT DATA
      OPTIONS(
        uri='${uri}',
        format='JSON',
        overwrite=true
      ) AS
      SELECT
        ARRAY_CONCAT_AGG(JSON_EXTRACT_ARRAY(data)) AS merged_array
      FROM
        \`${sourceTablePath}\`
    `;
    const [queryJob] = await bigquery.createQueryJob({
      query: exportSql,
      useLegacySql: false,
    });
    const jobResponse = await queryJob.promise();
    console.log(jobResponse);

    // TODO: deleting the exported rows (by companyId / applicationId) after a
    // successful export is currently disabled; see version history for the
    // previous parameterized DELETE implementation.

    // Step 2: sign a 15-minute read URL. `action: "read"` determines the HTTP
    // verb of the signed URL, so no separate method option is passed.
    const options = {
      version: "v4",
      action: "read",
      expires: Date.now() + 15 * 60 * 1000,
      extensionHeaders: {},
      queryParams: {},
    };
    // The export URI uses a "*" wildcard; only the first shard
    // ("000000000000") is signed here.
    const firstShardPath = objectPath.replace("*", "000000000000");
    // getSignedUrl resolves to a one-element array; it is returned as-is to
    // keep the response shape existing callers rely on.
    const signedUrl = await storage
      .bucket(bucketName)
      .file(firstShardPath)
      .getSignedUrl(options);

    return {
      signedUrl,
      success: true,
      message: `Success: exported to ${uri}`,
      companyId: companyIdToDelete,
      applicationId: applicationIdToDelete,
      destination: uri,
    };
  } catch (err) {
    console.error("Export or cleanup failed:", err);
    // Keep the original error reachable for callers and logs.
    throw new Error(`Export failed: ${err.message}`, { cause: err });
  }
};