前段时间应隔壁部门大佬的邀约,简单地帮他们部门的童靴梳理了下多线程相关的内容,客串了一把讲师【由于部门内有很多是c#转java的童鞋,因此讲的稍微浅显了些】html
ok,按照我的习惯先来大纲java
知识点:mysql
1)进程 多线程的相关概念 涉及到CPU调度 稍微谈下JVM内存模型 程序计数器
2)多线程的三种实现手段及其代码实现 这边能够谈下futurtask的底层源码
3)经常使用锁概念及实现说明 隐式锁 显式锁 乐观锁 悲观锁 CAS 可重入锁 不可重入锁 读写锁 线程安全 引伸的谈下分布式锁 及分布式锁的原理,经常使用的三种实现手段 volatile关键字及其底层源码
4)线程池的概念,线程池的使用 扒一扒线程池的源码 缓存队列 核心线程池 线程池建立任务的过程 线程池的生命周期等redis
多线程: 多线程是什么? 多线程是一个程序(进程)运行时产生了不止一个线程。算法
进程和线程区别 一个正在执行的程序,进程是控制程序的执行顺序。这个顺序又被称为一个控制单元。sql
并行和并发的概念: 并行:多个CPU实例或者多台机器同时执行一段处理逻辑 并发:经过CPU调度算法,让用户看上去是同时执行的,在CPU层面不是同时。数据库
这边衍生的能够谈下JVM中内存模型的程序计数器 就是记录java执行字节码的行号指示器。编程
jvm内存模型: 线程私有:
程序计数器:记录程序执行过程当中的字节码的行号指示器
java虚拟机栈: 主要是是被调用的java方法表明的是一个个栈帧 局部变量表 操做数栈 动态连接 方法出口等等 java.lang.StackOverflowError
本地方法栈: 主要是是被调用的native方法表明的是一个个栈帧c#
线程公有
堆 : 对象实例
方法区:最重要的就是运行时常量池 gc缓存
为何要是用多线程:
1)充分的利用CPU资源,若是只有一个线程的话,第二个任务必须等第一个任务完成以后才能进行。
2)进程之间没法共享数据,可是线程能够
3)建立进程须要为这个进程分配系统资源,建立线程的代价小
多线程的实现手段【3种手段】
1)Thread
package com.Allen.test;
import java.util.concurrent.TimeUnit;
public class testThread extends Thread{
public static void main(String[] args) {
testThread t1=new testThread();
testThread t2=new testThread();
t1.start();
t2.start();
}
public void run(){
System.out.println("start");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
}
复制代码
2)runnable
package com.Allen.test;
import java.util.concurrent.TimeUnit;
public class testRunnable {
public static void main(String[] args) {
testAllen th1=new testAllen();
for(int i=0;i<5;i++){
Thread t1=new Thread(th1);
t1.start();
}
}
}
class testAllen implements Runnable{
@Override
public void run() {
System.out.println("start");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
}
复制代码
3)future callable
package com.Allen.test;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class testFuture {
public static void main(String[] args) throws InterruptedException, ExecutionException {
futureTask task=new futureTask();
// //线程池 单线程的线程池
// ExecutorService service=Executors.newCachedThreadPool();
// Future<Integer> future=service.submit(task);
// //说到下面这个方法就要提及线程池的状态 四个种状态
// service.shutdown();
FutureTask<Integer>future=new FutureTask<>(task);
Thread t1=new Thread(future);
t1.start();
System.out.println("run task ....");
TimeUnit.SECONDS.sleep(1);
System.out.println("result : "+future.get());
}
}
class futureTask implements Callable<Integer>{
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(2);
int result=0;
//模拟一个庞大的计算
for(int i=0;i<100;i++){
for(int j=0;j<i;j++){
result+=j;
}
}
return result;
}
}
复制代码
扒一扒futureTask的源码
首先咱们看run方法
接下来咱们谈下多线程操做中比较核心的东西
多线程操做共享变量的问题
锁机制
最经常使用的锁机制 synchronized及lock
隐式锁 synchronized 谈下底层原理
显示锁 lock
重入锁
我设计了一个实例来直观的观察多线程操做共享变量的线程安全问题
package com.Allen.test;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class TestSynchronized {
public static void main(String[] args) throws InterruptedException {
for (int s = 0; s < 10; s++) {
Person person = new Person();
person.setAge(0);
ReentrantLock lock = new ReentrantLock();
for (int i = 0; i < 100; i++) {
Thread t1 = new Thread(new test111(person,lock));
t1.start();
}
TimeUnit.SECONDS.sleep(3);
System.out.println(person.getAge());
}
}
}
class test111 implements Runnable {
Person person;
ReentrantLock lock;
public test111(Person person,ReentrantLock lock) {
this.person = person;
this.lock=lock;
}
public void run() {
// synchronized (person) {
// person.setAge(person.getAge() + 1);
// }
lock.lock();
person.setAge(person.getAge() + 1);
lock.unlock();
//person.setAge(person.getAge()+1);
}
}
复制代码
悲观锁 乐观锁 CAS
一 悲观锁
在关系型数据库管理系统中,悲观并发控制(悲观锁)是一种并发控制的方法。 简单而言,就是它“悲观”地默认每次拿数据都认为别人会修改,因此在每次拿以前去上锁,这样别人想拿这个数据就会block直到它拿到锁。传统悲观锁实现机制大多利用数据库提供的锁机制(也只有数据库层面提供的锁机制才能真正保证了数据访问的排他性)。
悲观锁流程以下:
1 对任意记录进行修改以前,尝试给它加排它锁。
2 如果加锁失败,说明该记录正在修改,那么当前须要等待或者抛出异常。
3 若是成功加锁,那么就能够对记录进行修改,事务完成以后解锁。
4 期间其余人须要对该记录进行修改或者加排查锁操做,就不准等待咱们解锁或者直接抛出异常。
以mysql为例
使用悲观锁,先关闭mysql的自动提交属性。
set autocommit=0
begin;
select status from t_goods where id=1 for update;
insert into t_orders(id,goods_id)values(null,1);
update t_goods set status=2;
commit;
发起事务 操做 提交事务
select for update 开启排它锁方式实现悲观锁,mysql InnoDB默认行级锁【ps:注意一点,行级锁都是基于索引,若是用不到索引会使用表级锁吧整张表锁住】
优势与缺点:
悲观并发控制实现上是“先取锁再访问”的保守策略,为数据安全提供保障,可是牺牲了效率,处理加锁会让数据库产生额外的开销,还增长了死锁的机会,下降了并行性,若是一个实物锁定了某行数据,其余事物必须等待改事务处理完才能处理那一行。适用于状态修改很是高,冲突很是严重的系统。
二 乐观锁
假设了多用户并发事务处理下不会彼此影响,各事务在不产生锁的状况下处理各自的那部分数据,
每次去拿数据都认为别人不会修改,因此不会上锁,只会在更新的时候判断一下此期间别人有没有更新这个数据。若是有其余事务更新的话,正在提交的事务会回滚。
通常来讲乐观锁不会使用数据库提供的机制,咱们一般采用记录数据版原本实现乐观锁 记录的方式有版本号或者时间戳。
在数据初始化的时候指定一个版本号,每次对数据更新在对版本号作+1操做,并判断当前的版本号是否是该数据的最新版本。
优势与缺点;
乐观并发控制事务之间数据竞争几率较小,能够一直作下去知道提交才会锁定,有点相似于svn
三 CAS 无锁化编程
java.util.concurrent包中借助CAS实现了区别于synchronouse同步锁的一种乐观锁。 无锁化编程
CAS有三个操做数,内存值V,旧的预期值A,修改的新值B。当且仅当预期值A与内存值V相同时,将内存值V修改成B,不然什么都不作。
优势与缺点: 能够用CAS在无锁的状况下实现原子操做,但要明确应用场合,很是简单的操做且又不想引入锁能够考虑使用CAS操做,当想要非阻塞地完成某一操做也能够考虑CAS。 CAS虽然很高效的解决原子操做,可是CAS仍然存在三大问题。ABA问题,循环时间长开销大和只能保证一个共享变量的原子操做
ABA问题。由于CAS须要在操做值的时候检查下值有没有发生变化,若是没有发生变化则更新,可是若是一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,可是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。 从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法做用是首先检查当前引用是否等于预期引用,而且当前标志是否等于预期标志,若是所有相等,则以原子方式将该引用和该标志的值设置为给定的更新值。 关于ABA问题参考文档: blog.hesey.net/2011/09/res…
循环时间长开销大。自旋CAS若是长时间不成功,会给CPU带来很是大的执行开销。若是JVM能支持处理器提供的pause指令那么效率会有必定的提高,pause指令有两个做用,第一它能够延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它能够避免在退出循环的时候因内存顺序冲突(memory order violation)而引发CPU流水线被清空(CPU pipeline flush),从而提升CPU的执行效率。
只能保证一个共享变量的原子操做。当对一个共享变量执行操做时,咱们可使用循环CAS的方式来保证原子操做,可是对多个共享变量操做时,循环CAS就没法保证操做的原子性,这个时候就能够用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操做。好比有两个共享变量i=2,j=a,合并一下ij=2a,而后用CAS来操做ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你能够把多个变量放在一个对象里来进行CAS操做。
compareAndSet有点相似于以下
if (this == expect) {
this = update
return true;
} else {
return false;
}
以下附上CAS实例
volatile关键字 可是咱们要思考下并发编程下的俩个关键问题?
1 线程之间是如何通讯
1.1 共享内存
隐式通讯
1.2 消息传递
显式通讯
2 线程之间是如何同步
在共享内存的并发模型中,同步是显示作的,synchronized
在消息传递的并发模型,因为消息的发送必需要在消息的接受以前,因此同步是隐式的。
2 定位内存可见性问题:
什么对象是内存共享的,什么不是。
主内存:共享变量
私有本地内存:存储共享变量的副本
synchronized :可重入锁,互斥性,可见性。
volatile:原子性,可见性。不能作到复合操做的原子性。性能开销更小。
synchronized 线程A释放锁以后会把本地内存的变量同步到主内存
线程B获取锁的时候会把主内存的共享变量同步到本地内存中。
调用monitorenter monitorexit
对象同步方法调用的时候必需要获取到它的监视器,用完以后会释放。
多进程下访问共享变量
分布式锁 包括三种经常使用的使用手段
1 ) 经过数据库的方式
create table lock(
ID
Method_Name 惟一约束
)
每次去操做文件的是否,都去插入表,获取锁是否才能去操做文件,
等锁释放【删除这个记录】才能insert
缺点:
删除失败 【等待程序不可用】
重入锁【能够对进程进行编号,来判断重入】
2) 使用zookeeper
临时有序节点
谁先写到节点上,获取锁
有个watch机制,会判断节点是否失效,失效以后会读取下一个节点
3) redis
setnx
谁先set这个值,谁就现获取锁
后续set失败的就是没有获取锁
等待锁释放以后你才能set这个值。
线程池
线程池种类及区别:
运行executors类给咱们提供静态方法调用不一样的线程池
newsingleThreadExecutor:
返回一个单线程的executor,将多个任务交给这个Executor时,这个线程处理完一个任务以后接着处理下一个任务,若该线程出现异常,将会有一个新的线程替代。
newFixedThreadPool
返回一个包含指定数目线程的线程池,任务数量多于线程数目,则没有执行的任务必须等待,直到任务完成。
newCahedThreadPool
根据用户的任务数建立相应的线程数来处理,该线程池不会对线程数加以限制,彻底依赖于JVM能建立的线程数量,可能引发内存不足。
用法以下:
package com.Allen.TestPoolSize;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
//单线程化的线程池
ExecutorService service1=Executors.newSingleThreadExecutor();
service1.execute(new Runnable() {
@Override
public void run() {
System.out.println("aaa");
}
});
//可缓存线程池
ExecutorService service2=Executors.newCachedThreadPool();
service2.execute(new Runnable() {
public void run() {
System.out.println("bbb");
}
});
//定长线程池
ExecutorService service3=Executors.newFixedThreadPool(3);
service3.execute(new Runnable() {
public void run() {
System.out.println("ccc");
}
});
//定长线程池支持定时和周期任务
ScheduledExecutorService service4=Executors.newScheduledThreadPool(5);
service4.schedule(new Runnable() {
@Override
public void run() {
System.out.println("ddd");
}
}, 3, TimeUnit.SECONDS);
}
}
复制代码
过程 原理
扒一扒源码
首先咱们写一个test类
package com.Allen.studyThread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class test {
public static void main(String[] args) {
ExecutorService threadpool=Executors.newFixedThreadPool(3);
for(int i=0;i<10;i++){
threadpool.submit(new testrunnable("allen_"+i));
}
threadpool.shutdown();
}
}
class testrunnable implements Runnable{
private String name;
public testrunnable(String name){
this.name=name;
}
public void run() {
System.out.println(name+" start...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name+" end...");
}
}
复制代码
看线程池源码,咱们入口从newFixedThreadPool入口入,咱们看看他初始化作了什么 进入到Executors类的newFixedThreadPool方法传入一个参数
而后咱们看下他的submit方法