ExecutorService shutdown 之后 Future.get() 方法的阻塞问题

今天碰到这个问题记录一下

需求很简单, 差不多是在10万条数据库记录里查找一个特定账户

不过接口是产品包提供的,每次查询都需要进行一次slelect, 不能直接写sql

所以这里我的思路是启动4个线程,然后分别去查找10万/4 条记录,一旦某个线程执行结束, 就关闭全部线程,终止查找

这里只是一个POC,

数据类

package com.lz.bean;

public class Student {
    private String name = "";
    private int age;
    
    public Student(String name, int age){
        this.name = name;
        this.age = age;
    }
    
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
//        System.out.println("call get age for"+age+" "+Thread.currentThread());
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    
    
}

用来做dummy数据的测试类

package com.lz.tool;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.lz.bean.Student;

public class DataFactory {
    public static List<Student> getRandomDummyData(int count){
        List<Integer> list = Stream.generate(Math::random).distinct().filter(i -> i < 1).limit(count).map(i -> i * 100000)
                .map(Double::intValue).collect(Collectors.toList());
        
        List<Student> students = new ArrayList<Student>();
        
        for(Integer i:list){
            Student stu = new Student("name"+i,i);
            students.add(stu);
        }
        
        return students;
    }
    
    public static List<Student> getDummyData(int count){
        
        List<Student> students = new ArrayList<Student>();
        
        for(int i=0;i<count;i++){
            Student stu = new Student("name"+i,i);
            students.add(stu);
        }
        
        return students;
    }
}
 

线程类

package com.lz.tool;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

import com.lz.bean.Student;

public class Worker implements Callable<String>{
    
    private List<Student> students;
    private ExecutorService executor;

    public Worker(List<Student> students, ExecutorService executor) {
        this.students = students;
        this.executor = executor;
    }
    
    
    
    public List<Student> getStudents() {
        return students;
    }

    public void setStudents(List<Student> students) {
        this.students = students;
    }

    public ExecutorService getExecutor() {
        return executor;
    }

    public void setExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    @Override
    public String call() throws Exception {
        
        String result = "";
        
        for(Student stu:students){
            if(stu.getAge()==7300000){
                result = stu.getName();
                break;
            }
        }
        
        if(result!=""){
            System.out.println(Thread.currentThread()+" i shutdown the pool");
            executor.shutdownNow();
        }
        
        System.out.println(Thread.currentThread()+"/"+result);
        
        return result;
    }

}

主启动类

package com.lz.exe;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.stream.Collectors;

import com.lz.bean.Student;
import com.lz.tool.DataFactory;
import com.lz.tool.Worker;

public class Executor {
    public static void main(String args[]) {

        // List<Student> data = DataFactory.getRandomDummyData(10000000);
        List<Student> data = DataFactory.getDummyData(10000000);

        ExecutorService executor = Executors.newFixedThreadPool(2);

        String result = "NONE";

        Future<String> f1 = executor.submit(new Worker(data.subList(0, data.size() / 4 * 1), executor));
        Future<String> f2 = executor
                .submit(new Worker(data.subList(data.size() / 4 * 1, data.size() / 4 * 2), executor));
        Future<String> f3 = executor
                .submit(new Worker(data.subList(data.size() / 4 * 2, data.size() / 4 * 3), executor));
        Future<String> f4 = executor
                .submit(new Worker(data.subList(data.size() / 4 * 3, data.size() / 4 * 4), executor));

        Long start = System.currentTimeMillis();
        System.out.println("started");

        try {

            if (f1.get() != "") {
                result = f1.get();
                System.out.println(1);
            }

            if (f2.get() != "") {
                result = f2.get();
                System.out.println(2);
            }

            if (f3.get() != "") {
                result = f3.get();
                System.out.println(3);
            }

            if (f4.get() != "") {
                result = f4.get();
                System.out.println(4);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdownNow();
        }

        Long end = System.currentTimeMillis();

        executor.shutdown();
        System.out.println(result);
        System.out.println(end - start);
    }
}

这里的问题是

当线程池设置为4或者2时, 程序正常运行并且结束

当线程池设置为1时, 程序无法正常退出

f3执行完成之后, f4.get() 会永久阻塞

started
Thread[pool-1-thread-1,5,main]/
Thread[pool-1-thread-1,5,main]/
Thread[pool-1-thread-1,5,main] i shutdown the pool
Thread[pool-1-thread-1,5,main]/name7300000
3

方法定义:public List<Runnable> shutdownNow()

(1)线程池的状态立刻变成STOP状态,此时不能再往线程池中添加新的任务。

(2)终止等待执行的线程,并返回它们的列表;

(3)试图停止所有正在执行的线程,试图终止的方法是调用Thread.interrupt(),但是,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出

isInterrupted()是判断线程的中断标记是不是为true。当线程处于运行状态,并且我们需要终止它时;可以调用线程的interrupt()方法,使用线程的中断标记为true,即isInterrupted()会返回true。此时,就会退出while循环。
注意:interrupt()并不会终止处于“运行状态”的线程!它会将线程的中断标记设为true。

也就是说

1 当线程处于sleep状态时调用interrupt方法,会得到一个异常

2 当线程在运行状态时, 你需要使用while(isInterrupted())这样的结构