前言

microsoft.aspnetcore.concurrencylimiter aspnetcore3.0后增加的,用于传入的请求进行排队处理,避免线程池的不足.
我们日常开发中可能常做的给某web服务器配置连接数以及,请求队列大小,那么今天我们看看如何在通过中间件形式实现一个并发量以及队列长度限制.

queue策略

添加nuget

install-package microsoft.aspnetcore.concurrencylimiter

    public void configureservices(iservicecollection services)
    {
      services.addqueuepolicy(options =>
      {
        //最大并发请求数
        options.maxconcurrentrequests = 2;
        //请求队列长度限制
        options.requestqueuelimit = 1;
      });
      services.addcontrollers();
    }
    public void configure(iapplicationbuilder app, iwebhostenvironment env)
    {
      //添加并发限制中间件
      app.useconcurrencylimiter();
      app.run(async context =>
      {
        task.delay(100).wait(); // 100ms sync-over-async

        await context.response.writeasync("hello world!");
      });
      if (env.isdevelopment())
      {
        app.usedeveloperexceptionpage();
      }

      app.usehttpsredirection();

      app.userouting();

      app.useauthorization();

      app.useendpoints(endpoints =>
      {
        endpoints.mapcontrollers();
      });
    }   

通过上面简单的配置,我们就可以将他引入到我们的代码中,从而做并发量限制,以及队列的长度;那么问题来了,他是怎么实现的呢?

 public static iservicecollection addqueuepolicy(this iservicecollection services, action<queuepolicyoptions> configure)
{
    services.configure(configure);
    services.addsingleton<iqueuepolicy, queuepolicy>();
    return services;
}

queuepolicy采用的是semaphoreslim信号量设计,semaphoreslim、semaphore(信号量)支持并发多线程进入被保护代码,对象在初始化时会指定 最大任务数量,当线程请求访问资源,信号量递减,而当他们释放时,信号量计数又递增。

   /// <summary>
    ///   构造方法(初始化queue策略)
    /// </summary>
    /// <param name="options"></param>
    public queuepolicy(ioptions<queuepolicyoptions> options)
    {
      _maxconcurrentrequests = options.value.maxconcurrentrequests;
      if (_maxconcurrentrequests <= 0)
      {
        throw new argumentexception(nameof(_maxconcurrentrequests), "maxconcurrentrequests must be a positive integer.");
      }

      _requestqueuelimit = options.value.requestqueuelimit;
      if (_requestqueuelimit < 0)
      {
        throw new argumentexception(nameof(_requestqueuelimit), "the requestqueuelimit cannot be a negative number.");
      }
      //使用semaphoreslim来限制任务最大个数
      _serversemaphore = new semaphoreslim(_maxconcurrentrequests);
    }

concurrencylimitermiddleware中间件

    /// <summary>
    /// invokes the logic of the middleware.
    /// </summary>
    /// <param name="context">the <see cref="httpcontext"/>.</param>
    /// <returns>a <see cref="task"/> that completes when the request leaves.</returns>
    public async task invoke(httpcontext context)
    {
      var waitinqueuetask = _queuepolicy.tryenterasync();

      // make sure we only ever call getresult once on the tryenterasync valuetask b/c it resets.
      bool result;

      if (waitinqueuetask.iscompleted)
      {
        concurrencylimitereventsource.log.queueskipped();
        result = waitinqueuetask.result;
      }
      else
      {
        using (concurrencylimitereventsource.log.queuetimer())
        {
          result = await waitinqueuetask;
        }
      }

      if (result)
      {
        try
        {
          await _next(context);
        }
        finally
        {
          _queuepolicy.onexit();
        }
      }
      else
      {
        concurrencylimitereventsource.log.requestrejected();
        concurrencylimiterlog.requestrejectedqueuefull(_logger);
        context.response.statuscode = statuscodes.status503serviceunavailable;
        await _onrejected(context);
      }
    }

每次当我们请求的时候首先会调用_queuepolicy.tryenterasync(),进入该方法后先开启一个私有lock锁,再接着判断总请求量是否≥(请求队列限制的大小+最大并发请求数),如果当前数量超出了,那么我直接抛出,送你个503状态;

 if (result)
 {
     try
     {
       await _next(context);
     }
     finally
    {
      _queuepolicy.onexit();
    }
    }
    else
    {
      concurrencylimitereventsource.log.requestrejected();
      concurrencylimiterlog.requestrejectedqueuefull(_logger);
      context.response.statuscode = statuscodes.status503serviceunavailable;
      await _onrejected(context);
    }

