# Executors

其中还是创建ThreadPoolExecutor,只是封装了一些参数

# newFixedThreadPool 创建固定大小线程

  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务

@Slf4j
public class Demo {

    public static void main(String[] args) {
        LongAdder longAdder = new LongAdder();
        ExecutorService executorService = Executors.newFixedThreadPool(2, r -> {
            //自定义名称
            Thread thread = new Thread(r, "my-pool-" + longAdder.longValue());
            longAdder.increment();
            return thread;
        });

        executorService.execute(() -> log.info("1"));

        executorService.execute(() -> log.info("2"));
        executorService.execute(() -> log.info("3"));
        executorService.shutdown();

    }

}

# newCachedThreadPool 带缓存的线程

  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收)救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
public class Demo {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> integers = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                log.debug("putting {} ", 1);
                integers.put(1);
                log.debug("{} putted...", 1);
                log.debug("putting...{} ", 2);
                integers.put(2);
                log.debug("{} putted...", 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1").start();

        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            try {
                log.debug("taking {}", 1);
                integers.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2").start();

        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            try {
                log.debug("taking {}", 2);
                integers.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t3").start();

    }

# newSingleThreadExecutor 单例线程池

  • 希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
    • FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
    • 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

# 提交任务

  • execute(Runnable command)
    执行任务

  • <T> Future<T> submit(Callable<T> task)
    提交任务 task,用返回值 Future 获得任务执行结果

@Slf4j
public class Demo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> future = executorService.submit(() -> "str");
        System.out.println(future.get());
    }
}
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException
    提交 tasks 中所有任务
@Slf4j
public class Demo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        LongAdder longAdder = new LongAdder();

        Collection<Callable<Long>> threads = new ArrayList<>();

        for (int i = 0; i < 3; i++) {
            threads.add(() -> {
                log.info("{}", Thread.currentThread().getName());
                longAdder.increment();
                return longAdder.longValue();
            });
        }
        List<Future<Long>> futures = executorService.invokeAll(threads);
        for (Future<Long> future : futures) {
            log.info(String.valueOf(future.get()));
        }
    }
}
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
    提交 tasks 中所有任务,带超时时间

  • <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
    提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消

  • <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException
    提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间

# newScheduledThreadPool 任务调用线程池

当某个线程出现异常时,不会影响其他任务,会接着运行

# 延时执行任务

public class ABA {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

      pool.schedule(() -> {
        //即使这里出错了,下面任务也会正常执行
        //TODO 虽然不会影响下面的线程,但会也不会打印错误信息 
        int i = 5 / 0;
      }, 1, TimeUnit.SECONDS);
      pool.schedule(() -> System.out.println("school"), 1, TimeUnit.SECONDS);
    }
}

# 定时执行

# scheduleAtFixedRate

定时执行,以时间间隔

@Slf4j
public class ABA {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        /**
         * @params1: 任务对象
         * @params2: 延时时间
         * @params3: 执行间隔
         * @params4: 时间单位
         */
        pool.scheduleAtFixedRate(() -> {
            log.info("scheduleAtFixedRate");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, 1, 1, TimeUnit.SECONDS);
    }
}

# scheduleWithFixedDelay

定时执行,和上面的区别是以任务结束时间为准

@Slf4j
public class ABA {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        /**
         * @params1: 任务对象
         * @params2: 延时时间
         * @params3: 执行间隔
         * @params4: 时间单位
         */
        pool.scheduleWithFixedDelay(() -> {
            log.info("scheduleAtFixedRate");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, 1, 1, TimeUnit.SECONDS);

    }
}