背景

事件发布/订阅是一种非常强大的模式,它可以帮助业务组件间实现完全解耦,不同的业务组件只依赖事件,只关注哪些事件是需要自己处理的,而不用关注谁来处理自己发布事件,事件追溯(event sourcing)也是基于事件发布/订阅的。在微服务架构中,事件发布/订阅有非常多的应用场景。今天我给大家分享一个基于asp.net core的单体程序使用事件发布/订阅的例子,针对分布式项目的事件发布/订阅比较复杂,难点是事务处理,后续我会另写一篇博文来演示。

案例说明

当前我们有一个基于asp.net core的电子商务系统,在项目的初期,业务非常简单,只有一个购物车模块和一个订单模块,所有的代码都放在一个项目中。

整个项目使用了一个简单的三层架构。

这里当用户提交购物车的时候,程序会在shoppingcartmanager类的submitshoppingcart方法中执行3个操作

  • 修改当前购物车的状态为完成
  • 根据购物车中的物品创建一个新订单
  • 给用户发邮件

代码如下:

 public void submitshoppingcart(string shoppingcartid)
 {
 var shoppingcart = _unitofwork.shoppingcartrepository
 .getshoppingcart(shoppingcartid);

 _unitofwork.shoppingcartrepository
 .submitshoppingcart(shoppingcartid);

 _unitofwork.orderrepository
  .creatorder(new createorderdto
  {
  items = shoppingcart.items
   .select(p => new neworderitemdto
    {
    itemid = p.itemid,
    name = p.name,
    price = p.price
   }).tolist()
  });

 //这里为了简化代码,我用命令行表示发送邮件的逻辑
 console.writeline("confirm email sent.");

 _unitofwork.save();
 }

根据solid设计原则中的单一责任原则,如果一个类承担的职责过多,就等于把这些职责耦合在一起了。这里生成订单和发送邮件都不应该是当前submitshoppingcart需要负责的,所以我们需要它们从这个方法中移出去,使用的方法就是事件订阅/发布。

新的架构图

以下是使用事件发布/订阅之后的系统架构图。

  • 这里我们会创建一个购物车提交事件shoppingcartsubmittedevent。
  • 当站点启动的时候,我们会在一个名为eventhandlercontainer的类中注册订阅shoppingcartsubmittedevent事件的2个处理类createorderhandler和confirmemailsenthandler。
  • 在submitshoppingcart方法中,我们会做2件事情:
          更改当前购物车的状态。
          发布shoppingcartsubmittedevent事件。
  • createorderhandler事件处理器会调用ordermanager类中的创建订单方法。
  • confirmemailsenthandler事件处理器会负责发送邮件。

好的,下面让我们来一步一步实现以上描述的代码。

添加事件基类

这里我们首先定义一个事件基类,其中暂时只添加了一个属性occuredon,它表示了当前事件的触发时间。

 public class eventbase
 {
 public eventbase()
 {
  occuredon = datetime.now;
 }

 protected datetime occuredon
 {
  get;
  set;
 }
 }

定义购物车提交事件

接下来我们就需要创建购物车提交事件类shoppingcartsubmittedevent, 它继承自eventbase, 并提供了一个购物项集合

 public class shoppingcartsubmittedevent : eventbase
 {
 public shoppingcartsubmittedevent()
 {
  items = new list<shoppingcartsubmitteditem>();
 }

 public list<shoppingcartsubmitteditem> items { get; set; }
 }

 public class shoppingcartsubmitteditem
 {
 public string itemid { get; set; }

 public string name { get; set; }

 public decimal price { get; set; }

 }

定义事件处理器接口

为了添加事件处理器,我们首先需要定义一个泛型接口类ieventhandler

 public interface ieventhandler<t> where t : eventbase
 {
 void run(t obj);

 task runasync(t obj);
 }

这个泛型接口类的是泛型类型必须继承自eventbase类。接口提供了2个方法run和runasync。 它们定义了该接口的实现类必须实现同一个处理逻辑的同步和异步方法。

为购物车提交事件编写事件处理器

有了事件处理器接口,接下来我们就可以开始为购物车提交事件添加事件处理器了。这里我们为了实现前面定义的逻辑,我们需要创建2个处理器createorderhandler和confirmemailsenthandler

createorderhandler.cs

 public class createorderhandler : ieventhandler<shoppingcartsubmittedevent>
 {
 private iordermanager _ordermanager = null;


 public createorderhandler(iordermanager ordermanager)
 {
  _ordermanager = ordermanager;
 }

 public void run(shoppingcartsubmittedevent obj)
 {
  _ordermanager.createneworder(new models.dtos.createorderdto
  {
  items = obj.items.select(p => new models.dtos.neworderitemdto
  {
   itemid = p.itemid,
   name = p.name,
   price = p.price
  }).tolist()
  });
 }

 public task runasync(shoppingcartsubmittedevent obj)
 {
  return task.run(() =>
  {
  run(obj);
  });
 }
 }

代码解释:

  • 在createorderhandler的构造函数中,我们注入了iordermanager接口对象,createneworder负责最终创建订单的工作
  • 这里为了简化代码,我直接使用了task.run,并在其中调用了同步方法实现

confirmemailsenthandler.cs

 public class confirmemailsenthandler : ieventhandler<shoppingcartsubmittedevent>
 {
 public void run(shoppingcartsubmittedevent obj)
 {
  console.writeline("confirm email sent.");
 }

 public task runasync(shoppingcartsubmittedevent obj)
 {
  return task.run(() =>
  {
  console.writeline("confirm email sent.");
  });
 }
 }

代码解释:

  • 这个处理类非常简单,为了简化代码,我仅输出了一行文本来表示实际需要运行的代码。

为ordermanager类添加创建订单方法

