AIエージェント間通信におけるプロンプト署名と自動検証システムの開発

AIエージェント通信を守る:Ed25519によるプロンプト署名と自動検証の実装ガイド

約9分で読めます
文字サイズ:
AIエージェント通信を守る:Ed25519によるプロンプト署名と自動検証の実装ガイド
目次

この記事の要点

  • AIエージェント間通信のセキュリティ強化
  • プロンプトの真正性と完全性を保証
  • 悪意ある改ざんやなりすましを防止

自律型AIエージェントを組み合わせた「マルチエージェントシステム」の構築現場で、最近よく耳にする悩みがあります。

「エージェントAからエージェントBに指示を出す際、そのプロンプトが途中で改ざんされていないと、どうやって保証すればいいのでしょうか?」

多くの開発現場では、ここで既存のAPIゲートウェイや高価なセキュリティソリューションを導入しようとする傾向があります。もちろん、それも一つの正解です。しかし、中身がブラックボックス化されたツールに依存することは、エンジニアリングの本質的な解決策と言えるでしょうか。

特に、AIが生成した機微な情報を扱う場合、通信の安全性はプロジェクトチーム自身の手でコントロール可能な状態にしておくべきです。

今回は、あえて商用ツールを使わず、Pythonと強力な署名アルゴリズム「Ed25519」を用いて、エージェント間通信の完全性を保証する署名・検証システムをフルスクラッチで実装する手法を解説します。コード1行1行の意味を理解し、堅牢なAI通信基盤を構築するための実践的なガイドラインとして活用してください。

設計思想

実装に入る前に、なぜAIエージェント間の通信にこれほど神経を使う必要があるのか、その背景と設計思想を整理します。

なぜAIエージェント間に署名が必要なのか

従来のマイクロサービス間通信であれば、mTLS(相互TLS)やIP制限で十分な場合も多いでしょう。しかし、AIエージェント特有のリスクとして「プロンプトインジェクション」と「ハルシネーションの連鎖」があります。

もし、悪意ある攻撃者がエージェント間の通信に割り込み、指示プロンプトの一部(例:「以下の制約を守ってください」という部分)を削除したり書き換えたりしたらどうなるでしょうか?受信側のエージェントは、制約が外れた状態で危険な出力を生成してしまうかもしれません。

また、システム内部であっても「送信元が本当に正当なエージェントAであるか」を常に疑うゼロトラストの姿勢が不可欠です。なりすましによる不正な指示実行を防ぐため、以下の2点を暗号技術で保証します。

  1. 送信元認証(Authentication): データが間違いなく許可された送信者から送られたものであること。
  2. 完全性(Integrity): データが転送中に1ビットたりとも改ざんされていないこと。

実装するアーキテクチャの全体像

今回構築を想定するのは、以下のフローを持つシステムです。

  1. Signer(署名者): 送信側エージェント。送信するJSONデータ(プロンプト)のハッシュ値を計算し、秘密鍵で署名を生成。HTTPヘッダーに付与して送信します。
  2. Verifier(検証者): 受信側エージェント(の前段)。HTTPヘッダーから署名を取り出し、公開鍵を使って検証。データの改ざんや期限切れ(リプレイ攻撃)がないかを確認し、問題があれば処理を拒否します。

Ed25519署名アルゴリズムの採用理由

署名アルゴリズムにはRSAやECDSAなどがありますが、今回はEd25519を採用します。その理由は明確です。

  • 高速性: 署名生成・検証ともに非常に高速で、リアルタイム性が求められるAIエージェント通信に適しています。
  • 安全性: 楕円曲線暗号の一種であり、RSAに比べて短い鍵長(32バイト)で同等以上の強度を持ちます。また、サイドチャネル攻撃(処理時間などを観測して情報を盗む攻撃)への耐性が高い設計になっています。
  • 実装の容易さ: 乱数生成への依存度が低く、実装ミスによる脆弱性が入りにくい特長があります。

それでは、具体的な実装手順に入ります。

2. 開発環境と暗号化ライブラリのセットアップ

まずは開発環境を整えます。Pythonの標準ライブラリだけではEd25519を扱えないため、信頼性の高い暗号化ライブラリである cryptography を使用します。

必要なPythonライブラリのインストール

以下のコマンドでライブラリをインストールしてください。

pip install cryptography python-dotenv pydantic
  • cryptography: 暗号化処理のデファクトスタンダードライブラリ。
  • python-dotenv: 環境変数を .env ファイルから読み込むために使用。
  • pydantic: データのバリデーションと構造化に使用。

暗号鍵ペア(秘密鍵・公開鍵)の生成と管理

最初に、署名に使用する鍵ペアを生成します。この作業は一度だけ行い、生成された鍵は安全に保管する必要があります。

以下のスクリプト generate_keys.py を作成し、実行してください。

import base64
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization

def generate_key_pair():
    """
    Ed25519の秘密鍵と公開鍵のペアを生成し、Base64エンコードして表示する。
    """
    # 秘密鍵の生成
    private_key = ed25519.Ed25519PrivateKey.generate()
    
    # 公開鍵の導出
    public_key = private_key.public_key()

    # 秘密鍵をバイト列に変換 (Raw形式)
    private_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption()
    )

    # 公開鍵をバイト列に変換 (Raw形式)
    public_bytes = public_key.public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw
    )

    # Base64エンコードして表示(環境変数に設定しやすくするため)
    print("=== AI Agent Key Pair ===")
    print(f"PRIVATE_KEY={base64.b64encode(private_bytes).decode('utf-8')}")
    print(f"PUBLIC_KEY={base64.b64encode(public_bytes).decode('utf-8')}")
    print("=========================")
    print("※警告: PRIVATE_KEYは絶対に外部に漏らさないでください。")

if __name__ == "__main__":
    generate_key_pair()

環境変数の設定とセキュリティ確保

出力されたキーペアを .env ファイルに保存します。ソースコード内に直接秘密鍵を書き込む(ハードコード)のは、セキュリティ上の重大なリスクとなるため絶対に避けるべきです。

.env ファイルの例:

# 送信側エージェントが使用
AGENT_PRIVATE_KEY=your_generated_private_key_here

# 受信側エージェントが検証に使用
AGENT_PUBLIC_KEY=your_generated_public_key_here

これで準備は整いました。次に署名生成モジュールの実装に進みます。

3. Step 1:署名生成モジュール(Signer)の実装

開発環境と暗号化ライブラリのセットアップ - Section Image

送信側エージェント(Signer)の役割は、送信するデータ(ペイロード)に対してデジタル署名を付与することです。

ここで最も重要な技術的ポイントは「データの正規化(Canonicalization)」です。JSONデータは、キーの順序が変わったり、空白の数が変わったりするだけで、バイナリとしては全く別のデータになります。署名検証を成功させるには、送信側と受信側で「完全に同一のバイト列」を再現する必要があります。

プロンプトペイロードの正規化

Pythonの json.dumps を使う際は、必ず sort_keys=True を指定し、余計な空白を除去(separators=(',', ':'))することで、表現を一意に定めます。

署名生成関数のコーディング

signer.py として以下のクラスを実装します。

import os
import json
import time
import base64
from dataclasses import dataclass
from typing import Dict, Any, Tuple
from cryptography.hazmat.primitives.asymmetric import ed25519
from dotenv import load_dotenv

load_dotenv()

@dataclass
class SignedRequest:
    payload: Dict[str, Any]
    headers: Dict[str, str]

class AgentSigner:
    def __init__(self, private_key_b64: str):
        """
        Args:
            private_key_b64 (str): Base64エンコードされたEd25519秘密鍵
        """
        try:
            private_bytes = base64.b64decode(private_key_b64)
            self.private_key = ed25519.Ed25519PrivateKey.from_private_bytes(private_bytes)
        except Exception as e:
            raise ValueError(f"Invalid private key: {e}")

    def sign_payload(self, payload: Dict[str, Any]) -> SignedRequest:
        """
        ペイロードにタイムスタンプを付与し、署名を生成する。
        """
        # 1. リプレイ攻撃対策: タイムスタンプの取得 (整数値)
        timestamp = str(int(time.time()))
        
        # 2. ペイロードの正規化 (Canonicalization)
        # キー順序を固定し、空白を除去してバイト列化
        payload_bytes = json.dumps(
            payload, 
            sort_keys=True, 
            separators=(',', ':')
        ).encode('utf-8')
        
        # 3. 署名対象データの構築
        # タイムスタンプとペイロードを結合して署名対象とする
        # 形式: timestamp.payload
        data_to_sign = f"{timestamp}.".encode('utf-8') + payload_bytes
        
        # 4. 署名の生成
        signature = self.private_key.sign(data_to_sign)
        signature_b64 = base64.b64encode(signature).decode('utf-8')
        
        # 5. ヘッダーの生成
        headers = {
            "Content-Type": "application/json",
            "X-Agent-Signature": signature_b64,
            "X-Agent-Timestamp": timestamp
        }
        
        return SignedRequest(payload=payload, headers=headers)

# 使用例
if __name__ == "__main__":
    private_key = os.getenv("AGENT_PRIVATE_KEY")
    if not private_key:
        raise EnvironmentError("PRIVATE_KEY not found in .env")
        
    signer = AgentSigner(private_key)
    
    prompt_data = {
        "task": "summarize",
        "content": "This is a confidential meeting log...",
        "priority": "high"
    }
    
    signed_req = signer.sign_payload(prompt_data)
    print("=== Signed Request ===")
    print(f"Headers: {signed_req.headers}")
    print(f"Payload: {json.dumps(signed_req.payload, indent=2)}")

コードのポイント

  1. タイムスタンプの結合: 署名対象にタイムスタンプを含めることで、古い署名データを再利用する「リプレイ攻撃」を防ぎます。
  2. data_to_sign の構築: timestamppayload をドット . で繋いで一つのデータとして扱っています。これはJWT(JSON Web Token)などの標準規格でもよく見られる手法です。

4. Step 2:自動検証ミドルウェア(Verifier)の実装

次に、受信側エージェント(Verifier)を実装します。このモジュールは、リクエストがビジネスロジック(LLMへの問い合わせなど)に到達するに動作し、不正なリクエストをフィルタリングします。

verifier.py を作成します。

import os
import json
import time
import base64
from typing import Dict, Any
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.exceptions import InvalidSignature
from dotenv import load_dotenv

load_dotenv()

class AgentVerifier:
    def __init__(self, public_key_b64: str, tolerance_seconds: int = 300):
        """
        Args:
            public_key_b64 (str): Base64エンコードされたEd25519公開鍵
            tolerance_seconds (int): タイムスタンプの有効期限(秒)。デフォルト5分。
        """
        try:
            public_bytes = base64.b64decode(public_key_b64)
            self.public_key = ed25519.Ed25519PublicKey.from_public_bytes(public_bytes)
            self.tolerance = tolerance_seconds
        except Exception as e:
            raise ValueError(f"Invalid public key: {e}")

    def verify_request(self, payload: Dict[str, Any], headers: Dict[str, str]) -> bool:
        """
        リクエストの署名とタイムスタンプを検証する。
        Returns:
            bool: 検証成功ならTrue、失敗ならFalse(例外は送出せずログに残す想定)
        """
        signature_b64 = headers.get("X-Agent-Signature")
        timestamp_str = headers.get("X-Agent-Timestamp")

        if not signature_b64 or not timestamp_str:
            print("[Error] Missing signature or timestamp headers.")
            return False

        # 1. タイムスタンプ検証 (リプレイ攻撃対策)
        try:
            request_time = int(timestamp_str)
            current_time = int(time.time())
            
            # 未来の時刻や、有効期限切れの過去の時刻を拒否
            if request_time < current_time - self.tolerance:
                print(f"[Error] Request expired. Diff: {current_time - request_time}s")
                return False
            if request_time > current_time + 5: # クロックのズレを5秒まで許容
                print("[Error] Request time is in the future.")
                return False
                
        except ValueError:
            print("[Error] Invalid timestamp format.")
            return False

        # 2. 署名検証
        try:
            # 署名をバイト列に戻す
            signature = base64.b64decode(signature_b64)
            
            # ペイロードをSignerと同じルールで再構築
            payload_bytes = json.dumps(
                payload, 
                sort_keys=True, 
                separators=(',', ':')
            ).encode('utf-8')
            
            # 検証用データの構築
            data_to_verify = f"{timestamp_str}.".encode('utf-8') + payload_bytes
            
            # 検証実行(失敗するとInvalidSignature例外が発生)
            self.public_key.verify(signature, data_to_verify)
            
            return True
            
        except (InvalidSignature,  base64.binascii.Error) as e:
            print(f"[Error] Signature verification failed: {e}")
            return False
        except Exception as e:
            print(f"[Error] Unexpected error during verification: {e}")
            return False

実装の重要ポイント

  • リプレイ攻撃の防止: どんなに正しい署名でも、過去の通信をそのまま再送されたら意味がありません。tolerance_seconds(ここでは300秒=5分)を設定し、古すぎるリクエストは署名検証をする前に門前払いします。
  • 例外処理: verify メソッドは検証失敗時に InvalidSignature 例外を投げます。これを適切にキャッチし、呼び出し元には False を返す(あるいはHTTP 401エラーにする)設計にします。

5. Step 3:統合テストと動作確認

Step 2:自動検証ミドルウェア(Verifier)の実装 - Section Image

SignerとVerifierが完成したので、実際に組み合わせてテストを行います。正常な通信だけでなく、「改ざんされた通信」が正しく拒否されるかを確認することが重要です。

test_integration.py を作成します。

import os
import json
from signer import AgentSigner
from verifier import AgentVerifier
from dotenv import load_dotenv

load_dotenv()

