ELK Learning Notes (5): Logstash Basics


Overview

According to the official site:

Logstash is an open source, server-side data processing pipeline that ingests data from a multitude of sources simultaneously, transforms it, and then sends it to your favorite "stash."

Having gotten a rough grip on Elasticsearch's syntax and features, I want to look at the other two ELK components, starting with Logstash. It should help me pull a wider range of data into the database, transforming it along the way.

Getting Data

If you don't have any test data at hand, kaggle.com offers datasets you can download for analysis.

Take this one, for example: https://www.kaggle.com/mirosval/personal-cars-classifieds. The page describes the dataset's content and format. I downloaded it and imported it into Elasticsearch with Logstash as a test.

The downloaded CSV is 401 MB (88 MB compressed), and the import took me more than half an hour. Since the download itself is quite slow, this post is based on a few records I made up myself, so that readers can follow along more easily.

Let's use that test data to see how Logstash works.

Test Data

users.csv

name,birth,wages,phone,addr
王伟,1991-03-11,8000,19911223344,石家庄长征街园明路28号银宏花苑
李娜,1993-12-05,5000,19901020304,济南花园路90-7号东环花园
刘洋,1989-07-25,3000,19977771111,南京江宁区岔路口宏运大道2199号(秦淮河畔)
张杰,1988-04-22,12000,1991234000,湖南省长沙市芙蓉区德政街298号

I made up these few records; we'll use them to practice importing a CSV.

Creating the Configuration File

The complete import configuration looks like this.

logstash_users.conf

input {
  file {
    # Absolute path to the CSV file inside the container
    path => "/etc/logstash/data/users.csv"
    # Read the file from the beginning instead of tailing new lines
    start_position => "beginning"
    # Do not persist read positions, so every run re-imports the whole file
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    separator => ","
    # Map the CSV columns to field names
    columns => ["name", "birth", "wages", "phone", "addr"]
    # Store wages as an integer instead of a string
    convert => {
      "wages" => "integer"
    }
  }
}
output {
  # Write each event as a document into the test_users index
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "test_users"
  }
  # Also print each event to the console
  stdout {}
}

Brief Notes on the Configuration File

  • The input block describes the data source, the filter block filters and transforms the data, and the output block describes where the data goes.
  • Logstash has its own DSL (Domain Specific Language) for configuration.
  • What each field does is fairly self-explanatory, so I won't elaborate here; get the example running first, then consult the official documentation for the specific parameters.
  • sincedb_path points to the file that records how far the sync has progressed; if the sync is interrupted, the next run resumes from the recorded position. Pointing it at "/dev/null" means no position is ever persisted, so every run starts from the beginning.
  • Note that the file input reads every line of the CSV, including the header row; one way to skip it is sketched below.
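As we will see in the output later, the header line flows through the pipeline as an ordinary event. Here is a sketch of two ways to skip it, assuming a reasonably recent version of the csv filter (skip_header was added to the filter later in its life, so check your version; the conditional drop works everywhere):

filter {
  csv {
    separator => ","
    columns   => ["name", "birth", "wages", "phone", "addr"]
    # Cancel the event when its values match the column names,
    # i.e. the header line "name,birth,wages,phone,addr"
    skip_header => true
    convert => { "wages" => "integer" }
  }
  # Fallback for older csv filter versions: drop the header event by hand
  if [name] == "name" {
    drop { }
  }
}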

Importing Data with Logstash

First, put users.csv and logstash_users.conf in place.

On the host machine, run:

mkdir /etc/logstash/data

Then place users.csv in the /etc/logstash/data directory and logstash_users.conf in the /etc/logstash directory.

The container already maps the /etc/logstash/data directory (this was set up when the container was created), so putting the data and config files in this host directory makes them visible to the tools inside the container.
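If your container was created without this mapping, you would have to add a volume when creating it. The following is only a hypothetical sketch; the sebp/elk image name and the port flags are assumptions, so adjust them to whatever ELK setup you actually run:

# Hypothetical: create the ELK container with /etc/logstash mapped from the host
docker run -d --name elk \
  -v /etc/logstash:/etc/logstash \
  -p 9200:9200 -p 5601:5601 \
  sebp/elk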

First, check the Logstash version:

[root@nuc5 data]# docker exec -it elk /opt/logstash/bin/logstash --version
logstash 7.5.2

Next, run the configuration file to import the test data:

docker exec -it elk /opt/logstash/bin/logstash -f /etc/logstash/logstash_users.conf
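Optionally, you can check the configuration syntax before a real run; Logstash supports a --config.test_and_exit flag (short form -t) that validates the config and exits without starting the pipeline:

docker exec -it elk /opt/logstash/bin/logstash -f /etc/logstash/logstash_users.conf --config.test_and_exit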

The import command produces output like the following:

Thread.exclusive is deprecated, use Thread::Mutex
Sending Logstash logs to /opt/logstash/logs which is now configured via log4j2.properties
[2020-02-11T16:10:54,569][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-02-11T16:10:54,760][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.5.2"}
[2020-02-11T16:10:57,416][INFO ][org.reflections.Reflections] Reflections took 59 ms to scan 1 urls, producing 20 keys and 40 values 
[2020-02-11T16:11:00,873][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2020-02-11T16:11:01,248][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2020-02-11T16:11:01,334][INFO ][logstash.outputs.elasticsearch][main] ES Output version determined {:es_version=>7}
[2020-02-11T16:11:01,343][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-02-11T16:11:01,522][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://localhost:9200"]}
[2020-02-11T16:11:01,641][INFO ][logstash.outputs.elasticsearch][main] Using default mapping template
[2020-02-11T16:11:01,741][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
[2020-02-11T16:11:01,749][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["/etc/logstash/logstash_users.conf"], :thread=>"#<Thread:0x749fb663 run>"}
[2020-02-11T16:11:01,873][INFO ][logstash.outputs.elasticsearch][main] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2020-02-11T16:11:02,263][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2020-02-11T16:11:02,358][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-02-11T16:11:02,377][INFO ][filewatch.observingtail  ][main] START, creating Discoverer, Watch with file and sincedb collections
[2020-02-11T16:11:02,868][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2020-02-11T16:11:04,144][WARN ][logstash.outputs.elasticsearch][main] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"test_users", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x1d206cdd>], :response=>{"index"=>{"_index"=>"test_users", "_type"=>"_doc", "_id"=>"7yVNM3ABYH6e1sCUOHOt", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [birth] of different type, current_type [date], merged_type [text]"}}}}
/opt/logstash/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
{
         "birth" => "1989-07-25",
          "host" => "94f3e669e831",
          "path" => "/etc/logstash/data/users.csv",
    "@timestamp" => 2020-02-11T08:11:03.138Z,
         "phone" => "19977771111",
      "@version" => "1",
          "addr" => "南京江宁区岔路口宏运大道2199号(秦淮河畔)",
       "message" => "刘洋,1989-07-25,3000,19977771111,南京江宁区岔路口宏运大道2199号(秦淮河畔)",
         "wages" => 3000,
          "name" => "刘洋"
}
{
         "birth" => "birth",
          "host" => "94f3e669e831",
          "path" => "/etc/logstash/data/users.csv",
    "@timestamp" => 2020-02-11T08:11:03.105Z,
         "phone" => "phone",
      "@version" => "1",
          "addr" => "addr",
       "message" => "name,birth,wages,phone,addr",
         "wages" => "wages",
          "name" => "name"
}
{
         "birth" => "1991-03-11",
          "host" => "94f3e669e831",
          "path" => "/etc/logstash/data/users.csv",
    "@timestamp" => 2020-02-11T08:11:03.136Z,
         "phone" => "19911223344",
      "@version" => "1",
          "addr" => "石家庄长征街园明路28号银宏花苑",
       "message" => "王伟,1991-03-11,8000,19911223344,石家庄长征街园明路28号银宏花苑",
         "wages" => 8000,
          "name" => "王伟"
}
{
         "birth" => "1993-12-05",
          "host" => "94f3e669e831",
          "path" => "/etc/logstash/data/users.csv",
    "@timestamp" => 2020-02-11T08:11:03.138Z,
         "phone" => "19901020304",
      "@version" => "1",
          "addr" => "济南花园路90-7号东环花园",
       "message" => "李娜,1993-12-05,5000,19901020304,济南花园路90-7号东环花园",
         "wages" => 5000,
          "name" => "李娜"
}
{
         "birth" => "1988-04-22",
          "host" => "94f3e669e831",
          "path" => "/etc/logstash/data/users.csv",
    "@timestamp" => 2020-02-11T08:11:03.139Z,
         "phone" => "1991234000",
      "@version" => "1",
          "addr" => "湖南省长沙市芙蓉区德政街298号",
       "message" => "张杰,1988-04-22,12000,1991234000,湖南省长沙市芙蓉区德政街298号",
         "wages" => 12000,
          "name" => "张杰"
}

Visit http://localhost:9200/test_users/_search to inspect the data.

Logstash stored the four data rows we defined in Elasticsearch. (Note the 400 WARN in the log above: the header line was parsed as an event too, but birth had already been dynamically mapped as a date from the data rows, so the literal string "birth" could not be merged into that mapping; the header event was rejected, which is why only four documents were stored.)
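For example, querying from the host with curl (the ?pretty parameter just makes Elasticsearch format the JSON response):

curl 'http://localhost:9200/test_users/_search?pretty'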

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 4,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "test_users",
        "_type": "_doc",
        "_id": "8yVNM3ABYH6e1sCUOHP1",
        "_score": 1,
        "_source": {
          "birth": "1989-07-25",
          "host": "94f3e669e831",
          "path": "/etc/logstash/data/users.csv",
          "@timestamp": "2020-02-11T08:11:03.138Z",
          "phone": "19977771111",
          "@version": "1",
          "addr": "南京江宁区岔路口宏运大道2199号(秦淮河畔)",
          "message": "刘洋,1989-07-25,3000,19977771111,南京江宁区岔路口宏运大道2199号(秦淮河畔)",
          "wages": 3000,
          "name": "刘洋"
        }
      },
      {
        "_index": "test_users",
        "_type": "_doc",
        "_id": "8iVNM3ABYH6e1sCUOHOz",
        "_score": 1,
        "_source": {
          "birth": "1993-12-05",
          "host": "94f3e669e831",
          "path": "/etc/logstash/data/users.csv",
          "@timestamp": "2020-02-11T08:11:03.138Z",
          "phone": "19901020304",
          "@version": "1",
          "addr": "济南花园路90-7号东环花园",
          "message": "李娜,1993-12-05,5000,19901020304,济南花园路90-7号东环花园",
          "wages": 5000,
          "name": "李娜"
        }
      },
      {
        "_index": "test_users",
        "_type": "_doc",
        "_id": "8SVNM3ABYH6e1sCUOHOx",
        "_score": 1,
        "_source": {
          "birth": "1988-04-22",
          "host": "94f3e669e831",
          "path": "/etc/logstash/data/users.csv",
          "@timestamp": "2020-02-11T08:11:03.139Z",
          "phone": "1991234000",
          "@version": "1",
          "addr": "湖南省长沙市芙蓉区德政街298号",
          "message": "张杰,1988-04-22,12000,1991234000,湖南省长沙市芙蓉区德政街298号",
          "wages": 12000,
          "name": "张杰"
        }
      },
      {
        "_index": "test_users",
        "_type": "_doc",
        "_id": "8CVNM3ABYH6e1sCUOHOt",
        "_score": 1,
        "_source": {
          "birth": "1991-03-11",
          "host": "94f3e669e831",
          "path": "/etc/logstash/data/users.csv",
          "@timestamp": "2020-02-11T08:11:03.136Z",
          "phone": "19911223344",
          "@version": "1",
          "addr": "石家庄长征街园明路28号银宏花苑",
          "message": "王伟,1991-03-11,8000,19911223344,石家庄长征街园明路28号银宏花苑",
          "wages": 8000,
          "name": "王伟"
        }
      }
    ]
  }
}
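Since the csv filter converted wages to an integer, the field can now be used in numeric queries. As an illustration (not part of the import run above), a standard Elasticsearch range query finds everyone earning at least 5000:

curl -s 'http://localhost:9200/test_users/_search?pretty' \
  -H 'Content-Type: application/json' \
  -d '{"query": {"range": {"wages": {"gte": 5000}}}}'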

Plugins

Logstash has a well-designed architecture: inputs, outputs, and filters all plug in as components, and the official documentation lists the supported plugins.

For plugin installation, see: http://docs.flycloud.me/docs/ELKStack/logstash/get-start/install-plugins.html
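Logstash also ships a logstash-plugin utility for managing plugins from the command line. For example, inside the container (logstash-filter-translate is just an arbitrary plugin name for illustration; installing requires network access from the container):

# List the plugins bundled with this Logstash install
docker exec -it elk /opt/logstash/bin/logstash-plugin list

# Install an additional plugin
docker exec -it elk /opt/logstash/bin/logstash-plugin install logstash-filter-translate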

In addition, the two tutorials in the References below are worth studying closely once you start using Logstash in earnest.

Conclusion

Importing a CSV gave me a rough feel for Logstash. Next I'll look at Kibana, to see what conveniences its UI offers for managing Elasticsearch and for analyzing and visualizing data. A quick look suggests Kibana has a lot of features, so putting it to real use will take some time.

References

  1. http://docs.flycloud.me/docs/ELKStack/index.html
  2. https://qbox.io/blog/import-csv-elasticsearch-logstash-sincedb