Mysql到Elasticsearch高效实时同步Debezium实现

目录
  • 题记
  • 1、 binlog认知
    • 1.1 啥是 binlog?
    • 1.2 阿里的canal实现了增量mysql同步
  • 2、基于binlog的同步方式
    • 3、debezium介绍
      • 4、同步架构
        • 5、debezium实现mysql到es增删改实时同步
          • 5.1 debezium安装
          • 5.2 mysql binlog等相关配置。
          • 5.3 配置connector连接器。
          • 5.4 启动connector
          • 5.5 验证写入是否成功。
          • 5.6 验证消费数据验证写入是否正常
        • 6、kafka-connector实现kafka同步elasticsearch
          • 6.1、kafka-connector介绍
          • 6.2、kafka到es connector同步配置
          • 6.3 kafka到es启动connector
          • 6.4 kafka-connctor restful api查看
        • 7、坑复盘。
          • 8、小结

            题记

            来自elasticsearch中文社区的问题——
            mysql中表无唯一递增字段,也无唯一递增时间字段,该怎么使用logstash实现mysql实时增量导数据到es中?

            logstash和kafka_connector都仅支持基于自增id或者时间戳更新的方式增量同步数据。
            回到问题本身:如果库表里没有相关字段,该如何处理呢?

            本文给出相关探讨和解决方案。

            1、 binlog认知

            1.1 啥是 binlog?

            binlog是mysql sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是完全不同的日志;
            其主要是用来记录对mysql数据更新或潜在发生更新的sql语句,并以”事务”的形式保存在磁盘中;
            作用主要有:

            1)复制:达到master-slave数据一致的目的。2)数据恢复:通过mysqlbinlog工具恢复数据。3)增量备份。

            1.2 阿里的canal实现了增量mysql同步

            一图胜千言,canal是用java开发的基于数据库增量日志解析、提供增量数据订阅&消费的中间件。

            目前,canal主要支持了mysql的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。目的:增量数据订阅&消费。

            综上,使用binlog可以突破logstash或者kafka-connector没有自增id或者没有时间戳字段的限制,实现增量同步。

            2、基于binlog的同步方式

            1)基于kafka connect的debezium 开源工程,地址:. https://debezium.io/
            2)不依赖第三方的独立应用: maxwell开源项目,地址:http://maxwells-daemon.io/

            由于已经部署过conluent(kafka的企业版本,自带zookeeper、kafka、ksql、kafka-connector等),本文仅针对debezium展开。

            3、debezium介绍

            debezium是捕获数据实时动态变化的开源的分布式同步平台。能实时捕获到数据源(mysql、mongo、postgresql)的:新增(inserts)、更新(updates)、删除(deletes)操作,实时同步到kafka,稳定性强且速度非常快。
            特点:

            1)简单。无需修改应用程序。可对外提供服务。

            2)稳定。持续跟踪每一行的每一处变动。

            3)快速。构建于kafka之上,可扩展,经官方验证可处理大容量的数据。

            4、同步架构

            如图,mysql到es的同步策略,采取“曲线救国”机制。
            步骤1 基debezium的binlog机制,将mysql数据同步到kafka。
            步骤2 基于kafka_connector机制,将kafka数据同步到elasticsearch。

            5、debezium实现mysql到es增删改实时同步

            软件版本:

            confluent:5.1.2;
            debezium:0.9.2_final;
            mysql:5.7.x.
            elasticsearch:6.6.1

            5.1 debezium安装

            confluent的安装部署参见:http://t.cn/ef5pozk,不再赘述。

            debezium的安装只需要把debezium-connector-mysql的压缩包解压放到confluent的解压后的插件目录(share/java)中。

            mysql connector plugin 压缩包的下载地址:https://debezium.io/docs/install/。

            注意重启一下confluent,以使得debezium生效。

            5.2 mysql binlog等相关配置。

            debezium使用mysql的binlog机制实现数据动态变化监测,所以需要mysql提前配置binlog。
            核心配置如下,在mysql机器的/etc/my.cnf的mysqld下添加如下配置。

            [mysqld]
            server-id         = 223344
            log_bin           = mysql-bin
            binlog_format     = row
            binlog_row_image  = full
            expire_logs_days  = 10
            

            然后,重启一下mysql以使得binlog生效。

            systemctl start mysqld.service

            5.3 配置connector连接器。

            配置confluent路径目录 : /etc
            创建文件夹命令 :

            mkdir kafka-connect-debezium

            在mysql2kafka_debezium.json存放connector的配置信息 :

            [root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json
            { 
                    "name" : "debezium-mysql-source-0223",
                    "config":
                    {
                         "connector.class" : "io.debezium.connector.mysql.mysqlconnector",
                         "database.hostname" : "192.168.1.22",
                         "database.port" : "3306",
                         "database.user" : "root",
                         "database.password" : "xxxxxx",
                         "database.whitelist" : "kafka_base_db",
                         "table.whitlelist" : "accounts",
                         "database.server.id" : "223344",
                         "database.server.name" : "full",
                         "database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",
                         "database.history.kafka.topic" : "account_topic",
                         "include.schema.changes" : "true" ,
                         "incrementing.column.name" : "id",
                         "database.history.skip.unparseable.ddl" : "true",
                         "transforms": "unwrap,changetopic",
                         "transforms.unwrap.type": "io.debezium.transforms.unwrapfromenvelope",
                         "transforms.changetopic.type":"org.apache.kafka.connect.transforms.regexrouter",
                         "transforms.changetopic.regex":"(.*)",
                         "transforms.changetopic.replacement":"$1-smt"
                    }
            }
            

            注意如下配置:

            “database.server.id”:对应mysql中的server-id的配置。

            “database.whitelist”: 待同步的mysql数据库名。

            “table.whitlelist”:待同步的mysq表名。

            “database.history.kafka.topic”:存储数据库的shcema的记录信息,而非写入数据的topic

            “database.server.name”:逻辑名称,每个connector确保唯一,作为写入数据的kafka topic的前缀名称。

            坑1:transforms相关5行配置作用是写入数据格式转换。
            如果没有,输入数据会包含:before、after记录修改前对比信息以及元数据信息(source,op,ts_ms等)。

            这些信息在后续数据写入elasticsearch是不需要的。(注意结合自己业务场景)。

            格式转换相关原理:https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/

            5.4 启动connector

            curl -x post -h "content-type:application/json" 
            --data @mysql2kafka_debezium.json.json 
            http://192.168.1.22:18083/connectors | jq
            

            5.5 验证写入是否成功。

            查看kafka-topic

            kafka-topics --list --zookeeper localhost:2181
            

            此处会看到写入数据topic的信息。

            注意新写入数据topic的格式:database.schema.table-smt 三部分组成。

            本示例topic名称:full.kafka_base_db.account-smt。

            5.6 验证消费数据验证写入是否正常

            ./kafka-avro-console-consumer –topic full.kafka_base_db.account-smt –bootstrap-server 192.168.1.22:9092 –from-beginning

            至此,debezium实现mysql同步kafka完成。

            6、kafka-connector实现kafka同步elasticsearch

            6.1、kafka-connector介绍

            见官网:https://docs.confluent.io/current/connect.html

            kafka connect是一个用于连接kafka与外部系统(如数据库,键值存储,检索系统索引和文件系统)的框架。
            连接器实现公共数据源数据(如mysql、mongo、pgsql等)写入kafka,或者kafka数据写入目标数据库,也可以自己开发连接器。

            6.2、kafka到es connector同步配置

            配置路径:

            /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
            

            配置内容:

            "connector.class": "io.confluent.connect.elasticsearch.elasticsearchsinkconnector",
            "tasks.max": "1",
            "topics": "full.kafka_base_db.account-smt",
            "key.ignore": "true",
            "connection.url": "http://192.168.1.22:9200",
            "type.name": "_doc",
            "name": "elasticsearch-sink-test"
            

            6.3 kafka到es启动connector

            启动命令

            confluent load  elasticsearch-sink-test 
            -d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
            

            6.4 kafka-connctor restful api查看

            mysql2kafka,kafka2es的connector详情信息可以借助postman或者浏览器或者命令行查看。

               curl -x get http://localhost:8083/connectors

            7、坑复盘。

            坑2 同步的过程中可能出现错误,比如:kafka topic没法消费到数据。
            排解思路如下:

            1)确认消费的topic是否是写入数据的topic;2)确认同步的过程中没有出错。可以借助connector如下命令查看。

               curl -x get http://localhost:8083/connectors-xxx/status
            

            坑3 mysql2es出现日期格式不能识别。
            是mysql jar包的问题,解决方案:在my.cnf中配置时区信息即可。

            坑4 kafka2es,es没有写入数据。
            排解思路:

            1)建议:先创建同topic名称一致的索引,注意:mapping静态自定义,不要动态识别生成。

            2)通过connetor/status排查出错原因,一步步分析。

            8、小结

            binlog的实现突破了字段的限制,实际上业界的go-mysql-elasticsearch已经实现。

            对比:logstash、kafka-connector,虽然debezium“曲线救国”两步实现了实时同步,但稳定性+实时性能相对不错。

            参考:
            [1] https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/
            [2] https://www.smwenku.com/a/5c0a7b61bd9eee6fb21356a1/zh-cn
            [3] https://juejin.im/post/5b7c036bf265da43506e8cfd
            [4] https://debezium.io/docs/connectors/mysql/#configuration
            [5] https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc

            以上就是mysql到elasticsearch高效实时同步debezium实现的详细内容,更多关于mysql到elasticsearch同步的资料请关注www.887551.com其它相关文章!

            (0)
            上一篇 2022年3月21日
            下一篇 2022年3月21日

            相关推荐