Skip to content

一、概述

服务端数据处理管道,能从多个来源采集数据,转换数据,然后将数据发送大存储库中(elasticsearch)

官网

1.1 配置详解

格式

yml
input { #输入 
	stdin { ... } #标准输入 
}

filter { #过滤,对数据进行分割、截取等处理 
	... 
}
output { #输出 
	stdout { ... } #标准输出 
}

输入

采集各种样式、大小和来源的数据,数据往往以各种各样的形式,或分散或集中地存在于很多系统中。

Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

过滤

实时解析和转换数据

数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

输出

Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。

二、使用示例

2.1 读取自定义日志

通过Filebeat读取了nginx的日志,如果是自定义结构的日志,就需要读取处理后才能使用,所以,这个时候就需要使用Logstash了,因为Logstash有着强大的处理能力,可以应对各种各样的场景。

如日志结构:

2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1002

配置读取

sh
#vim itcast-pipeline.conf 
input { 
    file { 
        path => "/itcast/logstash/logs/app.log" 
        start_position => "beginning" 
    } 
}
filter {
    mutate {
    	split => {"message"=>"|"} 
    } 
}
output {
    stdout { 
    	codec => rubydebug 
    } 
}

启动测试

sh
#启动 
./bin/logstash -f ./itcast-pipeline.conf 
#写日志到文件 
echo "2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1002" >> app.log 
#输出的结果 
{ 
    "@timestamp" => 2019-03-15T08:44:04.749Z, 
    "path" => "/itcast/logstash/logs/app.log", 
    "@version" => "1", 
    "host" => "node01", 
    "message" => [ 
        [0] "2019-03-15 21:21:21", 
        [1] "ERROR", 
        [2] "读取数据出错", 
        [3] "参数:id=1002" 
    ] 
}

输出到Elasticsearch

sh
input { 
    file { 
        path => "/itcast/logstash/logs/app.log" 
        #type => "system" 
        start_position => "beginning"
    } 
}
filter { 
    mutate { 
    	split => {"message"=>"|"} 
    } 
}
output { 
    elasticsearch { 
    	hosts => [ "192.168.40.133:9200","192.168.40.134:9200","192.168.40.135:9200"] 
    } 
}
#启动 
./bin/logstash -f ./itcast-pipeline.conf 
#写入数据 
echo "2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1003" >> app.log

2.2 接收Filebeat输入的日志

将Filebeat和Logstash整合起来,读取nginx的日志。

安装nginx

sh
apt install nginx -y
#/usr/sbin/nginx:主程序
#/etc/nginx:存放配置文件
#/usr/share/nginx:存放静态文件
#/var/log/nginx:存放日志
#nginx服务命令
service nginx {start|stop|restart|reload|forcereload|
status|configtest|rotate|upgrade}
#通过浏览器访问页面并且查看日志
#访问地址:http://192.168.1.7/
tail -f /var/log/nginx/access.log

配置Filebeat

yml
#vim haoke-nginx.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
tags: ["log"]
fields:
from: nginx
fields_under_root: false
output.logstash:
hosts: ["192.168.1.7:5044"]
#启动
./filebeat -e -c haoke-nginx.yml
#说明:现在启动会报错,因为Logstash还没有启动

配置Logstash

sh
#vim haoke-pipeline.conf
#输入如下内容:
input {
beats {
port => "5044"
}
}
# The filter part of this file is commented out to indicate that it is


<NolebasePageProperties />




# optional.
# filter {
#
# }
output {
stdout { codec => rubydebug }
}

#启动 --config.test_and_exit 用于测试配置文件是否正确
bin/logstash -f haoke-pipeline.conf --config.test_and_exit
#[INFO ][logstash.runner ] Using config.test_and_exit mode. Config
Validation Result: OK. Exiting Logstash

#正式启动 --config.reload.automatic 热加载配置文件,修改配置文件后无需重新启动
bin/logstash -f haoke-pipeline.conf --config.reload.automatic

测试

分别启动Filebeat和Logstash,刷新页面查看输出

配置filter

在前面的输出中,可以看出,虽然可以拿到日志信息,但是信息格式并不友好,比如说,不能直接拿到日志中的ip地址。

