Kafka + Debezium → PostgreSQL → Superset

Kafka + Debezium → PostgreSQL → Superset

Событийная CDC-репликация без «минут-молчания» (редакция от 19 июня 2025 г.)


1. Почему именно Kafka + Debezium → PostgreSQL → Superset

Когда «полный рефреш» уже не спасает. В крупном Bitrix24-портале или любом high-load CRM число изменений (calls, leads, сделки, кастомные поля) легко переваливает за десятки тысяч в минуту. Передавать такие объёмы батчами непрактично: ETL-окна становятся часовыми, аналитика — вчерашней. Событийная Change Data Capture-схема снимает дельту в момент коммита и транслирует её в центральное хранилище. Debezium 2.6+ захватывает WAL/REDO-логи источника и публикует события в Kafka за сотые доли секунды (debezium.io). Kafka 4.0, впервые полностью без ZooKeeper и с KRaft по умолчанию, упрощает эксплуатацию и ускоряет ребаланс шардов (kafka.apache.org, confluent.io).

Слой хранения остаётся «танком». PostgreSQL 16 умеет применять большие транзакции в logical-replication параллельно и даже принимать поток со standby-сервера — удобно, если прод-кластер не хочется трогать (postgresql.org, crunchydata.com).

Superset 5.0 закрывает вопрос «а где мой отчёт через минуту?». Платформа сразу читает свежие строки Postgres, строит time-series и шлёт алёрты без лицензий, а RLS «отрезает» лишние данные отделам.

Типовые сценарии

  • Кол-центр 24/7. Debezium стримит изменения таблиц call_log, crm_status. В Superset карточка «SLA < 20 сек» подсвечивается через 30 сек после пика входящих.
  • Финтех-CRM. Kafka-топики deal_update, payment_event попадают в PostgreSQL-витрину; Superset и Power BI (DirectQuery) дают единую картину риска.
  • Retail-E-com. Bitrix24, Shopify и OMS транслируются Debezium-коннекторами, Postgres выступает корпоративным ODS, Superset — дешёвая web-витрина для логистов.

2. Компоненты и роли

Компонент Роль Ответственный
Bitrix24 Источник CRM-транзакций (MySQL/Percona) Администратор CRM
Debezium 2.6+ CDC-коннектор, пишет в Kafka Data Engineer
Apache Kafka 4.0 Шина событий, KRaft-кластер DevOps
Kafka Connect Sink (JDBC) Гружает топики в Postgres Platform Engineer
PostgreSQL 16 ODS + Mart, принимает стрим DBA
Apache Superset 5.0 Near-real-time дашборды BI-Analyst
(опция) Power BI DirectQuery к Postgres BI-Analyst
(опция) dbt Core 1.9 SQL-трансформации Analytics Engineer

3. Подготовка окружения

3.1 Что установить

Что Где взять
Kafka 4.0 binary Kafka Blog
Debezium 2.6 connector plugins Debezium 2.6 Notes
Kafka Connect JDBC Sink 12.4 Confluent Hub
PostgreSQL 16 Postgres Docs
Superset 5.0 Docker Superset Releases
superset-db-exporter для Prometheus-метрик

3.2 Мини-Compose для DEV

version: "3.9"
services:
  zookeeper:                  # Только для dev, в prod Kafka 4.0 без него
    image: bitnami/zookeeper:3.9
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"

  kafka:
    image: bitnami/kafka:4.0
    environment:
      KAFKA_CFG_PROCESS_ROLES: broker,controller
      KAFKA_CFG_NODE_ID: 1
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
    depends_on: [zookeeper]   # ← уберёте при KRaft-кластерe
    ports: ["9092:9092"]

  connect:
    image: debezium/connect:2.6
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: connect-cluster
      CONFIG_STORAGE_TOPIC: _connect-config
      OFFSET_STORAGE_TOPIC: _connect-offsets
      STATUS_STORAGE_TOPIC: _connect-status
    volumes: ["./plugins:/kafka/connect/plugins"]
    ports: ["8083:8083"]

  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: sink
      POSTGRES_PASSWORD: sink_pwd
      POSTGRES_DB: dwh
    ports: ["5432:5432"]

  superset:
    image: apache/superset:5.0
    env_file: superset.env
    depends_on: [postgres]
    ports: ["8088:8088"]

3.3 Объём минимум

  • Kafka: 3 контроллера + 3 брокера = 6 vCPU / 16 GB RAM.
  • Connect: +2 vCPU / 4 GB на 20 топиков.
  • Postgres ODS: диск NVMe, wal_compression = lz4, max_wal_size = 4 GB.

4. Настройка потоков данных

4.1 Debezium Source → Kafka

curl -X POST connect:8083/connectors -H 'Content-Type: application/json' -d '
{
  "name": "bitrix-mysql-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "bitrix-db",
    "database.user": "cdc",
    "database.password": "cdc_pwd",
    "database.server.id": "184054",
    "topic.prefix": "b24",
    "include.schema.changes": "false",
    "table.include.list": "bitrix.crm_deal,bitrix.crm_contact",
    "snapshot.mode": "initial",
    "incremental.snapshot.chunk.size": "2048",
    "message.key.columns": "bitrix.crm_deal:id",
    "heartbeat.interval.ms": "10000"
  }
}'

Секретный ингредиентincremental snapshot: позволяет «догонять» исторические данные, не блокируя прод БД.

Читай также:  Stitch Data → PostgreSQL → Metabase

4.2 Kafka Connect JDBC Sink → Postgres

