2016年1月10日

PL/Pythonで独自の集約関数を作成する

先月、PostgreSQLアンカンファレンスで「PL/Pythonで独自の集約関数を作ってみる」という発表をしました。

本エントリでは、その詳細について紹介させていただきます。

■なぜPL/Pythonで集約関数なのか?


既に広く知られている通り、PostgreSQLではさまざまなプログラミング言語でプロシージャ/ファンクションをユーザが定義することができます。

では、なぜPL/Pythonで集約関数なのでしょうか?

まず1つ目の理由としては、Pythonとデータ処理が非常に相性が良い、ということが挙げられます。古くから統計解析でRが使われてエコシステムが発展してきたのと同じように、最近ではPythonを取り巻くデータ分析に関するエコシステムが大きくなり、データ分析に関するさまざまな処理をPythonで行えるようになってきています。

2つ目の理由としては、データ処理と集約関数というのは切っても切れない関係にあるということです。シンプルな COUNT() や AVG() から、最近ではウィンドウ関数まで、集約関数にはさまざまな処理があり、データベースにおけるデータ分析に集約関数は欠かせません。

そして、最後の理由としては、集約関数は「インデータベース(In-Database)処理」である、ということです。「インデータベース処理」というのは、データベースからデータを取り出してクライアントサイドで処理させるのではなく、データベースの内部でデータ処理をさせることを言います。こうすることによって、データの転送にかかる時間を削減したり、よりスペックの高いサーバ側で大量の処理をさせることが可能になります。

これらの理由から、これからPL/Pythonを使って独自の集約関数を作成できる意義はより高まっていくでしょう。

■PostgreSQLにおける集約関数の構造


次に、PostgreSQLの集約関数の構造を見てみましょう。

以下は、基本的なPostgreSQLの集約関数の構造および動作です。


集約関数には、リレーションから1レコード分ずつ入力があります。この入力を受け取って内部の状態を作成・更新する処理を行うのが、sfuncと書かれている関数です。このsfuncは、データを受け取るたびに逐次処理していきます。

sfunc関数が作成・更新するのがstateで、このstateのデータ型がstate_data_typeです。このstateが、集約関数の処理途中の状態を保持しています。

そして、集約関数がすべてのレコードを受け取り処理を終えた上で、最終的な「集約関数の結果」を返す関数が ffunc です。

このように、関数やデータ型を組み合わせてデータを逐次受け取りながら内部状態を更新していき、最終的に返すべき値を算出する、というのがPostgreSQLの集約関数のしくみです。

そのため、CREATE AGGREGATION FUNCTIONコマンドでは、これらの関数やデータ型を指定して集約関数を定義することになります。

■PL/Pythonの集約関数の作り方


次に、PL/Pythonでの集約関数の作り方を見てみましょう。

先に出てきたsfuncやffuncの作成方法はPL/Pythonで一般的なUDFを作成する場合と特に変わりません。

PL/Pythonからデータベースへアクセスする場合には、plpyモジュールをimportしてアクセス用の関数を使います。これはC言語でUDFを作成するときのSPI関数に似た仕組みです。詳細はマニュアルを参照してください。
注意すべきところとしては、
  • PL/PythonからリンクされているPythonのバージョン
  • データ、およびデータ型の相互マッピング
  • 文字列のエンコーディング
  • state変数を更新する場合には global 宣言が必要。
  • PostgreSQLのsuperuserしか使えない(PL/PythonがuntrustedなPL言語なので)
あたりでしょうか。

PL/PythonでのUDFの作り方そのものについては以前のエントリでご紹介しましたので、そちらをご覧ください。

■min(), max(), avg()を実装してみる


それでは、基本的な集約関数である min(), max(), avg() をPL/Pythonで実装してみましょう。この実装を通じて、そもそもどのように集約関数の実装への理解が深まるかと思います。

まず、min() と同等の動作をする pymin() という集約関数を作成してみます。

