Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import io
import json
import os
import sys
import time
import uuid
from argparse import ArgumentParser
from azure.cognitiveservices.vision.computervision import ComputerVisionClient
from azure.cognitiveservices.vision.face import FaceClient
from azure.cosmos import CosmosClient
from azure.storage.blob import (BlobClient, BlobServiceClient, ContainerClient,
__version__)
from flask import Flask, abort, request
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import (CameraAction, CameraRollAction, ImageMessage,
ImageSendMessage, MessageEvent, QuickReply,
QuickReplyButton, TextMessage, TextSendMessage)
from msrest.authentication import CognitiveServicesCredentials
from requests import get, post
app = Flask(__name__)
## LINE
channel_secret = '<your channel_secret>'
channel_access_token = '<your channel_access_token>'
if channel_secret is None:
print('Specify LINE_CHANNEL_SECRET as environment variable.')
sys.exit(1)
if channel_access_token is None:
print('Specify LINE_CHANNEL_ACCESS_TOKEN as environment variable.')
sys.exit(1)
line_bot_api = LineBotApi(channel_access_token)
handler = WebhookHandler(channel_secret)
## Azure CosmosDB
cosmos_endpoint = 'https://<your cosmos_endpoint>.documents.azure.com:443/'
cosmos_key = '<your cosmos_key>'
cosmos_client = CosmosClient(cosmos_endpoint, credential=cosmos_key)
database_name = '<your database_name>'
database = cosmos_client.get_database_client(database_name)
cosmos_container_name = '<your cosmos_container_name>'
cosmos_container = database.get_container_client(cosmos_container_name)
## Azure Cognitive Services
cog_apim_key = '<your cog_apim_key>'
cog_endpoint = 'https://<your cog_endpoint>.cognitiveservices.azure.com/'
## Azure Storage
blob_service_connection_string = '<your blob_service_connection_string>'
blob_service_container_client_name = '<your blob_service_container_client_name>'
face_img_base_url = 'https://<your face_img_base_url>'
@app.route("/", methods=['POST'])
def callback():
# get X-Line-Signature header value
signature = request.headers['x-line-signature']
# get request body as text
body = request.get_data(as_text=True)
app.logger.info("Request body: " + body)
# handle webhook body
try:
handler.handle(body, signature)
except InvalidSignatureError:
abort(400)
return 'OK'
@handler.add(MessageEvent, message=TextMessage)
def message_text(event):
if '本人確認開始' == event.message.text:
save_data(event.source.user_id, 0, 0)
send_line_quick_reply(event.reply_token, '正面を向いた写真を送ってください。')
else:
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text='[本人確認開始]を入力して下さい。')
)
@handler.add(MessageEvent, message=ImageMessage)
def message_image(event):
user_id = event.source.user_id
step = ''
pre_face_id = ''
## 手続きの状態を取得
for item in cosmos_container.query_items(
query=f'SELECT * FROM keyc_container r WHERE r.id="{user_id}"',
enable_cross_partition_query=True):
step = item['step']
pre_face_id = item['face_id']
print(step)
if step != '':
## 送られてきた写真データ取得
message_content = line_bot_api.get_message_content(event.message.id)
data_bytes = b''
for chunk in message_content.iter_content():
data_bytes += chunk
filename = str(uuid.uuid4()) + '.jpg'
## 送られてきた写真データを一時保存
upload_img(data_bytes, filename)
## 送られてきた写真から顔情報を取得
face_client = FaceClient(cog_endpoint, CognitiveServicesCredentials(cog_apim_key))
image_1_faces = face_client.face.detect_with_url(face_img_base_url + filename, detection_model='detection_01',return_face_attributes=['headPose'])
face_1 = image_1_faces[0]
if step == 0:
## 顔の向き確認
if -20 < face_1.face_attributes.head_pose.yaw < 20 and -20 < face_1.face_attributes.head_pose.pitch < 20:
save_data(user_id, 1, face_1.face_id)
send_line_quick_reply(event.reply_token, '次にやや左を向いた写真を送ってください。')
else:
send_line_quick_reply(event.reply_token, '向きが違うようです。もう1度、正面を向いた写真を送ってください。')
elif step == 1:
## 顔の向き確認
if -20 > face_1.face_attributes.head_pose.yaw:
save_data(user_id, 2, pre_face_id)
send_line_quick_reply(event.reply_token, '次にやや下を向いた写真を送ってください。')
else:
send_line_quick_reply(event.reply_token, '向きが違うようです。もう1度、やや左を向いた写真を送ってください。')
elif step == 2:
## 顔の向き確認
if -15 > face_1.face_attributes.head_pose.pitch:
save_data(user_id, 3, pre_face_id)
send_line_quick_reply(event.reply_token, '次に本人確認を行う免許証の写真を送ってください。')
else:
send_line_quick_reply(event.reply_token, '向きが違うようです。もう1度、やや上を向いた写真を送ってください。')
elif step == 3:
## 正面からの写真と免許証の写真が一致しているかの情報取得
image_1_face_ids = list(map(lambda face: face.face_id, image_1_faces))
similar_faces = face_client.face.find_similar(face_id=pre_face_id, face_ids=image_1_face_ids)
matching_face_ids = list(map(lambda face: face.face_id, similar_faces))
match_flg = False
for face in image_1_faces:
if face.face_id in matching_face_ids:
match_flg = True
if match_flg:
## 免許証から名前、生年月日、住所の情報を取得
name, birthday, adress = runAnalysis(data_bytes, 'image/jpeg')
line_bot_api.reply_message(
event.reply_token,
[TextSendMessage(text='ご本人であること確認しました!また、あなたの情報は下記でお間違い無いでしょうか?'), TextSendMessage(text="名前:" + name), TextSendMessage(text="生年月日:" + birthday), TextSendMessage(text="住所:" + adress)]
)
else:
send_line_quick_reply(event.reply_token, 'いただいた正面からの写真と免許証の写真を確認したところ同一人物では無いようです...')
else:
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text='[本人確認開始]を入力して下さい。')
)
def runAnalysis(data_bytes, file_type):
"""
カスタムモデルを使用したOCR(免許証の一部取得に特化)
"""
# Model ID
model_id = "918b1df6-2c12-476f-a5b1-9aed1ed7aeed"
# API version
API_version = "v2.1-preview.3"
post_url = cog_endpoint + "/formrecognizer/%s/custom/models/%s/analyze" % (API_version, model_id)
params = {
"includeTextDetails": True
}
headers = {
# Request headers
'Content-Type': file_type,
'Ocp-Apim-Subscription-Key': cog_apim_key,
}
try:
print('Initiating analysis...')
resp = post(url = post_url, data = data_bytes, headers = headers, params = params)
if resp.status_code != 202:
print("POST analyze failed:\n%s" % json.dumps(resp.json()))
print("POST analyze succeeded:\n%s" % resp.headers)
get_url = resp.headers["operation-location"]
except Exception as e:
print("POST analyze failed:\n%s" % str(e))
n_tries = 15
n_try = 0
wait_sec = 1
max_wait_sec = 60
name = birthday = adress = ''
print('Getting analysis results...')
while n_try < n_tries:
try:
resp = get(url = get_url, headers = {"Ocp-Apim-Subscription-Key": cog_apim_key})
resp_json = resp.json()
if resp.status_code != 200:
print("GET analyze results failed:\n%s" % json.dumps(resp_json))
status = resp_json["status"]
if status == "succeeded":
print("Analysis succeeded")
name = resp_json['analyzeResult']['documentResults'][0]['fields']['name']['text']
birthday = resp_json['analyzeResult']['documentResults'][0]['fields']['birthday']['text']
adress = resp_json['analyzeResult']['documentResults'][0]['fields']['adress']['text']
if status == "failed":
print("Analysis failed:\n%s" % json.dumps(resp_json))
# Analysis still running. Wait and retry.
time.sleep(wait_sec)
n_try += 1
wait_sec = min(wait_sec, max_wait_sec)
except Exception as e:
msg = "GET analyze results failed:\n%s" % str(e)
print(msg)
return name, birthday, adress
def upload_img(data_bytes, filename):
"""
写真のアップロード
"""
blob_service_client = BlobServiceClient.from_connection_string(blob_service_connection_string)
blob_service_container_client = blob_service_client.get_container_client(blob_service_container_client_name)
blob_client = blob_service_container_client.get_blob_client(filename)
blob_client.upload_blob(data_bytes)
def save_data(user_id, step, face_id):
"""
手続きの状態を保存
"""
cosmos_container.upsert_item({
'id': user_id,
'user_id': user_id,
'step': step,
'face_id': face_id
}
)
def send_line_quick_reply(reply_token, text):
"""
クイックリプライの送信
"""
line_bot_api.reply_message(
reply_token,
TextSendMessage(text=text,
quick_reply=QuickReply(
items=[
QuickReplyButton(
action=CameraRollAction(label='Send photo')
),
QuickReplyButton(
action=CameraAction(label='Open camera')
)
])))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment