dbt-oracle-snapshot_issue
167 lines
{#
    oracle__snapshot_staging_table

    Renders the query that stages snapshot changes as the UNION ALL of:
      * insertions       - source rows with no current snapshot row, or whose
                           current snapshot row satisfies strategy.row_changed
                           (tagged dbt_change_type = 'insert');
      * updates          - current snapshot rows matched to a changed source row
                           (tagged 'update', carrying the existing scd id);
      * deletes          - current snapshot rows with no matching source row
                           (tagged 'delete'); only when strategy.hard_deletes is
                           'invalidate' or 'new_record';
      * deletion_records - an extra 'insert' row flagged is_deleted = 'True' for
                           each such missing row; only when strategy.hard_deletes
                           is 'new_record'.

    Args:
      strategy        - object read for: unique_key, updated_at, scd_id,
                        row_changed, hard_deletes.
      source_sql      - SQL text of the snapshot source query; inlined as the
                        snapshot_query CTE and passed to
                        get_column_schema_from_query().
      target_relation - the existing snapshot relation that is selected from.

    Snapshot meta-column names come from config 'snapshot_table_column_names',
    falling back to get_snapshot_table_column_names().

    Table aliases are written without AS and star selections are always
    qualified, as in the original macro (presumably for Oracle syntax
    compatibility).
#}
{% macro oracle__snapshot_staging_table(strategy, source_sql, target_relation) -%}
    {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
    {% if strategy.hard_deletes == 'new_record' %}
        {# scd id for the synthetic "deleted" rows: hash of the old scd id and the snapshot time #}
        {% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %}
    {% endif %}
    with snapshot_query as (

        {{ source_sql }}

    ),

    {# rows of the target whose valid_to marks them as current #}
    snapshotted_data as (

        select {{ target_relation }}.*,
            {{ unique_key_fields(strategy.unique_key) }}
        from {{ target_relation }}
        where
            {% if config.get('dbt_valid_to_current') %}
                {% set source_unique_key = columns.dbt_valid_to | trim %}
                {% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
                ( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
            {% else %}
                {{ columns.dbt_valid_to }} is null
            {% endif %}

    ),

    insertions_source_data as (

        select
            snapshot_query.*,
            {{ unique_key_fields(strategy.unique_key) }},
            {{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
            {{ oracle__get_dbt_valid_to_current(strategy, columns) }},
            {{ strategy.scd_id }} as {{ columns.dbt_scd_id }}
        from snapshot_query

    ),

    updates_source_data as (

        select
            snapshot_query.*,
            {{ unique_key_fields(strategy.unique_key) }},
            {{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_to }}
        from snapshot_query

    ),

    {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}

    deletes_source_data as (

        select
            snapshot_query.*,
            {{ unique_key_fields(strategy.unique_key) }}
        from snapshot_query

    ),
    {% endif %}

    insertions as (

        select
            'insert' as dbt_change_type,
            source_data.*
          {%- if strategy.hard_deletes == 'new_record' -%}
            ,'False' as {{ columns.dbt_is_deleted }}
          {%- endif %}

        from insertions_source_data source_data
        left outer join snapshotted_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
            or (
                {{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }}
                and (
                    ({{ strategy.row_changed }})
                  {#- under 'new_record', a key whose current row is flagged deleted gets a fresh row when it reappears in the source #}
                  {%- if strategy.hard_deletes == 'new_record' %}
                    or snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
                  {%- endif %}
                )
            )

    ),

    updates as (

        select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.{{ columns.dbt_scd_id }}
          {%- if strategy.hard_deletes == 'new_record' -%}
            , snapshotted_data.{{ columns.dbt_is_deleted }}
          {%- endif %}

        from updates_source_data source_data
        join snapshotted_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where (
            {{ strategy.row_changed }}
        )
      {#- mirror of the insertions predicate: also select the current "deleted" row of a key that reappeared #}
      {%- if strategy.hard_deletes == 'new_record' %}
        or snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
      {%- endif %}

    )

    {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
    ,
    deletes as (

        select
            'delete' as dbt_change_type,
            source_data.*,
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
            {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
            snapshotted_data.{{ columns.dbt_scd_id }}
          {%- if strategy.hard_deletes == 'new_record' -%}
            , snapshotted_data.{{ columns.dbt_is_deleted }}
          {%- endif %}

        from snapshotted_data
        left join deletes_source_data source_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
        {%- if strategy.hard_deletes == 'new_record' %}
        and not (
            --avoid updating the record's valid_to if the latest entry is marked as deleted
            snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
            and
            {# valid_to is qualified so it cannot clash with a same-named column coming from source_data #}
            {% if config.get('dbt_valid_to_current') %}
                {% set source_unique_key = 'snapshotted_data.' ~ (columns.dbt_valid_to | trim) %}
                {% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
                ( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
            {% else %}
                snapshotted_data.{{ columns.dbt_valid_to }} is null
            {% endif %}
        )
        {%- endif %}

    )
    {%- endif %}

    {%- if strategy.hard_deletes == 'new_record' %}
        {% set source_sql_cols = get_column_schema_from_query(source_sql) %}
    ,
    deletion_records as (

        select
            'insert' as dbt_change_type,
            {#- NOTE(review): every source column is read from snapshotted_data; this presumably fails if the source gained a column the snapshot table does not have yet - confirm #}
            {%- for col in source_sql_cols -%}
            snapshotted_data.{{ adapter.quote(col.column) }},
            {% endfor -%}
            {%- if strategy.unique_key | is_list -%}
                {%- for key in strategy.unique_key -%}
            snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
                {% endfor -%}
            {%- else -%}
            snapshotted_data.dbt_unique_key as dbt_unique_key,
            {% endif -%}
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
            {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
            snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
            {{ new_scd_id }} as {{ columns.dbt_scd_id }},
            'True' as {{ columns.dbt_is_deleted }}
        from snapshotted_data
        left join deletes_source_data source_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
        and not (
            --avoid inserting a new record if the latest one is marked as deleted
            snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
            and
            {# valid_to is qualified so it cannot clash with a same-named column coming from source_data #}
            {% if config.get('dbt_valid_to_current') %}
                {% set source_unique_key = 'snapshotted_data.' ~ (columns.dbt_valid_to | trim) %}
                {% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
                ( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
            {% else %}
                snapshotted_data.{{ columns.dbt_valid_to }} is null
            {% endif %}
        )

    )
    {%- endif %}

    select * from insertions
    union all
    select * from updates
    {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
    union all
    select * from deletes
    {%- endif %}
    {%- if strategy.hard_deletes == 'new_record' %}
    union all
    select * from deletion_records
    {%- endif %}

{%- endmacro %}