原创 更新于 时间( 2018-02-07 10:38:00) ( 1891) 次浏览 标签 : Elasticsearch 分布式搜索
导读

Elasticsearch 索引的全量/增量更新 ...

当你的es 索引数据从mysql 全量导入之后,如何根据其他客户端改变索引数据源带来的变动来更新 es 索引数据呢。

首先用 Python 全量生成 Elasticsearch 和 ik 初始的分词索引数据,增量更新索引实现如下:

后端数据导图

服务端(Python+redis-sub)

# Python-redis 开启监听 'leon' 等待客户端推送消息,来增量更新es文档 

#-*- coding:utf8 -*-

import sys
import redis
import json
import elasticsearch
import os


class Task(object):

    def __init__(self):
        es_servers = [{
            "host": "server-host",
            "port": "es-port"
        }]
        self.es_client = elasticsearch.Elasticsearch(hosts=es_servers)


        pool = redis.ConnectionPool(host='redis-host', port=6379,db=0,password='user:passwd')
        self.r = redis.Redis(connection_pool=pool)
        self.ps = self.r.pubsub()
        self.ps.psubscribe(['macco','comment']) # sub多个频道要用 psubscribe 

    def listen_task(self):
        for i in self.ps.listen():
            # print i

             if i['type'] == 'message' or i['type'] == 'pmessage': # sub多个频道 type是 pmessage 
                data =  json.loads(i['data'])
                self.index = data['index']
                self.doc_type = data['type']
                cate = data['cate']
                id = data['id']
                        
                if cate == 'update':
                    self.update_by_id(id,row_obj)
                elif cate == 'delete':
                    self.delete_by_id(id)
                else:
                    self.create_by_id(id)

    def update_by_id(self, id,row_obj):
        """row_obj 就是 包含了 _id 和 其他 要更新的字段的 kv [] 取id 和 剩下的根据给定的_id,更新ES文档"""
        res = self.es_client.update(index=self.index, doc_type=self.doc_type, body={"doc": row_obj}, id=id)
        print res

    def create_by_id(self, id):
        """id ; 创建新的ES文档"""
        create_by_id = "python /workspace/django-bash/elastic/autobash/"+ self.index  +"_es.py  " + str(id)
        res = os.popen(create_by_id).read()
        print res

    def delete_by_id(self, _id):
        """
        根据给定的id,删除文档
        暂时先不用可以根据查询 条件 isdeleted = 0 来判断
        """
        self.es_client.delete(index=self.index, doc_type=self.doc_type, id=_id)

if __name__ == '__main__':
    print 'listen task queue'
    Task().listen_task()

创建 info single 的 Python 脚本

    def crete_info_single(self,ID):
        index = 'info'
        type = 'full'
        # 生成info的json

        model = Model('ali', 'Info')
        info_list = model.getAllInfoByID(ID)
        all_len = len(info_list)
        for i in range(0, all_len):
        
            # 业务逻辑代码·····

            
            document = info_list[i]
            request_timeout = 100
            create_response = self.es.crete_index(index, type, document, request_timeout, ID)

客户端(Php + redis-pub )

 # php-redis sub 'leon' ,传递约定的格式,指定对 es 文档的操作类型

 # elk 软删除 

            $message = array(
                "index" => "info",
                "type"=>"full",
                "id" => $info[0]['ID'],
                "cate"=>'update',
                "params"=> array(
                    "IsDeleted"=>1
                )
            );
            $json_mess =json_encode($message);

            $redis->publish('leon', $json_mess);

这样,当客户端更改了 mysql 时候,往redis leon 频道 publish 一条对应的消息,服务端接收消息后,就会更新对应的 es 索引。

Leon0204

打杂后端程序猿~

讨论区

发表评论
昵称:
评论:
验证