• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

日志=》kafka》ELK

武飞扬头像
antyyy123
帮助2

kELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana;
Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能;它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志

logstach(日志收集)->Elasticsearch(日志存储和搜索)->Kibana(查看日志,可视化)

为什么要使用elk?
        ELK 组件在海量日志系统的运维中,可用于解决以下主要问题:- 分布式日志数据统一收集,实现集中式查询和管理
故障排查
安全信息和事件管理
报表功能

我们为什么用kafka,一定要通过kafka吗 
    不是,可以直接logback到ELK的,但是为什么使用kafka接收日志呢,是为了减少logstash对于日志进入时的压力。kafka的特性使用过的人应该都清楚,拥有这10W级别每秒的单机吞吐量,所以很适合作为数据来源缓冲区。

logback.xml

  1.  
    <!-- kafkaAppender 输出日志到kafka -->
  2.  
    <appender name="kafkaAppender"
  3.  
    class="com.td.ai.frame.uni.platform.oaudit.unify.config.KafkaAppender">
  4.  
    <bootstrapServers>kafka-servers</bootstrapServers>
  5.  
    <topic>kafka-topic</topic>
  6.  
    </appender>
  7.  
     
  8.  
    <!-- 要输出日志的类 -->
  9.  
    <logger name="logKafka" level="info">
  10.  
    <appender-ref ref="kafkaAppender"/>
  11.  
    </logger>
  12.  
    <!-- 异步传递策略,建议选择异步,不然连接kafka失败,会阻挡服务启动 -->
  13.  
    <appender name="Async" class="ch.qos.logback.classic.AsyncAppender">
  14.  
    <appender-ref ref="kafkaAppender"/>
  15.  
    </appender>
学新通
  1.  
    public class KafkaAppender extends AppenderBase<ILoggingEvent> {
  2.  
     
  3.  
    private static Logger logger = LoggerFactory.getLogger(KafkaAppender.class);
  4.  
     
  5.  
    private String topic = "***";
  6.  
     
  7.  
     
  8.  
     
  9.  
     
  10.  
    private Producer<String, String> producer;
  11.  
     
  12.  
     
  13.  
     
  14.  
    @Override
  15.  
    public void start() {
  16.  
    super.start();
  17.  
    if (producer == null) {
  18.  
    Properties props = new Properties();
  19.  
    props.put("security.protocol", "SASL_PLAINTEXT");
  20.  
    props.put("sasl.mechanism", "SCRAM-SHA-512");
  21.  
    props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule\n"
  22.  
    "required username=\"***\""
  23.  
    "password=\"****\";");
  24.  
    props.put("bootstrap.servers", topic);
  25.  
    //判断是否成功,我们指定了“1”将会阻塞消息
  26.  
    props.put("acks", "1");
  27.  
    props.put("retries", 3);
  28.  
    props.put("batch.size", 262144);
  29.  
    //延迟10s,10s内数据会缓存进行发送\
  30.  
    props.put("linger.ms", 10);
  31.  
    props.put("buffer.memory", 67108864);
  32.  
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  33.  
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  34.  
    props.put("metric.reporters", "com.ctg.kafka.clients.reporter.KafkaClientMetricsReporter");
  35.  
    props.put("client.id", ""***);
  36.  
    producer = new KafkaProducer<String, String>(props);
  37.  
     
  38.  
     
  39.  
    }
  40.  
     
  41.  
    }
  42.  
     
  43.  
     
  44.  
    @Override
  45.  
    protected void append(ILoggingEvent iLoggingEvent) {
  46.  
    String msg = iLoggingEvent.getFormattedMessage();
  47.  
    String message = "";
  48.  
    InetAddress localHost = null;
  49.  
    try {
  50.  
    localHost = Inet4Address.getLocalHost();
  51.  
    } catch (UnknownHostException e) {
  52.  
    e.printStackTrace();
  53.  
    }
  54.  
    String hostIp = localHost.getHostAddress();
  55.  
    String hostName = localHost.getHostName();
  56.  
    Date date = new Date();
  57.  
    SimpleDateFormat sdformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");//24小时制
  58.  
    String datetime = sdformat.format(date);
  59.  
    JSONObject json = new JSONObject();
  60.  
     
  61.  
    json.put("podIP", hostIp);
  62.  
    json.put("podName", hostName);
  63.  
    message = json.toString();
  64.  
    // System.out.println("向kafka推送日志开始:" message);
  65.  
    //key为null 2.4之前为轮询策略
  66.  
    // 如果key值为null,并且使用了默认的分区器,Kafka会根据轮询(Random Robin)策略将消息均匀地分布到各个分区上。
  67.  
    // 之后为粘性策略
  68.  
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
  69.  
    topic, null, message);
  70.  
    //同步发动消息-改-异步发送消息
  71.  
    try {
  72.  
    Future<RecordMetadata> result = producer.send(record, new Callback() {
  73.  
    @Override
  74.  
    public void onCompletion(RecordMetadata metadata, Exception exception) {
  75.  
    if (exception != null) {
  76.  
    // 执行错误逻辑处理//否则就是成功喽
  77.  
    exception.printStackTrace();
  78.  
    }
  79.  
    }
  80.  
    });
  81.  
    // System.out.println("分区:" result.get().partition() ",offset: " result.get().offset());
  82.  
    } catch (Exception e) {
  83.  
    e.printStackTrace();
  84.  
    }
  85.  
     
  86.  
     
  87.  
    producer.flush();
  88.  
     
  89.  
     
  90.  
     
  91.  
    }
  92.  
     
  93.  
     
  94.  
     
  95.  
    }
  96.  
     
