Skip to content

Commit

Permalink
Publish event in withCtx flow only (#8188)
Browse files Browse the repository at this point in the history
* api key

* attachment

* file operation

* group

* share

* star

* subscription

* publish events in withCtx flow only

* cleanup GroupUser hooks:false

* type and rename

* rename publish to create
  • Loading branch information
hmacr authored Jan 8, 2025
1 parent cf3e29b commit 3c5ce8c
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 35 deletions.
1 change: 0 additions & 1 deletion server/commands/userSuspender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ export default async function userSuspender({
userId: user.id,
},
transaction,
hooks: false,
});
await Event.create(
{
Expand Down
7 changes: 3 additions & 4 deletions server/models/GroupUser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import {
AfterCreate,
AfterDestroy,
} from "sequelize-typescript";
import { APIContext } from "@server/types";
import Group from "./Group";
import User from "./User";
import Model from "./base/Model";
import Model, { type HookContext } from "./base/Model";
import Fix from "./decorators/Fix";

@DefaultScope(() => ({
Expand Down Expand Up @@ -75,15 +74,15 @@ class GroupUser extends Model<
@AfterCreate
public static async publishAddUserEvent(
model: GroupUser,
context: APIContext["context"]
context: HookContext
) {
await Group.insertEvent("add_user", model, context);
}

@AfterDestroy
public static async publishRemoveUserEvent(
model: GroupUser,
context: APIContext["context"]
context: HookContext
) {
await Group.insertEvent("remove_user", model, context);
}
Expand Down
108 changes: 79 additions & 29 deletions server/models/base/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@ import isObject from "lodash/isObject";
import pick from "lodash/pick";
import {
Attributes,
CreateOptions,
CreationAttributes,
DataTypes,
FindOptions,
FindOrCreateOptions,
InstanceDestroyOptions,
InstanceRestoreOptions,
InstanceUpdateOptions,
ModelStatic,
NonAttribute,
SaveOptions,
} from "sequelize";
import {
AfterCreate,
Expand All @@ -30,10 +25,20 @@ import Logger from "@server/logging/Logger";
import { Replace, APIContext } from "@server/types";
import { getChangsetSkipped } from "../decorators/Changeset";

export type EventOverride = {
type EventOverrideOptions = {
/** Override the default event name. */
name?: string;
};

type EventOptions = EventOverrideOptions & {
/**
* Whether to publish event to the job queue. Defaults to true when using any `withCtx` methods.
*/
create: boolean;
};

export type HookContext = APIContext["context"] & { event?: EventOptions };

class Model<
TModelAttributes extends {} = any,
TCreationAttributes extends {} = TModelAttributes
Expand All @@ -47,34 +52,65 @@ class Model<
/**
* Validates this instance, and if the validation passes, persists it to the database.
*/
public saveWithCtx(ctx: APIContext, eventOverride?: EventOverride) {
this.eventOverride = eventOverride;
public saveWithCtx(ctx: APIContext, eventOpts?: EventOverrideOptions) {
const hookContext: HookContext = {
...ctx.context,
event: {
...eventOpts,
create: true,
},
};
this.cacheChangeset();
return this.save(ctx.context as SaveOptions);
return this.save(hookContext);
}

/**
* This is the same as calling `set` and then calling `save`.
*/
public updateWithCtx(ctx: APIContext, keys: Partial<TModelAttributes>) {
public updateWithCtx(
ctx: APIContext,
keys: Partial<TModelAttributes>,
eventOpts?: EventOverrideOptions
) {
const hookContext: HookContext = {
...ctx.context,
event: {
...eventOpts,
create: true,
},
};
this.set(keys);
this.cacheChangeset();
return this.save(ctx.context as SaveOptions);
return this.save(hookContext);
}

/**
* Destroy the row corresponding to this instance. Depending on your setting for paranoid, the row will
* either be completely deleted, or have its deletedAt timestamp set to the current time.
*/
public destroyWithCtx(ctx: APIContext) {
return this.destroy(ctx.context as InstanceDestroyOptions);
public destroyWithCtx(ctx: APIContext, eventOpts?: EventOverrideOptions) {
const hookContext: HookContext = {
...ctx.context,
event: {
...eventOpts,
create: true,
},
};
return this.destroy(hookContext);
}

/**
* Restore the row corresponding to this instance. Only available for paranoid models.
*/
public restoreWithCtx(ctx: APIContext) {
return this.restore(ctx.context as InstanceRestoreOptions);
public restoreWithCtx(ctx: APIContext, eventOpts?: EventOverrideOptions) {
const hookContext: HookContext = {
...ctx.context,
event: {
...eventOpts,
create: true,
},
};
return this.restore(hookContext);
}

/**
Expand All @@ -84,11 +120,19 @@ class Model<
public static findOrCreateWithCtx<M extends Model>(
this: ModelStatic<M>,
ctx: APIContext,
options: FindOrCreateOptions<Attributes<M>, CreationAttributes<M>>
options: FindOrCreateOptions<Attributes<M>, CreationAttributes<M>>,
eventOpts?: EventOverrideOptions
) {
const hookContext: HookContext = {
...ctx.context,
event: {
...eventOpts,
create: true,
},
};
return this.findOrCreate({
...options,
...ctx.context,
...hookContext,
});
}

Expand All @@ -98,9 +142,17 @@ class Model<
public static createWithCtx<M extends Model>(
this: ModelStatic<M>,
ctx: APIContext,
values?: CreationAttributes<M>
values?: CreationAttributes<M>,
eventOpts?: EventOverrideOptions
) {
return this.create(values, ctx.context as CreateOptions);
const hookContext: HookContext = {
...ctx.context,
event: {
...eventOpts,
create: true,
},
};
return this.create(values, hookContext);
}

@BeforeCreate
Expand All @@ -111,39 +163,39 @@ class Model<
@AfterCreate
static async afterCreateEvent<T extends Model>(
model: T,
context: APIContext["context"]
context: HookContext
) {
await this.insertEvent("create", model, context);
}

@AfterUpsert
static async afterUpsertEvent<T extends Model>(
model: T,
context: APIContext["context"]
context: HookContext
) {
await this.insertEvent("create", model, context);
}

@AfterUpdate
static async afterUpdateEvent<T extends Model>(
model: T,
context: APIContext["context"]
context: HookContext
) {
await this.insertEvent("update", model, context);
}

@AfterDestroy
static async afterDestroyEvent<T extends Model>(
model: T,
context: APIContext["context"]
context: HookContext
) {
await this.insertEvent("delete", model, context);
}

@AfterRestore
static async afterRestoreEvent<T extends Model>(
model: T,
context: APIContext["context"]
context: HookContext
) {
await this.insertEvent("create", model, context);
}
Expand All @@ -158,13 +210,13 @@ class Model<
protected static async insertEvent<T extends Model>(
name: string,
model: T,
context: APIContext["context"] & InstanceUpdateOptions
context: HookContext
) {
const namespace = this.eventNamespace;
const models = this.sequelize!.models;

// If no namespace is defined, don't create an event
if (!namespace || context.silent) {
if (!namespace || !context.event?.create) {
return;
}

Expand All @@ -182,7 +234,7 @@ class Model<

return models.event.create(
{
name: `${namespace}.${model.eventOverride?.name ?? name}`,
name: `${namespace}.${context.event.name ?? name}`,
modelId: "modelId" in model ? model.modelId : model.id,
collectionId:
"collectionId" in model
Expand Down Expand Up @@ -350,8 +402,6 @@ class Model<
attributes: Partial<TModelAttributes>;
previous: Partial<TModelAttributes>;
}> | null;

private eventOverride?: EventOverride;
}

export default Model;
1 change: 0 additions & 1 deletion server/queues/processors/UserDeletedProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ export default class UserDeletedProcessor extends BaseProcessor {
userId: event.userId,
},
transaction,
hooks: false,
});
await UserAuthentication.destroy({
where: {
Expand Down

0 comments on commit 3c5ce8c

Please sign in to comment.