Last active
January 13, 2024 04:46
-
-
Save yoggy/b7eed55f1f96c32caa43a2b07c7cab5d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# clusterのサーバから外部通信を受けるREST API | |
# | |
# 参考: https://docs.cluster.mu/script/interfaces/ClusterScript.html#callExternal | |
# | |
from firebase_functions import https_fn, options | |
from firebase_admin import initialize_app | |
import os | |
import json | |
import requests | |
import time | |
from datetime import datetime | |
from zoneinfo import ZoneInfo | |
app = initialize_app() | |
@https_fn.on_request( | |
min_instances=0, | |
max_instances=1, | |
cors=options.CorsOptions(cors_origins="*", cors_methods=["get", "post"]), | |
secrets=["CLUSTER_EXTERNAL_TOKEN", "BEEBOTTE_POST_URL", "BEEBOTTE_CHANNEL_TOKEN"] | |
) | |
def on_request_pingpong(req: https_fn.Request) -> https_fn.Response: | |
# clusterからのアクセスはPOSTのみ。それ以外は応答しない | |
if req.method != "POST": | |
return https_fn.Response(status=404, response="Not Found") | |
if req.json is None: | |
return https_fn.Response(status=400, response="Bad Request") | |
# clusterから送られてくるリクエストのJSON | |
# { | |
# "request": "...1kB以下の文字列..." | |
# } | |
request_json = req.json | |
# CLUSTERに返すトークンをSecrets Managerから取得 | |
# Secrets Managerに値を登録するのは、以下コマンドを実行する | |
# | |
# $ firebase functions:secrets:set CLUSTER_EXTERNAL_TOKEN | |
# | |
# Secrets Managerを有効にしていない場合は、エラーが表示される | |
# エラーに表示されているURLにアクセスして、Secrets Managerを有効にすること | |
# | |
cluster_external_token = os.environ["CLUSTER_EXTERNAL_TOKEN"] | |
# 10時から22時の間のみ動作するようにする | |
now = datetime.now(ZoneInfo("Asia/Tokyo")) | |
if now.hour < 10 or 22 < now.hour: | |
response_json = {} | |
response_json["verify"] = cluster_external_token | |
response_json["response"] = "Quiet hours from 10pm to 10am..." | |
response_json_str = json.dumps(response_json, ensure_ascii=False) | |
return https_fn.Response(status=200, response=response_json_str, mimetype='application/json') | |
# | |
# beeboteにPOSTリクエストを送って、MQTTのトピックにメッセージを送る | |
# Secrets Managerに値を登録するのは、以下コマンドを実行する | |
# | |
# $ firebase functions:secrets:set BEEBOTTE_POST_URL | |
# $ firebase functions:secrets:set BEEBOTTE_CHANNEL_TOKEN | |
# | |
# Secrets Managerを有効にしていない場合は、エラーが表示される | |
# エラーに表示されているURLにアクセスして、Secrets Managerを有効にすること | |
beebotte_post_url = os.environ["BEEBOTTE_POST_URL"] | |
beebotte_channel_token = os.environ["BEEBOTTE_CHANNEL_TOKEN"] | |
headers = {"content-type":"application/json", "X-Auth-Token":beebotte_channel_token} | |
payload = {"data": "remote_addr="+req.remote_addr} | |
# BeebotteのREST APIへアクセスする | |
res_beebotte = requests.post(beebotte_post_url, data=json.dumps(payload), headers=headers) | |
# BEEBOTTE_POST_URLを間違えた時の保護用。自分自身をループで呼び出さないように | |
#time.sleep(0.5) | |
# clusterへ返すレスポンスのJSON | |
# { | |
# "verify": "...verify_token (Creator Kitで取得できます)...", | |
# "response": "...1kB以下の文字列..." | |
# } | |
response_json = {} | |
response_json["verify"] = cluster_external_token | |
response_json["response"] = "response data for cluster" | |
response_json["req.remote_addr"] = req.remote_addr | |
response_json["request_json"] = json.dumps(request_json, ensure_ascii=False) | |
response_json["res_beebotte.status_code"] = res_beebotte.status_code | |
response_json_str = json.dumps(response_json, ensure_ascii=False) | |
return https_fn.Response(status=200, response=response_json_str, mimetype='application/json') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment