Skip to content

Instantly share code, notes, and snippets.

@Elfsong
Created March 4, 2019 12:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Elfsong/62dd0addecbc911693deddb33a3c3b91 to your computer and use it in GitHub Desktop.
Save Elfsong/62dd0addecbc911693deddb33a3c3b91 to your computer and use it in GitHub Desktop.
pinyin
# coding: utf-8
import ast
import clr
import json
import os
import re
from StoryTelling.tts import append_pronunciation
from StoryTelling.tts.tools import OnlineTts
from Speech.Cognitive.Services import TimeLine
def pinyin_transformer(pinyin):
    """
    Convert numbered pinyin into tone-marked pinyin.

    The input is one or more ``"<syllable> <tone>"`` items separated by
    ``" - "``, for example ``"ni 3 - hao 3"``. Each syllable gets its tone
    mark placed on a vowel using this priority:

    1. ``a``, then ``o``, then ``e``;
    2. otherwise whichever of ``i`` / ``u`` comes later in the syllable;
    3. otherwise ``ü`` (written ``v`` in the input, as in ``lv`` / ``nv``).

    :param pinyin: numbered pinyin string; blank or ``None`` yields ``""``
    :return: tone-marked syllables joined by a single space; a syllable
             without any markable vowel is kept unchanged
    :raises ValueError: if an item is not ``"<syllable> <tone>"`` or the
                        tone is not an integer in the range 0-5
    """
    # Index 0-3 are tones 1-4; index 4 is the neutral tone (tone 5).
    tone_marks = {
        "a": ["ā", "á", "ǎ", "à", "a"],
        "o": ["ō", "ó", "ǒ", "ò", "o"],
        "e": ["ē", "é", "ě", "è", "e"],
        "i": ["ī", "í", "ǐ", "ì", "i"],
        "u": ["ū", "ú", "ǔ", "ù", "u"],
        "ü": ["ǖ", "ǘ", "ǚ", "ǜ", "ü"],
    }

    if not pinyin or not pinyin.strip():
        return ""

    converted = []
    for item in pinyin.strip().split(" - "):
        parts = item.split()
        if len(parts) != 2:
            raise ValueError(
                "expected '<syllable> <tone>', got {!r}".format(item)
            )
        syllable, tone = parts[0], int(parts[1])
        if not 0 <= tone <= 5:
            raise ValueError("tone out of range 0-5: {!r}".format(item))

        # "v" is the ASCII stand-in for "ü".
        syllable = syllable.replace("v", "ü")

        for vowel in ("a", "o", "e"):
            position = syllable.find(vowel)
            if position != -1:
                break
        else:
            # For "iu" / "ui" the mark goes on the later vowel; max() also
            # covers the case where only one of the two is present.
            position = max(syllable.find("i"), syllable.find("u"))
            if position == -1:
                position = syllable.find("ü")

        if position == -1:
            # Nothing to mark (e.g. "m", "ng"): keep the syllable as-is.
            converted.append(syllable)
            continue

        # Tone 0 is treated like tone 5 (neutral, unmarked).
        marked = tone_marks[syllable[position]][(tone or 5) - 1]
        converted.append(
            syllable[:position] + marked + syllable[position + 1:]
        )

    return " ".join(converted)
def get_pronunciation(sentence):
    """
    Look up the pronunciation of a sentence and return it item by item.

    The sentence is sent to ``OnlineTts.pronunciation`` and the reply is
    attached to a one-sentence story structure via ``append_pronunciation``.

    :param sentence: text to look up
    :return: list of dicts with keys ``item``, ``index`` (a two-element
             list) and ``pronunciation`` (tone-marked pinyin)
    """
    # NOTE(review): the endpoint and subscription key are hard-coded here;
    # consider moving them to configuration.
    tts_endpoint = "https://sttruntime-customvoice-tts.cloudapp.net:443/synthesize/customvoice"
    tts_subscription = "575EA104-45C6-4264-831D-7B4FE35003D2"
    raw_pronunciation = OnlineTts.pronunciation(sentence, tts_endpoint, tts_subscription)

    story_wrapper = {"Sentences": [{"Text": sentence}]}
    annotated = append_pronunciation([story_wrapper], [sentence], [raw_pronunciation])

    # Each entry is read positionally: [0] goes to "item", [1] and [2] to
    # "index", [3] is the numbered pinyin passed to pinyin_transformer.
    entries = annotated[0]["Sentences"][0]["Pronunciation"]
    return [
        {
            "item": entry[0],
            "index": [entry[1], entry[2]],
            "pronunciation": pinyin_transformer(entry[3]),
        }
        for entry in entries
    ]
def get_timeline_json(ssml, endpoint, subscription):
    """
    Fetch the timeline through the CLR bridge and parse it as JSON.

    :param ssml: script used to generate the timeline
    :param endpoint: service endpoint
    :param subscription: subscription identifier
    :return: the decoded timeline
    """
    # .Result is read from the object returned by PipeLineAsyncPython;
    # presumably this blocks until the async call finishes - TODO confirm.
    pending = TimeLine.PipeLineAsyncPython(ssml, endpoint, subscription)
    return json.loads(pending.Result)
def cut_sentence(sentence):
    """
    Strip punctuation, whitespace and digits from a sentence.

    :param sentence: sentence to filter
    :return: the sentence with every matched character removed
    """
    noise = re.compile(
        r"[\s+\.\!\/_,$%^*(+\"\')]+|[+——()?【】\[\]1234567890“”!,,.。…:?、~@#¥%&*();;]+"
    )
    return noise.sub("", sentence)
def lcs(X, Y):
    """
    Align two sequences along a longest common subsequence.

    Used to line up the timeline text with the script text.

    :param X: first sequence (its positions are reported as ``timeline_index``)
    :param Y: second sequence (its positions are reported as ``script_index``)
    :return: list of dicts ``{"element", "timeline_index", "script_index"}``,
             one per matched element, in ascending index order
    """
    m = len(X)
    n = len(Y)

    # lengths[i][j] is the LCS length of X[:i] and Y[:j]; row and column 0
    # stay at zero.
    lengths = [[0] * (n + 1) for _ in range(m + 1)]
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if X[i - 1] == Y[j - 1]:
                lengths[i][j] = lengths[i - 1][j - 1] + 1
            else:
                lengths[i][j] = max(lengths[i - 1][j], lengths[i][j - 1])

    # Walk back from the bottom-right corner, recording each match.
    mapping_list = []
    i, j = m, n
    while i > 0 and j > 0:
        if X[i - 1] == Y[j - 1]:
            mapping_list.append(
                {"element": Y[j - 1], "timeline_index": i - 1, "script_index": j - 1}
            )
            i -= 1
            j -= 1
        elif lengths[i - 1][j] > lengths[i][j - 1]:
            i -= 1
        else:
            # On a tie, step back in Y.
            j -= 1

    mapping_list.reverse()
    return mapping_list
def is_chinese(uchar):
    """
    Tell whether a character lies in the range U+4E00..U+9FFF.

    :param uchar: character to test
    :return: True if it is inside the range, False otherwise
    """
    return '\u4e00' <= uchar <= '\u9fff'
def get_location_mark_index(sentence):
    """
    Find the positions of the first and last Chinese characters.

    :param sentence: sentence to scan
    :return: tuple ``(start_index, end_index)``; ``(0, 0)`` when the
             sentence contains no Chinese character
    """
    start_index = 0
    end_index = 0
    seen_first = False
    for index, character in enumerate(sentence):
        if not is_chinese(character):
            continue
        if not seen_first:
            start_index = index
            seen_first = True
        # Updated on every Chinese character, including the first, so a
        # sentence with a single Chinese character never ends before it
        # starts.
        end_index = index
    return start_index, end_index
def get_timeline_index(script_index, mapping_list):
    """
    Translate a script position into a timeline position.

    :param script_index: position in the script text
    :param mapping_list: timeline/script mapping entries
    :return: ``timeline_index`` of the first entry whose ``script_index``
             is at or past the requested position, or that of the last
             entry when there is none
    """
    match = next(
        (entry for entry in mapping_list if entry["script_index"] >= script_index),
        None,
    )
    if match is None:
        match = mapping_list[-1]
    return match["timeline_index"]
