目录
  • 简单使用spring cloud stream 构建基于rocketmq的生产者和消费者
  • stream其他特性

spring cloud stream对spring cloud体系中的mq进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体mq消息中间件的细节差异,就像hibernate屏蔽掉了具体数据库(mysql/oracle⼀样)。如此⼀来,我们学习、开发、维护mq都会变得轻松。⽬前spring cloud stream原生⽀持rabbitmq和kafka,阿里在这个基础上提供了rocketmq的支持

简单使用spring cloud stream 构建基于rocketmq的生产者和消费者

生产者

pom文件中加入依赖

<dependencies>
        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-starter-netflix-eureka-client</artifactid>
        </dependency>
        <dependency>
            <groupid>com.alibaba.cloud</groupid>
            <artifactid>spring-cloud-starter-stream-rocketmq</artifactid>
            <version>2.1.0.release</version>
        </dependency>

    </dependencies>

配置文件中增加关于spring cloud stream binder和bindings的配置

spring:
  application:
    name: zhao-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain # 内容格式。这里使用 json

其中destination代表生产的数据发送到的topic 然后定义一个channel用于数据发送

import org.springframework.cloud.stream.annotation.output;
import org.springframework.messaging.messagechannel;

public interface testchannel {
    @output("output")
    messagechannel output();
}

最后构造数据发送的接口

@controller
public class sendmessagecontroller {
    @resource
    private testchannel testchannel;

    @responsebody
    @requestmapping(value = "send", method = requestmethod.get)
    public string sendmessage() {
        string messageid = uuid.randomuuid().tostring();
        message<string> message = messagebuilder
                .withpayload("this is a test:" + messageid)
                .setheader(messageconst.property_tags, "test")
                .build();
        try {
            testchannel.output().send(message);
            return messageid + "发送成功";
        } catch (exception e) {
            return messageid + "发送失败,原因:" + e.getmessage();
        }
    }
}

消费者

消费者的pom引入与生产者相同,在此不再赘述,配置时需要将stream的output修改为input并修改对应属性

spring:
  application:
    name: zhao-cloud-stream-consumer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          input:
            consumer:
              tags: test
      bindings:
        input:
          destination: stream-test-topic
          content-type: text/plain # 内容格式。这里使用 json
          group: test

另外关于channel的构造也要做同样的修改

import org.springframework.cloud.stream.annotation.input;
import org.springframework.messaging.subscribablechannel;

public interface testchannel {
    @input("input")
    subscribablechannel input();
}

最后我在启动类中对收到的消息进行了监听

   @streamlistener("input")
    public void receiveinput(@payload message message) throws validationexception {
        system.out.println("input1 receive: " + message.getpayload() + ", foo header: " + message.getheaders().get("foo"));
    }

测试结果

stream其他特性

消息发送失败的处理

消息发送失败后悔发送到默认的一个“topic.errors”的channel中(topic是配置的destination)。要配置消息发送失败的处理,需要将错误消息的channel打开 消费者配置如下

spring:
  application:
    name: zhao-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain # 内容格式。这里使用 json
          producer:
            errorchannelenabled: true

在启动类中配置错误消息的channel信息

 @bean("stream-test-topic.errors")
    messagechannel testoutputerrorchannel(){
        return new publishsubscribechannel();
    }

新建异常处理service

import org.springframework.integration.annotation.serviceactivator;
import org.springframework.messaging.message;
import org.springframework.stereotype.service;

@service
public class errorproducerservice {

    @serviceactivator(inputchannel = "stream-test-topic.errors")
    public void receiveproducererror(message message){
        system.out.println("receive error msg :"+message);
    }
}

当发生异常时,由于测试类中已经将异常捕获,处理发送异常主要是在这里进行。模拟,应用与rocketmq断开的场景。可见

消费者错误处理

首先增加配置为

spring:
  application:
    name: zhao-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain # 内容格式。这里使用 json
          producer:
            errorchannelenabled: true

增加相应的模拟异常的操作

 @streamlistener("input")
    public void receiveinput(@payload message message) throws validationexception {
        //system.out.println("input1 receive: " + message.getpayload() + ", foo header: " + message.getheaders().get("foo"));
        throw new runtimeexception("oops");
    }
    @serviceactivator(inputchannel = "stream-test-topic.test.errors")
    public void receiveconsumeerror(message message){
        system.out.println("receive error msg"+message.getpayload());
    }

代码地址https://github.com/zhendiao/deme-code/tree/main/zp

到此这篇关于spring cloud stream简单用法的文章就介绍到这了,更多相关spring cloud stream使用内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!