2020/12/07

Secureなwebhookの設計と実装

python

概要

HTTPを使ったシステム間連携のお話です。

例えば、決済系のシステムでは、webhookと呼ばれるHTTPを使ったシステム間連携を採用することがあります。

決済システムとそれと連携しているECサイトなどが良い例です。

  1. ECサイトでユーザーに対して振込情報を生成・返却する
  2. ユーザーが実際に振込を実施する
  3. 決済システムが振込を検知して、ECサイトにWebhookを送信する ← ここ
  4. ECサイトはWebhookを受け取り、アイテム付与などを行う ← ここ

webhookのセキュリティ的に以下のことに特に注意する必要があります。

  • 悪意のあるユーザーがECサイトのwebhookにリクエストを送信してしまう場合、決済を実際にしていなくてもアイテム付与などが行われないように担保する必要があります
  • 例え、ECサイトと決済システムがプライベートネットワーク上にあったとしても、以下ができないようにする必要があります
    • 悪意のある開発者がリクエストを発行しないようする
    • 悪意のあるネットワーク管理者が、ネットワーク上のデータを傍受して不正なリクエストを発行したりしないようにする

設計

このような問題を解決するために以下のような設計を行いました。

  1. ECサイトと決済システム間で共通の署名用のsecret文字列を定義しておきます
  2. 決済システムはwebhookのhttpリクエストのbodyを署名して、署名した日時(タイムスタンプ)と署名データをHTTP Headerに入れて送信します
  3. ECサイト側は、webhookのhttpリクエストを受け付ける際に、受け取ったbodyデータから署名を行い、headerに入っている署名データと比較してあっているかを確認します。また署名した日時が一定期間以内かを確認します。
  4. また同一のwebhookリクエストが複数回きても複数回処理せず、必ず一回しか処理を行わないことを担保しておきます。(すでに処理済のものであれば無視する等)
  5. (推奨)ECサイトのwebhookは、決済システムのIPアドレス(レンジ)からのみ受け付けるようにwebサーバーやfirewallなどを適切に設定しておきます

こうすることで、以下を実現しています

  • 署名用のsecret文字列を知る人しかwebhookを呼び出すことができなくなる。
  • 決済システムがwebhookをリトライできるようになる。(テストもかなりしやすくなります)
  • 時間制限があるため、あまりに古い時間に署名されたリクエストは(怪しいので)拒否できる。

実装

  • Python3で実装するとこのようになります
  • 署名にはHMAC/SHA256を採用しています

送信側(決済システム側)

import json
from datetime import datetime
from pytz import timezone
import hmac
from hashlib import sha256
import requests

secret = "your-webhook-secret"
url = "https://your-domain.com/webhook/"
data = {
  "transaction_id": "abcdefg",
  "hoge": "fuga"
}
json_data_string = json.dumps(data)
timestamp = int(datetime.now(timezone('UTC')).timestamp())
timestamp_and_signature = "%d.%s" % (timestamp, json_data_string)
signature = hmac.new(
    secret.encode("utf-8"),
    msg=timestamp_and_signature.encode("utf-8"),
    digestmod=sha256,
).hexdigest()
headers = {
    "Content-Type": "application/json",
    "Your-Signature": f"t={timestamp},s={signature}",
}
response = requests.post(url=url, headers=headers, data=json_data_string.encode("utf-8"))

受信側(ECサイト側)

  • 例えば、以下のような処理をmiddlewareなどをうまく定義してチェックします。
from datetime import datetime
from pytz import timezone
import hmac
from hashlib import sha256

secret = "your-webhook-secret"
tolerance = 300

# e.g.) payload = await request.body()
payload: bytes  # http requestのbody bytesです。
# e.g.) timestamp_and_signature = request.headers.get("your-signature")
timestamp_and_signature: str  # http headerの"Your-Signature"の値です。

if hasattr(payload, "decode"):
    payload = payload.decode("utf-8")
items = dict(i.split("=") for i in timestamp_and_signature.split(","))
timestamp = int(items["t"]) if "t" in items else None
signature = items["s"] if "s" in items else None

if timestamp is None or signature is None:
    raise YourException("invalid signature")

signed_payload = "%d.%s" % (timestamp, payload)
expected_signature = hmac.new(
    secret.encode("utf-8"),
    msg=signed_payload.encode("utf-8"),
    digestmod=sha256,
).hexdigest()
if not hmac.compare_digest(expected_signature, signature):
    raise YourException("invalid signature")

if tolerance and timestamp < int(datetime.now(timezone('UTC')).timestamp()) - tolerance:
    raise YourException(f"Timestamp outside the tolerance zone ({timestamp}, {signature})")

# 以下付与処理(payloadのjson文字列をうまくパースして処理をします)
# すでに実行済の transaction_id だった場合はスキップするような処理を実装します

以上になります。