Skip to content

Commit

Permalink
2248 zapier integration implement typeorm eventsubscribers (twentyhq#…
Browse files Browse the repository at this point in the history
…3122)

* Add new queue to twenty-server

* Add triggers to zapier

* Rename webhook operation

* Use find one or fail

* Use logger

* Fix typescript templating

* Add dedicated call webhook job

* Update logging

* Fix error handling
  • Loading branch information
martmull authored Jan 3, 2024
1 parent 4ebaacc commit 6525083
Show file tree
Hide file tree
Showing 36 changed files with 1,040 additions and 209 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,34 @@
import { Module } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { HttpModule } from '@nestjs/axios';

import { FetchMessagesJob } from 'src/workspace/messaging/jobs/fetch-messages.job';
import { CallWebhookJobsJob } from 'src/workspace/workspace-query-runner/jobs/call-webhook-jobs.job';
import { CallWebhookJob } from 'src/workspace/workspace-query-runner/jobs/call-webhook.job';
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
import { ObjectMetadataModule } from 'src/metadata/object-metadata/object-metadata.module';
import { DataSourceModule } from 'src/metadata/data-source/data-source.module';

@Module({
imports: [
WorkspaceDataSourceModule,
ObjectMetadataModule,
DataSourceModule,
HttpModule,
],
providers: [
{
provide: FetchMessagesJob.name,
useClass: FetchMessagesJob,
},
{
provide: CallWebhookJobsJob.name,
useClass: CallWebhookJobsJob,
},
{
provide: CallWebhookJob.name,
useClass: CallWebhookJob,
},
],
})
export class JobsModule {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export const QUEUE_DRIVER = Symbol('QUEUE_DRIVER');
export enum MessageQueue {
taskAssignedQueue = 'task-assigned-queue',
messagingQueue = 'messaging-queue',
webhookQueue = 'webhook-queue',
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ export class MessageQueueModule {
module: MessageQueueModule,
imports: [JobsModule, ...(options.imports || [])],
providers,
exports: [MessageQueue.taskAssignedQueue, MessageQueue.messagingQueue],
exports: [
MessageQueue.taskAssignedQueue,
MessageQueue.messagingQueue,
MessageQueue.webhookQueue,
],
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,24 @@ export class ObjectMetadataService extends TypeOrmQueryService<ObjectMetadataEnt
});
}

public async findOneOrFailWithinWorkspace(
workspaceId: string,
options: FindOneOptions<ObjectMetadataEntity>,
): Promise<ObjectMetadataEntity> {
return this.objectMetadataRepository.findOneOrFail({
relations: [
'fields',
'fields.fromRelationMetadata',
'fields.toRelationMetadata',
],
...options,
where: {
...options.where,
workspaceId,
},
});
}

public async findManyWithinWorkspace(
workspaceId: string,
options?: FindManyOptions<ObjectMetadataEntity>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { Inject, Injectable, Logger } from '@nestjs/common';

import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface';

import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { ObjectMetadataService } from 'src/metadata/object-metadata/object-metadata.service';
import { DataSourceService } from 'src/metadata/data-source/data-source.service';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import {
CallWebhookJob,
CallWebhookJobData,
} from 'src/workspace/workspace-query-runner/jobs/call-webhook.job';

export enum CallWebhookJobsJobOperation {
create = 'create',
update = 'update',
delete = 'delete',
}

export type CallWebhookJobsJobData = {
workspaceId: string;
objectNameSingular: string;
recordData: any;
operation: CallWebhookJobsJobOperation;
};

@Injectable()
export class CallWebhookJobsJob
implements MessageQueueJob<CallWebhookJobsJobData>
{
private readonly logger = new Logger(CallWebhookJobsJob.name);

constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly objectMetadataService: ObjectMetadataService,
private readonly dataSourceService: DataSourceService,
@Inject(MessageQueue.webhookQueue)
private readonly messageQueueService: MessageQueueService,
) {}

async handle(data: CallWebhookJobsJobData): Promise<void> {
const objectMetadataItem =
await this.objectMetadataService.findOneOrFailWithinWorkspace(
data.workspaceId,
{ where: { nameSingular: data.objectNameSingular } },
);
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
data.workspaceId,
);
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
data.workspaceId,
);
const operationName = `${data.operation}.${objectMetadataItem.namePlural}`;
const webhooks: { id: string; targetUrl: string }[] =
await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."webhook" WHERE operation='${operationName}'`,
);

webhooks.forEach((webhook) => {
this.messageQueueService.add<CallWebhookJobData>(
CallWebhookJob.name,
{
recordData: data.recordData,
targetUrl: webhook.targetUrl,
},
{ retryLimit: 3 },
);
});

this.logger.log(
`CallWebhookJobsJob on operation '${operationName}' called on webhooks ids [\n"${webhooks
.map((webhook) => webhook.id)
.join('",\n"')}"\n]`,
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Injectable, Logger } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';

import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface';

export type CallWebhookJobData = {
targetUrl: string;
recordData: any;
};

@Injectable()
export class CallWebhookJob implements MessageQueueJob<CallWebhookJobData> {
private readonly logger = new Logger(CallWebhookJob.name);

constructor(private readonly httpService: HttpService) {}

async handle(data: CallWebhookJobData): Promise<void> {
try {
await this.httpService.axiosRef.post(data.targetUrl, data.recordData);
this.logger.log(
`CallWebhookJob successfully called on targetUrl '${
data.targetUrl
}' with data: ${JSON.stringify(data.recordData)}`,
);
} catch (err) {
throw new Error(
`Error calling webhook on targetUrl '${data.targetUrl}': ${err}`,
);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
BadRequestException,
Inject,
Injectable,
InternalServerErrorException,
Logger,
Expand All @@ -24,6 +25,13 @@ import {

import { WorkspaceQueryBuilderFactory } from 'src/workspace/workspace-query-builder/workspace-query-builder.factory';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import {
CallWebhookJobsJob,
CallWebhookJobsJobData,
CallWebhookJobsJobOperation,
} from 'src/workspace/workspace-query-runner/jobs/call-webhook-jobs.job';
import { parseResult } from 'src/workspace/workspace-query-runner/utils/parse-result.util';
import { ExceptionHandlerService } from 'src/integrations/exception-handler/exception-handler.service';
import { globalExceptionHandler } from 'src/filters/utils/global-exception-handler.util';
Expand All @@ -41,6 +49,8 @@ export class WorkspaceQueryRunnerService {
constructor(
private readonly workspaceQueryBuilderFactory: WorkspaceQueryBuilderFactory,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@Inject(MessageQueue.webhookQueue)
private readonly messageQueueService: MessageQueueService,
private readonly exceptionHandlerService: ExceptionHandlerService,
) {}

Expand Down Expand Up @@ -117,11 +127,19 @@ export class WorkspaceQueryRunnerService {
);
const result = await this.execute(query, workspaceId);

return this.parseResult<PGGraphQLMutation<Record>>(
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
result,
targetTableName,
'insertInto',
)?.records;

await this.triggerWebhooks<Record>(
parsedResults,
CallWebhookJobsJobOperation.create,
options,
);

return parsedResults;
} catch (exception) {
const error = globalExceptionHandler(
exception,
Expand All @@ -136,9 +154,15 @@ export class WorkspaceQueryRunnerService {
args: CreateOneResolverArgs<Record>,
options: WorkspaceQueryRunnerOptions,
): Promise<Record | undefined> {
const records = await this.createMany({ data: [args.data] }, options);
const results = await this.createMany({ data: [args.data] }, options);

await this.triggerWebhooks<Record>(
results,
CallWebhookJobsJobOperation.create,
options,
);

return records?.[0];
return results?.[0];
}

async updateOne<Record extends IRecord = IRecord>(
Expand All @@ -153,11 +177,19 @@ export class WorkspaceQueryRunnerService {
);
const result = await this.execute(query, workspaceId);

return this.parseResult<PGGraphQLMutation<Record>>(
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
result,
targetTableName,
'update',
)?.records?.[0];
)?.records;

await this.triggerWebhooks<Record>(
parsedResults,
CallWebhookJobsJobOperation.update,
options,
);

return parsedResults?.[0];
} catch (exception) {
const error = globalExceptionHandler(
exception,
Expand All @@ -180,11 +212,19 @@ export class WorkspaceQueryRunnerService {
);
const result = await this.execute(query, workspaceId);

return this.parseResult<PGGraphQLMutation<Record>>(
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
result,
targetTableName,
'deleteFrom',
)?.records?.[0];
)?.records;

await this.triggerWebhooks<Record>(
parsedResults,
CallWebhookJobsJobOperation.delete,
options,
);

return parsedResults?.[0];
} catch (exception) {
const error = globalExceptionHandler(
exception,
Expand All @@ -207,11 +247,19 @@ export class WorkspaceQueryRunnerService {
);
const result = await this.execute(query, workspaceId);

return this.parseResult<PGGraphQLMutation<Record>>(
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
result,
targetTableName,
'update',
)?.records;

await this.triggerWebhooks<Record>(
parsedResults,
CallWebhookJobsJobOperation.update,
options,
);

return parsedResults;
} catch (exception) {
const error = globalExceptionHandler(
exception,
Expand All @@ -237,11 +285,19 @@ export class WorkspaceQueryRunnerService {
);
const result = await this.execute(query, workspaceId);

return this.parseResult<PGGraphQLMutation<Record>>(
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
result,
targetTableName,
'deleteFrom',
)?.records;

await this.triggerWebhooks<Record>(
parsedResults,
CallWebhookJobsJobOperation.delete,
options,
);

return parsedResults;
} catch (exception) {
const error = globalExceptionHandler(
exception,
Expand Down Expand Up @@ -306,4 +362,26 @@ export class WorkspaceQueryRunnerService {

return this.parseResult(result, targetTableName, command);
}

async triggerWebhooks<Record>(
jobsData: Record[] | undefined,
operation: CallWebhookJobsJobOperation,
options: WorkspaceQueryRunnerOptions,
) {
if (!Array.isArray(jobsData)) {
return;
}
jobsData.forEach((jobData) => {
this.messageQueueService.add<CallWebhookJobsJobData>(
CallWebhookJobsJob.name,
{
recordData: jobData,
workspaceId: options.workspaceId,
operation,
objectNameSingular: options.targetTableName,
},
{ retryLimit: 3 },
);
});
}
}
Loading

0 comments on commit 6525083

Please sign in to comment.