Skip to content

Instantly share code, notes, and snippets.

@dakeshi19

dakeshi19/ldbulk.py

Last active Jun 2, 2020
Embed
What would you like to do?
livedoor グルメのDataSet のCSVをElasticsearchでの全文検索を意識して結合〜JSONデータ化してバルクロード実行するサンプルスクリプト
import pandas as pd
from collections import defaultdict
from collections import ChainMap
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
"""
livedoor グルメのDataSet のCSVをElasticsearchでの全文検索を意識して結合
〜JSONデータ化してバルクロード実行するサンプルスクリプト
livedoor グルメ DataSet
http://blog.livedoor.jp/techblog/archives/65836960.html
"""
# ファイルをDataFrameに読み込む
files = 'areas.csv,categories.csv,prefs.csv,rating_votes.csv,ratings.csv,restaurants.csv,stations.csv'.split(',')
d = {i: pd.read_csv(j,dtype='str').fillna('') for i,j in zip([i.replace('.csv', '') for i in files], files)}
# 軸になるレストランデータ
r = d['restaurants'].copy()
# エリア名の情報を結合する
area = d['areas'].set_index('id').to_dict()['name']
area['0'] = 'エリア不明?'
r['area_name'] = r['area_id'].apply(lambda x: area[x] )
# 都道府県名をルックアップ〜結合する
pref = d['prefs'].set_index('id').to_dict()['name']
pref['0'] = '不明?'
r['pref_name'] = r['pref_id'].apply(lambda x: pref[x] )
r['pref'] = r['pref_id'].apply(lambda x: x.rjust(2, '0') + '__' + pref[x]) #prefフィールドは、13__東京都 のような例
# 緯度経度を「location」フィールドに設定する
def dms2deg(_lat, _lon):
"""
DMS形式(35.12.32.134)から、ミリ秒形式(DEG形式)に変換する。
また、ミリビョウ形式の緯度と経度をElasticsearchのgeo_point形式(配列)に配置する。
"""
def _dms2deg(dms_str):
if dms_str :
d, m, s, ms = map(int,dms_str.split('.'))
deg = d + (m/60) + (s/3600) + (ms/(3600 * 1000))
return deg
return None
lat = _dms2deg(_lat)
lon = _dms2deg(_lon)
if lat and lon:
return [lon, lat] # 経度 緯度の順
return []
r['location'] = r.apply(lambda s: dms2deg(s['north_latitude'],s['east_longitude']),axis=1)
# ちょっと寄り道して、関数定義
def myflatten(l):
"""
2次元のリストを平坦化
"""
return list(set(list(filter(lambda x: len(x) > 0, sum(l,[])))))
# 駅情報を結合する
_sta = {i[0]: i[1]
for i in
d['stations'].apply(
lambda s: [s['id'], [s['name'], s['property']]],
axis=1).to_list()
}
sta = ChainMap(_sta,defaultdict(lambda:['']))
staIdCols = 'station_id1,station_id2,station_id3'.split(',')
s1, s2, s3 = staIdCols
r['stas'] = r[staIdCols].apply(lambda s: myflatten(
[sta[s[s1]], sta[s[s2]], sta[s[s3]]]
), axis=1)
# カテゴリ情報を結合する
# なお、カテゴリ情報は、大中小のように階層関係が存在するようなので、小カテゴリのラベルづけがされているデータについては、上位カテゴリも合わせて取り込む
# (豚骨ラーメン屋は、ラーメン屋であるため、このお店に元の「豚骨ラーメン屋」に加え、「ラーメン屋」もラベルづけする...ということ)
c = d['categories']
"""
【参考】d['categories']の調査
# 次の例をみると「0」となっており、id=0のレコードは存在しないので、無効データと想定
set(c.parent2)
# parent1を辿っていくと、次のpp1が最上位カテゴリと思われるのでそう考える(最大3階層)
p1 = c[c.id.isin(set(c.parent1))]
pp1 = c[c.id.isin(set(p1.parent1))]
pp1
id name name_kana parent1 parent2 similar
100 和食 わしょく 0 0
200 西洋料理 せいようりょうり 0 0
300 中華料理 ちゅうかりょうり 0 0
400 アジア・エスニック あじあ・えすにっく 0 0
1000 スイーツ すいーつ 0 0
1100 パン・軽食 ぱん・けいしょく 0 0
あと、similarにはおおよそ同義語が入っているように見えたので、実質カテゴリの別名(検索でヒットさせても良いワード)だと思って取り扱う。
"""
c_p = pd.merge(
pd.merge(c, c, how='left', left_on='parent1', right_on='id',suffixes=['','_1p']),
c, how='left', left_on='parent1_1p', right_on='id', suffixes=['', '_2p']).fillna('')
cate = {i[0]: i[1]
for i in
c_p.apply(
lambda s:
[s['id'], [s['name'], s['name_1p'], s['name_2p'], s['similar'], s['similar_1p'], s['similar_2p']]]
,
axis=1).to_list()
}
cate['0'] = [''] #手抜き
cate[''] = [''] #手抜き
catIdCols = 'category_id1,category_id2,category_id3,category_id4,category_id5'.split(',')
c1,c2,c3,c4,c5 = catIdCols
r['cates'] = r[catIdCols].apply(lambda s:myflatten(
[cate[s[c1]], cate[s[c2]], cate[s[c3]], cate[s[c4]], cate[s[c5]]]
)
,axis=1)
# くちコミ情報のフリーテキストを全て取り込んだロングテキスト情報を生成し、該当のお店のレコードに結合する(前準備)。
d['ratings']['kuchikomi'] = d['ratings'].apply(lambda s: s['title'] + s['body'], axis=1)
kuchikomi = d['ratings'].groupby('restaurant_id')['kuchikomi'].apply(lambda s: list(s)).reset_index()
# 元々、整数型のフィールドを、(前処理の都合で文字列型としていたため)int型に変換する。
dtype2int = {i: 'int' for i in 'photo_count,special_count,menu_count,fan_count,access_count'.split(',')}
# Elasticsearch(localhostの9200ポートで待機) にバルクロードする
endpoint = 'http://localhost:9200'
indexname = 'ldgourmet'
es = Elasticsearch(endpoint)
r['_index'] = indexname
r['_type'] = '_doc'
actions = pd.merge(r,kuchikomi,how='left',left_on='id',right_on='restaurant_id').fillna('').astype(dtype2int).to_dict(orient='records')
# 補足: クチコミ情報はここで結合(他はルックアップ型だが、pandas.mergeでDataFrameを結合)
bulk(client=es,actions=actions,chunk_size=100)
PUT /ldgourmet?include_type_name=false
{
"settings": {
"analysis": {
"tokenizer": {
"kuro_tk": {
"type": "kuromoji_tokenizer",
"mode": "search"
},
"kuro_tk_nrm": {
"type": "kuromoji_tokenizer",
"mode": "normal"
},
"ng_tk": {
"type": "ngram",
"min_gram": 2,
"max_gram": 3,
"token_chars": [
"letter",
"digit",
"symbol"
]
},
"eNg_tk": {
"type": "edge_ngram",
"min_gram": 1,
"max_gram": 10
}
},
"filter": {
"hiragana_2_katakana": {
"type": "icu_transform",
"id": "Hiragana-Katakana"
},
"eNgram_filter": {
"type": "edge_ngram",
"min_gram": 1,
"max_gram": 10
},
"ngram_filter": {
"type": "ngram",
"min_gram": 2,
"max_gram": 3,
"token_chars": [
"letter",
"digit",
"symbol"
]
},
"getInitial_filter": {
"type": "predicate_token_filter",
"script": {
"source": "token.getStartOffset() === 0"
}
},
"mp_filter": {
"type": "multiplexer",
"filters": [
"eNgram_filter",
"kuromoji_readingform, eNgram_filter",
"kuromoji_readingform, hiragana_2_katakana, eNgram_filter"
]
},
"POS_名詞以外": {
"type": "kuromoji_part_of_speech",
"stoptags": [
"名詞",
"名詞-一般",
"名詞-固有名詞",
"名詞-固有名詞-一般",
"名詞-固有名詞-人名",
"名詞-固有名詞-人名-一般",
"名詞-固有名詞-人名-姓",
"名詞-固有名詞-人名-名",
"名詞-固有名詞-組織",
"名詞-固有名詞-地域",
"名詞-固有名詞-地域-一般",
"名詞-固有名詞-地域-国",
"名詞-代名詞",
"名詞-代名詞-一般",
"名詞-代名詞-縮約",
"名詞-副詞可能",
"名詞-サ変接続",
"名詞-形容動詞語幹",
"名詞-数",
"名詞-非自立",
"名詞-非自立-一般",
"名詞-非自立-副詞可能",
"名詞-非自立-助動詞語幹",
"名詞-非自立-形容動詞語幹",
"名詞-特殊",
"名詞-特殊-助動詞語幹",
"名詞-接尾",
"名詞-接尾-一般",
"名詞-接尾-人名",
"名詞-接尾-地域",
"名詞-接尾-サ変接続",
"名詞-接尾-助動詞語幹",
"名詞-接尾-形容動詞語幹",
"名詞-接尾-副詞可能",
"名詞-接尾-助数詞",
"名詞-接尾-特殊",
"名詞-接続詞的",
"名詞-動詞非自立的",
"名詞-引用文字列",
"名詞-ナイ形容詞語幹",
"接頭詞",
"接頭詞-名詞接続",
"接頭詞-動詞接続",
"接頭詞-形容詞接続",
"接頭詞-数接続",
"連体詞",
"接続詞",
"助詞",
"助詞-格助詞",
"助詞-格助詞-一般",
"助詞-格助詞-引用",
"助詞-格助詞-連語",
"助詞-接続助詞",
"助詞-係助詞",
"助詞-副助詞",
"助詞-間投助詞",
"助詞-並立助詞",
"助詞-終助詞",
"助詞-副助詞/並立助詞/終助詞",
"助詞-連体化",
"助詞-副詞化",
"助詞-特殊",
"助動詞",
"記号",
"記号-一般",
"記号-読点",
"記号-句点",
"記号-空白",
"記号-括弧開",
"記号-括弧閉",
"記号-アルファベット",
"その他",
"その他-間投",
"フィラー",
"非言語音",
"語断片",
"未知語"
]
},
"POS_名詞一般": {
"type": "kuromoji_part_of_speech",
"stoptags": [
"名詞",
"名詞-固有名詞",
"名詞-固有名詞-一般",
"名詞-固有名詞-人名",
"名詞-固有名詞-人名-一般",
"名詞-固有名詞-人名-姓",
"名詞-固有名詞-人名-名",
"名詞-固有名詞-組織",
"名詞-固有名詞-地域",
"名詞-固有名詞-地域-一般",
"名詞-固有名詞-地域-国",
"名詞-代名詞",
"名詞-代名詞-一般",
"名詞-代名詞-縮約",
"名詞-副詞可能",
"名詞-サ変接続",
"名詞-形容動詞語幹",
"名詞-数",
"名詞-非自立",
"名詞-非自立-一般",
"名詞-非自立-副詞可能",
"名詞-非自立-助動詞語幹",
"名詞-非自立-形容動詞語幹",
"名詞-特殊",
"名詞-特殊-助動詞語幹",
"名詞-接尾",
"名詞-接尾-一般",
"名詞-接尾-人名",
"名詞-接尾-地域",
"名詞-接尾-サ変接続",
"名詞-接尾-助動詞語幹",
"名詞-接尾-形容動詞語幹",
"名詞-接尾-副詞可能",
"名詞-接尾-助数詞",
"名詞-接尾-特殊",
"名詞-接続詞的",
"名詞-動詞非自立的",
"名詞-引用文字列",
"名詞-ナイ形容詞語幹",
"接頭詞",
"接頭詞-名詞接続",
"接頭詞-動詞接続",
"接頭詞-形容詞接続",
"接頭詞-数接続",
"動詞",
"動詞-自立",
"動詞-非自立",
"動詞-接尾",
"形容詞",
"形容詞-自立",
"形容詞-非自立",
"形容詞-接尾",
"副詞",
"副詞-一般",
"副詞-助詞類接続",
"連体詞",
"接続詞",
"助詞",
"助詞-格助詞",
"助詞-格助詞-一般",
"助詞-格助詞-引用",
"助詞-格助詞-連語",
"助詞-接続助詞",
"助詞-係助詞",
"助詞-副助詞",
"助詞-間投助詞",
"助詞-並立助詞",
"助詞-終助詞",
"助詞-副助詞/並立助詞/終助詞",
"助詞-連体化",
"助詞-副詞化",
"助詞-特殊",
"助動詞",
"感動詞",
"記号",
"記号-一般",
"記号-読点",
"記号-句点",
"記号-空白",
"記号-括弧開",
"記号-括弧閉",
"記号-アルファベット",
"その他",
"その他-間投",
"フィラー",
"非言語音",
"語断片"
]
}
},
"analyzer": {
"whitespace": {
"type": "custom",
"tokenizer": "whitespace",
"char_filter": [
"icu_normalizer",
"html_strip"
],
"filter": []
},
"ja-default_anlz": {
"type": "custom",
"tokenizer": "kuro_tk",
"char_filter": [
"icu_normalizer",
"kuromoji_iteration_mark",
"html_strip"
],
"filter": [
"kuromoji_baseform",
"kuromoji_part_of_speech",
"ja_stop",
"lowercase",
"kuromoji_number",
"kuromoji_stemmer"
]
},
"ja1_anlz": {
"type": "custom",
"tokenizer": "kuro_tk",
"char_filter": [
"icu_normalizer",
"kuromoji_iteration_mark",
"html_strip"
],
"filter": [
"lowercase",
"kuromoji_stemmer"
]
},
"ja2_anlz": {
"type": "custom",
"tokenizer": "kuro_tk",
"char_filter": [
"icu_normalizer",
"kuromoji_iteration_mark",
"html_strip"
],
"filter": [
"kuromoji_baseform",
"lowercase",
"kuromoji_stemmer"
]
},
"jaReadingform_anlz": {
"type": "custom",
"tokenizer": "kuro_tk",
"char_filter": [
"icu_normalizer",
"kuromoji_iteration_mark",
"html_strip"
],
"filter": [
"kuromoji_readingform",
"lowercase",
"hiragana_2_katakana",
"kuromoji_stemmer"
]
},
"ng_anlz": {
"type": "custom",
"tokenizer": "ng_tk",
"char_filter": [
"icu_normalizer",
"html_strip"
],
"filter": [
"hiragana_2_katakana"
]
},
"eNg_anlz": {
"type": "custom",
"tokenizer": "eNg_tk",
"char_filter": [
"icu_normalizer",
"html_strip"
],
"filter": [
"lowercase",
"hiragana_2_katakana"
]
},
"jaR_x_eNg_anlz": {
"type": "custom",
"tokenizer": "kuro_tk",
"char_filter": [
"icu_normalizer",
"html_strip"
],
"filter": [
"kuromoji_readingform",
"lowercase",
"hiragana_2_katakana",
"eNgram_filter"
]
},
"yomiInitial": {
"type": "custom",
"tokenizer": "kuro_tk",
"char_filter": [
"icu_normalizer",
"html_strip"
],
"filter": [
"kuromoji_readingform",
"lowercase",
"hiragana_2_katakana",
"getInitial_filter"
]
},
"mp_anlz": {
"type": "custom",
"tokenizer": "kuro_tk",
"char_filter": [
"icu_normalizer",
"html_strip"
],
"filter": [
"mp_filter"
]
},
"almostNoop_anlz": {
"type": "custom",
"tokenizer": "keyword",
"filter": [
"hiragana_2_katakana"
]
},
"meishiIgai_anlz": {
"type": "custom",
"tokenizer": "kuro_tk_nrm",
"char_filter": [
"icu_normalizer",
"kuromoji_iteration_mark",
"html_strip"
],
"filter": [
"kuromoji_baseform",
"POS_名詞以外",
"ja_stop",
"lowercase",
"kuromoji_number",
"kuromoji_stemmer"
]
},
"meishi_anlz": {
"type": "custom",
"tokenizer": "kuro_tk_nrm",
"char_filter": [
"icu_normalizer",
"kuromoji_iteration_mark",
"html_strip"
],
"filter": [
"kuromoji_baseform",
"POS_名詞一般",
"ja_stop",
"lowercase",
"kuromoji_number",
"kuromoji_stemmer"
]
}
}
}
},
"mappings": {
"dynamic_templates": [
{
"my_hybrid_style_for_string": {
"match_mapping_type": "string",
"mapping": {
"analyzer": "ja-default_anlz",
"fielddata": true,
"store": true,
"fields": {
"raw": {
"type": "keyword"
},
"ini": {
"type": "text",
"analyzer": "yomiInitial",
"fielddata": true
},
"ws": {
"type": "text",
"analyzer": "whitespace"
},
"ja-default": {
"type": "text",
"analyzer": "ja-default_anlz"
},
"ja1": {
"type": "text",
"analyzer": "ja1_anlz"
},
"ja2": {
"type": "text",
"analyzer": "ja2_anlz"
},
"jaRf": {
"type": "text",
"analyzer": "jaReadingform_anlz"
},
"ng": {
"type": "text",
"analyzer": "ng_anlz"
},
"eNg": {
"type": "text",
"analyzer": "eNg_anlz"
},
"wow": {
"type": "text",
"analyzer": "meishiIgai_anlz",
"fielddata":true
},
"mei": {
"type": "text",
"analyzer": "meishi_anlz",
"fielddata":true
}
}
}
}
}
],
"properties": {
"location": {
"type": "geo_point"
},
"shape": {
"type": "geo_shape",
"strategy": "quadtree"
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment