import * as mysql from "mysql"; import { NodeConnection } from "../helper/config"; import { uuid } from "../helper/mainHelper"; import * as child from 'child_process'; import * as path from "path"; import * as fs from "fs"; export enum databaseExportLogType { ERROR = "Error", SUCCESS = "Success", } export interface databaseExportLog { data: any, type: databaseExportLogType, error?: Error, } export class DataWorker { /** * An array of tasks that should be taken care of */ private nodeTasks : Array = []; /** * The amount of parallel tasks to run at once */ private parallelTasksCount: number; private exportPath: string; private id: string; /** * @param parallelTasks The amount of parallel tasks that this Worker can run at once */ constructor(parallelTasks: number, exportPath: string) { // Initialize the amount of paralel tasks to run at the same time this.parallelTasksCount = parallelTasks; // Assign the export path this.exportPath = exportPath; // Generate an ID for the worker this.id = uuid(); } public getID() : string { return this.id; } public addNodeTask(node: NodeConnection) { // Add the task to the list this.nodeTasks.push(node); } /** * Start processing the node tasks. */ public startProcessing() : Promise[] | string> { return new Promise( async (_r, _e) => { try { if ( this.nodeTasks.length <= 0 ) { return _r("No tasks to run, skipping"); } // Go through all of the tasks to run for ( let i = 0; i < this.nodeTasks.length; i++ ) { // Spawn in the task queue let taskQueue : Array> = []; // Start running the parallel tasks for ( let x = 0; x < this.parallelTasksCount; x++ ) { // Check if we have stept out of bounds if ( i >= this.nodeTasks.length ) // Ignore out-of-bounds break; // Add the process to the queue taskQueue.push(this.processTask(this.nodeTasks[i])) // Add to the index i++; } // Return the resulting resolves _r(await Promise.allSettled(taskQueue)); } } catch ( ex ) { _e(ex); } }) } /** * Process a task * @param task The task to process */ private processTask(task: NodeConnection) { return new Promise( async (_r, _e) => { // Connect to the mysql database to test out the connection const conn : mysql.Connection = mysql.createConnection({ host: task.hostname, port: task.port, user: task.username, password: task.password, }); try { // Try and connect to the database // Connect to the database await new Promise( (rez, err) => { conn.connect(e => { if ( !!e ) { // Mark the results as failure err(e); } // Mark the promise as done rez(); }) }) } catch ( error ) { // Return the connection error upstream return _e(error); } let results: databaseExportLog[] = []; try { // Dump the database results = await this.dumpDatabase(conn, task); } catch ( err ) { conn.destroy(); return _e(err); } try { // Close the connection await new Promise((re, er) => { conn.end( err => { // Just ignore it, whatever. re(); }); }) } catch ( err ) {}; // Stop the code and mark as success _r(results); }) } private dumpDatabase(conn : mysql.Connection, task: NodeConnection) { return new Promise( async (_r, _e) => { // Get all of the databases from the connection const databaseList: string[] = []; let rawDatabaseList: string[] = []; try { rawDatabaseList = await new Promise((res, er) => { conn.query("SHOW DATABASES", (err, rez, fields) => { if ( err ) { // Could not list databases, there's definitely something wrong going to happen, do a forceful stop. return _e(err); } let databaseList: string[] = []; for ( let db of rez ) { databaseList.push(db['Database']); } res(databaseList); }) }); } catch ( ex ) { return _e(ex); } // Filter the database list based on the blacklist/whitelist for ( let db of rawDatabaseList ) { if ( !!task.databases ) { // Check the whitelist if ( !!task.databases.whitelist ) { if ( !task.databases.whitelist?.includes(db) ) { continue; } } // Check the blacklist if ( !!task.databases.blacklist ) { if ( task.databases.blacklist?.includes(db) ) { continue; } } } // The database passed our filters, add it to the final list! databaseList.push(db); } // List of logs for the different database dumps const resultLogs: databaseExportLog[] = []; // Run the mysqldump for every database for ( let db of databaseList ) { let startTime = new Date().getTime(); try { await new Promise((res, err) => { // Get the path to export to const exportPath: string = path.join( this.exportPath , `${db}.sql`); // Make sure the folder exists if ( !fs.existsSync(path.dirname(exportPath)) ) { fs.mkdirSync(path.dirname(exportPath)); } // Generate the export command const exportCommand: string = `mysqldump -u${task.username} -p${task.password} -h${task.hostname} -P${task.port} ${db} > ${exportPath}`; // Execute the export process child.exec(exportCommand, (execErr, stdout, stderr) => { if ( execErr ) { resultLogs.push({ type: databaseExportLogType.ERROR, data: { runTime: (new Date().getTime() - startTime) + "ms", stderr: stderr }, error: execErr, }); return res(); } resultLogs.push({ type: databaseExportLogType.SUCCESS, data: { runTime: (new Date().getTime() - startTime) + "ms", stdout: stdout, }, }); res(); }); }); } catch ( error ) { resultLogs.push({ type: databaseExportLogType.ERROR, data: { runTime: (new Date().getTime() - startTime) + "ms", }, error: error }); } } // Resolve the promise _r(resultLogs); }) } }