CQRS之旅——旅程6(我們系統的版本管理)

旅程6:我們系統的版本管理

準備下一站:升級和遷移

“變化是生活的調味品。”威廉·考珀

此階段的最高目標是了解如何升級包含實現CQRS模式和事件源的限界上下文的系統。團隊在這一階段實現的用戶場景包括對代碼的更改和對數據的更改:更改了一些現有的數據模式並添加了新的數據模式。除了升級系統和遷移數據外,團隊還計劃在沒有停機時間的情況下進行升級和遷移,以便在Microsoft Azure中運行實時系統。

本章的工作術語定義:

本章使用了一些術語,我們將在下面進行描述。有關更多細節和可能的替代定義,請參閱參考指南中的“深入CQRS和ES”。

  • Command(命令):命令是要求系統執行更改系統狀態的操作。命令是必須服從(執行)的一種指令,例如:MakeSeatReservation。在這個限界上下文中,命令要麼來自用戶發起請求時的UI,要麼來自流程管理器(當流程管理器指示聚合執行某個操作時)。單個接收方處理一個命令。命令總線(command bus)傳輸命令,然後命令處理程序將這些命令發送到聚合。發送命令是一個沒有返回值的異步操作。

  • 事件(Event):一個事件,比如OrderConfirmed,描述了系統中發生的一些事情,通常是一個命令的結果。領域模型中的聚合引發事件。事件也可以來自其他限界上下文。多個訂閱者可以處理特定的事件。聚合將事件發布到事件總線。處理程序在事件總線上註冊特定類型的事件,然後將事件傳遞給訂閱服務器。在訂單和註冊限界上下文中,訂閱者是流程管理器和讀取模型生成器。

  • 冪等性(Idempotency):冪等性是一個操作的特性,這意味着該操作可以多次應用而不改變結果。例如,“將x的值設置為10”的操作是冪等的,而“將x的值加1”的操作不是冪等的。在消息傳遞環境中,如果消息可以多次傳遞而不改變結果,則消息是冪等的:這可能是因為消息本身的性質,也可能是因為系統處理消息的方式。

用戶故事:

在這個過程的這個階段,團隊實現了下面描述的用戶故事。

不停機升級

V2版本的目標是升級系統,包括任何必要的數據遷移,而不需要把系統停機。如果這在當前實現中不可行,那麼停機時間應該最小化,並且應該修改系統,以便在將來支持零停機時間升級(從V3版本開始)。

Beth(業務經理)發言:

確保我們能夠在不停機的情況下進行升級,這對我們在市場中的信譽至關重要。

显示剩餘座位數量

目前,當註冊者創建一個訂單時,沒有显示每種座位類型的剩餘座位數量。當註冊者選擇購買座位時,UI應該显示此信息。

處理不需要付費的座位

目前,當註冊者選擇不需要付費的座位時,UI流仍然會將註冊者帶到支付頁面,即使不需要支付任何費用。系統應該檢測什麼時候沒有支付,並調整流程,讓註冊者直接進入訂單的確認頁面。

架構

該應用程序旨在部署到Microsoft Azure。在旅程的那個階段,應用程序由兩個角色組成,一個包含ASP.Net MVC Web應用程序的web角色和一個包含消息處理程序和領域對象的工作角色。應用程序在寫端和讀端都使用Azure SQL DataBase實例進行數據存儲。應用程序使用Azure服務總線來提供其消息傳遞基礎設施。下圖展示了這個高級體繫結構。

在研究和測試解決方案時,可以在本地運行它,可以使用Azure compute emulator,也可以直接運行MVC web應用程序,並運行承載消息處理程序和領域域對象的控制台應用程序。在本地運行應用程序時,可以使用本地SQL Server Express數據庫,並使用一個在SQL Server Express數據庫實現的簡單的消息傳遞基礎設施。

有關運行應用程序的選項的更多信息,請參見附錄1“發布說明”。

模式和概念

在旅程的這個階段,團隊處理的大多數關鍵挑戰都與如何最好地執行從V1到V2的遷移有關。本節將介紹其中的一些挑戰。

處理“事件定義發生更改”的情況

當團隊檢查V2的發布需求,很明顯,我們需要改變在訂單和註冊限界上下文中使用的一些事件來適應一些新特性:RegistrationProcessManager將會改變,當訂單有一個不需要付費的座位時系統將提供一個更好的用戶體驗。

訂單和註冊限界上下文使用事件源,因此在遷移到V2之後,事件存儲將包含舊事件,但將開始保存新事件。當系統事件被重放時,系統必須能正確處理所有的舊事件和新事件。

團隊考慮了兩種方法來處理系統中的這類更改。

在基礎設施中進行事件映射或過濾

在基礎設施中映射和過濾事件消息是一種選擇。此方法是對舊的事件消息和消息格式進行處理,在它們到達領域之前在基礎設施的某個位置處理它們。您可以過濾掉不再相關的舊消息,並使用映射將舊格式的消息轉換為新格式。這種方法最初比較複雜,因為它需要對基礎設施進行更改,但是它可以保持領域域的純粹,領域只需要理解當前的新事件集合就可以了。

在聚合中處理多個版本的消息

在聚合中處理多個版本的消息是另一種選擇。在這種方法中,所有消息類型(包括舊消息和新消息)都傳遞到領域,每個聚合必須能夠處理舊消息和新消息。從短期來看,這可能是一個合適的策略,但它最終會導致域模型受到遺留事件處理程序的污染。

團隊為V2版本選擇了這個選項,因為它包含了最少數量的代碼更改。

Jana(軟件架構師)發言:

當前在聚合中處理舊事件和新事件並不妨礙您以後使用第一種選擇:在基礎設施中使用映射/過濾機制。

履行消息冪等性

V2版本中要解決的一個關鍵問題是使系統更加健壯。在V1版本中,在某些場景中,可能會多次處理某些消息,導致系統中的數據不正確或不一致。

Jana(軟件架構師)發言:

消息冪等性在任何使用消息傳遞的系統中都很重要,這不僅僅是在實現CQRS模式或使用事件源的系統中。

在某些場景中,設計冪等消息是可能的,例如:使用“將座位配額設置為500”的消息,而不是“在座位配額中增加100”的消息。您可以安全地多次處理第一個消息,但不能處理第二個消息。

然而,並不總是能夠使用冪等消息,因此團隊決定使用Azure服務總線的重複刪除特性,以確保它只傳遞一次消息。團隊對基礎設施進行了一些更改,以確保Azure服務總線能夠檢測重複消息,並配置Azure服務總線來執行重複消息檢測。

要了解Contoso是如何實現這一點的,請參閱下面的“不讓命令消息重複”一節。此外,我們需要考慮系統中的消息處理程序如何從隊列和Topic檢索消息。當前的方法使用Azure服務總線peek/lock機制。這是一個分成三個階段的過程:

  1. 處理程序從隊列或Topic檢索消息,並在其中留下消息的鎖定副本。其他客戶端無法看到或訪問鎖定的消息。
  2. 處理程序處理消息。
  3. 處理程序從隊列中刪除鎖定的消息。如果鎖定的消息在固定時間后沒有解鎖或刪除,則解鎖該消息並使其可用,以便再次檢索。

如果步驟由於某種原因失敗,這意味着系統可以不止一次地處理消息。

Jana(軟件架構師)發言:

該團隊計劃在旅程的下一階段解決這個問題(步驟失敗的問題)。更多信息,請參見第7章“添加彈性和優化性能”。

阻止多次處理事件

在V1中,在某些場景里,如果在處理事件時發生錯誤,系統可能多次處理事件。為了避免這種情況,團隊修改了體繫結構,以便每個事件處理程序都有自己對Azure Topic的訂閱。下圖显示了兩個不同的模型。

在V1中,可能發生以下行為:

  1. EventProcessor實例從服務總線中的所有訂閱者那裡接收到OrderPlaced事件。
  2. EventProcessor實例有兩個已註冊的處理程序,RegistrationProcessManagerRouterOrderViewModelGenerator處理程序類,所以會在兩個裡都觸發調用Handle方法。
  3. OrderViewModelGenerator類中的Handle方法執行成功。
  4. RegistrationProcessManagerRouter類中的Handle方法拋出異常。
  5. EventProcessor實例捕獲到異常然後拋棄掉事件消息。消息將自動放回訂閱中。
  6. EventProcessor實例第二次從所有訂閱者那裡接收到OrderPlaced事件。
  7. 事件又觸發兩個處理方法,導致RegistrationProcessManagerRouter類和OrderViewModelGenerator第二次處理事件消息。
  8. 每當RegistrationProcessManagerRouter類拋出異常時,OrderViewModelGenerator類都會觸發處理該事件。

在V2模型中,如果處理程序類拋出異常,EventProcessor實例將事件消息放回與該處理程序類關聯的訂閱。重試邏輯現在只會導致EventProcessor實例重試引發異常的處理程序,因此沒有其他處理程序會重新處理消息。

集成事件的持久化

在V1版本中提出的一個問題是,系統如何持久化從會議管理限界上下文發送到訂單和註冊限界上下文的集成事件。這些事件包括關於會議創建和發布的信息,以及座位類型和配額更改的詳細信息。

在V1版本中,訂單和註冊上下文中的ConferenceViewModelGenerator類通過更新視圖模型並向SeatsAvailability聚合發送命令來處理這些事件,以告訴它更改座位配額值。

這種方法意味着訂單和註冊限界上下文不存儲任何歷史記錄,這可能會導致問題。例如,其他視圖從這裏中查找座椅類型描述時,這裏只包含座椅類型描述的最新值。因此,在其他地方重播一組事件可能會重新生成另一個包含不正確座椅類型描述的讀取模型投影。

團隊考慮了以下五個方法來糾正這種情況:

  • 將所有事件保存在原始限界上下文中(會議管理限界上下文中),並使用共享的事件存儲,訂單和註冊限界上下文中可以訪問該存儲來重播這些事件。接收限界上下文可以重放事件流,直到它需要查看的之前的座椅類型描述時為止。
  • 當所有事件到達接收限界上下文(訂單和註冊限界上下文)時保存它們。
  • 讓視圖模型生成器中的命令處理程序保存事件,只選擇它需要的那些。
  • 讓視圖模型生成器中的命令處理程序保存不同的事件,實際上就是為此視圖模型使用事件源。
  • 將來自所有限界上下文的所有命令和事件消息存儲在消息日誌中。

第一種選擇並不總是可行的。在這種特殊情況下,它可以工作,因為同一個團隊同時實現了限界上下文和基礎設施,使得使用共享事件存儲變得很容易。

Gary(CQRS專家)發言:

儘管從純粹主義者的角度來看,第一個選項破壞了限界上下文之間的嚴格隔離,但在某些場景中,它可能是一個可接受的實用解決方案。

第三種選擇可能存在的風險是,所需的事件集合可能在未來發生變化。如果我們現在不保存事件,它們將永遠丟失。

儘管第五個選項存儲了所有命令和事件,其中一些可能永遠都不需要再次引用,但它確實提供了一個完整的日誌,記錄了系統中發生的所有事情。這對於故障診斷很有用,還可以幫助您滿足尚未確定的需求。該團隊選擇了這個選項而不是選項二,因為它提供了一個更通用的機制,可能具有未來的好處。

持久化事件的目的是,當訂單和註冊上下文需要有關當前座位配額的信息時,可以回放這些事件,以便計算剩餘座位的數量。要一致地計算這些数字,必須始終以相同的順序回放事件。這種順序有幾種選擇:

  • 會議管理限界上下文發送事件的順序。
  • 訂單和註冊上下文接收事件的順序。
  • 訂單和註冊上下文處理事件的順序。

