120行代碼打造.netcore生產力工具-小而美的後台異步組件

相信絕大部分開發者都接觸過用戶註冊的流程,通常情況下大概的流程如下所示:

  1. 接收用戶提交註冊信息
  2. 持久化註冊信息(數據庫+redis)
  3. 發送註冊成功短信(郵件)
  4. 寫操作日誌(可選)

偽代碼如下:

public async Task<IActionResult> Reg([FromBody] User user)
{
    _logger.LogInformation("持久化數據開始");
    await Task.Delay(50);
    _logger.LogInformation("持久化結束");
    _logger.LogInformation("發送短信開始");
    await Task.Delay(100);
    _logger.LogInformation("發送短信結束");
    _logger.LogInformation("操作日誌開始");
    await _logRepository.Insert(new Log { Txt = "註冊日誌" });
    _logger.LogInformation("操作日誌結束");
    return Ok("註冊成功");
}

在以上的代碼中,我使用Task.Delay方法阻塞主線程,用以模擬實際場景中的執行耗時。以上流程應該是包含了絕大部分註冊流程所需要的操作。對於任何開發者來講,以上業務流程沒任何難度,無非是順序的執行各個流程的代碼即可。

稍微有點開發經驗的應該會將以上的流程進行拆分,但有些人可能就要問了,為什麼要拆分呢?拆分之後的代碼應該怎麼寫呢?下面我們就來簡單聊下如此場景的正確打開方式。

首先,註冊成功的依據應該是是否成功的將用戶信息持久化(至於是先持久化到數據庫,異或是先寫到redis不在本篇文章討論的範疇),至於發送註冊短信(郵件)以及寫日誌的操作應該不能成為影響註冊是否成功的因素,而發送短信/郵件等相關操作通常情況下也是比較耗時的,所以在對此接口做性能優化時,可優先考慮將短信/郵件以及寫日誌等相關操作與主流程(持久化數據)拆分,使其不阻塞主流程的執行,從而達到提高響應速度的目的。

知道了為什麼要拆,但具體如何拆分呢?怎樣才能用最少的改動,達到所需的目的呢?

條條大路通羅馬,所以要達成我們的目的也是有很多方案的,具體選擇哪種方案需要根據具體的業務場景,業務體量等多種因素綜合考慮,下面我將一一介紹分析相關方案。

在正式介紹可用方案前,筆者想先介紹一種很多新手容易錯誤使用的一種方案(因為筆者就曾經天真的使用過這種錯誤的方案)。

提到異步,絕大部分.net開發者應該第一想到的就是Task,async,await等,的確,async,await的語法糖簡化了.net開發者異步編程的門檻,減少了很多代碼量。通常一個返回Task類型的方法,在被調用時,會在方法的前面加上await,表示需要等待此方法的執行結果,再繼續執行後面的代碼。但如果不加await時,則不會等待方法的執行結果,進而也不會阻塞主線程。所以,有些人可能就會將發送短信/郵件以及寫日誌的操作如下方式進行改造。

public async Task<IActionResult> Reg1([FromBody] User user)
{
    _logger.LogInformation("持久化數據開始");
    await Task.Delay(50);
    _logger.LogInformation("持久化結束");
    _ = Task.Run(async () =>
     {
         _logger.LogInformation("發送短信開始");
         await Task.Delay(100);
         _logger.LogInformation("發送短信結束");
         _logger.LogInformation("操作日誌開始");
         await _logRepository.Insert(new Log { Txt = "註冊日誌" });
         _logger.LogInformation("操作日誌結束");
     });
    return Ok("註冊成功");
}

然後使用jmeter分別壓測改造前和改造后的接口,結果如下:

有沒有被驚訝到?就這樣一個簡單的改造,吞吐量就提高了三四倍。既然已經提高了三四倍,那為什麼說這是一種錯誤的改造方法嗎?各位看官且往下看。

熟悉.netcore的大佬,應該都知道.netcore的依賴注入的生命周期吧。通常情況下,注入的生命周期包括:Singleton,Scope,Transient。
在以上的流程中,假如寫操作日誌的實例的生命周期是Scope,當在Task中調用Controller獲取到的實例的方法時,因為Task.Run並沒有阻塞主線程,當調用Action return后,當前請求的scope注入的對象會被回收,如果對象被回收之後,Task.Run還未執行完,則會報System.ObjectDisposedException: Cannot access a disposed object. 異常。意思是,不能訪問一個已disposed的對象。正確的做法是使用IServiceScopeFactory創建一個新的作用域,在新的作用域中獲取獲取日誌倉儲服務的實例。這樣就可以避免System.ObjectDisposedException異常了。
改造后的示例代碼如下:

public async Task<IActionResult> Reg1([FromBody] User user)
{
    _logger.LogInformation("持久化數據開始");
    await Task.Delay(50);
    _logger.LogInformation("持久化結束");
    _ = Task.Run(async () =>
    {
        using (var scope = _scopeFactory.CreateScope())
        {
            var sp = scope.ServiceProvider;
            var logRepository = sp.GetService<ILogRepository>();
            _logger.LogInformation("發送短信開始");
            await Task.Delay(100);
            _logger.LogInformation("發送短信結束");

            _logger.LogInformation("操作日誌開始");
            await logRepository.Insert(new Log { Txt = "註冊日誌" });
            _logger.LogInformation("操作日誌結束");
        }
    });
    return Ok("註冊成功");
}

雖然得到了正解,但上述的代碼着實有點多,如果一個項目有多個相似的業務場景,就要考慮對CreateScope相關的操作進行封裝。

下面就來一一介紹下筆者覺得實現此業務場景的幾種方案。
1.消息隊列
2.Quartz任務調度組件
3.Hangfire任務調度組件
4.Weshare.TransferJob(推薦)
首先說下消息隊列的方式。準確的說,消息隊列應該是這種場景的最優解決方案,消息隊列的其中一個比較重要的特性就是解耦,從而提高吞吐量。但並不是所有的應用程序都需要上消息隊列。有些業務場景使用消息隊列時,往往會給人一種”殺雞用牛刀”的感覺。

其次Quartz和Hangfire都是任務調度框架,都提供了可實現以上業務場景的邏輯,但Quartz和Hangfire都需要持久化作業數據。雖然Hangfire提供了內存版本,但經過我的測試,發現Hangfire的內存版本特別消耗內存,所以不太推薦使用任務調度框架來實現類似於這樣的業務邏輯。

最後,也就是本文的重點,筆者結合了消息隊列和任務調度的思想,實現了一個輕量級的轉移作業到後台執行的組件。此組件完美的解決了Scope生命周期實例獲取的問題,一行代碼將不需要等待的操作轉移到後台線程執行。
接入步驟如下:
1.使用nuget安裝Weshare.TransferJob
2.在Stratup中注入服務。

services.AddTransferJob();

3.通過構造函數或其他方法獲取到IBackgroundRunService的實例。
4.調用實例的Transfer方法將作業轉移到後台線程。

_backgroundRunService.Transfer(log=>log.Insert(new Log(){Txt = "註冊日誌"}));

就是這麼簡單的實現了這樣的業務場景,不僅簡化了代碼,而且大大提高了系統的吞吐量。

下面再來一起分析下Weshare.TransferJob的核心代碼(畢竟文章要點題)。各位器宇不凡的看官請繼續往下看。
下面的代碼是AddTransferJob方法的實現:

public static IServiceCollection AddTransferJob(this IServiceCollection services)
{
    services.AddSingleton<IBackgroundRunService, BackgroundRunService>();
    services.AddHostedService<TransferJobHostedService>();
    return services;
}

聰明”絕頂”的各位看官應該已經發現上述代碼的關鍵所在。是的, 你沒有看錯,此組件的就是利用.net core提供的HostedService在後台執行被轉移的作業的。
我們再來一起看看TransferJobHostedService的代碼:

public class TransferJobHostedService:BackgroundService
{
    private IBackgroundRunService _runService;
    public TransferJobHostedService(IBackgroundRunService runService)
    {
        _runService = runService;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await _runService.Execute(stoppingToken);
        }
    }
}

這個類的代碼也很簡單,重寫了BackgroundService類的ExecuteAsync,循環調用IBackgroundRunService實例的Execute方法。所以,最最關鍵的代碼是IBackgroundRunService的實現類中。
詳細代碼如下:

public class BackgroundRunService : IBackgroundRunService
{
    private readonly SemaphoreSlim _slim;
    private readonly ConcurrentQueue<LambdaExpression> queue;
    private ILogger<BackgroundRunService> _logger;
    private readonly IServiceProvider _serviceProvider;
    public BackgroundRunService(ILogger<BackgroundRunService> logger, IServiceProvider serviceProvider)
    {
        _slim = new SemaphoreSlim(1);
        _logger = logger;
        _serviceProvider = serviceProvider;
        queue = new ConcurrentQueue<LambdaExpression>();
    }
    public async Task Execute(CancellationToken cancellationToken)
    {
        try
        {
            await _slim.WaitAsync(cancellationToken);
            if (queue.TryDequeue(out var job))
            {
                using (var scope = _serviceProvider.GetRequiredService<IServiceScopeFactory>().CreateScope())
                {
                    var action = job.Compile();
                    var isTask = action.Method.ReturnType == typeof(Task);
                    var parameters = job.Parameters;
                    var pars = new List<object>();
                    if (parameters.Any())
                    {
                        var type = parameters[0].Type;
                        var param = scope.ServiceProvider.GetRequiredService(type);
                        pars.Add(param);
                    }
                    if (isTask)
                    {
                        await (Task)action.DynamicInvoke(pars.ToArray());
                    }
                    else
                    {
                        action.DynamicInvoke(pars.ToArray());
                    }
                }
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e.ToString());
        }
    }
    public void Transfer<T>(Expression<Func<T, Task>> expression)
    {
        queue.Enqueue(expression);
        _slim.Release();
    }
    public void Transfer(Expression<Action> expression)
    {
        queue.Enqueue(expression);
        _slim.Release();
    }
}

納尼?嫌代碼多看不懂?那咱們一起來剖析下吧。
首先,此類有三個較重要的私有變量,對應的類型分別是SemaphoreSlim, ConcurrentQueue ,IServiceProvider。
其中SemaphoreSlim是為了控制後台作業執行的順序的,在構造函數中初始化了此對象的信號量為1,表示在後台服務的ExecuteAsync方法的循環中每次只能有一個作業執行。
ConcurrentQueue 的對象是用來存儲被轉移到後台服務執行的作業的邏輯,所以使用LambdaExpression作為隊列的類型。
IServiceProvider是為了解決依賴注入的生命周期的。

然後在Execute方法中,第一行代碼如下:

await _slim.WaitAsync(cancellationToken);

作用是等待一個信號量,當沒有可用的信號量時,會阻塞線程的執行,這樣在後台服務的ExecuteAsync方法的死循環就不會一直執行下去,只有獲取到信號量才會繼續執行。
當獲取到信號量后,則說明有新的作業等待執行,所以此時則需要從隊列中讀出要執行的LambdaExpression表達式,創建一個新的Scope后,編譯此表達式樹,判斷返回類型,獲取泛型的具體類型,最後獲取到泛型對應的實例,執行對應的方法。

另外,Transfer方法就是暴露給調用者的方法,用於將表達式樹寫到隊列中,同時釋放信號量。

到此為止,Weshare.TransferJob的實現原理已分析完畢,由於此組件的原理只是將任務轉移到後台進行執行,所以並不是適合對事務有要求的場景。正如本文開頭所假設的場景,TransferJob最適合的場景還是那些和主操作關聯性較低的、失敗或成功並不會影響業務的正常運行。
同時,此組件的定位就是小而美,像延遲執行、定時執行的功能在最初的規劃中其實是有的,後來發現這些功能quartz已經有了,所以沒必要重複造這樣的輪子。
後期會根據使用場景,嘗試加入異常重試機制,以及異常通知回調機制。

最後,不知道有沒有較真的看官想計算下代碼量是否超過120行。
為了證明我不是標題黨,現將此組件進行開源,地址是:
https://github.com/fuluteam/WeShare.TransferJob

橋豆麻袋,筆者辛苦敲的代碼,難道各位看官想白嫖嗎? 點個贊再走唄。點完贊還有力氣的話,如果git上能點個star的話,那也是最好不過的。小生這廂先行謝過。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!