談反應式編程在服務端中的應用,數據庫操作優化,提速 Upsert

反應式編程在客戶端編程當中的應用相當廣泛,而當前在服務端中的應用相對被提及較少。本篇將介紹如何在服務端編程中應用響應時編程來改進數據庫操作的性能。

開篇就是結論

接續上一篇《談反應式編程在服務端中的應用,數據庫操作優化,從 20 秒到 0.5 秒》之後,這次,我們帶來了關於利用反應式編程進行 upsert 優化的案例說明。建議讀者可以先閱讀一下前一篇,這樣更容易理解本篇介紹的方法。

同樣還是利用批量化的思路,將單個 upsert 操作批量進行合併。已達到減少數據庫鏈接消耗從而大幅提升性能的目的。

業務場景

在最近的一篇文章《十萬同時在線用戶,需要多少內存?——Newbe.Claptrap 框架水平擴展實驗》中。我們通過激活多個常駐於內存當中的 Claptrap 來實現快速驗證 JWT 正確性的目的。

但,當時有一個技術問題沒有得到解決:

Newbe.Claptrap 框架設計了一個特性:當 Claptrap Deactive 時,可以選擇將快照立即保存到數據庫。因此,當嘗試從集群中關閉一個節點時,如果節點上存在大量的 Claptrap ,那麼將產生大量的數據庫 upsert 操作。瞬間推高數據庫消耗,甚至導致部分錯誤而保存失敗。

一點點代碼

有了前篇的 IBatchOperator,那麼留給這篇的代碼內容就非常少了。

首先,按照使用上一篇的 IBatchOperator 編寫一個支持操作的 Repository,形如以下代碼:

public class BatchUpsert : IUpsertRepository
{
private readonly IDatabase _database;
private readonly IBatchOperator<(int, int), int> _batchOperator;

public BatchUpsert(IDatabase database)
{
_database = database;
var options = new BatchOperatorOptions<(int, int), int>
{
BufferCount = 100,
BufferTime = TimeSpan.FromMilliseconds(50),
DoManyFunc = DoManyFunc
};
_batchOperator = new BatchOperator<(int, int), int>(options);
}

private Task<int> DoManyFunc(IEnumerable<(int, int)> arg)
{
return _database.UpsertMany(arg.ToDictionary(x => x.Item1, x => x.Item2));
}

public Task UpsertAsync(int key, int value)
{
return _batchOperator.CreateTask((key, value));
}
}

然後,只要實現對應數據庫的 UpsertMany 方法,便可以很好地完成這項優化。

各種數據庫的操作

結合 Newbe.Claptrap 現在項目的實際。目前,被支持的數據庫分別有 SQLite、PostgreSQL、MySql 和 MongoDB。以下,分別對不同類型的數據庫的批量 Upsert 操作進行說明。

由於在 Newbe.Claptrap 項目中的 Upsert 需求都是以主鍵作為對比鍵,因此以下也只討論這種情況。

SQLite

根據官方文檔,使用 INSERT OR REPLACE INTO 便可以實現主鍵衝突時替換數據的需求。

具體的語句格式形如以下:

INSERT OR REPLACE INTO TestTable (id, value)
VALUES
(@id0,@value0),
...
(@idn,@valuen);

因此只要直接拼接語句和參數調用即可。需要注意的是,SQLite 的可傳入參數默認為 999,因此拼接的變量也不應大於該數量。

官方文檔:INSERT

PostgreSQL

眾所周知,PostgreSQL 在進行批量寫入時,可以使用高效的 COPY 語句來完成數據的高速導入,這遠遠快於 INSERT 語句。但可惜的是 COPY 並不能支持 ON CONFLICT DO UPDATE 子句。因此,無法使用 COPY 來完成 upsert 需求。

因此,我們還是回歸使用 INSERT 配合 ON CONFLICT DO UPDATE 子句,以及 unnest 函數來完成批量 upsert 的需求。

具體的語句格式形如以下:

INSERT INTO TestTable (id, value)
VALUES (unnest(@ids), unnest(@values))
ON CONFLICT ON CONSTRAINT TestTable_pkey
DO UPDATE SET value=excluded.value;

其中的 ids 和 values 分別為兩個等長的數組對象,unnest 函數可以將數組對象轉換為行數據的形式。

注意,可能會出現 ON CONFLICT DO UPDATE command cannot affect row a second time 錯誤。

因此如果嘗試使用上述方案,需要在傳入數據庫之前,先在程序中去重一遍。而且,通常來說,在程序中進行一次去重可以減少向數據庫中傳入的數據,這本身也很有意義。

官方文檔:unnest 函數
官方文檔:Insert 語句

MySql

MySql 與 SQLite 類似,支持 REPLACE 語法。具體語句形式如下:

REPLACE INTO TestTable (id, value)
VALUES
(@id0,@value0),
...
(@idn,@valuen);

官方文檔:REPLACE 語句

MongoDB

MongoDB 原生支持 bulkWrite 的批量傳輸模式,也支持 replace 的 upsert 語法。因此操作非常簡單。

那麼這裏展示一下 C# 操作方法:

private async Task SaveManyCoreMany(
IDbFactory dbFactory,
IEnumerable<StateEntity> entities)
{
var array = entities as StateEntity[] ?? entities.ToArray();
var items = array
.Select(x => new MongoStateEntity
{
claptrap_id = x.ClaptrapId,
claptrap_type_code = x.ClaptrapTypeCode,
version = x.Version,
state_data = x.StateData,
updated_time = x.UpdatedTime,
})
.ToArray();

var client = dbFactory.GetConnection(_connectionName);
var db = client.GetDatabase(_databaseName);
var collection = db.GetCollection<MongoStateEntity>(_stateCollectionName);

var upsertModels = items.Select(x =>
{
var filter = new ExpressionFilterDefinition<MongoStateEntity>(entity =>
entity.claptrap_id == x.claptrap_id && entity.claptrap_type_code == x.claptrap_type_code);
return new ReplaceOneModel<MongoStateEntity>(filter, x)
{
IsUpsert = true
};
});
await collection.BulkWriteAsync(upsertModels);
}

這是從 Newbe.Claptrap 項目業務場景中給出的代碼,讀者可以結合自身需求進行修改。

官方文檔:db.collection.bulkWrite ()

通用型解法

優化的本質是減少數據庫鏈接的使用,盡可能在一個鏈接內完成更多的工作。因此如果特定的數據庫不支持以上數據庫類似的操作。那麼還是存在一種通用型的解法:

  1. 以盡可能快地方式將數據寫入一臨時表
  2. 將臨時表的數據已連表 update 的方式更新的目標表
  3. 刪除臨時表

UPDATE with a join

性能測試

以 SQLite 為例,嘗試對 12345 條數據進行 2 次 upsert 操作。

單條併發:1 分 6 秒

批量處理:2.9 秒

可以在該鏈接找到測試的代碼。

樣例中不包含有 MySql、PostgreSQL 和 MongoDB 的樣例,因為沒有優化之前,在不提高連接池的情況下,一併發基本就爆炸了。所有優化的結果是直接解決了可用性的問題。

所有的示例代碼均可以在代碼庫中找到。如果 Github Clone 存在困難,也可以點擊此處從 Gitee 進行 Clone

常見問題解答

此處對一些常見的問題進行解答。

客戶端是等待批量操作的結果嗎?

這是一個很多網友提出的問題。答案是:是的。

假設我們公開了一個 WebApi 作為接口,由瀏覽器調用。如果同時有 100 個瀏覽器同時發出請求。

那麼這 100 個請求會被合併,然後寫入數據庫。而在寫入數據庫之前,這些客戶端都不會得到服務端的響應,會一直等待。

這也是該合併方案區別於普通的 “寫隊列,后寫庫” 方案的地方。

原理上講,這種和 bulkcopy 有啥不一樣?

兩者是不相關,必須同時才有作用的功能。
首先,代碼中的 database.InsertMany 就是你提到的 bulkcopy。

這個代碼的關鍵不是 InsertMany ,而是如何將單次的插入請求合併。
試想一下,你可以在 webapi 上公開一個 bulkcopy 的 API。
但是,你無法將來自不同客戶端的請求合併在同一個 API 裏面來調用 bulkcopy。
例如,有一萬個客戶端都在調用你的 API,那怎麼合併這些 API 請求呢?

如果如果通過上面這種方式,雖然你只是對外公開了一個單次插入的 API。你卻實現了來自不同客戶端請求的合併,變得可以使用 bulkcopy 了。這在高併發下很有意義。

另外,這符合開閉的原理,因為你沒有修改 Repository 的 InsertOne 接口,卻實現了 bulkcopy
的效果。

如果批量操作中一個操作異常失敗是否會導致被合併的其他操作全部失敗?

如果業務場景是合併會有影響,那當然不應該合併。

批量操作一個失敗,當然是一起失敗,因為底層的數據庫事務肯定也是一起失敗。

除非批量接口也支持對每個傳入的 ID 做區別對待。典型的,比如 mongodb 的 bulkcopy 可以返回哪些成功哪些失敗,那麼我們就有能力設置不同的 Tcs 狀態。

哪些該合併,哪些不該合併,完全取決於業務。樣例給出的是如果要合併,應該怎麼合併。不會要求所有都要合併。

Insert 和 Upsert 都說了,那 Delete 和 Select 呢?

筆者籠統地將該模式稱為 “反應式批量處理”。要確認業務場景是否應用該模式,需要具備以下這兩個基本的要求:

  • 業務下游的批量處理是否會比累積的單條處理要快,如果會,那可以用
  • 業務上游是否會出現短時間的突增頻率的請求,如果會,那可以用

當然,還需要考量,比如:下游的批量操作能否卻分每個請求的結果等等問題。但以上兩點是一定需要考量的。

那麼以 Delete 為例:

  • Delete Where In 的速度會比 Delete = 的速度快嗎?試一下
  • 會有突增的 Delete 需求嗎?想一下

小小工具 Zeal

筆者是一個完整存儲過程都寫不出來的人。能夠查閱到這些數據庫的文檔,全靠一款名為 Zeal 的離線文檔查看免費軟件。推薦給您,您也值得擁有。

Zeal 官網地址:https://zealdocs.org/

最後但是最重要!

最近作者正在構建以反應式Actor模式事件溯源為理論基礎的一套服務端開發框架。希望為開發者提供能夠便於開發出 “分佈式”、“可水平擴展”、“可測試性高” 的應用系統 ——Newbe.Claptrap

本篇文章是該框架的一篇技術選文,屬於技術構成的一部分。如果讀者對該內容感興趣,歡迎轉發、評論、收藏文章以及項目。您的支持是促進項目成功的關鍵。

如果你對該項目感興趣,你可以通過 github issues 提交您的看法。

如果您無法正常訪問 github issue,您也可以發送郵件到 newbe-claptrap@googlegroups.com 來參与我們的討論。

點擊鏈接 QQ 交流【Newbe.Claptrap】:https://jq.qq.com/?_wv=1027&k=5uJGXf5。

您還可以查閱本系列的其他選文:

  • Newbe.Claptrap – 一套以 “事件溯源” 和 “Actor 模式” 作為基本理論的服務端開發框架
  • 十萬同時在線用戶,需要多少內存?——Newbe.Claptrap 框架水平擴展實驗
  • 談反應式編程在服務端中的應用,數據庫操作優化,從 20 秒到 0.5 秒
  • 談反應式編程在服務端中的應用,數據庫操作優化,提速 Upsert
  • Newbe.Claptrap 項目周報 1 – 還沒輪影,先用輪跑

GitHub 項目地址:https://github.com/newbe36524/Newbe.Claptrap

Gitee 項目地址:https://gitee.com/yks/Newbe.Claptrap

 

  • 本文作者: newbe36524
  • 本文鏈接: https://www.newbe.pro/Newbe.Claptrap/Reactive-In-Server-2/
  • 版權聲明: 本博客所有文章除特別聲明外,均採用 BY-NC-SA 許可協議。轉載請註明出處!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?