dbt-oracle-snapshot_issue

Diffchecker comparison summary: the original version has 167 lines (0 removals); the revised version has 201 lines (35 additions).
{#
    oracle__snapshot_staging_table(strategy, source_sql, target_relation)

    Renders a single query that stages every change a snapshot run has to apply.
    The result is the UNION ALL of up to four CTEs, each tagged with a
    dbt_change_type value:

      insertions        'insert'  always rendered
      updates           'update'  always rendered
      deletes           'delete'  when strategy.hard_deletes is 'invalidate' or 'new_record'
      deletion_records  'insert'  when strategy.hard_deletes is 'new_record'

    Arguments:
      strategy         object read for unique_key, updated_at, scd_id, row_changed
                       and hard_deletes.
      source_sql       SQL text of the snapshot's source query; embedded verbatim
                       as the snapshot_query CTE.
      target_relation  the existing snapshot table; only its "current" rows are read.

    Meta column names (dbt_valid_from, dbt_valid_to, dbt_scd_id, dbt_updated_at,
    dbt_is_deleted) come from the 'snapshot_table_column_names' config when set,
    otherwise from get_snapshot_table_column_names().

    The branches are combined with UNION ALL, which matches columns by position,
    so the select lists below must stay aligned with each other.
#}
{% macro oracle__snapshot_staging_table(strategy, source_sql, target_relation) -%}

    {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
    {% if strategy.hard_deletes == 'new_record' %}
        {# Hash of the existing scd id and the snapshot time; used as the scd id of the rows built in deletion_records. #}
        {% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %}
    {% endif %}

    with snapshot_query as (

        {{ source_sql }}

    ),

    {# Current rows of the snapshot table: dbt_valid_to is null or, when dbt_valid_to_current is configured, equal to that value. #}
    snapshotted_data as (

        select {{ target_relation }}.*,
            {{ unique_key_fields(strategy.unique_key) }}
        from {{ target_relation }}
        where
            {% if config.get('dbt_valid_to_current') %}
                {% set source_unique_key = columns.dbt_valid_to | trim %}
                {% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
                ( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
            {% else %}
                {{ columns.dbt_valid_to }} is null
            {% endif %}
    ),

    {# Source rows shaped as candidate new snapshot rows (unique key plus all meta columns). #}
    insertions_source_data as (

        select
            snapshot_query.*,
            {{ unique_key_fields(strategy.unique_key) }},
            {{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
            {{ oracle__get_dbt_valid_to_current(strategy, columns) }},
            {{ strategy.scd_id }} as {{ columns.dbt_scd_id }}

        from snapshot_query
    ),

    {# Source rows shaped for closing out an existing row: dbt_valid_to is set to the strategy's updated_at expression. #}
    updates_source_data as (

        select
            snapshot_query.*,
            {{ unique_key_fields(strategy.unique_key) }},
            {{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_to }}

        from snapshot_query
    ),

    {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}

    {# Source rows with their unique key only; used below to find snapshot rows that no longer exist in the source. #}
    deletes_source_data as (

        select
            snapshot_query.*,
            {{ unique_key_fields(strategy.unique_key) }}
        from snapshot_query
    ),
    {% endif %}

    {#
        New snapshot rows. A source row is inserted when
          - its key has no current row in the snapshot, or
          - a current row exists and strategy.row_changed holds, or
          - (new_record only) a current row exists and is flagged dbt_is_deleted = 'True',
            i.e. the key has reappeared in the source after being deleted.
    #}
    insertions as (

        select
            'insert' as dbt_change_type,
            source_data.*
          {%- if strategy.hard_deletes == 'new_record' -%}
            ,'False' as {{ columns.dbt_is_deleted }}
          {%- endif %}

        from insertions_source_data source_data
        left outer join snapshotted_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
            or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ({{ strategy.row_changed }})
            {%- if strategy.hard_deletes == 'new_record' -%}
            or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and snapshotted_data.{{ columns.dbt_is_deleted }} = 'True')
            {%- endif %}

        )

    ),

    {#
        Existing current rows to close out. Selected when strategy.row_changed holds or
        (new_record only) when the current row is flagged dbt_is_deleted = 'True'.
        The scd id is taken from the snapshot row, not from the source row.
    #}
    updates as (

        select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.{{ columns.dbt_scd_id }}
          {%- if strategy.hard_deletes == 'new_record' -%}
            , snapshotted_data.{{ columns.dbt_is_deleted }}
          {%- endif %}

        from updates_source_data source_data
        join snapshotted_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where (
            {{ strategy.row_changed }}
        )
        {%- if strategy.hard_deletes == 'new_record' -%}
        or snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
        {%- endif %}
    )

    {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' -%}
    ,

    {#
        Current snapshot rows whose key is absent from the source. All three timestamp
        columns are set to the snapshot time. For new_record, rows already flagged as
        deleted are skipped so their dbt_valid_to is not touched again.
    #}
    deletes as (

        select
            'delete' as dbt_change_type,
            source_data.*,
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
            {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
            snapshotted_data.{{ columns.dbt_scd_id }}
          {%- if strategy.hard_deletes == 'new_record' -%}
            , snapshotted_data.{{ columns.dbt_is_deleted }}
          {%- endif %}

        from snapshotted_data
        left join deletes_source_data source_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
        {%- if strategy.hard_deletes == 'new_record' %}
            and not (
                --avoid updating the record's valid_to if the latest entry is marked as deleted
                snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
                and
                {% if config.get('dbt_valid_to_current') %}
                    {% set source_unique_key = columns.dbt_valid_to | trim %}
                    {% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
                    ( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
                {% else %}
                    {{ columns.dbt_valid_to }} is null
                {% endif %}
            )
        {%- endif %}

    )
    {%- endif %}

    {%- if strategy.hard_deletes == 'new_record' %}
        {# Column list of the source query; drives the explicit select list of deletion_records. #}
        {% set source_sql_cols = get_column_schema_from_query(source_sql) %}
    ,
    {#
        One extra row per deleted key, built from the last current snapshot row and
        flagged dbt_is_deleted = 'True'. It is emitted as an 'insert' with a fresh scd id
        (new_scd_id). Keys whose current row is already flagged as deleted are skipped,
        so repeated runs do not keep adding deletion rows.
    #}
    deletion_records as (

        select
            'insert' as dbt_change_type,
            {%- for col in source_sql_cols -%}
            snapshotted_data.{{ adapter.quote(col.column) }},
            {% endfor -%}
            {%- if strategy.unique_key | is_list -%}
                {%- for key in strategy.unique_key -%}
            snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
                {% endfor -%}
            {%- else -%}
            snapshotted_data.dbt_unique_key as dbt_unique_key,
            {% endif -%}
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
            {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
            snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
            {{ new_scd_id }} as {{ columns.dbt_scd_id }},
            'True' as {{ columns.dbt_is_deleted }}
        from snapshotted_data
        left join deletes_source_data source_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
            and not (
                --avoid inserting a new record if the latest one is marked as deleted
                snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
                and
                {% if config.get('dbt_valid_to_current') %}
                    {% set source_unique_key = columns.dbt_valid_to | trim %}
                    {% set target_unique_key = config.get('dbt_valid_to_current') | trim %}
                    ( {{ equals(source_unique_key, target_unique_key) }} or {{ source_unique_key }} is null )
                {% else %}
                    {{ columns.dbt_valid_to }} is null
                {% endif %}
            )

    )
    {%- endif %}

    select * from insertions
    union all
    select * from updates
    {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
    union all
    select * from deletes
    {%- endif %}
    {%- if strategy.hard_deletes == 'new_record' %}
    union all
    select * from deletion_records
    {%- endif %}

{%- endmacro %}