漲姿勢了解一下Kafka消費位移可好?

摘要:Kafka中的位移是個極其重要的概念,因為數據一致性、準確性是一個很重要的語義,我們都不希望消息重複消費或者丟失。而位移就是控制消費進度的大佬。本文就詳細聊聊kafka消費位移的那些事,包括:

概念剖析

kafka的兩種位移

關於位移(Offset),其實在kafka的世界里有兩種位移:

  • 分區位移:生產者向分區寫入消息,每條消息在分區中的位置信息由一個叫offset的數據來表徵。假設一個生產者向一個空分區寫入了 10 條消息,那麼這 10 條消息的位移依次是 0、1、…、9;

  • 消費位移:消費者需要記錄消費進度,即消費到了哪個分區的哪個位置上,這是消費者位移(Consumer Offset)。

注意,這和上面所說的消息在分區上的位移完全不是一個概念。上面的“位移”表徵的是分區內的消息位置,它是不變的,即一旦消息被成功寫入到一個分區上,它的位移值就是固定的了。而消費者位移則不同,它可能是隨時變化的,畢竟它是消費者消費進度的指示器。

消費位移

消費位移,記錄的是 Consumer 要消費的下一條消息的位移,切記,是下一條消息的位移! 而不是目前最新消費消息的位移

假設一個分區中有 10 條消息,位移分別是 0 到 9。某個 Consumer 應用已消費了 5 條消息,這就說明該 Consumer 消費了位移為 0 到 4 的 5 條消息,此時 Consumer 的位移是 5,指向了下一條消息的位移。

至於為什麼要有消費位移,很好理解,當 Consumer 發生故障重啟之後,就能夠從 Kafka 中讀取之前提交的位移值,然後從相應的位移處繼續消費,從而避免整個消費過程重來一遍。就好像書籤一樣,需要書籤你才可以快速找到你上次讀書的位置。

那麼了解了位移是什麼以及它的重要性,我們自然而然會有一個疑問,kafka是怎麼記錄、怎麼保存、怎麼管理位移的呢?

位移的提交

Consumer 需要上報自己的位移數據,這個彙報過程被稱為位移提交。因為 Consumer 能夠同時消費多個分區的數據,所以位移的提交實際上是在分區粒度上進行的,即Consumer 需要為分配給它的每個分區提交各自的位移數據。

鑒於位移提交甚至是位移管理對 Consumer 端的巨大影響,KafkaConsumer API提供了多種提交位移的方法,每一種都有各自的用途,這些都是本文將要談到的方案。

void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

先粗略的總結一下。位移提交分為自動提交和手動提交;而手動提交又分為同步提交和異步提交。

自動提交

當消費配置enable.auto.commit=true的時候代表自動提交位移。

自動提交位移是發生在什麼時候呢?auto.commit.interval.ms默認值是50000ms。即kafka每隔5s會幫你自動提交一次位移。自動位移提交的動作是在 poll()方法的邏輯里完成的,在每次真正向服務端發起拉取請求之前會檢查是否可以進行位移提交,如果可以,那麼就會提交上一次輪詢的位移。假如消費數據量特別大,可以設置的短一點。

越簡單的東西功能越不足,自動提交位移省事的同時肯定會帶來一些問題。自動提交帶來重複消費和消息丟失的問題:

  • 重複消費: 在默認情況下,Consumer 每 5 秒自動提交一次位移。現在,我們假設提交位移之後的 3 秒發生了 Rebalance 操作。在 Rebalance 之後,所有 Consumer 從上一次提交的位移處繼續消費,但該位移已經是 3 秒前的位移數據了,故在 Rebalance 發生前 3 秒消費的所有數據都要重新再消費一次。雖然你能夠通過減少 auto.commit.interval.ms 的值來提高提交頻率,但這麼做只能縮小重複消費的時間窗口,不可能完全消除它。這是自動提交機制的一個缺陷。

  • 消息丟失: 假設拉取了100條消息,正在處理第50條消息的時候,到達了自動提交窗口期,自動提交線程將拉取到的每個分區的最大消息位移進行提交,如果此時消費服務掛掉,消息並未處理結束,但卻提交了最大位移,下次重啟就從100條那消費,即發生了50-100條的消息丟失。

手動提交

當消費配置enable.auto.commit=false的時候代表手動提交位移。用戶必須在適當的時機(一般是處理完業務邏輯后),手動的調用相關api方法提交位移。比如在下面的案例中,我需要確認我的業務邏輯返回true之後再手動提交位移

 while (true) {
     try {
         ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
         if (!consumerRecords.isEmpty()) {
             for (ConsumerRecord<String, String> record : consumerRecords) {
                 KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                 // 處理業務
                 boolean handleResult = handle(kafkaMessage);
                 if (handleResult) {
                     log.info(" handle success, kafkaMessage={}" ,kafkaMessage);
                 } else {
                     log.info(" handle failed, kafkaMessage={}" ,kafkaMessage);
                 }
             }
             // 手動提交offset
             consumer.commitSync(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
        
         } 
     } catch (Exception e) {
         log.info("kafka consume error." ,e);
     }
 }

手動提交明顯能解決消息丟失的問題,因為你是處理完業務邏輯后再提交的,假如此時消費服務掛掉,消息並未處理結束,那麼重啟的時候還會重新消費。

但是對於業務層面的失敗導致消息未消費成功,是無法處理的。因為業務層的邏輯千變萬化、比如格式不正確,你叫Kafka消費端程序怎麼去處理?應該要業務層面自己處理,記錄失敗日誌做好監控等。

但是手動提交不能解決消息重複的問題,也很好理解,假如消費0-100條消息,50條時掛了,重啟後由於沒有提交這一批消息的offset,是會從0開始重新消費。至於如何避免重複消費的問題,在這篇文章有說。

手動提交又分為異步提交和同步提交。

同步提交

上面案例代碼使用的是commitSync() ,顧名思義,是同步提交位移的方法。同步提交位移Consumer 程序會處於阻塞狀態,等待 Broker 返回提交結果。同步模式下提交失敗的時候一直嘗試提交,直到遇到無法重試的情況下才會結束。在任何系統中,因為程序而非資源限制而導致的阻塞都可能是系統的瓶頸,會影響整個應用程序的 TPS。當然,你可以選擇拉長提交間隔,但這樣做的後果是 Consumer 的提交頻率下降,在下次 Consumer 重啟回來后,會有更多的消息被重新消費。因此,為了解決這些不足,kafka還提供了異步提交方法。

異步提交

異步提交會立即返回,不會阻塞,因此不會影響 Consumer 應用的 TPS。由於它是異步的,Kafka 提供了回調函數,供你實現提交之後的邏輯,比如記錄日誌或處理異常等。下面這段代碼展示了調用 commitAsync() 的方法

 consumer.commitAsync((offsets, exception) -> {
 if (exception != null)
     handleException(exception);
 });

但是異步提交會有一個問題,那就是它沒有重試機制,不過一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。所以消息也是不會丟失和重複消費的。
但如果這是發生在關閉消費者或再均衡前的最後一次提交,就要確保能夠提交成功。因此,組合使用commitAsync()commitSync()是最佳的方式。

try {
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
        if (!consumerRecords.isEmpty()) {
             for (ConsumerRecord<String, String> record : consumerRecords) {
                KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                boolean handleResult = handle(kafkaMessage);             
             }
             //異步提交位移               
             consumer.commitAsync((offsets, exception) -> {
             if (exception != null)
                 handleException(exception);
             });
           
        }
    }
} catch (Exception e) {
    System.out.println("kafka consumer error:" + e.toString());
} finally {
    try {
        //最後同步提交位移
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

讓位移提交更加靈活和可控

如果細心的閱讀了上面所有demo的代碼,那麼你會發現這樣幾個問題:

1、所有的提交,都是提交 poll 方法返回的所有消息的位移,poll 方法一次返回1000 條消息,則一次性地將這 1000 條消息的位移一併提交。可這樣一旦中間出現問題,位移沒有提交,下次會重新消費已經處理成功的數據。所以我想做到細粒度控制,比如每次提交100條,該怎麼辦?

答:可以通過commitSync(Map<TopicPartition, OffsetAndMetadata>)commitAsync(Map<TopicPartition, OffsetAndMetadata>)對位移進行精確控制。

2、poll和commit方法對於普通的開發人員而言是一個黑盒,無法精確地掌控其消費的具體位置。我都不知道這次的提交,是針對哪個partition,提交上去的offset是多少。

答:可以通過record.topic()獲取topic信息, record.partition()獲取分區信息,record.offset() + 1獲取消費位移,記住消費位移是指示下一條消費的位移,所以要加一。

3、我想自己管理offset怎麼辦?一方面更加保險,一方面下次重啟之後可以精準的從數據庫讀取最後的offset就不存在丟失和重複消費了。
答:可以將消費位移保存在數據庫中。消費端程序使用comsumer.seek方法指定從某個位移開始消費。

綜合以上幾個可優化點,並結合全文,可以給出一個比較完美且完整的demo:聯合異步提交和同步提交,對處理過程中所有的異常都進行了處理。細粒度的控制了消費位移的提交,並且保守的將消費位移記錄到了數據庫中,重新啟動消費端程序的時候會從數據庫讀取位移。這也是我們消費端程序位移提交的最佳實踐方案。你只要繼承這個抽象類,實現你具體的業務邏輯就可以了。

public abstract class PrefectCosumer {
    private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    int count = 0;
    public final void consume() {
        Properties properties = PropertiesConfig.getConsumerProperties();
        properties.put("group.id", getGroupId());
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(getTopics());
        consumer.poll(0);
        // 把offset記錄到數據庫中 從指定的offset處消費 
        consumer.partitionsFor(getTopics()).stream().map(info ->
        new TopicPartition(getTopics(), info.partition()))
        .forEach(tp -> {
               consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));   
         });
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
                if (!consumerRecords.isEmpty()) {
                    for (ConsumerRecord<String, String> record : consumerRecords) {

                        KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                        boolean handleResult = handle(kafkaMessage);
                        if (handleResult) {
                            //注意:提交的是下一條消息的位移。所以OffsetAndMetadata 對象時,必須使用當前消息位移加 1。
                            offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));

                            // 細粒度控制提交 每10條提交一次offset
                            if (count % 10 == 0) {
                                // 異步提交offset
                                consumer.commitAsync(offsets, (offsets, exception) -> {
                                    if (exception != null) {
                                        handleException(exception);
                                    }
                                    // 將消費位移再記錄一份到數據庫中
                                    offsets.forEach((k, v) -> {
                                        String s = "insert into kafka_offset(`topic`,`group_id`,`partition_id`,`offset`) values" +
                                                " ('" + k.topic() + "','" + getGroupId() + "'," + k.partition() + "," + v.offset() + ")" +
                                                " on duplicate key update offset=values(offset);";
                                        JdbcUtils.insertTable(s);
                                    });


                                });
                            }
                            count++;
                        } else {         
                            System.out.println("消費消息失敗 kafkaMessage={}" + getTopics() + getGroupId() + kafkaMessage.toString());                         
                        }
                    }


                }
            }
        } catch (Exception e) {
            System.out.println("kafka consumer error:" + e.toString());
        } finally {
            try {
                // 最後一次提交 使用同步提交offset
                consumer.commitSync();
            } finally {
                consumer.close();
            }


        }
    }


    /**
     * 具體的業務邏輯
     *
     * @param kafkaMessage
     * @return
     */
    public abstract boolean handle(KafkaMessage kafkaMessage);

    public abstract List<String> getTopics();

    public abstract String getGroupId();

    void handleException(Exception e) {
        //異常處理
    }
}

