Building Elasticsearch Search on Top of magi_dataset
date
Feb 15, 2023
slug
from-magi_dataset-to-elasticsearch
status
Published
tags
DataScience
SemanticSearch
summary
How to set up Elasticsearch on Amazon EC2 and import data from
magi_dataset
type
Post

magi_dataset
is a dataset tool I wrote for a recent side project. It provides quick access to an open-source software corpus crawled from GitHub and HackerNews, which makes it easy to build both semantic search and traditional retrieval over GitHub, or a hybrid of the two with Metarank. This post walks through setting up Elasticsearch on Amazon EC2 and then importing data from
magi_dataset
into it.

Installing Elasticsearch 8.6.2
First, create an EC2 instance running Amazon Linux, then run the following commands:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.6.2-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.6.2-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-8.6.2-x86_64.rpm.sha512
sudo rpm --install elasticsearch-8.6.2-x86_64.rpm
When this finishes, the installer should automatically run the security configuration and print the results, including the generated password for the elastic user. Be sure to copy and save this terminal output. Then configure
systemd
:
sudo /bin/systemctl daemon-reload
sudo /bin/systemctl enable elasticsearch.service
sudo systemctl start elasticsearch.service
Once the installation is complete, test it:
sudo curl --cacert /etc/elasticsearch/certs/http_ca.crt -u elastic https://localhost:9200
When prompted with
Enter host password for user 'elastic'
enter the password you saved earlier. Output like the following means everything is working:
{
  "name" : "ip-172-31-50-108.ec2.internal",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "CnxGUCkvRpqQhRl4ghNhKQ",
  "version" : {
    "number" : "8.6.2",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "2d58d0f136141f03239816a4e360a8d17b6d8f29",
    "build_date" : "2023-02-13T09:35:20.314882762Z",
    "build_snapshot" : false,
    "lucene_version" : "9.4.2",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}
Next, modify the Elasticsearch configuration. Open the file with
sudo nano /etc/elasticsearch/elasticsearch.yml
and change the network host setting:
network.host: $EC2_IP_PRIV_ADDR
Note that
$EC2_IP_PRIV_ADDR
here is the machine's private (internal) IP address. Then restart the service:
sudo systemctl stop elasticsearch.service
sudo systemctl start elasticsearch.service
Finally, save the
http_ca.crt
certificate to the local machine to make future connections easier. First, copy the file and transfer its ownership to the regular user:
sudo cp /etc/elasticsearch/certs/http_ca.crt .
sudo chown $USER:$USER ./http_ca.crt
Then, on the local machine, run
scp ec2-user@$EC2_IP_ADDR:/home/ec2-user/http_ca.crt ./http_ca.crt
to copy the certificate over, where
$EC2_IP_ADDR
is the EC2 machine's public IP address.

Building an Index with the Python Client
Python Elasticsearch Client documentation:
Magi Dataset documentation:
Let's manually add data from
magi_dataset
to this Elasticsearch instance. First, install the dependencies:
pip3 install magi_dataset elasticsearch
Establish a connection:
from magi_dataset import GitHubDataset
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from dataclasses import asdict
from tqdm.auto import tqdm
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
ELASTIC_PASSWORD = 'YOUR_PASSWORD'
es = Elasticsearch(
    'https://52.87.231.111:9200',
    # ssl_assert_fingerprint=CERT_FINGERPRINT,
    ca_certs='./http_ca.crt',
    basic_auth=('elastic', ELASTIC_PASSWORD),
    verify_certs=False,
)
es.info()
Running this prints:
ObjectApiResponse({'name': 'ip-172-31-50-108.ec2.internal', 'cluster_name': 'elasticsearch', 'cluster_uuid': 'CnxGUCkvRpqQhRl4ghNhKQ', 'version': {'number': '8.6.2', 'build_flavor': 'default', 'build_type': 'rpm', 'build_hash': '2d58d0f136141f03239816a4e360a8d17b6d8f29', 'build_date': '2023-02-13T09:35:20.314882762Z', 'build_snapshot': False, 'lucene_version': '9.4.2', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})
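Note that verify_certs=False disables certificate verification entirely. A safer alternative supported by the client is pinning the CA certificate's fingerprint via ssl_assert_fingerprint (the commented-out argument above). As a standard-library-only sketch, the fingerprint of http_ca.crt could be computed like this (the helper name is my own, not part of any library):

```python
import hashlib
import ssl

def cert_fingerprint(pem_text: str) -> str:
    """SHA-256 fingerprint of a PEM certificate as colon-separated hex."""
    der = ssl.PEM_cert_to_DER_cert(pem_text)  # decode PEM body to raw DER bytes
    digest = hashlib.sha256(der).hexdigest()
    return ':'.join(digest[i:i + 2] for i in range(0, len(digest), 2))

# Usage against the certificate copied from the EC2 machine:
# with open('./http_ca.crt') as f:
#     print(cert_fingerprint(f.read()))
```

The resulting string can then be passed as ssl_assert_fingerprint when constructing the client, avoiding verify_certs=False altogether.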
Next, let's try bulk-uploading the data.
def upload_to_es(es_instance, data, index: str, batch_size=1000):
    bulk_data = []
    for i, repo in enumerate(tqdm(data)):
        bulk_data.append(
            {
                '_index': index,
                '_id': i,
                '_source': asdict(repo)
            }
        )
        if (i + 1) % batch_size == 0:
            bulk(es_instance, bulk_data)
            bulk_data = []
    if bulk_data:  # flush the final partial batch
        bulk(es_instance, bulk_data)
    es_instance.indices.refresh(index=index)
    return es_instance.cat.count(index=index, format='json')

for lang in ['Python', 'C++', 'JavaScript', 'Go', 'Rust']:
    lang_safe = lang.lower().replace('++', 'pp')
    es.options(ignore_status=400).indices.create(index=f'{lang_safe}-index')
    data = GitHubDataset(empty=False, file_path=f'{lang_safe}-latest')
    print(
        upload_to_es(
            es,
            data,
            index=f'{lang_safe}-index',
            batch_size=1000
        )
    )
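The flush-every-batch_size logic inside upload_to_es can be illustrated on its own, without an Elasticsearch connection (the function name here is illustrative):

```python
def batched(items, batch_size):
    # Accumulate items and yield a full list every batch_size items,
    # then yield the final partial batch - mirroring upload_to_es.
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

sizes = [len(b) for b in batched(range(2500), 1000)]
# sizes == [1000, 1000, 500]
```

Flushing in fixed-size chunks keeps memory bounded and keeps each bulk request to Elasticsearch a reasonable size.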
Build a simple search:
resp = es.search(
    index='python-index',
    body={
        "query": {
            "match": {
                "readme": "python web archiving service"
            }
        },
    }
)
[(x['_source']['name'], x['_score']) for x in resp.body['hits']['hits']]
[('internetarchive/brozzler', 17.063648),
('ArchiveBox/ArchiveBox', 16.825933),
('Rhizome-Conifer/conifer', 15.135596),
('oduwsdl/ipwb', 14.298318),
('foxmask/django-th', 13.880616),
('wal-e/wal-e', 12.302505),
('laiwei/thepast', 11.558967),
('inAudible-NG/audible-activator', 11.079715),
('ciur/papermerge', 11.074305),
('WikiTeam/wikiteam', 10.133091)]
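For queries spanning more than one field, Elasticsearch also offers the multi_match query type. A sketch of building such a request body (the ^2 boost on name and the field choice are my own illustration, not taken from the indexed mappings):

```python
def build_multi_match(query: str, fields=('name^2', 'readme')) -> dict:
    # 'name^2' weights matches in the repository name twice as heavily
    # as matches in the README text.
    return {
        'query': {
            'multi_match': {
                'query': query,
                'fields': list(fields),
            }
        }
    }

# resp = es.search(index='python-index', body=build_multi_match('web archiving'))
```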