Airflow → PostgreSQL DWH → dbt → Superset

Airflow → PostgreSQL DWH → dbt → Superset

Классическая платформа данных с Enterprise-запахом (редакция от 19 июня 2025 г.)


1. Почему именно Airflow-Postgres-dbt-Superset

Оркестрация без магии, но со стабильностью. Apache Airflow 3.0.2 — свежеиспечённая версия с новым TaskFlow API, нативным Async-экзекьютором и улучшенной UI-диагностикой DAG’ов — дефакто-стандарт для планирования пайплайнов. Он позволяет триггерить извлечения из Bitrix24, Google Ads или вашего S3, расставляя ретраи и SLA как шахматы, а не «cron + bash-скрипт» (airflow.apache.org).

SQL-ферма на «танке» PostgreSQL. Postgres 16 остаётся бесплатным, но принес 30 % ускорения параллельных джойнов, logical replication для DDL и сжатие LZ4 — всё, что нужно, чтобы хранить как «сырые» REST-дампы Bitrix24, так и готовые marts для Power BI и Superset (postgresql.org).

dbt превращает трансформации в pull-request. dbt Core 1.9.8 предлагает unit-тесты на SQL, semantic-layer и AI-хелпер dbt Copilot — код-же‐ревью с вашим аналитиком-ботом (github.com). Всё пишется декларативно в YAML и Jinja, что убирает «случайный» бизнес-лог прямо из BI-инструментов.

Superset 5.0 — open-source-львица против платных BI. Версия 5.0 ускорила chart-рендеринг на 30 %, добавила dark-mode и поддержку row-level-security, а ещё научилась импортировать dbt-модели (yaml) одной кнопкой (github.com).

Типовые сценарии

  • B2B-агентство. Airflow таскает сделки Bitrix24 → raw.deal, dbt собирает воронку, Superset выдаёт «сквозную» приборку, а Power BI — презентабельный Board Pack.
  • Финтех-in-house. Airflow забирает Kafka-партиции, Postgres — DWH, dbt — нормы ПОД/ФТ, Superset мониторит риски, Power BI — для CFO.

2. Компоненты и роли

Компонент Роль в цепочке Ответственный
Bitrix24 Источник CRM-событий Маркетолог / Sales-аналитик
Apache Airflow 3.0 Оркестратор ETL DAG’ов Data Engineer
PostgreSQL 16 DWH: raw, ODS, mart DBA
dbt Core 1.9 Декларативные трансформации & тесты Analytics Engineer
Apache Superset 5.0 Self-service-дашборды BI-аналитик
Power BI Презентации для C-level / офлайн.pbix BI-аналитик

3. Подготовка окружения

3.1 Установка и доступы

Что Где взять
Airflow 3.0.2 (Docker) Airflow Docs
PostgreSQL 16.3 (RPM/Docker) PostgreSQL Downloads
dbt Core 1.9.8 (pip) dbt Docs
Superset 5.0 (Docker) Superset Releases
Bitrix24 PAT + Webhook Bitrix24 REST
Power BI Desktop June 2025 Power BI Update

3.2 Мини-docker-compose

version: "3.9"
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow_pwd
      POSTGRES_DB: dwh
    volumes:
      - ./pgdata:/var/lib/postgresql/data
    ports: ["5432:5432"]

  airflow:
    image: apache/airflow:3.0.2
    env_file: .env        # AIRFLOW_* переменные
    depends_on: [postgres]
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    ports: ["8080:8080"]

  superset:
    image: apache/superset:5.0
    env_file: superset.env
    depends_on: [postgres]
    ports: ["8088:8088"]
docker compose up -d

3.3 Минимальные права

  • Bitrix24 PAT — scope crm + task, лимит 2 r/s.
  • Postgres role etlINSERT, UPDATE на схемы raw, ods.
  • Superset RLS — роль bi_user со SELECT на mart.

4. Настройка потоков данных

4.1 Airflow DAG: тянем Bitrix24

from airflow.decorators import dag, task
from pendulum import datetime
from bitrix24 import Bitrix24  # pip install pybitrix24
import pandas as pd
import sqlalchemy as sa

@dag(schedule="*/30 * * * *", start_date=datetime(2025, 6, 1, tz="Europe/Moscow"),
     catchup=False, max_active_runs=1, tags=["bitrix24"])
def bitrix_to_raw():

    @task(retries=3, retry_delay=300)
    def extract_deals():
        b = Bitrix24("<YOUR_WEBHOOK>")
        rows = b.get_all("crm.deal.list", {"select": ["*", "UF_*"]})
        return pd.DataFrame(rows).to_json()

    @task()
    def load(json_rows: str):
        df = pd.read_json(json_rows)
        engine = sa.create_engine("postgresql+psycopg2://etl:etl_pwd@postgres:5432/dwh")
        df.to_sql("deal", engine, schema="raw", if_exists="append", index=False)

    load(extract_deals())

dag = bitrix_to_raw()

Cron каждые 30 минут — вкусное «почти-realtime» без лишней нагрузки.

