因为在项目实际过程中所采用的是微服务架构,考虑到承载量基本每个相同业务的服务都是多节点部署,所以针对某些资源的访问就不得不用到用到分布式锁了。

这里列举一个最简单的场景,假如有一个智能售货机,由于机器本身的原因不能同一台机器不能同时出两个商品,这就要求在在出货流程前针对同一台机器在同一时刻出现并发

创建订单时只能有一笔订单创建成功,但是订单服务是多节点部署的,所以就不得不用到分布式锁了。

以上只是一种简单的业务场景,在各种大型互联网实际应用中,需要分布式锁的业务场景会更多,综合比较了业界基于各种中间件来实现的分布式锁方案,然后结合实际业务最终

决定采用consul来实现,因为我们的项目中采用了consul做注册中心,并且consul天生可以保证一致性(这点类似zk),当然zk也能实现分布式锁,但是这里不对这点做过多讨论。

redis虽然也能实现分布式锁,但是可能因为场景比较复杂,如果redis采用cluster部署的话,如果某一主节点出现故障的话,有一定几率会出现脑裂现象,这样就可能会让竞争者在

并发时同时获得到锁,这样可能会破坏掉后面的业务,当然出现这种情况的概率很低,但是也不能完全排除,因为redis的根本不能保证强一致性导致的。

好了,这里说的最简单的分布式锁的意思是,多个竞争者同一时间并发去获得锁时,获取失败的就直接返回了,获取成功的继续后续的流程,然后在合适的时间释放锁,并且为锁

加了超时时间,防止获得到锁的进程或线程在未来得及释放锁时自己挂掉了,导致资源处于一直被锁定的状态无法得到释放。主要的实现逻辑就是这样,如果有人想实现获得锁失

败的竞争者一直继续尝试获得,可以基于该示例进行修改,加上自旋逻辑就ok。

以下是锁实现代码:

package com.lyb.consullock;

import com.ecwid.consul.v1.consulclient;
import com.ecwid.consul.v1.agent.model.newcheck;
import com.ecwid.consul.v1.kv.model.putparams;
import com.ecwid.consul.v1.session.model.newsession;
import com.ecwid.consul.v1.session.model.session;
import lombok.data;


import java.time.localdatetime;
import java.util.arraylist;
import java.util.list;


public class distributedlock{
    private consulclient consulclient;

    /**
     * 构造函数
     * @param consulhost 注册consul的client或服务端的ip或主机名,或域名
     * @param consulport 端口号
     */
    public distributedlock(string consulhost,int consulport){
        consulclient = new consulclient(consulhost,consulport);
    }

    /**
     * 获得锁的方法
     * @param lockname 竞争的资源名
     * @param ttlseconds 锁的超时时间,超过该时间自动释放
     * @return
     */
    public lockcontext getlock(string lockname,int ttlseconds){
        lockcontext lockcontext = new lockcontext();
        if(ttlseconds<10 || ttlseconds > 86400) ttlseconds = 60;
        string sessionid = createsession(lockname,ttlseconds);
        boolean success = lock(lockname,sessionid);
        if(success == false){
            consulclient.sessiondestroy(sessionid,null);
            lockcontext.setgetlock(false);

            return lockcontext;
        }

        lockcontext.setsession(sessionid);
        lockcontext.setgetlock(true);

        return lockcontext;
    }

    /**
     * 释放锁
     * @param sessionid
     */
    public void releaselock(string sessionid){
        consulclient.sessiondestroy(sessionid,null);
    }

    private string createsession(string lockname,int ttlseconds){
        newcheck check = new newcheck();
        check.setid("check "+lockname);
        check.setname(check.getid());
        check.setttl(ttlseconds+"s"); //该值和session ttl共同决定决定锁定时长
        check.settimeout("10s");
        consulclient.agentcheckregister(check);
        consulclient.agentcheckpass(check.getid());

        newsession session = new newsession();
        session.setbehavior(session.behavior.release);
        session.setname("session "+lockname);
        session.setlockdelay(1);
        session.setttl(ttlseconds + "s"); //和check ttl共同决定锁时长
        list<string> checks = new arraylist<>();
        checks.add(check.getid());
        session.setchecks(checks);
        string sessionid = consulclient.sessioncreate(session,null).getvalue();

        return sessionid;
    }

