Skip to content

Instantly share code, notes, and snippets.

@wutingjia
Last active July 28, 2021 01:38
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wutingjia/36fbcd70b4908ee402a0f9cf0f6b4ffd to your computer and use it in GitHub Desktop.
Save wutingjia/36fbcd70b4908ee402a0f9cf0f6b4ffd to your computer and use it in GitHub Desktop.
集中式日志分析方案

使用技术:Filebeat + kafka + logstash + elasticsearch + kibana
(日志采集) (传输) (数据收集整理) (数据存储搜索分析) (数据分析可视化)

Logger

这里使用slf4j+logback的组合,首先需要注意是否与某些依赖继承的log4j冲突,需要把它exculusion,如果存在,slf4j会报找到多个binding的实现,然后会自己选取一种。
slf4J 是 简单日志门面(simple logger facade),并没有具体的日志实现,只提供一组接口,这样用户就无需,因为不同的底层实现而编写不同的代码。
首先需要依赖logback-classic和slf4j-api。如果使用的是log4j还需要slf4j-log4j12 作为连接。
对于logback其配置文件为resource下的logback.xml。
配置例如:

<logger name="ELKLogger" level="DEBUG" additivity="true">
        <appender-ref ref="ELK" />
    </logger>

 <appender name="ELK" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <encoder charset="utf-8">
            <pattern>[%d{yyyy-MM-dd HH:mm:ss,SSS}] [%-5level] [%logger{36}] [%thread] %m%n
            </pattern>
        </encoder>
        <!--<filter class="ch.qos.logback.classic.filter.LevelFilter">-->
        <!--<level>DEBUG</level>-->
        <!--<onMatch>ACCEPT</onMatch>-->
        <!--<onMismatch>DENY</onMismatch>-->
        <!--</filter>-->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${ROOT}/AAA_QQQ_%d{}_%i.log</fileNamePattern>
            <maxHistory>${MAXHISTORY}</maxHistory>
            <timeBasedFileNamingAndTriggeringPolicy
                    class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${FILESIZE}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
        </rollingPolicy>
    </appender>

private static Logger logger=LoggerFactory.getLogger("ELKLogger");//这里的name和<logger name="">相同说明使用这个logger的配置。这个名字可以是包名,这样包下类的所有logger就都用这个,代码里getLogger("classA.class")就行。<appender>代表一种输出配置,通过名字在配置文件中进行映射,在其中可以对输出目录,输出格式等进行设置

Filebeat

安装
linux:

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.6.0-linux-x86_64.tar.gz  
tar xzvf filebeat-6.6.0-linux-x86_64.tar.gz  

win:下载解压后,重命名文件夹名字为Filebeat,使用管理员权限打开PowerShell,输入:

PS > cd 'C:\Program Files\Filebeat'  
PS C:\Program Files\Filebeat> .\install-service-filebeat.ps1  

配置
https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-configuration.html
默认配置文件为filebeat.yml还有一个全选项配置的参考文件filebeat.reference.yml

filebeat.inputs:  
- type: log  
  enabled: true  #默认为false 记得改为true
  paths:  
    - /var/log/*.log  #支持所有Glob的匹配形式,从服务器根目录开始的绝对路径,注意-后面有空格。如果没有-空格则为filebeat文件夹下的相对路径。
    #- c:\programdata\elasticsearch\logs\*  
    #/var/log/*/*.log. 不匹配/var/log/本身下的.log文件  
output.kafka:
  # initial brokers for reading cluster metadata  
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]  
  
  # message topic selection + partitioning  
  topic: '%{[fields.log_topic]}'  
  partition.round_robin:  
    reachable_only: false  
  
  required_acks: 1  
  compression: gzip  
  max_message_bytes: 1000000  

https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html

output.elasticsearch:  
  hosts: ["myEShost:9200"]  
  username: "filebeat_internal"  
  password: "YOUR_PASSWORD"  
