initial commit
This commit is contained in:
commit
6764412587
|
|
@ -0,0 +1,22 @@
|
|||
# Dependencies
|
||||
node_modules/
|
||||
npm-debug.log*
|
||||
|
||||
# Environment files
|
||||
.env
|
||||
.env.local
|
||||
.env.*.local
|
||||
|
||||
# Google Cloud
|
||||
.gcloudignore
|
||||
|
||||
# IDE
|
||||
.vscode/
|
||||
.idea/
|
||||
|
||||
# OS
|
||||
.DS_Store
|
||||
Thumbs.db
|
||||
|
||||
# Logs
|
||||
*.log
|
||||
|
|
@ -0,0 +1,93 @@
|
|||
import { BigQuery } from '@google-cloud/bigquery';
|
||||
import { Storage } from '@google-cloud/storage';
|
||||
|
||||
// Cross-project setup:
|
||||
// - BigQuery data source: fynd-jio-commerceml-prod
|
||||
// - GCS destination: fynd-boltic-prod
|
||||
// Project id both clients fall back to when no environment override is set.
const DEFAULT_GCP_PROJECT = 'fynd-jio-commerceml-prod';

// BigQuery client; BIGQUERY_PROJECT overrides the default project when set.
const bigqueryOptions = {
  projectId: process.env.BIGQUERY_PROJECT || DEFAULT_GCP_PROJECT,
};
const bigquery = new BigQuery(bigqueryOptions);

// Cloud Storage client; STORAGE_PROJECT overrides the default project when set.
// NOTE(review): the fallback project here is the same as the BigQuery one —
// confirm that matches the intended GCS destination project.
// NOTE(review): an explicit empty `credentials` object is passed — confirm this
// is intentional rather than relying on default credential discovery.
const storageOptions = {
  projectId: process.env.STORAGE_PROJECT || DEFAULT_GCP_PROJECT,
  credentials: {},
};
const storage = new Storage(storageOptions);
|
||||
|
||||
/**
 * Runs a BigQuery `EXPORT DATA` job that writes the merged JSON arrays of the
 * source table to Cloud Storage, then returns a 15-minute signed read URL for
 * the first exported shard.
 *
 * The dataset and table come from the `DATASET` / `SOURCE_TABLE` environment
 * variables (with built-in defaults); the bucket and object path are fixed.
 *
 * @param {object} req - Express-style request. `req.body.companyId` and
 *   `req.body.applicationId` are read when present and echoed in the result.
 * @param {object} [res] - Unused; accepted so existing callers that pass the
 *   response object keep working.
 * @returns {Promise<{signedUrl: string, success: boolean, message: string,
 *   companyId: *, applicationId: *, destination: string}>} Export summary.
 * @throws {Error} `Export failed: <reason>`; the underlying error is attached
 *   as `cause` so its stack is not lost.
 */
