运行程序会建立一个进程。但OS调度的最小单元是线程(轻量级进程)。java
普通的java程序包含的线程:node
/** * 一个java程序包含的线程 */ public class ShowMainThread { public static void main(String[] args) { // java虚拟机的线程管理接口 ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); // 获取线程信息的方法 ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false); for (ThreadInfo threadInfo : threadInfos) { System.out.println(threadInfo.getThreadId() + ":" + threadInfo.getThreadName()); } } }
11:Monitor Ctrl-Break //监听中断信号 5:Attach Listener //获取内存dump,线程dump 4:Signal Dispatcher //将信号分给jvm的线程 3:Finalizer //调用对象的finalizer 方法 2:Reference Handler //清除Reference 1:main //程序的主入口
一、 充分利用多处理核心;sql
二、 更快的响应时间(用户订单的场景,发送邮件等部分可由其余线程执行)数据库
一、知识点多,相关的类和接口比较多,编程
二、学习原理,看源码牵涉的知识点多,包括有设计模式,数据结构,操做系统,cpu相关的概念和定义;3,线程知识点自己的难度也高。设计模式
学习路线:紧紧记住相关的概念和定义-à多写代码,多用à了解原理->看看源码api
建立线程的方法安全
/** * 如何建立一个线程 */ public class HowStartThread { // 继承Thread private static class TestThread extends Thread { @Override public void run() { System.out.println("TestThread is runing"); } } // 实现Runnable 或者Callable 另外还有内部类线程 private static class TestRunable implements Runnable { @Override public void run() { System.out.println("TestRunable is runing"); } } public static void main(String[] args) { Thread t1 = new TestThread(); Thread t2 = new Thread(new TestRunable()); t1.start(); t2.start(); } }
启动线程:threadl类的start()性能优化
线程完成:一、run()方法执行完成;二、抛出一个未处理的异常致使线程的提早结束网络
单独使用一个取消标志位(boolean值判断).
Stop(),suspend(),resume()是过时的api,很大的反作用,容易致使死锁或者数据不一致
/** * 使用自定义的取消标志位中断线程(不安全) */ public class FlagCancel { private static class TestRunable implements Runnable { // boolean的标志位 volatile轻量的线程同步 private volatile boolean on = true; private long i = 0; @Override public void run() { while (on) { i++; // 阻塞方法,on不起做用 // wait,sleep,blockingqueue(put,take) try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("TestRunable is runing :" + i); } public void cancel() { on = false; } } }
使用线程的中断 :
interrupt() 中断线程,本质是将线程的中断标志位设为true,其余线程向须要中断的线程打个招呼。是否真正进行中断由线程本身决定。
isInterrupted() 线程检查本身的中断标志位
静态方法Thread.interrupted() 将中断标志位复位为false
由上面的中断机制可知Java里是没有抢占式任务,只有协做式任务。
为什么要用中断,线程处于阻塞(如调用了java的sleep,wait等等方法时)的时候,是不会理会咱们本身设置的取消标志位的,可是这些阻塞方法都会检查线程的中断标志位。
/** * 安全的中断线程 */ public class SafeInterrupt implements Runnable { private volatile boolean on = true; private long i = 0; @Override public void run() { while (on && Thread.currentThread().isInterrupted()) { i++; } System.out.println("TestRunable is runing :" + i); } public void cancel() { on = false; Thread.currentThread().interrupt(); } }
IO通讯 inputstream read/write等阻塞方法,不会理会中断,而关闭底层的套接字socket.close()会抛出socketException
NIO: selector.select()会阻塞,调用selector的wakeup和close方法会抛出ClosedSelectorException
死锁状态不响应中断的请求,这个必须重启程序,修改错误。
/** * 调用阻塞方法时,如何中断线程 */ public class BlockInterrupt { private static volatile boolean on = true; private static class WhenBlock implements Runnable { @Override public void run() { while (on && !Thread.currentThread().isInterrupted()) { try { // 抛出中断异常的阻塞方法,抛出异常后,中断标志位改为false 须要从新设置标志位 Thread.sleep(100); } catch (InterruptedException e) { // 从新设置标志位 Thread.currentThread().interrupt(); // do my work } // 清理工做结束线程 } } public void cancel() { on = false; Thread.currentThread().interrupt(); } } }
覆盖线程的interrupt方法,在处理套接字异常时,再用super.interrupt()自行中断线程
/** * 如何覆盖线程的interrupt() 方法 */ public class OverrideInterrupt extends Thread { private final Socket socket; private final InputStream in; public OverrideInterrupt(Socket socket, InputStream in) { this.socket = socket; this.in = in; } private void t() { } @Override public void interrupt() { try { // 关闭底层的套接字 socket.close(); } catch (IOException e) { e.printStackTrace(); // ..... } finally { // 同时中断线程 super.interrupt(); } } }
新建立 线程被建立,可是没有调用start方法
可运行(RUNNABLE) 运行状态,由cpu决定是否是正在运行
被阻塞(BLOCKING) 阻塞,线程被阻塞于锁
等待/计时等待(WAITING) 等待某些条件成熟
被终止 线程执行完毕
public class SleepUtils { public static final void second(long seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { } } }
/** * 查看线程的状态 */ public class ThreadState { private static Lock lock = new ReentrantLock(); public static void main(String[] args) { new Thread(new SleepAlways(), "SleepAlwaysThread").start(); new Thread(new Waiting(), "WaitingThread").start(); // 使用两个Blocked线程,一个获取锁成功,另外一个被阻塞 new Thread(new Blocked(), "BlockedThread-1").start(); new Thread(new Blocked(), "BlockedThread-2").start(); new Thread(new Sync(), "SyncThread-1").start(); new Thread(new Sync(), "SyncThread-2").start(); } /** * 该线程不断的进行睡眠 */ static class SleepAlways implements Runnable { @Override public void run() { while (true) { SleepUtils.second(100); } } } /** * 该线程在Waiting.class实例上等待 */ static class Waiting implements Runnable { @Override public void run() { while (true) { synchronized (Waiting.class) { try { Waiting.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } /** * 该线程在Blocked.class实例上加锁后,不会释放该锁 */ static class Blocked implements Runnable { public void run() { synchronized (Blocked.class) { while (true) { SleepUtils.second(100); } } } } /** * 该线程得到锁休眠后,又释放锁 */ static class Sync implements Runnable { @Override public void run() { lock.lock(); try { SleepUtils.second(3000); } finally { lock.unlock(); } } } }
须要用ThreadState工具才能查看运行的进程状态
成员变量priority控制优先级,范围1-10之间,数字越高优先级越高,缺省为5,建立线程时setPriotity()能够设置优先级,不要期望他发挥做用。
守护型线程(如GC线程),程序里没有非Daemon线程时,java程序就会退出。通常用不上,也不建议咱们平时开发时使用,由于Try/Finally里的代码不必定执行的。
/** * 守护线程 */ public class Daemon { public static void main(String[] args) { Thread thread = new Thread(new DaemonRunner()); thread.setDaemon(true);// 设置为守护线程 thread.start(); } static class DaemonRunner implements Runnable { @Override public void run() { System.out.println("2"); try { System.out.println("23"); SleepUtils.second(100); } finally { System.out.println("DaemonThread finally run."); } } } } main运行结果:没有打印任何东西
run()和start()
run就是一个普通的方法,跟其余类的实例方法没有任何区别。
/** * Run和start方法辨析 */ public class RunAndStart { private static class TestThread extends Thread { private String name; public TestThread(String name) { this.name = name; } @Override public void run() { int i = 90; while (i > 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("I am " + name + " i= " + i); } } } public static void main(String[] args) { TestThread parent = new TestThread("beInvoked"); parent.start();// 是TestThread线程去执行 TestThread beInvoked = new TestThread("beInvoked_thread"); beInvoked.run();// 是main线程去执行 } } main运行结果: I am beInvoked i= 90 I am beInvoked_thread i= 90 I am beInvoked i= 90 I am beInvoked_thread i= 90 I am beInvoked i= 90 I am beInvoked_thread i= 90
Sleep
不会释放锁,因此咱们在用sleep时,要把sleep放在同步代码块的外面。
/** * sleep方法是否会释放锁 */ public class SleepTest { // 锁 private Object lock = new Object(); public static void main(String[] args) { SleepTest sleepTest = new SleepTest(); Thread threadA = sleepTest.new ThreadSleep(); threadA.setName("ThreadSleep"); Thread threadB = sleepTest.new ThreadNotSleep(); threadB.setName("ThreadNotSleep"); threadA.start(); try { Thread.sleep(1000); System.out.println(" RunTest slept!"); } catch (InterruptedException e) { e.printStackTrace(); } threadB.start(); } // 休眠的线程 private class ThreadSleep extends Thread { @Override public void run() { String threadName = Thread.currentThread().getName(); System.out.println(threadName + " will take the lock"); try { // 拿到锁之后,休眠 synchronized (lock) { System.out.println(threadName + " taking the lock"); System.out.println("Finish the work: " + threadName); Thread.sleep(5000); } } catch (InterruptedException e) { // e.printStackTrace(); } } } // 不休眠的线程 private class ThreadNotSleep extends Thread { @Override public void run() { String threadName = Thread.currentThread().getName(); System.out.println(threadName + " will take the lock time=" + System.currentTimeMillis()); // 拿到锁之后不休眠 synchronized (lock) { System.out.println(threadName + " taking the lock time=" + System.currentTimeMillis()); System.out.println("Finish the work: " + threadName); } } } } main运行结果: ThreadSleep will take the lock ThreadSleep taking the lock Finish the work: ThreadSleep RunTest slept! ThreadNotSleep will take the lock time=1526644256785 ThreadNotSleep taking the lock time=1526644260785 Finish the work: ThreadNotSleep
yield()
当前线程出让cpu占有权,当前线程变成了可运行状态,下一时刻仍然可能被cpu选中,不会释放锁。
wait()和 notify()/notiyfAll()
调用之前,当前线程必需要持有锁,调用了wait() notify()/notiyfAll()会释放锁。
等待通知机制:
线程 A调用了对象O的wait方法进入等待状态,线程 B调用了对象O的notify方法进行唤醒,唤醒的是在对象O上wait的线程(好比线程A)
notify() 唤醒一个线程,唤醒哪个彻底看cpu的心情(谨慎使用)
notiyfAll() 全部在对象O上wait的线程所有唤醒(应该用notiyfAll())
/** * wait/notify/notifyAll的演示 */ public class User { private int age; private String city; public static final String CITY = "NewYork"; public User(int age, String city) { this.age = age; this.city = city; } public User() { } // 修改用户的城市后,发出通知 public synchronized void changeCity() { this.city = "London"; notifyAll(); } // 修改用户的年龄后,发出通知 public synchronized void changeAge() { this.age = 31; notifyAll(); } // 等待用户的年龄变化的方法,接收到通知,检查发现用户年龄大于30时,进行业务工做,不然一直等待 // 阻塞方法 public synchronized void waitAge() { while (this.age <= 30) { try { wait(); System.out.println("wait age [" + Thread.currentThread().getId() + "] is notified!"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("the age is " + this.age);// 业务工做 } // 等待用户的城市变化的方法,接收到通知,检查发现用户城市不是NewYork时,进行业务工做,不然一直等待 // 阻塞方法 public synchronized void waitCity() { while (this.city.equals(CITY)) { try { wait(); System.out.println("wait city [" + Thread.currentThread().getId() + "] is notified!"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("the city is " + this.city);// 业务工做 } }
/** * TestUser测试类 */ public class TestUser { private static User user = new User(30, User.CITY); private static class CheckAge extends Thread { @Override public void run() { user.waitAge(); } } private static class CheckCity extends Thread { @Override public void run() { user.waitCity(); } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { // 启动三个等待用户年龄变化的线程 new CheckAge().start(); } for (int i = 0; i < 3; i++) { // 启动三个等待用户城市变化的线程 new CheckCity().start(); } Thread.sleep(1000); user.changeCity();// 变更用户的城市 } } main运行结果: wait city [16] is notified! the city is London wait city [15] is notified! the city is London wait city [14] is notified! the city is London wait age [13] is notified! wait age [12] is notified! wait age [11] is notified!
/** *调用阻塞方法时,如何中断线程 */ public class BlockInterrupt { private static Object o = new Object(); /* while循环中包含try/catch块 */ private static class WhileTryWhenBlock extends Thread { private volatile boolean on = true; private long i = 0; @Override public void run() { System.out.println("当前执行线程id:" + Thread.currentThread().getId()); while (on && !Thread.currentThread().isInterrupted()) { System.out.println("i=" + i++); try { // 抛出中断异常的阻塞方法,抛出异常后,中断标志位会改为false // 能够理解为这些方法会隐含调用Thread.interrputed()方法 synchronized (o) { o.wait(); } } catch (InterruptedException e) { System.out.println("当前执行线程的中断标志位:" + Thread.currentThread().getId() + ":" + Thread.currentThread().isInterrupted()); Thread.currentThread().interrupt();// 从新设置一下 System.out.println("被中断的线程_" + getId() + ":" + isInterrupted()); // do my work } // 清理工做,准备结束线程 } } public void cancel() { // on = false; interrupt(); System.out.println("本方法所在线程实例:" + getId()); System.out.println("执行本方法的线程:" + Thread.currentThread().getId()); // Thread.currentThread().interrupt(); } } /* try/catch块中包含while循环 */ private static class TryWhileWhenBlock extends Thread { private volatile boolean on = true; private long i = 0; @Override public void run() { try { while (on) { System.out.println(i++); // 抛出中断异常的阻塞方法,抛出异常后,中断标志位改为false synchronized (o) { o.wait(); } } } catch (InterruptedException e) { System.out.println("当前执行线程的中断标志位:" + Thread.currentThread().getId() + ":" + Thread.currentThread().isInterrupted()); } finally { // 清理工做结束线程 } } public void cancel() { on = false; interrupt(); } } public static void main(String[] args) throws InterruptedException { WhileTryWhenBlock whileTryWhenBlock = new WhileTryWhenBlock(); whileTryWhenBlock.start(); Thread.sleep(100); whileTryWhenBlock.cancel(); System.out.println("===================="); TryWhileWhenBlock tryWhileWhenBlock = new TryWhileWhenBlock(); tryWhileWhenBlock.start(); Thread.sleep(100); tryWhileWhenBlock.cancel(); } } 输出结果 当前执行线程id:11 i=0 本方法所在线程实例:11 当前执行线程的中断标志位:11:false 执行本方法的线程:1 被中断的线程_11:true ==================== 0 当前执行线程的中断标志位:11:false
每一个线程有本身栈空间,孤立运行,对咱们没有价值。若是多个线程可以相互配合完成工做,这将会带来巨大的价值。
多个线程同时访问一个共享的变量的时候,每一个线程的工做内存有这个变量的一个拷贝,变量自己仍是保存在共享内存中。
Violate修饰字段,对这个变量的访问必需要从共享内存刷新一次。最新的修改写回共享内存。能够保证字段的可见性。绝对不是线程安全的,没有操做的原子性。
适用场景:一、一个线程写,多个线程读;二、volatile变量的变化很固定
关键字synchronized能够修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,又称为内置锁机制。
Synchronized的类锁和对象锁,本质上是两把锁,类锁实际锁的是每个类的class对象。对象锁锁的是当前对象实例。
/** * 测试Volatile型变量的操做原子性 */ public class VolatileThread implements Runnable { private volatile int a= 0; @Override public void run() { //synchronized (this){ a=a+1; System.out.println(Thread.currentThread().getName()+"----"+a); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } a=a+1; System.out.println(Thread.currentThread().getName()+"----"+a); //} } }
public class VolatileTest { public static void main(String[] args) { VolatileThread volatileThread = new VolatileThread(); Thread t1 = new Thread(volatileThread); Thread t2 = new Thread(volatileThread); Thread t3 = new Thread(volatileThread); Thread t4 = new Thread(volatileThread); t1.start(); t2.start(); t3.start(); t4.start(); } }
等待方原则:
一、获取对象锁
二、若是条件不知足,调用对象的wait方法,被通知后依然要检查条件是否知足
三、条件知足之后,才能执行相关的业务逻辑
Synchronized(对象){ While(条件不知足){ 对象.wait() } 业务逻辑处理 }
通知方原则:
一、 得到对象的锁;
二、 改变条件;
三、 通知全部等待在对象的线程
Synchronized(对象){ 业务逻辑处理,改变条件 对象.notify/notifyAll }
/** * 有界阻塞队列 */ public class BlockingQueueWN<T> { private List queue = new LinkedList<>(); private final int limit; // 大小限制 public BlockingQueueWN(int limit) { this.limit = limit; } // 入队 public synchronized void enqueue(T item) throws InterruptedException { // 若是队列满了 等待 while (this.queue.size() == this.limit) { wait(); } // 若是队列为空 唤醒 if (this.queue.size() == 0) { System.out.println("enqueue notifyAll"); notifyAll(); } // 入列 this.queue.add(item); } // 出队 public synchronized T dequeue() throws InterruptedException { // 若是队列为空 等待 while (this.queue.size() == 0) { System.out.println("dequeue wait"); wait(); } // 若是队列满了 唤醒 if (this.queue.size() == this.limit) { notifyAll(); } // 出列 return (T) this.queue.remove(0); } }
public class BqTest { public static void main(String[] args) { BlockingQueueWN bq = new BlockingQueueWN(10); Thread threadA = new ThreadPush(bq); threadA.setName("Push"); Thread threadB = new ThreadPop(bq); threadB.setName("Pop"); threadB.start(); threadA.start(); } // 数据入队列线程 private static class ThreadPush extends Thread { BlockingQueueWN<Integer> bq; public ThreadPush(BlockingQueueWN<Integer> bq) { this.bq = bq; } @Override public void run() { String threadName = Thread.currentThread().getName(); int i = 20; // 循环20次入列 while (i > 0) { try { Thread.sleep(1000); System.out.println(" i=" + i + " will push"); bq.enqueue(i--); } catch (InterruptedException e) { // e.printStackTrace(); } } } } // 数据出队列线程 private static class ThreadPop extends Thread { BlockingQueueWN<Integer> bq; public ThreadPop(BlockingQueueWN<Integer> bq) { this.bq = bq; } @Override public void run() { // 无限循环 有就取 while (true) { try { System.out.println(Thread.currentThread().getName() + " will pop....."); Integer i = bq.dequeue(); System.out.println(" i=" + i.intValue() + " alread pop"); } catch (InterruptedException e) { // e.printStackTrace(); } } } } }
运行结果 Pop will pop..... dequeue wait i=20 will push enqueue notifyAll i=20 alread pop Pop will pop..... dequeue wait i=19 will push enqueue notifyAll i=19 alread pop Pop will pop..... dequeue wait i=18 will push enqueue notifyAll i=18 alread pop Pop will pop..... dequeue wait i=17 will push enqueue notifyAll i=17 alread pop Pop will pop..... dequeue wait i=16 will push enqueue notifyAll i=16 alread pop Pop will pop..... dequeue wait i=15 will push enqueue notifyAll i=15 alread pop Pop will pop..... dequeue wait i=14 will push enqueue notifyAll i=14 alread pop Pop will pop..... dequeue wait i=13 will push enqueue notifyAll i=13 alread pop Pop will pop..... dequeue wait i=12 will push enqueue notifyAll i=12 alread pop Pop will pop..... dequeue wait i=11 will push enqueue notifyAll i=11 alread pop Pop will pop..... dequeue wait i=10 will push enqueue notifyAll i=10 alread pop Pop will pop..... dequeue wait i=9 will push enqueue notifyAll i=9 alread pop Pop will pop..... dequeue wait i=8 will push enqueue notifyAll i=8 alread pop Pop will pop..... dequeue wait i=7 will push enqueue notifyAll i=7 alread pop Pop will pop..... dequeue wait i=6 will push enqueue notifyAll i=6 alread pop Pop will pop..... dequeue wait i=5 will push enqueue notifyAll i=5 alread pop Pop will pop..... dequeue wait i=4 will push enqueue notifyAll i=4 alread pop Pop will pop..... dequeue wait i=3 will push enqueue notifyAll i=3 alread pop Pop will pop..... dequeue wait i=2 will push enqueue notifyAll i=2 alread pop Pop will pop..... dequeue wait i=1 will push enqueue notifyAll i=1 alread pop Pop will pop..... dequeue wait
文件输入输出,网络输入输出,管道输入输出流用于线程中间的数据传递,传输媒介的内存
管道是在线程间进行传送
四种实现
pipedOutputStream/input 面向的字节
pipedReader/Writer 面向的是字符
只适合线程间一对一的通讯,适用范围较狭窄。
public class PipeTransfer { private static class Print implements Runnable{ private PipedReader in; public Print(PipedReader in) { this.in = in; } @Override public void run() { int receive =0; try { while((receive=in.read())!=-1){ System.out.println((char) receive); } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { PipedWriter out = new PipedWriter(); PipedReader in = new PipedReader(); //必须进行链接 out.connect(in); Thread t1 = new Thread(new Print(in),"PrintThread"); t1.start(); int receive =0; try { while((receive=System.in.read())!=-1){ out.write(receive); } } catch (IOException e) { e.printStackTrace(); }finally { out.close(); } } } 运行输入 good 输出 g o o d
线程A,执行了thread.join(),线程A等待thread线程终止了之后,A在join后面的语句才会继续执行
public class JoinTes { public static void main(String[] args) throws InterruptedException { ThreadJoinTest t1 = new ThreadJoinTest("小明"); ThreadJoinTest t2 = new ThreadJoinTest("小东"); t1.start(); /** * Thread类中的join方法的主要做用就是同步,它可使得线程之间的并行执行变为串行执行。 * * 1 join的意思是使得放弃当前线程的执行,并返回对应的线程,例以下面代码的意思就是: * 程序在main线程中调用t1线程的join方法,则main线程放弃cpu控制权,并返回t1线程继续执行直到线程t1执行完毕 * 因此结果是t1线程执行完后,才到主线程执行,至关于在main线程中同步t1线程,t1执行完了,main线程才有执行的机会 * * 2 join方法能够传递参数,join(10)表示main线程会等待t1线程10毫秒,10毫秒过去后, * main线程和t1线程之间执行顺序由串行执行变为普通的并行执行 * 若是A线程中掉用B线程的join(10),则表示A线程会等待B线程执行10毫秒,10毫秒事后, * A、B线程并行执行。须要注意的是,jdk规定,join(0)的意思不是A线程等待B线程0秒, * 而是A线程等待B线程无限时间,直到B线程执行完毕,即join(0)等价于join() * * 3 join方法必须在线程start方法调用以后调用才有意义。这个也很容易理解:若是一个线程都没有start,那它也就没法同步了。 * * 4 join方法的原理就是调用相应线程的wait方法进行等待操做的,例如A线程中调用了B线程的join方法, * 则至关于在A线程中调用了B线程的wait方法,当B线程执行完(或者到达等待时间), * B线程会自动调用自身的notifyAll方法唤醒A线程,从而达到同步的目的。 */ t1.join(); t2.start(); } } class ThreadJoinTest extends Thread { public ThreadJoinTest(String name) { super(name); } @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(this.getName() + ":" + i); try { sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } 运行结果 小明:0 小明:1 小明:2 小明:3 小明:4 小明:5 小明:6 小明:7 小明:8 小明:9 小东:0 小东:1 小东:2 小东:3 小东:4 小东:5 小东:6 小东:7 小东:8 小东:9
public class JoinTest { public static class CutInLine implements Runnable { private Thread thread; public CutInLine(Thread thread) { this.thread = thread; } @Override public void run() { try { // 在被插队的线程里,调用一下插队线程的join方法 System.out.println(thread.getName() + " join " + Thread.currentThread().getName()); thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " will work"); } } public static void main(String[] args) throws InterruptedException { Thread previous = Thread.currentThread(); for (int i = 0; i < 10; i++) { Thread thread = new Thread(new CutInLine(previous), String.valueOf(i)); thread.start(); System.out.print(thread.getName() + " start "); previous = thread; } } } 打印结果 0 start main join 0 1 start 0 join 1 2 start 1 join 2 3 start 2 join 3 4 start 3 join 4 5 start 4 join 5 6 start 5 join 6 7 start 6 join 7 8 start 7 join 8 9 start 8 join 9 0 will work 1 will work 2 will work 3 will work 4 will work 5 will work 6 will work 7 will work 8 will work 9 will work
本质是个map,map的键就是每一个线程对象,值就是每一个线程所拥有的值
经常使用方法:
initialValue()
get()
set()
remove():将当前线程局部变量的值删除,这个方法是JDK 5.0新增的方法。当线程结束后,对应该线程的局部变量将自动被垃圾回收,因此显式调用该方法清除线程的局部变量并非必须的操做,但它能够加快内存回收的速度。
ThreadLocal拥有的这个变量,在线程之间很独立的,相互之间没有联系。内存占用相对来讲比较大。
public class ThreadLocalTest { static ThreadLocal<String> threadLocal = new ThreadLocal<String>(){ @Override protected String initialValue() { return "init"; } }; public void test(){ Thread[] runs = new Thread[3]; for(int i =0;i<runs.length;i++){ runs[i]=new Thread(new T1(i)); } for(int i =0;i<runs.length;i++){ runs[i].start(); } } private static class T1 implements Runnable{ private int id; public T1(int id) { this.id = id; } @Override public void run() { System.out.println(Thread.currentThread().getId()+" start"); String s = threadLocal.get(); s = s+"_"+id; threadLocal.set(s); System.out.println(Thread.currentThread().getId()+s); } } public static void main(String[] args) { ThreadLocalTest test = new ThreadLocalTest(); test.test(); } } 输出结果 11 start 13 start 11init_0 12 start 13init_2 12init_1
串行化、无锁化、异步化编程是趋势之一,好比node.js,Vert.x。
黄金原则:编码时候不要考虑性能优化的事情,先正确实现业务,发现性能不行,这个时候再来考虑性能优化。
public class PerfermenTest { /** 执行次数 */ private static final long count = 100000000; public static void main(String[] args) throws InterruptedException { //并发计算 concurrency(); //单线程计算 serial(); } private static void concurrency() throws InterruptedException { long start = System.currentTimeMillis(); Thread thread = new Thread(new Runnable() { @Override public void run() { int a = 0; for (long i = 0; i < count; i++) { a += 5; } System.out.println("a="+a); } }); thread.start(); int b = 0; for (long i = 0; i < count; i++) { b--; } thread.join(); long time = System.currentTimeMillis() - start; System.out.println("concurrency :" + time + "ms,b=" + b); } private static void serial() { long start = System.currentTimeMillis(); int a = 0; for (long i = 0; i < count; i++) { a += 5; } int b = 0; for (long i = 0; i < count; i++) { b--; } long time = System.currentTimeMillis() - start; System.out.println("serial:" + time + "ms,b=" + b + ",a=" + a); } } 输出结果 a=500000000 concurrency :41ms,b=-100000000 serial:70ms,b=-100000000,a=500000000
调用场景:调用一个方法时等待一段时间(通常来讲是给定一个时间段),若是该方法可以在给定的时间段以内获得结果,那么将结果马上返回,反之,超时返回默认结果。
假设等待时间段是T,那么能够推断出在当前时间now+T以后就会超时 等待持续时间:REMAINING=T。 ·超时时间:FUTURE=now+T。 // 对当前对象加锁 public synchronized Object get(long mills) throws InterruptedException { long future = System.currentTimeMillis() + mills; long remaining = mills; // 当超时大于0而且result返回值不知足要求 while ((result == null) && remaining > 0) { wait(remaining); remaining = future - System.currentTimeMillis(); } return result; }
public class ConnectionDriver { public static final Connection getConnectiong() { return new ConnectionImpl(); } private static class ConnectionImpl implements Connection { @Override public Statement createStatement() throws SQLException { System.out.println("建立SQL " + Thread.currentThread().getId()); return null; } @Override public void commit() throws SQLException { try { System.err.println(Thread.currentThread().getId() + "准备提交数据"); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public PreparedStatement prepareStatement(String sql) throws SQLException { return null; } @Override public CallableStatement prepareCall(String sql) throws SQLException { return null; } @Override public String nativeSQL(String sql) throws SQLException { return null; } @Override public void setAutoCommit(boolean autoCommit) throws SQLException { } @Override public boolean getAutoCommit() throws SQLException { return false; } @Override public void rollback() throws SQLException { } @Override public void close() throws SQLException { } @Override public boolean isClosed() throws SQLException { return false; } @Override public DatabaseMetaData getMetaData() throws SQLException { return null; } @Override public void setReadOnly(boolean readOnly) throws SQLException { } @Override public boolean isReadOnly() throws SQLException { return false; } @Override public void setCatalog(String catalog) throws SQLException { } @Override public String getCatalog() throws SQLException { return null; } @Override public void setTransactionIsolation(int level) throws SQLException { } @Override public int getTransactionIsolation() throws SQLException { return 0; } @Override public SQLWarning getWarnings() throws SQLException { return null; } @Override public void clearWarnings() throws SQLException { } @Override public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return null; } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return null; } @Override public Map<String, Class<?>> getTypeMap() throws SQLException { return null; } @Override public void setTypeMap(Map<String, Class<?>> map) throws SQLException { } @Override public void setHoldability(int holdability) throws SQLException { } @Override public int getHoldability() throws SQLException { return 0; } @Override public Savepoint setSavepoint() throws SQLException { return null; } @Override public Savepoint setSavepoint(String name) throws SQLException { return null; } @Override public void rollback(Savepoint savepoint) throws SQLException { } @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException { } @Override public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null; } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { return null; } @Override public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { return null; } @Override public Clob createClob() throws SQLException { return null; } @Override public Blob createBlob() throws SQLException { return null; } @Override public NClob createNClob() throws SQLException { return null; } @Override public SQLXML createSQLXML() throws SQLException { return null; } @Override public boolean isValid(int timeout) throws SQLException { return false; } @Override public void setClientInfo(String name, String value) throws SQLClientInfoException { } @Override public void setClientInfo(Properties properties) throws SQLClientInfoException { } @Override public String getClientInfo(String name) throws SQLException { return null; } @Override public Properties getClientInfo() throws SQLException { return null; } @Override public Array createArrayOf(String typeName, Object[] elements) throws SQLException { return null; } @Override public Struct createStruct(String typeName, Object[] attributes) throws SQLException { return null; } @Override public void setSchema(String schema) throws SQLException { } @Override public String getSchema() throws SQLException { return null; } @Override public void abort(Executor executor) throws SQLException { } @Override public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { } @Override public int getNetworkTimeout() throws SQLException { return 0; } @Override public <T> T unwrap(Class<T> iface) throws SQLException { return null; } @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { return false; } } }
/** * 数据库链接池 * 从链接池中获取、使用和释放链接的过程,而客户端获取链接的过程被设定为等待超时的模式, * 也就是在1000毫秒内若是没法获取到可用链接,将会返回给客户端一个null。 * 设定链接池的大小为10个,而后经过调节客户端的线程数来模拟没法获取链接的场景。 * 链接池的定义。它经过构造函数初始化链接的最大上限,经过一个双向队列来维护链接, * 调用方须要先调用fetchConnection(long)方法来指定在多少毫秒内超时获取链接,当链接使用完成后, * 须要调用releaseConnection(Connection)方法将链接放回线程池 */ public class ConnectionPool { // 存放链接的容器 private LinkedList<Connection> pool = new LinkedList<Connection>(); public ConnectionPool(int initialSize) { if (initialSize > 0) { for (int i = 0; i < initialSize; i++) { pool.addLast(ConnectionDriver.getConnectiong()); } } } /* 将链接放回线程池 */ public void releaseConnection(Connection connection) { if (connection != null) { synchronized (pool) { // 添加后须要进行通知,这样其余消费者可以感知到连接池中已经归还了一个连接 pool.addLast(connection); pool.notifyAll(); } } } /* * 指定在多少毫秒内超时获取链接,在指定时间内没法获取到链接,将会返回null */ public Connection fetchConnection(long mills) throws InterruptedException { synchronized (pool) { // 彻底超时 if (mills <= 0) { while (pool.isEmpty()) { pool.wait(); } return pool.removeFirst(); } else { long future = System.currentTimeMillis() + mills;// 何时超时 long remaining = mills;// 超时时长 while (pool.isEmpty() && remaining > 0) { pool.wait(remaining); remaining = future - System.currentTimeMillis();// 当前还须等待的时长 } Connection result = null; if (!pool.isEmpty()) { result = pool.removeFirst(); } return result; } } } }
public class ConnectionPoolTest { static ConnectionPool pool = new ConnectionPool(10); // 保证全部ConnectionRunner可以同时开始 static CountDownLatch start = new CountDownLatch(1); // main线程将会等待全部ConnectionRunner结束后才能继续执行 static CountDownLatch end; public static void main(String[] args) throws Exception { // 线程数量,能够线程数量进行观察 int threadCount = 50; end = new CountDownLatch(threadCount); int count = 10;// 每一个线程循环取20次 AtomicInteger got = new AtomicInteger();// 获取到数据库链接的次数 AtomicInteger notGot = new AtomicInteger();// 没有获取到数据库链接的次数 for (int i = 0; i < threadCount; i++) { Thread thread = new Thread(new ConnetionRunner(count, got, notGot), "ConnectionRunnerThread"); thread.start(); } start.countDown(); end.await(); System.out.println("total invoke: " + (threadCount * count)); System.out.println("got connection: " + got); System.out.println("not got connection " + notGot); } static class ConnetionRunner implements Runnable { int count; AtomicInteger got; AtomicInteger notGot; public ConnetionRunner(int count, AtomicInteger got, AtomicInteger notGot) { this.count = count; this.got = got; this.notGot = notGot; } public void run() { try { start.await(); } catch (Exception ex) { } while (count > 0) { try { // 从线程池中获取链接,若是1000ms内没法获取到,将会返回null // 分别统计链接获取的数量got和未获取到的数量notGot Connection connection = pool.fetchConnection(1000); if (connection != null) { try { connection.createStatement(); connection.commit(); } finally { pool.releaseConnection(connection); got.incrementAndGet(); } } else { notGot.incrementAndGet(); } } catch (Exception ex) { } finally { count--; } } end.countDown(); } } } 运行结果: 建立SQL 41 41准备提交数据 41准备提交数据 建立SQL 41 total invoke: 500 got connection: 432 not got connection 68