目录
  • 对比jdk的线程池

    拆分实现流程

    请看下面这张图

    首先我们得对线程池进行一个功能拆分

    • thread pool 就是我们的线程池,t1,t2,t3代表三个线程
    • blocking queue代表阻塞队列
    • main代表main方法的线程
    • task1,task2,task3代表要执行的每个任务

    现在我们梳理一下执行的流程,注意这里是简略版的,文章后面我会给出详细版的

    所以此时,我们发现了需要创建几个类,或者说几个角色,分别是

    • 线程池
    • 工作线程
    • 阻塞队列
    • 拒绝策略(干嘛的?就是当线程数已经满了,并且阻塞队列也满了,还有任务想进入阻塞队列的时候,就可以拒绝这个任务)

    实现方式

    1.拒绝策略

    /**
     * 拒绝策略
     */
    @functionalinterface
    interface rejectpolicy<t>{
    	//queue就是我们自己实现的阻塞队列,task是任务
        void reject(blockingqueue<t> queue,t task);
    }
    

    2.阻塞队列

    我们需要实现四个方法,获取和添加,超时获取和超时添加,至于方法实现的细节,我都备注了大量的注释进行解释。

    /**
     * 阻塞队列
     */
    class blockingqueue<t>{
        //阻塞队列
        private deque<t> queue = new arraydeque<>();
    
        //锁
        private reentrantlock lock = new reentrantlock();
    
        //生产者条件变量
        private condition fullwaitset = lock.newcondition();
    
        //消费者条件变量
        private condition emptywaitset = lock.newcondition();
    
        //容量
        private int capacity;
    
        public blockingqueue(int capacity){
            this.capacity = capacity;
        }
    
        //带有超时阻塞获取
        public t poll(long timeout, timeunit timeunit){
            lock.lock();
            try {
                //将timeout统一转换为纳秒
                long nanos = timeunit.tonanos(timeout);
                while(queue.isempty()){
                    try {
                        if(nanos <= 0){
                            //小于0,说明上次没有获取到,代表已经超时了
                            return null;
                        }
                        //返回值是剩余的时间
                        nanos = emptywaitset.awaitnanos(nanos);
                    } catch (interruptedexception e) {
                        e.printstacktrace();
                    }
                }
                t t = queue.removefirst();
                //通知生产者
                fullwaitset.signal();
                return t;
            }finally {
                lock.unlock();
            }
        }
    
        //阻塞获取
        public t take(){
            lock.lock();
            try{
                while(queue.isempty()){ //如果任务队列为空,代表线程池没有可以执行的内容
                    try {
                         /*
                        也就说此时进来的线程是执行不了任务的,所以此时emptywaitset消费者要进行阻塞状态
                        等待下一次唤醒,然后继续判断队列是否为空
                         */
                        emptywaitset.await();
                    } catch (interruptedexception e) {
                        e.printstacktrace();
                    }
                }
                /*
                代码执行到这里。说明任务队列不为空,线程池就从任务队列拿出一个任务出来执行
                也就是说把阻塞队列的一个任务出队
                 */
                t t = queue.removefirst();
                /*
                然后唤醒之前存放在生成者condition休息室,因为由于之前阻塞队列已满,fullwaitset才会进入阻塞状态
                所以当阻塞队列删除了任务,就要唤醒之前进入阻塞状态的fullwaitset
                 */
                fullwaitset.signal();
                //返回任务
                return t;
            }finally {
                lock.unlock();
            }
        }
    
        //阻塞添加
        public void put(t task){
            lock.lock();
            try {
                while(queue.size() == capacity){    //任务队列满了
                    try {
                        system.out.println("等待加入任务队列"+task);
                        /*
                        也就说此时进来的任务是进不了阻塞队列的,已经满了,所以此时生产者condition要进入阻塞状态
                        等待下一次唤醒,然后继续判断队列是否为空
                         */
                        fullwaitset.await();
                    } catch (interruptedexception e) {
                        e.printstacktrace();
                    }
                }
                //任务队列还未满
                system.out.println("加入任务队列"+task);
                //把任务加入阻塞队列
                queue.addlast(task);
                /*
                然后唤醒之前存放在消费者condition休息室,因为由于之前阻塞队列为空,emptywaitset才会进入阻塞状态
                所以当阻塞队列加入了任务,就要唤醒之前进入阻塞状态的emptywaitset
                 */
                emptywaitset.signal();
            }finally {
                lock.unlock();
            }
        }
    
        //带超时阻塞时间添加
        public boolean offer(t task,long timeout,timeunit timeunit){
            lock.lock();
            try {
                long nanos = timeunit.tonanos(timeout);
                while(queue.size() == capacity){
                    try {
                        if(nanos < 0){
                            return false;
                        }
                        system.out.println("等待加入任务队列"+task);
                        //不会一直阻塞,超时就会继续向下执行
                        nanos = fullwaitset.awaitnanos(nanos);
                    } catch (interruptedexception e) {
                        e.printstacktrace();
                    }
                }
                system.out.println("加入任务队列"+task);
                queue.addlast(task);
                emptywaitset.signal();
                return true;
            }finally {
                lock.unlock();
            }
        }
    
        //获取任务数量
        public int size(){
            lock.lock();
            try{
                return queue.size();
            }finally {
                lock.unlock();
            }
        }
    
        //尝试添加任务,如果阻塞队列已经满了,就使用拒绝策略
        public void tryput(rejectpolicy<t> rejectpolicy, t task){
            lock.lock();
            try {
                //判断队列是否已满
                if(queue.size() == capacity){
                    rejectpolicy.reject(this,task);
                }else{  //有空闲
                    system.out.println("加入任务队列"+task);
                    queue.addlast(task);
                    emptywaitset.signal();
                }
            }finally {
                lock.unlock();
            }
        }
    }
    

    3.线程池和工作线程

    我把工作线程当成线程池的内部类去实现。方便调用变量。

    /**
     * 线程池
     */
    class threadpool{
        //阻塞队列
        private blockingqueue<runnable> taskqueue;
    
        //线程集合
        private hashset<worker> workers = new hashset<>();
    
        //核心线程数
        private int coresize;
    
        //获取任务的超时时间
        private long timeout;
    
        private timeunit timeunit;
    
        private rejectpolicy<runnable> rejectpolicy;
    
        public threadpool(int coresize, long timeout, timeunit timeunit, int queuecapacity,rejectpolicy<runnable> rejectpolicy) {
            this.coresize = coresize;
            this.timeout = timeout;
            this.timeunit = timeunit;
            this.taskqueue = new blockingqueue<>(queuecapacity);
            this.rejectpolicy = rejectpolicy;
        }
    
        //执行任务
        public void execute(runnable task){
            synchronized (workers){
                if(workers.size() <= coresize){  //当前的线程数小于核心线程数
                    worker worker = new worker(task);
                    workers.add(worker);
                    //让线程开始工作,执行它的run方法
                    worker.start();
                }else{
                    // 1) 死等
                    // 2) 带超时等待
                    // 3) 让调用者放弃任务执行
                    // 4) 让调用者抛出异常
                    // 5) 让调用者自己执行任务
                    taskqueue.tryput(rejectpolicy,task);
                }
            }
        }
    
        /**
         * 工作线程,也就是线程池里面的线程
         */
        class worker extends thread{
            private runnable task;
            public worker(runnable task){
                this.task = task;
            }
    
            @override
            public void run() {
                //执行任务
                // 1) 当 task 不为空,执行任务
                // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
                while (task != null || (task = taskqueue.poll(timeout, timeunit)) != null) {
                    try {
                        system.out.println("正在执行的任务" + task);
                        task.run();
                    } catch (exception e) {
                        e.printstacktrace();
                    } finally {
                        //代表这个任务已经执行完了
                        task = null;
                    }
                }
                synchronized (workers) {
                    system.out.println("worker 被移除" + this);
                    workers.remove(this);
                }
            }
        }
    }
    

    策略模式

    细心的小伙伴已经发现,我在拒绝策略这里使用了23种设计模式的策略模式,因为我没有将拒绝的方式写死,而是交给了调用者去实现。

    对比jdk的线程池

    下面是jdk自带的线程池

    经典的七大核心参数

    • corepoolsize:核心线程数
    • queuecapacity:任务队列容量(阻塞队列)
    • maxpoolsize:最大线程数
    • keepalivetime:线程空闲时间
    • timeunit unit:超时时间单位
    • threadfactory threadfactory:线程工程
    • rejectedexecutionhandler:任务拒绝处理器

    实际上我们自己实现的也大同小异,只不过jdk官方的更为复杂。

    jdk线程执行的流程图

    线程池的状态转化

    线程我们知道在操作系统层面有5种状态

    • 初始状态:仅是在语言层面创建了线程对象,还未与操作系统线程关联
    • 可运行状态(就绪状态):指该线程已经被创建(与操作系统线程关联),可以由 cpu 调度执行
    • 运行状态:指获取了 cpu 时间片运行中的状态,当 cpu 时间片用完,会从【运行状态】转换至【可运行状态】,会导致线程的上下文切换
    • 阻塞状态
    • 如果调用了阻塞 api,如 bio 读写文件,这时该线程实际不会用到 cpu,会导致线程上下文切换,进入【阻塞状态】
    • 等 bio 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】
    • 与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑调度它们
    • 终止状态:表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态

    线程在java api层面有6种状态

    • new 线程刚被创建,但是还没有调用 start() 方法
    • runnable 当调用了 start() 方法之后,注意,java api 层面的
    • runnable 状态涵盖了 操作系统 层面的【可运行状态】、【运行状态】
    • blocked , waiting , timed_waiting 都是 java api 层面对【阻塞状态】的细分
    • terminated 当线程代码运行结束

    线程池有5种状态

    • running:能接受新任务,并处理阻塞队列中的任务
    • shutdown:不接受新任务,但是可以处理阻塞队列中的任务
    • stop:不接受新任务,并且不处理阻塞队列中的任务,并且还打断正在运行任务的线程,就是直接不干了!
    • tidying:所有任务都终止,并且工作线程也为0,处于关闭之前的状态
    • terminated:已关闭。

    总结

    本篇文章就到这里了,希望能给你带来帮助,也希望您能够多多关注www.887551.com的更多内容!