ELK 学习笔记(四)—— 从 PostgreSQL 同步数据到 Elasticsearch


概要

搜索了解这个是因为我在学习 Elasticsearch 过程中需要一定量的测试数据,而我的测试数据大多保存在 PostgreSQL 中,懒的写 Python 脚本遍历插入,于是在网上找有没有现成的工具,真找到一个——“abc”,可以从 PostgreSQL 导入数据,还能实时同步新增数据,本文记录使用这个工具的使用方法

PS:这个命令行工具是一个商业公司写的,他们家有 ElasticSearch 云服务,这个命令行工具可以帮助用户登录、上传数据到线上,可喜的是,它也支持导入数据到本地的 Elasticsearch

数据源支持

CSV,Elasticsearch,Cloud Firestore,kafka,MongoDB,MSSQL,MySQL,PostgreSQL,Redis

创建模拟数据

PostgreSQL 中新建名为 library 的数据库和名为 xuefu 的模式,以及 books

-- 创建数据库
CREATE DATABASE library;

-- 创建模式
CREATE SCHEMA xuefu
    AUTHORIZATION postgres;

COMMENT ON SCHEMA xuefu
    IS '学府书城';

-- 创建表
CREATE TABLE xuefu.books
(
    name text COLLATE pg_catalog."default",
    author text COLLATE pg_catalog."default",
    publish text COLLATE pg_catalog."default",
    isbn text COLLATE pg_catalog."default",
    pubdate text COLLATE pg_catalog."default"
)

插入测试数据

insert into xuefu.books (name, author, publish, isbn, pubdate) values ('深入浅出Rust','范长春','机械工业出版社','9787111606420','2018');
insert into xuefu.books (name, author, publish, isbn, pubdate) values ('活着','余华','作家出版社','9787506365437','2017');
insert into xuefu.books (name, author, publish, isbn, pubdate) values ('数学之美','吴军','人民邮电出版社','9787115373557','2014');
insert into xuefu.books (name, author, publish, isbn, pubdate) values ('LaTeX入门','刘海洋','电子工业出版社','9787121202087','2013');
insert into xuefu.books (name, author, publish, isbn, pubdate) values ('Rust编程之道','张汉东','电子工业出版社','9787121354854','2019');
insert into xuefu.books (name, author, publish, isbn, pubdate) values ('上帝掷骰子吗','曹天元','北京联合出版公司','9787550218895','2013');

创建 Elasticsearch 索引

curl -X PUT "http://127.0.0.1:9200/books?pretty"

导入数据

运行命令如下,将 library 数据库 books 表的数据导入到 Elasticsearch 名为 books 的索引

abc import --tail --src_type=postgres --src_uri="postgresql://postgres:postgres@127.0.0.1:5432/library" "http://127.0.0.1:9200/books" --src_filter="books"

经测试:如果不指定哪一张表,工具将会把指定数据库所有的数据都进行导入/同步

报错与解决

1)可能会遇到报错(关于 wal_level)

ERRO[0001] error plucking from logical decoding pq: logical decoding requires wal_level >= logical  db=library

数据库配置的 wal_level 需要设置为 logical ,修改配置文件 postgresql.conf 中配置并重启数据库

2)可能会遇到报错(关于 standby_replication_slot)

ERRO[0001] error plucking from logical decoding pq: replication slot "standby_replication_slot" does not exist  db=library

因为逻辑复制的 slot 不存在,手动创建重试即可

-- 创建 standby_replication_slot
select * from pg_create_logical_replication_slot('standby_replication_slot', 'test_decoding');

-- 删除 standby_replication_slot
-- select pg_drop_replication_slot('standby_replication_slot');

实时同步

关于实时同步,我只测试了 PostgreSQL,效果很不错,在数据导入完成后程序不会自动退出,出于监听状态

在数据库插入新数据后,大概两秒钟数据会同步到 Elasticsearch

abc tool sync

参数说明

# abc import --help

USAGE
  abc import --src_type={SourceDatabase} --src_uri={SourceURI} [-t|--tail] [Cluster URL|App Name]

FLAGS
  --bulk_requests=1000                         bulk 每次提交数据量
  --config=                                    配置文件, 如果指定则只使用配置文件内选项
  --log.level="error"                          日志级别,可选项: [debug, info, error]
  --log_dir=                                   用于保存提交日志的路径
  --replication_slot=standby_replication_slot  使用指定的 [postgres] replication slot
  --request_size=1048576                       Http 请求大小,单位bytes, 明确指定 bulk 请求值
  --sac_path=./ServiceAccountKey.json          firebase 服务用户认证文件
  --src_filter=.*                              正则过滤数据,如指定某一表:"table1|table2|table3"
  --src_password=                              数据源密码
  --src_realm=                                 source realm
  --src_type=postgres                          数据源类型
  --src_uri=http://user:pass@host:port/db      数据源Uri
  --src_username=                              数据源用户名
  --ssl=false                                  使用 SSL 连接数据源
  --tail=false                                 是否使用 tail 输出
  --test=false                                 启用后, pipeline 会创建但是不会启用同步
                                               用于确认你的配置
  --transform_file=                            指定数据转换规则文件对数据进行调整
  --typename=mytype                            [csv] 用于设置字段类型
  --verify=false                               验证源与目标的连接

数据转换

abc工具还有个数据转换的选项,这个应该比较常用,因为数据库中可能有很多冗余的字符不需要存入 Elasticsearch 中,这时就可以用 omit 函数进行忽略

示例:使用数据转化文件 transform_file.js 将数据源中的 bio 字段进行忽略

t.Source("source", source, "/.*/")
 .Transform(omit({"fields":["bio"]}))
 .Save("sink", sink, "/.*/")

更详细的文档与示例见:Github > transform_file

结语

经过简单的测试使用,abc是一个很不错的将数据同步到Elasticsearch的工具,能否用于生产环境,待进一步的测试与使用,先到这里。

参考

  1. Github > Import command
  2. CLI for Indexing data from Postgres to Elasticsearch