以下が、pymin() の全体のコードです。
--
-- min()
--
-- sがstate変数、nが入力値、最後の計算は不要なのでffuncは無し
--
CREATE FUNCTION float8_pymin(s float8, n float8)
  RETURNS float8
AS $$
    global s   # 内部状態

    # 入力値がNULLでない場合のみ処理する。
    # 内部状態がNULLだったら入力値で内部状態を初期化、
    # 内部状態が存在している場合には入力値と比較して
    # 入力値の方が小さかったら内部状態を更新する。
    if n is not None:
        if s is None or n < s:
            s = n
    return s
$$ LANGUAGE plpython2u;

-- 集約関数の定義
CREATE AGGREGATE pymin (float8)
(
    sfunc = float8_pymin,      -- 内部状態を更新する関数
    stype = float8             -- 内部状態のデータ型
);
pymin() という集約関数は、最終的には受け取った値の最小値を返却します。内部状態 state は「それまで受け取った最小値のみ」を保持していればいいことになります。

それでは、この集約関数を使って動作確認してみます。

まず、あるカラムにランダムな値を持つテーブルを作成します。
testdb=> select setseed(0.5);
 setseed
---------

(1 row)

testdb=> create table t1 as select generate_series(1,10) as i,(random()*10)::int as v;
SELECT 10
testdb=> select * from t1;
 i  | v
----+---
  1 | 8
  2 | 5
  3 | 1
  4 | 6
  5 | 0
  6 | 9
  7 | 5
  8 | 6
  9 | 3
 10 | 4
(10 rows)

testdb=>
次に、このカラムに対して、min()関数と上記で作成したpymin()関数を実行して、結果を見てみます。
testdb=> select min(v),pymin(v) from t1;
 min | pymin
-----+-------
   0 |     0
(1 row)

testdb=>
上記の通り、きちんと最小値を取得できていることが分かります。

次に、平均値を求める avg() をPL/Pythonで実装してみます。pyavg() のコードは以下の通りです。
--
-- avg()
--

-- 内部状態を更新する
CREATE FUNCTION float8_pyavg(s float8[], n float8)
  RETURNS float8[]
AS $$
    global s

    if n is not None:
        if s is None:
            # sum,count
            s = [0,0]
        s[0] = s[0] + n
        s[1] = s[1] + 1
    return s
$$ LANGUAGE plpython2u;

-- 最終的な結果を計算、出力する
CREATE FUNCTION float8_pyavg_final(s float8[])
  RETURNS float8
AS $$
    global s

    if s is not None:
        return s[0]/s[1]
    return None
$$ LANGUAGE plpython2u;

-- 集約関数の定義
CREATE AGGREGATE pyavg (float8)
(
    sfunc = float8_pyavg,
    stype = float8[],
    finalfunc = float8_pyavg_final
);
平均値を計算する pyavg() では、最小値を取得する pymin() と比べて以下の点に違いがあります。
  • 最終的な計算をする必要がある(入力値の総和÷入力件数)。
  • そのため、ffunc を作成する必要がある(float8_pyavg_final)。
  • と同時に、内部状態 state には「それまでの入力値の総和と件数」を保持する必要がある。
  • そのため、内部状態 state は2要素からなる float8 の配列として定義する(stype = float8[])。
この pyavg() を先ほどのテスト用のテーブル t1 で実行してみます。
testdb=> select avg(v),pyavg(v) from t1;
        avg         | pyavg
--------------------+-------
 4.7000000000000000 |   4.7
(1 row)

testdb=>
このように、通常の avg() 関数と同じようにきちんと平均値を取得できていることが分かります。

ここで紹介した pymin(), pyavg() (と紹介していない pymax())のコードは以下にありますので、興味のある方はご覧ください。

■単回帰分析を行う集約関数を実装してみる