curl -X POST connect:8083/connectors -H 'Content-Type: application/json' -d '
{
  "name": "sink-postgres",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "4",
    "topics.regex": "b24\\.crm_.*",
    "connection.url": "jdbc:postgresql://postgres:5432/dwh",
    "connection.user": "sink",
    "connection.password": "sink_pwd",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "auto.evolve": "true",
    "delete.enabled": "true",
    "table.name.format": "raw_${topic}"
  }
}'

Upsert-режим с pk.mode=record_key позволяет Sink’у покрывать UPDATE/DELETE без ручных ON CONFLICT.

4.3 Миграция схемы

Debezium публикует сообщения schema_change_event; JDBC Sink с auto.evolve=true применяет DDL к Postgres. Для production лучше гонять DDL через Liquibase/Flyway, но на старте так быстрее.


5. Трансформации и моделирование

Зона Схема Что хранится
Raw raw_* Таблицы один-в-один с источником
ODS ods Типизированные stg_*, нормализованные JSON → columns
Mart mart Агрегаты, витрины Superset/Power BI

5.1 dbt-пример для инкрементального факта

models/stg_crm/stg_deal.sql

{{ config(materialized='incremental', unique_key='id') }}

SELECT
  id::bigint               AS deal_id,
  status_id                AS stage,
  opportunity::numeric(14,2),
  to_timestamp(date_create) AT TIME ZONE 'UTC' AS created_at,
  to_timestamp(date_modify) AT TIME ZONE 'UTC' AS updated_at
FROM {{ source('raw_b24', 'crm_deal') }}
{% if is_incremental() %}
 WHERE updated_at >= (SELECT max(updated_at) FROM {{ this }})
{% endif %}

models/mart_sales/fct_deal_funnel.sql

SELECT
  date_trunc('minute', updated_at) AS minute_bucket,
  stage,
  COUNT(*)                         AS deals_cnt,
  SUM(opportunity)                 AS pipeline
FROM {{ ref('stg_deal') }}
GROUP BY 1,2;

Обновляем dbt run по событию sink-completed (см. Kafka Connect REST).


6. Визуализация в Power BI и Superset

6.1 Superset 5.0

  1. Database → +: postgresql://bi_user:bi_pwd@postgres:5432/dwh.
  2. Dataset → +: mart.fct_deal_funnel.
  3. Time Series Bar:

    • Time = minute_bucket, Metric = pipeline, Series = stage.
  4. Alerts & Reports: email «pipeline < план на 15 мин» — реакция быстрее, чем утренний кофе.

6.2 Power BI (DirectQuery)

  1. Get Data → PostgreSQL, Server = postgres:5432, Database = dwh.
  2. DirectQuery — движок каждый визуальный запрос обращается в Postgres.
  3. Плитка Card = SUM(pipeline) за Last 30 min.

Для latency < 2 с используйте Hybrid Tables: импортируйте историю, а «текущий день» оставьте на DirectQuery.


7. Мониторинг и автоматизация

Автолечение: настроить Cruise Control (Kafka 4.0) для ребаланс партиций, а в Postgres включить logical_decoding_work_mem 128 MB для больших транзакций.


8. Типичные ошибки и лайфхаки

  1. Duplicate key в JDBC Sink. — Убедитесь, что Debezium публикует key (PK) — message.key.columns=… в source-конфиге.
  2. Lag растёт после psql VACUUM FULL.VACUUM FULL перестраивает таблицу → Log Sequence Number прыгает → Debezium перечитывает стрим. Лучше VACUUM + REINDEX CONCURRENTLY.
  3. Schema change ломает Superset. — В 5.0 нажмите Sync columns from source, либо подключите Superset к dbt manifest.json, чтобы метаданные подтягивались автоматически.
  4. Не хватает места под WAL на источнике (Bitrix24-MySQL). — Поставьтеbinlog_expire_logs_seconds 3600, а Debezium heartbeat 10 с; реплика не отстанет.
  5. События DELETE не доходят. — В Debezium добавьте "tombstone.on.delete":"true", а в JDBC Sink — "delete.enabled":"true".

9. Заключение

Связка Kafka + Debezium → PostgreSQL → Superset превращает ваш CRM-хаос в почти real-time поток, который легко масштабировать:

Плюсы

  • Малый лаг — большие решения. Событие появляется в отчёте через секунды.
  • Горизонтальное масштабирование. Kafka 4.0 без ZooKeeper, PostgreSQL 16 с параллельным apply.
  • Открытый стек. Нет скрытых лицензий; всё отладимо и расширяемо.

Минусы

  • Сложность DevOps. Три кластера (Kafka, Connect, Postgres) + Superset.
  • Прод лимиты Bitrix24. Wal/binlog > 2 GB/день — понадобится Tier-B-DB или дебаркация на стэйдж.
  • Схемы меняются — пайплайн паникует. Не автоматизируйте DDL в shadow-модели без ревью.

Когда масштабировать дальше?

  • 100 тыс msg/сек — добавьте Kafka Tiered Storage и ClickHouse вместо Postgres ODS.

  • Нужно global-multi-region — используйте Kafka MirrorMaker 2 и pglogical.
  • ML-streaming — Debezium → Kafka → Flink SQL → Feature Store.

Полезные ссылки

Теперь аналитики видят, как менеджер меняет статус сделки почти «на лету», а инженер спокойнее спит: Kafka четырежды дублирует сообщение, Postgres гарантирует idempotent-upsert, Superset сигналит, если pipeline «просел» на 20 %. Пейте раф, пока данные сами строят дашборды!