A task manager that uses a FIFO policy to choose which tasks to run and Node.js worker threads to execute them.
git clone NotFiliberto/fifo-task-manager #clone the example
cd fifo-task-manager
npm install #install dependencies
npm run dev
Or, if you want to debug it with the Chrome Node.js inspector:
npm run dev-inspect
The only thing you need to do is create your custom task (in this example, CustomTask) by extending the abstract task class and overriding its run method.
You can pass a data type to this class, so the data inside the task can be accessed in a type-safe way.
This task causes the assigned worker to wait for a random amount of time:
type CustomTaskDataType = {
    name: string
}

export default class CustomTask extends GenericTypesafeTask<CustomTaskDataType> {
    constructor(data?: CustomTaskDataType) {
        super(data) //assumed: the base class stores the data
    }

    private async asyncWork(seconds: number) {
        await new Promise((resolve) =>
            setTimeout(() => resolve(10), seconds * 1000)
        )
    }

    private getRandomNumber(min: number, max: number) {
        return Math.floor(Math.random() * (max - min + 1)) + min
    }

    async run({ threadId }: { threadId: number }) {
        //you can pass everything here
        const name = this.data.name //TYPESAFE
        const n = this.getRandomNumber(1, 10)
        await this.asyncWork(n) //wait for n seconds
        //assumed: the message ends with the task name
        console.log(
            `[WORKER #${threadId}] done! ${new Date()
                .toISOString()
                .slice(14, 19)} waited for ${n} seconds --> ${name}`
        )
        return n
    }
}
Then import your custom task and its data type in the worker file and create a new TaskWorker instance like this:
/* worker.ts */
import { parentPort, MessagePort, workerData, threadId } from "worker_threads"
import { TaskWorker, CustomTask } from "./classes"
type CustomTaskDataType = {
name: string
async function main() {
const { data, command } = workerData //received when the thread is created
if (command === "start") {
const taskData = data as CustomTaskDataType
const taskWorker = new TaskWorker<CustomTask, CustomTaskDataType>(
parentPort as MessagePort,
new CustomTask(taskData),
await taskWorker.doTask()
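The worker rebuilds a `CustomTask` from `data` instead of receiving the task object itself because `workerData` is copied with the structured clone algorithm, which keeps plain data and drops methods. The sketch below illustrates that behaviour with `structuredClone`, which applies the same algorithm. `DemoTask` is a hypothetical stand-in, not a class from this repository.

```typescript
// Hypothetical stand-in for a task class; not the repository's CustomTask.
class DemoTask {
    data: { name: string }

    constructor(data: { name: string }) {
        this.data = data
    }

    run(): string {
        return `running ${this.data.name}`
    }
}

const originalTask = new DemoTask({ name: "mario" })

// The clone is a plain object: own data properties survive, the prototype does not.
const clonedTask = structuredClone(originalTask) as unknown as Record<string, unknown>

const dataSurvived = (clonedTask.data as { name: string }).name === "mario"
const methodSurvived = typeof clonedTask.run === "function"

// A value that directly contains a function cannot be cloned at all.
let functionCloneFailed = false
try {
    structuredClone({ run: () => 1 })
} catch {
    functionCloneFailed = true
}

// On the receiving side, the task is rebuilt from the data that did survive.
const rebuiltTask = new DemoTask(clonedTask.data as { name: string })

console.log({ dataSurvived, methodSurvived, functionCloneFailed })
console.log(rebuiltTask.run())
```

This is why the task class has to be imported on both sides: only the data crosses the thread boundary, and each side wraps it in the class again.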
Finally, in your index.ts you can create your task manager instance, passing three parameters:
- worker path (relative path)
- maximum number of concurrent tasks
- tasks to run
/* index.ts */
import { TaskManager, CustomTask } from "./classes"
import { CustomTaskDataType } from "./types"
import path from "path"
async function main() {
const WORKER_PATH = path.resolve(__dirname, "./worker")
const tasks = [
new CustomTask({ name: "mario" }), //CustomTaskDataType
new CustomTask({ name: "doccia" }),
const taskManager = new TaskManager<CustomTask, CustomTaskDataType>(
() => taskManager.addTask(new CustomTask({ name: "homyatol" })),
) //add new task after 11 seconds
- When a task is stopped, the task manager saves its updated data.
- If there are any pending tasks in the waiting list, the task manager stops the oldest running task (always the first item in the active task queue) and moves the oldest waiting task into the active task queue.
- You can't add different Task types to the same task manager: when one task is swapped for another, only their data is swapped and nothing else. This is a limitation of worker_threads: functions and methods of your custom task cannot be passed to a worker created from the main thread.
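The swap rule above can be sketched as a small in-memory model. This is only an illustration of the FIFO policy, not the repository's TaskManager: the names `FifoScheduler`, `add`, `rotate`, and `snapshot` are hypothetical, no worker threads are involved, and it assumes that a stopped task rejoins the back of the waiting queue and that the concurrency limit is at least 1.

```typescript
// Hypothetical model of the FIFO swap; names are illustrative only.
type TaskData = { name: string }

class FifoScheduler {
    private active: TaskData[] = []
    private waiting: TaskData[] = []
    private maxConcurrent: number

    constructor(maxConcurrent: number, tasks: TaskData[]) {
        this.maxConcurrent = maxConcurrent
        for (const task of tasks) this.add(task)
    }

    // A new task runs immediately while a slot is free; otherwise it waits.
    add(task: TaskData): void {
        if (this.active.length < this.maxConcurrent) this.active.push(task)
        else this.waiting.push(task)
    }

    // One scheduling step: if something is waiting, stop the oldest running
    // task and let the oldest waiting task take its place.
    rotate(): void {
        const next = this.waiting.shift()
        if (next === undefined) return
        const stopped = this.active.shift()
        // Assumption: the stopped task keeps its data and waits again.
        if (stopped !== undefined) this.waiting.push(stopped)
        this.active.push(next)
    }

    snapshot(): { active: string[]; waiting: string[] } {
        return {
            active: this.active.map((t) => t.name),
            waiting: this.waiting.map((t) => t.name),
        }
    }
}

const fifo = new FifoScheduler(2, [
    { name: "mario" },
    { name: "doccia" },
    { name: "homyatol" },
])
const stateBefore = fifo.snapshot()
fifo.rotate()
const stateAfter = fifo.snapshot()

console.log(stateBefore) // active: mario, doccia; waiting: homyatol
console.log(stateAfter) // active: doccia, homyatol; waiting: mario
```

Because the queues hold only task data, swapping two entries exchanges data and nothing else, which matches the restriction that every task in one manager must be of the same type.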