機械学習パイプラインにおける共通特徴量ストア(Feature Store)の構築

Feature Store自作で解剖するMLOps:PythonとRedisで実装する「学習・推論の歪み」解消アーキテクチャ

約10分で読めます
文字サイズ:
Feature Store自作で解剖するMLOps:PythonとRedisで実装する「学習・推論の歪み」解消アーキテクチャ
目次

この記事の要点

  • 特徴量の一元管理と共有による効率化
  • 学習・推論間のデータ一貫性の確保
  • 特徴量エンジニアリングの生産性向上

「Feature Store(特徴量ストア)を導入すべきか、時期尚早か?」

AI導入の現場において、技術責任者の方々から頻繁に寄せられる疑問の一つです。Feastなどのオープンソースソフトウェア(OSS)や、Tecton、Vertex AI Feature Storeといったマネージドサービスは非常に魅力的ですが、導入や学習にかかるコストは決して低くありません。

導入がうまくいかないケースの多くは、機能が不足しているからではなく、「なぜその機能が必要なのか」というシステム全体の構造に対する理解不足に起因しています。仕組みをブラックボックスのまま導入してしまうと、問題発生時の対応や最適化が難しくなります。

本記事では、既存のツールを使わず、PythonとRedisを用いて最小構成のFeature Storeを構築する手順を解説します。

実装を通じて、機械学習モデルの運用における課題や複雑さを具体的に理解することができます。この理解は、自社に最適なアーキテクチャを選択し、ビジネス価値を最大化するための重要な判断材料となります。

それでは、Feature Storeの内部構造について詳しく見ていきましょう。

Feature Storeの本質的役割と「最小構成」の定義

まず、全体像を整理します。Feature Storeは単なるデータベースではなく、学習環境(オフライン)と推論環境(オンライン)という異なる要件を持つ2つのシステムを橋渡しし、データを同期する役割を担います。

Training-Serving Skew(学習・推論の歪み)とは

機械学習モデルの予測精度が、本番環境で想定通りに出ない原因の多くは「学習・推論の歪み(Training-Serving Skew)」にあります。

例えば、データ分析担当者が手元の環境で作成したデータ加工のルール(例:過去30日間の購入平均額の算出など)を、システム開発担当者が本番環境向けに別のプログラミング言語で書き直す際、微妙な計算式の違いや、参照するデータのタイミングにズレが生じることがあります。

Feature Storeは、このような「処理の重複」や「データの不整合」を防ぐための仕組みです。

今回構築するデュアル・データベース・アーキテクチャ

最小構成のFeature Storeには、2つのデータ保存場所が必要です。

  1. オフラインストア(Offline Store):

    • 役割: 過去の全履歴データを保持し、学習用データセットを作成します。
    • 要件: 大量のデータを効率よく処理できることが求められます。
    • 今回の実装: PostgreSQL(実際の業務ではクラウド上のデータウェアハウスなどが一般的です)
  2. オンラインストア(Online Store):

    • 役割: 現在の最新の特徴量のみを保持し、推論システムに即座に値を返します。
    • 要件: 応答速度の速さ(低遅延)と、システムが止まらないこと(高可用性)が求められます。
    • 今回の実装: Redis
      • 技術選定の補足: Redisは応答速度の速いデータストアとして標準的ですが、近年のライセンス変更や技術の進化に伴い、完全オープンソースの代替ソフトウェアや、クラウドベンダーの互換サービスも有力な選択肢となっています。本記事ではRedisを使用しますが、実際の運用では要件に合わせて最新の技術を比較検討することをお勧めします。

これらに対し、データ加工のルールから一貫してデータを供給する一連の流れ(パイプライン)がFeature Storeの本体となります。

環境セットアップ:Dockerによるインフラ構築

Feature Storeの本質的役割と「最小構成」の定義 - Section Image

手元の環境にインフラを構築します。Docker Composeを使用することで、必要なシステム構成を容易に再現できます。

docker-compose.ymlの記述例

最新のDocker環境では、コンテナの構成管理やセキュリティ機能が強化されています。以下は最小構成となるRedisとPostgreSQLの設定例です。

version: '3.8'  # プロジェクトの要件に合わせて適切なバージョンを指定

services:
  # オンラインストア用: 高速なKVS
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  # オフラインストア用: リレーショナルDB
  postgres:
    image: postgres:15  # 本番環境と整合性のある安定版を推奨
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: feature_store
    ports:
      - "5432:5432"

  # 実行環境(JupyterLabなどを含む)
  lab:
    image: jupyter/scipy-notebook
    ports:
      - "8888:8888"
    volumes:
      - ./work:/home/jovyan/work
    environment:
      - JUPYTER_ENABLE_LAB=yes

運用の信頼性を高めるためのポイント
Docker Compose V2以降では、docker compose up --dry-run コマンドを使用することで、実際の作成前に構成や変更内容をシミュレーションできます。複雑なインフラ構成を試す際は、この機能で設定ミスを事前に防ぐことをお勧めします。また、最新の環境ではソフトウェアの構成要素を明確にする機能もサポートされており、セキュリティの観点からも最新のツールを利用するメリットは大きいです。

Pythonプロジェクトの依存関係

Feature Storeの中核となる処理の実装に必要なライブラリです。データ操作のpandas、Redis操作のredis-py、データベース操作のsqlalchemyを使用します。

# requirements.txt
# 各ライブラリは互換性を確認の上、最新の安定版を使用してください
pandas>=2.0.0
redis>=5.0.0
sqlalchemy>=2.0.0
psycopg2-binary>=2.9.0

これらをインストールし、コンテナを起動すれば準備は完了です。より高度なシステムも、基本原理はこの構成の延長線上にあります。まずは小規模な構成から始め、保守性を考慮しながら必要に応じて拡張していくことが、安定したAI運用基盤を構築するための現実的なアプローチです。

実装フェーズ1:特徴量登録(Ingestion)パイプライン

生データから特徴量を計算し、オフラインストアとオンラインストアの両方に格納するプロセスを実装します。

バッチ処理による特徴量計算ロジック

ユーザーごとの「過去7日間の購入回数」という特徴量を計算する例です。

import pandas as pd
from datetime import datetime

# サンプルデータ(ログ)
data = {
    'user_id': [101, 102, 101, 103],
    'amount': [5000, 3000, 2000, 10000],
    'timestamp': [
        pd.Timestamp('2023-10-01 10:00:00'),
        pd.Timestamp('2023-10-01 11:00:00'),
        pd.Timestamp('2023-10-02 09:00:00'),
        pd.Timestamp('2023-10-02 12:00:00')
    ]
}
df_logs = pd.DataFrame(data)

# 特徴量計算: ユーザーごとの購入回数と合計金額
features = df_logs.groupby('user_id').agg(
    purchase_count=('amount', 'count'),
    total_amount=('amount', 'sum'),
    last_updated=('timestamp', 'max') # 重要: イベント発生時刻を保持
).reset_index()

print(features)

オフラインストアへの履歴データ保存

オフラインストア(PostgreSQL)には、計算された特徴量の履歴すべてを保存します。これにより「1ヶ月前の時点での特徴量」を取り出すことが可能になります。

from sqlalchemy import create_engine

# PostgreSQLへの接続
engine = create_engine('postgresql://user:password@localhost:5432/feature_store')

# テーブルに追記(append)モードで保存
features.to_sql('user_features_history', engine, if_exists='append', index=False)

オンラインストアへの最新値同期(Materialization)

オンラインストア(Redis)には、推論に必要な最新の値だけを保持し、古い値は上書きします。この処理を「具体化(Materialization)」と呼びます。

Redisのデータ保存のキーは 対象の種類:対象のID の形式が一般的です。

import redis
import json

# Redisへの接続
r = redis.Redis(host='localhost', port=6379, db=0)

def materialize_features(df):
    pipeline = r.pipeline() # パイプライン化して高速化
    for _, row in df.iterrows():
        # キー: user:{user_id}
        key = f"user:{row['user_id']}"
        
        # 値: 特徴量の辞書(JSON形式などで保存)
        # RedisのHash型を使うとさらに効率的ですが、ここではシンプルに文字列で
        value = {
            "purchase_count": row['purchase_count'],
            "total_amount": row['total_amount'],
            "last_updated": str(row['last_updated'])
        }
        pipeline.set(key, json.dumps(value))
    
    pipeline.execute()

materialize_features(features)

これにより、Redis上には常に各ユーザーの最新状態が保持され、推論システムはキーを指定するだけで瞬時に特徴量を取得できるようになります。

実装フェーズ2:推論時の低遅延アクセス(Serving)

実装フェーズ1:特徴量登録(Ingestion)パイプライン - Section Image

Feature Storeを導入する大きな利点の一つは、推論時の応答速度(レイテンシ)を向上させることです。リクエストのたびにデータベースで集計処理を行うことを避けることで、リアルタイムな予測が可能になります。

推論APIからの特徴量取得クラスの実装

推論システムから呼び出されるクラスを実装します。Redisの複数データ一括取得機能を活用し、効率的な処理を考慮します。

class OnlineFeatureStore:
    def __init__(self, redis_client):
        self.redis = redis_client

    def get_online_features(self, user_ids: list, feature_names: list):
        """
        指定されたユーザーIDリストの特徴量を取得する
        """
        keys = [f"user:{uid}" for uid in user_ids]
        
        # Redisから一括取得(ネットワークラウンドトリップを最小化)
        raw_values = self.redis.mget(keys)
        
        results = []
        for uid, val in zip(user_ids, raw_values):
            if val is None:
                # 欠損値ハンドリング: デフォルト値を返すか、エラーにするか方針を決める
                results.append(self._get_default_features())
            else:
                data = json.loads(val)
                # 必要な特徴量のみを抽出
                filtered = {k: data.get(k) for k in feature_names}
                results.append(filtered)
        
        return results

    def _get_default_features(self):
        return {"purchase_count": 0, "total_amount": 0.0}

# 使用例
store = OnlineFeatureStore(r)
features_for_inference = store.get_online_features([101, 103], ["purchase_count"])
print(features_for_inference)
# 出力: [{'purchase_count': 2}, {'purchase_count': 1}]

この設計により、データ加工のルールが複雑であっても、推論時にはRedisから直接値を取得するだけのシンプルな処理となり、アクセスが集中するAIサービスでも安定して稼働させることができます。

実装フェーズ3:学習データ作成とPoint-in-Time Join

ここからがFeature Storeの重要な役割であり、独自に構築する際の難所でもあります。

学習用のデータを作成する際、ユーザー情報とイベント履歴を単純に結合してしまうと、予測時点では知り得ない「未来の情報」が混入してしまう(データリーク)恐れがあります。

過去の特定時点の特徴量を再現する難しさ

例えば、「10月1日」の行動を予測するモデルを構築する場合、入力データには「10月1日時点での状態」を使用する必要があります。
誤って「現在の最新の状態(例えば10月30日時点)」を含めて学習してしまうと、未来の情報を知った状態で学習することになり、実際の運用では精度が出ないモデルとなってしまいます。

これを防ぐための仕組みが、Point-in-Time Correctness(時点整合性)の保証です。

ASOF JOINを用いた時系列結合の実装

Pythonの pandas には、これを実現する強力な機能 merge_asof があります。データベースの言語(SQL)では複雑な記述が必要ですが、pandasであれば直感的に記述できます。

# 学習対象のラベルデータ(いつ、誰が、何をしたか)
labels = pd.DataFrame({
    'user_id': [101, 103],
    'event_timestamp': [pd.Timestamp('2023-10-01 12:00:00'), pd.Timestamp('2023-10-02 13:00:00')],
    'target_label': [1, 0] # クリックしたかどうかの正解ラベル
})

# オフラインストアから取得した特徴量履歴データ
# (実際はDBから読み込みますが、ここではDataFrameを使用)
history_features = pd.DataFrame({
    'user_id': [101, 101, 103],
    'timestamp': [
        pd.Timestamp('2023-10-01 10:00:00'), # 1回目の更新
        pd.Timestamp('2023-10-02 09:00:00'), # 2回目の更新
        pd.Timestamp('2023-10-02 12:00:00')
    ],
    'purchase_count': [1, 2, 1]
})

# 両方のデータをタイムスタンプでソートしておく必要があります
labels = labels.sort_values('event_timestamp')
history_features = history_features.sort_values('timestamp')

# Point-in-Time Joinの実行
training_data = pd.merge_asof(
    labels,
    history_features,
    left_on='event_timestamp',
    right_on='timestamp',
    by='user_id',
    direction='backward' # 過去の方向で最も近い時点を探す
)

print(training_data[['user_id', 'event_timestamp', 'timestamp', 'purchase_count', 'target_label']])

コードの解説

