logstash是一个数据抽取工具,将数据从一个地方转移到另一个地方。如hadoop生态圈的sqoop等。
logstash之所以功能强大和流行,还与其丰富的过滤器插件是分不开的,过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的事件到后续流程中。
Logstash配置文件有如下三部分组成,其中input、output部分是必须配置,filter部分是可选配置,而filter就是过滤器插件,可以在这部分实现各种日志过滤功能。
配置文件
input { #输入插件 } filter { #过滤匹配插件 } output { #输出插件 }
启动操作:
logstash.bat -e 'input{stdin{}} output{stdout{}}' 1
为了好维护,将配置写入文件,启动
logstash.bat -f ../config/test1.conf
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
logstash使用一个名为filewatch的ruby gem库来监听文件变化,并通过一个叫.sincedb的数据库文件来记录被监听的日志文件的读取进度(时间戳),这个sincedb数据文件的默认路径在 <path.data>/plugins/inputs/file下面,文件名类似于.sincedb_123456,而<path.data>表示logstash插件存储目录,默认是LOGSTASH_HOME/data。
input { file { path => ["文件路径"] start_position => "beginning" } } output { stdout{ codec=>rubydebug } }
path => ["D:/ProgramData/ELK/logstash-7.8.1/Test*.log"]
*表示匹配文件名前面有Test log文件
Test.log 注意:一行日志结束为回车符 不回车不解析
172.16.213.132 [09/Nov/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039 172.16.213.132 [09/Nov/2019:16:24:20 +0800] "GET / HTTP/1.1" 200 5039 172.16.213.132 [09/Nov/2019:13:24:25 +0800] "PUT / HTTP/1.1" 200 5039
input { tcp { port => "1234" } } filter { grok { match => { "message" => "%{SYSLOGLINE}" } } } output { stdout{ codec=>rubydebug } }
https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
grok是一个十分强大的logstash filter插件,他可以通过正则解析任意文本,将非结构化日志数据弄成结构化和方便查询的结构。他是目前logstash 中解析非结构化日志数据最好的方式。
Grok 的语法规则是:
%{语法: 语义}
例如输入的内容为:
172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
%{IP:clientip} 匹配模式将获得的结果为:clientip: 172.16.213.132 %{HTTPDATE:timestamp} 匹配模式将获得的结果为:timestamp:07/Feb/2018:16:24:19 +0800 %{QS:referrer} 匹配模式将获得的结果为:referrer: "GET / HTTP/1.1"
下面是一个组合匹配模式,它可以获取上面输入的所有内容:
%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}
通过上面这个组合匹配模式,我们将输入的内容分成了五个部分,即五个字段,将输入内容分割为不同的数据字段,这对于日后解析和查询日志数据非常有用,这正是使用grok的目的。
例子 新建配置文件test3.conf:
input{ stdin{} } filter{ grok{ match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"] } } output{ stdout{ codec => "rubydebug" } }
输入内容:
172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
输出结果:
date插件是对于排序事件和回填旧数据尤其重要,它可以用来转换日志记录中的时间字段,变成LogStash::Timestamp对象,然后转存到@timestamp字段里,这在之前已经做过简单的介绍。 下面是date插件的一个配置示例(这里仅仅列出filter部分):
修改配置文件如下
filter { grok { match => ["message", "%{HTTPDATE:timestamp}"] } date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] } }
结果:
gsub可以通过正则表达式替换字段中匹配到的值,只对字符串字段有效,下面是一个关于mutate插件中gsub的示例(仅列出filter部分):
filter { mutate { gsub => ["filed_name_1", "/" , "_"] } }
这个示例表示将filed_name_1字段中所有"/“字符替换为”_"。
split可以通过指定的分隔符分割字段中的字符串为数组,下面是一个关于mutate插件中split的示例(仅列出filter部分):
filter { mutate { split => ["filed_name_2", "|"] } }
这个示例表示将filed_name_2(属性名)字段以"|"为区间分隔为数组。
rename可以实现重命名某个字段的功能,下面是一个关于mutate插件中rename的示例(仅列出filter部分):
filter { mutate { rename => { "old_field" => "new_field" } } }
remove_field可以实现删除某个字段的功能,下面是一个关于mutate插件中remove_field的示例(仅列出filter部分):
filter { mutate { remove_field => ["timestamp"] } }
这个示例表示将字段timestamp删除。
filter { geoip { source => "ip_field" } }
input { stdin {} } filter { grok { match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" } remove_field => [ "message" ] } date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] } mutate { convert => [ "response","float" ] rename => { "response" => "response_new" } gsub => ["referrer","\"",""] split => ["clientip", "."] } } output{ stdout{ codec => "rubydebug" } }
输出结果:
https://www.elastic.co/guide/en/logstash/current/output-plugins.html
output是Logstash的最后阶段,一个事件可以经过多个输出,而一旦所有输出处理完成,整个事件就执行完成。 一些常用的输出包括:
file: 表示将日志数据写入磁盘上的文件。
Elasticsearch:表示将日志数据发送给Elasticsearch。Elasticsearch可以高效方便和易于查询的保存数据。
1、输出到标准输出(stdout)
output { stdout { codec => rubydebug } }
2、保存为文件(file)
output { file { path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log" } }
3、输出到elasticsearch
output { elasticsearch { hosts => ["192.168.1.1:9200","172.16.213.77:9200"] index => "logstash-%{+YYYY.MM.dd}" } }
hosts:是一个数组类型的值,后面跟的值是elasticsearch节点的地址与端口,默认端口是9200。可添加多个地址。
index:写入elasticsearch的索引的名称,这里可以使用变量。Logstash提供了%{+YYYY.MM.dd}这种写法。在语法解析的时候,看到以+ 号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。这种以天为单位分割的写法,可以很容易的删除老的数据或者搜索指定时间范围内的数据。此外,注意索引名中不能有大写字母。注意:这种格式会使用日志信息的日期进行替换
manage_template:用来设置是否开启logstash自动管理模板功能,如果设置为false将关闭自动管理模板功能。如果我们自定义了模板,那么应该设置为false。
template_name:这个配置项用来设置在Elasticsearch中模板的名称。
input { file { path => ["D:/ES/logstash-7.3.0/nginx.log"] start_position => "beginning" } } filter { grok { match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" } remove_field => [ "message" ] } date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] } mutate { rename => { "response" => "response_new" } convert => [ "response","float" ] gsub => ["referrer","\"",""] remove_field => ["timestamp"] split => ["clientip", "."] } } output { stdout { codec => "rubydebug" } elasticsearch { host => ["localhost:9200"] index => "logstash-%{+YYYY.MM.dd}" } }
需求:集中收集分布式服务的日志
逻辑模块程序随时输出日志
logstash收集日志到es
前置知识 (有选择看 太细了。。)
SpringBoot—Logback日志,输出到文件以及实时输出到web页面
快速使用看这个
SpringBoot系列——Logback日志,输出到文件以及实时输出到web页面 - huanzi-qch - 博客园 (cnblogs.com)
新建logback-spring.xml在resources
<?xml version="1.0" encoding="UTF-8"?> <configuration> <!--定义日志文件的存储地址,使用绝对路径--> <property name="LOG_HOME" value="D:/Documents/WorkDocument/logs"/> <!-- Console 输出设置 --> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符--> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> <charset>utf8</charset> </encoder> </appender> <!-- 按照每天生成日志文件 --> <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!--日志文件输出的文件名--> <fileNamePattern>${LOG_HOME}/test-%d{yyyy-MM-dd}.log</fileNamePattern> </rollingPolicy> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <!-- 异步输出 --> <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender"> <!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 --> <discardingThreshold>0</discardingThreshold> <!-- 更改默认的队列的深度,该值会影响性能.默认值为256 --> <queueSize>512</queueSize> <!-- 添加附加的appender,最多只能添加一个 --> <appender-ref ref="FILE"/> </appender> <logger name="org.apache.ibatis.cache.decorators.LoggingCache" level="DEBUG" additivity="false"> <appender-ref ref="CONSOLE"/> </logger> <logger name="org.springframework.boot" level="DEBUG"/> <root level="info"> <!--<appender-ref ref="ASYNC"/>--> <appender-ref ref="FILE"/> <appender-ref ref="CONSOLE"/> </root> </configuration>
在application.yaml 添加
logging: config: classpath:logback-spring.xml
TestLog.java
import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.Random; /** * @author gyy * @date 2022/3/14 **/ @RunWith(SpringRunner.class) @SpringBootTest public class TestLog { private static final Logger LOGGER= LoggerFactory.getLogger(TestLog.class); @Test public void testLog(){ Random random =new Random(); while (true){ int userid=random.nextInt(10); LOGGER.info("userId:{},send:{}",userid,"hello world.I am "+userid); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }
logstash config
input { file { path => ["D:/Documents/WorkDocument/logs/test-*.log"] start_position => "beginning" } } filter { grok { match => { "message" => "%{DATA:datetime} \[%{DATA:thread}\] %{DATA:level} %{DATA:class} - %{GREEDYDATA:1ogger}" } remove_field => [ "message" ] } date { match => ["datetime", "dd/MMM/yyyy:HH:mm:ss.SSS"] } if "_grokparsefailure" in [tags] { drop { } } } output { elasticsearch { hosts => ["127.0.0.1:9200"] index => "logstash-%{+YYYY.MM.dd}" } }
然后先启动 es 在启动logstash 最后k
在高并发需求中 如淘宝用户搜索商品 使用关系数据库mysql之类肯定不科学 大部分应该是将数据库数据放入es中 使用es集群完成并发需求 所以简单学习一下流程
/* Navicat Premium Data Transfer Source Server : local_mysql Source Server Type : MySQL Source Server Version : 80027 Source Host : localhost:3306 Source Schema : learning_db Target Server Type : MySQL Target Server Version : 80027 File Encoding : 65001 Date: 16/03/2022 13:43:19 */ SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for user_info -- ---------------------------- DROP TABLE IF EXISTS `user_info`; CREATE TABLE `user_info` ( `user_id` bigint NOT NULL AUTO_INCREMENT, `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '用户名', `user_gender` int NOT NULL COMMENT '性别 0 male 1-female', `user_id_number` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '身份证号', `user_nucleic_acid` int NULL DEFAULT NULL COMMENT '0-正常 1-确诊 2-密接 3-次密接 4-疑似', PRIMARY KEY (`user_id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '用户信息表' ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of user_info -- ---------------------------- INSERT INTO `user_info` VALUES (1, '小米', 1, '123456789X', 0); INSERT INTO `user_info` VALUES (2, '小明', 0, '123456789X', 1); SET FOREIGN_KEY_CHECKS = 1;
http://localhost:9200/learning_db-user_info
{ "settings": { "number_of_shards": 1, "number_of_replicas": 0 }, "mappings": { "properties": { "user_id": { "type": "keyword" }, "user_name": { "analyzer": "ik_max_word", "search_analyzer": "ik_smart", "type": "text" }, "user_gender": { "type": "keyword" }, "user_id_number": { "type": "keyword" }, "user_create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd|| strict_date_optional_time|| epoch_millis" }, "user_nucleic_acid_last_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd|| strict_date_optional_time|| epoch_millis" }, "user_update_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd|| strict_date_optional_time|| epoch_millis" } } } }
logstash每个执行完成会在/config/.logstash_jdbc_last_run记录执行时间下次以此时间为基准进行增量同步数据到索引库。
input { jdbc { # 自己本地的驱动位置 也可以用相对 jdbc_driver_library => "D:/Program Files/maven-repository/mysql/mysql-connector-java/8.0.26/mysql-connector-java-8.0.26.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 8.0以上版本:一定要把serverTimezone=UTC加上 jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/learning_db?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true&serverTimezone=UTC" jdbc_user => "root" jdbc_password => "123456" # 一分钟一扫 schedule => "* * * * *" #要执行的sql 也可以指定sql文件 statement => "SELECT * FROM user_info WHERE user_update_time >= :sql_last_value" #设置时区 jdbc_default_timezone => "Asia/Shanghai" # sql_last_value储存位置 文件要有 可空 last_run_metadata_path => "D:/ProgramData/ELK/logstash-7.8.1/config/.logstash_jdbc_last_run" } } output { elasticsearch { # ES的IP地址及端口 可写多个 hosts => ["127.0.0.1:9200"] # 索引名称 可自定义 index => "learning_db-user_info" # 需要关联的数据库中有有一个id字段,对应类型中的_id document_id => "%{user_id}" } stdout { # JSON格式输出 codec => json_lines } }
.\logstash.bat -f ..\config\mysql.conf
{ "took": 809, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 2, "relation": "eq" }, "max_score": 1, "hits": [ { "_index": "learning_db-user_info", "_type": "_doc", "_id": "1", "_score": 1, "_source": { "user_id": 1, "user_nucleic_acid": 0, "user_gender": 1, "user_nucleic_acid_last_time": null, "@version": "1", "user_create_time": "2022-03-17T16:41:00.000Z", "@timestamp": "2022-03-17T11:56:00.913Z", "user_name": "小米", "user_update_time": "2022-03-17T16:41:01.000Z", "user_id_number": "123456789X" } }, { "_index": "learning_db-user_info", "_type": "_doc", "_id": "2", "_score": 1, "_source": { "user_id": 2, "user_nucleic_acid": 1, "user_gender": 0, "user_nucleic_acid_last_time": null, "@version": "1", "user_create_time": "2022-03-17T16:41:03.000Z", "@timestamp": "2022-03-17T11:56:00.923Z", "user_name": "小明", "user_update_time": "2022-03-17T16:41:06.000Z", "user_id_number": "123456789X" } } ] } }
ObservabilityLearning: ObservabilityLearning 整合框架demo (gitee.com)
ElastAlert是一个简单的框架,用于从Elasticsearch中的数据中发出异常,尖峰或其他感兴趣的模式的警报。
它通过将Elasticsearch与两种类型的组件(规则类型和警报)结合使用。定期查询Elasticsearch,并将数据传递到规则类型,该规则类型确定找到任何匹配项。发生匹配时,它会发出一个或多个警报,这些警报根据不同的类型采取相应的措施。
ElastAlert由一组规则配置,每个规则定义一个查询,一个规则类型和一组警报。
ElastAlert
是基于python2
开发的一个告警框架,它主要有以下特点:
安装出现问题,而且这个项目3年不维护了,目前放弃,但是是必须找到发邮件之类通知替代品。
{{ cmt.username }}
{{ cmt.content }}
{{ cmt.commentDate | formatDate('YYYY.MM.DD hh:mm') }}