こんにちは、スタッフエンジニアの @kenkoooo です。Embulk で Snowflake からデータを読み込む embulk-input-snowflakedb
をオープンソースとして公開しました。
Embulk とは?
データをロードするすごい OSS です。プラグイン形式でデータの入力や出力を定義することができるため、各種 SQL や BigQuery などだけでなく、スプレッドシートやアクセス解析など様々な入力元・出力先に対応しています。
- 公式サイト: Embulk
Snowflake とは?
データを集めたり加工したりするデータプラットフォームです。estie でメッチャ流行ってます。
- dbt-snowflake のテストフレームワークを作った話 - estie inside blog
- estie にデータを使って意思決定したいことが 62 個もある話 - estie inside blog
- estie、Snowflake を導入しましたよ - estie inside blog
データ収集・加工が Snowflake 上で行われているので、その結果を色んな場所に吐き出したくなります。
Embulk プラグインの実装
Embulk を使って Snowflake からデータを読み込むプラグインが存在しないので、プラグインを実装していきます。Embulk のプラグインは Java または Ruby (JRuby) で開発することができますが、先達の多い Java で開発していくことにします。
Snowflake から公式 JDBC プラグインが提供されており、Java で Snowflake のデータを読み書きする際は、これを利用することで他のデータベースのときと同じようにデータを操作することができます。
また、Embulk からも公式のプラグイン embulk-input-jdbc が提供されており、JDBC を使って Embulk で簡単にデータを読み込むことができます。
これらを組み合わせることで、一応 Snowflake から Embulk でデータを読み込むことが出来ますが、embulk-input-jdbc はあくまで一般的な JDBC ドライバーに対応しているだけなので、Snowflake 特有のキーペア認証などには一部のみしか対応していません。また、社内の Embulk 初心者がプラグインをポチポチ足していけば動かせるようにしたかったので、上述の要素をパッケージ化し、Snowflake 特有の設定も含めて設定可能なインターフェイスをつけたプラグインを作ります。
コード
先述の通り Java で実装することにしましたが、JVM で動く一般的なプログラミング言語であれば Java 以外も使えるので、JVM 言語の中で個人的に気に入っている Kotlin で実装しました。embulk-input-jdbc の中にある AbstractJdbcInputPlugin
と Snowflake から提供された SnowflakeDriver
を組み合わせればデータの読み込み部分は既存の実装でできるので、設定の読み込みを行って DriverManager
に接続用の URL を渡すだけの非常に短い実装となっています。
package jp.estie.input.snowflake import org.embulk.input.jdbc.AbstractJdbcInputPlugin import org.embulk.input.jdbc.JdbcInputConnection import org.embulk.util.config.Config import org.embulk.util.config.ConfigDefault import java.sql.DriverManager import java.util.* open class SnowflakeInputPlugin : AbstractJdbcInputPlugin() { override fun newConnection(task: PluginTask): JdbcInputConnection { Class.forName("net.snowflake.client.jdbc.SnowflakeDriver") val t = task as SnowflakePluginTask val props = Properties() props["user"] = t.getUser() props["db"] = t.getDatabase() props["schema"] = t.getSchema() if (t.getRole().isNotEmpty()) { props["role"] = t.getRole() } if (t.getWarehouse().isNotEmpty()) { props["warehouse"] = t.getWarehouse() } if (t.getPassword().isNotEmpty()) { props["password"] = t.getPassword() } if (t.getPrivateKey().isNotEmpty()) { val passphrase = t.getPrivateKeyPassphrase().ifEmpty { null } val privateKey = PrivateKeyReader.loadPrivateKey(t.getPrivateKey(), passphrase).getOrThrow() props["privateKey"] = privateKey } if (t.getPrivateKeyPath().isNotEmpty()) { val passphrase = t.getPrivateKeyPassphrase().ifEmpty { null } val privateKey = PrivateKeyReader.loadPrivateKeyFromFile(t.getPrivateKeyPath(), passphrase).getOrThrow() props["privateKey"] = privateKey } val con = DriverManager.getConnection("jdbc:snowflake://${t.getHost()}/", props) return JdbcInputConnection(con, null) } interface SnowflakePluginTask : PluginTask { // (略) } override fun getTaskClass(): Class<out PluginTask> { return SnowflakePluginTask::class.java } }
おわりに
estie では大量の不動産データを集め、商業用不動産のビジネスを加速していきます。
データを作りたい方はぜひご入社ください↓
データを活用したアプリを開発したい方もぜひご入社ください↓
全部まとめて面倒見たらぁ!という方もぜひご入社ください↓
よろしくお願いします。