文章目录

      • 阻塞队列
      • 拒绝策略:核心线程(coreSize)用完且队列已满时,才调用该接口实现类的方法
      • 线程池
      • 测试


本文代码在某些地方没有加锁,并发执行时可能会抛出异常,请谨慎使用!写这个只是为了更好地理解线程池的实现。

阻塞队列

/**
 * A bounded FIFO queue whose operations are guarded by a single
 * {@link ReentrantLock}.
 *
 * <p>Consumers wait on {@code emptyWait} while the queue is empty and
 * producers wait on {@code fullWait} while it is full. Every successful
 * insertion signals one waiting consumer and every successful removal signals
 * one waiting producer.
 *
 * <p>None of the methods declare {@link InterruptedException}. If the calling
 * thread is interrupted while waiting, the wait continues and the thread's
 * interrupt status is restored before the method returns.
 *
 * @param <T> type of the queued elements; {@code null} elements are rejected
 *            by the backing {@link ArrayDeque}
 */
public class BlockQueue<T> {
    private final Deque<T> queue=new ArrayDeque<>();
    private final int capcity;
    private final ReentrantLock lock=new ReentrantLock();
    /** Signalled whenever an element is added; consumers wait here. */
    private final Condition emptyWait=lock.newCondition();
    /** Signalled whenever an element is removed; producers wait here. */
    private final Condition fullWait=lock.newCondition();

    /**
     * Creates an empty queue.
     *
     * @param capcity number of elements at which the queue counts as full
     */
    public BlockQueue(int capcity) {
        this.capcity = capcity;
    }

