【spring boot】在spring boot下使用多线程

 

使用场景:java

方法处理到某一步,须要将信息交给另外一个线程去处理!!spring

===================================================================================缓存

第一种:最简单的Runnable多线程

public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);
        Runnable runnable = dealMsg(msg);
    //将返回的runnable对象传入,并start()启动线程
     new Thread(runnable).start(); }
复制代码
//建立一个Runnable,重写run方法
public Runnable dealMsg(String msg){ Runnable runnable = new Runnable() { @Override public void run() { System.out.println("新开线程中处理:"+msg); } }; return runnable; }
复制代码

 

====================================================================================================dom

第二种:本身建立JDK线程池,交给spring管理,而后将任务交给线程池便可异步

1.建立线程池,交给spring管理async

package com.sxd.util;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
public class ThreadConfig {

    /**
     *newFixedThreadPool
     建立一个指定工做线程数量的线程池。每当提交一个任务就建立一个工做线程,若是工做线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。

     newCachedThreadPool
     建立一个可缓存的线程池。这种类型的线程池特色是: 
     1).工做线程的建立数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。 
     2).若是长时间没有往线程池中提交任务,即若是工做线程空闲了指定的时间(默认为1分钟),则该工做线程将自动终止。终止后,若是你又提交了新的任务,则线程池从新建立一个工做线程。

     newSingleThreadExecutor
     建立一个单线程化的Executor,即只建立惟一的工做者线程来执行任务,若是这个线程异常结束,会有另外一个取代它,保证顺序执行(我以为这点是它的特点)。单工做线程最大的特色是可保证顺序地执行各个任务,而且在任意给定的时间不会有多个线程是活动的 。

     newScheduleThreadPool
     建立一个定长的线程池,并且支持定时的以及周期性的任务执行,相似于Timer。
     * @return
     */
    @Bean
    public ExecutorService getExecutorTools(){
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        return  executorService;
    }

}
复制代码
package com.sxd.util;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
public class ThreadConfig {

    /**
     *newFixedThreadPool
     建立一个指定工做线程数量的线程池。每当提交一个任务就建立一个工做线程,若是工做线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。

     newCachedThreadPool
     建立一个可缓存的线程池。这种类型的线程池特色是: 
     1).工做线程的建立数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。 
     2).若是长时间没有往线程池中提交任务,即若是工做线程空闲了指定的时间(默认为1分钟),则该工做线程将自动终止。终止后,若是你又提交了新的任务,则线程池从新建立一个工做线程。

     newSingleThreadExecutor
     建立一个单线程化的Executor,即只建立惟一的工做者线程来执行任务,若是这个线程异常结束,会有另外一个取代它,保证顺序执行(我以为这点是它的特点)。单工做线程最大的特色是可保证顺序地执行各个任务,而且在任意给定的时间不会有多个线程是活动的 。

     newScheduleThreadPool
     建立一个定长的线程池,并且支持定时的以及周期性的任务执行,相似于Timer。
     * @return
     */
    @Bean
    public ExecutorService getExecutorTools(){
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        return  executorService;
    }

}
复制代码

2.使用它ide

import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;


@Component
public class Consumer1 {


    @Resource
    private ExecutorService executorService;

    
    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);


        /**
         * 分类1:能够返回值的 Callable
         */
        Future fal  = executorService.submit(new Callable<String>() {
            @Override
            public String call() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
                return "处理成功!";
            }
        });

        try {
            System.out.println(fal.get());
        }catch (Exception e){
            System.out.println(e);
        }

        /**
         * 分类2:不会返回值的 Runnable
         */
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
            }
        });

        /**
         * 分类3:也能够这样
         */
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
            }
        });

    }




}

 

复制代码
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;


@Component
public class Consumer1 {


    @Resource
    private ExecutorService executorService;

    
    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);


        /**
         * 分类1:能够返回值的 Callable
         */
        Future fal  = executorService.submit(new Callable<String>() {
            @Override
            public String call() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
                return "处理成功!";
            }
        });

        try {
            System.out.println(fal.get());
        }catch (Exception e){
            System.out.println(e);
        }

        /**
         * 分类2:不会返回值的 Runnable
         */
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
            }
        });

        /**
         * 分类3:也能够这样
         */
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
            }
        });

    }




}
复制代码

 

====================================================================================================post

第三种:使用spring封装的线程池spa

1.建立线程配置类【

@ComponentScan("com.sxd") 标明会在哪一个包下使用多线程

package com.sxd.util;

import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@ComponentScan("com.sxd")
@EnableAsync
// 线程配置类
public class AsyncTaskConfig implements AsyncConfigurer {

    // ThredPoolTaskExcutor的处理流程
    // 当池子大小小于corePoolSize,就新建线程,并处理请求
    // 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
    // 当workQueue放不下任务时,就新建线程入池,并处理请求,若是池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来作拒绝处理
    // 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,若是无请求可处理就自行销毁

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);// 最小线程数
        taskExecutor.setMaxPoolSize(10);// 最大线程数
        taskExecutor.setQueueCapacity(25);// 等待队列

        taskExecutor.initialize();

        return taskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}
复制代码
package com.sxd.util;

import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@ComponentScan("com.sxd")
@EnableAsync
// 线程配置类
public class AsyncTaskConfig implements AsyncConfigurer {

    // ThredPoolTaskExcutor的处理流程
    // 当池子大小小于corePoolSize,就新建线程,并处理请求
    // 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
    // 当workQueue放不下任务时,就新建线程入池,并处理请求,若是池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来作拒绝处理
    // 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,若是无请求可处理就自行销毁

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);// 最小线程数
        taskExecutor.setMaxPoolSize(10);// 最大线程数
        taskExecutor.setQueueCapacity(25);// 等待队列

        taskExecutor.initialize();

        return taskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}
复制代码

2.建立线程任务执行类

package com.sxd.util;

import java.util.Random;
import java.util.concurrent.Future;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

@Service
// 线程执行任务类
public class AsyncTaskService {

    Random random = new Random();// 默认构造方法

    @Async
    // 代表是异步方法
    // 无返回值
    public void executeAsyncTask(String msg) {
        System.out.println(Thread.currentThread().getName()+"开启新线程执行" + msg);
    }

    /**
     * 异常调用返回Future
     *
     * @param i
     * @return
     * @throws InterruptedException
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) throws InterruptedException {
        System.out.println("input is " + i);
        Thread.sleep(1000 * random.nextInt(i));

        Future<String> future = new AsyncResult<String>("success:" + i);// Future接收返回值,这里是String类型,能够指明其余类型

        return future;
    }
}
复制代码
package com.sxd.util;

import java.util.Random;
import java.util.concurrent.Future;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

@Service
// 线程执行任务类
public class AsyncTaskService {

    Random random = new Random();// 默认构造方法

    @Async
    // 代表是异步方法
    // 无返回值
    public void executeAsyncTask(String msg) {
        System.out.println(Thread.currentThread().getName()+"开启新线程执行" + msg);
    }

    /**
     * 异常调用返回Future
     *
     * @param i
     * @return
     * @throws InterruptedException
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) throws InterruptedException {
        System.out.println("input is " + i);
        Thread.sleep(1000 * random.nextInt(i));

        Future<String> future = new AsyncResult<String>("success:" + i);// Future接收返回值,这里是String类型,能够指明其余类型

        return future;
    }
}
复制代码

3.使用它

@Component
public class Consumer1 {


    @Resource
    private AsyncTaskService asyncTaskService;


    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);

        asyncTaskService.executeAsyncTask(msg);

    }
    
}
复制代码
@Component
public class Consumer1 {


    @Resource
    private AsyncTaskService asyncTaskService;


    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);

        asyncTaskService.executeAsyncTask(msg);

    }
    
}
复制代码

 

====================================================================================================

第四种:在代码中启动异步处理最简单的代码

复制代码
public void test(){
    new Thread(()->doReplace(replaceLog)).start();         
}

public void doReplace(String replaceLog){
                  
            //异步处理的业务
}
复制代码

 

======================================

就这么多,再补充噻!!

相关文章
相关标签/搜索