Skip to content

Commit

Permalink
Merge branch 'master' into chore/ai-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ivasilov authored Dec 19, 2024
2 parents f80f934 + c198dd8 commit 939117a
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,10 @@ export const realtime: NavMenuConstant = {
name: 'Listening to Postgres Changes with Flutter',
url: '/guides/realtime/realtime-listening-flutter',
},
{
name: 'Migrate to Broadcast Changes',
url: '/guides/realtime/migrate-from-postgres-changes',
},
],
},
{
Expand Down
116 changes: 114 additions & 2 deletions apps/docs/content/guides/realtime/broadcast.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ subtitle: 'Send and receive messages using Realtime Broadcast'
description: 'Send and receive messages using Realtime Broadcast'
---

Let's explore how to implement Realtime Broadcast to send messages between clients.
Let's explore how to implement Realtime Broadcast to send messages between clients using either WebSockets, REST API or triggers from your database.

## Usage

Expand Down Expand Up @@ -88,7 +88,7 @@ Go to your Supabase project's [API Settings](https://supabase.com/dashboard/proj

### Listening to broadcast messages

You can provide a callback for the `broadcast` channel to receive message. In this example we will receive any `broadcast` messages in `room-1`:
You can provide a callback for the `broadcast` channel to receive message. This example will receive any `broadcast` messages in `room-1`:

<Tabs
scrollable
Expand Down Expand Up @@ -637,3 +637,115 @@ You can also send a Broadcast message by making an HTTP request to Realtime serv
Unsupported in Python yet.
</TabPanel>
</Tabs>

## Trigger broadcast messages from your database

<Admonition type="caution">

This feature is currently in Private Alpha. The API and implementation may change. To request access, submit a Support Ticket.

</Admonition>

### How it works

Broadcast Changes allows you to trigger messages from your database. To achieve it Realtime is directly reading your WAL (Write Append Log) file using a publication against the `realtime.messages` table so whenever a new insert happens a message is sent to connected users.

It uses partitioned tables per day which allows the deletion your previous images in a performant way by dropping the physical tables of this partitioned table. Tables older than 3 days old are deleted.

Broadcasting from the database works like a client-side broadcast, using WebSockets to send JSON packages. [Realtime Authorization]/docs/guides/realtime/authorization) is required and enabled by default to protect your data.

The database broadcast feature provides two functions to help you send messages:

- `realtime.send` will insert a message into realtime.messages without a specific format.
- `realtime.broadcast_changes` will insert a message with the required fields to emit database changes to clients. This helps you set up triggers on your tables to emit changes.

### Broadcasting a message from your database

The `realtime.send` function provides the most flexibility by allowing you to broadcast messages from your database without a specific format. This allows you to use database broadcast for messages that aren't necessarily tied to the shape of a Postgres row change.

```sql
SELECT realtime.send (
to_jsonb ('{}'::text), -- JSONB Payload
'event', -- Event name
'topic', -- Topic
FALSE -- Public / Private flag
);
```

### Broadcast record changes

#### Setup realtime authorization

Realtime Authorization is required and enabled by default. To allow your users to listen to messages from topics, create a RLS (Row Level Security) policy:

```sql
CREATE POLICY "authenticated can receive broadcasts"
ON "realtime"."messages"
FOR SELECT
TO authenticated
USING ( true );

```

See the [Realtime Authorization](/docs/guides/realtime/authorization) docs to learn how to set up more specific policies.

#### Set up trigger function

First, set up a trigger function that uses `realtime.broadcast_changes` to insert an event whenever it is triggered. The event is set up to include data on the schema, table, operation, and field changes that triggered it.

For this example use case, we want to have a topic with the name `topic:<record id>` to which we're going to broadcast events.

```sql
CREATE OR REPLACE FUNCTION public.your_table_changes() RETURNS trigger AS $$
BEGIN
PERFORM realtime.broadcast_changes(
'topic:' || NEW.id::text, -- topic
TG_OP, -- event
TG_OP, -- operation
TG_TABLE_NAME, -- table
TG_TABLE_SCHEMA, -- schema
NEW, -- new record
OLD -- old record
);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
```

Of note are the Postgres native trigger special variables used:

- `TG_OP` - the operation that triggered the function
- `TG_TABLE_NAME` - the table that caused the trigger
- `TG_TABLE_SCHEMA` - the schema of the table that caused the trigger invocation
- `NEW` - the record after the change
- `OLD` - the record before the change

You can read more about them in this [guide](https://www.postgresql.org/docs/current/plpgsql-trigger.html#PLPGSQL-DML-TRIGGER).

#### Set up trigger

Next, set up a trigger so the function runs whenever your target table has a change.

```sql
CREATE TRIGGER broadcast_changes_for_your_table_trigger
AFTER INSERT OR UPDATE OR DELETE ON public.your_table
FOR EACH ROW
EXECUTE FUNCTION your_table_changes ();
```

As you can see, it will be broadcasting all operations so our users will receive events when records are inserted, updated or deleted from `public.your_table` .

#### Listen on client side

Finally, client side will requires to be set up to listen to the topic `topic:<record id>` to receive the events.

```jsx
const gameId = 'id'
await supabase.realtime.setAuth() // Needed for Realtime Authorization
const changes = supabase
.channel(`topic:${gameId}`)
.on('broadcast', { event: 'INSERT' }, (payload) => console.log(payload))
.on('broadcast', { event: 'UPDATE' }, (payload) => console.log(payload))
.on('broadcast', { event: 'DELETE' }, (payload) => console.log(payload))
.subscribe()
```
118 changes: 118 additions & 0 deletions apps/docs/content/guides/realtime/migrate-from-postgres-changes.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
---
title: 'Migrate to Broadcast Changes'
subtitle: 'How to migrate from Postgres Changes to Broadcast Changes'
description: 'How to migrate from Postgres Changes to Broadcast Changes'
sidebar_label: 'Migrate to Broadcast Changes'
---

Postgres Changes has some [limitations](/docs/guides/realtime/postgres-changes#limitations) as your application scales. To continue broadcasting database changes to users as you scale, you can use Broadcast Changes.

## Example application using Postgres Changes

Here we have a simple chess application that has a game id and we want to track whenever we have new moves happening for a given game id.

We store this information in a `public.moves` table and every time a new move is added to a given `game_id` we want to receive the changes in our connected Realtime client

<Image
alt="Schema used for our example"
src={{
light:
'/docs/img/guides/realtime/realtime-broadcast-changes-migration-schema-example-light.png',
dark: '/docs/img/guides/realtime/realtime-broadcast-changes-migration-schema-example-dark.png',
}}
/>

In our client we will have our implementation to receive insert events with the usual code:

```javascript
const gameId = '4a8bbe89-f601-4414-bd47-8d0f7ab2a31a'
const changes = supabase
.channel('chess-moves')
.on(
'postgres_changes',
{
event: 'INSERT',
schema: 'public',
table: 'moves',
filter: `game_id=eq.${gameId}`,
},
(payload) => console.log(payload)
)
.subscribe()
...
```

## Migrate to broadcast changes

To use Broadcast Changes, first familiarize yourself with the [Broadcast Changes implementation](/docs/guides/realtime/broadcast#trigger-broadcast-messages-from-your-database).

### Set up authorization

Broadcast Changes is private by default, using [Realtime Authorization](/docs/guides/realtime/authorization) to control access. First, set up RLS policies to control user access to relevant messages:

```sql
CREATE POLICY "authenticated can listen to game moves"
ON "realtime"."messages"
FOR SELECT
TO authenticated
USING (
EXISTS (
SELECT 1
FROM game_users
WHERE (SELECT auth.uid()) = user_id
AND (select realtime.topic()) = 'games:' || game_id::text
AND realtime.messages.extension = 'broadcast'
)
);
```

### Set up trigger function

We need to define our trigger function to adapt to our use case and use the provided function `realtime.broadcast_changes`

```sql
CREATE OR REPLACE FUNCTION public.broadcast_moves() RETURNS trigger AS $$
BEGIN
PERFORM realtime.broadcast_changes(
'games:' || NEW.game_id::text, -- topic
TG_OP, -- event
TG_OP, -- operation
TG_TABLE_NAME, -- table
TG_TABLE_SCHEMA, -- schema
NEW, -- new record
OLD -- old record
);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
```

### Setup trigger with created function

Now we need to setup our trigger to capture the events we want

```sql
CREATE TRIGGER chess_move_changes
AFTER INSERT ON public.moves
FOR EACH ROW
EXECUTE FUNCTION public.broadcast_moves();
```

### **Listen to changes in client**

Finally you can setup your client to listen for your events

```js
const gameId = '4a8bbe89-f601-4414-bd47-8d0f7ab2a31a'
await supabase.realtime.setAuth() // Needed for Realtime Authorization
const changes = supabase
.channel(`games:${gameId}`)
.on(
'broadcast',
{
event: 'INSERT',
},
(payload) => console.log(payload)
)
.subscribe()
```
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ const RestoreToNewProject = () => {
PermissionAction.INFRA_EXECUTE,
'queue_job.restore.prepare'
)
const hasPITREnabled = cloneBackups?.pitr_enabled
const PITR_ENABLED = cloneBackups?.pitr_enabled

const dbVersion = getDatabaseMajorVersion(project?.dbVersion ?? '')
const IS_PG15_OR_ABOVE = dbVersion >= 15
Expand All @@ -100,9 +100,10 @@ const RestoreToNewProject = () => {
refetchInterval,
refetchOnWindowFocus: false,
onSuccess: (data) => {
const hasTransientState = data.clones.some((c) => c.status === 'IN_PROGRESS')
const hasTransientState = data?.clones.some((c) => c.status === 'IN_PROGRESS')
if (!hasTransientState) setRefetchInterval(false)
},
enabled: PHYSICAL_BACKUPS_ENABLED || PITR_ENABLED,
}
)
const IS_CLONED_PROJECT = (cloneStatus?.cloned_from?.source_project as any)?.ref ? true : false
Expand Down Expand Up @@ -179,6 +180,16 @@ const RestoreToNewProject = () => {
}
}

if (isFreePlan) {
return (
<UpgradeToPro
buttonText="Upgrade"
primaryText="Restore to a new project requires a pro plan or above."
secondaryText="To restore to a new project, you need to upgrade to a Pro plan and have physical backups enabled."
/>
)
}

if (isOrioleDb) {
return (
<Admonition
Expand All @@ -191,10 +202,6 @@ const RestoreToNewProject = () => {
)
}

if (isLoading) {
return <GenericSkeletonLoader />
}

if (!canReadPhysicalBackups) {
return <NoPermission resourceText="view backups" />
}
Expand Down Expand Up @@ -243,14 +250,8 @@ const RestoreToNewProject = () => {
)
}

if (plan === 'free') {
return (
<UpgradeToPro
buttonText="Upgrade"
primaryText="Restore to a new project requires a pro plan or above."
secondaryText="To restore to a new project, you need to upgrade to a Pro plan and have physical backups enabled."
/>
)
if (isLoading) {
return <GenericSkeletonLoader />
}

if (IS_CLONED_PROJECT) {
Expand Down Expand Up @@ -286,7 +287,7 @@ const RestoreToNewProject = () => {

if (
!isLoading &&
hasPITREnabled &&
PITR_ENABLED &&
!cloneBackups?.physicalBackupData.earliestPhysicalBackupDateUnix
) {
return (
Expand All @@ -298,7 +299,7 @@ const RestoreToNewProject = () => {
)
}

if (!isLoading && !hasPITREnabled && cloneBackups?.backups.length === 0) {
if (!isLoading && !PITR_ENABLED && cloneBackups?.backups.length === 0) {
return (
<>
<Admonition
Expand Down Expand Up @@ -354,7 +355,7 @@ const RestoreToNewProject = () => {
</Panel>
</div>
) : null}
{hasPITREnabled ? (
{PITR_ENABLED ? (
<>
<PITRForm
disabled={isRestoring}
Expand Down

0 comments on commit 939117a

Please sign in to comment.