The previous post got as far as token completion: instead of the original significant-text aggregation, it completed the partial token based on plain word frequency. Should the suggestion step be changed the same way?
I think keeping significant-text aggregation for suggestions is the right call, because there we genuinely want to find the 'significant text' within a given scope, so that part stays as it is.
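For reference, the word-frequency approach from the last post boils down to a plain terms aggregation ordered by document count rather than by a significance score. A minimal sketch of such a query, reusing the tweet field from the scripts below (the variable and aggregation names here are illustrative only):

# sketch: frequency-based completion of a partially typed token
tail = "cov"  # the unfinished last token of the search phrase
freq_complete_query = {
    "query": {"prefix": {"tweet": {"value": tail}}},
    "aggregations": {
        "complete_by_frequency": {
            "terms": {
                "field": "tweet",             # needs fielddata enabled on the text field
                "include": f"{tail}.+",       # only tokens that extend the partial one
                "order": {"_count": "desc"},  # most frequent first
                "size": 2
            }
        }
    },
    "size": 0
}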
Finally, here are all of the previous modifications rolled together into the following two Python scripts and two JSON files:
index_mapping.json
{
  "properties": {
    "name": {"type": "keyword"},
    "user_id": {"type": "keyword"},
    "tweet": {"type": "text", "analyzer": "index_analyzer", "fielddata": true},
    "tweet_id": {"type": "keyword"},
    "retweets": {"type": "integer"},
    "favorites": {"type": "integer"},
    "created": {"type": "date"},
    "followers": {"type": "integer"},
    "is_user_verified": {"type": "boolean"},
    "geo": {"type": "geo_point"},
    "location": {"type": "keyword"}
  }
}
index_setting.json
The settings file is where the custom index_analyzer referenced by the mapping is defined; a minimal reconstruction, assuming a standard tokenizer plus lowercasing and English stop-word removal:
{
  "analysis": {
    "analyzer": {
      "index_analyzer": {
        "type": "custom",
        "tokenizer": "standard",
        "filter": ["lowercase", "stop"]
      }
    }
  }
}
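With the analyzer defined, it is worth sanity-checking how it tokenizes a sample tweet before indexing anything. A small sketch using the client's indices.analyze API, assuming the index has already been created from the two files above (the sample text is made up):

import elasticsearch

es_cli = elasticsearch.Elasticsearch("http://localhost:9200")

# inspect how index_analyzer tokenizes a sample sentence
r = es_cli.indices.analyze(
    index="covid19_tweets",
    analyzer="index_analyzer",
    text="COVID-19 cases are rising",
)
print([t["token"] for t in r["tokens"]])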
index.py
import json
from datetime import datetime

import elasticsearch
from elasticsearch import helpers

index_mapping_file = 'index_mapping.json'
index_setting_file = 'index_setting.json'
data_folder = 'covid_20200330'
data_file_name = '2020-03-31_afternoon.json'
index_name = 'covid19_tweets'

# initiate es client
es_cli = elasticsearch.Elasticsearch("http://localhost:9200")

# read index mapping file
with open(index_mapping_file, 'r') as f:
    index_mapping = json.load(f)

# read index setting file
with open(index_setting_file, 'r') as f:
    index_setting = json.load(f)

# delete index
# es_cli.indices.delete(index=index_name)

# create index
r = es_cli.indices.create(index=index_name, mappings=index_mapping, settings=index_setting)


def data_to_es(json_file: str, index_name: str):
    """Read tweets from the json dump and yield bulk-ready actions."""
    with open(json_file, 'r') as f:
        data = json.load(f)
    # drop fields the mapping doesn't cover
    fields_to_rm = ['primary_location', 'geo_location']
    for d in data:
        for field in fields_to_rm:
            if d.get(field):
                d.pop(field)
        if d.get('geo'):
            # flip the coordinate order for the geo_point field;
            # list.reverse() mutates in place and returns None, so slice instead
            d['geo'] = d['geo']['coordinates'][::-1]
        if d.get('created'):
            d['created'] = datetime.strptime(d['created'], "%d-%b-%Y")
        d['_index'] = index_name
        yield d


json_file = f'{data_folder}/{data_file_name}'
result = helpers.bulk(es_cli, data_to_es(json_file, index_name), raise_on_error=False)
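Because raise_on_error=False is passed, helpers.bulk returns a (success_count, errors) tuple instead of raising on the first failure, so it is worth inspecting before moving on to search. A small follow-up sketch (my addition, not part of the original script):

# result is a (number_of_successes, list_of_errors) tuple
success_count, errors = result
print(f"indexed {success_count} documents, {len(errors)} failures")
if errors:
    print(errors[:3])  # peek at the first few failures

# refresh so the freshly indexed tweets are searchable right away
es_cli.indices.refresh(index=index_name)
print(es_cli.count(index=index_name)["count"])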
search.py
import re

import elasticsearch


def get_token_complete(kw_phrase, index_name):
    """Complete the last, partially typed token of the search phrase."""
    tail = re.findall(r"\S+", kw_phrase)[-1]  # grab the last, unfinished token
    query_to_complete = {
        "query": {
            "prefix": {
                "tweet": {
                    "value": tail
                }
            }
        },
        "aggregations": {
            "my_sample": {
                "sampler": {
                    "shard_size": 1000
                },
                "aggregations": {
                    "find_the_whole_token": {
                        "significant_text": {
                            "field": "tweet",
                            "include": f"{tail}.+",
                            "min_doc_count": 10,
                            "size": 2
                        }
                    }
                }
            }
        },
        "size": 0,  # only the aggregation matters, skip the hits
        "_source": False
    }
    r = es_cli.search(index=index_name, body=query_to_complete)
    buckets = r["aggregations"]["my_sample"]["find_the_whole_token"]["buckets"]
    tokens = [bucket["key"] for bucket in buckets]
    return tokens


def get_suggestion(search_phrase, index_name):
    """Find significant co-occurring terms for a completed search phrase."""
    suggestion_query = {
        "query": {
            "match": {
                "tweet": search_phrase
            }
        },
        "aggregations": {
            "my_sample": {
                "sampler": {
                    "shard_size": 1000
                },
                "aggregations": {
                    "tags": {
                        "significant_text": {
                            "field": "tweet",
                            "exclude": search_phrase.split(" "),
                            "min_doc_count": 10,
                            "size": 3
                        }
                    }
                }
            }
        }
    }
    r = es_cli.search(index=index_name, body=suggestion_query)
    buckets = r["aggregations"]["my_sample"]["tags"]["buckets"]
    suggestions = [bucket["key"] for bucket in buckets]
    return suggestions


def get_auto_complete(search_phrase):
    """Complete the trailing token, then append suggested follow-up terms."""
    pre_search_phrase = " ".join(re.findall(r"\S+", search_phrase)[0:-1])
    complete_tokens = get_token_complete(search_phrase, index_name)
    guess = []
    for token in complete_tokens:
        complete_search_phrase = " ".join([pre_search_phrase, token]).strip()
        suggestions = get_suggestion(complete_search_phrase, index_name)
        if not suggestions:
            # keep the completed phrase itself when nothing co-occurs often enough
            guess.append(complete_search_phrase)
        for suggestion in suggestions:
            guess.append(" ".join([complete_search_phrase, suggestion]))
    return guess


es_cli = elasticsearch.Elasticsearch("http://localhost:9200")
index_name = 'covid19_tweets'
search_phrase = "cov"
print(get_auto_complete(search_phrase))
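For interactive testing, the script can also be wrapped in a small read-eval loop; this wrapper is my own addition rather than part of the original post:

# hypothetical interactive wrapper around get_auto_complete
if __name__ == "__main__":
    while True:
        phrase = input("search> ").strip()
        if not phrase:  # empty input exits the loop
            break
        for guess in get_auto_complete(phrase):
            print(f"  {guess}")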
To recap the main optimizations made over the course of these posts: the search phrase is split into an already-typed prefix and a trailing partial token; the trailing token is completed against the index with a prefix query over a sampled subset of documents; and follow-up suggestions come from a significant_text aggregation that excludes the tokens the user has already typed.
As a search feature this really deserves a front end to show it off, but I'm hopeless at front-end work. If you find this approach a good fit for your application, feel free to take it as your search backend and finish that last step for me XD
Up to this point I've more or less fulfilled the main goal I had when starting this series: documenting how I previously implemented auto-complete with ES.
As I said at the beginning, ES has gained a dazzling number of new features in recent years, so I'd like to devote the remaining posts to one of them in particular: using ES as a vector DB.
The next post will start with an introduction to vector DBs.