Skip to content

Commit

Permalink
Allow overriding parsed change attributes with a custom function
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Oct 16, 2024
1 parent 8da374d commit 439740f
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 11 deletions.
3 changes: 3 additions & 0 deletions core/.eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ module.exports = {
node: true,
},
ignorePatterns: ['dist/**', 'node_modules/**'],
rules: {
'@typescript-eslint/no-explicit-any': 'off',
},
}
16 changes: 11 additions & 5 deletions core/src/fetched-record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ export const MESSAGE_PREFIX_CONTEXT = '_bemi'
export const MESSAGE_PREFIX_HEARTBEAT = '_bemi_heartbeat'
const UNAVAILABLE_VALUE_PLACEHOLDER = '__bemi_unavailable_value'

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const parseDebeziumData = (debeziumChange: any, now: Date) => {
const parseDebeziumData = (debeziumChange: any, now: Date): RequiredEntityData<Change> => {
const {
op,
before: beforeRaw,
Expand Down Expand Up @@ -86,8 +85,13 @@ export class FetchedRecord {
this.messagePrefix = messagePrefix
}

static fromNatsMessage(natsMessage: JsMsg, now = new Date()) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
static fromNatsMessage(
natsMessage: JsMsg,
{
now = new Date(),
changeAttributesOverride = (changeAttributes: RequiredEntityData<Change>) => changeAttributes,
} = {},
) {
const debeziumData = decodeData(natsMessage.data) as any

const messagePrefix = debeziumData.message?.prefix
Expand All @@ -96,8 +100,10 @@ export class FetchedRecord {
return null
}

const changeAttributes = changeAttributesOverride(parseDebeziumData(debeziumData, now))

return new FetchedRecord({
changeAttributes: parseDebeziumData(debeziumData, now),
changeAttributes,
subject: natsMessage.subject,
streamSequence: natsMessage.info.streamSequence,
messagePrefix,
Expand Down
6 changes: 4 additions & 2 deletions core/src/ingestion.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MikroORM } from '@mikro-orm/postgresql'
import { MikroORM, RequiredEntityData } from '@mikro-orm/postgresql'
import { Consumer, JsMsg } from 'nats'

import { logger } from './logger'
Expand Down Expand Up @@ -69,12 +69,14 @@ export const runIngestionLoop = async ({
fetchBatchSize = 100,
insertBatchSize = 100,
useBuffer = false,
changeAttributesOverride = (changeAttributes: RequiredEntityData<Change>) => changeAttributes,
}: {
orm: MikroORM
consumer: Consumer
fetchBatchSize?: number
insertBatchSize?: number
useBuffer?: boolean
changeAttributesOverride?: (changeAttributes: RequiredEntityData<Change>) => RequiredEntityData<Change>
}) => {
let lastStreamSequence: number | null = null
let fetchedRecordBuffer = new FetchedRecordBuffer()
Expand Down Expand Up @@ -102,7 +104,7 @@ export const runIngestionLoop = async ({
const now = new Date()
const natsMessages = Object.values(natsMessageBySequence)
const fetchedRecords = natsMessages
.map((m: JsMsg) => FetchedRecord.fromNatsMessage(m, now))
.map((m: JsMsg) => FetchedRecord.fromNatsMessage(m, { now, changeAttributesOverride }))
.filter((r) => r) as FetchedRecord[]
const { stitchedFetchedRecords, newFetchedRecordBuffer, ackStreamSequence } = stitchFetchedRecords({
fetchedRecordBuffer: fetchedRecordBuffer.addFetchedRecords(fetchedRecords),
Expand Down
3 changes: 0 additions & 3 deletions core/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import util from 'util'

const LOG_LEVEL = process.env.LOG_LEVEL

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const log = (message: any) => {
if (typeof message === 'string') {
return console.log(message)
Expand All @@ -12,11 +11,9 @@ const log = (message: any) => {
}

export const logger = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
debug: (message: any) => {
if (LOG_LEVEL === 'DEBUG') log(message)
},
// eslint-disable-next-line @typescript-eslint/no-explicit-any
info: (message: any) => {
log(message)
},
Expand Down
20 changes: 20 additions & 0 deletions core/src/specs/fetched-record.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,24 @@ describe('fromNatsMessage', () => {

expect(result).toStrictEqual(null)
})

test('customizes changeAttributes with changeAttributesOverride', () => {
const subject = 'bemi-subject'
const natsMessage = buildNatsMessage({ subject, streamSequence: 1, data: MESSAGE_DATA.CREATE })

const result = FetchedRecord.fromNatsMessage(natsMessage, {
changeAttributesOverride: (changeAttributes) => ({
...changeAttributes,
primaryKey: 'custom-primary-key',
}),
})

expect(result).toStrictEqual(
new FetchedRecord({
subject,
streamSequence: 1,
changeAttributes: { ...CHANGE_ATTRIBUTES.CREATE, primaryKey: 'custom-primary-key' },
}),
)
})
})
1 change: 0 additions & 1 deletion core/src/specs/fixtures/nats-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ export const buildNatsMessage = ({
streamSequence,
}: {
subject: string
// eslint-disable-next-line @typescript-eslint/no-explicit-any
data: any
streamSequence: number
}): JsMsg => ({
Expand Down
4 changes: 4 additions & 0 deletions docs/docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ keywords: ['Bemi Changelog', 'Bemi New Features', 'Postgres Audit Trails', 'Chan
* Allow setting and customizing [Ignore Column Rules](https://docs.bemi.io/postgresql/source-database#ignoring-by-columns)
* Platform
* Create PG publications limited to specific tables with selective tracking
* [Bemi Core](https://github.com/BemiHQ/bemi)
* Allow customizing parsed change attributes with an override function
* [Bemi Prisma](https://github.com/BemiHQ/bemi-prisma)
* Fix compatibility with Prisma v5.20+ driver adapter

## 2024-09

Expand Down

0 comments on commit 439740f

Please sign in to comment.