Overview
According to the official description:
Logstash is an open source, server-side data processing pipeline that ingests data from a multitude of sources simultaneously, transforms it, and then sends it to your favorite "stash."
Having gotten a rough handle on Elasticsearch's syntax and features, I wanted to look at the other two ELK components. Logstash, for one, should help me import a wider range of data into the database and transform it along the way.
Getting test data
If you have no test data at hand, you can download datasets for analysis from kaggle.com.
Take this one, for example: https://www.kaggle.com/mirosval/personal-cars-classifieds — the page describes the dataset's contents and format. I downloaded it and imported it into Elasticsearch with Logstash as a test.
The downloaded CSV is 401 MB (88 MB compressed), and the import took me over half an hour because the download speed was rather slow. To make it easier for readers to follow along, this post works with a few records I made up myself.
Let's use the test data below to see how Logstash works.
Test data
users.csv
name,birth,wages,phone,addr
王伟,1991-03-11,8000,19911223344,石家庄长征街园明路28号银宏花苑
李娜,1993-12-05,5000,19901020304,济南花园路90-7号东环花园
刘洋,1989-07-25,3000,19977771111,南京江宁区岔路口宏运大道2199号(秦淮河畔)
张杰,1988-04-22,12000,1991234000,湖南省长沙市芙蓉区德政街298号
I made up the few records above; we'll use them to learn and test CSV imports.
Creating the configuration file
Below is the complete configuration for ingesting the data.
logstash_users.conf
input {
  file {
    path => "/etc/logstash/data/users.csv"
    # Read the file from the beginning instead of only tailing new lines
    start_position => "beginning"
    # Do not persist the read position; every run starts over (see notes below)
    sincedb_path => "/dev/null"
  }
}

filter {
  csv {
    separator => ","
    columns => ["name", "birth", "wages", "phone", "addr"]
    # Index wages as a number rather than a string
    convert => {
      "wages" => "integer"
    }
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "test_users"
  }
  # Also print each event to the console for debugging
  stdout {}
}
Brief notes on the configuration file
- The input block describes the data source, the filter block filters and transforms the data, and the output block describes where the data is sent.
- Logstash has its own DSL (Domain Specific Language) for describing configuration.
- What each field does is fairly easy to grasp, so I won't elaborate here; get the example running first, then consult the official documentation for the specific parameters.
- sincedb_path is the file that records how far the import has progressed; if a run is interrupted, the next start resumes from the recorded position. With "/dev/null" nothing is ever persisted, so every run starts from the beginning. (For a resumable setup, see the sketch below.)
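If you want the import to be resumable instead, point sincedb_path at a writable file. A minimal sketch, where the sincedb file path is just an example:

input {
  file {
    path => "/etc/logstash/data/users.csv"
    start_position => "beginning"
    # Persist the read position so an interrupted run resumes where it left off
    sincedb_path => "/etc/logstash/data/users.sincedb"
  }
}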
Importing the data with Logstash
First, put users.csv and logstash_users.conf in the right places.
On the host machine, run:
mkdir /etc/logstash/data
Then place users.csv in the /etc/logstash/data directory and logstash_users.conf in the /etc/logstash directory.
Because the /etc/logstash/data directory was mapped into the container when it was created, the tools inside the container can find the data and configuration files placed in this host directory.
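For reference, that volume mapping is set up when the container is created. A sketch of what this might look like — the sebp/elk image name, the port list, and the extra mount for the config file are all assumptions; adapt them to your own setup:

docker run -d --name elk \
  -v /etc/logstash/data:/etc/logstash/data \
  -v /etc/logstash/logstash_users.conf:/etc/logstash/logstash_users.conf \
  -p 5601:5601 -p 9200:9200 \
  sebp/elk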
First, check the version of the Logstash tool:
[root@nuc5 data]# docker exec -it elk /opt/logstash/bin/logstash --version
logstash 7.5.2
Next, run Logstash with the configuration file to import the test data:
docker exec -it elk /opt/logstash/bin/logstash -f /etc/logstash/logstash_users.conf
The output is as follows:
Thread.exclusive is deprecated, use Thread::Mutex
Sending Logstash logs to /opt/logstash/logs which is now configured via log4j2.properties
[2020-02-11T16:10:54,569][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-02-11T16:10:54,760][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.5.2"}
[2020-02-11T16:10:57,416][INFO ][org.reflections.Reflections] Reflections took 59 ms to scan 1 urls, producing 20 keys and 40 values
[2020-02-11T16:11:00,873][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2020-02-11T16:11:01,248][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2020-02-11T16:11:01,334][INFO ][logstash.outputs.elasticsearch][main] ES Output version determined {:es_version=>7}
[2020-02-11T16:11:01,343][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-02-11T16:11:01,522][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://localhost:9200"]}
[2020-02-11T16:11:01,641][INFO ][logstash.outputs.elasticsearch][main] Using default mapping template
[2020-02-11T16:11:01,741][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization. It is recommended to log an issue to the responsible developer/development team.
[2020-02-11T16:11:01,749][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["/etc/logstash/logstash_users.conf"], :thread=>"#<Thread:0x749fb663 run>"}
[2020-02-11T16:11:01,873][INFO ][logstash.outputs.elasticsearch][main] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2020-02-11T16:11:02,263][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2020-02-11T16:11:02,358][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-02-11T16:11:02,377][INFO ][filewatch.observingtail ][main] START, creating Discoverer, Watch with file and sincedb collections
[2020-02-11T16:11:02,868][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2020-02-11T16:11:04,144][WARN ][logstash.outputs.elasticsearch][main] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"test_users", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x1d206cdd>], :response=>{"index"=>{"_index"=>"test_users", "_type"=>"_doc", "_id"=>"7yVNM3ABYH6e1sCUOHOt", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [birth] of different type, current_type [date], merged_type [text]"}}}}
/opt/logstash/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
{
"birth" => "1989-07-25",
"host" => "94f3e669e831",
"path" => "/etc/logstash/data/users.csv",
"@timestamp" => 2020-02-11T08:11:03.138Z,
"phone" => "19977771111",
"@version" => "1",
"addr" => "南京江宁区岔路口宏运大道2199号(秦淮河畔)",
"message" => "刘洋,1989-07-25,3000,19977771111,南京江宁区岔路口宏运大道2199号(秦淮河畔)",
"wages" => 3000,
"name" => "刘洋"
}
{
"birth" => "birth",
"host" => "94f3e669e831",
"path" => "/etc/logstash/data/users.csv",
"@timestamp" => 2020-02-11T08:11:03.105Z,
"phone" => "phone",
"@version" => "1",
"addr" => "addr",
"message" => "name,birth,wages,phone,addr",
"wages" => "wages",
"name" => "name"
}
{
"birth" => "1991-03-11",
"host" => "94f3e669e831",
"path" => "/etc/logstash/data/users.csv",
"@timestamp" => 2020-02-11T08:11:03.136Z,
"phone" => "19911223344",
"@version" => "1",
"addr" => "石家庄长征街园明路28号银宏花苑",
"message" => "王伟,1991-03-11,8000,19911223344,石家庄长征街园明路28号银宏花苑",
"wages" => 8000,
"name" => "王伟"
}
{
"birth" => "1993-12-05",
"host" => "94f3e669e831",
"path" => "/etc/logstash/data/users.csv",
"@timestamp" => 2020-02-11T08:11:03.138Z,
"phone" => "19901020304",
"@version" => "1",
"addr" => "济南花园路90-7号东环花园",
"message" => "李娜,1993-12-05,5000,19901020304,济南花园路90-7号东环花园",
"wages" => 5000,
"name" => "李娜"
}
{
"birth" => "1988-04-22",
"host" => "94f3e669e831",
"path" => "/etc/logstash/data/users.csv",
"@timestamp" => 2020-02-11T08:11:03.139Z,
"phone" => "1991234000",
"@version" => "1",
"addr" => "湖南省长沙市芙蓉区德政街298号",
"message" => "张杰,1988-04-22,12000,1991234000,湖南省长沙市芙蓉区德政街298号",
"wages" => 12000,
"name" => "张杰"
}
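Note the 400 warning near the top of the log: one of the five events could not be indexed. The file input reads the CSV header line as an ordinary event (the one with "birth" => "birth" above), and once Elasticsearch had inferred a date mapping for birth from the real records, the header's text value was rejected. Here the failure conveniently keeps the header out of the index, but it is cleaner to drop it explicitly. A minimal sketch of the filter block — the conditional drop is one way to do it; recent versions of the csv filter also offer a skip_header option:

filter {
  csv {
    separator => ","
    columns => ["name", "birth", "wages", "phone", "addr"]
    convert => {
      "wages" => "integer"
    }
  }
  # Drop the event produced by the CSV header line
  if [name] == "name" {
    drop {}
  }
}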
Visit the http://localhost:9200/test_users/_search endpoint to inspect the data.
You can see that Logstash has stored all the records we defined in Elasticsearch.
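The same query can be run from the command line, for example:

curl -s "http://localhost:9200/test_users/_search?pretty"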
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 4,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "test_users",
"_type": "_doc",
"_id": "8yVNM3ABYH6e1sCUOHP1",
"_score": 1,
"_source": {
"birth": "1989-07-25",
"host": "94f3e669e831",
"path": "/etc/logstash/data/users.csv",
"@timestamp": "2020-02-11T08:11:03.138Z",
"phone": "19977771111",
"@version": "1",
"addr": "南京江宁区岔路口宏运大道2199号(秦淮河畔)",
"message": "刘洋,1989-07-25,3000,19977771111,南京江宁区岔路口宏运大道2199号(秦淮河畔)",
"wages": 3000,
"name": "刘洋"
}
},
{
"_index": "test_users",
"_type": "_doc",
"_id": "8iVNM3ABYH6e1sCUOHOz",
"_score": 1,
"_source": {
"birth": "1993-12-05",
"host": "94f3e669e831",
"path": "/etc/logstash/data/users.csv",
"@timestamp": "2020-02-11T08:11:03.138Z",
"phone": "19901020304",
"@version": "1",
"addr": "济南花园路90-7号东环花园",
"message": "李娜,1993-12-05,5000,19901020304,济南花园路90-7号东环花园",
"wages": 5000,
"name": "李娜"
}
},
{
"_index": "test_users",
"_type": "_doc",
"_id": "8SVNM3ABYH6e1sCUOHOx",
"_score": 1,
"_source": {
"birth": "1988-04-22",
"host": "94f3e669e831",
"path": "/etc/logstash/data/users.csv",
"@timestamp": "2020-02-11T08:11:03.139Z",
"phone": "1991234000",
"@version": "1",
"addr": "湖南省长沙市芙蓉区德政街298号",
"message": "张杰,1988-04-22,12000,1991234000,湖南省长沙市芙蓉区德政街298号",
"wages": 12000,
"name": "张杰"
}
},
{
"_index": "test_users",
"_type": "_doc",
"_id": "8CVNM3ABYH6e1sCUOHOt",
"_score": 1,
"_source": {
"birth": "1991-03-11",
"host": "94f3e669e831",
"path": "/etc/logstash/data/users.csv",
"@timestamp": "2020-02-11T08:11:03.136Z",
"phone": "19911223344",
"@version": "1",
"addr": "石家庄长征街园明路28号银宏花苑",
"message": "王伟,1991-03-11,8000,19911223344,石家庄长征街园明路28号银宏花苑",
"wages": 8000,
"name": "王伟"
}
}
]
}
}
Plugins
Logstash has a well-designed architecture: inputs, outputs, and filters all plug in as components, and the official documentation lists the supported plugins.
For plugin installation, see: http://docs.flycloud.me/docs/ELKStack/logstash/get-start/install-plugins.html
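The bundled logstash-plugin tool can list and install plugins; in this container setup that would look roughly like this (logstash-output-csv is just an example plugin name):

docker exec -it elk /opt/logstash/bin/logstash-plugin list
docker exec -it elk /opt/logstash/bin/logstash-plugin install logstash-output-csv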
In addition, the two tutorials below make it convenient to study Logstash in depth when you actually need it.
Conclusion
Importing a CSV has given me a rough understanding of Logstash. Next I'll look into Kibana and see which of its features can help manage Elasticsearch and analyze and visualize data. At a glance, Kibana seems to pack quite a lot of functionality, so it will take some time to put it to use.
References