def get_appear_time(dialogue_list, name_list):
    """
    Find when a character first appears in a scene.

    Falls back to the start of the first dialogue line when none of the
    names is mentioned.

    :param dialogue_list: dialogue lines of the current scene
    :param name_list: the character's name plus its aliases
    :return: ``start`` of the first line whose text contains any of the names
    """
    for line in dialogue_list:
        if any(name in line["text"] for name in name_list):
            return line["start"]
    return dialogue_list[0]["start"]
def merge(story_name, timejson, schema):
    """
    Align the timeline with the story text and merge both into one schema.

    :param story_name: story title, spoken in the opening scenario
    :param timejson: timeline entries; "element", "startTime" and "endTime"
                     are read from them
    :param schema: JSON string describing the story scenarios
    :return: list of scenario dicts: an opening scenario, one per scenario
             in the schema, and a closing scenario
    """
    scenes = json.loads(schema)

    script_text = "".join(
        [line["dialogue"] for scene in scenes for line in scene["content"]]
    )
    timeline_text = "".join([entry["element"] for entry in timejson])
    # The last 20 timeline characters are left out of the alignment;
    # presumably they belong to the closing narration - TODO confirm.
    mapping_list = lcs(timeline_text[:-20], script_text)

    scenario_list = []

    # Opening scenario
    # It ends three timeline entries before the first aligned one.
    opening_end = float(timejson[mapping_list[0]["timeline_index"] - 3]["endTime"])
    opening_lines = [
        ("小朋友 你好呀", 200, 652.57),
        ("我是你的好朋友小冰姐姐", 652.57, 1215.54),
        ("今天我给你准备了一个有趣的故事", 1215.54, 1938.98),
        ("名字叫做" + story_name, 1938.98, opening_end),
    ]
    scenario_list.append({
        "scene_no": 0,
        "scene_background": "bedroom",
        "scene_weather": "default",
        "scene_time": "default",
        "start": 0.0,
        "end": opening_end,
        "element_list": [
            {
                "id": "xiaoice_1",
                "category": "xiaoice",
                "name": "xiaoice",
                "scale": [0.6, 0.6],
                "rotation": 0,
                "start": 0,
                "age": "child",
                "gender": 1,
                "role": "protagonist",
                "status": "normal",
                "type": "character",
                "end": opening_end,
            }
        ],
        "dialog_list": [
            {
                "text": text,
                "pronunciation": get_pronunciation(text),
                "speaker": "xiaoice_1",
                "start": line_start,
                "end": line_end,
                "type": "talk",
            }
            for text, line_start, line_end in opening_lines
        ],
    })

    # Regular scenarios
    # Running offset of the current sentence inside the concatenated script.
    script_offset = 0
    for scene_position, scene in enumerate(scenes):
        # Scenario content
        dialogues = []
        for line in scene["content"]:
            text = line["dialogue"]
            first_mark, last_mark = get_location_mark_index(text)
            script_start = script_offset + first_mark
            script_end = script_offset + last_mark
            script_offset += len(text)
            timeline_start = get_timeline_index(script_start, mapping_list)
            timeline_end = get_timeline_index(script_end, mapping_list)
            dialogues.append({
                "text": line["dialogue"],
                "type": line["type"],
                "start": timejson[timeline_start]["startTime"],
                "end": timejson[timeline_end]["endTime"],
                "speaker": line["speaker_id"],
                "pronunciation": get_pronunciation(line["dialogue"]),
            })

        # Scenario info
        scene_entry = {
            "dialogue_list": dialogues,
            "scene_no": scene_position + 1,
            "scene_background": scene["background"]["category"],
            "scene_weather": scene["weather"],
            "scene_time": scene["time"],
            "start": float(dialogues[0]["start"]),
            "end": float(dialogues[-1]["end"]),
            "element_list": [],
        }

        # Scenario characters
        for slot, element in enumerate(scene["elements"]):
            scene_entry["element_list"].append({
                "id": element["id"],
                "category": element["name"],
                "age": element["age"],
                "priority": element["priority"],
                "role": element["role"],
                "status": element["status"],
                "type": element["type"],
                "alias": element["alias"],
                "gender": element["gender"],
                "file_name": element["name"],
                "scale": element["scale"],
                # Every second element gets a 1 in the first flip slot.
                "flip": [slot % 2, 0],
                "rotation": element["rotation"],
                "start": get_appear_time(
                    dialogues, [element["name"]] + element["alias"]
                ),
                "end": scene_entry["end"],
            })

        # Add scenario to list
        scenario_list.append(scene_entry)

    # Closing scenario
    closing_start = scenario_list[-1]["end"] + 400
    closing_end = float(timejson[-1]["endTime"])
    scenario_list.append({
        "scene_no": len(scenario_list),
        "scene_background": "bedroom",
        "scene_weather": "default",
        "scene_time": "default",
        "start": closing_start,
        "end": closing_end,
        "element_list": [
            {
                "id": "xiaoice_1",
                "category": "xiaoice",
                "name": "xiaoice",
                "scale": [0.6, 0.6],
                "rotation": 0,
                "start": closing_start,
                "age": "child",
                "gender": 1,
                "role": "protagonist",
                "status": "normal",
                "type": "character",
                "end": closing_end,
            }
        ],
        "dialog_list": [
            {
                "text": "小朋友 今天的故事 讲完了 你喜欢听吗",
                "speaker": "xiaoice_1",
                "start": closing_start,
                "end": float(closing_start) + 1072,
                "type": "talk",
                "pronunciation": get_pronunciation("小朋友 今天的故事 讲完了 你喜欢听吗"),
            },
            {
                "text": "小冰姐姐就陪你到这里啦 白白",
                "speaker": "xiaoice_1",
                "start": float(closing_start) + 1072,
                "end": float(closing_start) + 1965,
                "type": "talk",
                "pronunciation": get_pronunciation("小冰姐姐就陪你到这里啦 白白"),
            },
        ],
    })

    return scenario_list
def process(story_name, ssml, schema, endpoint, subscription):
    """
    Build the merged schema for one story and write it to disk.

    :param story_name: story title; also used as the output file name
    :param ssml: script used to generate the timeline
    :param schema: JSON string describing the story scenarios
    :param endpoint: service endpoint
    :param subscription: subscription identifier
    :return: the merged schema, or ``""`` if any step raised
    """
    try:
        timeline = get_timeline_json(ssml, endpoint, subscription)
        merged = merge(story_name, timeline, schema)
        target = "../../result-json/final_schema/" + story_name
        with open(target, "w", encoding="utf-8") as out:
            out.write(json.dumps(merged, ensure_ascii=False))
        print("Written Successfully!")
        return merged
    except Exception as error:
        # Best effort: report the failure and let the caller carry on.
        print(error)
        return ""
if __name__ == "__main__":
    # Batch driver: process every story that has an SSML file.
    script_repos = r"D:\Project-ice\KidsStory\StoryTelling\result-json\scenario_schema"
    ssml_repos = r"C:\Users\t-midu\PycharmProjects\Scenario\playground\ssml"

    # NOTE(review): the endpoint and subscription key are hard-coded here;
    # consider moving them to configuration.
    endpoint = "http://kidstory-tts-0.cloudapp.net:81/synthesize/customvoice"
    subscription = "575EA104-45C6-4264-831D-7B4FE35003D2"

    for story in os.listdir(ssml_repos):
        # The story name is everything before the first dot.
        story_name = story.split(".")[0]
        print(story_name)
        if not story_name:
            continue

        script_path = script_repos + "\\" + story_name + ".txt"
        ssml_path = ssml_repos + "\\" + story_name + ".ssml.txt"
        try:
            with open(script_path, encoding="utf-8") as script_fd:
                schema = script_fd.read()
            with open(ssml_path, encoding="utf-8") as ssml_fd:
                ssml = ssml_fd.read()
        except FileNotFoundError:
            print("File not found!")
            continue
        except UnicodeDecodeError:
            print("Unicode decode error!")
            continue

        new_schema = process(story_name, ssml, schema, endpoint, subscription)
        print(new_schema)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment