NiFi → Elasticsearch ↔ PostgreSQL → Power BI

NiFi → Elasticsearch ↔ PostgreSQL → Power BI

Drag-and-drop ETL + full-text: как мгновенно искать сделки Bitrix24 и рисовать дашборды (версия руководства — 19 июня 2025 г.)


1. Почему именно NiFi → Elasticsearch ↔ PostgreSQL → Power BI

ETL-конструктор вместо «лего из скриптов». Apache NiFi 2.4.0 приносит 100+ улучшений, ускорённый юнит-тест флоу и поддержку OIDC-SSO — визуально тянем линии, а не пишем Python-крючья. Даже junior-аналитик за час собирает поток «Bitrix24 → Postgres + Elasticsearch». (nifi.apache.org)

Две базы — один выстрел. PostgreSQL 16 остаётся «танком» для DWH: транзакции, ACID, window-функции. Elasticsearch 9.0.2 (уже без Lucene 8.x) добавил бинарную квантовку и ускорил k-NN-поиск в 5 раз — идеальная витрина для «найди мне все сделки со словом «срочно» в комментарии». (elastic.co, elastic.co)

Power BI пишет отчёты, Elastic — отвечает «на всём». BI-команда импортирует агрегаты из Postgres, включает Incremental Refresh — партиционирование модели, чтобы обновлять только «горячие» сутки. Для любых текстовых запросов фронтенд дёргает Elastic REST API и отдаёт результат React-виджету. (learn.microsoft.com, learn.microsoft.com)

Типовые сценарии

  • Enterprise-агентство. НиFi парсит звонки и комментарии из Bitrix24, в Postgres — аналитика конверсий, в Elastic — мгновенный поиск по жалобам клиентов.
  • E-commerce-маркетплейс. Прайсы SKU идут пачками → Elastic обеспечивает автодополнение, Postgres — LTV отчёты, Power BI — витрины SKU-ABC.
  • Финтех. НиFi ловит web-хуки о транзакциях, Postgres хранит аудит, Elastic — поиск по AML-словам, Superset шлёт alert при «подозрительном» описании.

2. Компоненты и роли

Компонент Роль Ответственный
Bitrix24 Источник CRM-событий Sys-admin CRM
Apache NiFi 2.4 Drag-and-drop ETL, параллельный fan-out Data Engineer
Elasticsearch 9.0 Full-text + векторный поиск Search Engineer
PostgreSQL 16 DWH (raw / mart), ACID-хранение DBA
Apache Superset 5.0 Web-дашборды, RLS, алерты BI-аналитик
Power BI (Desktop / Service) «Тяжёлые» отчёты, Incremental Refresh BI-аналитик

3. Подготовка окружения

3.1 Что поставить

ПО Где взять
NiFi 2.4.0 NiFi Download
Elasticsearch 9.0.2 Elastic Downloads
PostgreSQL 16.3 PostgreSQL
Superset 5.0 (Docker) Superset Release
Power BI Desktop June 2025 Power BI Update
Bitrix24 PAT / Webhook Bitrix24 REST

3.2 Мини-compose (DEV)

version: "3.9"
services:
  nifi:
    image: apache/nifi:2.4.0
    environment:
      NIFI_WEB_HTTP_PORT: 8080
    ports: ["8080:8080"]

  elastic:
    image: docker.elastic.co/elasticsearch/elasticsearch:9.0.2
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - ES_JAVA_OPTS=-Xms2g -Xmx2g
    ports: ["9200:9200"]

  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: etl
      POSTGRES_PASSWORD: etl_pwd
      POSTGRES_DB: dwh
    ports: ["5432:5432"]

  superset:
    image: apache/superset:5.0
    env_file: superset.env
    depends_on: [postgres]
    ports: ["8088:8088"]

Запуск:

docker compose up -d

3.3 Доступы и роли

Сервис Роль/токен Права
Bitrix24 PAT crm, task GET + web-hooks
NiFi Processor → Postgres user etl INSERT, SELECT
NiFi Processor → Elastic API Key write, create_index
Superset bi_user SELECT на mart
Power BI Gateway DSN SSL read-only

4. Настройка потоков данных

4.1 NiFi Canvas: поток «Bitrix24 → Postgres + Elastic»

  1. GenerateTableFetch

    • URL https://<portal>.bitrix24.ru/rest/…/crm.deal.list.json
    • Параметр filter[>DATE_MODIFY] = ${now():minusHours(1):format("yyyy-MM-dd'T'HH:mm:ss")}
  2. InvokeHTTP — тянет JSON batch 1000.

  3. JoltTransformJSON — плоская схема:

    [
     { "operation": "shift",
       "spec": { "*": {
         "ID": "[#].id",
         "TITLE": "[#].title",
         "COMMENTS": "[#].comment",
         "DATE_CREATE": "[#].created_at",
         "DATE_MODIFY": "[#].updated_at"
       }}
     }
    ]
  4. PutDatabaseRecord (JDBC Postgres):

    • Schema raw, Table crm_deal.
    • Upsert=ON, Key id.
  5. PutElasticsearchJson:

    • Index crm_deal, Id ${id}.
    • Pipeline b24_ru_analyzer (создадим ниже).
  6. LogAttribute (отладка) → Null.

Читай также:  dbt Cloud + Airflow → PostgreSQL → Power BI

Всего шесть процессоров — меньше кофе-брейка.

4.2 Elastic индекс и анализатор

