手元のデータ分析でも dbt を使いたい!〜dbt-duckdb で始めるローカルデータパイプライン〜

こんにちは!スタッフエンジニアの @kenkoooo です。相変わらず estie を平均年収2000万円の会社にしようと頑張ってます。引き続きやっていきます。

Snowflake x dbt 中毒

estie のビジネス上、estie にしか無いデータを作ることが非常に重要で、データパイプラインの構築に力を入れています。データパイプラインは dbt で作られており、Snowflake 上でデータがガンガン流れていきます。

dbt の開発体験は非常によく、個人でデータサイエンスのコンペに参加した際にも、dbt-snowflake を使って Snowflake 上に dbt でデータ分析パイプラインを構築し、優勝しました。

このように個人としては Snowflake x dbt 中毒なのですが、Snowflake にないデータを手元でサッと分析したいときは DuckDB を使っています。

DuckDB のある生活

DuckDB はローカルの CSV ファイルや JSON ファイルを読み込んで、SQL でサクッと分析できます。この手のデータ分析では往々にして、意外と書き捨ての SQL クエリが複雑になったり、書き捨てと思っていた作業を最新のデータでもう一回やりたくなったりします。このようなとき dbt 中毒者は、複雑な作業を分割して依存関係を自動的に整理してくれたり、同じ操作をコマンド一発で再現したりしてくれる dbt の禁断症状が出てきてしまいます。

dbt-duckdb を使うことで dbt で DuckDB 上にデータパイプラインを構築することができます。

dbt-duckdb のセットアップ

uv add dbt-duckdb などでパッケージをインストールすれば使えるようになります。dbt の設定ファイルを書いていきましょう。

まずは以下のように profiles.yml を書きます。Snowflake などを使うときは接続情報を書いていましたが、DuckDB の場合はローカルのファイルに保存されるので、ファイルパスを設定します。

test_local_pipeline:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: 'test_local_pipeline.duckdb'
      threads: 24

ほぼ素のままですが dbt_project.yml も載せておきます。特に DuckDB 固有の設定が必要ということもありません。

name: 'test_local_pipeline'

config-version: 2
version: '0.1'

profile: 'test_local_pipeline'

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
analysis-paths: ["analysis"]
macro-paths: ["macros"]

target-path: "target"
clean-targets:
    - "target"
    - "dbt_modules"
    - "logs"

require-dbt-version: [">=1.0.0", "<2.0.0"]

models:

これで models/ に SQL ファイルや Python ファイルを入れて uv run dbt build などで実行することで、DAG が順番に実行され、duckdb ファイルの中に table や view が保存されます。

Python Model が動く

ローカルで動作するため当然といえば当然ですが、Python Model が使えます。例えば以下のモデルは、 slack_messages というテーブルの各レコードの message というカラムを JSON としてパースし、その中の text というフィールドの文字数を保存したテーブルを作ります。

import pandas as pd
from tqdm import tqdm
import json

def model(dbt, session):
    ref = dbt.ref("slack_messages")
    df: pd.DataFrame = ref.df()

    counts = []
    for _, record in tqdm(df.iterrows(), total=len(df)):
        message = json.loads(record["message"])
        if "text" in message:
            text = message["text"]
            count = len(text)
        else:
            count = 0
        counts.append(
            {
                "channel_id": record["channel_id"],
                "ts": record["ts"],
                "count": count,
            }
        )

    df = pd.DataFrame(counts)
    return df

dbt の嬉しさは、全体として見ると複雑な処理であっても、処理と依存関係に分割することで、各処理をシンプルに保つことができる点だと思います。処理全体を Python で記述することもできますが、小さく分割し、SQL で書けるところは SQL で書くことで、本当に複雑な部分を絞り込むことができます。

手元のデータ処理に DAG ができるの気持ちいい

最後に

カジュアル面談で最強のデータパイプラインについて語り合いましょう。対戦よろしくお願いします。

hrmos.co

© 2019- estie, inc.