目录

其实synchronousqueue 是一个特别有意思的阻塞队列,就我个人理解来说,它很重要的特点就是没有容量。

直接看一个例子:

package dongguabai.test.juc.test;

import java.util.concurrent.synchronousqueue;

/**
 * @author dongguabai
 * @description
 * @date 2021-09-01 21:52
 */
public class testsynchronousqueue {

    public static void main(string[] args) {
        synchronousqueue synchronousqueue = new synchronousqueue();
        boolean add = synchronousqueue.add("1");
        system.out.println(add);
    }
}

代码很简单,就是往 synchronousqueue 里放了一个元素,程序却抛异常了:

exception in thread "main" java.lang.illegalstateexception: queue full
	at java.util.abstractqueue.add(abstractqueue.java:98)
	at dongguabai.test.juc.test.testsynchronousqueue.main(testsynchronousqueue.java:14)

而异常原因是队列满了。刚刚使用的是 synchronousqueue#add 方法,现在来看看 synchronousqueue#put 方法:

    public static void main(string[] args) throws interruptedexception {
        synchronousqueue synchronousqueue = new synchronousqueue();
        synchronousqueue.put("1");
        system.out.println("----");
    }

看到 interruptedexception 其实就能猜出这个方法肯定会阻塞当前线程。

通过这两个例子,也就解释了 synchronousqueue 队列是没有容量的,也就是说在往 synchronousqueue 中添加元素之前,得先向 synchronousqueue 中取出元素,这句话听着很别扭,那可以换个角度猜想其实现原理,调用取出方法的时候设置了一个“已经有线程在等待取出”的标识,线程等待,然后添加元素的时候,先看这个标识,如果有线程在等待取出,则添加成功,反之则抛出异常或者阻塞。

分析

接下来从 synchronousqueue#put 方法开始进行分析:

    public void put(e e) throws interruptedexception {
        if (e == null) throw new nullpointerexception();
        if (transferer.transfer(e, false, 0) == null) {
            thread.interrupted();
            throw new interruptedexception();
        }
    }

可以发现是调用的 transferer#transfer 方法,这个 transferer 是在构造 synchronousqueue 的时候初始化的:

    public synchronousqueue(boolean fair) {
        transferer = fair ? new transferqueue<e>() : new transferstack<e>();
    }

synchronousqueue 有两种模式,公平与非公平,默认是非公平,非公平使用的就是 transferstack,是基于单向链表做的:

 static final class snode {
            volatile snode next;        // next node in stack
            volatile snode match;       // the node matched to this
            volatile thread waiter;     // to control park/unpark
            object item;                // data; or null for requests
            int mode;
   ...
 }

那么重点就是 synchronousqueue.transferstack#transfer 方法了,从方法名都可以看出这是用来做数据交换的,但是这个方法有好几十行,里面各种 node 指针搞来搞去,这个地方我觉得没必要过于纠结细节,老规矩,抓大放小,而且队列这种,很方便进行 debug 调试。

再理一下思路:

  • 今天研究的是阻塞队列,关注阻塞的话,更应该关系的是 takeput 方法;
  • transferer 是一个抽象类,只有一个 transfer 方法,即 takeput 共用,那就肯定是基于入参进行功能的区分;
  • takeput 方法底层都调用的 synchronousqueue.transferstack#transfer 方法;

将上面 synchronousqueue#put 使用的例子修改一下,再加一个线程take

package dongguabai.test.juc.test;

import java.util.date;
import java.util.concurrent.synchronousqueue;
import java.util.concurrent.timeunit;

/**
 * @author dongguabai
 * @description
 * @date 2021-09-01 21:52
 */
public class testsynchronousqueue {

