一、概述
服务端数据处理管道,能从多个来源采集数据,转换数据,然后将数据发送大存储库中(elasticsearch)
1.1 配置详解
格式
input { #输入
stdin { ... } #标准输入
}
filter { #过滤,对数据进行分割、截取等处理
...
}
output { #输出
stdout { ... } #标准输出
}输入
采集各种样式、大小和来源的数据,数据往往以各种各样的形式,或分散或集中地存在于很多系统中。
Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
过滤
实时解析和转换数据
数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。
输出
Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。
二、使用示例
2.1 读取自定义日志
通过Filebeat读取了nginx的日志,如果是自定义结构的日志,就需要读取处理后才能使用,所以,这个时候就需要使用Logstash了,因为Logstash有着强大的处理能力,可以应对各种各样的场景。
如日志结构:
2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1002配置读取
#vim itcast-pipeline.conf
input {
file {
path => "/itcast/logstash/logs/app.log"
start_position => "beginning"
}
}
filter {
mutate {
split => {"message"=>"|"}
}
}
output {
stdout {
codec => rubydebug
}
}启动测试
#启动
./bin/logstash -f ./itcast-pipeline.conf
#写日志到文件
echo "2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1002" >> app.log
#输出的结果
{
"@timestamp" => 2019-03-15T08:44:04.749Z,
"path" => "/itcast/logstash/logs/app.log",
"@version" => "1",
"host" => "node01",
"message" => [
[0] "2019-03-15 21:21:21",
[1] "ERROR",
[2] "读取数据出错",
[3] "参数:id=1002"
]
}输出到Elasticsearch
input {
file {
path => "/itcast/logstash/logs/app.log"
#type => "system"
start_position => "beginning"
}
}
filter {
mutate {
split => {"message"=>"|"}
}
}
output {
elasticsearch {
hosts => [ "192.168.40.133:9200","192.168.40.134:9200","192.168.40.135:9200"]
}
}
#启动
./bin/logstash -f ./itcast-pipeline.conf
#写入数据
echo "2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1003" >> app.log2.2 接收Filebeat输入的日志
将Filebeat和Logstash整合起来,读取nginx的日志。
安装nginx
apt install nginx -y
#/usr/sbin/nginx:主程序
#/etc/nginx:存放配置文件
#/usr/share/nginx:存放静态文件
#/var/log/nginx:存放日志
#nginx服务命令
service nginx {start|stop|restart|reload|forcereload|
status|configtest|rotate|upgrade}
#通过浏览器访问页面并且查看日志
#访问地址:http://192.168.1.7/
tail -f /var/log/nginx/access.log配置Filebeat
#vim haoke-nginx.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
tags: ["log"]
fields:
from: nginx
fields_under_root: false
output.logstash:
hosts: ["192.168.1.7:5044"]
#启动
./filebeat -e -c haoke-nginx.yml
#说明:现在启动会报错,因为Logstash还没有启动配置Logstash
#vim haoke-pipeline.conf
#输入如下内容:
input {
beats {
port => "5044"
}
}
# The filter part of this file is commented out to indicate that it is
<NolebasePageProperties />
# optional.
# filter {
#
# }
output {
stdout { codec => rubydebug }
}
#启动 --config.test_and_exit 用于测试配置文件是否正确
bin/logstash -f haoke-pipeline.conf --config.test_and_exit
#[INFO ][logstash.runner ] Using config.test_and_exit mode. Config
Validation Result: OK. Exiting Logstash
#正式启动 --config.reload.automatic 热加载配置文件,修改配置文件后无需重新启动
bin/logstash -f haoke-pipeline.conf --config.reload.automatic测试
分别启动Filebeat和Logstash,刷新页面查看输出
配置filter
在前面的输出中,可以看出,虽然可以拿到日志信息,但是信息格式并不友好,比如说,不能直接拿到日志中的ip地址。
第一步,自定义nginx的日志格式
vim /etc/nginx/nginx.conf
log_format main '$remote_addr - $remote_user [$time_local] '
'"$request" $status $body_bytes_sent '
'"$http_referer" "$http_user_agent"';
access_log /var/log/nginx/access.log main;
nginx -s reload第二步,编写nginx-patterns文件
NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%
{HTTPDATE:time_local}\] \"%{DATA:request}\" %{INT:status} %{NUMBER:bytes_sent} \"%
{DATA:http_referer}\" \"%{DATA:http_user_agent}\"第三步,修改haoke-pipeline.conf文件
input {
beats {
port => "5044"
}
}
filter {
grok {
patterns_dir => "/haoke/logstash-6.5.4/nginx-patterns"
match => { "message" => "%{NGINX_ACCESS}"}
remove_tag => [ "_grokparsefailure" ]
add_tag => [ "nginx_access" ]
}
}
output {
stdout { codec => rubydebug }
}第四步,测试
发送到Elasticsearch
修改配置:
#vim haoke-pipeline.conf
input {
beats {
port => "5044"
}
}
filter {
grok {
patterns_dir => "/haoke/logstash-6.5.4/nginx-patterns"
match => { "message" => "%{NGINX_ACCESS}"}
remove_tag => [ "_grokparsefailure" ]
add_tag => [ "nginx_access" ]
}
}
#output {
# stdout { codec => rubydebug }
#}
output {
elasticsearch {
hosts => [ "192.168.1.7:9200","192.168.1.7:9201","192.168.1.7:9202" ]
}
}2.3 读取mysql的数据
从MySQL中读取数据,向ES中创建索引
安装logstash-input-jdbc
需先下载ruby并安装
创建模板文件
需要提前创建mapping的模板文件(json)以便logstash使用
配置mysql.conf
input {
stdin {
}
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/xc_course?
useUnicode=true&characterEncoding=utf‐8&useSSL=true&serverTimezone=UTC"
# the user we wish to excute our statement as
jdbc_user => "root"
jdbc_password => mysql
# the path to our downloaded jdbc driver
jdbc_driver_library => "F:/develop/maven/repository3/mysql/mysql‐connector‐java/5.1.41/mysqlconnector‐java‐5.1.41.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#要执行的sql文件
#statement_filepath => "/conf/course.sql"
statement => "select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8HOUR)"
#定时配置
schedule => "* * * * *"
record_last_run => true
last_run_metadata_path => "D:/ElasticSearch/logstash‐6.2.1/config/logstash_metadata"
}
}
output {
elasticsearch {
#ES的ip地址和端口
hosts => "localhost:9200"
#hosts => ["localhost:9200","localhost:9202","localhost:9203"]
#ES索引库名称
index => "xc_course"
document_id => "%{id}"
document_type => "doc"
template =>"D:/ElasticSearch/logstash‐6.2.1/config/xc_course_template.json"
template_name =>"xc_course"
template_overwrite =>"true"
}
stdout {
#日志输出
codec => json_lines
}
}说明
1、ES采用UTC时区问题ES采用UTC 时区,比北京时间早8小时,所以ES读取数据时让最后更新时间加8小时where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)
2、logstash每个执行完成会在D:/ElasticSearch/logstash-6.2.1/config/logstash_metadata记录执行时间下次以此时间为基准进行增量同步数据到索引库。
四、安装
4.1 普通安装
安装jdk环境
下载安装包
解压安装包
shtar -xvf logstash-6.5.4.tar.gz启动
shbin/logstash -e 'input { stdin { } } output { stdout {} }'