def run_tests():
    private_key = os.getenv("AGENT_PRIVATE_KEY")
    public_key = os.getenv("AGENT_PUBLIC_KEY")
    
    signer = AgentSigner(private_key)
    verifier = AgentVerifier(public_key)
    
    payload = {"instruction": "Analyze this data", "secure": True}
    
    print("--- Test 1: 正常な通信 ---")
    signed_req = signer.sign_payload(payload)
    is_valid = verifier.verify_request(signed_req.payload, signed_req.headers)
    print(f"Result: {'OK' if is_valid else 'FAIL'}")
    assert is_valid == True

    print("\n--- Test 2: ペイロード改ざん検知 ---")
    # 攻撃者が通信内容を書き換えたと仮定
    tampered_payload = payload.copy()
    tampered_payload["secure"] = False # セキュリティフラグを勝手にOFFにする
    
    # ヘッダー(署名)は元のまま、中身だけ変えて送信
    is_valid_tampered = verifier.verify_request(tampered_payload, signed_req.headers)
    print(f"Result: {'OK' if is_valid_tampered else 'BLOCKED'}")
    assert is_valid_tampered == False

    print("\n--- Test 3: ヘッダー改ざん検知 ---")
    # 攻撃者が署名の一部を適当に書き換える
    tampered_headers = signed_req.headers.copy()
    original_sig = tampered_headers["X-Agent-Signature"]
    # 末尾の文字を変える
    tampered_sig = original_sig[:-1] + ('A' if original_sig[-1] != 'A' else 'B')
    tampered_headers["X-Agent-Signature"] = tampered_sig
    
    is_valid_header = verifier.verify_request(payload, tampered_headers)
    print(f"Result: {'OK' if is_valid_header else 'BLOCKED'}")
    assert is_valid_header == False

if __name__ == "__main__":
    run_tests()

実行結果を確認します。Test 2とTest 3で確実に BLOCKED となり、検証エラーログが出力されれば成功です。これにより、AIエージェント通信が暗号学的に保護された状態となります。

6. 本番運用に向けたセキュリティ強化設定

PoC(概念実証)から本番環境へ移行する際、さらに考慮すべき運用上のポイントが存在します。

鍵のローテーション戦略

どれほど強固な暗号でも、秘密鍵が漏洩すればシステムは脆弱になります。本番運用では、定期的に鍵を交換(ローテーション)する仕組みが必要です。

  • 推奨: 鍵に key_id(バージョンID)を割り当て、ヘッダーに X-Agent-Key-ID を含める。
  • Verifier側: 複数の公開鍵を保持できるようにし、新しい鍵への移行期間中は新旧両方の鍵で検証できるようにする。

パフォーマンスへの影響と最適化

Ed25519は非常に高速ですが、毎秒数千リクエストを処理する場合、検証処理がボトルネックになる可能性があります。

  • 対策: 検証処理をPythonアプリ内ではなく、RustやGoで書かれたサイドカープロキシ(Envoyなど)のWasmプラグインとして実装し、オフロードすることも検討の余地があります。ただし、ロジック自体は今回解説したものと同様です。

エラー発生時のフォールバック運用

検証システム自体にバグがあった場合、全てのAI通信が停止するリスクがあります。

  • 緊急対応: 環境変数 SKIP_VERIFICATION=true を設定すれば検証をスキップできる「キルスイッチ」をコードに組み込んでおくことは、可用性の観点から有効な手段です。ただし、このスイッチの管理は厳重に行う必要があります。

まとめ

4. Step 2:自動検証ミドルウェア(Verifier)の実装 - Section Image 3

今回は、既存のセキュリティ製品に頼らず、PythonとEd25519を用いて自律型AIエージェント間の通信を守る仕組みの実装方法を解説しました。

  • ゼロトラスト: 内部通信でも相手を信用せず、常に検証する。
  • 正規化: JSONデータの揺らぎを排除し、一意な署名対象を作る。
  • リプレイ対策: タイムスタンプを含めて署名し、有効期限を設ける。

この3つの原則を守ることで、AIエージェントは「なりすまし」や「改ざん」から保護され、より安全に自律動作させることが可能になります。

しかし、エージェントの数が増え、組織をまたぐような大規模な連携が必要になった場合、鍵管理の複雑さは指数関数的に増大します。数十、数百のエージェントが協調するエンタープライズ環境では、より高度な鍵管理基盤(KMS)との連携や、分散型ID(DID)の活用が視野に入ります。

実際に、大規模なエンタープライズ環境のプロジェクトでは、これらをどのように統合し、コンプライアンス要件を満たしていくかが重要な課題となります。

大規模AIシステムにおけるセキュリティ実装の成功事例などを参考に、自社のアーキテクチャ設計を見直すことをおすすめします。

AIエージェント通信を守る:Ed25519によるプロンプト署名と自動検証の実装ガイド - Conclusion Image

コメント

コメントは1週間で消えます
コメントを読み込み中...