控制位移提交的N種方式

剛剛我們說自己控制位移,使用seek方法可以指定offset消費。那到底怎麼控制位移?怎麼重設消費組位移?seek是什麼?現在就來仔細說說。

並不是所有的消息隊列都可以重設消費者組位移達到重新消費的目的。比如傳統的RabbitMq,它們處理消息是一次性的,即一旦消息被成功消費,就會被刪除。而Kafka消費消息是可以重演的,因為它是基於日誌結構(log-based)的消息引擎,消費者在消費消息時,僅僅是從磁盤文件上讀取數據而已,所以消費者不會刪除消息數據。同時,由於位移數據是由消費者控制的,因此它能夠很容易地修改位移的值,實現重複消費歷史數據的功能。

了解如何重設位移是很重要的。假設這麼一個場景,我已經消費了1000條消息后,我發現處理邏輯錯了,所以我需要重新消費一下,可是位移已經提交了,我到底該怎麼重新消費這1000條呢??假設我想從某個時間點開始消費,我又該如何處理呢?

首先說個誤區:auto.offset.reset=earliest/latest這個參數大家都很熟悉,但是初學者很容易誤會它。大部分朋友都覺得在任何情況下把這兩個值設置為earliest或者latest ,消費者就可以從最早或者最新的offset開始消費,但實際上並不是那麼回事,他們生效都有一個前提條件,那就是對於同一個groupid的消費者,如果這個topic某個分區有已經提交的offset,那麼無論是把auto.offset.reset=earliest還是latest,都將失效,消費者會從已經提交的offset開始消費。因此這個參數並不能解決用戶想重設消費位移的需求。

