欢迎光临
dockeryun一直在努力

ELK Logstash-input-jdbc同步数据库中的数据全量或增量

需要将mysql中的一些数据增量或者全量的导入到Elasticsearch中。

直接贴 logstash 代码

input{
#   file{
#      path => "/var/log/messages"
#      type => "error"
#      start_position => "beginning"
#   }
#-----全量---------------
    jdbc {
        type => "all"
        #jdbc Driver .jar
        jdbc_driver_library => "/data/EL/logstash/mysql/mysql-connector-java-5.1.47.jar"
        #Driver  class
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        #mysql 地址
        jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
        #重试次数
#        connection_retry_attempts => "3"
#        connectoin_retry_attempts_wait_time => "1"

        #用户名 密码
        jdbc_user => "root"
        jdbc_password => ""

        #parameters => { "favorite_artist" => "Beethoven" }
        #特定时间表定期运行 类似cron
        schedule => "*/2 * * * *"
        statement => "SELECT * from user"
        clean_run => false
        #允许分页
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
    }
#----------全量-------------------
#----增量-----
    jdbc {
        type => "updata"
        #jdbc Driver .jar
        jdbc_driver_library => "/data/EL/logstash/mysql/mysql-connector-java-5.1.47.jar"
        #Driver  class
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        #mysql 地址
        jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
        #重试次数
#        connection_retry_attempts => "3"
#        connectoin_retry_attempts_wait_time => "1"

        #用户名 密码
        jdbc_user => "root"
        jdbc_password => ""

        #使用其它字段追踪,而不是用时间
        use_column_value => true
        #追踪的字段
        tracking_column => id
        record_last_run => true
        #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
        last_run_metadata_path => "./config/station_parameter.txt"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        #clean_run设置为true,则将忽略此值并将其sql_last_value设置为1970年1月1日
        clean_run => false
        #sql语句 :sql_last_value会获取sql_last_value值
        statement => "select * from user where id > :sql_last_value"
        #sql语句 文件形式
        #statement_filepath => ""
        ##特定时间表定期运行 类似cron
        schedule => "* * * * *"
    }
#-----增量ok------

}




filter{
    if [type] == "all"{
        mutate {
#    remove_field => "@timestamp"
            remove_field => "@version"

        }
        json {
            source => "message"
            target => "all"
            remove_field => ["message"]
        }
    }
    else if [type] == "updata"{
        mutate {
            remove_field => "@timestamp"
            remove_field => "@version"
            }
        json {
            source => "message"
            target => "updata"
            remove_field => ["message"]
        }

    }

#定义时间
#  date {
#    match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
#    locale => "cn"
#  }

##定义数据的格式
#  grok {
#   match => { "message" => "%{IP:client_id_address} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:http_response_time}" }
#  }
#定义客户端的IP是哪个字段(上面定义的数据格式)
#  geoip {
#    source => "clientIp"
#  }


}


output{
   #stdout
   #stdout{codec => rubydebug}


    # 输出到 elasticsearch
    if [type] == "all"{
        elasticsearch {
        hosts => ["172.31.3.47:9200"]
        index => "all"
#将"_id"的值设为mysql的id列
        document_id => "%{id}"
        #document_type => "base"
        }
    }
    if [type] == "updata"{
        elasticsearch {
        hosts => ["172.31.3.47:9200"]
        index => "updata"
        #将"_id"的值设为mysql的id列
        document_id => "%{id}"
        #document_type => "base"
        }
    }

}

效果

问题

  1. 目前前同步都是没有硬删除操作,如果mysql中执行了del操作,es上不会同步删除。网上找过过方法,大部分建议使用软删除,也就是增加一个字段,删除操作更这个字段。
赞(0) 打赏
未经允许不得转载:DockerYun » ELK Logstash-input-jdbc同步数据库中的数据全量或增量
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!

 

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