Python経由でSnowflakeのデータ操作をやってみた

こんにちは。estieでソフトウェアエンジニアをしている万代です。この記事が出ている頃には絶賛育休中の予定です。よろしくお願いします。

estieではデータマネジメントグループに所属しており、主に不動産データ基盤の開発や運用、保守を担当しています。弊社はRustの会社と認識していただいている方も多いと思うのですが、estieではRust以外にもさまざまなプログラミング言語を利用しており、データマネジメントグループではデータ関連の処理で主にPythonを利用しています。

estieは創業以来不動産データに注力しており、最初期から不動産データのパイプラインシステムの開発に力を入れていました。現在のところインフラはオーソドックスなMySQLを利用した構成となっており、データの加工などはPythonとSQLAlchemyというORマッパーライブラリを利用しています。

さて、いままではMySQLを中心とした基盤を利用していたのですが、弊社で掲げているWhole Product構想を進めるにあたり、より全社で横断的に利用できる大規模な分析基盤システムを導入しよう、という意見がでてきました。そのためデータ基盤の移行を決意し、MySQLベースの基盤からSnowflakeへの移行を進めています。Snowflakeでは、柔軟かつ大規模にデータ分析を行え、さまざまなプロダクトから生成されるデータの連携を、よりスムーズに行えると期待しています。

この記事ではデータ分析基盤としてSnowflakeを導入する際に実際にやってみたこと、つまずいたこと、そしてその解決策を紹介します。

やってみた

普段Pythonを使っているとPythonからSnowflakeを操作したくなりますよね?そうすると、公式のSnowflake connectorが第一の候補として挙げられます。

Python用Snowflakeコネクタ | Snowflake Documentation

まず pip などでライブラリをインストールしましょう。

$ pip install sqlalchemy snowflake-sqlalchemy

Snowflakeにはすでにテーブル askings があると仮定します。

create or replace table askings(
    id integer,
    rent integer,
    building_name varchar(255),
    area float,
    floor integer
);

insert into askings (rent, building_name, area, floor) values
    (5000, 'fooビル', 23, 1),
    (15000, 'barビル', 42, 2),
    (20000, 'bazビル', 12, 4),
    (30000, 'hogeビル', 127.3, -1),
    (40000, 'piyoビル', 20, 3)
    ;

そして以下のようなPythonコードを実行すればテーブル askings をSELECTできます。

import snowflake.connector

# "user"などは適したものに変えてください
conn = snowflake.connector.connect(
    user="user",
    password="password",
    account="account",
    warehouse="warehouse",
    database="database",
    schema="schema",
    role="role",
)

with conn:
    cursor = conn.cursor()
    # 賃料が10000円以上のものを取得
    rows = cursor.execute("SELECT * FROM askings WHERE rent >= 10000")
    print("(賃料, ビル名, 面積, 階数)")
    for r in rows:
        print(r)

もちろんSELECTだけでなく、UPDATEやINSERTなどその他のSQL文も利用できます。簡単ですね!

結構つらい

Snowflake connectorを利用して、直接SQLを記述しデータを操作できました!しかしながら以下のようなつらさがあります。

  • 記述量が増える
  • 文字列の扱いなどがつらい(SQLインジェクションなど)
  • 単なる文字列なので文法エラーがあっても知る術がない

社外から利用されるサービスではないのでSQLインジェクションなどは考慮しなくてもよいかもしれませんが、文字列の中にクオートがあったり改行コードがあったりした場合に厄介な問題になります。なのでこの辺りの処理はできるだけOSSのエコシステムにのって処理を任せたいところです。

MySQLのときと同様、SQLAlchemyを利用できれば上記のような問題はある程度解決されますし、すでに利用しているライブラリなので移行のコストも抑えられます。幸いSQLAlchemyは多くのデータベースソフトウェアをサポートしており、Snowflakeもサポートされています。

Snowflake SQLAlchemy ツールキットおよびPythonコネクターの使用 | Snowflake Documentation

SQLAlchemyからSnowflakeを扱うためには、まずSQLAlchemyのモデルを定義します。

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Integer, Float, Sequence

SnowflakeModelBase = declarative_base()


class Asking(SnowflakeModelBase):
    __tablename__ = "askings"
    id = Column(Integer, Sequence(sqlalchemy.text("asking_id_seq")), primary_key=True)
    rent = Column(Integer, nullable=True, comment="賃料")
    building_name = Column(String, nullable=True, comment="ビル名")
    area = Column(Float, comment="面積[坪]")
    floor = Column(Integer, comment="階数")

SnowflakeModelBase という基底クラスを定義し、それを継承する形で Asking というモデルを定義します。この Asking というクラスのインスタンスがテーブルの行一つに対応します。


ここで一つ注意点ですが、SQLAlchemyではprimary_keyがないテーブルをサポートしていません(ORM Configuration — SQLAlchemy 2.0 Documentation)。 ですのでモデル上にprimary_keyを指定してあげる必要があるのですが、Snowflakeではprimary key制約が強制されません(制約の概要 | Snowflake Documentation)。そのため、Snowflakeでは一意性を別の仕組みで保つ必要があります。この記事ではSnowflakeのSEQUENCEという仕組みを使ってMySQLなどでいうAUTO INCREMENTなIDを真似しています。そのため idColumn のなかの Sequence でSnowflakeのSEQUENCEである asking_id_seq を呼び出して、Snowflakeのテーブルの定義も以下のようにちょっと変えます。

create or replace sequence asking_id_seq;

create or replace table askings(
    id integer default asking_id_seq.nextval,
    rent integer,
    building_name varchar(255),
    area float,
    floor integer
);

詳しくは以下のページなどを参考にしてください。


モデルが定義できたらSnowflakeへ接続するために enginesession を作成します。その後はPythonのオブジェクトを操作することによりSQLを表現できます。

from snowflake.sqlalchemy import URL    
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


# "user"などは適したものに変えてください
engine = create_engine(URL(
    user="user",
    password="password",
    account="account",
    warehouse="warehouse",
    database="database",
    schema="schema",
    role="role",
))

session = sessionmaker(autocommit=False, autoflush=False, bind=engine)()

print("SELECT 賃料10000円以上の募集")
askings = session.query(Asking).filter(Asking.rent >= 10000).all()
for asking in askings:
    print(asking.rent, asking.building_name, asking.area, asking.floor)
    
print("-----")

print("INSERT fugaビル")
new_asking = Asking(rent=10000, building_name="fugaビル", area=100, floor=1)
session.add(new_asking)
session.commit()

print("-----")

print("UPDATE piyoビル")
asking = session.query(Asking).filter(Asking.id == 5).first()
asking.rent = 35000
session.commit()

print("-----")

print("SELECT *")
askings = session.query(Asking).all()
for asking in askings:
    print(asking.rent, asking.building_name, asking.area, asking.floor)
    
session.close()

実行した結果は以下になります

SELECT 賃料10000円以上の募集
15000 barビル 42.0 2
20000 bazビル 12.0 4
30000 hogeビル 127.3 -1
40000 piyoビル 20.0 3
-----
INSERT fugaビル
-----
UPDATE piyoビル
-----
SELECT *
5000 fooビル 23.0 1
15000 barビル 42.0 2
20000 bazビル 12.0 4
30000 hogeビル 127.3 -1
35000 piyoビル 20.0 3
10000 fugaビル 100.0 1

MySQLでやっていた処理をSnowflakeに対しても実現できました!MySQLのときとほとんど同じように書けます。

大量のレコードを挿入・更新・削除する

先ほどから例として挙げている askings テーブルは弊社でよく使われるテーブルで、賃貸オフィス区画の募集を保持しているテーブルです(実際に運用しているテーブルとは異なる点も多いですが)。弊社では50以上の情報ソースから日々データを取得し更新しているため、このテーブルに対して多くの挿入・更新処理がかかります。

挿入処理

上記のコードのようにSnowflakeへINSERT処理を一件一件 commit を行いながら愚直に行うとパフォーマンス的に問題が生じます。MySQLのようなRDBMSとは違い、Snowflakeはデータ分析基盤なので行単位での挿入処理や更新処理は得意ではありません。簡単な計測ではありますが、上記のコードを実行した場合1件挿入するためにおよそ0.5秒程度かかっていました。1000件あれば500秒もかかってしまいます。

