為了賬號安全,請及時綁定郵箱和手機立即綁定

Java線程池相關知識點總結

2017.07.16 08:38 11472瀏覽

Android中常見到的很多通用組件一般都離不開”池”的概念,如各種圖片加載庫,網絡請求庫,即使Android的消息傳遞機制中的Meaasge當使用Meaasge.obtain()就是使用的Meaasge池中的對象,因此這個概念很重要。本文將介紹的線程池技術同樣符合這一思想。

  1. 線程池的優點:
    重用線程池中的線程,減少因對象創建,銷毀所帶來的性能開銷;
    能有效的控制線程的最大并發數,提高系統資源利用率,同時避免過多的資源競爭,避免堵塞;
    能夠多線程進行簡單的管理,使線程的使用簡單、高效。

  2. 線程池框架Executor
    java中的線程池是通過Executor框架實現的,Executor 框架包括類:Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable和Future、FutureTask的使用等。

Executor: 所有線程池的接口,只有一個方法。

public interface Executor {        
  void execute(Runnable command);      
}

ExecutorService: 增加Executor的行為,是Executor實現類的最直接接口。
Executors: 提供了一系列工廠方法用于創先線程池,返回的線程池都實現了ExecutorService 接口。
ThreadPoolExecutor:線程池的具體實現類,一般用的各種線程池都是基于這個類實現的。
構造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
                              
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

參數介紹:
corePoolSize:線程池的核心線程數,線程池中運行的線程數也永遠不會超過 corePoolSize 個,默認情況下可以一直存活。可以通過設置allowCoreThreadTimeOut為True,此時 核心線程數就是0,此時keepAliveTime控制所有線程的超時時間。
maximumPoolSize:線程池允許的最大線程數;
keepAliveTime: 指的是空閑線程結束的超時時間;
unit :是一個枚舉,表示 keepAliveTime 的單位;
workQueue:表示存放任務的BlockingQueue<Runnable隊列。
BlockingQueue:阻塞隊列(BlockingQueue)是java.util.concurrent下的主要用來控制線程同步的工具。如果BlockQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態,直到BlockingQueue進了東西才會被喚醒。同樣,如果BlockingQueue是滿的,任何試圖往里存東西的操作也會被阻斷進入等待狀態,直到BlockingQueue里有空間才會被喚醒繼續操作。
阻塞隊列常用于生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。具體的實現類有LinkedBlockingQueue,ArrayBlockingQueued等。一般其內部的都是通過Lock和Condition(顯示鎖(Lock)及Condition的學習與使用)來實現阻塞和喚醒。

3.線程池的工作過程
線程池剛創建時,里面沒有一個線程。任務隊列是作為參數傳進來的。不過,就算隊列里面有任務,線程池也不會馬上執行它們。
當調用 execute() 方法添加一個任務時,線程池會做如下判斷:
如果正在運行的線程數量小于 corePoolSize,那么馬上創建線程運行這個任務;
如果正在運行的線程數量大于或等于 corePoolSize,那么將這個任務放入隊列;
如果這時候隊列滿了,而且正在運行的線程數量小于 maximumPoolSize,那么還是要創建非核心線程立刻運行這個任務;
如果隊列滿了,而且正在運行的線程數量大于或等于 maximumPoolSize,那么線程池會拋出異常RejectExecutionException。
當一個線程完成任務時,它會從隊列中取下一個任務來執行。
當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數大于 corePoolSize,那么這個線程就被停掉。所以線程池的所有任務完成后,它最終會收縮到 corePoolSize 的大小。

4.線程池的創建和使用
生成線程池采用了工具類Executors的靜態方法,以下是幾種常見的線程池。
1)SingleThreadExecutor:單個后臺線程 (其緩沖隊列是無界的)

public static ExecutorService newSingleThreadExecutor() {        
    return new FinalizableDelegatedExecutorService (
        new ThreadPoolExecutor(1, 1,                                    
        0L, TimeUnit.MILLISECONDS,                                    
        new LinkedBlockingQueue<Runnable>()));   
}

創建一個單線程的線程池。這個線程池只有一個核心線程在工作,也就是相當于單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。

2)FixedThreadPool:只有核心線程的線程池,大小固定 (其緩沖隊列是無界的) 。

public static ExecutorService newFixedThreadPool(int nThreads) {         
        return new ThreadPoolExecutor(nThreads, nThreads,                                       
            0L, TimeUnit.MILLISECONDS,                                         
            new LinkedBlockingQueue<Runnable>());     
}

創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
3)CachedThreadPool:無界線程池,可以進行自動線程回收。

public static ExecutorService newCachedThreadPool() {         
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,                                           
           60L, TimeUnit.SECONDS,                                       
           new SynchronousQueue<Runnable>());     
}

如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(或者說JVM)能夠創建的最大線程大小。SynchronousQueue是一個是緩沖區為1的阻塞隊列。
4)ScheduledThreadPool:核心線程池固定,大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。

public static ExecutorService newScheduledThreadPool(int corePoolSize) {         
    return new ScheduledThreadPool(corePoolSize, 
              Integer.MAX_VALUE,                                                  
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,                                                    
              new DelayedWorkQueue());    
}

5.線程池實現的原理
如果只講線程池的使用,那這篇博客沒有什么大的價值,充其量也就是熟悉Executor相關API的過程。線程池的實現過程沒有用到Synchronized關鍵字,用的都是volatile,Lock和同步(阻塞)隊列,Atomic相關類,FutureTask等等,因為后者的性能更優。理解的過程可以很好的學習源碼中并發控制的思想。

在ThreadPoolExecutor主要Worker類來控制線程的復用。看下Worker類簡化后的代碼,這樣方便理解:

private final class Worker implements Runnable {
 
	final Thread thread;
    
	Runnable firstTask;
    
	Worker(Runnable firstTask) {
		this.firstTask = firstTask;
		this.thread = getThreadFactory().newThread(this);
	}
    
	public void run() {
		runWorker(this);
	}
	
	final void runWorker(Worker w) {
		Runnable task = w.firstTask;
		w.firstTask = null;
		while (task != null || (task = getTask()) != null){
		task.run();
	}
}

Worker是一個Runnable,同時擁有一個thread,這個thread就是要開啟的線程,在新建Worker對象時同時新建一個Thread對象,同時將Worker自己作為參數傳入TThread,這樣當Thread的start()方法調用時,運行的實際上是Worker的run()方法,接著到runWorker()中,有個while循環,一直從getTask()里得到Runnable對象,順序執行。getTask()又是怎么得到Runnable對象的呢?

private Runnable getTask() {
    if(一些特殊情況) {
        return null;
    }

    Runnable r = workQueue.take();

    return r;
}

這個workQueue就是初始化ThreadPoolExecutor時存放任務的BlockingQueue隊列,這個隊列里的存放的都是將要執行的Runnable任務。因為BlockingQueue是個阻塞隊列,BlockingQueue.take()得到如果是空,則進入等待狀態直到BlockingQueue有新的對象被加入時喚醒阻塞的線程。所以一般情況Thread的run()方法就不會結束,而是不斷執行從workQueue里的Runnable任務,這就達到了線程復用的原理了。

我是面試課的DocMike老師,曾在阿里、百度、美團,最近和北大同學搞了一個
職場類公眾號: 健男說說 ,會有熱門互聯網職場咨詢和經驗,可以關注下,
也可以加我私人微信 570089514,注明慕課網就可以
以后有相關面試、內推、簡歷、職場的問題都可以通過微信和公眾號給我反饋
希望自己這么多年走過的彎路 能夠給大家一些幫助

控制最大并發數:
那Runnable是什么時候放入workQueue?Worker又是什么時候創建,Worker里的Thread的又是什么時候調用start()開啟新線程來執行Worker的run()方法的呢?有上面的分析看出Worker里的runWorker()執行任務時是一個接一個,串行進行的,那并發是怎么體現的呢?
很容易想到是在execute(Runnable runnable)時會做上面的一些任務。看下execute里是怎么做的。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

     int c = ctl.get();
    // 當前線程數 < corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 直接啟動新的線程。
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 活動線程數 >= corePoolSize
    // runState為RUNNING && 隊列未滿
    // workQueue.offer(command)表示添加到隊列,如果添加成功返回true,否則返回false
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次檢驗是否為RUNNING狀態
        // 非RUNNING狀態 則從workQueue中移除任務并拒絕
        if (!isRunning(recheck) && remove(command))
            reject(command);// 采用線程池指定的策略拒絕任務
        // 兩種情況:
        // 1.非RUNNING狀態拒絕新的任務
        // 2.隊列滿了啟動新的線程失敗(workCount > maximumPoolSize)
    } else if (!addWorker(command, false))
        reject(command);
}

根據代碼再來看上面提到的線程池工作過程中的添加任務的情況:
如果正在運行的線程數量小于 corePoolSize,那么馬上創建線程運行這個任務;
如果正在運行的線程數量大于或等于 corePoolSize,那么將這個任務放入隊列;
如果這時候隊列滿了,而且正在運行的線程數量小于 maximumPoolSize,那么還是要創建非核心線程立刻運行這個任務;
如果隊列滿了,而且正在運行的線程數量大于或等于 maximumPoolSize,那么線程池會拋出異常RejectExecutionException。

點擊查看更多內容
15人點贊

若覺得本文不錯,就分享一下吧!

評論

相關文章推薦

正在加載中
意見反饋 幫助中心 APP下載
官方微信

舉報

0/150
提交
取消
lpl竞猜