dbt Cloud + Airflow → PostgreSQL → Power BI

dbt Cloud + Airflow → PostgreSQL → Power BI

Полный Data Ops-конвейер под ключ (редакция от 19 июня 2025 г.)


1. Почему именно dbt Cloud + Airflow orchestration

Big Query? Нет, Big Quality. Когда размер проекта выходит за пределы «пару SQL-скриптов в crontab», требуются процессы, а не героизм. dbt Cloud превращает SQL-модели в репозиторий кода, снабжает их CI/CT-пайплайном и обрушает сборку, если тесты красные. Но сам по себе dbt Cloud — это «трансформация-как-сервис»; его надо запускать по расписанию и по событиям. Airflow 3.0 умеет пробуждать dbt-раннер через REST API, ретраит задачи, снимает метрики и рисует Gantt-граф DAG’а. Итог: любой коммит .sql сначала проходит линтеры и тесты в облаке, затем попадает в прод — и всё под контролем оркестратора.

Чистый слой marts + «толстый» BI. PostgreSQL 16 в роли DWH живёт дешево и предсказуемо: одну и ту же таблицу можно одновременно инкрементально обновлять dbt’ом и шарить в Power BI. А в June 2025 релизе Power BI появился Enhanced Refresh History — легче ловить лаг между DAG’ом Airflow и отчётом у директора.

Типовые кейсы

Сценарий Что даёт связка
Финансовый контроллинг Airflow каждую ночь запускает refresh; dbt тестирует, что 100 % транзакций привязано к контрагенту; Postgres выдаёт mart.pl_rub; CFO открывает Power BI с утра.
Маркетинговое агентство При мерже PR на ветку main GitHub Action создаёт Run в dbt Cloud → DAG переходит в прод → менеджеры видят обновлённые KPI через 20 мин.
E-commerce Триггер Airflow по S3-событию (выгрузка заказов) запускает dbt-job orders_incremental; Power BI DirectQuery показывает горячие данные почти live.

2. Компоненты и роли

Компонент Роль Ответственный
Bitrix24 / ERP / Ads Источники сырых данных Интегратор API
Airflow 3.0 Оркестратор, SLA & ретраи Data Engineer
dbt Cloud Team/Enterprise Трансформации, CI/CD, тесты Analytics Engineer
PostgreSQL 16 Хранилище staging → marts DBA
Power BI (Desktop + Service) Дашборды, self-service BI-аналитик
(опция) Superset 5.1 Web-панели для внутренней команды BI-аналитик

3. Подготовка окружения

3.1 Устанавливаем и регистрируем

Что Где взять
Airflow 3.0.2 (Docker) Airflow Docs
dbt Cloud (Team tier) dbt Cloud Sign-up
PostgreSQL 16.3 Postgres Downloads
Power BI Desktop June 2025 Power BI Update

3.2 Мини-compose для DEV

version: "3.9"
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: etl
      POSTGRES_PASSWORD: etl_pwd
      POSTGRES_DB: dwh
    ports: ["5432:5432"]

  airflow:
    image: apache/airflow:3.0.2
    env_file: .env        # содержит AIRFLOW__... и DB creds
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    ports: ["8080:8080"]
    depends_on: [postgres]

.env (фрагмент):

AIRFLOW_API_AUTH_BACKEND=airflow.api.auth.backend.basic_auth
DBT_CLOUD_ACCOUNT_ID=12345
DBT_CLOUD_JOB_ID=67890
DBT_CLOUD_TOKEN=shh_its_secret

3.3 Минимальные права

Аккаунт Права
etl INSERT, UPDATE на stage-схему
dbt_runner (в dbt Cloud) CREATE, DROP, ALTER, SELECT на stage, ods, mart
bi_user SELECT только на mart
Airflow → dbt Cloud API Token job:run, job:cancel, metadata:read

4. Настройка потоков данных

4.1 DAG Airflow: триггер dbt-run

from airflow.decorators import dag, task
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.sensors.external_task import ExternalTaskSensor
from pendulum import datetime

@dag(
    schedule="0 */2 * * *",          # каждые 2 часа
    start_date=datetime(2025, 6, 1, tz="Europe/Moscow"),
    catchup=False,
    tags=["dbt","etl"],
    max_active_runs=1
)
def etl_dbt_pipeline():

    # Шаг 1. Подтягиваем свежие выгрузки (Airbyte check / n8n batch)  
    @task
    def check_sources():
        ...
    ok_sources = check_sources()

    # Шаг 2. Запускаем dbt Cloud Job в PROD  
    run_dbt = DbtCloudRunJobOperator(
        task_id="run_dbt_cloud",
        dbt_cloud_conn_id="dbt_cloud_default",   # в Airflow Connections
        job_id="{{ var.value.DBT_CLOUD_JOB_ID }}",
        check_interval=60,
        timeout=3600
    )

    # Шаг 3. Чекаем, что Job = success  
    dbt_sensor = ExternalTaskSensor(
        task_id="wait_dbt_success",
        external_dag_id="run_dbt_cloud",  # task_id
        external_task_id="",
        allowed_states=["success"],
        poke_interval=30
    )

    ok_sources >> run_dbt >> dbt_sensor

dag = etl_dbt_pipeline()

4.2 dbt Cloud: Continuous Integration

CI Job (Environment dev):

  1. Триггер: Pull Request в GitHub.
  2. Команды: dbt depsdbt build --select state:modified+.
  3. Ни один тест — ни красный, иначе PR блокируется.
Читай также:  Stitch Data → PostgreSQL → Metabase

Prod Job (Environment prod):

  1. Запускается Airflow-оператором.
  2. dbt run --select tag:nightly (или инкрементальными моделями).
  3. dbt test --store-failures. Если тест упал, Airflow ретраит 3 раза, потом шлёт Slack.

4.3 PostgreSQL: схема и права

CREATE SCHEMA stage;
CREATE SCHEMA ods;
CREATE SCHEMA mart;
GRANT USAGE ON SCHEMA stage,ods,mart TO dbt_runner;
GRANT SELECT ON ALL TABLES IN SCHEMA mart TO bi_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA mart GRANT SELECT ON TABLES TO bi_user;

5. Трансформации и моделирование

5.1 Структура проекта dbt

dwh/
  ├─ models/
  │   ├─ stage/
  │   │   └─ stg_deal.sql
  │   ├─ ods/
  │   │   └─ ods_deal_history.sql
  │   └─ mart/
  │       ├─ fct_deal_funnel.sql
  │       └─ dim_manager.sql
  ├─ macros/
  └─ tests/

stg_deal.sql

{{ config(materialized='incremental', unique_key='id') }}

SELECT
    id::bigint,
    title,
    stage_id,
    amount::numeric(14,2),
    assigned_by,
    updated_at::timestamptz
FROM {{ source('raw','crm_deal') }}
{% if is_incremental() %}
  WHERE updated_at >= (SELECT max(updated_at) FROM {{ this }})
{% endif %}

fct_deal_funnel.sql

{{ config(materialized='table', tags=['nightly']) }}

SELECT
    date_trunc('month', updated_at)::date AS month,
    stage_id,
    COUNT(*)                              AS deals_cnt,
    SUM(amount)                           AS pipeline
FROM {{ ref('stg_deal') }}
GROUP BY 1,2;

5.2 Тесты

version: 2

models:
  - name: stg_deal
    columns:
      - name: id
        tests: [not_null, unique]
      - name: stage_id
        tests:
          - accepted_values:
              values: ['NEW','PREPAY','PROPOSAL','WON','LOSE']

CI-Pipeline падает, если выпадает новый stage_id — бизнес узнаёт о сбое раньше, чем KPI сломается.


6. Визуализация в Power BI

  1. Get Data → PostgreSQL

    • Server prod-dwh:5432, Database dwh, SSL On.
  2. Таблицы: mart.fct_deal_funnel, mart.dim_manager.
  3. Import (или DirectQuery при данных <1 M строк).
  4. Incremental Refresh:

    • RangeStart/End → month.
    • Store 60 months, Refresh 1 month.
  5. Роли RLS: manager_id = USERPRINCIPALNAME().

Секрет: dbt exposures: можно документировать в schema.yml; Power BI отчёт добавьте как exposure type = dashboard — lineage появится в dbt Cloud UI.


7. Мониторинг и автоматизация

Автоматический алёрт: Airflow → Slack «run_dbt_cloud failed 3 times, aborting DAG».


8. Типичные ошибки и лайфхаки

  1. “DbtCloudRunJobOperator timeout” — job зависает на dbt deps. Включите Environment npm cache или увеличьте timeout.
  2. “Relation does not exist” в Power BI — имя view изменилось в dbt. Добавьте alias: в schema.yml, чтобы сохранить обратную совместимость.
  3. Тесты зелёные в CI, но красные в прод — dev-env неполные данные. Введите seed-файлы с edge-case строками.
  4. Airflow catchup flood — не забудьте catchup=False, иначе DAG отыграет пропущенные 30 дней.
  5. WAL-диск переполнен при dbt full-refresh — переключите large-модели на incremental + is_incremental() условия, full-refresh запускайте вручную.

9. Заключение

Связка dbt Cloud + Airflow → PostgreSQL → Power BI превращает «кухарку с KPI» в отлаженный Data Ops-конвейер:

Плюсы Минусы
CI/CD и тесты ловят баг до продакшена Стоимость dbt Cloud Team \$100+/мес
Airflow даёт ретраи, алёрты и Gantt-визу Две консоли (UI Airflow + dbt Cloud) — ≈ повтор обучения
Один Postgres — минимум копий данных При > 5 М строк / час Postgres нужен реиндекс и партиционирование

Когда масштабировать?

  • 10 М строк / сутки → ClickHouse реплика для тяжёлых витрин.

  • Realtime < 1 мин → Kafka + dbt Cloud Slim CI (incremental materializations).
  • GitOps-паранойя → переезд с dbt Cloud на self-host dbt-Core + Kubernetes.

Полезные ссылки

Теперь SQL-коммит идёт по красной дорожке: тесты, ревью, CI, прод, и уже через десять минут CFO листает свежий отчёт — пока Airflow выводит зелёные галочки, а dbt Cloud гордо пишет «All checks passed».