これを解決するためには挿入処理を一度にまとめて行うバルクインサートをすればよいですね。
SQLAlchemyの場合 Session.execute 関数でまとめて挿入することができます(以前は bulk_insert_mappingsという関数がありましたが非推奨になっています)。

import random

new_askings = [
    {
        "id": i,
        "rent": random.randint(10000, 100000),
        "building_name": "randomビル", 
        "area": random.randint(10, 2000),
        "floor": random.randint(-3, 100),
    } 
    for i in range(1000)
]

session = sessionmaker(autocommit=False, autoflush=False, bind=engine)()
session.execute(sqlalchemy.insert(Asking), new_askings)
session.commit()
session.close

バルクインサートの場合全てのレコードを挿入するのにかかった時間は1件挿入するのと同じおよそ 0.5秒でした。1000倍高速化することができましたね!

更新処理

前述の通り現在私たちはMySQLで構築された基盤をSnowflakeに移行しようとしています。このときネックになるのがデータの更新処理です。Snowflakeのようなデータ分析基盤はデータの集計は得意でも更新は非常に遅い処理になってしまいます。これはSnowflake内部のデータの持ち方に起因するので使い方を工夫しなければなりません(そもそもSnowflakeでデータの更新を行うな、という話ではあるのですが現状のデータ基盤がそのような構成になっているので一旦大きく構成を変えずに実験としてやってみています)。

更新が遅いならどうすればいいのでしょうか?簡単ですね、更新処理をしなければよいですね!

Snowflakeはデータの挿入と削除はまとめて行えばまあまあ速いので、更新処理を

  • 更新したい値を持つ行を削除
  • 更新したい値にした行を挿入

に分解すれば理論的には速いはずです。やってみましょう。

session = sessionmaker(autocommit=False, autoflush=False, bind=engine)()

askings_to_be_updated = session.query(Asking).filter(Asking.rent >= 10000, Asking.rent <= 20000).all()

# 賃料が 10000円以上20000円以下のものを削除
session.execute(
    sqlalchemy.delete(Asking).where(Asking.id.in_([a.id for a in askings_to_be_updated])),
    execution_options={"synchronize_session": "fetch"},
)
# さきほど削除したものを、賃料を倍にして再度挿入
session.execute(
    sqlalchemy.insert(Asking),
    [{"id": a.id, rent": a.rent, "floor": a.floor, "area": a.area * 2, "building_name": a.building_name} for a in askings_to_be_updated],
)
session.commit()
session.close()

このようなコードの場合、DELETE文とNSERT文をそれぞれ0.5秒で実行できました。

参考までに一件一件UPDATEするようなコードも載せておきます。この場合も先ほどと同様にUPDATE文一つにつき0.5秒程度かかりました。

session = sessionmaker(autocommit=False, autoflush=False, bind=engine)()
askings_to_be_updated = session.query(Asking).filter(Asking.rent >= 10000, Asking.rent <= 20000).all()

for a in askings_to_be_updated:
    a.rent *= 2

session.commit()
session.close

終わりに

MySQLと同様にPythonからSnowflakeのデータを操作する方法としてSQLAlchemyを利用してみました。RDBMSと同じノリで利用するとパフォーマンス的にハマってしまうのですが、Snowflakeの特性を理解することで問題を回避できました。

Snowflakeは非常に強力な基盤で、最近弊社での利用シーンが広がっており、ソフトウェアエンジニアだけでなくビジネスサイドのメンバーなどもSnowflakeでクエリを書いて分析をしています。全社的に同じデータ分析基盤を利用することでさまざまなチームが生成したデータを横断的に分析可能になり、ビジネス的にもインパクトのあるツールだと感じます。

estieはデータを非常に大事にしており、データ分析基盤の構築やそれに関連するソフトウェアの開発など、まだまだやらなければならないことは山ほどあります!データ分析基盤を利用したソフトウェア開発や、データを活かして「産業の真価を、さらに拓く」ことに興味のある方、ぜひ一緒に働きましょう!

hrmos.co

hrmos.co

© 2019- estie, inc.