概要
搜索了解这个是因为我在学习 Elasticsearch 过程中需要一定量的测试数据,而我的测试数据大多保存在 PostgreSQL 中,懒的写 Python 脚本遍历插入,于是在网上找有没有现成的工具,真找到一个——“abc”,可以从 PostgreSQL 导入数据,还能实时同步新增数据,本文记录使用这个工具的使用方法
PS:这个命令行工具是一个商业公司写的,他们家有 ElasticSearch 云服务,这个命令行工具可以帮助用户登录、上传数据到线上,可喜的是,它也支持导入数据到本地的 Elasticsearch
数据源支持
CSV,Elasticsearch,Cloud Firestore,kafka,MongoDB,MSSQL,MySQL,PostgreSQL,Redis
创建模拟数据
在 PostgreSQL 中新建名为 library 的数据库和名为 xuefu 的模式,以及 books 表
-- 创建数据库
CREATE DATABASE library;
-- 创建模式
CREATE SCHEMA xuefu
AUTHORIZATION postgres;
COMMENT ON SCHEMA xuefu
IS '学府书城';
-- 创建表
CREATE TABLE xuefu.books
(
name text COLLATE pg_catalog."default",
author text COLLATE pg_catalog."default",
publish text COLLATE pg_catalog."default",
isbn text COLLATE pg_catalog."default",
pubdate text COLLATE pg_catalog."default"
)
插入测试数据
insert into xuefu.books (name, author, publish, isbn, pubdate) values ('深入浅出Rust','范长春','机械工业出版社','9787111606420','2018');
insert into xuefu.books (name, author, publish, isbn, pubdate) values ('活着','余华','作家出版社','9787506365437','2017');
insert into xuefu.books (name, author, publish, isbn, pubdate) values ('数学之美','吴军','人民邮电出版社','9787115373557','2014');
insert into xuefu.books (name, author, publish, isbn, pubdate) values ('LaTeX入门','刘海洋','电子工业出版社','9787121202087','2013');
insert into xuefu.books (name, author, publish, isbn, pubdate) values ('Rust编程之道','张汉东','电子工业出版社','9787121354854','2019');
insert into xuefu.books (name, author, publish, isbn, pubdate) values ('上帝掷骰子吗','曹天元','北京联合出版公司','9787550218895','2013');
创建 Elasticsearch 索引
curl -X PUT "http://127.0.0.1:9200/books?pretty"
导入数据
运行命令如下,将 library 数据库 books 表的数据导入到 Elasticsearch 名为 books 的索引
abc import --tail --src_type=postgres --src_uri="postgresql://postgres:postgres@127.0.0.1:5432/library" "http://127.0.0.1:9200/books" --src_filter="books"
经测试:如果不指定哪一张表,工具将会把指定数据库所有的数据都进行导入/同步
报错与解决
1)可能会遇到报错(关于 wal_level)
ERRO[0001] error plucking from logical decoding pq: logical decoding requires wal_level >= logical db=library
数据库配置的 wal_level 需要设置为 logical ,修改配置文件 postgresql.conf 中配置并重启数据库
2)可能会遇到报错(关于 standby_replication_slot)
ERRO[0001] error plucking from logical decoding pq: replication slot "standby_replication_slot" does not exist db=library
因为逻辑复制的 slot 不存在,手动创建重试即可
-- 创建 standby_replication_slot
select * from pg_create_logical_replication_slot('standby_replication_slot', 'test_decoding');
-- 删除 standby_replication_slot
-- select pg_drop_replication_slot('standby_replication_slot');
实时同步
关于实时同步,我只测试了 PostgreSQL,效果很不错,在数据导入完成后程序不会自动退出,出于监听状态
在数据库插入新数据后,大概两秒钟数据会同步到 Elasticsearch
参数说明
# abc import --help
USAGE
abc import --src_type={SourceDatabase} --src_uri={SourceURI} [-t|--tail] [Cluster URL|App Name]
FLAGS
--bulk_requests=1000 bulk 每次提交数据量
--config= 配置文件, 如果指定则只使用配置文件内选项
--log.level="error" 日志级别,可选项: [debug, info, error]
--log_dir= 用于保存提交日志的路径
--replication_slot=standby_replication_slot 使用指定的 [postgres] replication slot
--request_size=1048576 Http 请求大小,单位bytes, 明确指定 bulk 请求值
--sac_path=./ServiceAccountKey.json firebase 服务用户认证文件
--src_filter=.* 正则过滤数据,如指定某一表:"table1|table2|table3"
--src_password= 数据源密码
--src_realm= source realm
--src_type=postgres 数据源类型
--src_uri=http://user:pass@host:port/db 数据源Uri
--src_username= 数据源用户名
--ssl=false 使用 SSL 连接数据源
--tail=false 是否使用 tail 输出
--test=false 启用后, pipeline 会创建但是不会启用同步
用于确认你的配置
--transform_file= 指定数据转换规则文件对数据进行调整
--typename=mytype [csv] 用于设置字段类型
--verify=false 验证源与目标的连接
数据转换
abc
工具还有个数据转换的选项,这个应该比较常用,因为数据库中可能有很多冗余的字符不需要存入 Elasticsearch 中,这时就可以用 omit 函数进行忽略
示例:使用数据转化文件 transform_file.js 将数据源中的 bio 字段进行忽略
t.Source("source", source, "/.*/")
.Transform(omit({"fields":["bio"]}))
.Save("sink", sink, "/.*/")
更详细的文档与示例见:Github > transform_file
结语
经过简单的测试使用,abc
是一个很不错的将数据同步到Elasticsearch的工具,能否用于生产环境,待进一步的测试与使用,先到这里。
参考