iordermanager.cs

 public interface iordermanager
 {
  string createneworder(createorderdto dto);
 }

ordermanager.cs

 public class ordermanager : iordermanager
 {
  private iorderrepository _orderrepository;

  public ordermanager(iorderrepository orderrepository)
  {
   _orderrepository = orderrepository;
  }

  public string createneworder(createorderdto dto)
  {
   var orderid = _orderrepository.creatorder(dto);

   console.writeline($"one order created: {jsonconvert.serializeobject(dto)}");

   return orderid;
  }
 }

创建eventhandlercontainer

下面我们来编写最核心的事件处理器容器。在这里我们的事件处理器容器完成了3个功能

  • 订阅事件(subscribe event)
  • 取消订阅事件(unsubscribe event)
  • 发布事件(publish event)
 public class eventhandlercontainer
 {
  private iserviceprovider _serviceprovider = null;
  private static dictionary<string, list<type>> _mappings = new dictionary<string, list<type>>();

  public eventhandlercontainer(iserviceprovider serviceprovider)
  {
   _serviceprovider = serviceprovider;
  }

  public static void subscribe<t, thandler>()
   where t : eventbase
   where thandler : ieventhandler<t>
  {
   var name = typeof(t).name;

   if (!_mappings.containskey(name))
   {
    _mappings.add(name, new list<type> { });
   }

   _mappings[name].add(typeof(thandler));
  }

  public static void unsubscribe<t, thandler>()
   where t : eventbase
   where thandler : ieventhandler<t>
  {
   var name = typeof(t).name;
   _mappings[name].remove(typeof(thandler));

   if (_mappings[name].count == 0)
   {
    _mappings.remove(name);
   }
  }

  public void publish<t>(t o) where t : eventbase
  {
   var name = typeof(t).name;

   if (_mappings.containskey(name))
   {
    foreach (var handler in _mappings[name])
    {
     var service = (ieventhandler<t>)_serviceprovider.getservice(handler);

     service.run(o);
    }
   }
  }

  public async task publishasync<t>(t o) where t : eventbase
  {
   var name = typeof(t).name;

   if (_mappings.containskey(name))
   {
    foreach (var handler in _mappings[name])
    {
     var service = (ieventhandler<t>)_serviceprovider.getservice(handler);

     await service.runasync(o);
    }
   }
  }
 }

代码解释:

  • 这里我没有直接订阅事件处理器的实例,而且订阅了事件处理器的类型
  • 多个事件处理器可以订阅同一个事件
  • eventhandlercontainer的构造函数中,我们注入了一个iserviceprovider,我们可以使用它来获得对应事件处理器的实例。

在程序启动时,注册事件订阅

现在我们来startup.cs的configureservices方法,这里我们需要进行服务注册,并完成事件订阅。

 public void configureservices(iservicecollection services)
 {
  services.addmvc().setcompatibilityversion(compatibilityversion.version_2_2);

  services.addscoped<iordermanager, ordermanager>();
  services.addscoped<ishoppingcartmanager, shoppingcartmanager>();
  services.addscoped<ishoppingcartrepository, shoppingcartrepository>();
  services.addscoped<iorderrepository, orderrepository>();
  services.addscoped<iunitofwork, unitofwork>();
  services.addscoped<createorderhandler>();
  services.addscoped<confirmemailsenthandler>();
  services.addscoped<eventhandlercontainer>();


  eventhandlercontainer.subscribe<shoppingcartsubmittedevent, createorderhandler>();
  eventhandlercontainer.subscribe<shoppingcartsubmittedevent, confirmemailsenthandler>();
 }

注意:这里保证一个api请求中的所有数据库操作在一个事务里,这里我们使用scoped作用域。这样我们就可以在调用工作单元iunitofwork接口的save代码中启用事务。

修改shoppingcartmanager

最后我们来修改shoppingcartmanager, 改用发布事件的方式来完成后续创建订单和发送邮件的功能。

 public void submitshoppingcart(string shoppingcartid)
 {
  var shoppingcart = _unitofwork.shoppingcartrepository
   .getshoppingcart(shoppingcartid);

  _unitofwork.shoppingcartrepository
   .submitshoppingcart(shoppingcartid);


  _container.publish(new shoppingcartsubmittedevent()
  {
   items = shoppingcart
     .items
     .select(p => new shoppingcartsubmitteditem
     {
      itemid = p.itemid,
      name = p.name,
      price = p.price
     })
     .tolist()
  });
  
  _unitofwork.save();
 }

这样shoppingcartmanager就只需要关注购物车状态的变更,而不需要关注发送确认邮件和创建订单了。

最终效果

现在让我们启动项目,

首先我们使用[post] /api/shoppingcarts来添加一个新的购物车, 这个api会返回当前购物车的id

 

然后我们使用[put] /api/shoppingcarts/shoppingcart_636872897140555966来模拟提交购物车,程序返回操作成功

 

最后我们查看一下控制台的输出日志

 

2个事件处理器都被正确触发了。

总结

至此我们的代码重构完成。 最终的代码中,submitshoppingcart方法,仅负责修改购物车状态并发布一个购物车提交的事件。生成订单和发送邮件的功能代码都被移动到了独立的处理类中。

这样的方式的好处不仅仅是完成了代码的解耦,针对后续的扩展也非常有利,想想一下,如果在未来当前项目需求追加这样一个功能,当提交购物车的时候,除了要发送确认邮件,还要发送手机短信。这时候你根本不需要去修改shoppingcartmanager类,你只需要针对shoppingcartsubmittedevent在再添加一个新的事件处理器即可,这也满足的solid的开闭原则。

项目源代码:https://github.com/lamondlu/eventhandlerinsingleapplication

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对www.887551.com的支持。