要在多租戶架構下使用搜尋引擎有許多種方法,今天將會簡單介紹後實作其中一種方式。除了透過信號來觸發索引更新外,也可以透過命令列下達索引更新的指令,現在就來開始實作吧!
多租戶搜尋引擎有三種方式:
一個租戶對應一個搜尋引擎叢集
可以完全隔離但難以管理,需要許多自動化開發維運來輔助。
一個租戶對應一張索引表
在最初可以運行得很好,但當租戶越來越多會導致內部儲存不足與其他問題。
一個搜尋引擎叢集透過路由區分租戶
將所有內容都放在同一個叢集中,透過 Routing(路由)來進行租戶隔離,可以有更好的性能,同時也是最具有擴展性的方式。
我們將使用第三種方法『一個搜尋引擎叢集透過路由區分租戶』,在租戶更新索引時加入路由參數,並且在搜尋索引的時候加上路由進行搜尋。
為了改造引擎,我們要將 django-elasticsearch-dsl 從 github 拉取下來進行修改
cd ~/example_tenant
git clone --branch 7.2.1 https://github.com/django-es/django-elasticsearch-dsl.git
mv django-elasticsearch-dsl/django_elasticsearch_dsl web
rm -rf django-elasticsearch-dsl
放到專案目錄後,就會優先使用專案目錄下的 django_elasticsearch_dsl 做為 Django 的應用程式而不會使用 pip 安裝的版本。
接下來要在更新索引的時為 ID 加上前綴 schema 名稱來避免重覆,與設定租戶 routing 路由。
調整 django_elasticsearch_dsl 應用程式的 registries.py
在 registries.py 中的 update 函數呼叫 documents.py 中的 update 函數時,將當前的 schema 名稱放入 routing 引數進行傳遞 。
# django_elasticsearch_dsl/registries.py
# ...
from django.db import connection
# ...
def update(self, instance, **kwargs):
"""
Update all the elasticsearch documents attached to this model (if their
ignore_signals flag allows it)
"""
if not DEDConfig.autosync_enabled():
return
if instance.__class__ in self._models:
for doc in self._models[instance.__class__]:
if not doc.django.ignore_signals:
doc().update(instance, routing=connection.schema_name, **kwargs) # add routing
調整 django_elasticsearch_dsl 應用程式 documents.py
在 update 函數會接收 registries.py 傳遞過來的資料,我們要繼續把 routing 引數往下傳遞
# django_elasticsearch_dsl/documents.py
# ...
def update(self, thing, refresh=None, action='index', parallel=False, routing=None, **kwargs): # add routing
"""
Update each document in ES for a model, iterable of models or queryset
"""
if refresh is not None:
kwargs['refresh'] = refresh
elif self.django.auto_refresh:
kwargs['refresh'] = self.django.auto_refresh
if isinstance(thing, models.Model):
object_list = [thing]
else:
object_list = thing
return self._bulk(
self._get_actions(object_list, action, routing), # add routing
parallel=parallel,
**kwargs
)
在 _get_actions 函數會接收 update 函數傳遞過來的資料,接著把 routing 引數往下傳遞
# django_elasticsearch_dsl/documents.py
# ...
def _get_actions(self, object_list, action, routing): # add routing
for object_instance in object_list:
if action == 'delete' or self.should_index_object(object_instance):
yield self._prepare_action(object_instance, action, routing) # add routing
在 _prepare_action 函數,接受 routing 引數,調整索引 ID 與設定 routing
# django_elasticsearch_dsl/documents.py
# ...
def _prepare_action(self, object_instance, action, routing): # add routing
index_id = self.generate_id(object_instance)
doc = {
'_op_type': action,
'_index': self._index._name,
'_id': f'{routing}-{index_id}', # new index id
'_source': (
self.prepare(object_instance) if action != 'delete' else None
),
}
if routing is not None:
doc["routing"] = routing # set routing
return doc
更新索引到這邊就完成了
測試索引更新
在管理介面儲存一筆資料後,這裡可以看到 _id 前面多了租戶的 schema 名稱,並且出現了 _routing 路由
{
"_index": "product",
"_type": "_doc",
"_id": "example01-3",
"_score": 1.0,
"_routing": "example01",
"_source": {
"category": {
"name": "家電3C",
"name_en": "Appliance 3C",
"description": "走在科技尖端",
"description_en": "on the cutting edge of technology",
"created": "2022-09-13T20:11:59.723669+08:06",
"modified": "2022-10-06T22:01:08.914940+08:06",
"image": "/media/example01/productcategory_media/pexels-tyler-lastovich-6991221664214284.jpg"
},
"name": "迷你小冰箱",
"description": "迷你冰箱是一款使用於室內、室外,在路上或在辦公室,迷你冰箱可以保持您的食品和飲料清涼新鮮。具備製冷功能。除可在家庭、辦公室內使用外,也可直接在汽車上使用。",
"price": 2000,
"created": "2022-09-24T18:55:58.153375+08:06",
"modified": "2022-10-07T12:33:18.219146+00:00"
}
}
在上一回『裝上引擎,Django 的移動城堡』並沒有提到 Django Elasticsearch DSL 的更新指令,是因為在多租戶架構下預設的 schema 為租戶共用的 public ,會導致無法正常的使用指令來更新索引,現在我們就來讓多租戶架構的 Django Elasticsearch DSL 指令重新復活吧!
Django Elasticsearch DSL 指令
主要分為
--create 建立索引表
--delete 刪除索引表
--populate 更新索引表
--rebuild 重建索引表
python3.10 manage.py search_index --create
python3.10 manage.py search_index --delete
python3.10 manage.py search_index --populate
python3.10 manage.py search_index --rebuild
當我們在多租戶架構下使用 --populate 進行更新時預設會去更新租戶共用的 public 表導致錯誤,接下來就要來將 schema 名稱作為引數傳遞進去。
調整 django_elasticsearch_dsl/management/commands/search_index.py
新增命令列引數 --schema
# django_elasticsearch_dsl/management/commands/search_index.py
# ...
class Command(BaseCommand):
help = 'Manage elasticsearch index.'
def add_arguments(self, parser):
parser.add_argument(
'--schema',
metavar='schema_name',
type=str,
nargs='*',
help="schema name"
)
# ...
調整 handle 函數,從 options 取得傳遞進來的 schema 引數,並使用 set_schema 函數進行指定
# django_elasticsearch_dsl/management/commands/search_index.py
# ...
from django.db import connection
# ...
class Command(BaseCommand):
# ...
def handle(self, *args, **options):
if not options['action']:
raise CommandError(
"No action specified. Must be one of"
" '--create','--populate', '--delete' or '--rebuild' ."
)
schema_name = options['schema'][0] if options['schema'] else None
if schema_name:
connection.set_schema(schema_name)
action = options['action']
models = self._get_models(options['models'])
if action == 'create':
self._create(models, options)
elif action == 'populate':
self._populate(models, options)
elif action == 'delete':
self._delete(models, options)
elif action == 'rebuild':
self._rebuild(models, options)
else:
raise CommandError(
"Invalid action. Must be one of"
" '--create','--populate', '--delete' or '--rebuild' ."
)
在 _populate 函數中取得 schema 引數儲存到 routing 變數,並作為引數傳遞至 documents.py 中的 update 函數進行索引更新
# django_elasticsearch_dsl/management/commands/search_index.py
# ...
class Command(BaseCommand):
# ...
def _populate(self, models, options):
parallel = options['parallel']
if options['schema']:
routing = options['schema'][0]
else:
raise CommandError(
"No schema name"
)
for doc in registry.get_documents(models):
self.stdout.write("Indexing {} '{}' objects {}".format(
doc().get_queryset().count() if options['count'] else "all",
doc.django.model.__name__,
"(parallel)" if parallel else "")
)
qs = doc().get_indexing_queryset()
doc().update(qs, parallel=parallel, refresh=options['refresh'], routing=routing)
到這裡我們的更新索引指令就完成了
測試指令
# 更新租戶 example01
docker exec --workdir /opt/app/web example_tenant_web \
python3.10 manage.py search_index --populate --schema example01
...
Indexing 3 'ProductCategory' objects
Indexing 2 'Product' objects
# 更新租戶 example02
docker exec --workdir /opt/app/web example_tenant_web \
python3.10 manage.py search_index --populate --schema example02
...
Indexing 1 'ProductCategory' objects
Indexing 2 'Product' objects
更新成功!
完成了多租戶的搜尋引擎,但卻還沒有真正使用到,接下來將為我們的網站加入搜尋功能,明天『Django 來找查,實作搜尋功能』。