大多數情況下,這些順序是相同的。沒有什麼正確的順序。你只需要選擇一個和它保持一致就行了。因此,選擇由簡單性決定。在本例中,最簡單的方法是按照訂單和註冊限界上下文中處理程序接收事件的順序持久化事件(第二個選項)。

Markus(軟件開發人員)發言:

這種選擇通常不會出現在事件源中。每個聚合會都以固定的順序創建事件,這就是系統用於持久存儲事件的順序。在此場景中,集成事件不是由單個聚合創建的。

為這些事件保存時間戳也有類似的問題。如果將來需要查看特定時間剩餘的座位數量,那麼時間戳可能會很有用。這裏的選擇是,當事件在會議管理限界上下文中創建時,還是在訂單和註冊限界上下文中接收時,應該創建時間戳?當會議管理限界上下文創建事件時,訂單和註冊限界上下文可能由於某種原因離線。因此,團隊決定在會議管理有界上下文發布事件時創建時間戳。

消息排序

團隊創建並運行來驗證V1版本的驗收測試,凸顯出了消息排序的一個潛在問題:執行會議管理限界上下文的驗收測試向訂單和註冊限界上下文發送了一系列命令,這些命令有時會出現順序錯誤。

Markus(軟件開發人員)發言:

當人類用戶真實測試系統的這一部分時,不太會注意到這種效果,因為發出命令的時間間隔要長得多,這使得消息不太可能無序地到達。

團隊考慮了兩種方法來確保消息以正確的順序到達。

  • 第一個方法是使用消息會話,這是Azure服務總線的一個特性。如果您使用消息會話,這將確保會話內的消息以與它們發送時相同的順序傳遞。
  • 第二種方法是修改應用程序中的處理程序,通過使用發送消息時添加到消息中的序列號或時間戳來檢測無序消息。如果接收處理程序檢測到一條無序消息,它將拒絕該消息,並在處理了在被拒絕消息之前發送的消息之後,將其放回稍後處理的隊列或Topic。

在這種情況下,首選的解決方案是使用Azure服務總線消息會話,因為這隻需要對現有代碼進行更少的更改。這兩種方法都會給消息傳遞帶來一些額外的延遲,但是團隊並不認為這會對系統的性能產生顯著的影響。

實現細節

本節描述訂單和註冊限界上下文的實現的一些重要功能。您可能會發現擁有一份代碼拷貝很有用,這樣您就可以繼續學習了。您可以從Download center下載一個副本,或者在GitHub上查看存儲庫:https://github.com/mspnp/cqrs-journey-code。您可以從GitHub上的Tags頁面下載V2版本的代碼。

備註:不要期望代碼示例與參考實現中的代碼完全匹配。本章描述了CQRS過程中的一個步驟,隨着我們了解更多並重構代碼,實現可能會發生變化。

**添加對“不需要支付的訂單”的支持

做出這一改變有三個具體的目標,它們都是相關的。我們希望:

  • 修改RegistrationProcessManager類和相關聚合,以處理不需要支付的訂單。
  • 修改UI中的導航,當訂單不需要支付時跳過付款步驟。
  • 確保系統在升級到V2之後能夠正確地工作,包括使用新事件和舊事件。

RegistrationProcessManager類的更改

在此之前,RegistrationProcessManager類在收到來自UI的註冊者已完成支付的通知后發送了一個ConfirmOrderPayment命令。現在,如果有一個不需要支付訂單,UI將直接向訂單聚合發送一個ConfirmOrder命令。如果訂單需要支付,RegistrationProcessManager類在從UI接收到成功支付的通知后,再向訂單聚合發送一個ConfirmOrder命令。

Jana(軟件架構師)發言:

注意,命令的名稱已從ConfirmOrderPayment更改為ConfirmOrder。這反映了訂單不需要知道任何關於付款的信息。它只需要知道訂單已經確認。類似地,現在有一個新的OrderConfirmed事件用於替代舊的OrderPaymentConfirmed事件。

當訂單聚合接收到ConfirmOrder命令時,它將引發一個OrderConfirmed事件。除被持久化外,該事件還由以下對象處理:

  • OrderViewModelGenerator類,它在其中更新讀取模型中的訂單狀態。
  • SeatAssignments聚合,在其中初始化一個新的SeatAssignments實例。
  • RegistrationProcessManager類,它在其中觸發一個提交座位預訂的命令。

UI的更改

UI中的主要更改是在RegistrationController MVC控制器類中的SpecifyRegistrantAndPaymentDetails action里的。之前,此action方法返回InitiateRegistrationWithThirdPartyProcessorPayment(action result)。現在,如果Order對象的新IsFreeOfCharge屬性為true,它將返回一個CompleteRegistrationWithoutPayment(action result)。否則,它返回一個CompleteRegistrationWithThirdPartyProcessorPayment(action result)。

[HttpPost]
public ActionResult SpecifyRegistrantAndPaymentDetails(AssignRegistrantDetails command, string paymentType, int orderVersion)
{
    ...

    var pricedOrder = this.orderDao.FindPricedOrder(orderId);
    if (pricedOrder.IsFreeOfCharge)
    {
        return CompleteRegistrationWithoutPayment(command, orderId);
    }

    switch (paymentType)
    {
        case ThirdPartyProcessorPayment:

            return CompleteRegistrationWithThirdPartyProcessorPayment(command, pricedOrder, orderVersion);

        case InvoicePayment:
            break;

        default:
            break;
    }

    ...
}

CompleteRegistrationWithThirdPartyProcessorPayment將用戶重定向到ThirdPartyProcessorPayment action,CompleteRegistrationWithoutPayment方法將用戶直接重定向到ThankYou action。

數據遷移

會議管理限界上下文在其Azure SQL數據庫實例中的PricedOrders表中存儲來自訂單和註冊限界上下文的訂單信息。以前,會議管理限界上下文接收OrderPaymentConfirmed事件,現在它接收OrderConfirmed事件,該事件包含一個附加的IsFreeOfCharge屬性。這將成為數據庫中的一個新列。

Markus(軟件開發人員)發言:

在遷移過程中,我們不需要修改該表中的現有數據,因為布爾值的默認值為false。所有現有條目都是在系統支持不需要付費的訂單之前創建的。

在遷移過程中,任何正在運行的ConfirmOrderPayment命令都可能丟失,因為它們不再由訂單聚合處理。您應該驗證當前的命令總線沒有這些命令。

Poe(IT運維人員)發言:

我們需要仔細計劃如何部署V2版本,以便確保所有現有的、正在運行的ConfirmOrderPayment命令都由運行V1版本的工作角色實例處理。

系統將RegistrationProcessManager類實例的狀態保存到SQL數據庫表中。這個表的架構沒有變化。遷移后您將看到的惟一更改是StateValue列中的一個新添加值。這反映了RegistrationProcessManager類中的ProcessState枚舉中額外的PaymentConfirmationReceived值,如下面的代碼示例所示:

public enum ProcessState
{
    NotStarted = 0,
    AwaitingReservationConfirmation = 1,
    ReservationConfirmationReceived = 2,
    PaymentConfirmationReceived = 3,
}

在V1版本中,事件源系統為訂單聚合保存的事件包括OrderPaymentConfirmed事件。因此,事件存儲區包含此事件類型的實例。在V2版本中,OrderPaymentConfirmed事件被替換為OrderConfirmed事件。

團隊決定在V2版本中,當反序列化事件時,不在基礎設施級別映射和過濾事件。這意味着,當系統從事件存儲中重播這些事件時,處理程序必須同時理解舊事件和新事件。下面的代碼示例在SeatAssignmentsHandler類中显示了這一點:

static SeatAssignmentsHandler()
{
    Mapper.CreateMap<OrderPaymentConfirmed, OrderConfirmed>();
}

public SeatAssignmentsHandler(IEventSourcedRepository<Order> ordersRepo, IEventSourcedRepository<SeatAssignments> assignmentsRepo)
{
    this.ordersRepo = ordersRepo;
    this.assignmentsRepo = assignmentsRepo;
}

public void Handle(OrderPaymentConfirmed @event)
{
    this.Handle(Mapper.Map<OrderConfirmed>(@event));
}

public void Handle(OrderConfirmed @event)
{
    var order = this.ordersRepo.Get(@event.SourceId);
    var assignments = order.CreateSeatAssignments();
    assignmentsRepo.Save(assignments);
}

您還可以在OrderViewModelGenerator類中看到同樣的技術。

Order類中的方法略有不同,因為這是持久化到事件存儲中的事件之一。下面的代碼示例显示了Order類中受保護構造函數的一部分:

protected Order(Guid id)
    : base(id)
{
    ...
    base.Handles<OrderPaymentConfirmed>(e => this.OnOrderConfirmed(Mapper.Map<OrderConfirmed>(e)));
    base.Handles<OrderConfirmed>(this.OnOrderConfirmed);
    ...
}

Jana(軟件架構師)發言:

以這種方式處理舊事件對於這個場景非常簡單,因為惟一需要更改的是事件的名稱。如果事件的屬性也發生了變化,情況會更加複雜。將來,Contoso將考慮在基礎設施中進行映射,以避免遺留事件污染領域模型。

在UI中显示剩餘座位

做出這一改變有三個具體的目標,它們都是相關的。我們想要:

  • 修改系統,在會議系統的讀模型中包含每個座位類型的剩餘座位數量信息。
  • 修改UI以显示每種座位類型的剩餘座位數量。
  • 確保升級到V2后系統功能正常。

向讀模型添加關於剩餘座位數量的信息

系統要能显示剩餘座位數量的信息來自兩個地方:

  • 當業務客戶創建新的座位類型或修改座位配額時,會議管理限界上下文將引發SeatCreatedSeatUpdated事件。
  • 在訂單和註冊限界上下文中,當註冊者創建一個訂單的時候,可用座位(SeatsAvailability)聚合將引發SeatsReserved、SeatsReservationCancelled和AvailableSeatsChanged事件。

備註:ConferenceViewModelGenerator類不使用SeatCreatedSeatUpdated事件。

訂單和註冊限界上下文中的ConferenceViewModelGenerator類現在處理這些事件,並使用它們來計算和存儲讀模型中的座位類型數量。下面的代碼示例显示了ConferenceViewModelGenerator類中的相關處理程序:

public void Handle(AvailableSeatsChanged @event)
{
    this.UpdateAvailableQuantity(@event, @event.Seats);
}

public void Handle(SeatsReserved @event)
{
    this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged);
}

public void Handle(SeatsReservationCancelled @event)
{
    this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged);
}

private void UpdateAvailableQuantity(IVersionedEvent @event, IEnumerable<SeatQuantity> seats)
{
    using (var repository = this.contextFactory.Invoke())
    {
        var dto = repository.Set<Conference>().Include(x => x.Seats).FirstOrDefault(x => x.Id == @event.SourceId);
        if (dto != null)
        {
            if (@event.Version > dto.SeatsAvailabilityVersion)
            {
                foreach (var seat in seats)
                {
                    var seatDto = dto.Seats.FirstOrDefault(x => x.Id == seat.SeatType);
                    if (seatDto != null)
                    {
                        seatDto.AvailableQuantity += seat.Quantity;
                    }
                    else
                    {
                        Trace.TraceError("Failed to locate Seat Type read model being updated with id {0}.", seat.SeatType);
                    }
                }

                dto.SeatsAvailabilityVersion = @event.Version;

                repository.Save(dto);
            }
            else
            {
                Trace.TraceWarning ...
            }
        }
        else
        {
            Trace.TraceError ...
        }
    }
}

