This repository has been archived on 2024-01-19. You can view files and clone it, but cannot push or open issues or pull requests.
DataHoard/src/models/WorkerProcess.ts

122 lines
3.6 KiB
TypeScript
Raw Normal View History

import * as mysql from "mysql";
import { NodeConnection } from "../helper/config";
import { uuid } from "../helper/mainHelper";
export class DataWorker {
/**
* An array of tasks that should be taken care of
*/
private nodeTasks : Array<NodeConnection> = [];
/**
* The amount of parallel tasks to run at once
*/
private parallelTasksCount: number;
private id: string;
/**
* @param parallelTasks The amount of parallel tasks that this Worker can run at once
*/
constructor(parallelTasks: number) {
// Initialize the amount of paralel tasks to run at the same time
this.parallelTasksCount = parallelTasks;
// Generate an ID for the worker
this.id = uuid();
}
public getID() : string {
return this.id;
}
public addNodeTask(node: NodeConnection) {
// Add the task to the list
this.nodeTasks.push(node);
}
/**
* Start processing the node tasks.
*/
public startProcessing() : Promise<PromiseSettledResult<any>[]> {
return new Promise( async (_r, _e) => {
try {
// Go through all of the tasks to run
for ( let i = 0; i < this.nodeTasks.length; i++ ) {
// Spawn in the task queue
let taskQueue : Array<Promise<any>> = [];
// Start running the parallel tasks
for ( let x = 0; x < this.parallelTasksCount; x++ ) {
// Check if we have stept out of bounds
if ( i >= this.nodeTasks.length ) // Ignore out-of-bounds
break;
// Add the process to the queue
taskQueue.push(this.processTask(this.nodeTasks[i]))
// Add to the index
i++;
}
// Return the resulting resolves
_r(await Promise.allSettled(taskQueue));
}
} catch ( ex ) {
_e(ex);
}
})
}
/**
* Process a task
* @param task The task to process
*/
private processTask(task: NodeConnection) {
return new Promise<string>( async (_r, _e) => {
// Connect to the mysql database to test out the connection
const conn : mysql.Connection = mysql.createConnection({
host: task.hostname,
port: task.port,
user: task.username,
password: task.password,
});
try { // Try and connect to the database
// Connect to the database
await new Promise<void>( (rez, err) => {
conn.connect(e => {
if ( !!e ) {
// Mark the results as failure
err(e);
}
// Mark the promise as done
rez();
})
})
} catch ( error ) { // Return the connection error upstream
return _e(error);
}
// Check if we have target
try { // Close the connection
await new Promise<void>((re, er) => {
conn.end( err => {
// Just ignore it, whatever.
re();
});
})
} catch ( err ) {};
// Stop the code and mark as success
_r(`Succesfully ran task ${task.name}`);
})
}
}