setup.kibana:  
  host: "mykibanahost:5601"  
  username: "my_kibana_user"  
  password: "YOUR_PASSWORD"  
sudo ./filebeat -e -c filebeat.yml -d "publish" #启动filebeat 
sudo rm data/registry   # 重启前删除注册表的缓存信息,让其重新加载完整的信息。

Kafka

用于对大量的消息日志进行缓冲

基本概念

  • broker:一个kafka集群包含一个或多个服务器,每个服务器被称为broker,用于保存producer发送的消息。

  • controller leader:Kafka集群中有多个broker,当每个broker启动的时候,都会创建Kafka Controller对象,然后去zookeeper竞争一个Controller leader节点。leader会向zookeeper上注册Watcher,其他broker几乎不用监听zookeeper的状态变化。负责管理整个集群中分区和副本的状态.

  • producer:消息生产者,就是向kafka broker发消息的客户端。

  • topic :每条被发送到broker的消息都有一个逻辑上的类别,比如这个是服务a的日志,那个是服务b的日志,这个类别被称为topic。

  • partition:一个topic中的消息由多个分区存储的,每个分区被称为一个partition。每个partition中保证消息有序

  • replica:即replication,由replication-factor设定,大于等于1,。这些备份必须在不同的broker上。即replica数要小于等于broker数。(注意,虽然叫做备份,但是并没有本体的概念,类似本体的概念是下面的leader!)

  • leader:一个partition的所有replica中只有一个leader,用于读取和写入消息。

  • follower: 一个partition的所有replica中除了leader被称为follower,只用于备份数据,实现高可用。

  • ISR:即in-sync Replica,partition的同步机制。每个leader维护一份与其基本保持同步的follower列表。如果一个follower中的消息比起leader落后太多,则会被从列表中被删除。当所有列表中的follower向leader发送ACK——表明已经从leader中主动拉取数据并备份完毕之后——leader才进行commit提交这次事务,表示已经从producer接收到数据。

  • ACK:通过request.required.acks参数设置级别。0表示producer发送消息过去就完事,不关心broker是否处理成功;1表示producer发送消息过去,leader成功接受到就完事。-1表示producer发送过去,leader及其ISR中所有follower成功才完事,即上条所述。

  • offset:偏移量。kafka为每条在partition的消息保存一个偏移量offset,表示现在已经消费到的位置。保存在一个名叫__consumeroffsets__ 的topic中。

  • consumer:消息读取客户端,通过订阅topic的消息从broker拉取消息。

  • consumer gruop:实际情况是以一个consumer gruop来订阅topic的,一个消费组里面可以有多个consumer。topic中每个partition只会对应的被consumer gruop中的某一个consumer所消费,用于保持消息的有序性。但是可以被多个consumer gruop同时订阅。

  • 消息投递语义:At most once:最多一次,消息可能会丢失,但不会重复。先获取数据,再commit offset,最后进行业务处理。
    At least once:最少一次,消息不会丢失,可能会重复。先获取数据,再进行业务处理,业务处理成功后commit offset(常用)
    Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)。

LogStash

下载安装LogStash(?)
配置
主要有三部分组成: input、filte、output

# The # character at the beginning of a line indicates a comment. Use
# comments to describe your configuration.
input {
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
}  

在logstash目录下新建first-pipeline.conf中进行配置。

在input端将使用logstash-input-kafka 以此从kafka topic读入数据。 在input中填入kafka的设置例如:

  kafka{
        bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
        client_id => "test"
        group_id => "test"
        auto_offset_reset => "latest" //从最新的偏移量开始消费
        consumer_threads => 5
      }  

详细参数https://www.elastic.co/guide/en/logstash/6.6/plugins-inputs-kafka.html 注意版本!老版本使用zookeeper地址,新版本使用kafka实例地址。

验证配置:

bin/logstash -f first-pipeline.conf --config.test_and_exit  

启动logstash:

bin/logstash -f first-pipeline.conf --config.reload.automatic  

