需要将mysql中的一些数据增量或者全量的导入到Elasticsearch中。
直接贴 logstash 代码
input{
# file{
# path => "/var/log/messages"
# type => "error"
# start_position => "beginning"
# }
#-----全量---------------
jdbc {
type => "all"
#jdbc Driver .jar
jdbc_driver_library => "/data/EL/logstash/mysql/mysql-connector-java-5.1.47.jar"
#Driver class
jdbc_driver_class => "com.mysql.jdbc.Driver"
#mysql 地址
jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
#重试次数
# connection_retry_attempts => "3"
# connectoin_retry_attempts_wait_time => "1"
#用户名 密码
jdbc_user => "root"
jdbc_password => ""
#parameters => { "favorite_artist" => "Beethoven" }
#特定时间表定期运行 类似cron
schedule => "*/2 * * * *"
statement => "SELECT * from user"
clean_run => false
#允许分页
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
#----------全量-------------------
#----增量-----
jdbc {
type => "updata"
#jdbc Driver .jar
jdbc_driver_library => "/data/EL/logstash/mysql/mysql-connector-java-5.1.47.jar"
#Driver class
jdbc_driver_class => "com.mysql.jdbc.Driver"
#mysql 地址
jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
#重试次数
# connection_retry_attempts => "3"
# connectoin_retry_attempts_wait_time => "1"
#用户名 密码
jdbc_user => "root"
jdbc_password => ""
#使用其它字段追踪,而不是用时间
use_column_value => true
#追踪的字段
tracking_column => id
record_last_run => true
#上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "./config/station_parameter.txt"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#clean_run设置为true,则将忽略此值并将其sql_last_value设置为1970年1月1日
clean_run => false
#sql语句 :sql_last_value会获取sql_last_value值
statement => "select * from user where id > :sql_last_value"
#sql语句 文件形式
#statement_filepath => ""
##特定时间表定期运行 类似cron
schedule => "* * * * *"
}
#-----增量ok------
}
filter{
if [type] == "all"{
mutate {
# remove_field => "@timestamp"
remove_field => "@version"
}
json {
source => "message"
target => "all"
remove_field => ["message"]
}
}
else if [type] == "updata"{
mutate {
remove_field => "@timestamp"
remove_field => "@version"
}
json {
source => "message"
target => "updata"
remove_field => ["message"]
}
}
#定义时间
# date {
# match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
# locale => "cn"
# }
##定义数据的格式
# grok {
# match => { "message" => "%{IP:client_id_address} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:http_response_time}" }
# }
#定义客户端的IP是哪个字段(上面定义的数据格式)
# geoip {
# source => "clientIp"
# }
}
output{
#stdout
#stdout{codec => rubydebug}
# 输出到 elasticsearch
if [type] == "all"{
elasticsearch {
hosts => ["172.31.3.47:9200"]
index => "all"
#将"_id"的值设为mysql的id列
document_id => "%{id}"
#document_type => "base"
}
}
if [type] == "updata"{
elasticsearch {
hosts => ["172.31.3.47:9200"]
index => "updata"
#将"_id"的值设为mysql的id列
document_id => "%{id}"
#document_type => "base"
}
}
}
效果
问题
- 目前前同步都是没有硬删除操作,如果mysql中执行了del操作,es上不会同步删除。网上找过过方法,大部分建议使用软删除,也就是增加一个字段,删除操作更这个字段。
评论前必须登录!
注册