    /**
     * Returns the number of queued elements.
     *
     * @return the current element count, read while holding the queue lock
     */
    public int getSize(){
        // ArrayDeque is not thread-safe, so even a read must hold the lock.
        lock.lock();
        try{
            return queue.size();
        }finally{
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting without a time limit
     * while the queue is empty.
     *
     * @return the element that was at the head of the queue
     */
    public T take(){
        lock.lock();
        try{
            while(queue.isEmpty()){
                // Keeps waiting across interrupts and re-asserts the
                // interrupt status on return instead of swallowing it.
                emptyWait.awaitUninterruptibly();
            }
            T t=queue.removeFirst();
            fullWait.signal();
            return t;
        }finally{
            lock.unlock();
        }
    }

    /**
     * Appends {@code task} if the queue is not full; otherwise hands it to
     * {@code rejectPolicy}.
     *
     * <p>The policy is invoked after the queue lock has been released, so a
     * policy that runs the task itself or blocks does not stall consumers.
     *
     * @param task         element to enqueue
     * @param rejectPolicy strategy called with this queue and {@code task}
     *                     when the queue is full
     */
    public void put(T task,RejectPolicy<T> rejectPolicy){
        lock.lock();
        try{
            if(queue.size()!=capcity){
                queue.addLast(task);
                // Wake a consumer blocked in take()/timeTake().
                emptyWait.signal();
                return;
            }
        }finally {
            lock.unlock();
        }
        rejectPolicy.reject(this,task);
    }

    /**
     * Removes and returns the head of the queue, waiting up to the given
     * timeout while the queue is empty.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return the head element, or {@code null} if the timeout elapsed while
     *         the queue was still empty
     */
    public T timeTake(long timeout, TimeUnit timeUnit) {
        boolean interrupted=false;
        lock.lock();
        try{
            long nanos=timeUnit.toNanos(timeout);
            while(queue.isEmpty()){
                if(nanos<=0) return null;
                long start=System.nanoTime();
                try {
                    // Wait for the REMAINING time, not the raw timeout value.
                    nanos=emptyWait.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    // Charge the time already spent so an interrupt cannot
                    // extend the overall timeout.
                    interrupted=true;
                    nanos-=System.nanoTime()-start;
                }
            }
            T t=queue.removeFirst();
            fullWait.signal();
            return t;
        }finally{
            lock.unlock();
            if(interrupted){
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Appends {@code task}, waiting up to the given timeout while the queue is
     * full.
     *
     * @param task     element to enqueue
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return {@code true} if the element was added, {@code false} if the
     *         timeout elapsed while the queue was still full
     */
    public boolean timePut(T task,long timeout,TimeUnit timeUnit){
        boolean interrupted=false;
        lock.lock();
        try{
            long nanos=timeUnit.toNanos(timeout);
            while(queue.size()==capcity){
                if(nanos<=0) return false;
                long start=System.nanoTime();
                try {
                    // Wait for the REMAINING time, not the raw timeout value.
                    nanos=fullWait.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    // Charge the time already spent so an interrupt cannot
                    // extend the overall timeout.
                    interrupted=true;
                    nanos-=System.nanoTime()-start;
                }
            }
            queue.addLast(task);
            emptyWait.signal();
            return true;
        }finally{
            lock.unlock();
            if(interrupted){
                Thread.currentThread().interrupt();
            }
        }
    }
}

拒绝策略:核心线程(coreSize)用完且队列已满时,才调用该接口实现类的方法

/**
 * Strategy that decides what happens to an element a {@link BlockQueue} could
 * not accept because it was full.
 *
 * <p>The interface has a single abstract method, so it can be implemented with
 * a lambda.
 *
 * @param <T> type of the elements held by the queue
 */
@FunctionalInterface
public interface RejectPolicy<T> {
    /**
     * Handles an element that was not enqueued.
     *
     * @param taskQueue the queue that was full when the element was offered
     * @param task      the element that was not enqueued
     */
    void reject(BlockQueue<T> taskQueue,T task);
}

线程池

/**
 * A minimal thread pool: up to {@code coreSize} worker threads run tasks, and
 * further tasks are placed in a bounded {@link BlockQueue}.
 *
 * <p>A worker runs the task it was created with, then keeps polling the queue.
 * When a poll times out and the queue is empty, the worker retires and removes
 * itself from the pool. All access to the {@code workers} set happens while
 * holding its monitor.
 */
public class ThreadPool {
    private final BlockQueue<Runnable> taskQueue;
    private final int coreSize;
    /** Live workers; every read and write must hold this set's monitor. */
    private final HashSet<Worker> workers=new HashSet<>();
    private final RejectPolicy<Runnable> rejectPolicy;

    /**
     * Creates a pool with no running workers.
     *
     * @param capcity      capacity of the task queue
     * @param coreSize     maximum number of worker threads
     * @param rejectPolicy strategy handed to the queue for tasks it cannot
     *                     accept because it is full
     */
    public ThreadPool(int capcity,int coreSize,RejectPolicy<Runnable> rejectPolicy){
        this.taskQueue=new BlockQueue<>(capcity);
        this.coreSize=coreSize;
        this.rejectPolicy=rejectPolicy;
    }

    /**
     * Submits a task. If fewer than {@code coreSize} workers exist, a new
     * worker is started with the task; otherwise the task is offered to the
     * queue, which applies the reject policy when it is full.
     *
     * @param task the task to run
     * @throws NullPointerException if {@code task} is {@code null}
     */
    public void excute(Runnable task){
        // Fail the same way on both paths: the queue's ArrayDeque rejects
        // null anyway, and a worker created with null would run nothing.
        if(task==null){
            throw new NullPointerException("task");
        }
        synchronized (workers){
            if(workers.size()<coreSize){
                Worker worker=new Worker(task);
                worker.start();
                workers.add(worker);
            }else{
                taskQueue.put(task,rejectPolicy);
            }
        }
    }

    /** Worker thread that runs its initial task and then drains the queue. */
    class Worker extends Thread{
        private Runnable task;

        /**
         * @param task first task this worker runs
         */
        public Worker(Runnable task){
            this.task=task;
        }

        /**
         * Runs the initial task, then polls the queue with a one-second
         * timeout, and retires once a poll times out and the queue is empty.
         */
        @Override
        public void run(){
            try{
                while(true){
                    if(task==null){
                        task=taskQueue.timeTake(1000,TimeUnit.MILLISECONDS);
                    }
                    if(task==null){
                        if(retireIfIdle()){
                            return;
                        }
                        // A task was queued after the poll timed out.
                        continue;
                    }
                    try{
                        task.run();
                    }catch(RuntimeException e){
                        e.printStackTrace();
                    }finally{
                        task=null;
                    }
                }
            }finally{
                // Covers abnormal exits (e.g. an Error thrown by a task) so a
                // dead worker never keeps occupying a slot. Removing twice is
                // harmless.
                synchronized (workers){
                    workers.remove(this);
                }
            }
        }

        /**
         * Removes this worker from the pool if the queue is empty.
         *
         * <p>The check and the removal happen under the {@code workers}
         * monitor, which {@code excute} also holds while enqueueing. A task
         * is therefore either seen here or submitted after this worker is
         * gone, in which case {@code excute} starts a new worker for it.
         *
         * @return {@code true} if the worker was removed and must stop
         */
        private boolean retireIfIdle(){
            synchronized (workers){
                if(taskQueue.getSize()>0){
                    return false;
                }
                workers.remove(this);
                return true;
            }
        }
    }
}

测试

/** Small demo that submits two printing tasks to a {@code ThreadPool}. */
public class Test {
    /**
     * Builds a pool with a queue capacity of 2 and 2 core threads, then
     * submits two tasks that each print one line.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final int queueCapacity=2;
        final int coreThreads=2;
        ThreadPool pool=new ThreadPool(queueCapacity,coreThreads,(queue,task)->{
            // Reject policy: do nothing, the rejected task is dropped.
            // Alternatives one could try here instead:
            //   - task.run();  (let the submitting thread run the task)
            //   - throw a RuntimeException to report the failure to the caller
            //   - queue.timePut(task,1000,TimeUnit.MILLISECONDS) and throw a
            //     RuntimeException if it returns false
        });
        pool.excute(()->System.out.println("fnq是小狗"));
        pool.excute(()->System.out.println("swt是小猪猪"));
    }
}

本文地址:https://blog.csdn.net/qq_42576687/article/details/109268564