Source Code: https://github.com/supabase/walrus
Write Ahead Log Realtime Unified Security (WALRUS) is a utility for managing realtime subscriptions to tables and applying row level security rules to those subscriptions.
The subscription stream is based on logical replication slots.
User subscriptions are managed through the realtime.subscription table:
create table realtime.subscription (
    id bigint generated always as identity primary key,
    subscription_id uuid not null,
    entity regclass not null,
    filters realtime.user_defined_filter[] not null default '{}',
    claims jsonb not null,
    claims_role regrole not null generated always as (realtime.to_regrole(claims ->> 'role')) stored,
    created_at timestamp not null default timezone('utc', now()),
    unique (subscription_id, entity, filters)
);
where realtime.user_defined_filter is
create type realtime.user_defined_filter as (
    column_name text,
    op realtime.equality_op,
    value text
);
and realtime.equality_op values are a subset of the PostgREST operators. Specifically:
create type realtime.equality_op as enum(
    'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in'
);
For example, to subscribe to a table named public.notes where the id is 6 as the authenticated role:
insert into realtime.subscription(subscription_id, entity, filters, claims)
values (
    '832bd278-dac7-4bef-96be-e21c8a0023c4',
    'public.notes',
    array[('id', 'eq', '6')::realtime.user_defined_filter],
    '{"role": "authenticated"}'
);
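Because subscriptions are ordinary rows, they can be inspected and removed with plain SQL. The following is a minimal sketch that reuses the subscription id and table from the example above. It assumes that deleting a row is all that is needed to stop notifications for that subscriber, which the source does not state explicitly.

```sql
-- List the subscriptions registered against public.notes
select subscription_id, entity, filters, claims_role
from realtime.subscription
where entity = 'public.notes'::regclass;

-- Remove every subscription row belonging to one subscriber
delete from realtime.subscription
where subscription_id = '832bd278-dac7-4bef-96be-e21c8a0023c4';
```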
This package exposes 1 public SQL function, realtime.apply_rls(jsonb). It processes the output of a wal2json decoded logical replication slot and returns:
- wal: (jsonb) The WAL record as JSONB, in the format described below
- is_rls_enabled: (bool) If the entity (table) the WAL record represents has row level security enabled
- subscription_ids: (uuid[]) An array of subscription ids that should be notified about the WAL record
- errors: (text[]) An array of errors
The jsonb WAL record is in the following format for inserts.
{
    "type": "INSERT",
    "schema": "public",
    "table": "todos",
    "columns": [
        {
            "name": "id",
            "type": "int8"
        },
        {
            "name": "details",
            "type": "text"
        },
        {
            "name": "user_id",
            "type": "int8"
        }
    ],
    "commit_timestamp": "2021-09-29T17:35:38Z",
    "record": {
        "id": 1,
        "user_id": 1,
        "details": "mow the lawn"
    }
}
updates:
{
    "type": "UPDATE",
    "schema": "public",
    "table": "todos",
    "columns": [
        {
            "name": "id",
            "type": "int8"
        },
        {
            "name": "details",
            "type": "text"
        },
        {
            "name": "user_id",
            "type": "int8"
        }
    ],
    "commit_timestamp": "2021-09-29T17:35:38Z",
    "record": {
        "id": 2,
        "user_id": 1,
        "details": "mow the lawn"
    },
    "old_record": {
        "id": 1
    }
}
deletes:
{
    "type": "DELETE",
    "schema": "public",
    "table": "todos",
    "columns": [
        {
            "name": "id",
            "type": "int8"
        },
        {
            "name": "details",
            "type": "text"
        },
        {
            "name": "user_id",
            "type": "int8"
        }
    ],
    "old_record": {
        "id": 1
    }
}
Important Notes:
- Row level security is not applied to delete statements
- The key/value pairs displayed in the old_record field include the table's identity columns for the record being updated/deleted. To display all values in old_record, set the replica identity for the table to full
- When a delete occurs, the contents of old_record will be broadcast to all subscribers to that table, so ensure that each table's replica identity only contains information that is safe to expose publicly
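The replica identity mentioned in the notes above is a standard Postgres table setting. As a short sketch, using the public.todos table from the WAL examples, it can be inspected and changed as follows. Keep the last note in mind: with replica identity full, every column of a deleted row is broadcast to all subscribers of that table.

```sql
-- Inspect the current replica identity:
-- 'd' = default (primary key), 'f' = full, 'i' = index, 'n' = nothing
select relreplident
from pg_class
where oid = 'public.todos'::regclass;

-- Include every column in old_record for updates and deletes
alter table public.todos replica identity full;

-- Revert to exposing only the primary key columns
alter table public.todos replica identity default;
```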
If a WAL record for a table that does not have a primary key is passed through realtime.apply_rls, an error is returned.
Ex:
(
    {
        "type": ...,
        "schema": ...,
        "table": ...
    }, -- wal
    true, -- is_rls_enabled
    [...], -- subscription_ids,
    array['Error 400: Bad Request, no primary key'] -- errors
)::realtime.wal_rls;
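To catch this case before it reaches subscribers, the system catalogs can be queried for published tables that lack a primary key. This is only a sketch: it assumes the publication is named supabase_realtime, as in the polling query later in this document, and it is not part of WALRUS itself.

```sql
-- Tables in the publication that have no primary key index
select ppt.schemaname, ppt.tablename
from pg_publication_tables ppt
where ppt.pubname = 'supabase_realtime'
  and not exists (
      select 1
      from pg_index i
      where i.indrelid = format('%I.%I', ppt.schemaname, ppt.tablename)::regclass
        and i.indisprimary
  );
```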
If a WAL record is passed through realtime.apply_rls and the subscription's claims_role does not have permission to select the primary key columns in that table, an Unauthorized error is returned with no WAL data.
Ex:
(
    {
        "type": ...,
        "schema": ...,
        "table": ...
    }, -- wal
    true, -- is_rls_enabled
    [...], -- subscription_ids,
    array['Error 401: Unauthorized'] -- errors
)::realtime.wal_rls;
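One way to diagnose and resolve the Unauthorized error is to check the role's column privileges directly. The sketch below assumes the authenticated role and the public.notes table with primary key column id from the subscription example. Whether a column-level or table-level grant is appropriate depends on your security model.

```sql
-- Returns true if the role may select the primary key column
select has_column_privilege('authenticated', 'public.notes', 'id', 'select');

-- Grant select on the primary key column only
grant select (id) on public.notes to authenticated;
```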
When the size of the wal2json record exceeds max_record_bytes, the record and old_record objects are filtered to include only fields with a value size <= 64 bytes. The errors output array is set to contain the string "Error 413: Payload Too Large".
Ex:
(
    {..., "record": {"id": 1}, "old_record": {"id": 1}}, -- wal
    true, -- is_rls_enabled
    [...], -- subscription_ids,
    array['Error 413: Payload Too Large'] -- errors
)::realtime.wal_rls;
Each WAL record is passed into realtime.apply_rls(jsonb), which:
- impersonates each subscribed user by setting the appropriate role and request.jwt.claims that RLS policies depend on
- queries for the row using its primary key values
- applies the subscription's filters to check if the WAL record is filtered out
- filters out all columns that are not visible to the user's role
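The first two steps above can be illustrated by hand. This is a simplified sketch of the idea, not the function's actual implementation. It assumes the authenticated role and the public.notes row with id 6 from the subscription example, and it runs inside a transaction so the impersonation does not leak into the session.

```sql
begin;

-- Step 1: impersonate the subscriber using values from the subscription's claims
set local role authenticated;
select set_config('request.jwt.claims', '{"role": "authenticated"}', true);

-- Step 2: look the row up by primary key; RLS policies decide whether it is visible
select id
from public.notes
where id = 6;

rollback;
```

If the select returns no row, the impersonated user is not allowed to see the record and the subscription should not be notified.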
Given a wal2json replication slot with the name realtime:
select * from pg_create_logical_replication_slot('realtime', 'wal2json');
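Before polling, it can help to confirm that the slot exists and is decoding. The sketch below uses only built-in Postgres functions and views. The peek call reads pending changes without consuming them, so it is safe to run while debugging.

```sql
-- Logical decoding requires wal_level = logical
show wal_level;

-- Confirm the slot exists and uses the wal2json plugin
select slot_name, plugin, slot_type, active
from pg_replication_slots
where slot_name = 'realtime';

-- Peek at pending changes without advancing the slot
select data
from pg_logical_slot_peek_changes('realtime', null, null, 'format-version', '2');
```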
A complete list of wal2json config options can be found in the wal2json README (https://github.com/eulerto/wal2json).
The stream can be polled with:
select
    xyz.wal,
    xyz.is_rls_enabled,
    xyz.subscription_ids,
    xyz.errors
from
    pg_logical_slot_get_changes(
        'realtime', null, null,
        'include-pk', '1',
        'include-transaction', 'false',
        'include-timestamp', 'true',
        'include-type-oids', 'true',
        'write-in-chunks', 'true',
        'format-version', '2',
        'actions', 'insert,update,delete',
        'filter-tables', 'realtime.*'
    ),
    lateral (
        select
            x.wal,
            x.is_rls_enabled,
            x.subscription_ids,
            x.errors
        from
            realtime.apply_rls(data::jsonb) x(wal, is_rls_enabled, subscription_ids, errors)
    ) xyz
where
    xyz.subscription_ids[1] is not null;
Or, if the stream should be filtered according to a publication:
with pub as (
    select
        concat_ws(
            ',',
            case when bool_or(pubinsert) then 'insert' else null end,
            case when bool_or(pubupdate) then 'update' else null end,
            case when bool_or(pubdelete) then 'delete' else null end
        ) as w2j_actions,
        coalesce(
            string_agg(
                realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
                ','
            ) filter (where ppt.tablename is not null and ppt.tablename not like '% %'),
            ''
        ) w2j_add_tables
    from
        pg_publication pp
        left join pg_publication_tables ppt
            on pp.pubname = ppt.pubname
    where
        pp.pubname = 'supabase_realtime'
    group by
        pp.pubname
    limit 1
),
w2j as (
    select
        x.*, pub.w2j_add_tables
    from
        pub,
        pg_logical_slot_get_changes(
            'realtime', null, null,
            'include-pk', '1',
            'include-transaction', 'false',
            'include-type-oids', 'true',
            'include-timestamp', 'true',
            'write-in-chunks', 'true',
            'format-version', '2',
            'actions', pub.w2j_actions,
            'add-tables', pub.w2j_add_tables
        ) x
)
select
    xyz.wal,
    xyz.is_rls_enabled,
    xyz.subscription_ids,
    xyz.errors
from
    w2j,
    realtime.apply_rls(
        wal := w2j.data::jsonb,
        max_record_bytes := 1048576
    ) xyz(wal, is_rls_enabled, subscription_ids, errors)
where
    w2j.w2j_add_tables <> ''
    and xyz.subscription_ids[1] is not null
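The query above reads its table list and actions from a publication named supabase_realtime. If that publication does not exist yet, it could be set up along these lines. This is a sketch under assumptions: the choice of public.todos and public.notes is illustrative, taken from the examples in this document.

```sql
-- Create the publication with an initial table
create publication supabase_realtime for table public.todos;

-- Add another table later
alter publication supabase_realtime add table public.notes;

-- Verify which tables are published
select schemaname, tablename
from pg_publication_tables
where pubname = 'supabase_realtime';
```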
max_record_bytes (default 1 MiB): Controls the maximum size of a WAL record that will be emitted with complete record and old_record data. When the size of the wal2json record exceeds max_record_bytes, the record and old_record objects are filtered to include only fields with a value size <= 64 bytes. The errors output array is set to contain the string "Error 413: Payload Too Large".
Ex:
realtime.apply_rls(wal := w2j.data::jsonb, max_record_bytes := 1024*1024) x(wal, is_rls_enabled, subscription_ids, errors)
The project is SQL only and can be installed by executing the contents of sql/walrus--0.1.sql in a database instance.
Requires:
- Postgres 13+
- wal2json >= 53b548a29ebd6119323b6eb2f6013d7c5fe807ec
On a Mac:
Install postgres:
    brew install postgres
Install wal2json:
    git clone https://github.com/eulerto/wal2json.git
    cd wal2json
    git reset --hard 53b548a
    make
    make install
Run the tests from the repo root:
    ./bin/installcheck
To open a request for comments (RFC), open a GitHub issue against this repo and select the RFC template.