小九博客

  • 首页
  • 编程开发
  • 信息安全
  • 工具资源
  • 随笔
  • 在线工具
    • 在线图片水印
    • Json解析
    • JavaRuntimeExec
    • 加解密/编码工具集
  • 关于
小九博客
Hack The World!
  1. 首页
  2. 编程开发
  3. 正文

MySQL数据同步到ElasticSearch(Logstash方案)爬坑纪实

2021年08月11日

前言

做态势感知平台时,遇到了数据量的快速增长,MySQL数据终于达到了百万级别,这个时候对于大量模糊查询的需求MySQL显然不如ElasticSearch更为专业。在先前项目中,ES的部署、数据存储都是由其他同事专门负责,因而对ElasticSearch的使用还处于接口调用级别,今天终于轮到自己尝遍苦涩来爬坑了。

需求分析

现有环境:

  • MySQL 8版本
  • ElasticSearch 7.8

需求为第一次全量更新MySQL数据到ES,后续增量同步数据。

根据需求查阅了相关资料,目前较为使用广泛的方案有两种:

  1. 基于MySQL binlog
  2. Logstash

其中基于MySQL binlog的常用组件包括:

  • go-mysql-elasticsearch:一个是开源方案,基于GO语言编写的小工具,Github地址:https://github.com/go-mysql-org/go-mysql-elasticsearch 。查看相关文档感觉使用十分简单,但其仅支持MySQL8.0以下版本和ElasticSearch 6.0以下版本。不能说不太时髦,简直是落后时代。
  • Canal:阿里开源,一款看起来还不错的数据库中间件,支持基于日志增量订阅和消费的业务场景。文档支持MySQL5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。在经过几天的调研,发现其存在文档过时、各版本之间变化差异大、部署复杂的问题因此放弃。

最终还是选用了业内常用的ELK技术栈,采用Logstash作为数据同步服务。相较于工具级的脚本,他优势在于配置、插件较为丰富,适应不同的数据库以及各类繁杂的版本。和Canal对比的话,莫过于网上的文章普天盖地,文档一直在保持更新~

Logstash采集MySQL数据到ES

为了避免各类Java JDK版本带来的坑,采用Docker进行部署。

同时Logstash存在sql_last_value 变量可以用来做增量更新,增量更新支持两种判断,一是数字类型的字段,另一种是Date、Datetime、Timesteamp 日期类型的判断。判断逻辑为 记录最后一次更新值,下次执行时与上一次比较,从而判断是否更新。

本文MySQL数据表中存在created_at字段,类型为timestamp,由于业务形态,数据不需要更新,因此没有使用updated_at更新时间来判断,如果您的业务有更新操作,建议使用更新时间的时间戳。

Logstash部署

1
2
3
docker pull logstash:7.8.0
mkdir -p /docker
docker run -d --name logstash -v /docker/logstash/:/etc/logstash/pipeline/ logstash:7.8.0

此时创建了一个名为logstash的docker镜像。同时挂载容器内/etc/logstash/pipeline到本地logstash目录,后续用于存放需要用到的插件以及配置。

此处需要注意权限问题,Docker容器内logstash启动时使用的是logstash用户,如果不一致可能会没有权限读取该目录下的配置和插件。

Logstash配置

  1. 下载MySQL Connector

    Logstash读取MySQL采用的是JDBC驱动,因此需要下载MySQL对应版本的Connector。下载地址:https://repo1.maven.org/maven2/mysql/mysql-connector-java/

    根据自己MySQL的版本进行选择,我这里选择的是8.0.26.https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.26/mysql-connector-java-8.0.26.jar

    将connector放到/etc/logstash/pipeline目录.

  2. 添加logstash输入输出配置

    贴一下我的配置,使用时需要删除注释,不然会报错。

    文件位置:/etc/logstash/conf.d/mysql_es.conf

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
       input {
    stdin {}
    jdbc {
    # type用于和其他插件隔离
    type => "test"
    # JDBC 连接字符串
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
    jdbc_user => "username"
    jdbc_password => "password"
    # JDBC驱动位置
    jdbc_driver_library => "/etc/logstash/pipeline/mysql-connector-java-8.0.26.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"

    # 支持分页执行
    jdbc_paging_enabled => "true"
    # 每页数量
    jdbc_page_size => "50000"
    codec => plain { charset => "UTF-8"}

    # 是否需要记录某个字段值,如果为true,我们可以自定义要记录的字段值,例如日期字段。如果为false,记录的是上次执行的timestamp.
    use_column_value => "true"
    # 需要跟踪的字段,增量更新时,需要借助该字段进行判断。
    tracking_column => "created_at"
    # 跟踪的字段类型
    tracking_column_type => "timestamp"
    # 是否记录最后一次执行
    record_last_run => "true"
    # 最后一次执行 存放的地址,这里首次需要自行创建该文件,由于我使用日期时间进行判断,因此需要写默认值 1970-01-01 00:00:00
    last_run_metadata_path => "/opt/logstash/lastrun/.logstash_jdbc_last_run"

    # 设置时区
    jdbc_default_timezone => "Asia/Shanghai"
    # 很多教程建议使用statement_filepath,在SQL文件里写判断语句,我在实际使用过程中发现这种方式会在SQL语句后面加入一个\n换行符。这里我采用statement直接写SQL的方式进行。filepath的方式作为备份。
    # statement_filepath => "/etc/logstash/pipeline/jdbc.sql"
    statement => "select * from orders where created_at > :sql_last_value"
    # 执行的时候是否清空先前的
    clean_run => false
    # 定时执行Cron规则
    schedule => "* * * * *"
    }
    }


    output {
    # 和input中的type保持一致,用于插件之间隔离
    if [type] == "test" {
    elasticsearch {
    hosts => "127.0.0.1:9200"
    # ElasticSearch 索引
    index => "test"
    # 文档类型 需要注意的是ES7.0 之前,一个 Index 可以创建多个 Document Type,但在 7.0 开始及之后,一个Index 只能对应一个 Document Type,且默认是 _doc 。同时后续不建议继续使用。
    document_type => "_order"
    document_id => "%{id}"
    }
    stdout {
    codec => json_lines
    }
    }
    }

    在这个过程中也遇到了一个坑,最初的配置由网上的文章复制而来,其 把sql_last_value写成了last_sql_value。。导致一直提示SQL语法错误。这里分享一个调试命令,我通过此方法成功发现了该问题。

    bin/logstash -f /etc/logstash/conf.d/mysql_es.conf --verbose --debug

  3. 修改执行配置

    进入容器的logstash安装目录,修改配置vi config/pipelines.yml内容如下,使其支持多个配置的执行

    1
    2
    - pipeline.id: main
    path.config: "/etc/logstash/conf.d/*.conf"

    一般情况,这里就配置完毕了,但是在某些Logstash官方提供的Docker镜像中,默认开启了x-pack组件,如果ES没有用到该组件需要修改如下配置vi config/logstash.yaml,将Xpack相关的配置都注释掉:

    1
    2
    3
    # 注释~
    # xpack.monitoring.elasticsearch.hosts: [ "http://127.0.0.1:9200" ]

  4. 启动Logstash,开始同步数据

  5. 查看数据是否同步成功

    1
    2
    3
    4
    5
    # 查看索引结构
    # curl localhost:9200/索引名称?pretty
    curl localhost:9200/test?pretty
    # 查看所有索引信息
    curl -XGET http://localhost:9200/_cat/indices?v

一些其他的坑和知识

  1. logstash默认的日志位置:/var/log/logstash,错误日志logstash-plain.log,stdout输出logstash.stdout
  2. logstash与ElasticSearch的版本尽量保持一致,避免奇怪的问题。
  3. 在 7.0 以及之后的版本中ES的 Document_Type 被废弃了。一个 index 中只有一个默认的 type,即 _doc。
  4. 自Elastic 7.11 版本开始,Elasticsearch 与 Kibana 代码所遵循的 Apache 2.0 许可会调整为 SSPL 与 Elastic License 双许可。
  5. Logstash编写规则过程中,注意MySQL表中的字段名是否和Logstash字段重复,例如上文中做Logstash隔离时,用到了type用于标识不同的插件,如果MySQL中存在同名的type字段,判断时则会根据MySQL中的值进行判断,导致逻辑出错。

ref:

https://www.elastic.co/guide/en/logstash/7.8/index.html logstash官方文档

https://zhuanlan.zhihu.com/p/92207162 ES扫盲

标签 ElasticSearch MySQL同步 Logstash 同步数据 mysql+es
最后更新:2021年08月11日

文章评论

小九

Just For Fun

文章大纲
  1. 前言
  2. 需求分析
  3. Logstash采集MySQL数据到ES
    1. Logstash部署
    2. Logstash配置
  4. 一些其他的坑和知识
分类目录
  • 编程开发
  • Yii2
  • 随笔
  • 工具资源
  • Django
  • 信息安全
标签聚合
同步数据 docker-compose 容器漏洞 七牛云 DockerSwarm codeception
随机 最新 热点
随机 最新 热点
Yii2 资源大全 MySQL数据同步到ElasticSearch(Logstash方案)爬坑纪实 Codeception(三) Yii2 Audit插件的使用【日志记录插件】 PHP反射机制 Glary Utilities PRO注册码
OpenSearch集群部署 DNSRebind攻击 MySQL数据同步到ElasticSearch(Logstash方案)爬坑纪实 自动化编排学习(一)部署篇 常见容器漏洞总结 免费CDN加速手把手教程

COPYRIGHT © 2021 小九博客 ALL RIGHTS RESERVED.