受 f# 中的管道运算符和 c# 中的 linq 语法,管道式编程为 c# 提供了更加灵活性的功能性编程。通过使用 扩展函数 可以将多个功能连接起来构建成一个管道。

前言

在 c# 编程中,管道式编程(pipeline style programming)其实存在已久,最明显的就是我们经常使用的 linq。在进入 dotnetcore 世界后, 这种编程方式就更加明显,比如网关配置。通过使用这种编程方式,大大提高了代码的可维护性,优化了的业务的组合方式。

管道式编程具有如下优点:

  • 创建一个流畅的编程范例,将语句转换为表达式并将它们链接在一起
  • 用线性排序替换代码嵌套
  • 消除变量声明 – 甚至不需要 var
  • 提供某种形式的可变不变性和范围隔离
  • 将结构代码编写成具有明确职责的小 lambda 表达式
  • ……

初探管道式编程 – 基础实现

在该示例中,我们通过构建一个 double->int->string 的类型转换的管道来将一个目标数据最终转化为一个字符串。

  • 首先,我们需要定义一个功能接口,用于约束每个功能函数的具体实现,示例代码如下所示:
public interface ipipelinestep<input, output>
{
    output process(input input);
}
  • 然后,我们定义两个类型转换的功能类,继承并实现上述接口,示例代码如下所示:
public class doubletointsetp : ipipelinestep<double, int>
{
    public int process(double input)
    {
        return convert.toint32(input);
    }
}

public class inttostringstep : ipipelinestep<int, string>
{
    public string process(int input)
    {
        return input.tostring();
    }
}
  • 接着,定义一个扩展函数,用于连接上述的各个功能函数,示例代码如下所示:
public static class pipelinestepextensions
{
    public static output step<input, output>(this input input, ipipelinestep<input, output> step)
    {
        return step.process(input);
    }
}
  • 最后,我们就可以构建一个完整的管道,用于我们的数据类型转换,示例代码如下所示:
class program
{
    static void main(string[] args)
    {
        double input = 1024.1024;

        // 构建并使用管道
        string result = input.step(new doubletointsetp())
                             .step(new inttostringstep());
        console.writeline(result);
    }
}

此时,我们成功将一个 double 类型的数据转化为了 string 类型。通过介绍上述示例,我们可以简单将管道式编程概括为:定义功能接口 -> 实现功能函数 -> 组装功能函数

再探管道式编程 – 支持 di

上述代码在一般的情况下是可以正常运行的,但是如果希望以 依赖注入(di) 的方式注入的话,我们就需要将我们的管道组装进行封装,方便作为一个统一的服务注入到系统中。

  • 首先,我们需要定义一个抽线类,用于管道组装的抽象封装,示例代码如下所示:
public abstract class pipeline<input,output>
{
    public func<input, output> pipelinesteps { get; protected set; }

    public output process(input input)
    {
        return pipelinesteps(input);
    }
}
  • 然后,我们就可以创建一个继承上述抽象类的具体管道组装类,示例代码如下所示:
public class trivalpipeline : pipeline<double, string>
{
    public trivalpipeline()
    {
        pipelinesteps = input => input.step(new doubletointsetp())
                                      .step(new inttostringstep());
    }
}
  • 最后,我们可以将 trivalpipeline 这个具体的管道注入到我们的系统中。同样的,我们也可以直接使用,示例代码如下所示:
class program
{
    static void main(string[] args)
    {
        double input = 1024.1024;

        // 需要安装 microsoft.extensions.dependencyinjection
        var services = new servicecollection();
        services.addtransient<trivalpipeline>();
        var  provider = services.buildserviceprovider();

        var trival = provider.getservice<trivalpipeline>();
        string result = trival.process(input);
        console.writeline(result);
    }
}

再探管道式编程 – 条件式组装

上述两个示例代码展示的管道组装式不带任何条件限制的, 无论参数是否合法都是这样组装进管道,但是在实际的开发过程中,我们需要对一定的业务模块进行条件性组装,所以这个时候我们就需要完善一下我们的代码。

  • 首先,我们需要修改上面的 pipeline<input,output> 类,使其继承 ipipelinestep<input, output> 接口,示例代码如下所示:
public abstract class pipeline<input, output> : ipipelinestep<input, output>
{
    public func<input, output> pipelinesteps { get; protected set; }

    public output process(input input)
    {
        return pipelinesteps(input);
    }
}
  • 然后,我们定义一个带条件的管道装饰器类,示例代码如下所示:
public class optionalstep<input, output> : ipipelinestep<input, output> where input : output
{
    private readonly ipipelinestep<input, output> _step;
    private readonly func<input, bool> _choice;

    public optionalstep(func<input,bool> choice,ipipelinestep<input,output> step)
    {
        _choice = choice;
        _step = step;
    }

