Dapr 运用之集成 Asp.Net Core Grpc 调用篇

站长

发表文章数:4335

如何运用领域驱动设计 - 聚合

前置条件: 《Dapr 运用》

改造 ProductService 以提供 gRPC 服务

  1. 从 NuGet 或程序包管理控制台安装 gRPC 服务必须的包

    • Grpc.AspNetCore
  2. 配置 Http/2
    • gRPC 服务需要 Http/2 协议

      public static IHostBuilder CreateHostBuilder(string[] args)
      {
          return Host.CreateDefaultBuilder(args)
              .ConfigureWebHostDefaults(webBuilder =>
              {
                  webBuilder.ConfigureKestrel(options =>
                  {
                      options.Listen(IPAddress.Loopback, 5001, listenOptions =>
                      {
                          listenOptions.Protocols = HttpProtocols.Http2;
                      });
                  });
                  webBuilder.UseStartup<Startup>();
              });
      }
  3. 新建了 product.proto 以定义 GRPC 服务,它需要完成的内容是返回所有产品集合,当然目前产品内容只有一个 ID

    • 定义产品 proto

      syntax = "proto3";
      
      package productlist.v1;
      
      option csharp_namespace = "ProductList.V1";
      
      service ProductRPCService{
          rpc GetAllProducts(ProductListRequest) returns(ProductList);
      }
      
      message ProductListRequest{
      
      }
      
      message ProductList {
          repeated Product results = 1;
      }
      
      message Product {
          string ID=1;
      }

      说明

      • 定义产品列表 gRPC 服务,得益于宇宙第一 IDE Visual Studio ,只要添加 Grpc.Tools 包就可以自动生成 gRPC 所需的代码,这里不再需要手动去添加 Grpc.Tools ,官方提供的 Grpc.AspNetCore 中已经集成了
      • 定义了一个服务 ProductRPCService
      • 定义了一个函数 ProductRPCService
      • 定义了一个请求构造 ProductListRequest ,内容为空
      • 定义了一个请求返回构造 ProductList ,使用 repeated 表明返回数据是集合
      • 定义了一个数据集合中的一个对象 Product
    • 添加 ProductListService 文件,内容如下

          public class ProductListService : ProductRPCService.ProductRPCServiceBase
          {
              private readonly ProductContext _productContext;
      
              public ProductListService(ProductContext productContext)
              {
                  _productContext = productContext;
              }
      
              public override async Task<ProductList.V1.ProductList> GetAllProducts(ProductListRequest request, ServerCallContext context)
              {
                  IList<Product> results = await _productContext.Products.ToListAsync();
                  var productList = new ProductList.V1.ProductList();
                  foreach (Product item in results)
                  {
                      productList.Results.Add(new ProductList.V1.Product
                      {
                          ID = item.ProductID.ToString()
                      });
                  }
      
                  return productList;
              }
          }
  4. 在 Startup.cs 修改代码如下

    public void ConfigureServices(IServiceCollection services)
    {
        //启用 gRPC 服务
        services.AddGrpc();
        services.AddTransient<ProductListService>();
        ...
    }

    这里的 services.AddTransient (); 的原因是在 Dapr 中需要使用构造器注入,以完成 GetAllProducts(...) 函数的调用

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }
    
        app.UseRouting();
    
        app.UseEndpoints(endpoints =>
        {
            ...
    
            //添加 gRPC 到路由管道中
            endpoints.MapGrpcService<DaprClientService>();
        });
    }

    这里添加的代码的含义分别是启用 gRPC 服务和添加 gRPC 路由。得益于 ASP.NET Core 中间件的优秀设计,ASP.NET Core 可同时支持 Http 服务。

  5. 添加 daprclient.proto 文件以生成 Dapr Grpc 服务,daprclient.proto 内容如下

    Golang 入门系列(十七)几个常见的并发模型——生产者消费者模型

    syntax = "proto3";
    
    package daprclient;
    
    import "google/protobuf/any.proto";
    import "google/protobuf/empty.proto";
    import "google/protobuf/duration.proto";
    
    option java_outer_classname = "DaprClientProtos";
    option java_package = "io.dapr";
    
    // User Code definitions
    service DaprClient {
    rpc OnInvoke (InvokeEnvelope) returns (google.protobuf.Any) {}
    rpc GetTopicSubscriptions(google.protobuf.Empty) returns (GetTopicSubscriptionsEnvelope) {}
    rpc GetBindingsSubscriptions(google.protobuf.Empty) returns (GetBindingsSubscriptionsEnvelope) {}
    rpc OnBindingEvent(BindingEventEnvelope) returns (BindingResponseEnvelope) {}
    rpc OnTopicEvent(CloudEventEnvelope) returns (google.protobuf.Empty) {}
    }
    
    message CloudEventEnvelope {
    string id = 1;
    string source = 2;
    string type = 3;
    string specVersion = 4;
    string dataContentType = 5;
    string topic = 6;
    google.protobuf.Any data = 7;
    }
    
    message BindingEventEnvelope {
        string name = 1;
        google.protobuf.Any data = 2;
        map<string,string> metadata = 3;
    }
    
    message BindingResponseEnvelope {
    google.protobuf.Any data = 1;
    repeated string to = 2;
    repeated State state = 3;
    string concurrency = 4;
    }
    
    message InvokeEnvelope {
        string method = 1;
        google.protobuf.Any data = 2;
        map<string,string> metadata = 3;
    }
    
    message GetTopicSubscriptionsEnvelope {
    repeated string topics = 1;
    }
    
    message GetBindingsSubscriptionsEnvelope {
    repeated string bindings = 1;
    }
    
    message State {
    string key = 1;
    google.protobuf.Any value = 2;
    string etag = 3;
    map<string,string> metadata = 4;
    StateOptions options = 5;
    }
    
    message StateOptions {
    string concurrency = 1;
    string consistency = 2;
    RetryPolicy retryPolicy = 3;
    }
    
    message RetryPolicy {
    int32 threshold = 1;
    string pattern = 2;
    google.protobuf.Duration interval = 3;
    }

    说明

    • 此文件为官方提供,Dapr 0.3 版本之前提供的已经生成好的代码,现在看源码可以看出已经改为提供 proto 文件了,这里我认为提供 proto 文件比较合理
    • 此文件定义了5个函数,此文主要讲的就是 OnInvoke() 函数
    • OnInvoke() 请求构造为 InvokeEnvelope
      • method 提供调用方法名称
      • data 请求数据
      • metadata 额外数据,此处使用键值对形式体现
  6. 创建 DaprClientService.cs 文件,此文件用于终结点路由,内容为

    public class DaprClientService : DaprClient.DaprClientBase
    {
        private readonly ProductListService _productListService;
    
        /// <summary>
        /// Initializes a new instance of the <see cref="ProductService" /> class.
        /// </summary>
        /// <param name="productListService"></param>
        public DaprClientService(ProductListService productListService)
        {
            _productListService = productListService;
        }
    
        public override async Task<Any> OnInvoke(InvokeEnvelope request, ServerCallContext context)
        {
            switch (request.Method)
            {
                case "GetAllProducts":
                    ProductListRequest productListRequest = ProductListRequest.Parser.ParseFrom(request.Data.Value);
                    ProductList.V1.ProductList productsList = await _productListService.GetAllProducts(productListRequest, context);
                    return Any.Pack(productsList);
            }
    
            return null;
        }
    }

    说明

    • 使用构造器注入已定义好的 ProductListService
    • InvokeEnvelope 中的 Method 用于路由数据
    • 使用 ProductListRequest.Parser.ParseFrom 转换请求构造
    • 使用 Any.Pack() 打包需要返回的数据
  7. 运行 productService

    dapr run --app-id productService --app-port 5001 --protocol grpc dotnet run

