Skip to content

Commit

Permalink
fixed logic for snapshot and added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cody-scott committed Sep 4, 2024
1 parent c2cf437 commit b7a1af7
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

{% set temp_snapshot_relation_sql = model['compiled_code'].replace("'", "''") %}
{% call statement('create temp_snapshot_relation') %}
USE [{{ model.database}}];
EXEC('DROP VIEW IF EXISTS {{ temp_snapshot_relation.include(database=False) }};');
EXEC('create view {{ temp_snapshot_relation.include(database=False) }} as {{ temp_snapshot_relation_sql }};');
{% endcall %}
Expand Down
1 change: 1 addition & 0 deletions dbt/include/sqlserver/macros/relations/table/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{%- set tmp_relation = relation.incorporate(path={"identifier": relation.identifier ~ '__dbt_tmp_vw'}, type='view') -%}

{%- do adapter.drop_relation(tmp_relation) -%}
USE [{{ relation.database }}];
{{ get_create_view_as_sql(tmp_relation, sql) }}

{%- set table_name -%}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/sqlserver/macros/relations/views/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
{% set tst %}
SELECT '1' as col
{% endset %}

USE [{{ relation.database }}];
EXEC('{{- escape_single_quotes(query) -}}')

{% endmacro %}
1 change: 1 addition & 0 deletions devops/scripts/init_db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ fi
for i in {1..50};
do
/opt/mssql-tools/bin/sqlcmd -C -S localhost -U sa -P "${SA_PASSWORD}" -d master -I -Q "CREATE DATABASE TestDB COLLATE ${COLLATION}"
/opt/mssql-tools/bin/sqlcmd -C -S localhost -U sa -P "${SA_PASSWORD}" -d master -I -Q "CREATE DATABASE TestDB_Secondary COLLATE ${COLLATION}"
if [ $? -eq 0 ]
then
echo "database creation completed"
Expand Down
141 changes: 141 additions & 0 deletions tests/functional/adapter/mssql/test_cross_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import pytest
from dbt.tests.util import get_connection, run_dbt

snapshot_sql = """
{% snapshot claims_snapshot %}
{{
config(
target_database='TestDB_Secondary',
target_schema='dbo',
unique_key='id',
strategy='timestamp',
updated_at='updated_at',
)
}}
select * from {{source('mysource', 'claims')}}
{% endsnapshot %}
"""

source_csv = """id,updated_date
1,2024-01-01
2,2024-01-01
3,2024-01-01
"""

sources_yml = """
version: 2
sources:
- name: mysource
database: TestDB
tables:
- name: claims
"""


class TestCrossDB:
def cleanup_primary_table(self, project):
drop_sql = "DROP TABLE IF EXISTS {database}.mysource.claims"
with get_connection(project.adapter):
project.adapter.execute(
drop_sql.format(database=project.database),
fetch=True,
)

def cleanup_snapshot_table(self, project):
drop_sql = "DROP TABLE IF EXISTS TestDB_Secondary.dbo.claims_snapshot"
with get_connection(project.adapter):
project.adapter.execute(
drop_sql,
fetch=True,
)

def create_source_schema(self, project):
create_sql = """
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = 'mysource')
BEGIN
EXEC('CREATE SCHEMA mysource')
END
"""
with get_connection(project.adapter):
project.adapter.execute(
create_sql,
fetch=True,
)

def create_primary_table(self, project):
src_query = """
SELECT *
INTO
{database}.mysource.claims
FROM
(
SELECT
1 as id,
CAST('2024-01-01' as DATETIME2(6)) updated_at
UNION ALL
SELECT
2 as id,
CAST('2024-01-01' as DATETIME2(6)) updated_at
UNION ALL
SELECT
3 as id,
CAST('2024-01-01' as DATETIME2(6)) updated_at
) as src_data
"""
with get_connection(project.adapter):
project.adapter.execute(
src_query.format(database=project.database, schema=project.test_schema),
fetch=True,
)

def create_secondary_schema(self, project):
src_query = """
USE [TestDB_Secondary]
EXEC ('CREATE SCHEMA {schema}')
"""
with get_connection(project.adapter):
project.adapter.execute(
src_query.format(database=project.database, schema=project.test_schema),
fetch=True,
)

def update_primary_table(self, project):
sql = """
UPDATE [{database}].[mysource].[claims]
SET
updated_at = CAST('2024-02-01' as datetime2(6))
WHERE
id = 3
"""
with get_connection(project.adapter):
project.adapter.execute(
sql.format(database=project.database),
fetch=True,
)

@pytest.fixture(scope="class")
def models(self):
return {"sources.yml": sources_yml}

@pytest.fixture(scope="class")
def snapshots(self):
return {"claims_snapshot.sql": snapshot_sql}

def test_cross_db_snapshot(self, project):
self.cleanup_primary_table(project)
self.cleanup_snapshot_table(project)

self.create_source_schema(project)
self.create_primary_table(project)
# self.create_secondary_schema(project)
run_dbt(["snapshot"])
self.update_primary_table(project)
run_dbt(["snapshot"])

0 comments on commit b7a1af7

Please sign in to comment.