第一步,自定义nginx的日志格式

sh
vim /etc/nginx/nginx.conf

log_format main '$remote_addr - $remote_user [$time_local] '
'"$request" $status $body_bytes_sent '
'"$http_referer" "$http_user_agent"';
access_log /var/log/nginx/access.log main;

nginx -s reload

第二步,编写nginx-patterns文件

sh
NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%
{HTTPDATE:time_local}\] \"%{DATA:request}\" %{INT:status} %{NUMBER:bytes_sent} \"%
{DATA:http_referer}\" \"%{DATA:http_user_agent}\"

第三步,修改haoke-pipeline.conf文件

sh
input {
beats {
port => "5044"
}
}
filter {
grok {
patterns_dir => "/haoke/logstash-6.5.4/nginx-patterns"
match => { "message" => "%{NGINX_ACCESS}"}
remove_tag => [ "_grokparsefailure" ]
add_tag => [ "nginx_access" ]
}
}
output {
stdout { codec => rubydebug }
}

第四步,测试

发送到Elasticsearch

修改配置:

sh
#vim haoke-pipeline.conf
input {
beats {
port => "5044"
}
}
filter {
grok {
patterns_dir => "/haoke/logstash-6.5.4/nginx-patterns"
match => { "message" => "%{NGINX_ACCESS}"}
remove_tag => [ "_grokparsefailure" ]
add_tag => [ "nginx_access" ]
}
}
#output {
# stdout { codec => rubydebug }
#}
output {
elasticsearch {
hosts => [ "192.168.1.7:9200","192.168.1.7:9201","192.168.1.7:9202" ]
}
}

2.3 读取mysql的数据

从MySQL中读取数据,向ES中创建索引

安装logstash-input-jdbc

需先下载ruby并安装

创建模板文件

需要提前创建mapping的模板文件(json)以便logstash使用

配置mysql.conf

sh
input {
	stdin {
	}
	jdbc {
	jdbc_connection_string => "jdbc:mysql://localhost:3306/xc_course?
	useUnicode=true&characterEncoding=utf‐8&useSSL=true&serverTimezone=UTC"
	# the user we wish to excute our statement as
	jdbc_user => "root"
	jdbc_password => mysql
	# the path to our downloaded jdbc driver
	jdbc_driver_library => "F:/develop/maven/repository3/mysql/mysql‐connector‐java/5.1.41/mysqlconnector‐java‐5.1.41.jar"
	# the name of the driver class for mysql
	jdbc_driver_class => "com.mysql.jdbc.Driver"
	jdbc_paging_enabled => "true"
	jdbc_page_size => "50000"
	#要执行的sql文件
	#statement_filepath => "/conf/course.sql"
	statement => "select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8HOUR)"
	#定时配置
	schedule => "* * * * *"
	record_last_run => true
	last_run_metadata_path => "D:/ElasticSearch/logstash‐6.2.1/config/logstash_metadata"
	}
}
output {
	elasticsearch {
	#ES的ip地址和端口
	hosts => "localhost:9200"
	#hosts => ["localhost:9200","localhost:9202","localhost:9203"]
#ES索引库名称
	index => "xc_course"
	document_id => "%{id}"
	document_type => "doc"
	template =>"D:/ElasticSearch/logstash‐6.2.1/config/xc_course_template.json"
	template_name =>"xc_course"
	template_overwrite =>"true"
	}
	stdout {
	#日志输出
	codec => json_lines
	}
}

说明

1、ES采用UTC时区问题ES采用UTC 时区,比北京时间早8小时,所以ES读取数据时让最后更新时间加8小时where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)

2、logstash每个执行完成会在D:/ElasticSearch/logstash-6.2.1/config/logstash_metadata记录执行时间下次以此时间为基准进行增量同步数据到索引库。

四、安装

4.1 普通安装

  1. 安装jdk环境

  2. 下载安装包

  3. 解压安装包

    sh
    tar -xvf logstash-6.5.4.tar.gz
  4. 启动

    sh
    bin/logstash -e 'input { stdin { } } output { stdout {} }'