PUT crm_deal
{
  "settings": {
    "analysis": {
      "analyzer": {
        "ru_text": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": ["lowercase","russian_stop","russian_keywords","russian_stemmer"]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "title":     { "type": "text", "analyzer": "ru_text" },
      "comment":   { "type": "text", "analyzer": "ru_text" },
      "stage":     { "type": "keyword" },
      "amount":    { "type": "scaled_float", "scaling_factor": 100 },
      "created_at":{ "type": "date" },
      "updated_at":{ "type": "date" }
    }
  }
}

4.3 SQL-upsert в Postgres

CREATE TABLE IF NOT EXISTS raw.crm_deal (
  id           bigint PRIMARY KEY,
  title        text,
  comment      text,
  amount       numeric(14,2),
  stage_id     text,
  created_at   timestamptz,
  updated_at   timestamptz
);

-- индекс для инкремента
CREATE INDEX IF NOT EXISTS ix_deal_updated
  ON raw.crm_deal(updated_at DESC);

5. Трансформации и моделирование

Зона Схема Содержимое
Raw raw Таблицы 1:1 из NiFi
ODS ods Типизированные stg-модели (dbt)
Mart mart Факты/измерения (BI)

5.1 dbt incremental-stage

models/stg_bitrix24/stg_deal.sql

{{ config(materialized='incremental', unique_key='id') }}

SELECT
    id,
    title,
    comment,
    stage_id,
    amount,
    created_at,
    updated_at
FROM {{ source('raw','crm_deal') }}
{% if is_incremental() %}
  WHERE updated_at >= (SELECT max(updated_at) FROM {{ this }})
{% endif %}

5.2 Факт-воронка

models/mart_sales/fct_deal_funnel.sql

SELECT
  date_trunc('day', created_at)::date AS day,
  stage_id,
  COUNT(*) AS deals_cnt,
  SUM(amount) AS pipeline
FROM {{ ref('stg_deal') }}
GROUP BY 1,2;

dbt run после каждого PutDatabaseRecord-коммита — триггерим через NiFi ExecuteProcess или Airflow-callback.


6. Визуализация в Power BI и Superset

6.1 Power BI: Incremental Refresh

  1. Get Data → PostgreSQL (DSN postgres:5432/dwh).
  2. Параметры RangeStart / RangeEnd типа DateTime.
  3. Фильтр на колонку updated_at.
  4. Table → Incremental refresh:

    • Store rows = 60 months.
    • Refresh rows = 2 days.
    • Detect change = updated_at.
  5. Publish; первый рефреш партиционирует модель, далее Power BI дергает лишь «горячее».

6.2 Superset 5.0: live-дашборд

  1. Database → + — URI postgresql://bi_user:pwd@postgres:5432/dwh.
  2. Dataset → +mart.fct_deal_funnel.
  3. Чарт Time Series Line: Time = day, Metrics = pipeline.
  4. Алерт «pipeline < план/день» — Superset шлёт письмо CEO в 1-е число.
Читай также:  Retool → PostgreSQL (views) → Power BI

6.3 Elastic API — быстрый поиск

POST crm_deal/_search
{
  "query": {
    "bool": {
      "must":   { "match": { "comment": "срочно" } },
      "filter": { "term":  { "stage": "PROPOSAL" } }
    }
  },
  "highlight": { "fields": { "comment": {} } },
  "size": 20
}

В UI зовём через React-hook, показываем подсветку — результат в 100 мс, пока Postgres крутил JOIN-ы.


7. Мониторинг и автоматизация

Объект Инструмент Порог
NiFi Bulletin Board NiFi UI + Prometheus scrape error > 0
Elastic search-latency /_nodes/stats → Grafana P95 > 200 мс
Postgres bloat pg_stat_user_tables > 20 %
Superset /health exporter latency > 500 мс
Power BI refresh Service history fail > 1/сутки

Cron-скрипт curl -sL /nifi-api/flow/status | jq в Slack-алёрт — ловим упавший процессор до жалоб руководства.


8. Типичные ошибки и лайфхаки

  1. NiFi «HTTP 429» от Bitrix24 — добавьте ControlRate (2 req/s) + BackPressure.
  2. Elasticsearch ругается на поле text vs keyword — уточните mapping заранее, либо включите dynamic_templates для строк.
  3. Power BI «не видит» новые колонкиTransform dataRefresh preview или перепубликуйте semantic-model.
  4. Superset дублирует связи при Sync — перед «Sync Columns» выключите Expose in SQL Lab, иначе таблица объявится дважды.
  5. NiFi heap-out-of-memory при загрузке больших файлов — в bootstrap.conf поставьте java.arg.2=-Xmx4g и включите swap-out-content-repository на диск SSD.

9. Заключение

Связка NiFi → Elasticsearch ↔ PostgreSQL → Power BI превращает массивы Bitrix24 в быстрый поиск и живые дашборды:

Плюсы

  • Drag-and-drop — минимум кода, максимум прозрачности.
  • Два хранилища обслуживаются одной трубой: SQL-аналитика + полнотекст.
  • Поиск не грузит DWH: Elastic открыл API, Postgres спит.

Минусы

  • Двойная запись = двойная отладка.
  • Elastic > 50 GB — следите за shards и ILM-политикой, иначе cluster yellow.
  • Incremental Refresh требует Power BI Pro/Premium.

Когда масштабировать?

  • 1 млн events/час — переход на NiFi cluster + ECK (Elastic on K8s).

  • Deep-analytics — ClickHouse вместо части Postgres marts.
  • Мультирегион — Cross-Cluster Replication в Elastic, logical-replication в Postgres.

Полезные ссылки

В итоге — менеджер ищет «срочный возврат» в Elastic, аналитик кликает F5 в Power BI, а NiFi послушно гонит свежие сделки по двум рельсам. Бизнес доволен, DevOps спит, данные — работают.