    public static void main(string[] args) throws interruptedexception {
        synchronousqueue synchronousqueue = new synchronousqueue();
        new thread(()->{
            system.out.println(new date().tolocalestring()+"::"+thread.currentthread().getname()+"-put了数据:"+"1");

            try {
                synchronousqueue.put("1");
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        }).start();
        system.out.println("----");
        new thread(()->{
            object take = null;
            try {
                take = synchronousqueue.take();
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
            system.out.println(new date().tolocalestring()+"::"+thread.currentthread().getname()+"-take到了数据:"+take);
        }).start();
        timeunit.seconds.sleep(1);
        system.out.println("结束...");
    }
}

整个程序结束,并且输出:

—-
2021-9-2 0:58:55::thread-0-put了数据:1
2021-9-2 0:58:55::thread-1-take到了数据:1
结束…

也就是说当一个线程在 put 的时候,如果有线程 take ,那么 put 线程可以正常运行,不会被阻塞。

基于这个例子,再结合上文的猜想,也就是说核心点就是找到 put 的时候现在已经有线程在 take 的标识,或者 take 的时候已经有线程在 put,这个标识不一定是变量,结合 aqs 的原理来看,很可能是根据链表中的 node 进行判断。

接下来看 synchronousqueue.put 方法:

    public void put(e e) throws interruptedexception {
        if (e == null) throw new nullpointerexception();
        if (transferer.transfer(e, false, 0) == null) {
            thread.interrupted();
            throw new interruptedexception();
        }
    }

它底层也是调用的 synchronousqueue.transferstack#transfer 方法,但是传入参数是当前 put 的元素、false 和 0。再回过头看 synchronousqueue.transferstack#transfer 方法:

e transfer(e e, boolean timed, long nanos) {
            snode s = null; // constructed/reused as needed
  					//这里的参数e就是要put的元素,显然不为null,也就是说是data模式,根据注释,data模式就说明当前线程是producer
            int mode = (e == null) ? request : data;  

            for (;;) {
                snode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.iscancelled())
                            cashead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (cashead(h, s = snode(s, e, h, mode))) {
                        //因为第一次put那么h肯定为null,这里入参timed为false,所以会到这里,执行awaitfulfill方法,根据名称可以猜想出是一个阻塞方法
                        snode m = awaitfulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                   ....
        }

这里首先会构造一个 snode,然后执行 cashead 函数,其实最终栈结构就是:

head->put_e

就是 head 会指向 put 的元素对应的 snode

然后会执行 awaitfulfill 方法:

snode awaitfulfill(snode s, boolean timed, long nanos) {
            final long deadline = timed ? system.nanotime() + nanos : 0l;
            thread w = thread.currentthread();
            int spins = (shouldspin(s) ?
                         (timed ? maxtimedspins : maxuntimedspins) : 0);
            for (;;) {
                if (w.isinterrupted())
                    s.trycancel();
                snode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    nanos = deadline - system.nanotime();
                    if (nanos <= 0l) {
                        s.trycancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldspin(s) ? (spins-1) : 0;    //自旋机制
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    locksupport.park(this); //阻塞
                else if (nanos > spinfortimeoutthreshold)
                    locksupport.parknanos(this, nanos);
            }
        }

最终还是会使用 locksupport 进行阻塞,等待唤醒。

已经大致过了一遍流程了,细节方面就不再纠结了,那么假如再put 一个元素呢,其实结合源码已经可以分析出此时栈的结果为:

head–>put_e_1–>put_e

避免分析出错,写个 debug 的代码验证一下:

package dongguabai.test.juc.test;

import java.util.concurrent.synchronousqueue;
import java.util.concurrent.timeunit;

/**
 * @author dongguabai
 * @description
 * @date 2021-09-02 02:15
 */
public class debugput2e {

    public static void main(string[] args) throws interruptedexception {
        synchronousqueue synchronousqueue = new synchronousqueue();
        new thread(()-> {
            try {
                synchronousqueue.put("1");
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        }).start();
        timeunit.seconds.sleep(1);
        new thread(()-> {
            try {
                synchronousqueue.put("2");
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        }).start();
    }
}

synchronousqueue.transferstack#awaitfulfill 方法的 locksupport.park(this); 处打上断点,运行上面的代码,再看看现在的 head

的确与分析的一致。

也就是先进后出。再看 take 方法:

    public e take() throws interruptedexception {
        e e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        thread.interrupted();
        throw new interruptedexception();
    }

调用的 synchronousqueue.transferstack#transfer 方法,但是传入参数是 nullfalse 和 0。

偷个懒就不分析源码了,直接 debug 走一遍,代码如下:

package dongguabai.test.juc.test;

import java.util.concurrent.synchronousqueue;
import java.util.concurrent.timeunit;

/**
 * @author dongguabai
 * @description
 * @date 2021-09-02 02:24
 */
public class debugtake {

    public static void main(string[] args) throws interruptedexception {
        synchronousqueue synchronousqueue = new synchronousqueue();
        new thread(()-> {
            try {
                synchronousqueue.put("1");
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        },"thread-put-1").start();
        timeunit.seconds.sleep(1);
        new thread(()-> {
            try {
                synchronousqueue.put("2");
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        },"thread-put-2").start();
        timeunit.seconds.sleep(1);
        new thread(()->{
            try {
                object take = synchronousqueue.take();
                system.out.println("======take:"+take);
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        },"thread-take").start();
    }
}

synchronousqueue#take 方法中打上断点,运行上面的代码:

这里的 s 就是 headm 就是栈顶的元素,也是最近一次 put 的元素。说白了 take 就是取的栈顶的元素,最后再匹配一下,符合条件就直接取出来。take 之后 head 为:

栈的结构为:

head–>put_e

最后再把整个流程梳理一遍:

执行 put 操作的时候,每次压入栈顶;take 的时候就取栈顶的元素,即先进后出;这也就实现了非公平;

至于公平模式,结合 transferstack 的实现,可以猜测实现就是 put 的时候放入队列,take 的时候从队列头部开始取,先进先出。

那么这个队列设计的优势使用场景在哪里呢?个人感觉它的优势就是完全不会产生对队列中数据的争抢,因为说白了队列是空的,从某种程度上来说消费速率是很快的。

至于使用场景,我这边的确没有想到比较好的使用场景。结合组内同学的使用来看,他选择使用这个队列的原因是因为它不会在内存中生成任务队列,当服务宕机后不用担心内存中任务的丢失(非优雅停机的情况)。经过讨论后发现即使使用了 synchronousqueue 也无法有效的避免任务丢失,但这的确是一个思路,没准以后在其他场景中用得上。

到此这篇关于详解java七大阻塞队列之synchronousqueue的文章就介绍到这了,更多相关java阻塞队列 synchronousqueue内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!