最後に、ここまでの内容を応用して、単回帰分析を行う集約関数を実装してみます。
  • 単回帰分析のモデルは y = w0 + w1 * x
  • データを二種類用意する(訓練用、評価用)
  • 訓練用データのテーブルから y と x を拾う(w0 と w1 を集約関数で計算して出力する)
  • 別のテーブルの評価用データの x から y を予測する
  • 結果および誤差を確認する
というものを想定します。

サンプルデータは、CourseraのMachine Learningのコースで使っている住宅のスペックと価格に関するデータを使います。
訓練用データにはさまざまな変数が含まれていますが、ここでは居住スペースの広さ(sqft_living)と価格(price)のみを使います。
snaga=# \d kc_house_train_data
      Table "public.kc_house_train_data"
    Column     |       Type       | Modifiers
---------------+------------------+-----------
 id            | text             |
 date          | text             |
 price         | double precision |
 bedrooms      | double precision |
 bathrooms     | double precision |
 sqft_living   | double precision |
 sqft_lot      | integer          |
 floors        | text             |
 waterfront    | integer          |
 view          | integer          |
 condition     | integer          |
 grade         | integer          |
 sqft_above    | integer          |
 sqft_basement | integer          |
 yr_built      | integer          |
 yr_renovated  | integer          |
 zipcode       | text             |
 lat           | double precision |
 long          | double precision |
 sqft_living15 | double precision |
 sqft_lot15    | double precision |
Inherits: kc_house_data

snaga=# select price,sqft_living from kc_house_train_data limit 5;
 price  | sqft_living
--------+-------------
 221900 |        1180
 538000 |        2570
 180000 |         770
 604000 |        1960
 510000 |        1680
(5 rows)

snaga=# select count(*) from kc_house_train_data;
 count
-------
 17384
(1 row)

snaga=#

上記の通り、訓練用データは 17,384 件あります。

次に、単回帰分析を行う集約関数 simple_linear_regression() を実装します。以下のレポジトリの simple_linear_regression_1.sql に作成されています。
この関数を使って、先ほどの 17,384 件の訓練用データを流すと、単回帰の w0 と w1 が取得できます。
snaga=# select simple_linear_regression(price,sqft_living) from kc_house_train_data;
    simple_linear_regression
--------------------------------
 {-47116.0791388,281.958839662}
(1 row)

snaga=#
w0(切片)は約 -47116.08、w1(傾き)は約 281.96 となりました。

では最後に、ここで求めた w0, w1 を使って、居住スペースから住宅価格を予測してみます。予測には訓練用データとは別のテスト用データを使います。
snaga=# select sqft_living, round(-47116.08 + 281.96 * sqft_living) as price_predicted, price
  from kc_house_test_data ;
 sqft_living | price_predicted |  price
-------------+-----------------+---------
        1430 |          356087 |  310000
        2950 |          784666 |  650000
        1710 |          435036 |  233000
        2320 |          607031 |  580500
        1090 |          260220 |  535000
(...)
        4170 |         1128657 | 1088000
        2500 |          657784 |  350000
        2520 |          663423 |  610685
        2310 |          604212 |  400000
        1020 |          240483 |  402101
(4229 rows)

snaga=#
上記のように、sqft_livingから価格を予測することができるようになりました。(もちろん、実際の住宅価格はさまざまな要素によって決まりますので、居住スペースだけを使った単回帰ではピッタリは当たりません)

■まとめ


以上、PostgreSQLでの集約関数の作り方とPL/Pythonでの実装方法、そしてその応用方法をご紹介してきました。

最初に書いた通り、データベースでデータ分析・集計をしようとすると集約関数は外せません。パフォーマンスの観点からも非常に重要です。

PostgreSQLの強みのひとつは拡張性の高さにあります。集約関数も例外ではありません。

データ分析に興味のある方は、ぜひこれを機会に集約関数の開発にもトライしてみていただければと思います。

では、また。

0 件のコメント:

コメントを投稿