JUC下的异步编程工具使用详情以及源码分析(FutureTask、CompletableFuture)

2023-09-21 21:32:22

异步编程

一、FutureTask应用&源码分析

1.1 FutureTask介绍

FutureTask是一个可以取消异步任务的类。FutureTaskFuture做的一个基本实现。可以调用方法区开始和取消一个任务

一般是配合Callable去使用

异步任务启动之后,可以获取一个绑定当前异步任务的FutureTask

可以基于FutureTask的方法去取消任务,查看任务是否结果,以及获取任务的返回结果

FutureTask内部的整体结构中,实现了RunnableFuture的接口,这个接口又继承了RunnableFuture这个两个接口。所以FutureTask也可以作为任务直接交给线程池去处理

1.2 FutureTask应用

大方向是FutureTask对任务的控制:

  • 任务执行过程中状态的控制
  • 任务执行完毕后,返回结果的获取

FutureTask的任务在执行run方法后,是无法被再次运行,需要使用runAndReset方法才可以

public static void main(String[] args) throws InterruptedException {
    // 构建FutureTask,基于泛型执行返回结果类型
    // 在有参构造中,声明Callable或者Runnable指定任务
    FutureTask<String> futureTask = new FutureTask<>(() -> {
        System.out.println("任务开始执行……");
        Thread.sleep(2000);
        System.out.println("任务执行完毕……");
        return "OK!";
    });

    // 构建线程池
    ExecutorService service = Executors.newFixedThreadPool(10);

    // 线程池执行任务
    service.execute(futureTask);

    // futureTask提供了run方法,一般不会自己去调用run方法,让线程池去执行任务,由线程池去执行run方法
    // run方法在执行时,是有任务状态的。任务已经执行了,再次调用run方法无效的。
    // 如果希望任务可以反复被执行,需要去调用runAndReset方法
//        futureTask.run();

    // 对返回结果的获取,类似阻塞队列的poll方法
    // 如果在指定时间内,没有拿到方法的返回结果,直接扔TimeoutException
//        try {
//            String s = futureTask.get(3000, TimeUnit.MILLISECONDS);
//            System.out.println("返回结果:" + s);
//        } catch (Exception e) {
//            System.out.println("异常返回:" + e.getMessage());
//            e.printStackTrace();
//        }

    // 对返回结果的获取,类似阻塞队列的take方法,死等结果
//        try {
//            String s = futureTask.get();
//            System.out.println("任务结果:" + s);
//        } catch (ExecutionException e) {
//            e.printStackTrace();
//        }

    // 对任务状态的控制
//        System.out.println("任务结束了么?:" + futureTask.isDone());
//        Thread.sleep(1000);
//        System.out.println("任务结束了么?:" + futureTask.isDone());
//        Thread.sleep(1000);
//        System.out.println("任务结束了么?:" + futureTask.isDone());
}

1.3 FutureTask源码分析

FutureTask的源码,要从几个方向去看:

  • 先查看FutureTask中提供的一些状态
  • 再查看任务的执行过程
1.3.1 FutureTask中的核心属性

清楚任务的流转流转状态是怎样的,其次对于核心属性要追到是干嘛的

/**
 FutureTask的核心属性
 FutureTask任务的状态流转
 * NEW -> COMPLETING -> NORMAL           任务正常执行,并且返回结果也正常返回
 * NEW -> COMPLETING -> EXCEPTIONAL      任务正常执行,但是结果是异常(执行途中出现异常)
 * NEW -> CANCELLED                      任务被取消   
 * NEW -> INTERRUPTING -> INTERRUPTED    任务被中断
 */
// 记录任务的状态
private volatile int state;
// 任务被构建之后的初始状态
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

/** 需要执行任务,会被赋值到这个属性 */
private Callable<V> callable;
/** 任务的任务结果要存储在这几个属性中 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 执行任务的线程 */
private volatile Thread runner;
/** 等待返回结果的线程Node对象, */
private volatile WaitNode waiters;
static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
1.3.2 FutureTask的run方法

