95 lines
3.1 KiB
JavaScript
95 lines
3.1 KiB
JavaScript
import { BigQuery } from '@google-cloud/bigquery';
|
||
import { Storage } from '@google-cloud/storage';
|
||
|
||
// Cross-project setup:
// - BigQuery data source: fynd-jio-commerceml-prod
// - GCS destination: fynd-boltic-prod
// NOTE(review): both clients below fall back to the same project id even though the
// destination is described as fynd-boltic-prod — confirm this is intended.

/**
 * Reads the optional GCS_CREDENTIALS environment variable as a service-account key.
 *
 * Environment variables are always strings, while the client `credentials` option
 * expects an object, so the value is JSON-decoded here.
 *
 * @param {string | undefined} raw - Raw variable value; defaults to process.env.GCS_CREDENTIALS.
 * @returns {object | undefined} Parsed credentials, or undefined when the variable is unset/empty.
 * @throws {Error} When the variable is set but is not a JSON-encoded object.
 */
const loadGcsCredentials = (raw = process.env.GCS_CREDENTIALS) => {
  if (!raw) {
    return undefined;
  }

  let parsed;
  try {
    parsed = JSON.parse(raw);
  } catch (err) {
    // The parser message is deliberately not echoed: it can contain a fragment of the secret.
    throw new Error('GCS_CREDENTIALS must be a JSON-encoded service-account key object');
  }

  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
    throw new Error('GCS_CREDENTIALS must be a JSON-encoded service-account key object');
  }
  return parsed;
};

const bigquery = new BigQuery({
  projectId: process.env.PROJECT_ID || 'fynd-jio-commerceml-prod',
});

const gcsCredentials = loadGcsCredentials();

const storage = new Storage({
  projectId: process.env.PROJECT_ID || 'fynd-jio-commerceml-prod',
  // Only pass explicit credentials when they were provided; otherwise leave the option
  // out so the client library resolves its default credentials.
  ...(gcsCredentials ? { credentials: gcsCredentials } : {}),
});
|
||
|
||
// Identifiers are interpolated into both a GCS object path and a SQL string literal,
// so only characters that are harmless in both contexts are accepted.
const SAFE_ID_PATTERN = /^[A-Za-z0-9_-]+$/;

/**
 * Validates a request-supplied identifier and returns it as a string.
 *
 * @param {unknown} value - Identifier taken from the request body.
 * @param {string} fieldName - Field name used in the error message.
 * @returns {string} The identifier as a string.
 * @throws {Error} When the value is missing, not a string/number, or has unsafe characters.
 */
const toSafePathSegment = (value, fieldName) => {
  const isScalar = typeof value === 'string' || typeof value === 'number';
  if (!isScalar || value === '') {
    throw new Error(`${fieldName} is required and must be a string or number`);
  }

  const segment = String(value);
  if (!SAFE_ID_PATTERN.test(segment)) {
    throw new Error(`${fieldName} contains unsupported characters`);
  }
  return segment;
};

/**
 * Exports the merged JSON arrays of the source BigQuery table to GCS and returns a
 * signed read URL for the first exported shard.
 *
 * Reads `companyId` and `applicationId` from `req.body`; they form the destination
 * prefix `<companyId>/<applicationId>/GET_ALL_PRODUCTS/`. Dataset, table and bucket
 * come from the DATASET, SOURCE_TABLE and BUCKET_NAME environment variables, with
 * built-in defaults.
 *
 * @param {{ body?: { companyId?: string | number, applicationId?: string | number } }} req
 *   Incoming request.
 * @param {unknown} res - Unused; kept for signature compatibility.
 * @returns {Promise<object>} Object with `signedUrl`, `success`, `message`, `companyId`,
 *   `applicationId` and `destination`.
 * @throws {Error} `Export failed: <reason>` for any validation, export or signing failure;
 *   the underlying error is attached as `cause`.
 */
export const exportMergedJson = async (req, res) => {
  try {
    const { companyId, applicationId } = req?.body ?? {};

    // Validate before building anything: these values end up inside the SQL text below.
    const companySegment = toSafePathSegment(companyId, 'companyId');
    const applicationSegment = toSafePathSegment(applicationId, 'applicationId');

    const datasetId = process.env.DATASET || "temp_zenith_data";
    const sourceTable = process.env.SOURCE_TABLE || "pr_training_data";
    const bucketName = process.env.BUCKET_NAME || 'pr_dataset_storage';
    const objectPath = `${companySegment}/${applicationSegment}/GET_ALL_PRODUCTS/catalog-*.json`;
    const uri = `gs://${bucketName}/${objectPath}`;

    // 1️⃣ Direct export from source table to GCS (cross-project)
    // NOTE(review): the query reads every row of the source table; it is not filtered by
    // companyId/applicationId — confirm the table only ever holds one tenant's data.
    const sourceTablePath = `${bigquery.projectId}.${datasetId}.${sourceTable}`;

    const exportSql = `
      EXPORT DATA
      OPTIONS(
        uri='${uri}',
        format='JSON',
        overwrite=true
      ) AS
      SELECT
        ARRAY_CONCAT_AGG(JSON_EXTRACT_ARRAY(data)) AS merged_array
      FROM
        \`${sourceTablePath}\`
    `;

    const [queryJob] = await bigquery.createQueryJob({
      query: exportSql,
      useLegacySql: false,
    });
    const jobResponse = await queryJob.promise();
    console.log(jobResponse);

    // 2️⃣ Row cleanup (DELETE ... WHERE companyId/applicationId match) after a successful
    // export is currently disabled; no rows are removed from the source table.

    // The export URI uses a wildcard; the signed URL targets the shard numbered
    // 000000000000 (presumably the first file BigQuery writes — verify for large exports).
    const firstShardPath = objectPath.replace('*', '000000000000');

    // action 'read' produces a download URL valid for 15 minutes.
    const options = {
      version: 'v4',
      action: 'read',
      expires: Date.now() + 15 * 60 * 1000,
      extensionHeaders: {},
      queryParams: {},
    };

    // NOTE: getSignedUrl resolves to a one-element array, so `signedUrl` is `[url]`.
    // The shape is kept as-is so existing consumers of the response keep working.
    const signedUrl = await storage.bucket(bucketName).file(firstShardPath).getSignedUrl(options);

    return {
      signedUrl,
      success: true,
      message: `Success: exported to ${uri}`,
      companyId,
      applicationId,
      destination: uri,
    };
  } catch (err) {
    console.error('Export or cleanup failed:', err);
    // Keep the public message format, but preserve the original error for debugging.
    const wrapped = new Error(`Export failed: ${err.message}`);
    wrapped.cause = err;
    throw wrapped;
  }
};