grpc由于需要用工具生成代码实现,可开发性不是很高,在扩展这方面不是很友好

最近研究了下,进行了扩展,不需要额外的工具生成,直接使用默认grpc.tools生成的代理类即可

相关源码在文章底部

客户端目标:

  • 能配置consul地址和服务名称,在调用client时能正确请求到真实的服务地址
  • 在调用方法时,能使用polly策略重试,超时,和熔断

查看grpc生成的代码,可以看到client实例化有有两个构造方法,以测试为例

      /// <summary>creates a new client for greeter</summary>
      /// <param name="channel">the channel to use to make remote calls.</param>
      public greeterclient(grpc::channelbase channel) : base(channel)
      {
      }
      /// <summary>creates a new client for greeter that uses a custom <c>callinvoker</c>.</summary>
      /// <param name="callinvoker">the callinvoker to use to make remote calls.</param>
      public greeterclient(grpc::callinvoker callinvoker) : base(callinvoker)
      {
      }

1.可传入一个channelbase实例化

2.可传入一个callinvoker实例化

channel可实现为

channel createchannel(string address)
        {
            var channeloptions = new list<channeloption>()
                    {
                        new channeloption(channeloptions.maxreceivemessagelength, int.maxvalue),
                        new channeloption(channeloptions.maxsendmessagelength, int.maxvalue),
                    };
            var channel = new channel(address, channelcredentials.insecure, channeloptions);
            return channel;
        }

在这里,可以从consul地址按服务名获取真实的服务地址,生成channel

 

callinvoker为一个抽象类,若要对方法执行过程干预,则需要重写这个方法,大致实现为

public class grpccallinvoker : callinvoker
    {
        public readonly channel channel;
        public grpccallinvoker(channel channel)
        {
            channel = grpcpreconditions.checknotnull(channel); 
        }

        public override tresponse blockingunarycall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options, trequest request)
        {
            return calls.blockingunarycall(createcall(method, host, options), request);
        }

        public override asyncunarycall<tresponse> asyncunarycall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options, trequest request)
        {
            return calls.asyncunarycall(createcall(method, host, options), request);
        }

        public override asyncserverstreamingcall<tresponse> asyncserverstreamingcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options, trequest request)
        {
            return calls.asyncserverstreamingcall(createcall(method, host, options), request);
        }

        public override asyncclientstreamingcall<trequest, tresponse> asyncclientstreamingcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options)
        {
            return calls.asyncclientstreamingcall(createcall(method, host, options));
        }

        public override asyncduplexstreamingcall<trequest, tresponse> asyncduplexstreamingcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options)
        {
            return calls.asyncduplexstreamingcall(createcall(method, host, options));
        }

        protected virtual callinvocationdetails<trequest, tresponse> createcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options)
            where trequest : class 
            where tresponse : class
        {
            return new callinvocationdetails<trequest, tresponse>(channel, method, host, options);
        }
    }

这里可以传入上面创建的channel,在createcall方法里,则可以对调用方法进行控制

完整实现为

public class grpccallinvoker : callinvoker
    {
        grpcclientoptions _options;
        igrpcconnect _grpcconnect;
        public grpccallinvoker(igrpcconnect grpcconnect)
        {
            _options = grpcconnect.getoptions();
            _grpcconnect = grpcconnect;
        }

        public override tresponse blockingunarycall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options, trequest request)
        {
            return calls.blockingunarycall(createcall(method, host, options), request);
        }

        public override asyncunarycall<tresponse> asyncunarycall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options, trequest request)
        {
            return calls.asyncunarycall(createcall(method, host, options), request);
        }

        public override asyncserverstreamingcall<tresponse> asyncserverstreamingcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options, trequest request)
        {
            return calls.asyncserverstreamingcall(createcall(method, host, options), request);
        }

        public override asyncclientstreamingcall<trequest, tresponse> asyncclientstreamingcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options)
        {
            return calls.asyncclientstreamingcall(createcall(method, host, options));
        }

        public override asyncduplexstreamingcall<trequest, tresponse> asyncduplexstreamingcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options)
        {
            return calls.asyncduplexstreamingcall(createcall(method, host, options));
        }

        protected virtual callinvocationdetails<trequest, tresponse> createcall<trequest, tresponse>(method<trequest, tresponse> method, string host, calloptions options)
            where trequest : class
            where tresponse : class
        {
            var methodname = $"{method.servicename}.{method.name}";
            var key = methodname.substring(methodname.indexof(".") + 1).tolower();
            var a = _options.methodpolicies.trygetvalue(key, out pollyattribute methodpollyattr);
            if (!a)
            {
                _options.methodpolicies.trygetvalue("", out methodpollyattr);
            }
            calloptions options2;
            //重写header
            if (options.headers != null)
            {
                options2 = options;
            }
            else
            {
                options2 = new calloptions(_grpcconnect.getmetadata(), options.deadline, options.cancellationtoken);
            }

            var pollydata = pollyextension.invoke(methodpollyattr, () =>
            {
                var callres = new callinvocationdetails<trequest, tresponse>(_grpcconnect.getchannel(), method, host, options2);
                return new pollyextension.pollydata<callinvocationdetails<trequest, tresponse>>() { data = callres };
            }, $"{methodname}");
            var response = pollydata.data;
            if (!string.isnullorempty(pollydata.error))
            {
                throw new exception(pollydata.error);
            }
            return response;
            //return new callinvocationdetails<trequest, tresponse>(channel.invoke(), method, host, options2);
        }
    }

 

