Skip to content

Instantly share code, notes, and snippets.

@Miura55
Last active December 5, 2021 04:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Miura55/1f7bc48fc79366b272d4e88055235ba7 to your computer and use it in GitHub Desktop.
Save Miura55/1f7bc48fc79366b272d4e88055235ba7 to your computer and use it in GitHub Desktop.
LINE botでAWS IoT Coreを使いデバイスにメッセージを送信するLINE bot
import os
import json
import boto3
import urllib3
http = urllib3.PoolManager()
class LineBotApi(object):
def __init__(self, access_token):
self.access_token = access_token
def text_send_message(self, message):
return [{
"type": "text",
"text": message
}]
def reply_message(self, reply_token, msg):
body = {
"replyToken": reply_token,
"messages": msg
}
res = http.request(
'POST',
'https://api.line.me/v2/bot/message/reply',
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer {}".format(self.access_token)
},
body=json.dumps(body)
)
return res.read()
# 環境変数を読み込む
imsi = os.getenv('DEVICE_IMSI', None)
channel_access_token = os.getenv('LINE_CHANNEL_ACCESS_TOKEN', None)
line_bot_api = LineBotApi(channel_access_token)
iot = boto3.client('iot-data', region_name='ap-northeast-1')
def lambda_handler(event, context):
# Webhookの接続確認用
if len(event['events']) == 0:
return {
'statusCode': 200,
'body':''
}
# メッセージ、Reply Tokenを取得
text_message = event['events'][0]['message']['text']
reply_token = event['events'][0]['replyToken']
try:
# 受け取ったメッセージをデバイスへPublish
pub_msg = {
'command': text_message
}
iot.publish(
topic = "switchbot/{}".format(imsi),
qos = 0,
payload = json.dumps(pub_msg)
)
except Exception as e:
print(e)
raise e
# 受けとったメッセージをオウム返し
rep = '送信しました'
message_obj = line_bot_api.text_send_message(rep)
line_bot_api.reply_message(reply_token, message_obj)
return {
'statusCode': 200,
'body': ''
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment