一、背景

dotnetty 本身是一个优秀的网络通讯框架,不过它是基于异步事件驱动来处理另一端的响应,需要在单独的 handler 去处理相应的返回结果。而在我们的实际使用当中,尤其是 客户端程序 基本都是 请求-响应 模型,在发送了数据时候需要等待服务器的响应才能进行下一步操作,如果服务器返回的是错误信息,则需要进行特殊的处理。

类似于下面这种方式:

public async void button1_click()
{
    var result = await dotnettyclient.senddata("hello");
    
    if(result == "error")
    {
        throw new exception("服务器返回错误!");
    }
    
    console.writeline($"hello {result}");
}

二、解决思路

参阅了大部分资料之后,发现在 java 的 netty 当中可以使用 future / promise 来实现,那么 c# 是否有类似的组件呢?答案是有的,他们对应的就是 tasktaskcompletionsource,前者是给调用者的任务,而后者则是用于设置响应任务的结果。

那么我们就可以这么来处理,当客户端发送请求时,附带唯一的一个请求 id,并将 taskcompletionsource 放在一个请求队列当中,之后返回一个 task。当客户端接收到服务器响应的时候,通过 taskcompletionsource 设置之前那个 task 的结果,这样我们接收到响应之后,就会从之前 await 的地方继续执行。

这里我自己的需求仅仅是类似于 同步阻塞式 的操作,所以我直接使用一个队列来处理,并没有用唯一的请求 id 来表示不同的请求,因为我可以 保证在同一时间内有且仅有一个客户端请求被发起

三、代码实现

实现起来超级简单,只需要在发起请求的时候,创建一个 taskcompletionsource<tresponse> 对象。这个泛型参数指的是你想要的返回值类型,这里我以 tresponse 代替,下面的 demo 我会用 string 类型进行演示。

创建好一个 taskcompletionsource<tresponse> 之后,在发送方法里面,我们可以将其对象放在一个先进先出的队列当中,然后将其 task 属性作为发送方法的返回值。

我们再来到处理服务器响应的 handler 当中,从队列里面拿去之前存放的 taskcompletionsource<tresponse> 对象,调用其 setresult() 方法,将具体响应进行设置。

通过以上的操作,我们在发送数据的时候,就可以使用 await 关键字等待服务端的响应,但不会阻塞线程,当客户端接收到服务端响应时,就会恢复到之前 await 的位置继续执行。

数据发送方法:

public static class dotnettyclient
{
    static dotnettyclient()
    {
        requestqueue = new queue<taskcompletionsource<string>>();
    }
    
    public static queue<taskcompletionsource<string>> requestqueue { get; set; }
    
    public static async task<string> senddata(string data)
    {
        var resulttask = new taskcompletionsource<string>();
        
        var buffer = new unpooled.buffer();
        buffer.writebytes(encoding.utf8.getbytes(data));
        await _clientchannel.writeandflushasync(buffer);
        
        requestqueue.enqueue(resulttask);
        
        return await resulttask.task;
    }
}

服务端响应处理:

public class protocolhandler : channelhandleradapter
{
    public override void channelread(ichannelhandlercontext context, object message)
    {
        if(message is string response)
        {
            if(!dotnettyclient.requestqueue.trydequeue(out taskcompletionsource<string> result)) return;
            result.setresult(response);
        }
    }
}

这里我就不再编写解析器,主要说明一下代码的思路,下面在使用的时候就如同第一节说的一样,直接使用 await 关键字等待响应结果即可。

四、缺陷

在这里我并没有展示多个异步请求的情况,如果是用户同时发起多个请求的时候,你可以通过数据的唯一 id 来标识每一个请求,这样在接收服务端响应的时候就能处理这种情况了。

五、参考资料

  • dotnetty github issues
  • dotblogs – 《[c#] 將事件驅動 (event-driven) 的模式改為可等候的方法 (awaitable method)》
  • dotblogs -《[c#.net][tpl] 利用 taskcompletionsource 將 eap 轉換成 tap》
  • hk-zhang -《taskcompletionsource的使用场景》