其中传入了pollyattribute,由pollyextension.invoke来完成polly策略的实现,具体代码可在源码里找到

从上面代码可以看到,callinvoker里可以传入了igrpcconnect,由方法igrpcconnect.getchannel()获取channel

client实例化

.net framework实现为

     public t getclient<t>()
        {
            var a = instancecache.trygetvalue(typeof(t), out object instance);
            if (!a)
            {
                var grpccallinvoker = new grpccallinvoker(this);
                instance = system.activator.createinstance(typeof(t), grpccallinvoker);
                instancecache.tryadd(typeof(t), instance);
            }
            return (t)instance;
        }

core则简单点,直接注入实现

var client = provider.getservice<greeter.greeterclient>();

服务端注册

和其它服务注册一样,填入正确的服务地址和名称就行了,但是在check里得改改,grpc的健康检查参数是不同的,并且在consul客户端里没有这个参数,得自已写

以下代码是我封装过的,可查看源码

public void configure(iapplicationbuilder app, iwebhostenvironment env)
        {
            if (env.isdevelopment())
            {
                app.usedeveloperexceptionpage();
            }

            app.userouting();

            app.useendpoints(endpoints =>
            {
                endpoints.mapgrpcservice<greeterservice>();
                endpoints.mapgrpcservice<healthcheckservice>();
                endpoints.mapget("/", async context =>
                {
                    await context.response.writeasync("communication with grpc endpoints must be made through a grpc client. to learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
                });
            });

            //注册服务
            var consulclient = new crl.core.consulclient.consul("http://localhost:8500");
            var info = new crl.core.consulclient.serviceregistrationinfo
            {
                address = "127.0.0.1",
                name = "grpcserver",
                id = "grpcserver1",
                port = 50001,
                tags = new[] { "v1" },
                check = new crl.core.consulclient.checkregistrationinfo()
                {
                    grpc = "127.0.0.1:50001",
                    interval = "10s",
                    grpcusetls = false,
                    deregistercriticalserviceafter = "90m"
                }
            };
            consulclient.deregisterservice(info.id);
            var a = consulclient.registerservice(info);

        }

 客户端完整封装代码为

core扩展方法,设置grpcclientoptions来配置consul地址和polly策略,直接注入了client类型

同时添加了统一header传递,使整个服务都能用一个头发送请求,不用再在方法后面跟参数

public static class grpcextensions
    {
        public static void addgrpcextend(this iservicecollection services, action<grpcclientoptions> setupaction, params assembly[] assemblies)
        {
            services.configure(setupaction);
            services.addsingleton<igrpcconnect, grpcconnect>();
            services.addscoped<callinvoker, grpccallinvoker>();
            foreach (var assembyle in assemblies)
            {
                var types = assembyle.gettypes();
                foreach (var type in types)
                {
                    if(typeof(clientbase).isassignablefrom(type))
                    {
                        services.addsingleton(type);
                    }
                }
            }
        }
    }
 class program
    {
        static iserviceprovider provider;
        static program()
        {
            var builder = new configurationbuilder();

            var configuration = builder.build();

            var services = new servicecollection();
            services.addsingleton<iconfiguration>(configuration);
            services.addoptions();

            services.addgrpcextend(op =>
            {
                op.host = "127.0.0.1";
                op.port = 50001;
                op.useconsuldiscover("http://localhost:8500", "grpcserver");//使用consul服务发现
                op.addpolicy("greeter.sayhello", new crl.core.remoting.pollyattribute() { retrycount = 3 });//定义方法polly策略
            }, system.reflection.assembly.getexecutingassembly());

            provider = services.buildserviceprovider();
        }


        static void main(string[] args)
        {            
            //设置允许不安全的http2支持
            appcontext.setswitch("system.net.http.socketshttphandler.http2unencryptedsupport", true);

            var grpcconnect = provider.getservice<igrpcconnect>();
            //认证
            //https://www.cnblogs.com/stulzq/p/11897628.html
            var token = "";
            var headers = new metadata { { "authorization", $"bearer {token}" } };
            grpcconnect.setmetadata(headers);


        label1:
            var client = provider.getservice<greeter.greeterclient>();
            var reply = client.sayhello(
                new hellorequest { name = "test" });
            console.writeline("greeter 服务返回数据: " + reply.message);

            console.readline();
            goto label1;
        }
    }

 

运行服务端,结果为

可以看到服务注册成功,状态检查也成功

运行客户端

客户端正确调用并返回了结果

项目源码:

https://github.com/crl2020/crl.netstandard/tree/master/grpc

除了grpc实现了服务发现和polly策略,本框架对api代理,动态api,rpc也一起实现了

api代理测试
https://github.com/crl2020/crl.netstandard/tree/master/dynamicwebapi/apiproxytest

动态api测试

https://github.com/crl2020/crl.netstandard/tree/master/dynamicwebapi/dynamicwebapiclient

rcp测试

https://github.com/crl2020/crl.netstandard/tree/master/rpc/rpcclient

ps:被上家公司坑了一把,又碰上疫情,最近一年简值了!-_ 求推荐郑州地区工作,加微信hubroxx