任务执行前的一些判断,以及调用任务封装结果的方式,还有最后的一些后续处理

// 当线程池执行FutureTask任务时,会调用的方法
public void run() {
    // 如果当前任务状态不是NEW,直接return告辞
    if (state != NEW ||  
        // 如果状态正确是NEW,这边需要基于CAS将runner属性设置为当前线程
        // 如果CAS失败,直接return告辞
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;

    try {
        // 将要执行的任务拿到
        Callable<V> c = callable;
        // 健壮性判断,保证任务不是null
        // 再次判断任务的状态是NEW(DCL)
        if (c != null && state == NEW) {
            // 执行任务
            // result:任务的返回结果
            // ran:如果为true,任务正常结束。 如果为false,任务异常结束。
            V result;
            boolean ran;
            try {
                // 执行任务
                result = c.call();
                // 正常结果,ran设置为true
                ran = true;
            } catch (Throwable ex) {
                // 如果任务执行期间出了异常
                // 返回结果置位null
                result = null;
                // ran设置为false
                ran = false;
                // 封装异常结果
                setException(ex);
            }
            if (ran)
                // 封装正常结果
                set(result);
        }
    } finally {
        // 将执行任务的线程置位null
        runner = null;
        // 拿到任务的状态
        int s = state;
        // 如果状态大于等于INTERRUPTING
        if (s >= INTERRUPTING)
            // 进来代表任务中断,做一些后续处理
            handlePossibleCancellationInterrupt(s);
    }
}
1.3.3 FutureTask的set&setException方法

任务执行完毕后,修改任务的状态以及封装任务的结果

// 没有异常的时候,正常返回结果
protected void set(V v) {
    // 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将返回结果赋值给 outcome 属性
        outcome = v;
        // 将任务状态变为NORMAL,正常结束
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
        // 一会再说……
        finishCompletion();
    }
}

// 任务执行期间出现了异常,这边要封装结果
protected void setException(Throwable t) {
    // 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将异常信息封装到 outcome 属性
        outcome = t;
        // 将任务状态变为EXCEPTIONAL,异常结束
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); 
        // 一会再说……
        finishCompletion();
    }
}
1.3.4 FutureTask的cancel方法

任务取消的一个方式

  • 任务直接从NEW状态转换为CANCEL
  • 任务从NEW状态变成INTERRUPTING,然后再转换为INTERRUPTED
// 取消任务操作
public boolean cancel(boolean mayInterruptIfRunning) {
    // 查看任务的状态是否是NEW,如果NEW状态,就基于传入的参数mayInterruptIfRunning
    // 决定任务是直接从NEW转换为CANCEL,还是从NEW转换为INTERRUPTING
    if (!(state == NEW && 
        UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {   
        // 如果mayInterruptIfRunning为true
        // 就需要中断线程
        if (mayInterruptIfRunning) {
            try {
                // 拿到任务线程
                Thread t = runner;
                if (t != null)
                    // 如果线程不为null,直接interrupt
                    t.interrupt();
            } finally { 
                // 将任务状态设置为INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 任务结束后的一些处理~~ 一会看~~
        finishCompletion();
    }
    return true;
}

测试

public static void main(String[] args) throws InterruptedException, ExecutionException {
    // 构建FutureTask,基于泛型执行返回结果类型
    // 在有参构造中,声明Callable或者Runnable指定任务
    FutureTask<String> futureTask = new FutureTask<>(() -> {
        System.out.println("任务开始执行……");
        Thread.sleep(2000);
        System.out.println("任务执行完毕……");
        return "OK!";
    });
    // 构建线程池
    ExecutorService service = Executors.newFixedThreadPool(10);

    // 线程池执行任务
    service.execute(futureTask);
    Thread.sleep(1000);
    System.out.println(futureTask.cancel(true));
    System.out.println(futureTask.get());
}

console

任务开始执行……
true
Exception in thread "main" java.util.concurrent.CancellationException
	at java.util.concurrent.FutureTask.report(FutureTask.java:121)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.armin.thread.tool.FutureTaskTest.main(FutureTaskTest.java:26)
  • futureTask.cancel(true):会中断执行的程序
  • futureTask.cancel(false):不会中断执行的程序,任务会执行完,但get()同样会抛出CancellationException异常

如果在等待线程的任务执行完毕后再调用cancel()返回falseget()同样也会拿到返回值与正常情况无异

1.3.5 FutureTask的get方法

这个是线程获取FutureTask任务执行结果的方法

// 拿任务结果
public V get() throws InterruptedException, ExecutionException {
    // 获取任务的状态
    int s = state;
    // 要么是NEW,任务还没执行完
    // 要么COMPLETING,任务执行完了,结果还没封装好。
    if (s <= COMPLETING)
        // 让当前线程阻塞,等待结果
        s = awaitDone(false, 0L);
    // 最终想要获取结果,需要执行report方法
    return report(s);
}

// 线程等待FutureTask结果的过程
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    // 针对get方法传入了等待时长时,需要计算等到什么时间点
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 声明好需要的Node,queued:放到链表中了么?
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 查看线程是否中断,如果中断,从等待链表中移除,甩个异常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        // 拿到状态
        int s = state;
        // 到这,说明任务结束了。
        if (s > COMPLETING) {
            if (q != null)
                // 如果之前封装了WaitNode,现在要清空
                q.thread = null;
            return s;
        }
        // 如果任务状态是COMPLETING,这就不需要去阻塞线程,让步一下,等待一小会,结果就有了
        else if (s == COMPLETING) 
            Thread.yield();
        // 如果还没初始化WaitNode,初始化
        else if (q == null)
            q = new WaitNode();
        // 没放队列的话,直接放到waiters的前面
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 准备挂起线程,如果timed为true,挂起一段时间
        else if (timed) {
            // 计算出最多可以等待多久
            nanos = deadline - System.nanoTime();
            // 如果等待的时间没了
            if (nanos <= 0L) {
                // 移除当前的Node,返回任务状态
                removeWaiter(q);
                return state;
            }
            // 等一会
            LockSupport.parkNanos(this, nanos);
        }
        else
            // 死等
            LockSupport.park(this);
    }
}

// get的线程已经可以阻塞结束了,基于状态查看能否拿到返回结果
private V report(int s) throws ExecutionException {
    // 拿到outcome 返回结果
    Object x = outcome;
    // 如果任务状态是NORMAL,任务正常结束,返回结果
    if (s == NORMAL)
        return (V)x;
    // 如果任务状态大于等于取消
    if (s >= CANCELLED)
        // 直接抛出异常
        throw new CancellationException();
    // 到这就是异常结束
    throw new ExecutionException((Throwable)x);
}
1.3.6 FutureTask的finishCompletion方法

只要任务结束了,无论是正常返回,异常返回,还是任务被取消都会执行这个方法

而这个方法其实就是唤醒那些执行get方法等待任务结果的线程

// 任务结束后触发
private void finishCompletion() {
    // 在任务结束后,需要唤醒
    for (WaitNode q; (q = waiters) != null;) {
        // 第一步直接以CAS的方式将WaitNode置为null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // 拿到了Node中的线程
                Thread t = q.thread;
                // 如果线程不为null
                if (t != null) {
                    // 第一步先置位null
                    q.thread = null;
                    // 直接唤醒这个线程
                    LockSupport.unpark(t);
                }
                // 拿到当前Node的next
                WaitNode next = q.next;
                // next为null,代表已经将全部节点唤醒了吗,跳出循环
                if (next == null)
                    break;
                // 将next置位null
                q.next = null; 
                // q的引用指向next
                q = next;
            }
            break;
        }
    }

    // 任务结束后,可以基于这个扩展方法,记录一些信息
    done();

    // 任务执行完,把callable具体任务置位null
    callable = null;  
}

扩展口

可以继承FutureTask重写done()方法实现线程在执行完毕后(不论正常还是异常执行结束)的一系列操作

二、CompletableFuture应用&源码分析

2.1 CompletableFuture介绍

平时多线程开发一般就是使用RunnableCallableThreadFutureTaskThreadPoolExecutor这些内容和并发编程息息相关。相对来对来说成本都不高,多多使用是可以熟悉这些内容。这些内容组合在一起去解决一些并发编程的问题时,很多时候没有办法很方便的去完成异步编程的操作

Thread + Runnable:执行异步任务,但是没有返回结果

Thread + Callable + FutureTask:完整一个可以有返回结果的异步任务

  • 获取返回结果,如果基于get方法获取,线程需要挂起在WaitNode
  • 获取返回结果,也可以基于isDone判断任务的状态,但是这里需要不断轮询

上述的方式都是有一定的局限性的。

比如说任务A,任务B,还有任务C。其中任务B还有任务C执行的前提是任务A先完成,再执行任务B和任务C

如果任务的执行方式逻辑比较复杂,可能需要业务线程导出阻塞等待,或者是大量的任务线程去编一些任务执行的业务逻辑。对开发成本来说比较高

CompletableFuture就是帮你处理这些任务之间的逻辑关系,编排好任务的执行方式后,任务会按照规划好的方式一步一步执行,不需要让业务线程去频繁的等待

2.2 CompletableFuture应用

CompletableFuture应用还是需要一内内的成本的

首先对CompletableFuture提供的函数式编程中三个函数有一个掌握

  • Supplier<U>:生产者,没有入参,有返回结果
  • Consumer<T>:消费者,有入参,但是没有返回结果
  • Function<T,U>:函数,有入参,又有返回结果
2.2.1 supplyAsync

CompletableFuture如果不提供线程池的话,默认使用的ForkJoinPool,而ForkJoinPool内部是守护线程,如果main线程结束了,守护线程会跟着一起结束

