RocketMQ系列(六)批量發送與過濾

今天我們再來看看RocketMQ的另外兩個小功能,消息的批量發送和過濾。這兩個小功能提升了我們使用RocketMQ的效率。

批量發送

以前我們發送消息的時候,都是一個一個的發送,這樣效率比較低下。能不能一次發送多個消息呢?當然是可以的,RocketMQ為我們提供了這樣的功能。但是它也有一些使用的條件:

  • 同一批發送的消息的Topic必須相同;
  • 同一批消息的waitStoreMsgOK 必須相同;
  • 批量發送的消息不支持延遲,就是上一節說的延遲消息;
  • 同一批次的消息,大小不能超過1MiB;

好了,只要我們滿足上面的這些限制,就可以使用批量發送了,我們來看看發送端的代碼吧,

@Test
public void producerBatch() throws Exception {

    List<Message> messages = new ArrayList<>();
    for (int i = 0;i<3;i++) {
        MessageExt message = new MessageExt();
        message.setTopic("cluster-topic");
        message.setKeys("key-"+i);
        message.setBody(("this is batchMQ,my NO is "+i+"---"+new Date()).getBytes());
        messages.add(message);
    }
    SendResult sendResult = defaultMQProducer.send(messages);
    System.out.println("sendResult:" + sendResult.getSendStatus().toString());
}
  • 其實批量發送很簡單,我們只是把消息放到一個List當中,然後統一的調用send方法發送就可以了。

再來看看消費端的代碼,

