`
dorishy
  • 浏览: 11015 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

java线程池原理

阅读更多

        平时使用java写多线程的时候经常使用到jdk的提供的线程池,线程池的概念人人都知道,非常好理解。但是jdk底层到底如何实现线程池的呢?或者说是通过什么方式达到线程池中线程复用的效果的呢?我也是带着这个疑问去翻阅了一下jdk6的源码。

 

一、java线程池的小例子

    

public class MyThread implements Runnable {
    private int i = 0;
    MyThread(int i){
        this.i = i;
    }
    @Override
    public void run() {
        try {
            //线程休眠3秒
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印当前线程执行完毕时候的时间,线程id,线程编号
        System.out.println("time:" + new Date() + ", threadId:" + Thread.currentThread().getId() + " ,thread " + i);
    }
}

public class ExecutorTestMain {

    public static void main(String[] args) {

        System.out.println("Main start");
        //创建线程池(线程池固定大小3)
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        //生成10个线程丢进线程池中
        for (int i = 0; i < 10; i++) {
            executorService.execute(new MyThread(i));
        }
        System.out.println("Main wait for MyThread");
    }
}

 输出:

Main start
Main wait for MyThread
time:Mon Sep 01 14:52:44 CST 2014, threadId:10 ,thread 2
time:Mon Sep 01 14:52:44 CST 2014, threadId:9 ,thread 1
time:Mon Sep 01 14:52:44 CST 2014, threadId:8 ,thread 0
time:Mon Sep 01 14:52:47 CST 2014, threadId:8 ,thread 5
time:Mon Sep 01 14:52:47 CST 2014, threadId:9 ,thread 4
time:Mon Sep 01 14:52:47 CST 2014, threadId:10 ,thread 3
time:Mon Sep 01 14:52:50 CST 2014, threadId:8 ,thread 6
time:Mon Sep 01 14:52:50 CST 2014, threadId:9 ,thread 7
time:Mon Sep 01 14:52:50 CST 2014, threadId:10 ,thread 8
time:Mon Sep 01 14:52:53 CST 2014, threadId:8 ,thread 9

    从上面一个小例子中,可以看出10个线程一下子全丢进线程中了,但是每次执行只有3个(从执行完成时间和执行的线程id可以看出),3个执行完毕之后,才重新释放出来继续执行后面的。那么jdk是怎么做到这种线程的复用的呢?

 

二、线程池原理

         

//Executors 使用不同工厂方法构造不同需求的线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
     ExecutorService 继承 Executor,线程池接口,定义了线程池执行的一系列接口。

 

   Executors 线程池的工厂类和工具类,提供了构造线程池的工厂方法以及一些线程池操作的工具方法。

  

   对于不同线程池底层复用线程的原理都是一样的,所以我就以小例子中提到的线程池来看看线程池中复用的原理。

 

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    Executors 工厂类实际上就是返回一个ThreadPoolExecutor类,ThreadPoolExecutor实际上就可以理解为我们概念中的池子的概念了。

 

 

1. ThreadPoolExecutor中包含的属性(主要的属性,其他的参考源码)

 

    //线程池的状态,定义了一些状态常量都是int型,主要是方便比较,后面代码中会看到
    //当前状态
    volatile int runState;
    //接收新的任务,并且处理队列中已有的任务(任务的缓存队列)
    static final int RUNNING    = 0;
    //不接受新任务了,但是会处理队列中已有的任务
    static final int SHUTDOWN   = 1;
    //不接受新任务,不处理队列中已有任务,并且终止正在处理的任务
    static final intb STOP       = 2;
    //同STOP,但是所以线程已经终止
    static final int TERMINATED = 3;

    //线程池的缓存队列,用于缓存等待线程的任务
    private final BlockingQueue<Runnable> workQueue;

    //线程池中的所以工作线程(Worker是ThreadPoolExecutor内部类,实现了Runnable,复用的东东,后面详细介绍)
    private final HashSet<Worker> workers = new HashSet<Worker>();

    //空闲线程等待工作的超时设置
    private volatile long  keepAliveTime;
    private volatile boolean allowCoreThreadTimeOut;


    //一般情况下线程池大小,对比于maximumPoolSize来看
    private volatile int   corePoolSize;
    //线程池最大线程,当线程池处于RUNNING,poolSize大于等于corePoolSize以及小于maximumPoolSize,并且向缓存队列添加失败的情况下,会启用新的Worker来处理任务,如果poolSize超过了maximumPoolSize,那么任务将被拒绝处理
    private volatile int   maximumPoolSize;
    //当前线程数量
    private volatile int   poolSize;
    //任务被拒绝处理的处理类
    private volatile RejectedExecutionHandler handler;
    //线程的工厂类,默认实现是DefaultThreadFactory
    private volatile ThreadFactory threadFactory;

 

2. executorService.execute(new MyThread(i))

 

//executorService.execute(new MyThread(i));
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //1.若果poolSize<corePoolSize,那么启用新的Worker处理任务(addIfUnderCorePoolSize)
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        //2.poolSize >= corePoolSize ,那么放入任务缓存队列(workQueue.offer)
        if (runState == RUNNING && workQueue.offer(command)) {
            //3.runState != RUNNING 则处理后续情况
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        //4.poolSize >= corePoolSize,放入缓冲失败的情况(workQueue.offer失败)
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

 

3 . 创建新的Worker处理任务

        addIfUnderCorePoolSize 和 addIfUnderMaximumPoolSize 处理逻辑一致,下面以addIfUnderCorePoolSize为例说明

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //addIfUnderMaximumPoolSize这里的if判断条件是poolSize < maximumPoolSize && runState == RUNNING
        if (poolSize < corePoolSize && runState == RUNNING)
            //addTread返回的线程是Worker
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    //执行线程,会执行到Worker的run方法
    t.start();
    return true;
}

 

private Thread addThread(Runnable firstTask) {
    //将任务封装成Worker
    Worker w = new Worker(firstTask);
    //创建Worker线程
    Thread t = threadFactory.newThread(w);
    if (t != null) {
        w.thread = t;
        workers.add(w);
        int nt = ++poolSize;
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}

    

4. Worker

        Worker就可以理解为线程池中的工作线程了,所有提交上来的线程都是在在这些工作线上上面执行的,那么具体是如何做到执行和复用的呢?下面我们就一探究竟。

 

        Worker实际上是ThreadPoolExecutor的一个内部类,实现了Runnable接口。主要属性:

//初始化Worker的时候第一个任务
private Runnable firstTask;

//worker执行的任务数
volatile long completedTasks;

//worker所运行的thread对象
Thread thread;

    线程执行的时候会执行到Worker中的run方法

public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        //getTask 就是从缓冲中获取任务,我们可以看到,这个Worker线程会循环执行,不停的从缓冲队列中获取任务执行
        while (task != null || (task = getTask()) != null) {
            //获取任务执行
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}
Runnable getTask() {
    //从缓存队列中获取任务执行(当然获取的逻辑中涉及到很多当前线程池不同状态的情况)
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

         我们都知道,线程执行的过程中都一些自己执行相关的信息,这些信息显然是不可能给其他任务去复用的,所以在执行之前需要去做一些清理工作。

 

private void runTask(Runnable task) {
    final ReentrantLock runLock = this.runLock;
    runLock.lock();
    try {
        /*
         * Ensure that unless pool is stopping, this thread
         * does not have its interrupt set. This requires a
         * double-check of state in case the interrupt was
         * cleared concurrently with a shutdownNow -- if so,
         * the interrupt is re-enabled.
         */
        if (runState < STOP &&
            Thread.interrupted() &&
            runState >= STOP)
            thread.interrupt();
        /*
         * Track execution state to ensure that afterExecute
         * is called only if task completed or threw
         * exception. Otherwise, the caught runtime exception
         * will have been thrown by afterExecute itself, in
         * which case we don't want to call it again.
         */
        boolean ran = false;
        //重新初始化thread中的ThreadLocals等信息
        beforeExecute(thread, task);
        try {
            //任务的执行
            task.run();
            ran = true;
            //后置处理
            afterExecute(task, null);
            ++completedTasks;
        } catch (RuntimeException ex) {
            if (!ran)
                afterExecute(task, ex);
            throw ex;
        }
    } finally {
        runLock.unlock();
    }
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics