Skip to content

Instantly share code, notes, and snippets.

@tanbro
Last active March 21, 2019 08:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tanbro/59c87c58a1c87d114202af17d5f8dcf8 to your computer and use it in GitHub Desktop.
Save tanbro/59c87c58a1c87d114202af17d5f8dcf8 to your computer and use it in GitHub Desktop.
使用 Web API 远程调用 CoreNLP Server 的 ssplit 进行分句和分词。
# -*- coding: utf-8 -*-
"""
使用 CoreNLP 进行汉语语料的分句和分词
"""
import os
import re
import unicodedata
import unittest
import opencc
from corenlp_webclient import (CoreNlpWebClient, WordsToSentenceAnnotator,
create_annotator, extract_words)
from emoji_data import EmojiSequence
# U+FFFD REPLACEMENT CHARACTER — used as a placeholder for masked emoji.
REPLACEMENT = chr(0xFFFD)
# Alternation matching one Han character: CJK Unified Ideographs, Extensions
# A-E, plus the CJK Compatibility Ideographs blocks and their supplement.
RX_HANZI = r'([\u4E00-\u9FFF]|[\u3400-\u4DBF]|[\U00020000-\U0002A6DF]|[\U0002A700-\U0002B73F]|[\U0002B740-\U0002B81F]|[\U0002B820-\U0002CEAF]|[\uF900-\uFAFF]|[\U0002F800-\U0002FA1F])'
# A single replacement character (not referenced in this file's visible code).
RE_REPLACEMENT = re.compile(r'\uFFFD')
# A run of one or more replacement characters (a masked emoji sequence).
RE_REPLACEMENTS = re.compile(r'\uFFFD+')
# Any run of whitespace.
RE_SPACES = re.compile(r'\s+')
# A Han character (captured as group "c") followed by trailing whitespace.
RE_HANZI_SPACES = re.compile(r'(?P<c>{})(\s+)'.format(RX_HANZI))
def remove_control_characters(s):
    """Return *s* with every Unicode control character removed.

    A character is dropped when its Unicode general category starts with
    ``C`` (Cc, Cf, Cs, Co, Cn).
    """
    kept = [ch for ch in s if not unicodedata.category(ch).startswith('C')]
    return ''.join(kept)
def normalize_spaces(s):
    """Collapse every run of whitespace in *s* into one ASCII space."""
    return re.sub(r'\s+', ' ', s)
def remove_replacements(s):
    """Delete every U+FFFD REPLACEMENT CHARACTER from *s*."""
    return re.sub(r'\uFFFD+', '', s)
def remove_hanzi_spaces(s):
    """Delete any whitespace that directly follows a Han character.

    Stray spaces after hanzi would otherwise skew CoreNLP's tokenisation.
    """
    return RE_HANZI_SPACES.sub(r'\g<c>', s)
def segment_emoji(s):
    """Ensure every emoji sequence in *s* has a space on both sides.

    Non-emoji text is copied verbatim; a single space is inserted before
    or after an emoji only when one is not already present there.
    """
    pattern = EmojiSequence.pattern
    out = ''
    cursor = 0
    match = pattern.search(s)
    while match is not None:
        out += s[cursor:match.start()]
        cursor = match.end()
        # Space needed on the left unless output so far ends in whitespace
        # (or is empty); on the right unless the next source char is
        # whitespace (or we are at end of string).
        needs_left = bool(out) and not out[-1].isspace()
        needs_right = cursor < len(s) and not s[cursor].isspace()
        out += (' ' if needs_left else '') + match.group() + (' ' if needs_right else '')
        match = pattern.search(s, cursor)
    return out + s[cursor:]
def normalize(s) -> str:
    """Normalize mostly-Han text that may contain emoji.

    Guarantees after this call:

    - no leading or trailing whitespace
    - internal whitespace runs collapsed to a single space
    - no whitespace immediately after a Han character
    - every U+FFFD REPLACEMENT CHARACTER removed
    - every emoji surrounded by spaces
    """
    pipeline = (
        str.strip,              # trim both ends
        normalize_spaces,       # collapse whitespace runs
        remove_hanzi_spaces,    # drop spaces after hanzi
        remove_replacements,    # purge U+FFFD
        segment_emoji,          # pad emoji with spaces
    )
    for step in pipeline:
        s = step(s)
    return s
def backup_emojis(s):  # type: (str) -> Tuple[str, List[Tuple[str, Tuple[int, int]]]]
    """Mask every emoji in *s* with U+FFFD placeholders.

    Each character of an emoji sequence is replaced by one U+FFFD.
    Returns the masked text together with a list of
    ``(emoji, (start, end))`` records giving each removed emoji and its
    span in the original string.
    """
    pattern = EmojiSequence.pattern
    chunks = []
    saved = []
    cursor = 0
    for match in pattern.finditer(s):
        chunks.append(s[cursor:match.start()])
        # One placeholder per code point keeps the text length unchanged.
        chunks.append(REPLACEMENT * (match.end() - match.start()))
        saved.append((match.group(), match.span()))
        cursor = match.end()
    chunks.append(s[cursor:])
    return ''.join(chunks), saved
# Annotator asking the CoreNLP server only for words-to-sentence processing
# (sentence splitting + tokenisation), shared by all ssplit() calls.
ANNOTATOR = create_annotator(WordsToSentenceAnnotator)
# Shared OpenCC converter built with the default configuration.
# NOTE(review): presumably Traditional->Simplified conversion — confirm
# against the installed OpenCC default config.
CC = opencc.OpenCC()
def ssplit(s, url=None):  # type: (str, Optional[str]) -> List[List[str]]
    """Split Chinese text into sentences of word tokens via CoreNLP.

    :param s: input text; it is normalized (see :func:`normalize`) and its
        emojis are masked before being sent to the server, then restored in
        the returned tokens.
    :param url: CoreNLP server URL; falls back to the ``CORENLP_SERVER_URL``
        environment variable, then to ``http://localhost:9000``.
    :return: a list of sentences, each a list of word tokens.
    """
    s = s.strip()
    # Resolve the server URL: explicit argument > environment > localhost.
    if not url:
        url = os.environ.get('CORENLP_SERVER_URL')
    if not url:
        url = 'http://localhost:9000'
    # Normalize whitespace/emoji so CoreNLP tokenises predictably.
    s = normalize(s)
    # Mask emoji with U+FFFD runs — CoreNLP would otherwise mangle them.
    safe_text, emoji_bak = backup_emojis(s)
    safe_text = CC.convert(safe_text)
    safe_text = remove_control_characters(safe_text)
    # Sentence splitting + tokenisation on the server.
    client = CoreNlpWebClient(url)
    parsed = client.api_call(safe_text, ANNOTATOR)
    # No emoji was masked: the server output can be returned as-is.
    if not emoji_bak:
        return extract_words(parsed)
    # Restore emojis: each token made purely of U+FFFD maps, in order, to
    # the next backed-up emoji.
    result = []
    i = 0
    for sent in extract_words(parsed):
        result_sent = []
        for word in sent:
            # Guard against the server splitting one masked run into more
            # replacement tokens than we have emojis: keep the token
            # rather than raising IndexError.
            if RE_REPLACEMENTS.fullmatch(word) and i < len(emoji_bak):
                word = emoji_bak[i][0]
                i += 1
            result_sent.append(word)
        result.append(result_sent)
    return result
class TestSsplit(unittest.TestCase):
    """Integration tests for :func:`ssplit`.

    These tests need a live CoreNLP server reachable at the URL given by
    the ``CORENLP_SERVER_URL`` environment variable.
    """

    def setUp(self):
        # Fail fast with a clear message instead of a confusing connection
        # error from deep inside the web client.
        if 'CORENLP_SERVER_URL' not in os.environ:
            raise RuntimeError('You shall set environment variable `CORENLP_SERVER_URL`!')

    def test_ssplit_many(self):
        # Mixed Chinese/English text with emoji, including a ZWJ "family"
        # sequence that must survive segmentation as one token.
        text = """
😃世界你好!
我爱💖我的👨‍👨‍👧‍👧人。
Hello world🌍!
🐕抓🐈🐈抓🐁
"""
        expected = [
            ['😃', '世界', '你好', '!'],
            ['我', '爱', '💖', '我', '的', '👨\u200d👨\u200d👧\u200d👧', '人', '。'],
            ['Hello', 'world', '🌍', '!'],
            ['🐕', '抓', '🐈', '🐈', '抓', '🐁']
        ]
        result = ssplit(text)
        self.assertListEqual(result, expected)

    def test_ssplit_no_emoji(self):
        # Also exercises the OpenCC Traditional->Simplified conversion
        # ("話裏" in the input vs "话"/"里" in the expectation).
        text = "这段話裏面没有Emoji"
        expected = [['这', '段', '话', '里面', '没有', 'Emoji']]
        result = ssplit(text)
        self.assertListEqual(result, expected)
# Run the integration tests when executed as a script.
if __name__ == "__main__":
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment