VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > Java教程 >
  • 线程池使用的N种姿势

线程池在开发中一定会用到,如果能像golang一样,java语言也有协程,也许java程序员就少了一种包袱。

回归正题,我们聊下到底有哪些线程池的使用方式,总结有以下几种。

  1. JDK 内置线程池
  2. Spring线程池
  3. 自己魔改封装

1、JDK 内置线程池

常用的有:

我们看下最全的线程池参数,探究为什么阿里规约不建议使用Executors创建默认个数的线程池。

/**
参数【7个】

corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut
maximumPoolSize - 池中允许的最大线程数
keepAliveTime - 当线程数大于内核时,这是多余的空闲线程在终止前等待新任务的最大时间。

unit - keepAliveTime参数的时间单位

workQueue - 用于在执行任务之前使用的队列。 这个队列将仅保存execute方法提交的Runnable任务。

threadFactory - 执行程序创建新线程时使用的工厂

handler - 执行被阻止时使用的处理程序,因为达到线程限制和队列容量


四个预定义的处理程序策略
在默认ThreadPoolExecutor.AbortPolicy ,处理程序会引发运行RejectedExecutionException后排斥反应。

在ThreadPoolExecutor.CallerRunsPolicy中,调用execute本身的线程运行任务。 这提供了一个简单的反馈控制机制,将降低新任务提交的速度。

在ThreadPoolExecutor.DiscardPolicy中 ,简单地删除无法执行的任务。

在ThreadPoolExecutor.DiscardOldestPolicy中 ,如果执行程序没有关闭,则工作队列头部的任务被删除,然后重试执行(可能会再次失败,导致重复)。


**/

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

1.1 阿里规约原文

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors返回的线程池对象的弊端如下:

  • 1)FixedThreadPool和SingleThreadPool:

    允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

  • 2)CachedThreadPool:

    允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

1.2 阿里规约是否是多余的?不然jdk为啥提供这个api?

答案可以看下ThreadPoolExecutor类的上的注释,已翻译成中文:

为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 然而,程序员被敦促使用更方便的Executors工厂方法Executors.newCachedThreadPool() (无限线程池,具有自动线程回收), Executors.newFixedThreadPool(int) (固定大小的线程池)和Executors.newSingleThreadExecutor() (单个后台线程),可以预先配置最常用的使用场景设置。 否则,手动配置和调优此类时,请使用以下指南...

其实这个类的作者"Doug Lea",已经说明Executors.newCachedThreadPool(),Executors.newFixedThreadPool(int),应该是在什么情况下使用?
只是一般程序员是不会去仔细看类说明文档,都是跟着百度复制过来的代码就干起活来了,
说起来也惭愧之前在2010年培训的时候,其实是会看这个jdk api文档的,但那个时候似乎很懵,也不会去详尽的看文档背后的一些知识点。
所以说看第一手材料多么重要,作者已经把该注意的点都写到类说明文档注释里了,吓得我赶紧用到一个类都去细看下文档注释,解决不了再找度娘。

1.3 线程池不为人知的几个冷门知识点

从几个面试题来一一道来。

1.核心和最大线程池是否都可以动态修改?

可以使用setCorePoolSize(int)和setMaximumPoolSize(int)进行动态 更改

2.核心线程最初创建并且只有在新任务到达时才启动?

可以使用方法prestartCoreThread()或prestartAllCoreThreads()动态地覆盖。如果您使用非空队列构建池,则可能需要预先提供线程。

3.核心线程是否一定不终止?

不一定,报错或者allowCoreThreadTimeOut(boolean)也可以用于将这个超时策略应用于核心线程,只要keepAliveTime值不为零

4.如何给线程池添加一个启动/暂停的功能?

class PausableThreadPoolExecutor extends ThreadPoolExecutor {              
  private boolean isPaused;                                                
  private ReentrantLock pauseLock = new ReentrantLock();                   
  private Condition unpaused = pauseLock.newCondition();                   
                                                                           
  public PausableThreadPoolExecutor(...) { super(...); }                   
                                                                           
  protected void beforeExecute(Thread t, Runnable r) {                     
    super.beforeExecute(t, r);                                             
    pauseLock.lock();                                                      
    try {                                                                  
      while (isPaused) unpaused.await();                                   
    } catch (InterruptedException ie) {                                    
      t.interrupt();                                                       
    } finally {                                                            
      pauseLock.unlock();                                                  
    }                                                                      
  }                                                                        
                                                                           
  public void pause() {                                                    
    pauseLock.lock();                                                      
    try {                                                                  
      isPaused = true;                                                     
    } finally {                                                            
      pauseLock.unlock();                                                  
    }                                                                      
  }                                                                        
                                                                           
  public void resume() {                                                   
    pauseLock.lock();                                                      
    try {                                                                  
      isPaused = false;                                                    
      unpaused.signalAll();                                                
    } finally {                                                            
      pauseLock.unlock();                                                  
    }                                                                      
  }                                                                        
}}                                                             

2、Spring线程池

官方已经实现的全部7个TaskExecuter。Spring宣称对于任何场景,这些TaskExecuter完全够用了:

名字 特点
SimpleAsyncTaskExecutor 每次请求新开线程,没有最大线程数设置.不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。限流是通过 ConcurrencyThrottleSupport类的monitor的wait(),notify() 实现的
SyncTaskExecutor 不是异步的线程.同步可以用SyncTaskExecutor,但这个可以说不算一个线程池,因为还在原线程执行。这个类没有实现异步调用,只是一个同步操作。
ConcurrentTaskExecutor Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。
SimpleThreadPoolTaskExecutor 监听Spring’s lifecycle callbacks,并且可以和Quartz的Component兼容.是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。
ThreadPoolTaskExecutor 最常用。要求jdk版本大于等于5。可以在程序而不是xml里修改线程池的配置.其实质是对java.util.concurrent.ThreadPoolExecutor的包装。
TimerTaskExecutor  
WorkManagerTaskExecutor  
  • 使用ThreadPoolExecutorFactoryBean
package org.springframework.scheduling.concurrent;

public class ThreadPoolExecutorFactoryBean extends ExecutorConfigurationSupport implements FactoryBean<ExecutorService>, InitializingBean, DisposableBean {
    private int corePoolSize = 1;
    private int maxPoolSize = 2147483647;
    private int keepAliveSeconds = 60;
    private boolean allowCoreThreadTimeOut = false;
    private int queueCapacity = 2147483647;
    private boolean exposeUnconfigurableExecutor = false;
    @Nullable
    private ExecutorService exposedExecutor;

    public ThreadPoolExecutorFactoryBean() {
    }
    ...
    protected ThreadPoolExecutor createExecutor(int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        return new ThreadPoolExecutor(corePoolSize, maxPoolSize, (long)keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
    }
  • 使用ScheduledExecutorFactoryBean
package org.springframework.scheduling.concurrent;

public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport implements FactoryBean<ScheduledExecutorService> {
    private int poolSize = 1;
    @Nullable
    private ScheduledExecutorTask[] scheduledExecutorTasks;
    private boolean removeOnCancelPolicy = false;
    private boolean continueScheduledExecutionAfterException = false;
    private boolean exposeUnconfigurableExecutor = false;
    @Nullable
    private ScheduledExecutorService exposedExecutor;

    public ScheduledExecutorFactoryBean() {
    }
  • 使用ThreadPoolTaskExecutor
    这个类可以设置回调方法

  • 使用ThreadPoolTaskScheduler
    这个类可以设置回调方法,包装错误处理类,错误不会影响执行下一个任务 @Scheduled就是使用这个类包装的,注意核心默认一个线程,会阻塞任务

	@Override
	public void run() {
		try {
			this.delegate.run();
		}
		catch (UndeclaredThrowableException ex) {
			this.errorHandler.handleError(ex.getUndeclaredThrowable());
		}
		catch (Throwable ex) {
			this.errorHandler.handleError(ex);
		}
	}

参考

[java8 api] https://www.matools.com/api/java8

[spring线程池(同步、异步)] https://www.cnblogs.com/duanxz/p/9435343.html

 

 

原文:https://www.cnblogs.com/javago/p/14478294.html


相关教程