Skip to content

Commit

Permalink
feat(migrations): add file metadata to S3 files (#3804)
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel910 authored Jan 10, 2024
1 parent 2437a65 commit b0b7d15
Show file tree
Hide file tree
Showing 12 changed files with 530 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ describe("5.36.0-001", () => {

// Should force-run the migration
{
// @ts-expect-error
process.env["WEBINY_MIGRATION_FORCE_EXECUTE_5_36_0_001"] = "true";
process.stdout.write("[Second run]\n");
const { data, error } = await handler();
Expand Down
6 changes: 5 additions & 1 deletion packages/migrations/src/ddb-es.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import { CmsEntriesRootFolder_5_37_0_002 } from "~/migrations/5.37.0/002/ddb-es"
import { AcoFolders_5_37_0_003 } from "~/migrations/5.37.0/003/ddb-es";
import { AcoRecords_5_37_0_004 } from "~/migrations/5.37.0/004/ddb-es";
import { FileManager_5_37_0_005 } from "~/migrations/5.37.0/005/ddb-es";
// 5.38.0
import { MultiStepForms_5_38_0_001 } from "~/migrations/5.38.0/001/ddb-es";
import { MultiStepForms_5_38_0_002 } from "~/migrations/5.38.0/002/ddb-es";
// Page Blocks storage is the same for both DDB and DDB-ES projects.
import { PageBlocks_5_38_0_003 } from "~/migrations/5.38.0/003/ddb";
// 5.39.0
import { FileManager_5_39_0_005 } from "~/migrations/5.39.0/005/ddb-es";

export const migrations = () => {
return [
Expand All @@ -34,6 +37,7 @@ export const migrations = () => {
FileManager_5_37_0_005,
MultiStepForms_5_38_0_001,
MultiStepForms_5_38_0_002,
PageBlocks_5_38_0_003
PageBlocks_5_38_0_003,
FileManager_5_39_0_005
];
};
6 changes: 5 additions & 1 deletion packages/migrations/src/ddb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ import { CmsEntriesRootFolder_5_37_0_002 } from "~/migrations/5.37.0/002/ddb";
import { AcoFolders_5_37_0_003 } from "~/migrations/5.37.0/003/ddb";
import { AcoRecords_5_37_0_004 } from "~/migrations/5.37.0/004/ddb";
import { FileManager_5_37_0_005 } from "~/migrations/5.37.0/005/ddb";
// 5.38.0
import { MultiStepForms_5_38_0_001 } from "~/migrations/5.38.0/001/ddb";
import { MultiStepForms_5_38_0_002 } from "~/migrations/5.38.0/002/ddb";
import { PageBlocks_5_38_0_003 } from "~/migrations/5.38.0/003/ddb";
// 5.39.0
import { FileManager_5_39_0_005 } from "~/migrations/5.39.0/005/ddb";

export const migrations = () => {
return [
Expand All @@ -33,6 +36,7 @@ export const migrations = () => {
FileManager_5_37_0_005,
MultiStepForms_5_38_0_001,
MultiStepForms_5_38_0_002,
PageBlocks_5_38_0_003
PageBlocks_5_38_0_003,
FileManager_5_39_0_005
];
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import { Client } from "@elastic/elasticsearch";
import { inject, makeInjectable } from "@webiny/ioc";
import { executeWithRetry } from "@webiny/utils";
import { PrimitiveValue } from "@webiny/api-elasticsearch/types";
import {
DataMigration,
DataMigrationContext,
ElasticsearchClientSymbol,
PrimaryDynamoTableSymbol
} from "@webiny/data-migration";
import { S3 } from "@webiny/aws-sdk/client-s3";
import { Table } from "@webiny/db-dynamodb/toolbox";
import {
esQueryAllWithCallback,
forEachTenantLocale,
esFindOne,
esGetIndexExist,
esGetIndexName
} from "~/utils";
import { FileEntry } from "../utils/createFileEntity";
import { FileMetadata } from "../utils/FileMetadata";

/**
 * Type guard for a per-group checkpoint value.
 *
 * A group's checkpoint entry is either missing (`undefined`), an array of primitive
 * values, or a boolean. Only the boolean form narrows to "completed" here.
 *
 * @param status - The checkpoint value stored for a single group.
 * @returns `true` when `status` is a boolean, `false` otherwise.
 */
const isGroupMigrationCompleted = (
    status: PrimitiveValue[] | boolean | undefined
): status is boolean => status === true || status === false;

/**
 * Data migration "5.39.0-005" for DynamoDB + Elasticsearch projects.
 *
 * For every tenant/locale combination it reads File Manager file records from
 * Elasticsearch and creates a `FileMetadata` for each of them (using the S3 client
 * and bucket configured in the constructor). Progress is tracked per
 * `tenantId:localeCode` group in the migration checkpoint, so that an interrupted
 * run can resume from the last processed cursor.
 */
export class FileManager_5_39_0_005 implements DataMigration {
    private readonly elasticsearchClient: Client;
    private readonly bucket: string;
    private readonly s3: S3;
    private readonly table: Table<string, string, string>;

    /**
     * @param table - Primary DynamoDB table, passed on to `forEachTenantLocale`.
     * @param elasticsearchClient - Client used for all Elasticsearch lookups.
     */
    constructor(table: Table<string, string, string>, elasticsearchClient: Client) {
        this.table = table;
        this.elasticsearchClient = elasticsearchClient;
        this.s3 = new S3({ region: process.env.AWS_REGION });
        // NOTE(review): if `S3_BUCKET` is not set, `String(undefined)` yields the literal
        // bucket name "undefined". Kept as-is to preserve behavior; confirm the variable
        // is always provided in the migration environment.
        this.bucket = String(process.env.S3_BUCKET);
    }

    /** Returns the unique identifier of this migration. */
    getId() {
        return "5.39.0-005";
    }

    /** Returns a short, human-readable description of this migration. */
    getDescription() {
        return "Generate a metadata file for every File Manager file.";
    }

    /**
     * Builds the parameters that identify the File Manager index of a tenant/locale.
     * The same object is used for both the index existence check and the index name.
     */
    private getIndexParams(tenantId: string, localeCode: string) {
        return {
            tenant: tenantId,
            locale: localeCode,
            type: "fmFile",
            isHeadlessCmsModel: true
        };
    }

    /** Checks whether the File Manager index exists for the given tenant/locale. */
    private fileManagerIndexExists(tenantId: string, localeCode: string) {
        return esGetIndexExist({
            elasticsearchClient: this.elasticsearchClient,
            ...this.getIndexParams(tenantId, localeCode)
        });
    }

    /**
     * Decides whether the migration needs to run.
     *
     * For each tenant/locale, the file with the highest `id` is fetched from
     * Elasticsearch and checked for existing metadata. The migration is required as
     * soon as one such file has no metadata.
     *
     * @returns `true` if at least one checked file has no metadata, `false` otherwise.
     */
    async shouldExecute({ logger }: DataMigrationContext): Promise<boolean> {
        let shouldExecute = false;

        await forEachTenantLocale({
            table: this.table,
            logger,
            callback: async ({ tenantId, localeCode }) => {
                const indexExists = await this.fileManagerIndexExists(tenantId, localeCode);

                if (!indexExists) {
                    logger.info(
                        `No elasticsearch index found for File Manager in tenant "${tenantId}" and locale "${localeCode}".`
                    );
                    return true;
                }

                // Fetch the latest file record from ES
                const fmIndexName = esGetIndexName(this.getIndexParams(tenantId, localeCode));

                const latestFile = await esFindOne<FileEntry>({
                    elasticsearchClient: this.elasticsearchClient,
                    index: fmIndexName,
                    body: {
                        query: {
                            bool: {
                                filter: [
                                    { term: { "tenant.keyword": tenantId } },
                                    { term: { "locale.keyword": localeCode } }
                                ]
                            }
                        },
                        sort: [
                            {
                                "id.keyword": { order: "desc", unmapped_type: "keyword" }
                            }
                        ]
                    }
                });

                if (!latestFile) {
                    logger.info(
                        `No files found in tenant "${tenantId}" and locale "${localeCode}".`
                    );
                    return true;
                }

                const fileMetadata = new FileMetadata(this.s3, this.bucket, latestFile);
                const hasMetadata = await fileMetadata.exists();

                if (!hasMetadata) {
                    // One file without metadata is enough to require the migration.
                    // Returning `false` presumably stops the tenant/locale iteration.
                    shouldExecute = true;
                    return false;
                }

                // Continue to the next tenant/locale.
                return true;
            }
        });

        return shouldExecute;
    }

    /**
     * Runs the migration.
     *
     * Groups already marked as completed in the checkpoint are skipped, as are groups
     * that have no File Manager index. For the remaining groups, files are read in
     * batches ordered by `id` (resuming from the stored cursor, if any), metadata is
     * created for every file in the batch, and the checkpoint is updated after each
     * batch.
     */
    async execute({ logger, ...context }: DataMigrationContext): Promise<void> {
        const migrationStatus = context.checkpoint || {};

        await forEachTenantLocale({
            table: this.table,
            logger,
            callback: async ({ tenantId, localeCode }) => {
                const groupId = `${tenantId}:${localeCode}`;
                const status = migrationStatus[groupId];

                if (isGroupMigrationCompleted(status)) {
                    return true;
                }

                // Mirror the guard used in `shouldExecute`: without an index there are
                // no files to process, and querying a missing index may fail.
                const indexExists = await this.fileManagerIndexExists(tenantId, localeCode);

                if (!indexExists) {
                    logger.info(
                        `No elasticsearch index found for File Manager in tenant "${tenantId}" and locale "${localeCode}"; skipping.`
                    );
                    return true;
                }

                const esIndexName = esGetIndexName(this.getIndexParams(tenantId, localeCode));

                let batch = 0;
                await esQueryAllWithCallback<FileEntry>({
                    elasticsearchClient: this.elasticsearchClient,
                    index: esIndexName,
                    body: {
                        query: {
                            bool: {
                                filter: [
                                    { term: { "tenant.keyword": tenantId } },
                                    { term: { "locale.keyword": localeCode } }
                                ]
                            }
                        },
                        size: 10000,
                        sort: [
                            {
                                "id.keyword": { order: "asc", unmapped_type: "keyword" }
                            }
                        ],
                        // `undefined` on a fresh run; the stored cursor when resuming.
                        search_after: status
                    },
                    callback: async (files, cursor) => {
                        batch++;

                        logger.info(
                            `Processing batch #${batch} in group ${groupId} (${files.length} files).`
                        );

                        // Start one metadata write (with retries) per file in the batch.
                        const writers = files.map(file => {
                            const fileMetadata = new FileMetadata(this.s3, this.bucket, file);
                            const writeMetadata = () => fileMetadata.create();

                            return executeWithRetry(writeMetadata, {
                                onFailedAttempt: error => {
                                    logger.error(
                                        `Writing file metadata attempt #${error.attemptNumber} failed.`
                                    );
                                    logger.error(error.message);
                                }
                            });
                        });

                        await Promise.all(writers);

                        // Update checkpoint after every batch
                        migrationStatus[groupId] = cursor;

                        // Check if we should store checkpoint and exit.
                        if (context.runningOutOfTime()) {
                            await context.createCheckpointAndExit(migrationStatus);
                        } else {
                            await context.createCheckpoint(migrationStatus);
                        }
                    }
                });

                // Mark group as completed.
                migrationStatus[groupId] = true;

                // Store checkpoint.
                await context.createCheckpoint(migrationStatus);

                // Continue processing.
                return true;
            }
        });
    }
}

// Declare the dependencies to inject into the migration class. They are listed in the
// same order as the constructor parameters (table, elasticsearchClient) — presumably
// the IoC container resolves them positionally; confirm against `@webiny/ioc`.
makeInjectable(FileManager_5_39_0_005, [
inject(PrimaryDynamoTableSymbol),
inject(ElasticsearchClientSymbol)
]);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./FileManager_5_39_0_005";
Loading

0 comments on commit b0b7d15

Please sign in to comment.