UpdateAvailableQuantity方法將事件上的版本與讀模型的當前版本進行比較,以檢測可能的重複消息。

Markus(軟件開發人員)發言:

此檢查僅檢測重複的消息,而不是超出序列的消息。

修改UI以显示剩餘的座位數量

現在,當UI向會議的讀模型查詢座位類型列表時,列表包括當前可用的座位數量。下面的代碼示例显示了RegistrationController MVC控制器如何使用SeatType類的AvailableQuantity

private OrderViewModel CreateViewModel()
{
    var seatTypes = this.ConferenceDao.GetPublishedSeatTypes(this.ConferenceAlias.Id);
    var viewModel =
        new OrderViewModel
        {
            ConferenceId = this.ConferenceAlias.Id,
            ConferenceCode = this.ConferenceAlias.Code,
            ConferenceName = this.ConferenceAlias.Name,
            Items =
                seatTypes.Select(
                    s =>
                        new OrderItemViewModel
                        {
                            SeatType = s,
                            OrderItem = new DraftOrderItem(s.Id, 0),
                            AvailableQuantityForOrder = s.AvailableQuantity,
                            MaxSelectionQuantity = Math.Min(s.AvailableQuantity, 20)
                        }).ToList(),
        };

    return viewModel;
}

數據遷移

保存會議讀模型數據的數據庫有一個新列來保存用於檢查重複事件的版本號,而保存座位類型讀模型數據有一個新列來保存可用的座椅數量。

作為數據遷移的一部分,有必要為每個可用座位(SeatsAvailability)聚合重放事件存儲中的所有事件,以便正確計算可用數量。

不讓命令消息重複

系統目前使用Azure服務總線傳輸消息。當系統從ConferenceProcessor類的啟動代碼初始化Azure服務總線時,它配置Topic來檢測重複的消息,如下面的ServiceBusConfig類的代碼示例所示:

private void CreateTopicIfNotExists() 
{     
    var topicDescription =         
        new TopicDescription(this.topic)         
        {             
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = topic.DuplicateDetectionHistoryTimeWindow,         
        };     
    try     
    {         
        this.namespaceManager.CreateTopic(topicDescription);     
    }     
    catch (MessagingEntityAlreadyExistsException) { } 
} 
備註:您可以在Settings.xml文件中配置DuplicateDetectionHistoryTimeWindow
可以向Topic元素添加這個屬性。默認值是1小時。

但是,為了使重複檢測工作正常,您必須確保每個消息都有一個惟一的ID。下面的代碼示例显示了MarkSeatsAsReserved命令:

public class MarkSeatsAsReserved : ICommand
{
    public MarkSeatsAsReserved()
    {
        this.Id = Guid.NewGuid();
        this.Seats = new List<SeatQuantity>();
    }

    public Guid Id { get; set; }

    public Guid OrderId { get; set; }

    public List<SeatQuantity> Seats { get; set; }

    public DateTime Expiration { get; set; }
}

CommandBus類中的BuildMessage方法使用命令Id創建一個惟一的消息Id, Azure服務總線可以使用這個消息Id來檢測重複:

private BrokeredMessage BuildMessage(Envelope command) 
{ 
    var stream = new MemoryStream(); 
    ...

    var message = new BrokeredMessage(stream, true);
    if (!default(Guid).Equals(command.Body.Id))
    {
        message.MessageId = command.Body.Id.ToString();
    }

...

    return message;
} 

保證消息順序

團隊決定使用Azure服務總線消息會話來保證系統中的消息順序。

系統從ConferenceProcessor類中的OnStart方法配置Azure服務總線Topic和訂閱。Settings.xml配置文件中的配置指定了具體的訂閱使用會話。ServiceBusConfig類中的以下代碼示例显示了系統如何創建和配置訂閱。

private void CreateSubscriptionIfNotExists(NamespaceManager namespaceManager, TopicSettings topic, SubscriptionSettings subscription)
{
    var subscriptionDescription =
        new SubscriptionDescription(topic.Path, subscription.Name)
        {
            RequiresSession = subscription.RequiresSession
        };

    try
    {
        namespaceManager.CreateSubscription(subscriptionDescription);
    }
    catch (MessagingEntityAlreadyExistsException) { }
}

以下來自SessionSubscriptionReceiver類的代碼示例演示了如何使用會話接收消息:

private void ReceiveMessages(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        MessageSession session;
        try
        {
            session = this.receiveRetryPolicy.ExecuteAction<MessageSession>(this.DoAcceptMessageSession);
        }
        catch (Exception e)
        {
            ...
        }

        if (session == null)
        {
            Thread.Sleep(100);
            continue;
        }


        while (!cancellationToken.IsCancellationRequested)
        {
            BrokeredMessage message = null;
            try
            {
                try
                {
                    message = this.receiveRetryPolicy.ExecuteAction(() => session.Receive(TimeSpan.Zero));
                }
                catch (Exception e)
                {
                    ...
                }

                if (message == null)
                {
                    // If we have no more messages for this session, exit and try another.
                    break;
                }

                this.MessageReceived(this, new BrokeredMessageEventArgs(message));
            }
            finally
            {
                if (message != null)
                {
                    message.Dispose();
                }
            }
        }

        this.receiveRetryPolicy.ExecuteAction(() => session.Close());
    }
}

private MessageSession DoAcceptMessageSession()
{
    try
    {
        return this.client.AcceptMessageSession(TimeSpan.FromSeconds(45));
    }
    catch (TimeoutException)
    {
        return null;
    }
}

Markus(軟件開發人員)發言:

您可能會發現,將使用消息會話的ReceiveMessages方法的這個版本與SubscriptionReceiver類中的原始版本進行比較是很有用的。

您必須確保當你發送消息包含一個會話ID,這樣才能使用消息會話接收一條消息。系統使用事件的SourceID作為會話ID,如下面的代碼示例所示的EventBus類中的BuildMessage方法:

var message = new BrokeredMessage(stream, true);
message.SessionId = @event.SourceId.ToString();

通過這種方式,您可以確保以正確的順序接收來自單個源的所有消息。

Poe(IT運維人員)發言:

在V2版本中,團隊更改了系統創建Azure服務總線Topic和訂閱的方式。之前,SubscriptionReceiver類創建了它們(如果它們還不存在)。現在,系統在應用程序啟動時使用配置數據創建它們。這發生在啟動過程的早期,以避免在系統初始化訂閱之前將消息發送到Topic時丟失消息的風險。

然而,只有當消息按正確的順序傳遞到總線上時,會話才能保證按順序傳遞消息。如果系統異步發送消息,則必須特別注意確保消息以正確的順序放在總線上。在我們的系統中,來自每個單獨聚合實例的事件按順序到達是很重要的,但是我們不關心來自不同聚合實例的事件的順序。因此,儘管系統異步發送事件,EventStoreBusPublisher實例仍然會在發送下一個事件之前等待前一個事件已發送的確認。以下來自TopicSender類的示例說明了這一點:

public void Send(Func<BrokeredMessage> messageFactory)
{
    var resetEvent = new ManualResetEvent(false);
    Exception exception = null;
    this.retryPolicy.ExecuteAction(
        ac =>
        {
            this.DoBeginSendMessage(messageFactory(), ac);
        },
        ar =>
        {
            this.DoEndSendMessage(ar);
        },
        () => resetEvent.Set(),
        ex =>
        {
            Trace.TraceError("An unrecoverable error occurred while trying to send a message:\r\n{0}", ex);
            exception = ex;
            resetEvent.Set();
        });

    resetEvent.WaitOne();
    if (exception != null)
    {
        throw exception;
    }
}

Jana(軟件架構師)發言:

此代碼示例展示了系統如何使用Transient Fault Handling Application Block來讓異步調用可靠。

有關消息排序和Azure服務總線的更多信息,請參見Microsoft Azure Queues and Microsoft Azure Service Bus Queues – Compared and Contrasted

有關異步發送消息和排序的信息,請參閱博客文章Microsoft Azure Service Bus Splitter and Aggregator

從會議管理限界上下文中持久化事件

團隊決定創建一個包含所有發送的命令和事件的消息日誌。這將使訂單和註冊限界上下文能夠從會議管理限界上下文查詢此日誌,以獲取其構建讀模型所需的事件。這不是事件源,因為我們沒有使用這些事件來重建聚合的狀態,儘管我們使用類似的技術來捕獲和持久化這些集成事件。

Gary(CQRS專家)發言:

此消息日誌確保不會丟失任何消息,以便將來能夠滿足其他需求。

向消息添加額外元數據

系統現在將所有消息保存到消息日誌中。為了方便查詢特定命令或事件,系統現在向每個消息添加了更多的元數據。以前,惟一的元數據是事件類型,現在,事件元數據包括事件類型、命名空間、程序集和路徑。系統將元數據添加到EventBus類中的事件和CommandBus類中的命令中。

捕獲消息並將消息持久化到消息日誌中

系統使用Azure服務總線中對會議/命令和會議/事件topic的額外訂閱來接收系統中每條消息的副本。然後,它將消息附加到Azure表存儲中。下面的代碼示例显示了AzureMessageLogWriter類的實例,它用於將消息保存到表中:

public class MessageLogEntity : TableServiceEntity 
{ 
    public string Kind { get; set; }     
    public string CorrelationId { get; set; }     
    public string MessageId { get; set; }     
    public string SourceId { get; set; }     
    public string AssemblyName { get; set; }     
    public string Namespace { get; set; }     
    public string FullName { get; set; }     
    public string TypeName { get; set; }     
    public string SourceType { get; set; }     
    public string CreationDate { get; set; }     
    public string Payload { get; set; } 
} 

Kind屬性指定消息是命令還是事件。MessageId和CorrelationId屬性由消息傳遞基礎設施設置的,其餘屬性是從消息元數據中設置的。

下面的代碼示例显示了這些消息的分區和RowKey的定義:

PartitionKey = message.EnqueuedTimeUtc.ToString("yyyMM"),
RowKey = message.EnqueuedTimeUtc.Ticks.ToString("D20") + "_" + message.MessageId

注意,RowKey保存了消息最初發送的順序,並添加到消息ID上,以確保惟一性,以防兩條消息同時入隊。

Jana(軟件架構師)發言:

這與事件存儲不同,在事件存儲區中,分區鍵標識聚合實例,而RowKey標識聚合的版本號。

數據遷移

當Contoso將系統從V1遷移到V2時,它將使用消息日誌在訂單和註冊限界上下文中重建會議和價格訂單的讀模型。

Gary(CQRS專家)發言:

Contoso可以在需要重建與聚合無關的事件構建的讀模型時來使用消息日誌,例如來自會議管理限界上下文的集成事件。

會議讀模型包含會議的信息,並包含來自會議管理限界上下文的ConferenceCreated、ConferenceUpdated、ConferencePublished、ConferenceUnpublished、SeatCreated和SeatUpdated事件的信息。

價格訂單讀模型持有來自於SeatCreated和SeatUpdated事件的信息,這些事件來自於會議管理限界上下文。

然而,在V1中,這些事件消息沒有被持久化,因此讀模型不能在V2中重新填充。為了解決這個問題,團隊實現了一個數據遷移實用程序,它使用一種最佳方法來生成包含要存儲在消息日誌中的丟失數據的事件。例如,在遷移到V2之後,消息日誌不包含ConferenceCreated事件,因此遷移實用程序在會議管理限界上下文使用的數據庫中找到這些信息,並創建丟失的事件。您可以在MigrationToV2項目的Migrator類中的GeneratePastEventLogMessagesForConferenceManagement方法中看到這是如何完成的。

