Transformations with dbt (Part 2/3)
Normalization and Custom Transformation are deprecated features. In destinations that use Normalization, it will be replaced by Typing and Deduping. Custom Transformation will be removed on March 31. For more information, see the deprecation notice in the Airbyte documentation.
This tutorial describes how to integrate SQL-based transformations with Airbyte syncs using a specialized transformation tool: dbt.
This tutorial is the second part of a series that follows the previous tutorial, Transformations with SQL. A third part, Transformations with Airbyte, wraps up the series by submitting transformations back into Airbyte.
(Example outputs were updated with Airbyte version 0.23.0-alpha, from May 2021.)
Transformations with dbt
The tool in charge of transformation behind the scenes is actually called dbt (Data Build Tool).
Before generating the SQL files we saw in the previous tutorial, Airbyte sets up a dbt Docker container and automatically generates a dbt project for us, structured as described on the dbt project documentation page and configured with the right credentials for the target destination. The dbt models are then run with the dbt CLI. For now, though, let's work with the dbt tool directly.
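To make the "right credentials" part concrete: dbt reads its connection settings from a profiles.yml file, and the dbt debug output later in this tutorial shows the values Airbyte used (host localhost, port 3000, user and database postgres, schema quarantine, target prod, 32 threads). The sketch below writes a minimal Postgres profile with those values. It is not the file Airbyte generates; the profile name normalize, the function name write_postgres_profile, and the DBT_PG_PASSWORD variable are assumptions for illustration.

```bash
#!/usr/bin/env bash
# Hypothetical helper: write a minimal dbt profiles.yml for a Postgres destination.
# Connection values mirror the `dbt debug` output in this tutorial. The profile name
# "normalize" is an assumption and must match the `profile:` key in dbt_project.yml.
write_postgres_profile() {
  local target_dir="$1"
  mkdir -p "$target_dir"
  # Quoted heredoc: bash expands nothing, so the Jinja env_var() call is left
  # for dbt to resolve at runtime instead of storing a password in the file.
  cat > "$target_dir/profiles.yml" <<'EOF'
normalize:
  target: prod
  outputs:
    prod:
      type: postgres
      host: localhost
      port: 3000
      user: postgres
      pass: "{{ env_var('DBT_PG_PASSWORD') }}"
      dbname: postgres
      schema: quarantine
      threads: 32
EOF
}
```

Compare the result with the profiles.yml Airbyte actually generated in the normalize folder before relying on it.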
Validate dbt project settings
Suppose we have identified our workspace (as shown in the previous tutorial, Transformations with SQL) and its workspace ID is:
NORMALIZE_WORKSPACE="5/0/"
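The workspace ID is a path relative to the root of the airbyte_workspace volume, which the commands below mount at /data. Assuming it follows a job ID / attempt number layout (so 5/0/ would be job 5, attempt 0), the dbt project lives in its normalize subfolder. The hypothetical helper below only spells out that path convention; the function name normalize_dir_for is an illustration, not part of Airbyte.

```bash
#!/usr/bin/env bash
# Hypothetical helper: build the dbt project directory inside the mounted volume,
# assuming the workspace layout /data/<job id>/<attempt number>/normalize.
normalize_dir_for() {
  local job_id="$1"
  local attempt="${2:-0}"   # default to the first attempt
  printf '/data/%s/%s/normalize\n' "$job_id" "$attempt"
}
```

For example, normalize_dir_for 5 0 prints /data/5/0/normalize, the same directory the -w flag below points at.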
We can verify that the dbt project is properly configured for that workspace:
#!/usr/bin/env bash
docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization debug --profiles-dir=. --project-dir=.
Example Output:
Running with dbt=0.19.1
dbt version: 0.19.1
python version: 3.8.8
python path: /usr/local/bin/python
os info: Linux-5.10.25-linuxkit-x86_64-with-glibc2.2.5
Using profiles.yml file at ./profiles.yml
Using dbt_project.yml file at /data/5/0/normalize/dbt_project.yml
Configuration:
  profiles.yml file [OK found and valid]
  dbt_project.yml file [OK found and valid]
Required dependencies:
 - git [OK found]
Connection:
  host: localhost
  port: 3000
  user: postgres
  database: postgres
  schema: quarantine
  search_path: None
  keepalives_idle: 0
  sslmode: None
  Connection test: OK connection ok
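Instead of reading this output by eye, a small check can confirm the lines that matter. The function below is a hypothetical sketch: it reads dbt debug output on stdin and succeeds only if both configuration files were found and valid and the connection test passed. The strings it searches for are copied from the example output above, so other dbt versions may word them differently.

```bash
#!/usr/bin/env bash
# Hypothetical check: succeed only if `dbt debug` output (read from stdin)
# reports valid profiles.yml and dbt_project.yml files and a working connection.
dbt_debug_ok() {
  local output
  output="$(cat)"
  # -F: match fixed strings, so the brackets and dots are taken literally.
  grep -qF 'profiles.yml file [OK found and valid]' <<<"$output" || return 1
  grep -qF 'dbt_project.yml file [OK found and valid]' <<<"$output" || return 1
  grep -qF 'Connection test: OK' <<<"$output" || return 1
}
```

For example, pipe the docker run ... debug command above into dbt_debug_ok before moving on to dbt run.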
Compile and build dbt normalization models
If the previous command reports no errors or discrepancies, we can invoke the dbt CLI from within the Docker image to run the transformations:
#!/usr/bin/env bash
docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization run --profiles-dir=. --project-dir=.
Example Output:
Running with dbt=0.19.1
Found 4 models, 0 tests, 0 snapshots, 0 analyses, 364 macros, 0 operations, 0 seed files, 1 source, 0 exposures
Concurrency: 32 threads (target='prod')
1 of 1 START table model quarantine.covid_epidemiology....................................................... [RUN]
1 of 1 OK created table model quarantine.covid_epidemiology.................................................. [SELECT 35822 in 0.47s]
Finished running 1 table model in 0.74s.
Completed successfully
Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
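The debug and run commands differ only in the dbt subcommand, so they can be folded into one shell function. The sketch below is hypothetical (the name airbyte_dbt and the DRY_RUN switch are not part of Airbyte); it reuses NORMALIZE_WORKSPACE and the exact docker flags from this tutorial, and strips one trailing slash from the workspace ID so the path does not contain a double slash.

```bash
#!/usr/bin/env bash
# Hypothetical wrapper: run a dbt subcommand inside the airbyte/normalization image
# against the workspace in NORMALIZE_WORKSPACE. With DRY_RUN=1 it only prints the command.
airbyte_dbt() {
  if [[ -z "${NORMALIZE_WORKSPACE:-}" ]]; then
    echo "NORMALIZE_WORKSPACE is not set" >&2
    return 1
  fi
  local cmd=(
    docker run --rm -i
    -v airbyte_workspace:/data
    -w "/data/${NORMALIZE_WORKSPACE%/}/normalize"
    --network host
    --entrypoint /usr/local/bin/dbt
    airbyte/normalization
    "$@"
    --profiles-dir=. --project-dir=.
  )
  if [[ "${DRY_RUN:-0}" == 1 ]]; then
    printf '%s\n' "${cmd[*]}"
  else
    "${cmd[@]}"
  fi
}
```

With this in place, airbyte_dbt debug and airbyte_dbt run reproduce the two commands above. Because the generated models carry tags (top-level and top-level-intermediate), airbyte_dbt run --models tag:top-level should limit the run to the final tables; dbt 0.19 selects models with --models (or -m).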
Exporting dbt normalization project outside Airbyte
As seen in the tutorial on exploring the workspace folder, you can browse the normalize folder and examine further logs if an error occurs.
We can also look at the dbt models generated by Airbyte and export them to the local host filesystem:
#!/usr/bin/env bash
# `**` only recurses into nested folders when globstar is enabled (bash 4+)
shopt -s globstar
TUTORIAL_DIR="$(pwd)/tutorial"
rm -rf "$TUTORIAL_DIR/normalization-files"
mkdir -p "$TUTORIAL_DIR/normalization-files"
# Copies the normalize folder into normalization-files/normalize
docker cp "airbyte-server:/tmp/workspace/$NORMALIZE_WORKSPACE/normalize/" "$TUTORIAL_DIR/normalization-files"
NORMALIZE_DIR="$TUTORIAL_DIR/normalization-files/normalize"
cd "$NORMALIZE_DIR"
cat "$NORMALIZE_DIR"/models/generated/**/*.sql
Example Output:
{{ config(alias="covid_epidemiology_ab1", schema="_airbyte_quarantine", tags=["top-level-intermediate"]) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
select
    {{ json_extract_scalar('_airbyte_data', ['key']) }} as {{ adapter.quote('key') }},
    {{ json_extract_scalar('_airbyte_data', ['date']) }} as {{ adapter.quote('date') }},
    {{ json_extract_scalar('_airbyte_data', ['new_tested']) }} as new_tested,
    {{ json_extract_scalar('_airbyte_data', ['new_deceased']) }} as new_deceased,
    {{ json_extract_scalar('_airbyte_data', ['total_tested']) }} as total_tested,
    {{ json_extract_scalar('_airbyte_data', ['new_confirmed']) }} as new_confirmed,
    {{ json_extract_scalar('_airbyte_data', ['new_recovered']) }} as new_recovered,
    {{ json_extract_scalar('_airbyte_data', ['total_deceased']) }} as total_deceased,
    {{ json_extract_scalar('_airbyte_data', ['total_confirmed']) }} as total_confirmed,
    {{ json_extract_scalar('_airbyte_data', ['total_recovered']) }} as total_recovered,
    _airbyte_emitted_at
from {{ source('quarantine', '_airbyte_raw_covid_epidemiology') }}
-- covid_epidemiology
{{ config(alias="covid_epidemiology_ab2", schema="_airbyte_quarantine", tags=["top-level-intermediate"]) }}
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
    cast({{ adapter.quote('key') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('key') }},
    cast({{ adapter.quote('date') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('date') }},
    cast(new_tested as {{ dbt_utils.type_float() }}) as new_tested,
    cast(new_deceased as {{ dbt_utils.type_float() }}) as new_deceased,
    cast(total_tested as {{ dbt_utils.type_float() }}) as total_tested,
    cast(new_confirmed as {{ dbt_utils.type_float() }}) as new_confirmed,
    cast(new_recovered as {{ dbt_utils.type_float() }}) as new_recovered,
    cast(total_deceased as {{ dbt_utils.type_float() }}) as total_deceased,
    cast(total_confirmed as {{ dbt_utils.type_float() }}) as total_confirmed,
    cast(total_recovered as {{ dbt_utils.type_float() }}) as total_recovered,
    _airbyte_emitted_at
from {{ ref('covid_epidemiology_ab1_558') }}
-- covid_epidemiology
{{ config(alias="covid_epidemiology_ab3", schema="_airbyte_quarantine", tags=["top-level-intermediate"]) }}
-- SQL model to build a hash column based on the values of this record
select
    *,
    {{ dbt_utils.surrogate_key([
        adapter.quote('key'),
        adapter.quote('date'),
        'new_tested',
        'new_deceased',
        'total_tested',
        'new_confirmed',
        'new_recovered',
        'total_deceased',
        'total_confirmed',
        'total_recovered',
    ]) }} as _airbyte_covid_epidemiology_hashid
from {{ ref('covid_epidemiology_ab2_558') }}
-- covid_epidemiology
{{ config(alias="covid_epidemiology", schema="quarantine", tags=["top-level"]) }}
-- Final base SQL model
select
    {{ adapter.quote('key') }},
    {{ adapter.quote('date') }},
    new_tested,
    new_deceased,
    total_tested,
    new_confirmed,
    new_recovered,
    total_deceased,
    total_confirmed,
    total_recovered,
    _airbyte_emitted_at,
    _airbyte_covid_epidemiology_hashid
from {{ ref('covid_epidemiology_ab3_558') }}
-- covid_epidemiology from {{ source('quarantine', '_airbyte_raw_covid_epidemiology') }}
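The ref() calls show how the four models chain together: _ab1 extracts the JSON fields, _ab2 casts them to SQL types, _ab3 adds the hash ID, and the final model selects from _ab3. A hypothetical helper can print that lineage for any exported project. It walks the files with find rather than **, so it also works in shells without globstar (such as the bash 3.2 that ships with macOS). It is a plain text search, so ref() or source() text inside SQL comments is picked up too.

```bash
#!/usr/bin/env bash
# Hypothetical helper: for every generated model, print "<model> <- <dependencies>"
# based on the ref()/source() calls found in its SQL (comments included).
show_model_lineage() {
  local models_dir="$1/models/generated" file deps
  find "$models_dir" -type f -name '*.sql' | sort | while IFS= read -r file; do
    # -o prints each match on its own line; paste joins them back onto one line.
    deps="$(grep -oE "(ref|source)\([^)]*\)" "$file" | sort -u | paste -sd ' ' -)"
    printf '%s <- %s\n' "$(basename "$file" .sql)" "${deps:-(none)}"
  done
}
```

For example, show_model_lineage "$NORMALIZE_DIR" lists each exported model next to the models or sources it reads from.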
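If you have dbt installed locally on your machine, you can view, edit, version, customize, and run the dbt models in your project outside Airbyte syncs. First install the project's package dependencies (dbt-utils) with dbt deps, then run the models. The exported profiles.yml keeps the same connection settings (here localhost on port 3000), so the destination must be reachable from your machine at that address.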
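Before running the commands below, a quick preflight can confirm that dbt is on your PATH and that the exported folder contains both dbt_project.yml and profiles.yml, the two files dbt debug reported earlier. The function name check_local_dbt_project is hypothetical. Since the project was generated and run with dbt 0.19.1 and dbt-utils 0.6.4, a local dbt of a similar version is the safest choice.

```bash
#!/usr/bin/env bash
# Hypothetical preflight: confirm a local dbt executable exists and that the
# exported project directory holds the files dbt needs.
check_local_dbt_project() {
  local project_dir="$1" f
  if ! command -v dbt >/dev/null 2>&1; then
    echo "dbt is not on PATH; install it first" >&2
    return 1
  fi
  for f in dbt_project.yml profiles.yml; do
    if [[ ! -f "$project_dir/$f" ]]; then
      echo "missing $project_dir/$f" >&2
      return 1
    fi
  done
}
```

For example, check_local_dbt_project "$NORMALIZE_DIR" && dbt deps ... runs the dependency install only when the preflight passes.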
#!/usr/bin/env bash
# Install package dependencies (dbt-utils), then rebuild the models from scratch
dbt deps --profiles-dir="$NORMALIZE_DIR" --project-dir="$NORMALIZE_DIR"
dbt run --profiles-dir="$NORMALIZE_DIR" --project-dir="$NORMALIZE_DIR" --full-refresh
Example Output:
Running with dbt=0.19.1
Installing https://github.com/fishtown-analytics/dbt-utils.git@0.6.4
  Installed from revision 0.6.4
Running with dbt=0.19.1
Found 4 models, 0 tests, 0 snapshots, 0 analyses, 364 macros, 0 operations, 0 seed files, 1 source, 0 exposures
Concurrency: 32 threads (target='prod')
1 of 1 START table model quarantine.covid_epidemiology....................................................... [RUN]
1 of 1 OK created table model quarantine.covid_epidemiology.................................................. [SELECT 35822 in 0.44s]
Finished running 1 table model in 0.63s.
Completed successfully
Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
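Both runs end with a Done. PASS=... ERROR=... summary line. dbt already exits with a non-zero status when a model fails, but when output is captured to a log file it can be handy to check that line directly. The function below is a hypothetical sketch that reads dbt output on stdin, prints the last summary line, and fails if ERROR is not zero.

```bash
#!/usr/bin/env bash
# Hypothetical check: find the last "Done. PASS=.. WARN=.. ERROR=.. SKIP=.. TOTAL=.."
# line in dbt output (stdin), print it, and return non-zero if ERROR > 0.
dbt_run_ok() {
  local summary errors
  summary="$(grep -E '^Done\. PASS=[0-9]+ ' | tail -n 1)"
  if [[ -z "$summary" ]]; then
    echo "no dbt summary line found" >&2
    return 2
  fi
  errors="$(sed -E 's/.*ERROR=([0-9]+).*/\1/' <<<"$summary")"
  echo "$summary"
  (( errors == 0 ))
}
```

For example, pipe the dbt run ... --full-refresh command above into dbt_run_ok, or run it over a saved log with dbt_run_ok < run.log.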
Now that you've exported the generated normalization models, you can edit and tweak them as needed.
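If you version the exported project (for example with git), keep secrets and generated files out of the repository. profiles.yml carries the destination credentials, and target/, logs/, and dbt_modules/ are dbt 0.19's default locations for compiled output, logs, and installed packages (the generated dbt_project.yml may override these paths). The helper below is a hypothetical sketch that appends those entries to .gitignore, once each.

```bash
#!/usr/bin/env bash
# Hypothetical helper: add ignore rules for credentials and dbt build artifacts.
# target/, logs/ and dbt_modules/ are dbt 0.19 defaults; adjust if dbt_project.yml differs.
write_dbt_gitignore() {
  local project_dir="$1" entry
  touch "$project_dir/.gitignore"
  for entry in profiles.yml target/ logs/ dbt_modules/; do
    # Append each entry only if that exact line is not already present,
    # so the helper can be re-run safely.
    grep -qxF "$entry" "$project_dir/.gitignore" || echo "$entry" >> "$project_dir/.gitignore"
  done
}
```

After write_dbt_gitignore "$NORMALIZE_DIR", initializing a repository in that folder keeps profiles.yml and build output untracked.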
To learn how to push your modifications back to Airbyte and use your updated dbt project during Airbyte syncs, continue with the following tutorial on importing transformations into Airbyte...