警告中的pipelines.yml用于一个实例中的multi-pipeline配置。

在filter中可以使用 grok filter plugin,例如设置:

grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }

更多关于grok的介绍https://www.elastic.co/guide/en/logstash/6.6/plugins-filters-grok.html

在output中使用elasticsearch例如设置:

elasticsearch {
        hosts => [ "localhost:9200" ]
    }  

在三个部分中都可以配置多个插件 具体插件的选择可以见文档都是你xxx{}嵌在每个部分中。 样例

 
input {
  kafka{
    bootstrap_servers => ""
    topics_pattern => "log-.*"
    type => "log"
    codec => json
  }

}
filter {
  if [fields][log_topic] == "log-trade" { //kafka中的topic名
    grok {
      match => {"message" => "(?<UID>(?<=UID><).*?(?=><IUID)).*(?<ReqTime>(?<=T1><).*?(?=><T4)).*(?<ResTime>(?<=T4><).*?(?=><RAP))    }

    date{
      match=>["ReqTime","yyyyMMddHHmmssSSS"]
      target=>"@timestamp"
      timezone=>"+08:00"
    }
  }


  if [fields][log_topic] == "log-exception" {
    grok {
      match => {"message" => "(?<ExceptionID>(?<=id:).*)"} 
    }
  }
      
}        
        
 
output {     
  elasticsearch {
    hosts => [""]
    action => "index"
    codec => line{format => "%{message}"}
    index=>"%{[fields][log_topic]}-%{+YYYY-MM-dd}"
    template_name => "log*" //随意
    manage_template => true
    template_overwrite => true
    template => "/ifspt/logstash/logstash-6.6.0/template/log.json" //使用index模版文件,启动时会被自动安装到es.该文件中的template名字必须能匹配index名字。最简单的模版就仅仅是指定
    名字。使用模版后,可以在高版本的kibana中 直接管理index!
    
  } 
  stdout { codec => rubydebug } 
}

ElasticSearch

基本概念

cluster: 一个cluster由一个或多个node组成,由不同的名字所区别,默认为"elasticsearch"。一个cluster只有一个node也是有效的。

node: 一个node就是一个服务器,用于存储处理数据。,由不同的名字所区别,默认为随机的UUID。如果当前没有任何node在运行,启动一个node,它会自动称为一个名字为"elasticsearch"的single-node cluster。

index: 是有一些相似特征的ducument的集合。由不同的名字所区别,必须全部小写。

document: 是可以被索引的最小单位,例如单个的产品document,单个的客户document,以JSON格式表示。

shard: 当一个index数据量太大是,可以分为多个shard在不同node上,每个shard都将是独立且功能完整的。为了保证高可用,Elasticsearch可以对shard进行备份,称为replica shards。显然备份不会和原数据在同一个node上,这还增加了横向扩展的能力。这些都在index创建的时候进行定义,当然之后也可以动态修改。默认每个index有5个shard,每个shard有一份replica。

Cluster指令

./elasticsearch -Ecluster.name=my_cluster_name -Enode.name=my_node_name可以启动时设置名字。
curl -X GET "localhost:9200/_cat/health?v" 查看cluster情况,默认为9200端口。
curl -X GET "localhost:9200/_cat/nodes?v" 查看node情况。
curl -X GET "localhost:9200/_cat/indices?v" 查看index情况。
curl -X PUT "localhost:9200/customer?pretty" 创建一个名为customer的index pretty表示漂亮的打印json格式。
curl -X GET "localhost:9200/_cat/indices?v" 再查看一下。

curl -X PUT "localhost:9200/customer/_doc/1?pretty" -H 'Content-Type: application/json' -d'  
{
 "name": "John Doe"  
}  

在ID为customer的index中放入一个ID为1的ducument。如果没有ID为customer的index,则会自动创建。
curl -X GET "localhost:9200/customer/_doc/1?pretty" 查看该document。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment