导入依赖

<properties> 
	<curator.version>4.2.0</curator.version>
 </properties>
  <!-- curator ZK 客户端 -->
 <dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-framework</artifactId>
     <version>${curator.version}</version>
 </dependency>
 <dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-recipes</artifactId>
     <version>${curator.version}</version>
 </dependency>

配置文件

根路径下新建zookeeper.properties

zk.host=192.168.81.128:2181
# zk自增存储node
zk.sequence-path=/news/sequence/

枚举封装

创建com.demo.common.zookeeper.sequence.ZkSequenceEnum文件,用于定义通过Zk生成自增ID的枚举,项目规范要求枚举名与物理表名保持一致,当前项目阶段使用的枚举如下:

package com.demo.common.zookeeper.sequence;

/**
 * Identifiers of the auto-increment sequences that are generated through ZooKeeper.
 *
 * <p>Per the project convention described alongside this enum, each constant is named after
 * the physical table whose primary keys it produces. The constant name is also used as the
 * last segment of the ZooKeeper node path, so renaming a constant starts a new counter.
 */
public enum ZkSequenceEnum {
    /** Sequence for the AP_LIKES table. */
    AP_LIKES,

    /** Sequence for the AP_READ_BEHAVIOR table. */
    AP_READ_BEHAVIOR,

    /** Sequence for the AP_COLLECTION table. */
    AP_COLLECTION,

    /** Sequence for the AP_USER_FOLLOW table. */
    AP_USER_FOLLOW,

    /** Sequence for the AP_USER_FAN table. */
    AP_USER_FAN;
}

序列封装

创建com.demo.common.zookeeper.sequence.ZkSequence文件,用于封装程序在运行时每个表对应的自增器,这里主要通过分布式原子自增类(DistributedAtomicLong)实现,注意重试策略为指数退避(ExponentialBackoffRetry,基础等待时间500毫秒,最多重试3次),重试后仍然生成失败则返回null,由上层处理,相关实现代码如下:

package com.demo.common.zookeeper.sequence;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Auto-increment counter backed by a single ZooKeeper node.
 *
 * <p>Each instance wraps one Curator {@link DistributedAtomicLong} bound to the node path given
 * at construction time. Both fields are assigned exactly once and are therefore
 * {@code private final}.
 */
public class ZkSequence {
    /** Exponential back-off policy: 500 ms base sleep time, at most 3 retries. */
    private final RetryPolicy retryPolicy = new ExponentialBackoffRetry(500, 3);

    /** Curator recipe that performs the increment on the counter node. */
    private final DistributedAtomicLong distAtomicLong;

    /**
     * Creates a sequence bound to the given ZooKeeper node.
     *
     * @param sequenceName full ZooKeeper path of the counter node
     * @param client       Curator client used to talk to ZooKeeper
     */
    public ZkSequence(String sequenceName, CuratorFramework client) {
        this.distAtomicLong = new DistributedAtomicLong(client, sequenceName, retryPolicy);
    }

    /**
     * Increments the counter and returns the new value.
     *
     * @return the value after the increment, or {@code null} when the increment did not
     *         succeed; the caller decides how to handle the failure
     * @throws Exception if the underlying Curator call fails
     */
    public Long sequence() throws Exception {
        AtomicValue<Long> result = distAtomicLong.increment();
        return result.succeeded() ? result.postValue() : null;
    }
}

Client封装

创建com.demo.common.zookeeper.ZookeeperClient类,通过PostConstruct注解在构造器执行之后调用init方法初始化客户端连接,并调用initZkSequence方法初始化项目所定义的ZkSequence,存储在zkSequence的Map集合中,最终提供sequence方法来查询对应的zkSequence并获取自增ID,相关实现代码如下:

package com.demo.common.zookeeper;

import com.google.common.collect.Maps;
import com.demo.common.zookeeper.sequence.ZkSequence;
import com.demo.common.zookeeper.sequence.ZkSequenceEnum;
import lombok.Getter;
import lombok.Setter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import java.util.Map;
@Setter
@Getter
public class ZookeeperClient {
    private static Logger logger = LoggerFactory.getLogger(ZookeeperClient.class);
    private String host;
    private String sequencePath;
    // 重试休眠时间
    private final int SLEEP_TIME_MS = 1000;
    // 最大重试1000次
    private final int MAX_RETRIES = 1000;
    //会话超时时间
    private final int SESSION_TIMEOUT = 30 * 1000;
    //连接超时时间
    private final int CONNECTION_TIMEOUT = 3 * 1000;
    //创建连接实例
    private CuratorFramework client = null;
    // 序列化集合
    private Map<String, ZkSequence> zkSequence = Maps.newConcurrentMap();

    public ZookeeperClient(String host, String sequencePath) {
        this.host = host;
        this.sequencePath = sequencePath;
    }

    @PostConstruct
    public void init() throws Exception {
        this.client = CuratorFrameworkFactory.builder()
                .connectString(this.getHost())
                .connectionTimeoutMs(CONNECTION_TIMEOUT)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
                .build();
        this.client.start();
        this.initZkSequence();
    }

    public void initZkSequence() {
        ZkSequenceEnum[] list = ZkSequenceEnum.values();
        for (int i = 0; i < list.length; i++) {
            String name = list[i].name();
            String path = this.sequencePath + name;
            ZkSequence seq = new ZkSequence(path, this.client);
            zkSequence.put(name, seq);
        }
    }

    /*** 生成SEQ * @param name * @return * @throws Exception */
    public Long sequence(ZkSequenceEnum name) {
        try {
            ZkSequence seq = zkSequence.get(name.name());
            if (seq != null) {
                return seq.sequence();
            }
        } catch (Exception e) {
            logger.error("获取[{}]Sequence错误:{}", name, e);
        }
        return null;
    }
}


注:这里可以把ZookeeperClient类比为工厂(类似BeanFactory的角色),负责创建并持有各个ZkSequence;ZkSequence则是由它创建的具体自增器。二者并未实现Spring的BeanFactory/FactoryBean接口,仅为概念上的类比。

Config封装

创建com.demo.common.zookeeper.ZkConfig类,用于自动导入环境配置文件(zookeeper.properties),并定义ZookeeperClient的Bean,其相关的实现代码如下:

package com.demo.common.zookeeper;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

/**
 * Spring configuration for the ZooKeeper client.
 *
 * <p>Loads {@code zookeeper.properties} from the classpath, binds the entries under the
 * {@code zk} prefix onto this class and exposes a {@link ZookeeperClient} bean built from them.
 */
@Configuration
@PropertySource("classpath:zookeeper.properties")
@ConfigurationProperties(prefix = "zk")
@Getter
@Setter
public class ZkConfig {
    // ZooKeeper connect string; expected to come from the zk.host property
    String host;
    // Parent node for the sequence counters; expected to come from the zk.sequence-path property
    String sequencePath;

    /**
     * Declares the {@link ZookeeperClient} bean.
     *
     * @return a client constructed with the bound host and sequence path
     */
    @Bean
    public ZookeeperClient zookeeperClient() {
        final String zkHost = this.host;
        final String zkSequencePath = this.sequencePath;
        return new ZookeeperClient(zkHost, zkSequencePath);
    }
}

Sequences封装

为便于程序中调用,以及对自增生成失败的统一处理,项目中规范通过
com.demo.common.zookeeper.sequence.Sequences类统一暴露生成自增主键的功能,相关代码如下:

package com.demo.common.zookeeper.sequence;

import com.demo.common.zookeeper.ZookeeperClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Single entry point for generating auto-increment primary keys.
 *
 * <p>Every public method delegates to {@link ZookeeperClient#sequence(ZkSequenceEnum)} for one
 * fixed sequence and returns whatever the client returns.
 */
@Component
public class Sequences {

    // Client that owns the ZooKeeper connection and the registered sequences
    @Autowired
    private ZookeeperClient client;

    /** @return the next value of the AP_LIKES sequence, as returned by the client */
    public Long sequenceApLikes() {
        return next(ZkSequenceEnum.AP_LIKES);
    }

    /** @return the next value of the AP_READ_BEHAVIOR sequence, as returned by the client */
    public Long sequenceApReadBehavior() {
        return next(ZkSequenceEnum.AP_READ_BEHAVIOR);
    }

    /** @return the next value of the AP_COLLECTION sequence, as returned by the client */
    public Long sequenceApCollection() {
        return next(ZkSequenceEnum.AP_COLLECTION);
    }

    /** @return the next value of the AP_USER_FOLLOW sequence, as returned by the client */
    public Long sequenceApUserFollow() {
        return next(ZkSequenceEnum.AP_USER_FOLLOW);
    }

    /** @return the next value of the AP_USER_FAN sequence, as returned by the client */
    public Long sequenceApUserFan() {
        return next(ZkSequenceEnum.AP_USER_FAN);
    }

    // Shared delegation to the client for the given sequence
    private Long next(ZkSequenceEnum type) {
        return client.sequence(type);
    }
}

使用案例

项目中使用需要开启包扫描,例如@ComponentScan({"com.demo.common.zookeeper"}),然后在代码中参考如下步骤使用:

// 第一步,注入Sequences
 @Autowired private Sequences sequences;
 // 第二步,在方法中调用生成 
 alb.setId(sequences.sequenceApCollection());

本文地址:https://blog.csdn.net/hello_IT_/article/details/107370906