多线程(一)

概念

运行程序会建立一个进程。但OS调度的最小单元是线程(轻量级进程)。java

普通的java程序包含的线程:node

/**
 * 一个java程序包含的线程
 */
public class ShowMainThread {

	public static void main(String[] args) {
		// java虚拟机的线程管理接口
		ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
		// 获取线程信息的方法
		ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
		for (ThreadInfo threadInfo : threadInfos) {
			System.out.println(threadInfo.getThreadId() + ":"
					+ threadInfo.getThreadName());
		}
	}
}
11:Monitor Ctrl-Break  //监听中断信号

5:Attach Listener  //获取内存dump,线程dump

4:Signal Dispatcher  //将信号分给jvm的线程

3:Finalizer  //调用对象的finalizer 方法

2:Reference Handler  //清除Reference

1:main //程序的主入口

 

为何要用线程?

一、 充分利用多处理核心;sql

二、 更快的响应时间(用户订单的场景,发送邮件等部分可由其余线程执行)数据库

 

学习线程的难点

一、知识点多,相关的类和接口比较多,编程

二、学习原理,看源码牵涉的知识点多,包括有设计模式,数据结构,操做系统,cpu相关的概念和定义;3,线程知识点自己的难度也高。设计模式

    学习路线紧紧记住相关的概念和定义-à多写代码,多用à了解原理->看看源码api

 

启动线程和退出线程

建立线程的方法安全

/**
 * 如何建立一个线程
 */
public class HowStartThread {

	// 继承Thread
	private static class TestThread extends Thread {
		@Override
		public void run() {
			System.out.println("TestThread is runing");

		}
	}

	// 实现Runnable 或者Callable 另外还有内部类线程
	private static class TestRunable implements Runnable {

		@Override
		public void run() {
			System.out.println("TestRunable is runing");
		}
	}

	public static void main(String[] args) {
		Thread t1 = new TestThread();
		Thread t2 = new Thread(new TestRunable());
		t1.start();
		t2.start();

	}

}

启动线程:threadl类的start()性能优化

线程完成:一、run()方法执行完成;二、抛出一个未处理的异常致使线程的提早结束网络

 

取消和中断

不安全的取消:

 单独使用一个取消标志位(boolean值判断).

Stop(),suspend(),resume()是过时的api,很大的反作用,容易致使死锁或者数据不一致

/**
 * 使用自定义的取消标志位中断线程(不安全)
 */
public class FlagCancel {

	private static class TestRunable implements Runnable {

		// boolean的标志位 volatile轻量的线程同步
		private volatile boolean on = true;
		private long i = 0;

		@Override
		public void run() {
			while (on) {
				i++;
				// 阻塞方法,on不起做用
				// wait,sleep,blockingqueue(put,take)
				try {
					Thread.sleep(20000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			System.out.println("TestRunable is runing :" + i);
		}

		public void cancel() {
			on = false;
		}
	}

}

如何安全的终止线程

使用线程的中断 : 

interrupt() 中断线程,本质是将线程的中断标志位设为true,其余线程向须要中断的线程打个招呼。是否真正进行中断由线程本身决定。

isInterrupted() 线程检查本身的中断标志位

静态方法Thread.interrupted() 将中断标志位复位为false

由上面的中断机制可知Java里是没有抢占式任务,只有协做式任务。

为什么要用中断,线程处于阻塞(如调用了java的sleep,wait等等方法时)的时候,是不会理会咱们本身设置的取消标志位的,可是这些阻塞方法都会检查线程的中断标志位。

/**
 * 安全的中断线程
 */
public class SafeInterrupt implements Runnable {

	private volatile boolean on = true;
	private long i = 0;

	@Override
	public void run() {
		while (on && Thread.currentThread().isInterrupted()) {
			i++;
		}
		System.out.println("TestRunable is runing :" + i);
	}

	public void cancel() {
		on = false;
		Thread.currentThread().interrupt();
	}
}

处理不可中断的阻塞

IO通讯 inputstream read/write等阻塞方法,不会理会中断,而关闭底层的套接字socket.close()会抛出socketException

NIO: selector.select()会阻塞,调用selector的wakeup和close方法会抛出ClosedSelectorException

死锁状态不响应中断的请求,这个必须重启程序,修改错误。

/**
 * 调用阻塞方法时,如何中断线程
 */
public class BlockInterrupt {

	private static volatile boolean on = true;

	private static class WhenBlock implements Runnable {

		@Override
		public void run() {
			while (on && !Thread.currentThread().isInterrupted()) {
				try {
					// 抛出中断异常的阻塞方法,抛出异常后,中断标志位改为false 须要从新设置标志位
					Thread.sleep(100);
				} catch (InterruptedException e) {
					// 从新设置标志位
					Thread.currentThread().interrupt();
					// do my work
				}
				// 清理工做结束线程
			}
		}

		public void cancel() {
			on = false;
			Thread.currentThread().interrupt();
		}

	}
}

如何让咱们的代码既能够响应普通的中断,又能够关闭底层的套接字呢?

覆盖线程的interrupt方法,在处理套接字异常时,再用super.interrupt()自行中断线程

/**
 * 如何覆盖线程的interrupt() 方法
 */
public class OverrideInterrupt extends Thread {

	private final Socket socket;
	private final InputStream in;

	public OverrideInterrupt(Socket socket, InputStream in) {
		this.socket = socket;
		this.in = in;
	}

	private void t() {
	}

	@Override
	public void interrupt() {
		try {
			// 关闭底层的套接字
			socket.close();
		} catch (IOException e) {
			e.printStackTrace();
			// .....
		} finally {
			// 同时中断线程
			super.interrupt();
		}

	}
}

线程的状态

新建立   线程被建立,可是没有调用start方法

可运行(RUNNABLE)  运行状态,由cpu决定是否是正在运行

被阻塞(BLOCKING)  阻塞,线程被阻塞于锁

等待/计时等待(WAITING) 等待某些条件成熟

被终止  线程执行完毕

public class SleepUtils {
	public static final void second(long seconds) {
		try {
			TimeUnit.SECONDS.sleep(seconds);
		} catch (InterruptedException e) {
		}
	}
}
/**
 * 查看线程的状态
 */
public class ThreadState {
	private static Lock lock = new ReentrantLock();

	public static void main(String[] args) {
		new Thread(new SleepAlways(), "SleepAlwaysThread").start();
		new Thread(new Waiting(), "WaitingThread").start();
		// 使用两个Blocked线程,一个获取锁成功,另外一个被阻塞
		new Thread(new Blocked(), "BlockedThread-1").start();
		new Thread(new Blocked(), "BlockedThread-2").start();
		new Thread(new Sync(), "SyncThread-1").start();
		new Thread(new Sync(), "SyncThread-2").start();
	}

	/**
	 * 该线程不断的进行睡眠
	 */
	static class SleepAlways implements Runnable {
		@Override
		public void run() {
			while (true) {
				SleepUtils.second(100);
			}
		}
	}

	/**
	 * 该线程在Waiting.class实例上等待
	 */
	static class Waiting implements Runnable {
		@Override
		public void run() {
			while (true) {
				synchronized (Waiting.class) {
					try {
						Waiting.class.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}
	}

	/**
	 * 该线程在Blocked.class实例上加锁后,不会释放该锁
	 */
	static class Blocked implements Runnable {
		public void run() {
			synchronized (Blocked.class) {
				while (true) {
					SleepUtils.second(100);
				}
			}
		}
	}

	/**
	 * 该线程得到锁休眠后,又释放锁
	 */
	static class Sync implements Runnable {

		@Override
		public void run() {
			lock.lock();
			try {
				SleepUtils.second(3000);
			} finally {
				lock.unlock();
			}

		}

	}
}

须要用ThreadState工具才能查看运行的进程状态

线程的优先级:

成员变量priority控制优先级,范围1-10之间,数字越高优先级越高,缺省为5,建立线程时setPriotity()能够设置优先级,不要期望他发挥做用。

Daemon线程

守护型线程(如GC线程),程序里没有非Daemon线程时,java程序就会退出。通常用不上,也不建议咱们平时开发时使用,由于Try/Finally里的代码不必定执行的。

/**
 * 守护线程
 */
public class Daemon {
	public static void main(String[] args) {
		Thread thread = new Thread(new DaemonRunner());
		thread.setDaemon(true);// 设置为守护线程
		thread.start();
	}

	static class DaemonRunner implements Runnable {

		@Override
		public void run() {
			System.out.println("2");
			try {
				System.out.println("23");
				SleepUtils.second(100);
			} finally {
				System.out.println("DaemonThread finally run.");
			}
		}
	}
}

main运行结果:没有打印任何东西

经常使用方法深刻理解

run()和start()

run就是一个普通的方法,跟其余类的实例方法没有任何区别。

/**
 * Run和start方法辨析
 */
public class RunAndStart {

	private static class TestThread extends Thread {

		private String name;

		public TestThread(String name) {
			this.name = name;
		}

		@Override
		public void run() {
			int i = 90;
			while (i > 0) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("I am " + name + " i= " + i);
			}

		}
	}

	public static void main(String[] args) {
		TestThread parent = new TestThread("beInvoked");
		parent.start();// 是TestThread线程去执行

		TestThread beInvoked = new TestThread("beInvoked_thread");
		beInvoked.run();// 是main线程去执行

	}

}

main运行结果:
I am beInvoked i= 90
I am beInvoked_thread i= 90
I am beInvoked i= 90
I am beInvoked_thread i= 90
I am beInvoked i= 90
I am beInvoked_thread i= 90

Sleep

不会释放锁,因此咱们在用sleep时,要把sleep放在同步代码块的外面。

/**
 * sleep方法是否会释放锁
 */
public class SleepTest {
	// 锁
	private Object lock = new Object();

	public static void main(String[] args) {
		SleepTest sleepTest = new SleepTest();
		Thread threadA = sleepTest.new ThreadSleep();
		threadA.setName("ThreadSleep");
		Thread threadB = sleepTest.new ThreadNotSleep();
		threadB.setName("ThreadNotSleep");
		threadA.start();
		try {
			Thread.sleep(1000);
			System.out.println(" RunTest slept!");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		threadB.start();
	}

	// 休眠的线程
	private class ThreadSleep extends Thread {

		@Override
		public void run() {
			String threadName = Thread.currentThread().getName();
			System.out.println(threadName + " will take the lock");
			try {

				// 拿到锁之后,休眠
				synchronized (lock) {
					System.out.println(threadName + " taking the lock");
					System.out.println("Finish the work: " + threadName);
					Thread.sleep(5000);
				}

			} catch (InterruptedException e) {
				// e.printStackTrace();
			}
		}
	}

	// 不休眠的线程
	private class ThreadNotSleep extends Thread {

		@Override
		public void run() {
			String threadName = Thread.currentThread().getName();
			System.out.println(threadName + " will take the lock time="
					+ System.currentTimeMillis());
			// 拿到锁之后不休眠
			synchronized (lock) {
				System.out.println(threadName + " taking the lock time="
						+ System.currentTimeMillis());
				System.out.println("Finish the work: " + threadName);
			}
		}
	}
}

main运行结果:
ThreadSleep will take the lock
ThreadSleep taking the lock
Finish the work: ThreadSleep
 RunTest slept!
ThreadNotSleep will take the lock time=1526644256785
ThreadNotSleep taking the lock time=1526644260785
Finish the work: ThreadNotSleep

yield()

当前线程出让cpu占有权,当前线程变成了可运行状态,下一时刻仍然可能被cpu选中,不会释放锁。

wait()和 notify()/notiyfAll()

调用之前,当前线程必需要持有锁,调用了wait() notify()/notiyfAll()会释放锁。

等待通知机制:

线程 A调用了对象O的wait方法进入等待状态,线程 B调用了对象O的notify方法进行唤醒,唤醒的是在对象O上wait的线程(好比线程A)

notify() 唤醒一个线程,唤醒哪个彻底看cpu的心情(谨慎使用)

notiyfAll() 全部在对象O上wait的线程所有唤醒(应该用notiyfAll())

 

/**
 * wait/notify/notifyAll的演示
 */
public class User {

	private int age;
	private String city;

	public static final String CITY = "NewYork";

	public User(int age, String city) {
		this.age = age;
		this.city = city;
	}

	public User() {
	}

	// 修改用户的城市后,发出通知
	public synchronized void changeCity() {
		this.city = "London";
		notifyAll();
	}

	// 修改用户的年龄后,发出通知
	public synchronized void changeAge() {
		this.age = 31;
		notifyAll();
	}

	// 等待用户的年龄变化的方法,接收到通知,检查发现用户年龄大于30时,进行业务工做,不然一直等待
	// 阻塞方法
	public synchronized void waitAge() {
		while (this.age <= 30) {
			try {
				wait();
				System.out.println("wait age ["
						+ Thread.currentThread().getId() + "] is notified!");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println("the age is " + this.age);// 业务工做
	}

	// 等待用户的城市变化的方法,接收到通知,检查发现用户城市不是NewYork时,进行业务工做,不然一直等待
	// 阻塞方法
	public synchronized void waitCity() {
		while (this.city.equals(CITY)) {
			try {
				wait();
				System.out.println("wait city ["
						+ Thread.currentThread().getId() + "] is notified!");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println("the city is " + this.city);// 业务工做
	}
}
/**
 * TestUser测试类
 */
public class TestUser {

	private static User user = new User(30, User.CITY);

	private static class CheckAge extends Thread {
		@Override
		public void run() {
			user.waitAge();
		}
	}

	private static class CheckCity extends Thread {
		@Override
		public void run() {
			user.waitCity();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < 3; i++) {
			// 启动三个等待用户年龄变化的线程
			new CheckAge().start();
		}
		for (int i = 0; i < 3; i++) {
			// 启动三个等待用户城市变化的线程
			new CheckCity().start();
		}
		Thread.sleep(1000);
		user.changeCity();// 变更用户的城市
	}

}
main运行结果:
wait city [16] is notified!
the city is London
wait city [15] is notified!
the city is London
wait city [14] is notified!
the city is London
wait age [13] is notified!
wait age [12] is notified!
wait age [11] is notified!
/**
 *调用阻塞方法时,如何中断线程
 */
public class BlockInterrupt {

	private static Object o = new Object();

	/* while循环中包含try/catch块 */
	private static class WhileTryWhenBlock extends Thread {
		private volatile boolean on = true;
		private long i = 0;

		@Override
		public void run() {
			System.out.println("当前执行线程id:" + Thread.currentThread().getId());
			while (on && !Thread.currentThread().isInterrupted()) {
				System.out.println("i=" + i++);
				try {

					// 抛出中断异常的阻塞方法,抛出异常后,中断标志位会改为false
					// 能够理解为这些方法会隐含调用Thread.interrputed()方法
					synchronized (o) {
						o.wait();
					}

				} catch (InterruptedException e) {
					System.out.println("当前执行线程的中断标志位:"
							+ Thread.currentThread().getId() + ":"
							+ Thread.currentThread().isInterrupted());
					Thread.currentThread().interrupt();// 从新设置一下
					System.out.println("被中断的线程_" + getId() + ":"
							+ isInterrupted());
					// do my work
				}
				// 清理工做,准备结束线程
			}
		}

		public void cancel() {
			// on = false;
			interrupt();
			System.out.println("本方法所在线程实例:" + getId());
			System.out.println("执行本方法的线程:" + Thread.currentThread().getId());
			// Thread.currentThread().interrupt();
		}
	}

	/* try/catch块中包含while循环 */
	private static class TryWhileWhenBlock extends Thread {
		private volatile boolean on = true;
		private long i = 0;

		@Override
		public void run() {
			try {
				while (on) {
					System.out.println(i++);
					// 抛出中断异常的阻塞方法,抛出异常后,中断标志位改为false
					synchronized (o) {
						o.wait();
					}
				}
			} catch (InterruptedException e) {
				System.out.println("当前执行线程的中断标志位:"
						+ Thread.currentThread().getId() + ":"
						+ Thread.currentThread().isInterrupted());
			} finally {
				// 清理工做结束线程
			}
		}

		public void cancel() {
			on = false;
			interrupt();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		WhileTryWhenBlock whileTryWhenBlock = new WhileTryWhenBlock();
		whileTryWhenBlock.start();
		Thread.sleep(100);
		whileTryWhenBlock.cancel();

		System.out.println("====================");
		TryWhileWhenBlock tryWhileWhenBlock = new TryWhileWhenBlock();
		tryWhileWhenBlock.start();
		Thread.sleep(100);
		tryWhileWhenBlock.cancel();
	}
}


输出结果
当前执行线程id:11
i=0
本方法所在线程实例:11
当前执行线程的中断标志位:11:false
执行本方法的线程:1
被中断的线程_11:true
====================
0
当前执行线程的中断标志位:11:false

线程间协做和通讯

每一个线程有本身栈空间,孤立运行,对咱们没有价值。若是多个线程可以相互配合完成工做,这将会带来巨大的价值。

volatile和synchronized

多个线程同时访问一个共享的变量的时候,每一个线程的工做内存有这个变量的一个拷贝,变量自己仍是保存在共享内存中。

Violate修饰字段,对这个变量的访问必需要从共享内存刷新一次。最新的修改写回共享内存。能够保证字段的可见性。绝对不是线程安全的,没有操做的原子性。

适用场景:一、一个线程写,多个线程读;二、volatile变量的变化很固定

关键字synchronized能够修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,又称为内置锁机制。

Synchronized的类锁和对象锁,本质上是两把锁,类锁实际锁的是每个类的class对象。对象锁锁的是当前对象实例。

/**
 * 测试Volatile型变量的操做原子性
 */
public class VolatileThread implements Runnable {

    private volatile  int a= 0;

    @Override
    public void run() {
        //synchronized (this){
            a=a+1;
            System.out.println(Thread.currentThread().getName()+"----"+a);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            a=a+1;
            System.out.println(Thread.currentThread().getName()+"----"+a);

        //}
    }
}
public class VolatileTest {
    public static void main(String[] args) {
        VolatileThread volatileThread = new VolatileThread();

        Thread t1 = new Thread(volatileThread);
        Thread t2 = new Thread(volatileThread);
        Thread t3 = new Thread(volatileThread);
        Thread t4 = new Thread(volatileThread);
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

等待和通知机制

等待方原则:

一、获取对象锁

二、若是条件不知足,调用对象的wait方法,被通知后依然要检查条件是否知足

三、条件知足之后,才能执行相关的业务逻辑

Synchronized(对象){
	While(条件不知足){
	  对象.wait()
    }
    业务逻辑处理
}

通知方原则:

一、 得到对象的锁;

二、 改变条件;

三、 通知全部等待在对象的线程

Synchronized(对象){
	业务逻辑处理,改变条件
	对象.notify/notifyAll
}

 

/**
 * 有界阻塞队列
 */
public class BlockingQueueWN<T> {

	private List queue = new LinkedList<>();
	private final int limit; // 大小限制

	public BlockingQueueWN(int limit) {
		this.limit = limit;
	}

	// 入队
	public synchronized void enqueue(T item) throws InterruptedException {
		// 若是队列满了 等待
		while (this.queue.size() == this.limit) {
			wait();
		}
		// 若是队列为空 唤醒
		if (this.queue.size() == 0) {
			System.out.println("enqueue notifyAll");
			notifyAll();
		}
		// 入列
		this.queue.add(item);
	}

	// 出队
	public synchronized T dequeue() throws InterruptedException {
		// 若是队列为空 等待
		while (this.queue.size() == 0) {
			System.out.println("dequeue wait");
			wait();
		}
		// 若是队列满了 唤醒
		if (this.queue.size() == this.limit) {
			notifyAll();
		}
		// 出列
		return (T) this.queue.remove(0);
	}
}
public class BqTest {
	public static void main(String[] args) {
		BlockingQueueWN bq = new BlockingQueueWN(10);
		Thread threadA = new ThreadPush(bq);
		threadA.setName("Push");
		Thread threadB = new ThreadPop(bq);
		threadB.setName("Pop");
		threadB.start();
		threadA.start();
	}

	// 数据入队列线程
	private static class ThreadPush extends Thread {
		BlockingQueueWN<Integer> bq;

		public ThreadPush(BlockingQueueWN<Integer> bq) {
			this.bq = bq;
		}

		@Override
		public void run() {
			String threadName = Thread.currentThread().getName();
			int i = 20;
			// 循环20次入列
			while (i > 0) {
				try {
					Thread.sleep(1000);
					System.out.println(" i=" + i + " will push");
					bq.enqueue(i--);
				} catch (InterruptedException e) {
					// e.printStackTrace();
				}

			}
		}
	}

	// 数据出队列线程
	private static class ThreadPop extends Thread {
		BlockingQueueWN<Integer> bq;

		public ThreadPop(BlockingQueueWN<Integer> bq) {
			this.bq = bq;
		}

		@Override
		public void run() {
			// 无限循环 有就取
			while (true) {
				try {
					System.out.println(Thread.currentThread().getName()
							+ " will pop.....");
					Integer i = bq.dequeue();
					System.out.println(" i=" + i.intValue() + " alread pop");
				} catch (InterruptedException e) {
					// e.printStackTrace();
				}
			}

		}
	}
}
运行结果
Pop will pop.....
dequeue wait
 i=20 will push
enqueue notifyAll
 i=20 alread pop
Pop will pop.....
dequeue wait
 i=19 will push
enqueue notifyAll
 i=19 alread pop
Pop will pop.....
dequeue wait
 i=18 will push
enqueue notifyAll
 i=18 alread pop
Pop will pop.....
dequeue wait
 i=17 will push
enqueue notifyAll
 i=17 alread pop
Pop will pop.....
dequeue wait
 i=16 will push
enqueue notifyAll
 i=16 alread pop
Pop will pop.....
dequeue wait
 i=15 will push
enqueue notifyAll
 i=15 alread pop
Pop will pop.....
dequeue wait
 i=14 will push
enqueue notifyAll
 i=14 alread pop
Pop will pop.....
dequeue wait
 i=13 will push
enqueue notifyAll
 i=13 alread pop
Pop will pop.....
dequeue wait
 i=12 will push
enqueue notifyAll
 i=12 alread pop
Pop will pop.....
dequeue wait
 i=11 will push
enqueue notifyAll
 i=11 alread pop
Pop will pop.....
dequeue wait
 i=10 will push
enqueue notifyAll
 i=10 alread pop
Pop will pop.....
dequeue wait
 i=9 will push
enqueue notifyAll
 i=9 alread pop
Pop will pop.....
dequeue wait
 i=8 will push
enqueue notifyAll
 i=8 alread pop
Pop will pop.....
dequeue wait
 i=7 will push
enqueue notifyAll
 i=7 alread pop
Pop will pop.....
dequeue wait
 i=6 will push
enqueue notifyAll
 i=6 alread pop
Pop will pop.....
dequeue wait
 i=5 will push
enqueue notifyAll
 i=5 alread pop
Pop will pop.....
dequeue wait
 i=4 will push
enqueue notifyAll
 i=4 alread pop
Pop will pop.....
dequeue wait
 i=3 will push
enqueue notifyAll
 i=3 alread pop
Pop will pop.....
dequeue wait
 i=2 will push
enqueue notifyAll
 i=2 alread pop
Pop will pop.....
dequeue wait
 i=1 will push
enqueue notifyAll
 i=1 alread pop
Pop will pop.....
dequeue wait

管道输入输出流

文件输入输出,网络输入输出,管道输入输出流用于线程中间的数据传递,传输媒介的内存

管道是在线程间进行传送

四种实现

pipedOutputStream/input 面向的字节

pipedReader/Writer 面向的是字符

只适合线程间一对一的通讯,适用范围较狭窄。

public class PipeTransfer {

    private static class Print implements Runnable{
        private PipedReader in;

        public Print(PipedReader in) {
            this.in = in;
        }

        @Override
        public void run() {
           int receive =0;
            try {
                while((receive=in.read())!=-1){
                    System.out.println((char) receive);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        PipedWriter out = new PipedWriter();
        PipedReader in = new PipedReader();
        //必须进行链接
        out.connect(in);

        Thread t1 = new Thread(new Print(in),"PrintThread");
        t1.start();
        int receive =0;
        try {
            while((receive=System.in.read())!=-1){
                out.write(receive);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            out.close();
        }
    }

}

运行输入 good  
输出
g
o
o
d

join方法

线程A,执行了thread.join(),线程A等待thread线程终止了之后,A在join后面的语句才会继续执行

public class JoinTes {

	public static void main(String[] args) throws InterruptedException {
		ThreadJoinTest t1 = new ThreadJoinTest("小明");
		ThreadJoinTest t2 = new ThreadJoinTest("小东");
		t1.start();
		/**
		 * Thread类中的join方法的主要做用就是同步,它可使得线程之间的并行执行变为串行执行。
		 * 
		 * 1 join的意思是使得放弃当前线程的执行,并返回对应的线程,例以下面代码的意思就是:
		 * 程序在main线程中调用t1线程的join方法,则main线程放弃cpu控制权,并返回t1线程继续执行直到线程t1执行完毕
		 * 因此结果是t1线程执行完后,才到主线程执行,至关于在main线程中同步t1线程,t1执行完了,main线程才有执行的机会
		 * 
		 * 2 join方法能够传递参数,join(10)表示main线程会等待t1线程10毫秒,10毫秒过去后,
		 * main线程和t1线程之间执行顺序由串行执行变为普通的并行执行
		 * 若是A线程中掉用B线程的join(10),则表示A线程会等待B线程执行10毫秒,10毫秒事后,
		 * A、B线程并行执行。须要注意的是,jdk规定,join(0)的意思不是A线程等待B线程0秒,
		 * 而是A线程等待B线程无限时间,直到B线程执行完毕,即join(0)等价于join()
		 * 
		 * 3 join方法必须在线程start方法调用以后调用才有意义。这个也很容易理解:若是一个线程都没有start,那它也就没法同步了。
		 * 
		 * 4 join方法的原理就是调用相应线程的wait方法进行等待操做的,例如A线程中调用了B线程的join方法,
		 * 则至关于在A线程中调用了B线程的wait方法,当B线程执行完(或者到达等待时间),
		 * B线程会自动调用自身的notifyAll方法唤醒A线程,从而达到同步的目的。
		 */
		t1.join();
		t2.start();
	}

}

class ThreadJoinTest extends Thread {
	public ThreadJoinTest(String name) {
		super(name);
	}

	@Override
	public void run() {
		for (int i = 0; i < 10; i++) {
			System.out.println(this.getName() + ":" + i);
			try {
				sleep(1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
}

运行结果
小明:0
小明:1
小明:2
小明:3
小明:4
小明:5
小明:6
小明:7
小明:8
小明:9
小东:0
小东:1
小东:2
小东:3
小东:4
小东:5
小东:6
小东:7
小东:8
小东:9
public class JoinTest {

	public static class CutInLine implements Runnable {

		private Thread thread;

		public CutInLine(Thread thread) {
			this.thread = thread;
		}

		@Override
		public void run() {

			try {
				// 在被插队的线程里,调用一下插队线程的join方法
				System.out.println(thread.getName() + " join "
						+ Thread.currentThread().getName());
				thread.join();

			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName() + " will work");

		}
	}

	public static void main(String[] args) throws InterruptedException {
		Thread previous = Thread.currentThread();
		for (int i = 0; i < 10; i++) {
			Thread thread = new Thread(new CutInLine(previous),
					String.valueOf(i));
			thread.start();
			System.out.print(thread.getName() + " start ");
			previous = thread;
		}

	}

}

打印结果
0 start main join 0
1 start 0 join 1
2 start 1 join 2
3 start 2 join 3
4 start 3 join 4
5 start 4 join 5
6 start 5 join 6
7 start 6 join 7
8 start 7 join 8
9 start 8 join 9
0 will work
1 will work
2 will work
3 will work
4 will work
5 will work
6 will work
7 will work
8 will work
9 will work

ThreadLocal

本质是个map,map的键就是每一个线程对象,值就是每一个线程所拥有的值

经常使用方法:

initialValue()

get()

set()

remove():将当前线程局部变量的值删除,这个方法是JDK 5.0新增的方法。当线程结束后,对应该线程的局部变量将自动被垃圾回收,因此显式调用该方法清除线程的局部变量并非必须的操做,但它能够加快内存回收的速度。

ThreadLocal拥有的这个变量,在线程之间很独立的,相互之间没有联系。内存占用相对来讲比较大。

public class ThreadLocalTest {

    static ThreadLocal<String> threadLocal = new ThreadLocal<String>(){
        @Override
        protected String initialValue() {
            return "init";
        }
    };

    public void test(){
        Thread[] runs = new Thread[3];
        for(int i =0;i<runs.length;i++){
            runs[i]=new Thread(new T1(i));
        }
        for(int i =0;i<runs.length;i++){
            runs[i].start();
        }
    }

    private static class T1 implements Runnable{

        private int id;

        public T1(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getId()+" start");
            String s = threadLocal.get();
            s = s+"_"+id;
            threadLocal.set(s);
            System.out.println(Thread.currentThread().getId()+s);
        }
    }

    public static void main(String[] args) {
        ThreadLocalTest test = new ThreadLocalTest();
        test.test();
    }
}

输出结果
11 start
13 start
11init_0
12 start
13init_2
12init_1

性能问题

串行化、无锁化、异步化编程是趋势之一,好比node.js,Vert.x。

黄金原则:编码时候不要考虑性能优化的事情,先正确实现业务,发现性能不行,这个时候再来考虑性能优化。

public class PerfermenTest {

    /** 执行次数 */
    private static final long count = 100000000;

    public static void main(String[] args) throws InterruptedException {
        //并发计算
        concurrency();
        //单线程计算
        serial();
    }

    private static void concurrency() throws InterruptedException {
        long start = System.currentTimeMillis();
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                int a = 0;
                for (long i = 0; i < count; i++) {
                    a += 5;
                }
                System.out.println("a="+a);
            }
        });
        thread.start();
        int b = 0;
        for (long i = 0; i < count; i++) {
            b--;
        }
        thread.join();
        long time = System.currentTimeMillis() - start;
        System.out.println("concurrency :" + time + "ms,b=" + b);
    }

    private static void serial() {
        long start = System.currentTimeMillis();
        int a = 0;
        for (long i = 0; i < count; i++) {
            a += 5;
        }
        int b = 0;
        for (long i = 0; i < count; i++) {
            b--;
        }
        long time = System.currentTimeMillis() - start;
        System.out.println("serial:" + time + "ms,b=" + b + ",a=" + a);
    }

}

输出结果
a=500000000
concurrency :41ms,b=-100000000
serial:70ms,b=-100000000,a=500000000

等待超时模式

调用场景:调用一个方法时等待一段时间(通常来讲是给定一个时间段),若是该方法可以在给定的时间段以内获得结果,那么将结果马上返回,反之,超时返回默认结果。

假设等待时间段是T,那么能够推断出在当前时间now+T以后就会超时
等待持续时间:REMAINING=T。
·超时时间:FUTURE=now+T。
// 对当前对象加锁
public synchronized Object get(long mills) throws InterruptedException {
   long future = System.currentTimeMillis() + mills;
   long remaining = mills;
   // 当超时大于0而且result返回值不知足要求
   while ((result == null) && remaining > 0) {
      wait(remaining);
      remaining = future - System.currentTimeMillis();
   }
   return result;
}
public class ConnectionDriver {

	public static final Connection getConnectiong() {
		return new ConnectionImpl();
	}

	private static class ConnectionImpl implements Connection {

		@Override
		public Statement createStatement() throws SQLException {
			System.out.println("建立SQL " + Thread.currentThread().getId());
			return null;
		}

		@Override
		public void commit() throws SQLException {
			try {
				System.err.println(Thread.currentThread().getId() + "准备提交数据");
				TimeUnit.MILLISECONDS.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		@Override
		public PreparedStatement prepareStatement(String sql)
				throws SQLException {
			return null;
		}

		@Override
		public CallableStatement prepareCall(String sql) throws SQLException {
			return null;
		}

		@Override
		public String nativeSQL(String sql) throws SQLException {
			return null;
		}

		@Override
		public void setAutoCommit(boolean autoCommit) throws SQLException {

		}

		@Override
		public boolean getAutoCommit() throws SQLException {
			return false;
		}

		@Override
		public void rollback() throws SQLException {

		}

		@Override
		public void close() throws SQLException {

		}

		@Override
		public boolean isClosed() throws SQLException {
			return false;
		}

		@Override
		public DatabaseMetaData getMetaData() throws SQLException {
			return null;
		}

		@Override
		public void setReadOnly(boolean readOnly) throws SQLException {

		}

		@Override
		public boolean isReadOnly() throws SQLException {
			return false;
		}

		@Override
		public void setCatalog(String catalog) throws SQLException {

		}

		@Override
		public String getCatalog() throws SQLException {
			return null;
		}

		@Override
		public void setTransactionIsolation(int level) throws SQLException {

		}

		@Override
		public int getTransactionIsolation() throws SQLException {
			return 0;
		}

		@Override
		public SQLWarning getWarnings() throws SQLException {
			return null;
		}

		@Override
		public void clearWarnings() throws SQLException {

		}

		@Override
		public Statement createStatement(int resultSetType,
				int resultSetConcurrency) throws SQLException {
			return null;
		}

		@Override
		public PreparedStatement prepareStatement(String sql,
				int resultSetType, int resultSetConcurrency)
				throws SQLException {
			return null;
		}

		@Override
		public CallableStatement prepareCall(String sql, int resultSetType,
				int resultSetConcurrency) throws SQLException {
			return null;
		}

		@Override
		public Map<String, Class<?>> getTypeMap() throws SQLException {
			return null;
		}

		@Override
		public void setTypeMap(Map<String, Class<?>> map) throws SQLException {

		}

		@Override
		public void setHoldability(int holdability) throws SQLException {

		}

		@Override
		public int getHoldability() throws SQLException {
			return 0;
		}

		@Override
		public Savepoint setSavepoint() throws SQLException {
			return null;
		}

		@Override
		public Savepoint setSavepoint(String name) throws SQLException {
			return null;
		}

		@Override
		public void rollback(Savepoint savepoint) throws SQLException {

		}

		@Override
		public void releaseSavepoint(Savepoint savepoint) throws SQLException {

		}

		@Override
		public Statement createStatement(int resultSetType,
				int resultSetConcurrency, int resultSetHoldability)
				throws SQLException {
			return null;
		}

		@Override
		public PreparedStatement prepareStatement(String sql,
				int resultSetType, int resultSetConcurrency,
				int resultSetHoldability) throws SQLException {
			return null;
		}

		@Override
		public CallableStatement prepareCall(String sql, int resultSetType,
				int resultSetConcurrency, int resultSetHoldability)
				throws SQLException {
			return null;
		}

		@Override
		public PreparedStatement prepareStatement(String sql,
				int autoGeneratedKeys) throws SQLException {
			return null;
		}

		@Override
		public PreparedStatement prepareStatement(String sql,
				int[] columnIndexes) throws SQLException {
			return null;
		}

		@Override
		public PreparedStatement prepareStatement(String sql,
				String[] columnNames) throws SQLException {
			return null;
		}

		@Override
		public Clob createClob() throws SQLException {
			return null;
		}

		@Override
		public Blob createBlob() throws SQLException {
			return null;
		}

		@Override
		public NClob createNClob() throws SQLException {
			return null;
		}

		@Override
		public SQLXML createSQLXML() throws SQLException {
			return null;
		}

		@Override
		public boolean isValid(int timeout) throws SQLException {
			return false;
		}

		@Override
		public void setClientInfo(String name, String value)
				throws SQLClientInfoException {

		}

		@Override
		public void setClientInfo(Properties properties)
				throws SQLClientInfoException {

		}

		@Override
		public String getClientInfo(String name) throws SQLException {
			return null;
		}

		@Override
		public Properties getClientInfo() throws SQLException {
			return null;
		}

		@Override
		public Array createArrayOf(String typeName, Object[] elements)
				throws SQLException {
			return null;
		}

		@Override
		public Struct createStruct(String typeName, Object[] attributes)
				throws SQLException {
			return null;
		}

		@Override
		public void setSchema(String schema) throws SQLException {

		}

		@Override
		public String getSchema() throws SQLException {
			return null;
		}

		@Override
		public void abort(Executor executor) throws SQLException {

		}

		@Override
		public void setNetworkTimeout(Executor executor, int milliseconds)
				throws SQLException {

		}

		@Override
		public int getNetworkTimeout() throws SQLException {
			return 0;
		}

		@Override
		public <T> T unwrap(Class<T> iface) throws SQLException {
			return null;
		}

		@Override
		public boolean isWrapperFor(Class<?> iface) throws SQLException {
			return false;
		}
	}

}
/**
 * 数据库链接池
 * 从链接池中获取、使用和释放链接的过程,而客户端获取链接的过程被设定为等待超时的模式,
 * 也就是在1000毫秒内若是没法获取到可用链接,将会返回给客户端一个null。
 * 设定链接池的大小为10个,而后经过调节客户端的线程数来模拟没法获取链接的场景。
 * 链接池的定义。它经过构造函数初始化链接的最大上限,经过一个双向队列来维护链接,
 * 调用方须要先调用fetchConnection(long)方法来指定在多少毫秒内超时获取链接,当链接使用完成后,
 * 须要调用releaseConnection(Connection)方法将链接放回线程池
 */
public class ConnectionPool {

	// 存放链接的容器
	private LinkedList<Connection> pool = new LinkedList<Connection>();

	public ConnectionPool(int initialSize) {
		if (initialSize > 0) {
			for (int i = 0; i < initialSize; i++) {
				pool.addLast(ConnectionDriver.getConnectiong());
			}
		}
	}

	/* 将链接放回线程池 */
	public void releaseConnection(Connection connection) {
		if (connection != null) {
			synchronized (pool) {
				// 添加后须要进行通知,这样其余消费者可以感知到连接池中已经归还了一个连接
				pool.addLast(connection);
				pool.notifyAll();
			}
		}
	}

	/*
	 * 指定在多少毫秒内超时获取链接,在指定时间内没法获取到链接,将会返回null
	 */
	public Connection fetchConnection(long mills) throws InterruptedException {
		synchronized (pool) {
			// 彻底超时
			if (mills <= 0) {
				while (pool.isEmpty()) {
					pool.wait();
				}
				return pool.removeFirst();
			} else {
				long future = System.currentTimeMillis() + mills;// 何时超时
				long remaining = mills;// 超时时长
				while (pool.isEmpty() && remaining > 0) {
					pool.wait(remaining);
					remaining = future - System.currentTimeMillis();// 当前还须等待的时长
				}
				Connection result = null;
				if (!pool.isEmpty()) {
					result = pool.removeFirst();
				}
				return result;
			}
		}
	}
}
public class ConnectionPoolTest {
	static ConnectionPool pool = new ConnectionPool(10);
	// 保证全部ConnectionRunner可以同时开始
	static CountDownLatch start = new CountDownLatch(1);
	// main线程将会等待全部ConnectionRunner结束后才能继续执行
	static CountDownLatch end;

	public static void main(String[] args) throws Exception {
		// 线程数量,能够线程数量进行观察
		int threadCount = 50;
		end = new CountDownLatch(threadCount);
		int count = 10;// 每一个线程循环取20次
		AtomicInteger got = new AtomicInteger();// 获取到数据库链接的次数
		AtomicInteger notGot = new AtomicInteger();// 没有获取到数据库链接的次数
		for (int i = 0; i < threadCount; i++) {
			Thread thread = new Thread(new ConnetionRunner(count, got, notGot),
					"ConnectionRunnerThread");
			thread.start();
		}
		start.countDown();
		end.await();
		System.out.println("total invoke: " + (threadCount * count));
		System.out.println("got connection:  " + got);
		System.out.println("not got connection " + notGot);
	}

	static class ConnetionRunner implements Runnable {
		int count;
		AtomicInteger got;
		AtomicInteger notGot;

		public ConnetionRunner(int count, AtomicInteger got,
				AtomicInteger notGot) {
			this.count = count;
			this.got = got;
			this.notGot = notGot;
		}

		public void run() {
			try {
				start.await();
			} catch (Exception ex) {

			}
			while (count > 0) {
				try {
					// 从线程池中获取链接,若是1000ms内没法获取到,将会返回null
					// 分别统计链接获取的数量got和未获取到的数量notGot
					Connection connection = pool.fetchConnection(1000);
					if (connection != null) {
						try {
							connection.createStatement();
							connection.commit();
						} finally {
							pool.releaseConnection(connection);
							got.incrementAndGet();
						}
					} else {
						notGot.incrementAndGet();
					}
				} catch (Exception ex) {
				} finally {
					count--;
				}
			}
			end.countDown();
		}
	}
}

运行结果:
建立SQL 41
41准备提交数据
41准备提交数据
建立SQL 41
total invoke: 500
got connection:  432
not got connection 68
相关文章
相关标签/搜索