ELK 学习笔记(六)—— 从 PostgreSQL 同步数据到 Elasticsearch(Logstash)

Published: 2020-06-09

Tags: elk logstash

本文总阅读量

概要

之前记录使用 abc 工具来同步 PostgreSQL 中的数据到 Elasticsearch,如果从灵活性来说,Logstash 应该是更好的方式,并且 Logstash 支持很多输入源和输出源,数据的过滤,格式化功能也很强大方便。

安装 Logstash

在能够访问外网的机器上安装 Logstash ,官方文档:installing-logstash

Centos 7 安装 Logstash

sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

# 添加logstash源
vim /etc/yum.repos.d/logstash.repo

[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

sudo yum install logstash

Ubuntu 安装 Logstash

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -

echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list

sudo apt-get update && sudo apt-get install logstash

修改 jvm 参数(可选)

配置文件地址:/etc/logstash/jvm.options

## JVM configuration

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space

-Xms64m
-Xmx128m

我的搬瓦工服务器内存512M,这里设置一下最大使用内存大小,不然运行会报错。

获取离线安装包

在服务器上下载插件,网上教程里大多说要下载 logstash-input-jdbc,看提示应该安装 logstash-integration-jdbc

[root@184 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Validating logstash-input-jdbc
ERROR: Installation aborted, plugin 'logstash-input-jdbc' is already provided by 'logstash-integration-jdbc'

重新下载

[root@184 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-integration-jdbc
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Validating logstash-integration-jdbc
Installing logstash-integration-jdbc
Installation successful

生成安装包

[root@184 ~]# /usr/share/logstash/bin/logstash-plugin prepare-offline-pack logstash-integration-jdbc
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Offline package created at: /usr/share/logstash/logstash-offline-plugins-7.7.0.zip
You can install it with this command `bin/logstash-plugin install file:///usr/share/logstash/logstash-offline-plugins-7.7.0.zip`

将生成的离线安装包下载回来,上传到需要安装的服务器

离线安装插件

# 复制logstash 插件到容器
sudo docker cp logstash-offline-plugins-7.7.0.zip elk:/opt

# 进入elk容器执行安装
root@1c622945a48b:/opt/logstash# bin/logstash-plugin install file:///opt/logstash-offline-plugins-7.7.0.zip
Installing file: /opt/logstash-offline-plugins-7.7.0.zip
Install successful

下载 Jar 包

从 https://jdbc.postgresql.org/download.html 下载 jar 包,放到 /opt/logstash/logstash-core/lib/jars/ 目录下。

我下载的 jar 包名叫 postgresql-42.2.12.jar

同步数据测试

1,编辑配置文件:/opt/logstash/config/logstash.yml

config.support_escapes: true

2,创建同步的配置文件

mkdir /etc/logstash/conf.d
vim /etc/logstash/conf.d/testsync.conf

配置内容

input {
      jdbc {
         jdbc_connection_string => "jdbc:postgresql://localhost:5432/testdatabase"
         jdbc_user => "db_user"
         jdbc_password => "db_password"
         jdbc_driver_class => "org.postgresql.Driver"
         schedule => "* * * * *"
         statement => "SELECT * from public.\"Users\" updateTime >= :sql_last_value"
         last_run_metadata_path => "/opt/logstash/lastrun/.last_run_testsync"
     }
}
output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "users"
        document_id => "users_%{userid}"
        doc_as_upsert => true
        #user => "es_user"
        #password => "es_password"
     }
}

简单解释下:

  • input 从 postgres 数据库获取数据,output 将数据同步到 elasticsearch。
  • schedule 定时任务,此处为1分钟同步一次。
  • sql_last_value 为同步的时间标记,如果同步成功,它就会把时间记录到 last_run_metadata_path 的文件中,也就是说,把 last_run_metadata_path 指定的路径删除,下次同步 Logstash 就会全量同步。
  • statement 是同步的SQL条件,想要同步的数据,通过SQL查询出来。
  • index 为索引名称,document_id 此处是自定义的前缀加上表中记录的 userid 字段。

3,配置管道

编辑配置文件:/opt/logstash/config/pipelines.yml

- pipeline.id: test
  path.config: "/etc/logstash/conf.d/*.conf"

4,启动同步

/opt/logstash/bin/logstash

也可以使用 -f 参数指定配置文件,会忽略 pipelines.yml 配置文件

/opt/logstash/bin/logstash -f /etc/logstash/conf.d/testsync.conf

完成。使用 Logstash 同步数据挺容易,不能在线安装着实是不方便。

参考

  1. https://www.elastic.co/guide/en/logstash/current/offline-plugins.html