    private boolean lock(string lockname,string sessionid){
        putparams putparams = new putparams();
        putparams.setacquiresession(sessionid);

        boolean issuccess = consulclient.setkvvalue(lockname,"lock:"+ localdatetime.now(),putparams).getvalue();

        return issuccess;
    }

    /**
     * 竞争锁时返回的对象
     */
    @data
    public class lockcontext{
        /**
         * 获得锁成功返回该值,比便后面用该值来释放锁
         */
        private string session;
        /**
         * 是否获得到锁
         */
        private boolean isgetlock;
    }
}

pom文件

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelversion>4.0.0</modelversion>
    <parent>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-parent</artifactid>
        <version>2.1.6.release</version>
        <relativepath/> <!-- lookup parent from repository -->
    </parent>
    <groupid>com.lyb</groupid>
    <artifactid>consul-lock</artifactid>
    <version>0.0.1-snapshot</version>
    <name>consul-lock</name>
    <description>demo project for spring boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>greenwich.sr2</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-starter-consul-discovery</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-actuator</artifactid>
        </dependency>
        <dependency>
            <groupid>org.projectlombok</groupid>
            <artifactid>lombok</artifactid>
            <version>1.18.8</version>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencymanagement>
        <dependencies>
            <dependency>
                <groupid>org.springframework.cloud</groupid>
                <artifactid>spring-cloud-dependencies</artifactid>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencymanagement>

    <build>
        <plugins>
            <plugin>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-maven-plugin</artifactid>
            </plugin>
        </plugins>
    </build>

</project>

测试代码:

package com.lyb.consullock;

import org.junit.assert;
import org.junit.test;
import org.junit.runner.runwith;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
import org.springframework.test.context.junit4.springrunner;

import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.timeunit;

@runwith(springrunner.class)
@springboottest
public class consullockapplicationtests {
    @autowired
    private serviceconfig serviceconfig;
    @test
    public void locksameresourer() {
        //针对相同资源在同一时刻只有一个线程会获得锁
        executorservice threadpool = executors.newfixedthreadpool(10);
        for (int a=0;a<20;a++){
            threadpool.submit(
                    () -> {
                        for (int i = 0;i < 100; i++) {
                            distributedlock lock = new distributedlock(
                                    serviceconfig.getconsulregisterhost(),
                                    serviceconfig.getconsulregisterport());

                            distributedlock.lockcontext lockcontext = lock.getlock("test lock", 10);
                            if (lockcontext.isgetlock()) {
                                system.out.println(thread.currentthread().getname() + "获得了锁");
                                try {
                                    timeunit.seconds.sleep(1);
                                    lock.releaselock(lockcontext.getsession());
                                } catch (interruptedexception e) {
                                    e.printstacktrace();
                                }
                            }else {
                                //system.out.println(thread.currentthread().getname() + "没有获得锁");
                            }
                        }
                    });
        }

        try {
            timeunit.minutes.sleep(2);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
    }

    @test
    public void lockdiffresource(){
        //针对不通的资源所有线程都应该能获得锁
        executorservice threadpool = executors.newfixedthreadpool(10);
        for (int a=0;a<20;a++){
            threadpool.submit(
                    () -> {
                        for (int i = 0;i < 100; i++) {
                            distributedlock lock = new distributedlock(
                                    serviceconfig.getconsulregisterhost(),
                                    serviceconfig.getconsulregisterport());

                            distributedlock.lockcontext lockcontext = lock.getlock("test lock"+thread.currentthread().getname(), 10);
                            if (lockcontext.isgetlock()) {
                                system.out.println(thread.currentthread().getname() + "获得了锁");
                                try {
                                    timeunit.seconds.sleep(1);
                                    lock.releaselock(lockcontext.getsession());
                                } catch (interruptedexception e) {
                                    e.printstacktrace();
                                }
                            }else {
                                //system.out.println(thread.currentthread().getname() + "没有获得锁");
                                assert.asserttrue(lockcontext.isgetlock());
                            }
                        }
                    });
        }

        try {
            timeunit.minutes.sleep(2);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
    }
}

希望对大家有所帮助

项目路径:

到此这篇关于利用consul在spring boot中实现最简单的分布式锁的文章就介绍到这了,更多相关spring boot分布式锁内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!