Лайфхак: max_active_runs=1 ограждает от параллельного запуска, если прошлый DAG ещё грузит.

4.2 dbt — от raw к mart

dwh/
  ├─ models/
  │   ├─ stg_bitrix/
  │   │   └─ stg_deal.sql
  │   └─ mart_sales/
  │       └─ fct_deal_funnel.sql
  └─ dbt_project.yml

stg_deal.sql

{{ config(materialized='incremental', unique_key='id') }}

SELECT
    id::int                    AS deal_id,
    title,
    stage_id,
    opportunity::numeric(14,2) AS amount,
    date(create_date)          AS created_dt,
    date(modify_date)          AS modified_dt
FROM {{ source('raw', 'deal') }}
WHERE {{ incremental_clause('modified_dt') }}

fct_deal_funnel.sql

SELECT
    date_trunc('month', created_dt)::date AS month,
    stage_id,
    COUNT(*)                              AS deals_cnt,
    SUM(amount)                           AS pipeline
FROM {{ ref('stg_deal') }}
GROUP BY 1,2

Запуск:

dbt run --profiles-dir profiles --target prod
dbt test

4.3 Airflow × dbt orchestration

from airflow.operators.bash import BashOperator
run_dbt = BashOperator(
    task_id="dbt_run",
    bash_command="dbt run --project-dir /opt/dbt --profiles-dir /opt/dbt --target prod",
)
extract_deals() >> load() >> run_dbt

5. Трансформации и моделирование

SQL-конвенции

-- типизация ENUM'ов
CREATE TYPE stage_enum AS ENUM ('NEW','PREPAY_INVOICE','WON');

-- точный PK
ALTER TABLE ods.stg_deal ADD PRIMARY KEY (deal_id);

Документация dbt docs generate && dbt docs serve выдаёт интерактивную lineage-карту.


6. Визуализация в Power BI и Superset

6.1 Power BI Desktop

  1. Get Data → PostgreSQL — DSN localhost:5432, Database dwh.
  2. DirectQuery если mart.fct_deal_funnel < 1 млн строк, иначе Import.
  3. Диаграмма Stacked Column: Axis → month, Series → stage_id, Value → pipeline.
  4. Refresh в Power BI Service каждые 2 ч (Pro-лицензия).

6.2 Superset 5.0

  1. Settings → Database Connections → + postgresql://superset:sup_pwd@postgres:5432/dwh
  2. Data → Datasets → + — выберите mart.fct_deal_funnel.
  3. Chart → Time Series Bar — Time = month, Metrics = pipeline, Series = stage_id.
  4. Dashboard «Deal Funnel 360».
  5. Alerts & Reports — письмо, если pipeline < плана на 20 %.

Фишка 5.0: Superset импортирует dbt manifest.json и автоматически создаёт датасеты с описаниями колонок.


7. Мониторинг и автоматизация

Объект Инструмент Порог
Airflow DAG duration built-in SLA + Grafana > 15 мин
Task failures Airflow email + SlackWebHook > 1 за сутки
Postgres bloat pg_stat_user_tables > 20 %
dbt tests dbt build --select test в CI error > 0
Superset health /health endpoint + Prometheus latency > 500 мс
Backups pg_dump nightly → S3 retention = 30 дней

8. Типичные ошибки и лайфхаки

  1. «psycopg2.OperationalError: FATAL … password authentication failed» — Проверьте, что Airflow Secret Backend отдаёт пароль, а не None.
  2. Airflow DAG “catching up” по 10 дней — Добавьте catchup=False, иначе он проиграет все пропущенные интервалы.
  3. dbt incremental затерял историю — Укажите unique_key и is_incremental() в WHERE, иначе INSERT OVERWRITE съест старые строки.
  4. «Superset не видит новую колонку» — Dataset → Sync columns from source.
  5. Postgres bloating после частых upsert — Поставьте autovacuum_vacuum_scale_factor = 0.05, плюс REINDEX CONCURRENTLY по расписанию.
Читай также:  Retool → PostgreSQL (views) → Power BI

9. Заключение

Связка Airflow → PostgreSQL → dbt → Superset — это «бетон» корпоративной аналитики:

  • Плюсы

    • Orchestration SLA: Airflow даёт ретраи и алёрты по шагам.
    • Сквозная прозрачность: lineage от raw.deal до дашборда.
    • Открытый стек — без продлений лицензий.
  • Минусы

    • Три инструмента = три контура CI/CD.
    • Airflow требует Kubernetes/Executor-кластер при > 1000 tasks/день.
    • Superset не имеет офлайн-файла как .pbix — придётся жить в браузере.

Когда масштабировать?

  • 5 млн строк в сутки — смотрим ClickHouse или BigQuery для marts.

  • Реaltime < 5 с — берём Kafka + Flink, Airflow остаётся для batch.
  • 50+ моделей dbt — внедряем Semantic Layer и собственный Feature Store.

Полезные ссылки

Пусть DAG’и бегут, dbt зеленеет, а дашборды Superset блистают прежде, чем топ-менеджмент успеет спросить: «А как у нас конверсия?»