merge_asof の設定により、「イベント発生時刻以前で、最も近い更新時刻の特徴量」を探して結合します。

これにより、「そのイベントが起きた瞬間にFeature Storeに入っていたはずの値」を正確に再現できます。これがFeature Storeが提供する最大の価値の一つです。

自作の限界とツール選定の判断基準

PythonとRedisを用いて中核となる機能を実装することで、Feature Storeの基本的な仕組みをご理解いただけたかと思います。しかし、この構成を実際の業務で長期的に運用しようとすると、システムの観点から新たな課題が見えてきます。

自作実装が破綻するスケールポイント

システム全体を俯瞰すると、主に以下の3つのポイントで独自構築の限界が見えてきます。

  1. 特徴量の定義管理: 特徴量の種類が増加するにつれ、「どのプログラムで計算されているか」「誰が作成・更新したか」という情報の管理が極めて困難になります。
  2. データの追跡と監視の複雑化: データの傾向変化の検知や、処理の流れ全体の追跡を独自のプログラムのみで実装・維持することは、開発リソースの観点から現実的ではありません。
  3. リアルタイム処理の統合: リアルタイムな特徴量計算を組み込もうとすると、システムの複雑度が大きく増大し、運用保守の負荷が高まります。

Feastや商用SaaSへ移行すべき兆候

独自構築を通じて得た内部構造の知識をベースに、以下の基準に該当し始めたタイミングで既存ツールの導入を検討することをお勧めします。

  • データ分析担当者が複数人規模になった: 共通の定義ファイルを用いた一元的な特徴量管理が不可欠になります。これは、専用のOSSが最も得意とする領域です。
  • 高度な管理体制が求められる: 厳格な監査証跡やアクセス制御が必要な業務において、独自構築での対応はリスクが伴います。商用サービスを利用することで、組織全体のツール管理やセキュリティ機能が強化され、安全な運用体制を構築できます。
  • 最新のAI技術との統合: 検索拡張生成(RAG)などの新しい技術の活用が視野に入る場合、データの扱いやオンライン推論との連携が鍵となります。最新のクラウドサービスでは、データベースから直接モデルを呼び出して予測を行うことが可能になっており、高負荷な処理にも対応できるため、システムの拡張をスムーズに実現できます。
  • インフラ運用の負荷増大: データベースの拡張管理や性能調整に開発リソースが割かれすぎている場合、フルマネージドサービスへの移行を検討する適切なタイミングと言えます。

まとめ:アーキテクチャを理解してこそのツール活用

出力: [{'purchase_count': 2}, {'purchase_count': 1}] - Section Image 3

Feature Storeは決して魔法の箱ではありません。本質的には、「学習環境と推論環境のデータ同期」および「時点整合性を担保したデータの結合」を確実に行うための仕組みの集合体です。

独自構築を通じてこの内部構造を深く理解していれば、流行の言葉に流されることなく、自社の状況や課題に最適なシステムを客観的に評価・設計できます。
現在、AI開発の現場では、データベースと生成AIの統合といった新しい潮流が次々と生まれています。まずは小規模な構成で本質的な課題を特定し、ビジネス要件の拡大や新たなAI技術の導入といった目的が明確になった段階で、適切なマネージドサービスやOSSへ移行する。これこそが、現場の課題に即した現実的で効果的なアプローチであると考えています。

Feature Store自作で解剖するMLOps:PythonとRedisで実装する「学習・推論の歪み」解消アーキテクチャ - Conclusion Image

参考文献

  1. https://qiita.com/cvusk/items/5f77a28daab79ffc0de3
  2. https://developers.google.com/youtube/v3/guides/auth/server-side-web-apps?hl=ja
  3. https://qiita.com/GeneLab_999/items/ea8fd9347749591bd479
  4. https://learn.microsoft.com/ja-jp/azure/app-service/reference-app-settings
  5. https://aws.amazon.com/blogs/database/implementing-search-on-amazon-dynamodb-data-using-zero-etl-integration-with-amazon-opensearch-service/
  6. https://www.schrankmonster.de/tag/development/
  7. https://help.tableau.com/current/offline/ja-jp/tableau_server_windows.pdf
  8. https://docs.contrastsecurity.jp/ja/java-agent-release-notes-and-archive.html

コメント

コメントは1週間で消えます
コメントを読み込み中...