前言

趁着有空余时间,就想着撸一个mini的生产-消费消息队列,说干就干了。自己是个javer,这次实现,特意换用了go。没错,是零基础上手go,顺便可以学学go。

前置知识:

  1. go基本语法
  2. 消息队列概念,也就三个:生产者、消费者、队列

目的

  • 没想着实现多复杂,因为时间有限,就mini就好,mini到什么程度呢
  • 使用双向链表数据结构作为队列
  • 有多个topic可供生产者生成消息和消费者消费消息
  • 支持生产者并发写
  • 支持消费者读,且ok后,从队列删除
  • 消息不丢失(持久化)
  • 高性能(先这样想)

设计

整体架构

协议

通讯协议底层使用tcp,mq是基于tcp自定义了一个协议,协议如下

type msg struct {
   id int64
   topiclen int64
   topic string
   // 1-consumer 2-producer 3-comsumer-ack 4-error
   msgtype int64 // 消息类型
   len int64 // 消息长度
   payload []byte // 消息
}

payload使用字节数组,是因为不管数据是什么,只当做字节数组来处理即可。msg承载着生产者生产的消息,消费者消费的消息,ack、和错误消息,前两者会有负载,而后两者负载和长度都为空

协议的编解码处理,就是对字节的处理,接下来有从字节转为msg,和从msg转为字节两个函数

func bytestomsg(reader io.reader) msg {

   m := msg{}
   var buf [128]byte
   n, err := reader.read(buf[:])
   if err != nil {
      fmt.println("read failed, err:", err)
   }
   fmt.println("read bytes:", n)
   // id
   buff := bytes.newbuffer(buf[0:8])
   binary.read(buff, binary.littleendian, &m.id)
   // topiclen
   buff = bytes.newbuffer(buf[8:16])
   binary.read(buff, binary.littleendian, &m.topiclen)
   // topic
   msglastindex := 16 + m.topiclen
   m.topic = string(buf[16: msglastindex])
   // msgtype
   buff = bytes.newbuffer(buf[msglastindex : msglastindex + 8])
   binary.read(buff, binary.littleendian, &m.msgtype)

   buff = bytes.newbuffer(buf[msglastindex : msglastindex + 16])
   binary.read(buff, binary.littleendian, &m.len)

   if m.len <= 0 {
      return m
   }

   m.payload = buf[msglastindex + 16:]
   return m
}

func msgtobytes(msg msg) []byte {
   msg.topiclen = int64(len([]byte(msg.topic)))
   msg.len = int64(len([]byte(msg.payload)))

   var data []byte
   buf := bytes.newbuffer([]byte{})
   binary.write(buf, binary.littleendian, msg.id)
   data = append(data, buf.bytes()...)

   buf = bytes.newbuffer([]byte{})
   binary.write(buf, binary.littleendian, msg.topiclen)
   data = append(data, buf.bytes()...)
   
   data = append(data, []byte(msg.topic)...)

   buf = bytes.newbuffer([]byte{})
   binary.write(buf, binary.littleendian, msg.msgtype)
   data = append(data, buf.bytes()...)
   
   buf = bytes.newbuffer([]byte{})
   binary.write(buf, binary.littleendian, msg.len)
   data = append(data, buf.bytes()...)
   data = append(data, []byte(msg.payload)...)

   return data
}

队列

使用container/list,实现先入先出,生产者在队尾写,消费者在队头读取

package broker

import (
   "container/list"
   "sync"
)

type queue struct {
   len int
   data list.list
}

var lock sync.mutex

func (queue *queue) offer(msg msg) {
   queue.data.pushback(msg)
   queue.len = queue.data.len()
}

func (queue *queue) poll() msg{
   if queue.len == 0 {
      return msg{}
   }
   msg := queue.data.front()
   return msg.value.(msg)
}

func (queue *queue) delete(id int64) {
   lock.lock()
   for msg := queue.data.front(); msg != nil; msg = msg.next() {
      if msg.value.(msg).id == id {
         queue.data.remove(msg)
         queue.len = queue.data.len()
         break
      }
   }
   lock.unlock()
}

方法offer往队列里插入数据,poll从队列头读取数据素,delete根据消息id从队列删除数据。这里使用queue结构体对list进行封装,其实是有必要的,list作为底层的数据结构,我们希望隐藏更多的底层操作,只给客户提供基本的操作
delete操作是在消费者消费成功且发送ack后,对消息从队列里移除的,因为消费者可以多个同时消费,所以这里进入临界区时加锁(em,加锁是否就一定会影响对性能有较大的影响呢)

broker

broker作为服务器角色,负责接收连接,接收和响应请求

package broker

import (
   "bufio"
   "net"
   "os"
   "sync"
   "time"
)

var topics = sync.map{}

func handleerr(conn net.conn)  {
   defer func() {
      if err := recover(); err != nil {
         println(err.(string))
         conn.write(msgtobytes(msg{msgtype: 4}))
      }
   }()
}

func process(conn net.conn) {
   handleerr(conn)
   reader := bufio.newreader(conn)
   msg := bytestomsg(reader)
   queue, ok := topics.load(msg.topic)
   var res msg
   if msg.msgtype == 1 {
      // comsumer
      if queue == nil || queue.(*queue).len == 0{
         return
      }
      msg = queue.(*queue).poll()
      msg.msgtype = 1
      res = msg
   } else if msg.msgtype == 2 {
      // producer
      if ! ok {
         queue = &queue{}
         queue.(*queue).data.init()
         topics.store(msg.topic, queue)
      }
      queue.(*queue).offer(msg)
      res = msg{id: msg.id, msgtype: 2}
   } else if msg.msgtype == 3 {
      // consumer ack
      if queue == nil {
         return
      }
      queue.(*queue).delete(msg.id)

   }
   conn.write(msgtobytes(res))

}

msgtype等于1时,直接消费消息;msgtype等于2时是生产者生产消息,如果队列为空,那么还需创建一个新的队列,放在对应的topic下;msgtype等于3时,代表消费者成功消费,可以

删除消息

我们说消息不丢失,这里实现不完全,我就实现了持久化(持久化也没全部实现)。思路就是该topic对应的队列里的消息,按协议格式进行序列化,当broker启动时,从文件恢复
持久化需要考虑的是增量还是全量,需要保存多久,这些都会影响实现的难度和性能(想想kafka和redis的持久化),这里表示简单实现就好:定时器定时保存

func save()  {
   ticker := time.newticker(60)
   for {
      select {
      case <-ticker.c:
         topics.range(func(key, value interface{}) bool {
            if value == nil {
               return false
            }
            file, _ := os.open(key.(string))
            if file == nil {
               file, _ = os.create(key.(string))
            }
            for msg := value.(*queue).data.front(); msg != nil; msg = msg.next() {
               file.write(msgtobytes(msg.value.(msg)))
            }
            _ := file.close()
            return false
         })
      default:
         time.sleep(1)
      }
   }
}

有一个问题是,当上面的delete操作时,这里的file文件需不需要跟着delete掉对应的消息?答案是需要删除的,如果不删除,只能等下一次的全量持久化来覆盖了,中间就有脏数据问题
下面是启动逻辑

package main

import (
   "awesomeproject/broker"
   "fmt"
   "net"
)

func main()  {
   listen, err := net.listen("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.print("listen failed, err:", err)
      return
   }
   go broker.save()
   for {
      conn, err := listen.accept()
      if err != nil {
         fmt.print("accept failed, err:", err)
         continue
      }
      go broker.process(conn)

   }
}

生产者

package main

import (
   "awesomeproject/broker"
   "fmt"
   "net"
)

func produce() {
   conn, err := net.dial("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.print("connect failed, err:", err)
   }
   defer conn.close()

   msg := broker.msg{id: 1102, topic: "topic-test",  msgtype: 2,  payload: []byte("我")}
   n, err := conn.write(broker.msgtobytes(msg))
   if err != nil {
      fmt.print("write failed, err:", err)
   }

   fmt.print(n)
}

消费者

package main

import (
   "awesomeproject/broker"
   "bytes"
   "fmt"
   "net"
)

func comsume() {
   conn, err := net.dial("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.print("connect failed, err:", err)
   }
   defer conn.close()

   msg := broker.msg{topic: "topic-test",  msgtype: 1}

   n, err := conn.write(broker.msgtobytes(msg))
   if err != nil {
      fmt.println("write failed, err:", err)
   }
   fmt.println("n", n)

   var res [128]byte
   conn.read(res[:])
   buf := bytes.newbuffer(res[:])
   recemsg := broker.bytestomsg(buf)
   fmt.print(recemsg)

   // ack
   conn, _ = net.dial("tcp", "127.0.0.1:12345")
   l, e := conn.write(broker.msgtobytes(broker.msg{id: recemsg.id, topic: recemsg.topic, msgtype: 3}))
   if e != nil {
      fmt.println("write failed, err:", err)
   }
   fmt.println("l:", l)
}

消费者这里ack时重新创建了连接,如果不创建连接的话,那服务端那里就需要一直从conn读取数据,直到结束。思考一下,像rabbitmq的ack就有自动和手工的ack,如果是手工的ack,必然需要一个新的连接,因为不知道客户端什么时候发送ack,自动的话,当然可以使用同一个连接,but这里就简单创建一条新连接吧

启动

先启动broker,再启动producer,然后启动comsumer,ok,能跑,能实现发送消息到队列,从队列消费消息

总结

整体虽然简单,但毕竟是使用go实现的,就是看似一顿操作猛如虎,实质慌如狗。第一时间就被go的gopath和go mod困扰住,后面语法的使用,比如指针,传值传引用等,最头疼的就是类型转换,作为一个javer,使用go进行类型转换,着实被狠狠得虐了一番。

到此这篇关于使用go实现一个超级mini的消息队列的示例代码的文章就介绍到这了,更多相关go mini消息队列内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!