為了賬號安全,請及時綁定郵箱和手機立即綁定

JavaWeb項目架構之Kafka分布式日志隊列

2018.02.06 15:10 3326瀏覽

架構、分布式、日志隊列,標題自己都看著唬人,其實就是一個日志收集的功能,只不過中間加了一個Kafka做消息隊列罷了。

圖片描述

kafka介紹

Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。

特性

Kafka是一種高吞吐量的分布式發布訂閱消息系統,有如下特性:

  • 通過O(1)的磁盤數據結構提供消息的持久化,這種結構對于即使數以TB的消息存儲也能夠保持長時間的穩定性能。
  • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數百萬的消息。
  • 支持通過Kafka服務器和消費機集群來分區消息。
  • 支持Hadoop并行數據加載。

主要功能

  • 發布和訂閱消息流,這個功能類似于消息隊列,這也是kafka歸類為消息隊列框架的原因

  • 以容錯的方式記錄消息流,kafka以文件的方式來存儲消息流

  • 可以再消息發布的時候進行處理

使用場景

  • 在系統或應用程序之間構建可靠的用于傳輸實時數據的管道,消息隊列功能

  • 構建實時的流數據處理程序來變換或處理數據流,數據處理功能

消息傳輸流程

圖片描述

相關術語介紹

  • Broker
    Kafka集群包含一個或多個服務器,這種服務器被稱為broker
  • Topic
    每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存于何處)
  • Partition
    Partition是物理上的概念,每個Topic包含一個或多個Partition.
  • Producer
    負責發布消息到Kafka broker
  • Consumer
    消息消費者,向Kafka broker讀取消息的客戶端。
  • Consumer Group
    每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于默認的group)
Kafka安裝

環境

Linux、JDK、Zookeeper

下載二進制程序

wget https://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz

安裝

tar -zxvf kafka_2.11-0.10.0.1.tgz
cd kafka_2.11-0.10.0.1

目錄說明

bin 啟動,停止等命令
config 配置文件
libs 類庫

參數說明

#########################參數解釋##############################

broker.id=0  #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣

port=9092 #當前kafka對外提供服務的端口默認是9092

host.name=192.168.1.170 #這個參數默認是關閉的

num.network.threads=3 #這個是borker進行網絡處理的線程數

num.io.threads=8 #這個是borker進行I/O處理的線程數

log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大于這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個

socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小后在發送,能提高性能

socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤

socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小

num.partitions=1 #默認的分區數,一個topic默認1個分區數

log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天

message.max.byte=5242880  #消息保存的最大值5M

default.replication.factor=2  #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務

replica.fetch.max.bytes=5242880  #取消息的最大直接數

log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件

log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除

log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能

zookeeper.connect=192.168.1.180:12181,192.168.1.181:12181,192.168.1.182:1218 #設置zookeeper的連接端口、如果非集群配置一個地址即可

#########################參數解釋##############################

啟動kafka

啟動kafka之前要啟動相應的zookeeper集群、自行安裝,這里不做說明。

#進入到kafka的bin目錄
./kafka-server-start.sh -daemon ../config/server.properties
Kafka集成

環境

spring-boot、elasticsearch、kafka

pom.xml引入:

<!-- kafka 消息隊列 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>

生產者

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
 * 生產者
 * 創建者 科幫網
 * 創建時間 2018年2月4日
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

消費者

mport java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
/**
 * 消費者
 * 創建者 科幫網
 * 創建時間 2018年2月4日
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}

日志監聽

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.itstyle.es.common.utils.JsonMapper;
import com.itstyle.es.log.entity.SysLogs;
import com.itstyle.es.log.repository.ElasticLogRepository;
/**
 * 掃描監聽
 * 創建者 科幫網
 * 創建時間 2018年2月4日
 */
@Component
public class Listener {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private  ElasticLogRepository elasticLogRepository;

    @KafkaListener(topics = {"itstyle"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("kafka的key: " + record.key());
        logger.info("kafka的value: " + record.value());
        if(record.key().equals("itstyle_log")){
            try {
                SysLogs log = JsonMapper.fromJsonString(record.value().toString(), SysLogs.class);
                logger.info("kafka保存日志: " + log.getUsername());
                elasticLogRepository.save(log);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

測試日志傳輸

  /**
    * kafka 日志隊列測試接口
    */
   @GetMapping(value="kafkaLog")
   public @ResponseBody String kafkaLog() {
        SysLogs log = new SysLogs();
        log.setUsername("紅薯");
        log.setOperation("開源中國社區");
        log.setMethod("com.itstyle.es.log.controller.kafkaLog()");
        log.setIp("192.168.1.80");
        log.setGmtCreate(new Timestamp(new Date().getTime()));
        log.setExceptionDetail("開源中國社區");
        log.setParams("{'name':'碼云','type':'開源'}");
        log.setDeviceType((short)1);
        log.setPlatFrom((short)1);
        log.setLogType((short)1);
        log.setDeviceType((short)1);
        log.setId((long)200000);
        log.setUserId((long)1);
        log.setTime((long)1);
        //模擬日志隊列實現
        String json = JsonMapper.toJsonString(log);
        kafkaTemplate.send("itstyle", "itstyle_log",json);
        return "success";
   }
Kafka與Redis

之前簡單的介紹過,JavaWeb項目架構之Redis分布式日志隊列,有小伙伴們聊到, Redis PUB/SUB沒有任何可靠性保障,也不會持久化。當然了,原項目中僅僅是記錄日志,并不是十分重要的信息,可以有一定程度上的丟失

Kafka與Redis PUB/SUB之間最大的區別在于Kafka是一個完整的分布式發布訂閱消息系統,而Redis PUB/SUB只是一個組件而已。

使用場景

  • Redis PUB/SUB
    消息持久性需求不高、吞吐量要求不高、可以忍受數據丟失
  • Kafka
    高可用、高吞吐、持久性、多樣化的消費處理模型
開源項目源碼(參考):https://gitee.com/52itstyle/spring-boot-elasticsearch

作者: 小柒
分享是快樂的,也見證了個人成長歷程,文章大多都是工作經驗總結以及平時學習積累,基于自身認知不足之處在所難免,也請大家指正,共同進步。

點擊查看更多內容

本文原創發布于慕課網 ,轉載請注明出處,謝謝合作

9人點贊

若覺得本文不錯,就分享一下吧!

評論

相關文章推薦

正在加載中
意見反饋 幫助中心 APP下載
官方微信

舉報

0/150
提交
取消
lpl竞猜