小结
至此,ProductService 服务完成。此时 ProductService.Api.csproj Protobuf 内容为

<ItemGroup>
  <Protobuf Include="Protos\daprclient.proto" GrpcServices="Server" />
 <Protobuf Include="Protos\productList.proto" GrpcServices="Server" />
 </ItemGroup>

改造 StorageService 服务以完成 Dapr GRPC 服务调用

  1. 添加 productList.proto 文件,内容同 ProductService 中的 productList.proto

  2. 添加 dapr.proto 文件,此文件也为官方提供,内容为

    syntax = "proto3";
    
    package dapr;
    
    import "google/protobuf/any.proto";
    import "google/protobuf/empty.proto";
    import "google/protobuf/duration.proto";
    
    option java_outer_classname = "DaprProtos";
    option java_package = "io.dapr";
    
    option csharp_namespace = "Dapr.Client.Grpc";
    
    
    // Dapr definitions
    service Dapr {
    rpc PublishEvent(PublishEventEnvelope) returns (google.protobuf.Empty) {}
    rpc InvokeService(InvokeServiceEnvelope) returns (InvokeServiceResponseEnvelope) {}
    rpc InvokeBinding(InvokeBindingEnvelope) returns (google.protobuf.Empty) {}
    rpc GetState(GetStateEnvelope) returns (GetStateResponseEnvelope) {}
    rpc SaveState(SaveStateEnvelope) returns (google.protobuf.Empty) {}
    rpc DeleteState(DeleteStateEnvelope) returns (google.protobuf.Empty) {}
    }
    
    message InvokeServiceResponseEnvelope {
    google.protobuf.Any data = 1;
    map<string,string> metadata = 2;
    }
    
    message DeleteStateEnvelope {
    string key = 1;
    string etag = 2;
    StateOptions options = 3;
    }
    
    message SaveStateEnvelope {
    repeated StateRequest requests = 1;
    }
    
    message GetStateEnvelope {
        string key = 1;
        string consistency = 2;
    }
    
    message GetStateResponseEnvelope {
    google.protobuf.Any data = 1;
    string etag = 2;
    }
    
    message InvokeBindingEnvelope {
    string name = 1;
    google.protobuf.Any data = 2;
    map<string,string> metadata = 3;
    }
    
    message InvokeServiceEnvelope {
    string id = 1;
    string method = 2;
    google.protobuf.Any data = 3;
    map<string,string> metadata = 4;
    }
    
    message PublishEventEnvelope {
        string topic = 1;
        google.protobuf.Any data = 2;
    }
    
    message State {
    string key = 1;
    google.protobuf.Any value = 2;
    string etag = 3;
    map<string,string> metadata = 4;
    StateOptions options = 5;
    }
    
    message StateOptions {
    string concurrency = 1;
    string consistency = 2;
    RetryPolicy retryPolicy = 3;
    }
    
    message RetryPolicy {
    int32 threshold = 1;
    string pattern = 2;
    google.protobuf.Duration interval = 3;
    }
    
    message StateRequest {
    string key = 1;
    google.protobuf.Any value = 2;
    string etag = 3;
    map<string,string> metadata = 4;
    StateRequestOptions options = 5;
    }
    
    message StateRequestOptions {
    string concurrency = 1;
    string consistency = 2;
    StateRetryPolicy retryPolicy = 3;
    }
    
    message StateRetryPolicy {
    int32 threshold = 1;
    string pattern = 2;
    google.protobuf.Duration interval = 3;
    }

    说明

    • 此文件提供6个 GRPC 服务,此文介绍的函数为 InvokeService()
      • 请求构造为 InvokeServiceEnvelope
        • id 请求的服务的 --app-id ,比如 productService
        • method 请求的方法
        • data 请求函数的签名
        • metadata 元数据键值对
  3. 修改 StorageController 中的 InitialStorage() 函数为

    /// <summary>
    /// 初始化仓库.
    /// </summary>
    /// <returns>是否成功.</returns>
    [HttpGet("InitialStorage")]
    public async Task<bool> InitialStorage()
    {
        string defaultPort = Environment.GetEnvironmentVariable("DAPR_GRPC_PORT") ?? "5001";
    
        // Set correct switch to make insecure gRPC service calls. This switch must be set before creating the GrpcChannel.
        AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
    
        // Create Client
        string daprUri = $"http://127.0.0.1:{defaultPort}";
        GrpcChannel channel = GrpcChannel.ForAddress(daprUri);
        var client = new Dapr.Client.Grpc.Dapr.DaprClient(channel);
    
        InvokeServiceResponseEnvelope result = await client.InvokeServiceAsync(new InvokeServiceEnvelope
        {
            Method = "GetAllProducts",
            Id = "productService",
            Data = Any.Pack(new ProductListRequest())
        });
        ProductList.V1.ProductList productResult = ProductList.V1.ProductList.Parser.ParseFrom(result.Data.Value);
    
        var random = new Random();
    
        foreach (Product item in productResult.Results)
        {
            _storageContext.Storage.Add(new Storage
            {
                ProductID = Guid.Parse(item.ID),
                Amount = random.Next(1, 1000)
            });
        }
    
        await _storageContext.SaveChangesAsync();
        return true;
    }
  4. 启动 StorageService

    dapr run --app-id storageService --app-port 5003 dotnet run
  5. 使用 Postman 请求 StorageService 的 InitialStorage
    Dapr 运用之集成 Asp.Net Core Grpc 调用篇
  6. 使用 MySql Workbench 查看结果
    Dapr 运用之集成 Asp.Net Core Grpc 调用篇

小结
至此,以 Dapr 框架使用 GRPC 客户端在 StorageService 中完成了对 ProductService 服务的调用。

源码地址

flask之分析线程和协程

未经允许不得转载作者:站长, 转载或复制请以 超链接形式 并注明出处 xss云之家,资源网,娱乐网,网络技术资源分享平台
原文地址:《Dapr 运用之集成 Asp.Net Core Grpc 调用篇》 发布于2019-12-18

分享到:
赞(0) 打赏 生成海报

长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

投稿赚钱
2020年在家赚取零花钱
切换注册

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录
切换登录

注册