@Bean(initMethod = "start",destroyMethod = "shutdown")
public DefaultMQPushConsumer pushConsumer()  {
    try {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultMQPushConsumer");
        consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
        consumer.subscribe("cluster-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("msgs.size():"+msgs.size());
                if (msgs != null && msgs.size() > 0) {
                    for (MessageExt msg : msgs) {
                        System.out.println(new String(msg.getBody()));
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        return consumer;
    }catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}
  • 消費端的代碼沒有任何的變化,正常的接收消息就可以了,我們只是打印出了msgs.size(),看看一次接收一個消息,還是一次可以批量的接收多個消息。

我們啟動項目,批量發送一下,看看效果吧,

發送端的日誌如下:

sendResult:SEND_OK

發送成功,看來我們批量發送的3個消息都進入到了隊列中,再看看消費端,是一次消費一個,還是一次消費3個,如下:

msgs.size():1
this is batchMQ,my NO is 0---Mon Jun 15 09:31:04 CST 2020
msgs.size():1
this is batchMQ,my NO is 1---Mon Jun 15 09:31:04 CST 2020
msgs.size():1
this is batchMQ,my NO is 2---Mon Jun 15 09:31:04 CST 2020

看樣子是一次只消費了一個消息,那麼能不能一次消費3個消息呢?當然是可以的,不過要進行特殊的設置,

consumer.setConsumeMessageBatchMaxSize(5);

在消費端,我們設置批量消費消息的數量是5,這個值默認是1。我們再看看消費端的日誌,

msgs.size():3
this is batchMQ,my NO is 0---Mon Jun 15 09:35:47 CST 2020
this is batchMQ,my NO is 1---Mon Jun 15 09:35:47 CST 2020
this is batchMQ,my NO is 2---Mon Jun 15 09:35:47 CST 2020

這次一次消費了3個消息,如果消息比較多的話,最大一次能消費5個。這就是RocketMQ的批量發送和批量消費。

消息過濾

其實我們在大多數情況下,使用tag標籤就能夠很好的實現消息過濾。雖然tag標籤咱們並沒有過多的介紹,其實也很好理解,就是一個子Topic的概念,咱們在構建消息message的時候,message.setTags("xxx")。然後在消費的時候,訂閱Topic的時候,也可以指定訂閱的tag,

consumer.subscribe("cluster-topic", "*");

看到那個”*”了嗎?它就是訂閱的tag,”*”代表全部的tag,如果您想訂閱其中的一個或幾個,可以使用這種方式”tagA || tagB || tagC”,這是訂閱了cluster-topic下的3個tag,其他的tag是不會被消費的。

這裏我們所說的消息過濾比tag要高級很多,是可以支持sql的,怎麼樣?高級吧。比如:我們訂閱”a > 5 and b = ‘abc'”的消息,如下圖:

但是,RocketMQ畢竟不是數據庫,它只能支持一些基礎的SQL語句,並不是所有的SQL都支持,

  • 数字型的支持,>, >=, <, <=, BETWEEN, =

  • 字符串支持,=, <>, IN

  • IS NULL或者IS NOT NULL

  • 邏輯判斷,ANDORNOT

字段的類型也只是簡單的幾種,

  • 数字型,支持123,543.123,整型、浮點都可以;
  • 字符串,必須使用單引號”括起來;
  • 空值,NULL;
  • 布爾型,TRUE或者FALSE;

並且對消費者的類型也有一定的限制,只能使用push consumer才可以進行消息過濾。好了,說了這麼多了,我們看看怎麼使用吧,消費端和生產端都要進行相應的改造,先看看生產端吧,

@Test
public void producerBatch() throws Exception {

    List<Message> messages = new ArrayList<>();
    for (int i = 0;i<3;i++) {
        MessageExt message = new MessageExt();
        message.setTopic("cluster-topic");
        message.setKeys("key-"+i);
        message.setBody(("this is batchMQ,my NO is "+i+"---"+new Date()).getBytes());

        int a = i+4;
        message.putUserProperty("a",String.valueOf(a));

        messages.add(message);
    }
    SendResult sendResult = defaultMQProducer.send(messages);
    System.out.println("sendResult:" + sendResult.getSendStatus().toString());
}

我們在之前批量發送的基礎上進行了修改,定義了a的值,等於i+4,這樣循環3次,a的值就是4,5,6。然後調用message.putUserProperty("a",String.valueOf(a))注意,在使用消息過濾的時候,這些附加的條件屬性都是通過putUserProperty方法進行設置。這裏,我們設置了a的值。再看看消費端,

consumer.subscribe("cluster-topic", MessageSelector.bySql("a > 5"));

消費端,整體上沒有變化,只是在訂閱的方法中,使用MessageSelector.bySql("a > 5"),進行了條件的過濾。有的小夥伴可能會有疑問,我既想用sql過濾又想用tag過濾怎麼辦?當然也是可以,我們可以使用MessageSelector.bySql("a > 5").byTag("xx),byTag和bySql不分前後,怎麼樣,很強大吧。我們運行一下程序,看看效果吧。

我們啟動一下服務,報錯了,怎麼回事?錯誤信息如下:

The broker does not support consumer to filter message by SQL92

隊列不支持過濾消息,我們查詢了RocketMQ源碼中的BrokerConfig類,這個類就是對broker的一些設置,其中發現了這兩個屬性,

// whether do filter when retry.
private boolean filterSupportRetry = false;
private boolean enablePropertyFilter = false;
  • filterSupportRetry是在重試的時候,是否支持filter;
  • enablePropertyFilter,這個就是是否支持過濾消息的屬性;

我們把這兩個屬性在broker的配置文件改為true吧,如下:

filterSupportRetry=true
enablePropertyFilter=true

然後,再重新部署一下我們兩主兩從的集群環境。環境部署完以後,我們再重啟應用,沒有報錯。在生產端發送一下消息看看吧,

sendResult:SEND_OK

生產端發送消息沒有問題,說明3個消息都發送成功了。再看看消費端的日誌,

msgs.size():1
this is batchMQ,my NO is 2---Mon Jun 15 10:59:37 CST 2020

只消費了一個消息,並且這個消息中i的值是2,那麼a的值就是2+4=6,它是>5的,滿足SQL的條件,所以被消費掉了。這完全符合我們的預期。

總結

今天的兩個小功能還是比較有意思的,但裡邊也有需要注意的地方,

  • 消息的批量發送,只要我們滿足它的條件,然後使用List發送就可以了;批量消費,默認的消費個數是1,我們可以調整它的值,這樣就可以一次消費多個消息了;
  • 過濾消息中,最大的坑就是隊列的配置里,需要設置enablePropertyFilter=true,否則消費端在啟動的時候報不支持SQL的錯誤;

我們在使用的時候,多加留意就可以了,有問題,評論區留言吧~

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心