kafka有七種控制消費組消費offset的策略,主要分為位移維度和時間維度,包括:

  • 位移維度。這是指根據位移值來重設。也就是說,直接把消費者的位移值重設成我們給定的位移值。包括Earliest/Latest/Current/Specified-Offset/Shift-By-N策略

  • 時間維度。我們可以給定一個時間,讓消費者把位移調整成大於該時間的最小位移;也可以給出一段時間間隔,比如 30 分鐘前,然後讓消費者直接將位移調回 30 分鐘之前的位移值。包括DateTime和Duration策略

說完了重設策略,我們就來看一下具體應該如何實現,可以從兩個角度,API方式和命令行方式。

重設位移的方法之API方式

API方式只要記住用seek方法就可以了,包括seek,seekToBeginning 和 seekToEnd。

void seek(TopicPartition partition, long offset);    
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);    
void seekToBeginning(Collection<TopicPartition> partitions);    
void seekToEnd(Collection<TopicPartition> partitions);    

從方法簽名我們可以看出seekToBeginningseekToEnd是可以一次性重設n個分區的位移,而seek 只允許重設指定分區的位移,即為每個分區都單獨設置位移,因為不難得出,如果要自定義每個分區的位移值則用seek,如果希望kafka幫你批量重設所有分區位移,比如從最新數據消費或者從最早數據消費,那麼用seekToEnd和seekToBeginning。

Earliest 策略:從最早的數據開始消費

從主題當前最早位移處開始消費,這個最早位移不一定就是 0 ,因為很久遠的消息會被 Kafka 自動刪除,主要取決於你的刪除配置。

代碼如下:

Properties properties = PropertiesConfig.getConsumerProperties();
properties.put("group.id", getGroupId());
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(getTopics());
consumer.poll(0);
consumer.seekToBeginning(
consumer.partitionsFor(getTopics()).stream().map(partitionInfo ->
   new TopicPartition(getTopics(), partitionInfo.partition()))
   .collect(Collectors.toList()));

首先是構造consumer對象,這樣我們可以通過partitionsFor獲取到分區的信息,然後我們就可以構造出TopicPartition集合,傳給seekToBegining方法。需要注意的一個地方是:需要用consumer.poll(0),而不能用consumer.poll(Duration.ofMillis(0))

在poll(0)中consumer會一直阻塞直到它成功獲取了所需的元數據信息,之後它才會發起fetch請求去獲取數據。而poll(Duration)會把元數據獲取也計入整個超時時間。由於本例中使用的是0,即瞬時超時,因此consumer根本無法在這麼短的時間內連接上coordinator,所以只能趕在超時前返回一個空集合。

Latest策略:從最新的數據開始消費

    consumer.seekToEnd(
        consumer.partitionsFor(getTopics().get(0)).stream().map(partitionInfo ->
            new TopicPartition(getTopics().get(0), partitionInfo.partition()))
              .collect(Collectors.toList()));

Current策略:從當前已經提交的offset處消費

consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
        new TopicPartition(getTopics().get(0), info.partition()))
        .forEach(tp -> {
            long committedOffset = consumer.committed(tp).offset();
            consumer.seek(tp, committedOffset);
        });

**Special-offset策略:從指定的offset處消費 **

該策略使用的方法和current策略一樣,區別在於,current策略是直接從kafka元信息中讀取中已經提交的offset值,而special策略需要用戶自己為每一個分區指定offset值,我們一般是把offset記錄到數據庫中然後可以從數據庫去讀取這個值

    consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
                new TopicPartition(getTopics().get(0), info.partition()))
                .forEach(tp -> {
                    try {
                        consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                });

以上演示了用API方式重設位移,演示了四種常見策略的代碼,另外三種沒有演示,一方面是大同小異,另一方面在實際生產中,用API的方式不太可能去做時間維度的重設,而基本都是用命令行方式。

重設位移的方法之命令行方式

命令行方式重設位移是通過 kafka-consumer-groups 腳本。比起 API 的方式,用命令行重設位移要簡單得多。

Earliest 策略指定–to-earliest。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute

Latest 策略指定–to-latest。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute

Current 策略指定–to-current。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute

Specified-Offset 策略指定–to-offset。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute

Shift-By-N 策略指定–shift-by N。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute

DateTime 策略指定–to-datetime。

DateTime 允許你指定一個時間,然後將位移重置到該時間之後的最早位移處。常見的使用場景是,你想重新消費昨天的數據,那麼你可以使用該策略重設位移到昨天 0 點。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute

Duration 策略指定–by-duration。
Duration 策略則是指給定相對的時間間隔,然後將位移調整到距離當前給定時間間隔的位移處,具體格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 類的話,你應該不會對這個格式感到陌生。它就是一個符合 ISO-8601 規範的 Duration 格式,以字母 P 開頭,後面由 4 部分組成,即 D、H、M 和 S,分別表示天、小時、分鐘和秒。舉個例子,如果你想將位移調回到 15 分鐘前,那麼你就可以指定 PT0H15M0S

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute

提交的位移都去哪了?

通過上面那幾部分的內容,我們已經搞懂了位移提交的方方面面,那麼提交的位移它保存在哪裡呢?這就要去位移主題的的世界里一探究竟了。kafka把位移保存在一個叫做__consumer_offsets的內部主題中,叫做位移主題。

注意:老版本的kafka其實是把位移保存在zookeeper中的,但是zookeeper並不適合這種高頻寫的場景。所以新版本已經是改進了這個方案,直接保存到kafka。畢竟kafka本身就適合高頻寫的場景,並且kafka也可以保證高可用性和高持久性。

既然它也是主題,那麼離不開分區和副本這兩個機制。我們並沒有手動創建這個主題並且指定,所以是kafka自動創建的, 分區的數量取決於Broker 端參數 offsets.topic.num.partitions,默認是50個分區,而副本參數取決於offsets.topic.replication.factor,默認是3。

既然也是主題,肯定會有消息,那麼消息格式是什麼呢?參考前面我們手動設計將位移寫入數據庫的方案,我們保存了topic,group_id,partition,offset四個字段。topic,group_id,partition無疑是數據表中的聯合主鍵,而offset是不斷更新的。無疑kafka的位移主題消息也是類似這種設計。key也是那三個字段,而消息體其實很複雜,你可以先簡單理解為就是offset。

既然也是主題,肯定也會有刪除策略,否則消息會無限膨脹。但是位移主題的刪除策略和其他主題刪除策略又不太一樣。我們知道普通主題的刪除是可以通過配置刪除時間或者大小的。而位移主題的刪除,叫做 Compaction。Kafka 使用Compact 策略來刪除位移主題中的過期消息,對於同一個 Key 的兩條消息 M1 和 M2,如果 M1 的發送時間早於 M2,那麼 M1 就是過期消息。Compact 的過程就是掃描日誌的所有消息,剔除那些過期的消息,然後把剩下的消息整理在一起。

Kafka 提供了專門的後台線程定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除數據。這個後台線程叫 Log Cleaner。很多實際生產環境中都出現過位移主題無限膨脹佔用過多磁盤空間的問題,如果你的環境中也有這個問題,我建議你去檢查一下 Log Cleaner 線程的狀態,通常都是這個線程掛掉了導致的。

總結

kafka的位移是個極其重要的概念,控制着消費進度,也即控制着消費的準確性,完整性,為了保證消息不重複和不丟失。我們最好做到以下幾點:

  • 手動提交位移。

  • 手動提交有異步提交和同步提交兩種方式,既然兩者有利也有弊,那麼我們可以結合起來使用。

  • 細粒度的控制消費位移的提交,這樣可以避免重複消費的問題。

  • 保守的將消費位移再記錄到了數據庫中,重新啟動消費端程序的時候從數據庫讀取位移。

獲取Kafka全套原創學習資料及思維導圖,關注【胖滾豬學編程】公眾號,回復”kafka”。

本文來源於公眾號:【胖滾豬學編程】。一枚集顏值與才華於一身,不算聰明卻足夠努力的女程序媛。用漫畫形式讓編程so easy and interesting!求關注!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化