Markus(軟件開發人員)發言:

您可以在這個類中看到,Contoso還將所有現有的事件源事件複製到消息日誌中。

如下面所示,Migrator類中的RegenerateViewModels方法重新構建讀取的模型。它通過調用Query方法從消息日誌中檢索所有事件,然後使用ConferenceViewModelGeneratorPricedOrderViewModelUpdater類來處理消息。

internal void RegenerateViewModels(AzureEventLogReader logReader, string dbConnectionString)
{
    var commandBus = new NullCommandBus();

    Database.SetInitializer<ConferenceRegistrationDbContext>(null);

    var handlers = new List<IEventHandler>();
    handlers.Add(new ConferenceViewModelGenerator(() => new ConferenceRegistrationDbContext(dbConnectionString), commandBus));
    handlers.Add(new PricedOrderViewModelUpdater(() => new ConferenceRegistrationDbContext(dbConnectionString)));

    using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString))
    {
        context.UpdateTables();
    }

    try
    {
        var dispatcher = new MessageDispatcher(handlers);
        var events = logReader.Query(new QueryCriteria { });

        dispatcher.DispatchMessages(events);
    }
    catch
    {
        using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString))
        {
            context.RollbackTablesMigration();
        }

        throw;
    }
}

Jana(軟件架構師)發言:

查詢可能不會很快,因為它將從多個分區檢索實體。

注意這個方法如何使用NullCommandBus實例來接收來自ConferenceViewModelGenerator實例的任何命令,因為我們只是在這裏重新構建讀模型。

以前,PricedOrderViewModelGenerator使用ConferenceDao類來獲取關於座位的信息。現在,它是自治的,並直接處理SeatCreatedSeatUpdated事件來維護這些信息。作為遷移的一部分,必須將此信息添加到讀模型中。在前面的代碼示例中,PricedOrderViewModelUpdater類只處理SeatCreatedSeatUpdated事件,並將缺失的信息添加到價格訂單讀模型中。

從V1遷移到V2

從V1遷移到V2需要更新已部署的應用程序代碼並遷移數據。在生產環境中執行遷移之前,應該始終在測試環境中演練遷移。以下是所需步驟:

  1. 將V2版本部署到Azure的staging環境中。V2版本有一個MaintenanceMode屬性,最初設置為true。在此模式下,應用程序向用戶显示一條消息,說明站點當前正在進行維護,而工作角色將不處理消息。
  2. 準備好之後,將V2版本(仍然處於維護模式,MaintenanceMode為true)切換到Azure生產環境中。
  3. 讓V1版本(現在在staging環境中運行)運行幾分鐘,以確保所有正在運行的消息都完成了它們的處理。
  4. 運行遷移程序來遷移數據(參見下面)。
  5. 成功完成數據遷移后,將每種工作角色的MaintenanceMode屬性更改為false。
  6. V2版本現在運行在Azure中。

Jana(軟件架構師)發言:

團隊考慮使用單獨的應用程序在升級過程中向用戶显示一條消息,告訴他們站點正在進行維護。然而,在V2版本中使用MaintenanceMode屬性提供了一個更簡單的過程,併為應用程序添加了一個潛在有用的新特性。

Poe(IT運維人員)發言:

由於對事件存儲的更改,不可能執行從V1到V2的無停機升級。然而,團隊所做的更改將確保從V2遷移到V3將不需要停機時間。

Markus(軟件開發人員)發言:

團隊對遷移實用程序應用了各種優化,例如批處理操作,以最小化停機時間。

下面幾節總結了從V1到V2的數據遷移。這些步驟中的一些在前面已經討論過,涉及到應用程序的特定更改或增強。

團隊為V2引入的一個更改是,將所有命令和事件消息的副本保存在消息日誌中,以便作為未來的證據,通過捕獲將來可能使用的所有內容來保證應用程序的安全性。遷移過程考慮到了這個新特性。

因為遷移過程複製了大量的數據,所以您應該在Azure工作角色中運行遷移過程,以最小化成本。遷移實用程序是一個控制台應用程序,因此您可以使用Azure和遠程桌面服務。有關如何在Azure角色實例中運行應用程序的信息,請參見Using Remote Desktop with Microsoft Azure Roles。

Poe(IT運維人員)發言:

在一些組織中,安全策略不允許您在Azure生產環境使用遠程桌面服務。但是,您只需要一個在遷移期間承載遠程桌面會話的工作角色,您可以在遷移完成后刪除它。您還可以將遷移代碼作為工作角色而不是控制台應用程序運行,並確保它記錄遷移的狀態,以便您驗證。

為會議管理限界上下文生成過去的日誌消息

遷移過程的一部分是在可能的情況下重新創建V1版本處理后丟棄的消息,然後將它們添加到消息日誌中。在V1版本中,所有從會議管理限界上下文發送到訂單和註冊限界上下文的集成事件都以這種方式丟失了。系統不能重新創建所有丟失的事件,但可以創建表示遷移時系統狀態的事件。

有關更多信息,請參見本章前面的“從會議管理限界上下文中持久化事件”一節。

遷移事件源里的事件

在V2版本中,事件存儲為每個事件存儲額外的元數據,以便於查詢事件。遷移過程將所有事件從現有事件存儲複製到具有新模式的新事件存儲。

Jana(軟件架構師)發言:

原始事件不會以任何方式更新,而是被視為不可變的。

同時,系統將所有這些事件的副本添加到V2版本中引入的消息日誌中。

有關更多信息,請參見MigrationToV2項目中Migrator類中的MigrateEventSourcedAndGeneratePastEventLogs

重建讀模型**

V2版本包括對訂單和註冊限界上下文中讀模型定義的幾個更改。MigrationToV2項目在訂單和註冊限界上下文中重新構建會議的讀模型和價格訂單的讀模型。

有關更多信息,請參見本章前面的“從會議管理限界上下文中持久化事件”一節。

對測試的影響

在這個過程的這個階段,測試團隊繼續擴展驗收測試集合。他們還創建了一組測試來驗證數據遷移過程。

再說SpecFlow

之前,這組SpecFlow測試以兩種方式實現:通過自動化web瀏覽器模擬用戶交互,或者直接在MVC控制器上操作。這兩種方法都有各自的優缺點,我們在第4章“擴展和增強訂單和註冊限界上下文”中討論過。

在與另一位專家討論了這些測試之後,團隊還實現了第三種方法。從領域驅動設計(DDD)方法的角度來看,UI不是領域模型的一部分,核心團隊的重點應該是在領域專家的幫助下理解領域,並在領域中實現業務邏輯。UI只是机械部分,用於使用戶能夠與領域進行交互。因此,驗收測試應該包括驗證領域模型是否以領域專家期望的方式工作。因此,團隊使用SpecFlow創建了一組驗收測試,這些測試旨在在不影響系統UI部分的情況下測試領域。

下面的代碼示例显示了SelfRegistrationEndToEndWithDomain.feature文件,該文件在Conference.AcceptanceTests項目中的Features\Domain\Registration文件夾里,注意When和Then子句怎麼使用命令和事件的。

Gary(CQRS專家)發言:

通常,如果您的領域模型只使用聚合,您會期望When子句發送命令,Then子句查看事件或異常。然而,在本例中,領域模型包含一個通過發送命令來響應事件的流程管理器。測試將檢查是否發送了所有預期的命令,並引發了所有預期的事件。

Feature: Self Registrant end to end scenario for making a Registration for a Conference site with Domain Commands and Events
    In order to register for a conference
    As an Attendee
    I want to be able to register for the conference, pay for the Registration Order and associate myself with the paid Order automatically


Scenario: Make a reservation with the selected Order Items
Given the list of the available Order Items for the CQRS summit 2012 conference
    | seat type                 | rate | quota |
    | General admission         | $199 | 100   |
    | CQRS Workshop             | $500 | 100   |
    | Additional cocktail party | $50  | 100   |
And the selected Order Items
    | seat type                 | quantity |
    | General admission         | 1        |
    | Additional cocktail party | 1        |
When the Registrant proceeds to make the Reservation
    # command:RegisterToConference
Then the command to register the selected Order Items is received 
    # event: OrderPlaced
And the event for Order placed is emitted
    # command: MakeSeatReservation
And the command for reserving the selected Seats is received
    # event: SeatsReserved
And the event for reserving the selected Seats is emitted
    # command: MarkSeatsAsReserved
And the command for marking the selected Seats as reserved is received
    # event: OrderReservationCompleted 
And the event for completing the Order reservation is emitted
    # event: OrderTotalsCalculated
And the event for calculating the total of $249 is emitted

下面的代碼示例显示了feature文件的一些步驟實現。這些步驟使用命令總線發送命令。

[When(@"the Registrant proceed to make the Reservation")]
public void WhenTheRegistrantProceedToMakeTheReservation()
{
    registerToConference = ScenarioContext.Current.Get<RegisterToConference>();
    var conferenceAlias = ScenarioContext.Current.Get<ConferenceAlias>();

    registerToConference.ConferenceId = conferenceAlias.Id;
    orderId = registerToConference.OrderId;
    this.commandBus.Send(registerToConference);

    // Wait for event processing
    Thread.Sleep(Constants.WaitTimeout);
}

[Then(@"the command to register the selected Order Items is received")]
public void ThenTheCommandToRegisterTheSelectedOrderItemsIsReceived()
{
    var orderRepo = EventSourceHelper.GetRepository<Registration.Order>();
    Registration.Order order = orderRepo.Find(orderId);

    Assert.NotNull(order);
    Assert.Equal(orderId, order.Id);
}

[Then(@"the event for Order placed is emitted")]
public void ThenTheEventForOrderPlacedIsEmitted()
{
    var orderPlaced = MessageLogHelper.GetEvents<OrderPlaced>(orderId).SingleOrDefault();

    Assert.NotNull(orderPlaced);
    Assert.True(orderPlaced.Seats.All(
        os => registerToConference.Seats.Count(cs => cs.SeatType == os.SeatType && cs.Quantity == os.Quantity) == 1));
}

在遷移過程中發現的bug

當測試團隊在遷移之後在系統上運行測試時,我們發現訂單和註冊限界上下文中座位類型的數量與遷移之前的數量不同。調查揭示了以下原因。

如果會議從未發布過,則會議管理限界上下文允許業務客戶刪除座位類型,但不會引發集成事件向訂單和註冊限界上下文報告這一情況。所以,當業務客戶創建新的座位類型時,訂單和註冊限界上下文從會議管理限界上下文接收事件,而不是當業務客戶刪除座位類型時。

遷移過程的一部分創建一組集成事件,以替換V1版本處理后丟棄的事件。它通過讀取會議管理限界上下文使用的數據庫來創建這些事件。此過程沒有為已刪除的座位類型創建集成事件。

總之,在V1版本中,已刪除的座位類型錯誤地出現在訂單和註冊限界上下文的讀模型中。在遷移到V2版本之後,這些已刪除的座位類型沒有出現在訂單和註冊限界上下文的讀模型中。

Poe(IT運維人員)發言:

測試遷移過程不僅驗證遷移是否按預期運行,而且可能揭示應用程序本身的bug。

總結

在我們旅程的這個階段,我們對系統進行了版本控制,並完成了V2偽生產版本。這個新版本包含了一些額外的功能和特性,比如支持不需要付費的訂單和在UI中显示更多信息。

我們還對基礎設施做了一些改變。例如,我們使更多的消息具有冪等性,現在持久化集成事件。下一章將描述我們旅程的最後階段,我們將繼續增強基礎設施,並在準備發布V3版本時加強系統。

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!