export const exportMergedJson = async (req, res) => {
  try {
    const datasetId = process.env.DATASET || "temp_zenith_data";
    const sourceTable = process.env.SOURCE_TABLE || "pr_training_data";
    const bucketName = 'pr_dataset_storage';
    // The `*` wildcard lets BigQuery shard the export into numbered files.
    const objectPath = '261/643e93f5ffc8101656a5629b/GET_ALL_PRODUCTS/catalog-*.json';
    const uri = `gs://${bucketName}/${objectPath}`;

    // Identifiers supplied by the caller in the request body. They are only
    // echoed back in the result for now (see the cleanup note below).
    const companyIdToDelete = req.body?.companyId;
    const applicationIdToDelete = req.body?.applicationId;

    // 1️⃣ Direct export from source table to GCS (cross-project)
    const sourceTablePath = `${bigquery.projectId}.${datasetId}.${sourceTable}`;

    const exportSql = `
      EXPORT DATA
      OPTIONS(
        uri='${uri}',
        format='JSON',
        overwrite=true
      ) AS
      SELECT
        ARRAY_CONCAT_AGG(JSON_EXTRACT_ARRAY(data)) AS merged_array
      FROM
        \`${sourceTablePath}\`
    `;

    const [queryJob] = await bigquery.createQueryJob({
      query: exportSql,
      useLegacySql: false,
    });
    const jobResponse = await queryJob.promise();
    console.log(jobResponse);

    // 2️⃣ Row cleanup after export is currently disabled.
    // TODO: when re-enabled, delete the exported rows with a parameterized
    // `DELETE ... WHERE companyId = @companyId AND applicationId = @applicationId`
    // query, and only when both identifiers were provided.

    // 3️⃣ Sign a read URL for the first shard. The HTTP verb is implied by
    // `action`, so no separate `method` field is passed.
    const signedUrlConfig = {
      version: 'v4',
      action: 'read',
      expires: Date.now() + 15 * 60 * 1000,
    };
    const firstShardPath = objectPath.replace('*', '000000000000');

    // getSignedUrl() resolves to a one-element array; unwrap it so callers
    // receive the URL string instead of `[url]`.
    const [signedUrl] = await storage
      .bucket(bucketName)
      .file(firstShardPath)
      .getSignedUrl(signedUrlConfig);

    return {
      signedUrl,
      success: true,
      message: `Success: exported to ${uri}`,
      companyId: companyIdToDelete,
      applicationId: applicationIdToDelete,
      destination: uri,
    };
  } catch (err) {
    console.error('Export or cleanup failed:', err);
    // Keep the original error reachable for debugging via `cause`.
    throw new Error(`Export failed: ${err.message}`, { cause: err });
  }
};
|
||||
|
|
@ -0,0 +1,52 @@
|
|||
import express from 'express';
|
||||
import { exportMergedJson } from './bqexport.js';
|
||||
|
||||
// Express application that hosts the export service.
const app = express();

// Parse JSON request bodies before any route handler runs.
app.use(express.json());

/**
 * Health check handler: reports that the service is up and lists the
 * endpoint it exposes.
 */
function reportServiceStatus(req, res) {
  const endpoints = {
    export: 'POST /export - Export BigQuery data to GCS',
  };

  res.json({
    status: 'OK',
    message: 'BigQuery Export Service is running',
    endpoints,
  });
}

app.get('/', reportServiceStatus);
|
||||
|
||||
/**
 * POST /export handler: logs the identifiers found in the request body, runs
 * the BigQuery export, and replies with the export summary as JSON. Any
 * failure is logged and answered with HTTP 500 and the error message.
 */
const handleExportRequest = async (req, res) => {
  try {
    const { companyId, applicationId } = req.body;

    const requestLogLines = [
      `Received export request:`,
      `Company ID: ${companyId}`,
      `Application ID: ${applicationId}`,
    ];
    for (const line of requestLogLines) {
      console.log(line);
    }

    // Delegate the actual work to the BigQuery export function.
    const result = await exportMergedJson(req, res);
    const { signedUrl, message, destination } = result;

    // Reply with the fields of the export summary.
    res.status(200).json({
      success: true,
      signedUrl,
      message,
      companyId: result.companyId,
      applicationId: result.applicationId,
      destination,
    });
  } catch (error) {
    console.error('Export endpoint error:', error);

    const failureBody = { success: false, error: error.message };
    res.status(500).json(failureBody);
  }
};

app.post('/export', handleExportRequest);
|
||||
|
||||
// Port to listen on: the PORT environment variable when set, otherwise 8080.
const PORT = process.env.PORT || 8080;

// Logs a confirmation once the server has started listening.
const announceStartup = () => {
  console.log(`Server is running on port ${PORT}`);
};

app.listen(PORT, announceStartup);
|
||||
File diff suppressed because it is too large
Load Diff
|
|
@ -0,0 +1,22 @@
|
|||
{
"type": "module",
"name": "bq-exporter",
"version": "1.0.0",
"description": "Express.js service that exports BigQuery data to Google Cloud Storage",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
"start": "node index.js",
|
||||
"dev": "nodemon index.js"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=18.0.0"
|
||||
},
|
||||
"dependencies": {
|
||||
"express": "^4.21.2",
|
||||
"@google-cloud/bigquery": "^6.0.0",
|
||||
"@google-cloud/storage": "^7.16.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"nodemon": "^3.1.10"
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user