CompleteFuture
CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。默认使用CommonPool支持自定义线程池。
使用场景
适用在耗时计算的场景中,实现阻塞转移。适用于master接受请求,使用CompletableFuture启动工作线程去工作。最后在master线程就获取该future的执行结果即可。
- 合并多个耗时计算任务
- 获取任意一个任务完成就返回,适用于最少可用场景。
调用
CompleteFuture
的get方法要当心,如果说运行的Job异常处理不好使用了e.printStackTrace();
,那么调用get方法会导致该future一直阻塞。如果Job中的异常没有做处理,可以在get使用超时设置,用来结束阻塞。或者不要使用get方法。使用join方法。来获取。
源码分析
- init初始化
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long STACK;
private static final long NEXT;
static {
try {
final sun.misc.Unsafe u;
UNSAFE = u = sun.misc.Unsafe.getUnsafe();
Class<?> k = CompletableFuture.class;
RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
STACK = u.objectFieldOffset(k.getDeclaredField("stack"));
NEXT = u.objectFieldOffset
(Completion.class.getDeclaredField("next"));
} catch (Exception x) {
throw new Error(x);
}
}
```
静态代码块初始化Unsafe后门,获取内存中的result,next,stack的偏移量。为之后的cas操作做准备。
-
构造方法
public CompletableFuture() {} private CompletableFuture(Object r) { this.result = r; } ```
构造方法只有2个,带参数的还是私有构造函数。CompletableFuture
的主要使用攻略一般是调用内部提供的一系列工厂方法来创建CompletableFuture
。
- 工厂方法
使用ForkJoinPool的commonPool执行任务调用工厂方法的get方法返回
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
使用自定义线程池执行任务调用工厂方法的get方法返回
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
return asyncSupplyStage(screenExecutor(executor), supplier);
}
使用ForkJoinPool的commonPool执行任务回CompletableFuture<Void>
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
使用自定义线程池执行任务返回CompletableFuture<Void>
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
当任务执行已经完成之后创建一个新的`CompletableFuture`包装result
public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}
构造方法只有2个,带参数的还是私有构造函数。CompletableFuture
的主要使用攻略一般是调用内部提供的一系列工厂方法来创建CompletableFuture
。
- 方法调用
有返回值的
使用上一个任务的线程执行该任务。且上一个的任务的返回值会作为当作任务的入参
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
使用commonpoll中的新线程执行该任务。并且上一个的任务的返回值会作为当作任务的入参,返回任务的执行结果
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
使用自定义线程执行该任务。并且上一个的任务的返回值会作为当作任务的入参
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
使用上一个任务的线程执行该任务。且上一个的任务的返回值会作为当作任务的入参,无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
使用commonpoll的线程执行该任务。且上一个的任务的返回值会作为当作任务的入参,无返回值
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
使用自定义线程执行该任务。且上一个的任务的返回值会作为当作任务的入参,无返回值
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
接收Runnable接口,上一个的任务的返回值会作为当作任务的入参,无返回值
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
接收Runnable接口,上一个的任务的返回值会作为当作任务的入参,无返回值,使用commonpoll的线程执行该任务。
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
接收Runnable接口,上一个的任务的返回值会作为当作任务的入参,无返回值,使用自定义的线程池执行该任务。
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
- Field
Either the result or boxed AltResult
这个结果要么是正常返回的结果,要么就是返回包装结果AltResult(包装异常,空指针等)。
volatile Object result;
// Top of Treiber stack of dependent actions
存放每个任务,通过postComplete压入栈中。
volatile Completion stack;
//用于标识异步任务,可以更具这个空接口来追踪当前活跃的异步任务。
public static interface AsynchronousCompletionTask {
}
根据当前CPU可用数目,来判断是否使用CommonPool。
private static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1);
根据当前CPU可用数目,来判断是否使用CommonPool。
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
当前CPU无可用数目,则新创建一个线程。
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
Completion.tryFire的三种模式
static final int SYNC = 0同步;
static final int ASYNC = 1异步;
static final int NESTED = -1嵌套;
- Completion Class
abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
关联的Job
volatile Completion next;
该方法用于完成的时候去调用,根据不同的模式处理返回的。
abstract CompletableFuture<?> tryFire(int mode);
当清除栈中已经完成的任务的时候会去调用该方法
abstract boolean isLive();
模版方法
public final void run() { tryFire(ASYNC); }
模版方法
public final boolean exec() { tryFire(ASYNC); return true; }
模版方法 返回值
public final Void getRawResult() { return null; }
模版方法 设置值
public final void setRawResult(Void v) {}
}
- postComplete Method
该方法负责把每一个CompletableFuture中的stack都取出来,放到当前的CompletableFuture的stack中。
final void postComplete() {
CompletableFuture<?> f = this;
多线程可见的无锁栈。
Completion h;
当前stack不为null或者CompletableFuture是dep,而不是当前实例。
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
执行遍历。从头开始遍历。
if (f.casStack(h, t = h.next)) {
if (t != null) {
如果当前Completion的next不为null。
if (f != this) {
把dep的Completion挂在当前stack的next上。也就是入栈。
pushStack(h);
continue;
}
h.next = null; // detach
}
//给节点挂上tryFire方法。(嵌套模式)用于第二次执行uniApply方法时候执行function接口把值设置在result
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
- cleanStack Method
该方法负责把当前CompletableFuture中的stack中的无效节点断开连接。
/** Traverses stack and unlinks dead Completions. */
final void cleanStack() {
for (Completion p = null, q = stack; q != null;) {
Completion s = q.next;
if (q.isLive()) {
p = q;
q = s;
}
else if (p == null) {
casStack(q, s);
q = stack;
}
else {
p.next = s;
if (p.isLive())
q = s;
else {
p = null; // restart
q = stack;
}
}
}
}
- push Method
该方法负责把UniCompletion中的next中挂到当前CompletableFuture的Stack中,如果压栈失败。则把内存中的UniCompletion(c)引用设置为null。
final void push(UniCompletion<?,?> c) {
if (c != null) {
while (result == null && !tryPushStack(c))
lazySetNext(c, null); // clear on failure
}
}
- postFire Method
该方法负责把UniCompletion中的next中挂到当前CompletableFuture的Stack中,如果压栈失败。则把内存中的UniCompletion引用设置为null。
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
a为被当前CompletableFuture需要依赖的CompletableFuture。如果a存在,并且a的stack不为null,如果是嵌套模式或者是该future的结果是空则清理a的stack。否则执行postComplete方法把a的任务压倒栈中。变成一个链表。如果说当前Future的结果不等null并且当前Future的栈不为null,如果是嵌套模式就返回本身this,反之则调用当前Future的postComplete。最后调用当前Future的treFire方法。treFire方法调用之后还回再回到该方法中执行,最后是为了返回this;
if (a != null && a.stack != null) {
if (mode < 0 || a.result == null)
a.cleanStack();
else
a.postComplete();
}
if (result != null && stack != null) {
if (mode < 0)
return this;
else
postComplete();
}
return null;
}
- UniCompletion Method
abstract static class UniCompletion<T,V> extends Completion {
执行的线程数目
Executor executor;
当前CompletableFuture需要依赖的CompletableFuture
CompletableFuture<V> dep;
被当前CompletableFuture需要依赖的CompletableFuture
CompletableFuture<T> src;
UniCompletion(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src) {
this.executor = executor; this.dep = dep; this.src = src;
}
通过cas设置tag设置task只被执行一次。实际上就是设置ForkJoinTask中的STATUS的值,给STATUS增加标识位SMASK=0x0000ffff;
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (e == null)
return true;
executor = null; // disable
e.execute(this);
}
return false;
}
final boolean isLive() { return dep != null; }
}
- UniApply Class
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
Function<? super T,? extends V> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
静态类UniApply用于实现tryFire方法,tryFire方法内部去调用uniApply去操作被依赖的CompletableFuture去执行函数接口,执行完毕之后要把dep,src,fn转化为null,然后去调用依赖的CompletableFuture去执行postFire。
- uniApply Method
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") S s = (S) r;
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
第一次调用tryFire方法时候,该uniApply方法会被被调用时候执行的是claim方法,负责把当前this提交给线程去调度。调用Executor#execute方法,然后由线程池去调用Completion的run方法或者exec方法,然后完成第二次调用tryFire方法。
1、调针对自定义线程池回去执行execute方法,调用addWorker,最后线程池中的工作线程启动调用Completion的run方法,去执行调用function对象的apply方法或者是accept方法。
2、ForkJoinPool同样也是调用execute方法,会把整个task添加到该线程池中的WorkQueue中,然后该内部队列中的工作线程ForkJoinWorkerThread启动从该队列中自旋读取task,然后调用WorkQueue的runTask,实际上去待哦用的是execLocalTasks,转而去调用doExec方法,然后去执行task的exec方法。
- uniApplyStage Method
包装function实现调用第一次调用tryFire方法。
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
看了上述的代码分析相信你已经基本明白了CompletableFuture的提交任务执行的流程了
都是supplyAsync=>uniApplyStage=>UniApply=>uniApply=>claim=>postFire=>postComplete[or]cleanStack=>然后由工作线程执行run[or]exec=>tryFire=>completeValue=>最后设置在result中。
测试代码
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class CompletableFutureTest {
public static void main(String args[]) throws ExecutionException, InterruptedException {
ThreadPoolExecutor executorService=new ThreadPoolExecutor(10,100,60L,TimeUnit.SECONDS,new SynchronousQueue<>());
System.out.println("CPU 核心数: " + Runtime.getRuntime().availableProcessors());
System.out.println("CommonPool 当前并行线程数目: " + ForkJoinPool.commonPool().getParallelism());
System.out.println("CommonPool 总的线程数目: " + ForkJoinPool.getCommonPoolParallelism());
long start = System.currentTimeMillis();
CompletableFuture<Void>[] futuresCommonPool = IntStream.range(0, 100)
.mapToObj(i -> CompletableFuture.supplyAsync(CompletableFutureTest::blockingOperation).thenApplyAsync(x->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x*2;
}).thenApplyAsync(x->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x*2;
}).thenApplyAsync(x->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x*2;
}).thenApplyAsync(x->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x*2;
}).thenApplyAsync(x->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x*2;
}).thenApplyAsync(x->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x*2;
})).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futuresCommonPool).join();
System.out.println("CommonPool Processed in " + (System.currentTimeMillis() - start)/1000 + " s");
long start1 = System.currentTimeMillis();
CompletableFuture<Void>[] futuresThreadPool = IntStream.range(0, 100)
.mapToObj(i -> CompletableFuture.supplyAsync(CompletableFutureTest::blockingOperation,executorService)
.thenApplyAsync(x->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x*2;
}).thenApplyAsync(x->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x*2;
}).thenApplyAsync(x->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x*2;
}).thenApplyAsync(x->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x*2;
}).thenApplyAsync(x->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x*2;
}).thenApplyAsync(x->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x*2;
})
).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futuresThreadPool).join();
System.out.println("ThreadPool 当前并行线程数目: " + executorService.getActiveCount());
System.out.println("ThreadPool 总的线程数目: " + executorService.getPoolSize());
System.out.println("ThreadPoolProcessed in " + (System.currentTimeMillis() - start1)/1000 + " s");
executorService.shutdownNow();
}
private static Integer blockingOperation() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}
}
结论
在我本机 通过测试代码得出记过,当任务瞬间达到100的时候并且每个都是耗时任务的前提下,自定义线程池要比CommonPool的执行时间要短,总耗时少,但是常规业务场景下。CommonPool还是完全可以满足的。之前分析了Stream,简单说下Stream和CompleteFuture的优劣吧。Stream也是支持并行处理,并行场景下默认实现就是CommonPool,使用在并行流处理集合数据是一大利器。但是需要考虑非常耗时的情况下,优雅处理选择二者。还有就是二者都支持函数编程,可谓FP爱好者的福音。CompleteFuture是增强版的Future,而且API强大。上手吧,骚年在你的代码中骚起来。