前言
做态势感知平台时,遇到了数据量的快速增长,MySQL数据终于达到了百万级别,这个时候对于大量模糊查询的需求MySQL显然不如ElasticSearch更为专业。在先前项目中,ES的部署、数据存储都是由其他同事专门负责,因而对ElasticSearch的使用还处于接口调用级别,今天终于轮到自己尝遍苦涩来爬坑了。
需求分析
现有环境:
- MySQL 8版本
- ElasticSearch 7.8
需求为第一次全量更新MySQL数据到ES,后续增量同步数据。
根据需求查阅了相关资料,目前较为使用广泛的方案有两种:
- 基于MySQL binlog
- Logstash
其中基于MySQL binlog的常用组件包括:
- go-mysql-elasticsearch:一个是开源方案,基于GO语言编写的小工具,Github地址:https://github.com/go-mysql-org/go-mysql-elasticsearch 。查看相关文档感觉使用十分简单,但其仅支持MySQL8.0以下版本和ElasticSearch 6.0以下版本。不能说不太时髦,简直是落后时代。
- Canal:阿里开源,一款看起来还不错的数据库中间件,支持基于日志增量订阅和消费的业务场景。文档支持MySQL5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。在经过几天的调研,发现其存在文档过时、各版本之间变化差异大、部署复杂的问题因此放弃。
最终还是选用了业内常用的ELK技术栈,采用Logstash作为数据同步服务。相较于工具级的脚本,他优势在于配置、插件较为丰富,适应不同的数据库以及各类繁杂的版本。和Canal对比的话,莫过于网上的文章普天盖地,文档一直在保持更新~
Logstash采集MySQL数据到ES
为了避免各类Java JDK版本带来的坑,采用Docker进行部署。
同时Logstash存在sql_last_value 变量可以用来做增量更新,增量更新支持两种判断,一是数字类型的字段,另一种是Date、Datetime、Timesteamp 日期类型的判断。判断逻辑为 记录最后一次更新值,下次执行时与上一次比较,从而判断是否更新。
本文MySQL数据表中存在created_at字段,类型为timestamp,由于业务形态,数据不需要更新,因此没有使用updated_at更新时间来判断,如果您的业务有更新操作,建议使用更新时间的时间戳。
Logstash部署
1 | docker pull logstash:7.8.0 |
此时创建了一个名为logstash的docker镜像。同时挂载容器内/etc/logstash/pipeline
到本地logstash目录,后续用于存放需要用到的插件以及配置。
此处需要注意权限问题,Docker容器内logstash启动时使用的是logstash用户,如果不一致可能会没有权限读取该目录下的配置和插件。
Logstash配置
下载MySQL Connector
Logstash读取MySQL采用的是JDBC驱动,因此需要下载MySQL对应版本的Connector。下载地址:https://repo1.maven.org/maven2/mysql/mysql-connector-java/
根据自己MySQL的版本进行选择,我这里选择的是8.0.26.https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.26/mysql-connector-java-8.0.26.jar
将connector放到
/etc/logstash/pipeline
目录.添加logstash输入输出配置
贴一下我的配置,使用时需要删除注释,不然会报错。
文件位置:
/etc/logstash/conf.d/mysql_es.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60input {
stdin {}
jdbc {
# type用于和其他插件隔离
type => "test"
# JDBC 连接字符串
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
jdbc_user => "username"
jdbc_password => "password"
# JDBC驱动位置
jdbc_driver_library => "/etc/logstash/pipeline/mysql-connector-java-8.0.26.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 支持分页执行
jdbc_paging_enabled => "true"
# 每页数量
jdbc_page_size => "50000"
codec => plain { charset => "UTF-8"}
# 是否需要记录某个字段值,如果为true,我们可以自定义要记录的字段值,例如日期字段。如果为false,记录的是上次执行的timestamp.
use_column_value => "true"
# 需要跟踪的字段,增量更新时,需要借助该字段进行判断。
tracking_column => "created_at"
# 跟踪的字段类型
tracking_column_type => "timestamp"
# 是否记录最后一次执行
record_last_run => "true"
# 最后一次执行 存放的地址,这里首次需要自行创建该文件,由于我使用日期时间进行判断,因此需要写默认值 1970-01-01 00:00:00
last_run_metadata_path => "/opt/logstash/lastrun/.logstash_jdbc_last_run"
# 设置时区
jdbc_default_timezone => "Asia/Shanghai"
# 很多教程建议使用statement_filepath,在SQL文件里写判断语句,我在实际使用过程中发现这种方式会在SQL语句后面加入一个\n换行符。这里我采用statement直接写SQL的方式进行。filepath的方式作为备份。
# statement_filepath => "/etc/logstash/pipeline/jdbc.sql"
statement => "select * from orders where created_at > :sql_last_value"
# 执行的时候是否清空先前的
clean_run => false
# 定时执行Cron规则
schedule => "* * * * *"
}
}
output {
# 和input中的type保持一致,用于插件之间隔离
if [type] == "test" {
elasticsearch {
hosts => "127.0.0.1:9200"
# ElasticSearch 索引
index => "test"
# 文档类型 需要注意的是ES7.0 之前,一个 Index 可以创建多个 Document Type,但在 7.0 开始及之后,一个Index 只能对应一个 Document Type,且默认是 _doc 。同时后续不建议继续使用。
document_type => "_order"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
}
在这个过程中也遇到了一个坑,最初的配置由网上的文章复制而来,其 把sql_last_value写成了last_sql_value。。导致一直提示SQL语法错误。这里分享一个调试命令,我通过此方法成功发现了该问题。
bin/logstash -f /etc/logstash/conf.d/mysql_es.conf --verbose --debug
修改执行配置
进入容器的logstash安装目录,修改配置
vi config/pipelines.yml
内容如下,使其支持多个配置的执行1
2- pipeline.id: main
path.config: "/etc/logstash/conf.d/*.conf"一般情况,这里就配置完毕了,但是在某些Logstash官方提供的Docker镜像中,默认开启了x-pack组件,如果ES没有用到该组件需要修改如下配置
vi config/logstash.yaml
,将Xpack相关的配置都注释掉:1
2
3# 注释~
# xpack.monitoring.elasticsearch.hosts: [ "http://127.0.0.1:9200" ]启动Logstash,开始同步数据
查看数据是否同步成功
1
2
3
4
5# 查看索引结构
# curl localhost:9200/索引名称?pretty
curl localhost:9200/test?pretty
# 查看所有索引信息
curl -XGET http://localhost:9200/_cat/indices?v
一些其他的坑和知识
- logstash默认的日志位置:
/var/log/logstash
,错误日志logstash-plain.log
,stdout输出logstash.stdout
- logstash与ElasticSearch的版本尽量保持一致,避免奇怪的问题。
- 在 7.0 以及之后的版本中ES的 Document_Type 被废弃了。一个 index 中只有一个默认的 type,即 _doc。
- 自Elastic 7.11 版本开始,Elasticsearch 与 Kibana 代码所遵循的 Apache 2.0 许可会调整为 SSPL 与 Elastic License 双许可。
- Logstash编写规则过程中,注意MySQL表中的字段名是否和Logstash字段重复,例如上文中做Logstash隔离时,用到了type用于标识不同的插件,如果MySQL中存在同名的type字段,判断时则会根据MySQL中的值进行判断,导致逻辑出错。
ref:
https://www.elastic.co/guide/en/logstash/7.8/index.html logstash官方文档
文章评论