问题来了,我这边如果说还没到你设置的大小呢,我这个请求没有给你服务器造不成压力,那么你给我处理一下吧.

await _serversemaphore.waitasync();异步等待进入信号量,如果没有线程被授予对信号量的访问权限,则进入执行保护代码;否则此线程将在此处等待,直到信号量被释放为止

 lock (_totalrequestslock)
  {
    if (totalrequests >= _requestqueuelimit + _maxconcurrentrequests)
    {
       return false;
    }
      totalrequests++;
    }
    //异步等待进入信号量,如果没有线程被授予对信号量的访问权限,则进入执行保护代码;否则此线程将在此处等待,直到信号量被释放为止
    await _serversemaphore.waitasync();
    return true;
  }

返回成功后那么中间件这边再进行处理,_queuepolicy.onexit();通过该调用进行调用_serversemaphore.release();释放信号灯,再对总请求数递减

stack策略

再来看看另一种方法,栈策略,他是怎么做的呢?一起来看看.再附加上如何使用的代码.

   public void configureservices(iservicecollection services)
    {
      services.addstackpolicy(options =>
      {
        //最大并发请求数
        options.maxconcurrentrequests = 2;
        //请求队列长度限制
        options.requestqueuelimit = 1;
      });
      services.addcontrollers();
    }

通过上面的配置,我们便可以对我们的应用程序执行出相应的策略.下面再来看看他是怎么实现的呢

 public static iservicecollection addstackpolicy(this iservicecollection services, action<queuepolicyoptions> configure)
    {
      services.configure(configure);
      services.addsingleton<iqueuepolicy, stackpolicy>();
      return services;
    }

可以看到这次是通过stackpolicy类做的策略.来一起来看看主要的方法

    /// <summary>
    ///   构造方法(初始化参数)
    /// </summary>
    /// <param name="options"></param>
    public stackpolicy(ioptions<queuepolicyoptions> options)
    {
      //栈分配
      _buffer = new list<resettablebooleancompletionsource>();
      //队列大小
      _maxqueuecapacity = options.value.requestqueuelimit;
      //最大并发请求数
      _maxconcurrentrequests = options.value.maxconcurrentrequests;
      //剩余可用空间
      _freeserverspots = options.value.maxconcurrentrequests;
    }

当我们通过中间件请求调用,_queuepolicy.tryenterasync()时,首先会判断我们是否还有访问请求次数,如果_freeserverspots>0,那么则直接给我们返回true,让中间件直接去执行下一步,如果当前队列=我们设置的队列大小的话,那我们需要取消先前请求;每次取消都是先取消之前的保留后面的请求;

  public valuetask<bool> tryenterasync()
    {
      lock (_bufferlock)
      {
        if (_freeserverspots > 0)
        {
          _freeserverspots--;
          return _truetask;
        }
        // 如果队列满了,取消先前的请求
        if (_queuelength == _maxqueuecapacity)
        {
          _hasreachedcapacity = true;
          _buffer[_head].complete(false);
          _queuelength--;
        }
        var tcs = _cachedresettabletcs ??= new resettablebooleancompletionsource(this);
        _cachedresettabletcs = null;
        if (_hasreachedcapacity || _queuelength < _buffer.count)
        {
          _buffer[_head] = tcs;
        }
        else
        {
          _buffer.add(tcs);
        }
        _queuelength++;
        // increment _head for next time
        _head++;
        if (_head == _maxqueuecapacity)
        {
          _head = 0;
        }
        return tcs.getvaluetask();
      }
    }

当我们请求后调用_queuepolicy.onexit();出栈,再将请求长度递减

  public void onexit()
    {
      lock (_bufferlock)
      {
        if (_queuelength == 0)
        {
          _freeserverspots++;

          if (_freeserverspots > _maxconcurrentrequests)
          {
            _freeserverspots--;
            throw new invalidoperationexception("onexit must only be called once per successful call to tryenterasync");
          }

          return;
        }

        // step backwards and launch a new task
        if (_head == 0)
        {
          _head = _maxqueuecapacity - 1;
        }
        else
        {
          _head--;
        }
        //退出,出栈
        _buffer[_head].complete(true);
        _queuelength--;
      }
    }

总结

基于栈结构的特点,在实际应用中,通常只会对栈执行以下两种操作:

  • 向栈中添加元素,此过程被称为”进栈”(入栈或压栈);
  • 从栈中提取出指定元素,此过程被称为”出栈”(或弹栈);

队列存储结构的实现有以下两种方式:

  • 顺序队列:在顺序表的基础上实现的队列结构;
  • 链队列:在链表的基础上实现的队列结构;

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持www.887551.com。