Skip to content

Commit

Permalink
fix(api-elasticsearch-tasks): data synchronization (#4337)
Browse files Browse the repository at this point in the history
brunozoric authored Nov 11, 2024
1 parent 3e443da commit 9bd4fa2
Showing 67 changed files with 1,479 additions and 426 deletions.
10 changes: 9 additions & 1 deletion packages/api-dynamodb-to-elasticsearch/src/Operations.ts
Original file line number Diff line number Diff line change
@@ -13,12 +13,20 @@ export enum OperationType {
}

export class Operations implements IOperations {
public readonly items: GenericRecord[] = [];
private _items: GenericRecord[] = [];

public get items(): GenericRecord[] {
return this._items;
}

public get total(): number {
return this.items.length;
}

public clear() {
this._items = [];
}

public insert(params: IInsertOperationParams): void {
this.items.push(
{
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import {
Context,
IDeleteOperationParams,
IInsertOperationParams,
IModifyOperationParams,
IOperations
} from "~/types";
import { Operations } from "~/Operations";
import { executeWithRetry, IExecuteWithRetryParams } from "~/executeWithRetry";
import { ITimer } from "@webiny/handler-aws";

export type ISynchronizationBuilderExecuteWithRetryParams = Omit<
IExecuteWithRetryParams,
"context" | "timer" | "maxRunningTime" | "operations"
>;

export interface ISynchronizationBuilder {
insert(params: IInsertOperationParams): void;
delete(params: IDeleteOperationParams): void;
build: () => (params?: ISynchronizationBuilderExecuteWithRetryParams) => Promise<void>;
}

export interface ISynchronizationBuilderParams {
timer: ITimer;
context: Pick<Context, "elasticsearch">;
}

export class SynchronizationBuilder implements ISynchronizationBuilder {
private readonly timer: ITimer;
private readonly context: Pick<Context, "elasticsearch">;
private readonly operations: IOperations;

public constructor(params: ISynchronizationBuilderParams) {
this.timer = params.timer;
this.context = params.context;
this.operations = new Operations();
}

public insert(params: IInsertOperationParams): void {
return this.operations.insert(params);
}

public modify(params: IModifyOperationParams): void {
return this.operations.modify(params);
}

public delete(params: IDeleteOperationParams): void {
return this.operations.delete(params);
}

public build() {
return async (params?: ISynchronizationBuilderExecuteWithRetryParams) => {
if (this.operations.total === 0) {
return;
}
await executeWithRetry({
...params,
maxRunningTime: this.timer.getRemainingMilliseconds(),
timer: this.timer,
context: this.context,
operations: this.operations
});
this.operations.clear();
};
}
}

export const createSynchronizationBuilder = (
params: ISynchronizationBuilderParams
): ISynchronizationBuilder => {
return new SynchronizationBuilder(params);
};
11 changes: 2 additions & 9 deletions packages/api-dynamodb-to-elasticsearch/src/eventHandler.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
import { getNumberEnvVariable } from "~/helpers/getNumberEnvVariable";
import { createDynamoDBEventHandler, timerFactory } from "@webiny/handler-aws";
import { ElasticsearchContext } from "@webiny/api-elasticsearch/types";
import { Context } from "~/types";
import { Decompressor } from "~/Decompressor";
import { OperationsBuilder } from "~/OperationsBuilder";
import { executeWithRetry } from "~/executeWithRetry";

const MAX_PROCESSOR_PERCENT = getNumberEnvVariable(
"MAX_ES_PROCESSOR",
process.env.NODE_ENV === "test" ? 101 : 98
);

/**
* Also, we need to set the maximum running time for the Lambda Function.
* https://github.com/webiny/webiny-js/blob/f7352d418da2b5ae0b781376be46785aa7ac6ae0/packages/pulumi-aws/src/apps/core/CoreOpenSearch.ts#L232
@@ -20,7 +14,7 @@ const MAX_RUNNING_TIME = 900;
export const createEventHandler = () => {
return createDynamoDBEventHandler(async ({ event, context: ctx, lambdaContext }) => {
const timer = timerFactory(lambdaContext);
const context = ctx as unknown as ElasticsearchContext;
const context = ctx as unknown as Context;
if (!context.elasticsearch) {
console.error("Missing elasticsearch definition on context.");
return null;
@@ -49,7 +43,6 @@ export const createEventHandler = () => {
await executeWithRetry({
timer,
maxRunningTime: MAX_RUNNING_TIME,
maxProcessorPercent: MAX_PROCESSOR_PERCENT,
context,
operations
});
13 changes: 9 additions & 4 deletions packages/api-dynamodb-to-elasticsearch/src/execute.ts
Original file line number Diff line number Diff line change
@@ -5,9 +5,9 @@ import {
WaitingHealthyClusterAbortedError
} from "@webiny/api-elasticsearch";
import { ITimer } from "@webiny/handler-aws";
import { ApiResponse, ElasticsearchContext } from "@webiny/api-elasticsearch/types";
import { ApiResponse } from "@webiny/api-elasticsearch/types";
import { WebinyError } from "@webiny/error";
import { IOperations } from "./types";
import { Context, IOperations } from "./types";

export interface BulkOperationsResponseBodyItemIndexError {
reason?: string;
@@ -30,8 +30,8 @@ export interface IExecuteParams {
timer: ITimer;
maxRunningTime: number;
maxProcessorPercent: number;
context: Pick<ElasticsearchContext, "elasticsearch">;
operations: IOperations;
context: Pick<Context, "elasticsearch">;
operations: Pick<IOperations, "items" | "total">;
}

const getError = (item: BulkOperationsResponseBodyItem): string | null => {
@@ -67,6 +67,11 @@ const checkErrors = (result?: ApiResponse<BulkOperationsResponseBody>): void =>
export const execute = (params: IExecuteParams) => {
return async (): Promise<void> => {
const { context, timer, maxRunningTime, maxProcessorPercent, operations } = params;

if (operations.total === 0) {
return;
}

const remainingTime = timer.getRemainingSeconds();
const runningTime = maxRunningTime - remainingTime;
const maxWaitingTime = remainingTime - 90;
10 changes: 8 additions & 2 deletions packages/api-dynamodb-to-elasticsearch/src/executeWithRetry.ts
Original file line number Diff line number Diff line change
@@ -5,11 +5,17 @@ import { getNumberEnvVariable } from "./helpers/getNumberEnvVariable";

const minRemainingSecondsToTimeout = 120;

export interface IExecuteWithRetryParams extends IExecuteParams {
const MAX_PROCESSOR_PERCENT = getNumberEnvVariable(
"MAX_ES_PROCESSOR",
process.env.NODE_ENV === "test" ? 101 : 98
);

export interface IExecuteWithRetryParams extends Omit<IExecuteParams, "maxProcessorPercent"> {
maxRetryTime?: number;
retries?: number;
minTimeout?: number;
maxTimeout?: number;
maxProcessorPercent?: number;
}

export const executeWithRetry = async (params: IExecuteWithRetryParams) => {
@@ -35,7 +41,7 @@ export const executeWithRetry = async (params: IExecuteWithRetryParams) => {
execute({
timer: params.timer,
maxRunningTime: params.maxRunningTime,
maxProcessorPercent: params.maxProcessorPercent,
maxProcessorPercent: params.maxProcessorPercent || MAX_PROCESSOR_PERCENT,
context: params.context,
operations: params.operations
}),
1 change: 1 addition & 0 deletions packages/api-dynamodb-to-elasticsearch/src/index.ts
Original file line number Diff line number Diff line change
@@ -6,4 +6,5 @@ export * from "./marshall";
export * from "./NotEnoughRemainingTimeError";
export * from "./Operations";
export * from "./OperationsBuilder";
export * from "./SynchronizationBuilder";
export * from "./types";
4 changes: 4 additions & 0 deletions packages/api-dynamodb-to-elasticsearch/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { GenericRecord } from "@webiny/cli/types";
import { DynamoDBRecord } from "@webiny/handler-aws/types";
import { ElasticsearchContext } from "@webiny/api-elasticsearch/types";

export type Context = Pick<ElasticsearchContext, "elasticsearch" | "plugins">;

export interface IOperationsBuilderBuildParams {
records: DynamoDBRecord[];
@@ -25,6 +28,7 @@ export interface IDeleteOperationParams {
export interface IOperations {
items: GenericRecord[];
total: number;
clear(): void;
insert(params: IInsertOperationParams): void;
modify(params: IModifyOperationParams): void;
delete(params: IDeleteOperationParams): void;
6 changes: 3 additions & 3 deletions packages/api-elasticsearch-tasks/__tests__/mocks/store.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { TaskManagerStore } from "@webiny/tasks/runner/TaskManagerStore";
import { Context, ITask, ITaskLog } from "@webiny/tasks/types";
import { Context, ITask, ITaskDataInput, ITaskLog } from "@webiny/tasks/types";
import { createTaskMock } from "~tests/mocks/task";
import { createContextMock } from "~tests/mocks/context";
import { createTaskLogMock } from "~tests/mocks/log";
@@ -9,11 +9,11 @@ interface Params {
task?: ITask;
log?: ITaskLog;
}
export const createTaskManagerStoreMock = (params?: Params) => {
export const createTaskManagerStoreMock = <T extends ITaskDataInput>(params?: Params) => {
const context = params?.context || createContextMock();
const task = params?.task || createTaskMock();
const log = params?.log || createTaskLogMock(task);
return new TaskManagerStore({
return new TaskManagerStore<T>({
context,
task,
log
Loading
Oops, something went wrong.

0 comments on commit 9bd4fa2

Please sign in to comment.