經(jīng)驗分享:深入理解Java 并行編程
中國IT實驗室 發(fā)表于:12年10月23日 14:42 [轉(zhuǎn)載] 中國IT實驗室
概述
基于Thread類和Runnable接口編程很容易實現(xiàn)基本的并行編程任務,但實現(xiàn)復雜的并行程序就會比較困難,因為要詳細考慮資源的同步訪問以及設計必要數(shù)據(jù)結構支持同步訪問。從Java5后,Java平臺提供了java.util.concurrent包以及HighLevelAPI簡化并行編程模型,并提供了很多支持同步訪問數(shù)據(jù)結構滿足編程需要。具體來講,該HighLevelAPI提供了以下內(nèi)容,在后續(xù)段落我將分別介紹。
Lock(鎖對象):相對與Thread模型的隱式的鎖對象,Lock提供了顯式的鎖操作從而簡化應用程序。
Executors:提供了一組HighLevelAPI用來執(zhí)行和管理并行任務。
ConcurrentCollections(并行集合):包含了一組支持并行處理的數(shù)據(jù)結構,大大簡化了并行編程難度。
AtomicVariables(原子變量):減少了同步操作并且避免數(shù)據(jù)不一致問題。
Fork/Join框架:提供了進程操作的支持。
Lock(鎖對象)
Lock在java.util.concurrent.locks包里面,是一個用來同步對共享資源的訪問的工具,其實際效果和Synchronize 比較像。Synchronize用起來很容易,但是Synchroize也有一些限制:當Synchronize以一定順序獲得多個鎖的時候,必須以相反的順序釋放鎖,并且必須在與獲得鎖相同的代碼scope中釋放鎖。Lock則完全克服了上述限制:能夠以任意的順序獲得和釋放所,而且不需要收到代碼 scope的限制。使用Lock也很簡單,通常會使用以下代碼使用Lock:
方式一:
Lockl=…;
l.lock();
try{
//訪問共享資源
}finally{
l.unlock()
}
方式二:
Lockl=…;
if(l.tryLock()){
try{
//訪問共享資源
finally{
l.unlock();
}
else{
//沒能夠獲得鎖,做其它事情
}
ConcurrentCollections(并行集合):
java.util.concurrent包還提供了一系列的數(shù)據(jù)結構,使得并行程序的開發(fā)人員可以更少地關注同步操作。
BlockingQueue是一個支持先入先出(First-in-first-out,FIFO)的數(shù)據(jù)結構,對于隊列的push和poll等操作都采取阻塞的方式保證只有在獲得資源鎖的時候才可以執(zhí)行。
ConcurrentMap是一個跟Map類似的存儲key/value映射的數(shù)據(jù)結構,該數(shù)據(jù)結構保證了Map操作的原子性。
ConcurrentNavigableMap是一個跟TreeMap類似的數(shù)據(jù)結構,允許基于key進行樹狀遍歷。
下面給出BlockingQueue的基本用法:
BlockingQueuequeue=newSynchronousQueue();
queue.put(“helloworld”);
queue.poll();//返回helloworld
AtomicVariables(原子變量):
java.util.concurrent.atomic包提供了一些數(shù)據(jù)結構用來支持對變量的原子操作,這些數(shù)據(jù)結構都必須支持get和set方法從而支持對變量的讀寫。下面是一個例子來證明如何使用AtomicInteger的。
importjava.util.concurrent.atomic.AtomicInteger;
classAtomicCounter{
privateAtomicIntegerc=newAtomicInteger(0);
publicvoidincrement(){
c.incrementAndGet();
}
publicvoiddecrement(){
c.decrementAndGet();
}
publicintvalue(){
returnc.get();
}
}
Fork/Join框架
Fork/Join是自JDK7以后才有的對ExecutorService接口的一個實現(xiàn),用來支持在多核處理器上進行多進程操作。該框架適合于那些能夠被遞歸地分解成更小任務的任務。該框架最牛的地方在于采用了work-stealing算法,也就是空閑的worker可以從繁忙的worker那里偷(steal)任務過來執(zhí)行,從而充分利用計算資源使得任務執(zhí)行更快。
Fork/Join的核心是ForkJoinPool這個類,它是AbstractExecutorService的子類并實現(xiàn)了work-stealing算法,其被用來執(zhí)行ForkJoinTask.
ForkJoinTask實現(xiàn)類有RecursiveTask核ResursiveAction,通常實現(xiàn)的邏輯為:
if(當前任務足夠小)
直接執(zhí)行任務
else
將當前任務分為更小的兩個任務
執(zhí)行這兩個更小的任務并等待結果
下面的代碼示意對Fork/Join框架的使用:
public class MyListPrinter extends RecursiveAction {
private List task;
public MyListPrinter(List list)
task = list;
}
protected void printList() {
// 遍歷并打印task中的元素
}
protected void compute() {
if (task.length() <= 1000) {
printList();
} else{
List first_half_list = ……
List second_half_list = ……
MyListPrinter printer1 = new MyListPrinter(first_half_list);
MyListPrinter printer2 = new MyListPrinter(second_half_list);
invokeAll(printer1, printer2);
}
}
}
MyListPrinterprinter=newMyListPrinter(aHugeList);
ForkJoinPoolpool=newForkJoinPool();
pool.invoke(printer);