    public output process(input input)
    {
        return _choice(input) ? _step.process(input) : input;
    }
}
  • 接着,我们定义一个新的功能类和支持条件判断的管道包装类,示例代码如下所示:
public class thisstepisoptional : ipipelinestep<double, double>
{
    public double process(double input)
    {
        return input * 10;
    }
}

public class pipelinewithoptionalstep : pipeline<double, double>
{
    public pipelinewithoptionalstep()
    {
        // 当输入参数大于 1024,执行 thisstepisoptional() 功能
        pipelinesteps = input => input.step(new optionalstep<double, double>(i => i > 1024, new thisstepisoptional()));
    }
}
  • 最后,我们可以使用如下方式进行测试:
class program
{
    static void main(string[] args)
    {
        pipelinewithoptionalstep step = new pipelinewithoptionalstep();
        console.writeline(step.process(1024.1024));  // 输出 10241.024
        console.writeline(step.process(520.520));    // 输出 520.520
    }
}

再探管道式编程 – 事件监听

有的时候,我们希望在我们管道中执行的每一步,在开始和结束时,上层模块都能获得相应的事件通知,这个时候,我们就需要需改一下我们的管道包装器,使其支持这个需求。

  • 首先,我们需要实现一个支持事件监听的具体功能类,示例代码代码如下所示:
public class eventstep<input, output> : ipipelinestep<input, output>
{
    public event action<input> oninput;
    public event action<output> onoutput;

    private readonly ipipelinestep<input, output> _innerstep;
    public eventstep(ipipelinestep<input,output> innerstep)
    {
        _innerstep = innerstep;
    }

    public output process(input input)
    {
        oninput?.invoke(input);

        var output = _innerstep.process(input);

        onoutput?.invoke(output);

        return output;
    }
}
  • 然后,我们需要定义一个能够传递事件参数的管道包装器类,示例代码如下所示:
public static class pipelinestepeventextensions
{
    public static output step<input, output>(this input input, ipipelinestep<input, output> step, action<input> inputevent = null, action<output> outputevent = null)
    {
        if (inputevent != null || outputevent != null)
        {
            var eventdecorator = new eventstep<input, output>(step);
            eventdecorator.oninput += inputevent;
            eventdecorator.onoutput += outputevent;

            return eventdecorator.process(input);
        }
        return step.process(input);
    }
}
  • 最后,上层调用就相对简单很多,示例代码如下所示:
public class doublestep : ipipelinestep<int, int>
{
    public int process(int input)
    {
        return input * input;
    }
}

class program
{
    static void main(string[] args)
    {
        var input = 10;
        console.writeline($"input value:{input}[{input.gettype()}]");
        var pipeline = new eventstep<int, int>(new doublestep());
        pipeline.oninput += i => console.writeline($"input value:{i}");
        pipeline.onoutput += o => console.writeline($"output value:{o}");
        var output = pipeline.process(input);
        console.writeline($"output value: {output} [{output.gettype()}]");
    }
}

输出结果如下图所示:

再探管道式编程 – 可迭代执行

可迭代执行是指当我们的管道中注册了多个功能模块时,不是一次性执行完所以的功能模块,而是每次只执行一个功能,后续功能会在下次执行该管道对应的代码块时接着执行,直到该管道中所有的功能模块执行完毕为止。该特性主要是通过 yield return 来实现。

  • 首先,我们需要实现一个该特性的管道包装器类,示例代码如下所示:
public class loopstep<input, output> : ipipelinestep<ienumerable<input>, ienumerable<output>>
{
    private readonly ipipelinestep<input, output> _internalstep;
    public loopstep(ipipelinestep<input,output> internalstep)
    {
        _internalstep = internalstep;
    }

    public ienumerable<output> process(ienumerable<input> input)
    {
        foreach (input item in input)
        {
            yield return _internalstep.process(item);
        }

        //等价于下述代码段
        //return from input item in input
        //       select _internalstep.process(item);
    }
}
  • 然后,定义一个支持上述类型的功能组装的扩展方法,示例代码如下所示:
public static class pipelinesteploopextensions
{
    public static ienumerable<output> step<input, output>(this ienumerable<input> input, ipipelinestep<input, output> step)
    {
        loopstep<input, output> loopdecorator = new loopstep<input, output>(step);
        return loopdecorator.process(input);
    }
}
  • 最后,上层调用如下所示:
class program
{
    static void main(string[] args)
    {
        var list = enumerable.range(0, 10);
        foreach (var item in list.step(new doublestep()))
        {
            console.writeline(item);
        }
    }
}

总结

通过上述 5 部分示例代码的不断改进,最终我们实现了一个支持依赖注入和条件式组装的管道,了解了如何进行管道式编程。掌握管道式编程可以让我们对整个项目的架构和代码质量都有很大帮助,感兴趣的朋友可以自行查阅相关资料进行深入研究。

相关参考

  • walkthrough: creating a dataflow pipeline
  • pipelinestyle
  • an alternative approach to pipelines