Skip to content

Commit

Permalink
Fix gracefulShutdown and introduce forcefulShutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie committed Oct 17, 2023
1 parent 5ddf9de commit e60099b
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 66 deletions.
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1657,11 +1657,14 @@ If a job throws an error, the job is failed and scheduled for retries with
exponential back-off. We use async/await so assuming you write your task code
well all errors should be cascaded down automatically.

If the worker is terminated (`SIGTERM`, `SIGINT`, etc), it
[triggers a graceful shutdown](https://github.com/graphile/worker/blob/3540df5ab4eb73f846d54959fdfad07897b616f0/src/main.ts#L39-L66) -
i.e. it stops accepting new jobs, waits for the existing jobs to complete, and
then exits. If you need to restart your worker, you should do so using this
graceful process.
If the worker is sent a termination signal (`SIGTERM`, `SIGINT`, etc), it
triggers a graceful shutdown - i.e. it stops accepting new jobs, waits for the
existing jobs to complete, and then exits. If you need to restart your worker,
you should do so using this graceful process. After 5 seconds (during which
duplicate signals are ignored), if this same signal is sent again it will
trigger a forceful shutdown: all running jobs will be "failed" (i.e. will retry
on another worker after their exponential back-off) and then the worker will
exit.

If the worker completely dies unexpectedly (e.g. `process.exit()`, segfault,
`SIGKILL`) then the jobs that that worker was executing remain locked for at
Expand Down
4 changes: 2 additions & 2 deletions __tests__/main.runTaskList.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ test("main will execute jobs as they come up, and exits cleanly", () =>

await sleep(1);
expect(finished).toBeFalsy();
await workerPool.release();
await workerPool.gracefulShutdown();
expect(job1).toHaveBeenCalledTimes(5);
await sleep(1);
expect(finished).toBeTruthy();
Expand All @@ -90,7 +90,7 @@ test("doesn't bail on deprecated `debug` function", () =>
await addJob(pgPool);
await sleepUntil(() => !!jobPromise);
jobPromise!.resolve();
await workerPool.release();
await workerPool.gracefulShutdown();
} finally {
if (jobPromise) {
(jobPromise as Deferred).resolve();
Expand Down
2 changes: 1 addition & 1 deletion __tests__/resetLockedAt.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ where task_id = (select id from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.tasks where id
const states: string[] = [];
events.on("resetLocked:started", () => {
states.push("started");
workerPool.release();
workerPool.gracefulShutdown();
});
events.on("resetLocked:success", () => {
states.push("success");
Expand Down
2 changes: 1 addition & 1 deletion perfTest/latencyTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async function main() {
)}ms, avg: ${average.toFixed(2)}ms`,
);

await workerPool.release();
await workerPool.gracefulShutdown();
await pgPool.end();
console.log("Done");
}
Expand Down
39 changes: 38 additions & 1 deletion src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,10 @@ export interface Worker {
}

export interface WorkerPool {
/** @deprecated Use gracefulShutdown instead */
release: () => Promise<void>;
gracefulShutdown: (message: string) => Promise<void>;
gracefulShutdown: (message?: string) => Promise<void>;
forcefulShutdown: (message: string) => Promise<void>;
promise: Promise<void>;
}

Expand Down Expand Up @@ -646,6 +648,36 @@ export type WorkerEventMap = {
*/
"pool:gracefulShutdown:error": { pool: WorkerPool; error: any };

/**
* When a worker pool graceful shutdown is successful, but one of the workers
* throws an error from release()
*/
"pool:gracefulShutdown:workerError": {
pool: WorkerPool;
error: any;
job: Job | null;
};

/**
* When a worker pool graceful shutdown throws an error
*/
"pool:gracefulShutdown:complete": { pool: WorkerPool };

/**
* When a worker pool starts a forceful shutdown
*/
"pool:forcefulShutdown": { pool: WorkerPool; message: string };

/**
* When a worker pool forceful shutdown throws an error
*/
"pool:forcefulShutdown:error": { pool: WorkerPool; error: any };

/**
* When a worker pool forceful shutdown throws an error
*/
"pool:forcefulShutdown:complete": { pool: WorkerPool };

/**
* When a worker is created
*/
Expand Down Expand Up @@ -813,6 +845,11 @@ export type WorkerEventMap = {
*/
gracefulShutdown: { signal: Signal };

/**
* When the runner is terminated by a signal _again_ after 5 seconds
*/
forcefulShutdown: { signal: Signal };

/**
* When the runner is stopped
*/
Expand Down
Loading

0 comments on commit e60099b

Please sign in to comment.