学新通

服务器安装logstash 


1. 查看一下路径
pwd
应该显示/app/logstash或者/data/logstash

2. 将tar包上传

3. 执行以下命令
tar -zxvf logstash-7.5.2.tar.gz(自己的版本号)

cd logstash-7.5.2

mkdir config/conf
mkdir config/certs
mkdir logs

cd config/conf
上传js-sysname.conf

  1.  
    input {
  2.  
    kafka {
  3.  
    topics_pattern => "kafkatopic"
  4.  
    consumer_threads => 4
  5.  
    group_id => "***-consumer" # kafka 消费组
  6.  
    type => "kafka"
  7.  
    security_protocol => "SASL_PLAINTEXT"
  8.  
    sasl_mechanism => "SCRAM-SHA-512"
  9.  
    jaas_path => "/home/crmapp/logstash-7.5.2/config/certs/kafka_client_jaas.conf"
  10.  
    bootstrap_servers => "*****"
  11.  
    codec => "json"
  12.  
    }
  13.  
    }
  14.  
     
  15.  
    filter {
  16.  
    ruby{
  17.  
    code => "event.set('index_day',event.get('@timestamp').time.localtime(' 08:00').strftime('%Y.%m.%d'))"
  18.  
    }
  19.  
     
  20.  
    }
  21.  
     
  22.  
    output {
  23.  
     
  24.  
    elasticsearch {
  25.  
    hosts => ["*****"]
  26.  
    index => "***a-log-%{index_day}"
  27.  
    user => "**"
  28.  
    password => "这里写es的密码"
  29.  
    }
  30.  
     
  31.  
    }
学新通

cd ../certs
上传kafka_client_jaas.conf

  1.  
    KafkaClient {
  2.  
    org.apache.kafka.common.security.scram.ScramLoginModule required
  3.  
    username="**"
  4.  
    password="****";
  5.  
    };

cd ..
vim pipelines.yml
将下面的加到"# Example of two pipelines:"这一行下面
 - pipeline.id:js-sysname
   pipeline.workers: 2
   path.config: "/app/logstash/logstash-7.5.2/config/conf/-js-sysname.conf"

cd /app/logstash/logstash-7.5.2/
nohup bin/logstash -r true --config.reload.automatic >> logs/logstash.log &

4. 查看日志
tail -100f logs/logstash.log
启动需要时间,如果没有erorr日志,没有提示连不上kafka或者elasricsearch即可

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhifkkhi
系列文章
更多 icon
同类精品
更多 icon
继续加载