こんにちは!スタッフエンジニアの @kenkoooo です。相変わらず estie を平均年収2000万円の会社にしようと頑張ってます。引き続きやっていきます。
Snowflake x dbt 中毒
estie のビジネス上、estie にしか無いデータを作ることが非常に重要で、データパイプラインの構築に力を入れています。データパイプラインは dbt で作られており、Snowflake 上でデータがガンガン流れていきます。
- データ加工パイプラインに用いる dbt
- 株式会社estieでのSnowflake活用事例
- 億単位の住宅データを AWS + Snowflake で分散処理する - estie inside blog
- dbt Python model on Snowflake でユーザ作成モジュールを使う - estie inside blog
dbt の開発体験は非常によく、個人でデータサイエンスのコンペに参加した際にも、dbt-snowflake を使って Snowflake 上に dbt でデータ分析パイプラインを構築し、優勝しました。
- 国交省コンペ優勝!賃料予測コンペ1位解法 - estie inside blog
- 国土交通省初のデータ分析コンペティションにてestieに所属するエンジニアが優勝 | 株式会社estieのプレスリリース
- 【アフタームービー】 第1回 国土交通省 地理空間情報データチャレンジ ~国土数値情報編~ 表彰式
このように個人としては Snowflake x dbt 中毒なのですが、Snowflake にないデータを手元でサッと分析したいときは DuckDB を使っています。
DuckDB のある生活
DuckDB はローカルの CSV ファイルや JSON ファイルを読み込んで、SQL でサクッと分析できます。この手のデータ分析では往々にして、意外と書き捨ての SQL クエリが複雑になったり、書き捨てと思っていた作業を最新のデータでもう一回やりたくなったりします。このようなとき dbt 中毒者は、複雑な作業を分割して依存関係を自動的に整理してくれたり、同じ操作をコマンド一発で再現したりしてくれる dbt の禁断症状が出てきてしまいます。
dbt-duckdb を使うことで dbt で DuckDB 上にデータパイプラインを構築することができます。
dbt-duckdb のセットアップ
uv add dbt-duckdb
などでパッケージをインストールすれば使えるようになります。dbt の設定ファイルを書いていきましょう。
まずは以下のように profiles.yml を書きます。Snowflake などを使うときは接続情報を書いていましたが、DuckDB の場合はローカルのファイルに保存されるので、ファイルパスを設定します。
test_local_pipeline: target: dev outputs: dev: type: duckdb path: 'test_local_pipeline.duckdb' threads: 24
ほぼ素のままですが dbt_project.yml も載せておきます。特に DuckDB 固有の設定が必要ということもありません。
name: 'test_local_pipeline' config-version: 2 version: '0.1' profile: 'test_local_pipeline' model-paths: ["models"] seed-paths: ["seeds"] test-paths: ["tests"] analysis-paths: ["analysis"] macro-paths: ["macros"] target-path: "target" clean-targets: - "target" - "dbt_modules" - "logs" require-dbt-version: [">=1.0.0", "<2.0.0"] models:
これで models/
に SQL ファイルや Python ファイルを入れて uv run dbt build
などで実行することで、DAG が順番に実行され、duckdb ファイルの中に table や view が保存されます。
Python Model が動く
ローカルで動作するため当然といえば当然ですが、Python Model が使えます。例えば以下のモデルは、 slack_messages
というテーブルの各レコードの message
というカラムを JSON としてパースし、その中の text
というフィールドの文字数を保存したテーブルを作ります。
import pandas as pd from tqdm import tqdm import json def model(dbt, session): ref = dbt.ref("slack_messages") df: pd.DataFrame = ref.df() counts = [] for _, record in tqdm(df.iterrows(), total=len(df)): message = json.loads(record["message"]) if "text" in message: text = message["text"] count = len(text) else: count = 0 counts.append( { "channel_id": record["channel_id"], "ts": record["ts"], "count": count, } ) df = pd.DataFrame(counts) return df
dbt の嬉しさは、全体として見ると複雑な処理であっても、処理と依存関係に分割することで、各処理をシンプルに保つことができる点だと思います。処理全体を Python で記述することもできますが、小さく分割し、SQL で書けるところは SQL で書くことで、本当に複雑な部分を絞り込むことができます。
最後に
カジュアル面談で最強のデータパイプラインについて語り合いましょう。対戦よろしくお願いします。