Building Elasticsearch Search with magi_dataset

date: Feb 15, 2023
slug: from-magi_dataset-to-elasticsearch
status: Published
tags: DataScience, SemanticSearch
summary: How to set up Elasticsearch on Amazon EC2 and import data with magi_dataset
type: Post
magi_dataset is a dataset tool I wrote for a recent small project. It gives you quick access to an open-source software corpus crawled from GitHub and HackerNews, making it easy to build both semantic search and traditional retrieval services over GitHub, or hybrid search combining the two on top of Metarank.
This post briefly walks through setting up Elasticsearch on Amazon EC2 and then importing data into it with magi_dataset.
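As a quick preview before any infrastructure work, here is a minimal sketch of loading the corpus locally. The GitHubDataset call is the same one used later in this post; len() support, indexing, and the .name field are assumptions inferred from how the data is iterated and searched further down.
from magi_dataset import GitHubDataset

# Load the latest snapshot of Python repositories (same call as in the
# upload loop below).
data = GitHubDataset(empty=False, file_path='python-latest')

# Assumed: the dataset supports len() and indexing, and each record is a
# dataclass exposing fields such as .name and .readme.
print(len(data))
print(data[0].name)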

Installing Elasticsearch 8.6.2

First, create an EC2 instance running Amazon Linux, then run the following commands:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.6.2-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.6.2-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-8.6.2-x86_64.rpm.sha512 
sudo rpm --install elasticsearch-8.6.2-x86_64.rpm
Once this finishes, the security auto-configuration should run automatically and print its results, including the generated password for the elastic superuser. Be sure to copy and save that terminal output. Then register the service with systemd:
sudo /bin/systemctl daemon-reload
sudo /bin/systemctl enable elasticsearch.service
sudo systemctl start elasticsearch.service
After installation, test the node:
sudo curl --cacert /etc/elasticsearch/certs/http_ca.crt -u elastic https://localhost:9200
When prompted with Enter host password for user 'elastic', enter the password you saved earlier. Output like the following means everything is working:
{
  "name" : "ip-172-31-50-108.ec2.internal",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "CnxGUCkvRpqQhRl4ghNhKQ",
  "version" : {
    "number" : "8.6.2",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "2d58d0f136141f03239816a4e360a8d17b6d8f29",
    "build_date" : "2023-02-13T09:35:20.314882762Z",
    "build_snapshot" : false,
    "lucene_version" : "9.4.2",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}
Next, adjust the Elasticsearch configuration. In sudo nano /etc/elasticsearch/elasticsearch.yml, set
network.host: $EC2_IP_PRIV_ADDR
Here $EC2_IP_PRIV_ADDR is the machine's private IP address. Then restart the service:
sudo systemctl stop elasticsearch.service
sudo systemctl start elasticsearch.service
Finally, save the http_ca.crt certificate to your local machine so it can be used for later connections. First copy the file out and hand its ownership to the regular user:
sudo cp /etc/elasticsearch/certs/http_ca.crt .
sudo chown $USER:$USER ./http_ca.crt
Then run the following on your local machine:
scp ec2-user@$EC2_IP_ADDR:/home/ec2-user/http_ca.crt ./http_ca.crt
to copy the certificate locally, where $EC2_IP_ADDR is the EC2 instance's public IP address. Also make sure the instance's security group allows inbound traffic on port 9200; otherwise the Python client below will not be able to connect.
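Incidentally, the Python client can also pin the server certificate by its SHA-256 fingerprint instead of reading the CA file. Here is a minimal sketch of computing that fingerprint from the copied http_ca.crt using only the standard library; the resulting hex digest is what the commented-out ssl_assert_fingerprint argument in the connection code below expects:
import hashlib
import ssl

# Read the PEM certificate fetched via scp and convert it to DER form.
with open('./http_ca.crt') as f:
    der_cert = ssl.PEM_cert_to_DER_cert(f.read())

# This hex digest can be passed to Elasticsearch(...) as
# ssl_assert_fingerprint in place of ca_certs / verify_certs=False.
print(hashlib.sha256(der_cert).hexdigest())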

Creating an Index with the Python Client

Useful references: the Python Elasticsearch Client documentation and the magi_dataset documentation.
Let's try adding data from magi_dataset to this Elasticsearch instance by hand. First, install the dependencies:
pip3 install magi_dataset elasticsearch
Establish a connection:
from magi_dataset import GitHubDataset
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from dataclasses import asdict
from tqdm.auto import tqdm

import urllib3
# The cluster uses a self-signed CA and verify_certs=False below,
# so silence the resulting InsecureRequestWarning noise.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# The password printed by the security auto-configuration during install.
ELASTIC_PASSWORD = 'YOUR_PASSWORD'
es = Elasticsearch(
    'https://52.87.231.111:9200',
    # ssl_assert_fingerprint=CERT_FINGERPRINT,
    ca_certs='./http_ca.crt',
    basic_auth=('elastic', ELASTIC_PASSWORD),
    verify_certs=False,
)
es.info()
Running this prints:
ObjectApiResponse({'name': 'ip-172-31-50-108.ec2.internal', 'cluster_name': 'elasticsearch', 'cluster_uuid': 'CnxGUCkvRpqQhRl4ghNhKQ', 'version': {'number': '8.6.2', 'build_flavor': 'default', 'build_type': 'rpm', 'build_hash': '2d58d0f136141f03239816a4e360a8d17b6d8f29', 'build_date': '2023-02-13T09:35:20.314882762Z', 'build_snapshot': False, 'lucene_version': '9.4.2', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})
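If you prefer a boolean sanity check over inspecting the info payload, the client also exposes ping(), which returns False instead of raising when the cluster is unreachable. A small optional sketch:
# Fail fast if the cluster cannot be reached.
if not es.ping():
    raise RuntimeError('Cannot reach the Elasticsearch cluster')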
Next, bulk-upload the data.
def upload_to_es(es_instance, data, index: str, batch_size=1000):
    bulk_data = []
    for i, repo in enumerate(tqdm(data)):
        # Each record is a dataclass; asdict() turns it into the
        # document body for the bulk helper.
        bulk_data.append(
            {
                '_index': index,
                '_id': i,
                '_source': asdict(repo)
            }
        )
        # Flush once batch_size actions have accumulated.
        if (i + 1) % batch_size == 0:
            bulk(es_instance, bulk_data)
            bulk_data = []
    # Flush the final partial batch, then refresh the index so the
    # documents become searchable immediately.
    bulk(es_instance, bulk_data)
    es_instance.indices.refresh(index=index)
    return es_instance.cat.count(index=index, format='json')

for lang in ['Python', 'C++', 'JavaScript', 'Go', 'Rust']:
    lang_safe = lang.lower().replace('++', 'pp')
    # ignore_status=400 turns "index already exists" into a no-op.
    es.options(ignore_status=400).indices.create(index=f'{lang_safe}-index')
    data = GitHubDataset(empty=False, file_path=f'{lang_safe}-latest')
    print(
        upload_to_es(
            es,
            data,
            index=f'{lang_safe}-index',
            batch_size=1000
        )
    )
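Note that the loop above creates its indices with no explicit mapping, so every field is mapped dynamically. If you want control over field types, here is a hedged sketch for one index, to run in place of the bare indices.create call before any documents are uploaded; the name and readme fields are taken from the search below, and any fields not listed would still be mapped dynamically:
es.options(ignore_status=400).indices.create(
    index='python-index',
    mappings={
        'properties': {
            'name': {'type': 'text'},    # repository full names
            'readme': {'type': 'text'},  # README bodies, full-text searchable
        }
    },
)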
Build a simple search:
resp = es.search(
    index='python-index',
    body={
        "query": {
            "match" : {
                "readme" : "python web archiving service"
            }
        },            
    }
)
[(x['_source']['name'], x['_score']) for x in resp.body['hits']['hits']]
[('internetarchive/brozzler', 17.063648),
 ('ArchiveBox/ArchiveBox', 16.825933),
 ('Rhizome-Conifer/conifer', 15.135596),
 ('oduwsdl/ipwb', 14.298318),
 ('foxmask/django-th', 13.880616),
 ('wal-e/wal-e', 12.302505),
 ('laiwei/thepast', 11.558967),
 ('inAudible-NG/audible-activator', 11.079715),
 ('ciur/papermerge', 11.074305),
 ('WikiTeam/wikiteam', 10.133091)]
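As a hedged variation on the query above, the same search can weigh the repository name alongside the README using multi_match; the ^2 boost on name is an illustrative choice, not something tuned here:
resp = es.search(
    index='python-index',
    query={
        'multi_match': {
            'query': 'python web archiving service',
            # Matches in the repository name count double.
            'fields': ['name^2', 'readme'],
        }
    },
)
[(x['_source']['name'], x['_score']) for x in resp.body['hits']['hits']]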
 