public static void main(String[] args)  {
    // 生产者,可以指定返回结果
    CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(() -> {
        System.out.println("异步任务开始执行");
        System.out.println("异步任务执行结束");
        return "返回结果";
    });

    String result1 = firstTask.join();
    String result2 = null;
    try {
        result2 = firstTask.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

    System.out.println(result1 + "," + result2);
}

console

异步任务开始执行
异步任务执行结束
返回结果,返回结果

Process finished with exit code 0
2.2.2 runAsync

当前方式既不会接收参数,也不会返回任何结果,非常基础的任务编排方式

public static void main(String[] args) throws IOException {
    CompletableFuture.runAsync(() -> {
        System.out.println("任务go");
        System.out.println("任务done");
    });

    System.in.read();
}

console

任务go
任务done
更多推荐

Flutter插件开发流程

本文主要给大家介绍如何开发FlutterPlugin中Android的部分。有关Flutter以及FlutterPlugin的概念,感兴趣的可以从官网查看相关资料。一、简介笔者的环境是Mac下AndroidStudio进行的开发,AS也是谷歌官推的,安装flutter插件后,开发起来相对于其他IDE来说,方便很多,自带

使用Feign实现远程调用

目录概述1、引入依赖2、启用Feign3、创建Feign接口4、使用Feign客户端5、配置Feign客户端5.1、全局配置文件5.2、Feign客户端接口5.3、自定义配置类5.4、自定义属性文件总结概述Feign是一个基于注解的HTTP客户端库,它允许您将HTTP请求转换为声明式的Java接口。您可以使用类似于Sp

qt day2

完善登录框点击登录按钮后,判断账号(admin)和密码(123456)是否一致,如果匹配失败,则弹出错误对话框,文本内容“账号密码不匹配,是否重新登录”,给定两个按钮ok和cancel,点击ok后,会清除密码框中的内容,继续进行登录;如果点击cancel按钮,则关闭界面。如果账号和密码匹配,则弹出信息对话框,给出提示信

Docker容器网络安全性最佳实践:防止容器间攻击

Docker容器在现代应用开发和部署中扮演着重要的角色,但由于容器共享宿主机网络环境,容器网络的安全性变得尤为重要。为了防止容器间的攻击,并保护容器网络的安全,有一些最佳实践可以采用。下面将介绍一些重要的Docker容器网络安全性最佳实践。首先,一个基本的安全原则是使用可靠和受信任的容器镜像。确保从受信任的仓库中拉取和

Unity中Shader的模板测试

文章目录前言什么是模板测试1、模板缓冲区2、模板缓冲区中存储的值3、模板测试是什么(看完以下流程就能知道模板测试是什么)模板测试就是在渲染,后渲染的物体前,与渲染前的模板缓冲区的值进行比较,选出符合条件的部分,对后渲染的物体进行渲染前言Unity中Shader的模板测试什么是模板测试1、模板缓冲区2、模板缓冲区中存储的

C++ 指针

C++指针学习C++的指针既简单又有趣。通过指针,可以简化一些C++编程任务的执行,还有一些任务,如动态内存分配,没有指针是无法执行的。所以,想要成为一名优秀的C++程序员,学习指针是很有必要的。正如您所知道的,每一个变量都有一个内存位置,每一个内存位置都定义了可使用连字号(&)运算符访问的地址,它表示了在内存中的一个

网络安全(黑客)自学

前言我是去年8月22日才正式学习网络安全的,因为在国营单位工作了4年,在广东一个月工资只有5000块,而且看不到任何晋升的希望,如果想要往上走,那背后就一定要有关系才行。而且国营单位的气氛是你干的多了,领导觉得你有野心,你干的不多,领导却觉得你这个人不错。我才24周岁,实在的受不了这种工作氛围,情绪已经压制了很多久,一

竞赛选题 基于深度学习的人脸表情识别

文章目录0前言1技术介绍1.1技术概括1.2目前表情识别实现技术2实现效果3深度学习表情识别实现过程3.1网络架构3.2数据3.3实现流程3.4部分实现代码4最后0前言🔥优质竞赛项目系列,今天要分享的是基于深度学习的人脸表情识别该项目较为新颖,适合作为竞赛课题方向,学长非常推荐!🧿更多资料,项目分享:https:/

java版Spring Cloud+Mybatis+Oauth2+分布式+微服务+实现工程管理系统

鸿鹄工程项目管理系统SpringCloud+SpringBoot+Mybatis+Vue+ElementUI+前后端分离构建工程项目管理系统1.项目背景一、随着公司的快速发展,企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性,公司对内部工程管理的提升提出了更高的要求。二、企业通过

基于Matlab实现自动泊车(垂直泊车)

自动泊车是一项非常有趣和实用的技术,它可以让车辆在没有人为干预的情况下自动停放在合适的位置上。在这篇文章中,我们将介绍如何使用Matlab实现自动泊车。首先,我们需要了解自动泊车的基本原理。自动泊车系统通常包括车辆、传感器和控制算法。传感器可以用来检测周围的环境,例如通过摄像头、超声波传感器或激光雷达来检测车辆周围的障

数据治理在数字化转型中的重要性

在当今数字化时代,企业的成功与否往往取决于它们对数据的处理和管理能力。数据治理作为数字化转型的关键组成部分,对于帮助企业有效管理和利用数据,实现业务增长和创新至关重要。本文将探讨为什么数字化转型必须进行数据治理,并介绍数据治理的几个关键优势。随着技术的进步和数字化转型的发展,大量的数据被不断产生和积累。这些数据代表了企

热文推荐