十六,java多线程详解

1.线程的操做方法

1.1设置和取得名字

设置名字 java

set方法:
android

public final void setName(String name)
构造方法 :
public Thread (Runable target ,String name)

public Thread (String name)
在线程操做中 ,由于其操做的不肯定性 ,因此提供了一个方法 ,取得当前线程名 .

public static Thread currentThread() web

示例:
算法

Thread.currentThread().getName()

注意 :在程序运行时 ,主方法实际上就是一个线程 .能够写以下代码验证 . 数据库

示例:
编程

public class ThreadDemo {
	public static void main(String[] args) {
		MyThread mt = new MyThread(); 
		new Thread(mt,”自定义线程”).start() ;
                          mt.run();  //这个是主线程中执行的.
                }
}
注: 执行一个 java程序至少启动两个线程 ,一个是 main主线程 ,一个是 gc垃圾回收线程 .


1.2线程的休眠

线程休眠的方法是:
设计模式

public static void sleep(long millis ) throws InterruptedException 服务器

因此使用此方法时须要使用try...catch...来处理. 多线程

1.3线程的中断

sleep()方法中,存在InterruptedException这个异常,中断能够引起这个异常. 并发

public void interrupt()

1.4设置线程的优先级

public final void setPriority(int newPriority)

java线程的优先级:

  • public static final int MAX_PRIORITY 10

  • public static final int MIN_PRIORITY 1

  • public static final int NORM_PRIORITY 5

注意: main方法的优先级是5,NORM_PRIORITY普通优先级.


2.线程的同步与死锁

2.1问题引出

示例:

package org.lxh.syndemo;
class MyTicketThread implements Runnable {// 实现Runnable接口
	private int ticket = 5; // 一共才5张票
	public void run() {// 覆写run()方法
		for (int i = 0; i < 50; i++) {// 表示循环10次
			if (this.ticket > 0) {
				try {
					Thread.sleep(300);// 延迟致使数据操做出现问题
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("卖票:ticket = " + this.ticket--);
			}
		}
	}
}
public class SynDemo01 {

	public static void main(String[] args) {
		MyTicketThread mt = new MyTicketThread();
		new Thread(mt, "票贩子a").start();
		new Thread(mt, "票贩子b").start();
		new Thread(mt, "票贩子c").start();
	}
}
问题出现了 :可能有两个线程同时操做一个变量的时候 ,都知足大于 1的条件 ,可是都执行了 - - 操做 .致使数据出现问题 ,这时引入了同步与锁定 .

2.2同步的实现

java中能够经过同步代码的方式进行代码的加锁操做,同步的实现有两种方式:

  • 同步代码块

  • 同步方法

2.2.1同步代码块

格式:

synchronized(对象){ //通常都是将this进行锁定

//须要同步的代码

}
示例 :
class MyTicketThread implements Runnable {// 实现Runnable接口
	private int ticket = 5; // 一共才5张票
	public void run() {// 覆写run()方法
		for (int i = 0; i < 50; i++) {// 表示循环10次
                                  synchronized(this){
			if (this.ticket > 0) {
				try {
					Thread.sleep(300);// 延迟致使数据操做出现问题
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("卖票:ticket = " + this.ticket--);
			}
                                 }
		}
	}
}


2.2.2同步方法

示例:

class MyTicketThread implements Runnable {// 实现Runnable接口
	private int ticket = 5; // 一共才5张票
	public void run() {// 覆写run()方法
		for (int i = 0; i < 50; i++) {// 表示循环10次
			this.sale();   //同步方法
		}
	}
}

public void synchronized sale(){   //同步方法
              if (this.ticket > 0) {
		try {
		     Thread.sleep(300);// 延迟致使数据操做出现问题
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("卖票:ticket = " + this.ticket--);
	 }
}
:java中方法定义的完整格式

[访问权限: public private protected default ]

[static final abstract synchronized native ]

返回值类型 void

方法名词([参数列表]) [throw 异常1,异常2,异常3...]{}


2.3死锁

程序中,过多的同步会致使死锁的问题.

示例:

package com.android.syndemo;
class Bangfei {
	public synchronized void say(QinYou qy) {
		System.out.println("把钱给我,我放人");
		qy.give();
	}

	public synchronized void give() {
		System.out.println("获得了钱,同时溜之大吉");
	}
}

class QinYou {
	public synchronized void say( Bangfei bf) {
		System.out.println("把人放了,我给你钱.");
		bf.give();
	}

	public synchronized void give() {
		System.out.println("人救回来,同时报案了.");
	}
}

public class DeadLockDemo implements Runnable {
	private BangFei bf = new BangFei();
	private QinYou qy = new QinYou();
	public DeadLockDemo() {
		new Thread(this).start() ;
   //这句致使死锁.
		qy.say(bf);
	}
	public void run() {
		bf.say(qy);
	}
	public static void main(String[] args) {
		new DeadLockDemo();
	}
}
多线程共享一个数据的时候须要进行同步 ,可是过多的同步会致使死锁 ,即各个线程进入相互等待的状态而中止执行.


3.生产者和消费者模式

3.1为什么引入生产者消费者模式

在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题.该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度.

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这个问题因而引入了生产者和消费者模式.

3.2生产者消费者模式定义

生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力.

这个阻塞队列就是用来给生产者和消费者解耦的.纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类.在学习一些设计模式的过程当中,若是先找到这个模式的第三者,能帮助咱们快速熟悉一个设计模式.

3.3生产者消费者模式实践

我和同事一块儿利用业余时间开发的Yuna工具中使用了生产者和消费者模式.首先我先介绍下Yuna工具,在阿里巴巴不少同事都喜欢经过邮件分享技术文章,由于经过邮件分享很方便,同窗们在网上看到好的技术文章,复制粘贴发送就完成了分享,可是咱们发现技术文章不能沉淀下来,对于新来的同窗看不到之前分享的技术文章,你们也很难找到之前分享过的技术文章.为了解决这问题,咱们开发了Yuna工具.Yuna取名自我喜欢的一款游戏最终幻想里的女主角.

首先咱们申请了一个专门用来收集分享邮件的邮箱,好比share@alibaba.com,同窗将分享的文章发送到这个邮箱,让同窗们每次都抄送到这个邮箱确定很麻烦,因此咱们的作法是将这个邮箱地址放在部门邮件列表里,因此分享的同窗只须要象之前同样向整个部门分享文章就行,Yuna工具经过读取邮件服务器里该邮箱的邮件,把全部分享的邮件下载下来,包括邮件的附件,图片,和邮件回复,咱们可能会从这个邮箱里下载到一些非分享的文章,因此咱们要求分享的邮件标题必须带有一个关键字,好比[内贸技术分享],下载完邮件以后,经过confluenceweb service接口,把文章插入到confluence,这样新同事就能够在confluence里看之前分享过的文章,而且Yuna工具还能够自动把文章进行分类和归档.

为了快速上线该功能,当时咱们花了三天业余时间快速开发了Yuna1.0版本.1.0版本中我并无使用生产者消费模式,而是使用单线程来处理,由于当时只须要处理咱们一个部门的邮件,因此单线程明显够用,整个过程是串行执行的.在一个线程里,程序先抽取所有的邮件,转化为文章对象,而后添加所有的文章,最后删除抽取过的邮件.

示例:

public void extract() {
        logger.debug("开始" + getExtractorName() + "..");
        //抽取邮件
        List<Article> articles = extractEmail();
        //添加文章
        for (Article article : articles) {
            addArticleOrComment(article);
        }
        //清空邮件
        cleanEmail();
        logger.debug("完成" + getExtractorName() + "..");
    }
Yuna工具在推广后 ,愈来愈多的部门使用这个工具 ,处理的时间愈来愈慢 ,Yuna是每隔 5分钟进行一次抽取的 ,而当邮件多的时候一次处理可能就花了几分钟 ,因而我在 Yuna2.0版本里使用了生产者消费者模式来处理邮件 ,首先生产者线程按必定的规则去邮件系统里抽取邮件 ,而后存放在阻塞队列里 ,消费者从阻塞队列里取出文章后插入到 conflunce.

示例:

public class QuickEmailToWikiExtractor extends AbstractExtractor {

private ThreadPoolExecutor      threadsPool;

private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue;

public QuickEmailToWikiExtractor() {
        emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>();
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000));
    
    }

public void extract() {
        logger.debug("开始" + getExtractorName() + "..");
        long start = System.currentTimeMillis();

        //抽取全部邮件放到队列里
        new ExtractEmailTask().start();

        // 把队列里的文章插入到Wiki
        insertToWiki();

        long end = System.currentTimeMillis();
        double cost = (end - start) / 1000;
        logger.debug("完成" + getExtractorName() + ",花费时间:" + cost + "秒");
    }

    

    /**
     * 把队列里的文章插入到Wiki
     */
    private void insertToWiki() {
        //登陆wiki,每间隔一段时间须要登陆一次
        confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD);

        while (true) {
            //2秒内取不到就退出
            ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS);
            if (email == null) {
                break;
            }
            threadsPool.submit(new insertToWikiTask(email));
        }
    }


     protected List<Article> extractEmail() {
        List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails();
        if (allEmails == null) {
            return null;
        }
        for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) {
            emailQueue.offer(exchangeEmailShallowDTO);
        }
        return null;
    }

    /**
     * 抽取邮件任务
     * 
     * @author tengfei.fangtf
     */
    public class ExtractEmailTask extends Thread {
        public void run() {
            extractEmail();
        }
    }
}

3.4多生产者和多消费者场景

在多核时代,多线程并发处理速度比单线程处理速度更快,因此咱们可使用多个线程来生产数据,一样可使用多个消费线程来消费数据.而更复杂的状况是,消费者消费的数据,有可能须要继续处理,因而消费者处理完数据以后,它又要做为生产者把数据放在新的队列里,交给其余消费者继续处理.以下图:

咱们在一个长链接服务器中使用了这种模式,生产者1负责将全部客户端发送的消息存放在阻塞队列1,消费者1从队列里读消息,而后经过消息ID进行hash获得N个队列中的一个,而后根据编号将消息存放在到不一样的队列里,每一个阻塞队列会分配一个线程来消费阻塞队列里的数据.若是消费者2没法消费消息,就将消息再抛回到阻塞队列1,交给其余消费者处理.

如下是消息总队列的代码:

/**
 * 总消息队列管理
 * 
 * @author tengfei.fangtf
 */
public class MsgQueueManager implements IMsgQueue{

    private static final Logger              LOGGER             
 = LoggerFactory.getLogger(MsgQueueManager.class);


    /**
     * 消息总队列
     */
    public final BlockingQueue<Message> messageQueue;

    private MsgQueueManager() {
        messageQueue = new LinkedTransferQueue<Message>();
    }

    public void put(Message msg) {
        try {
            messageQueue.put(msg);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public Message take() {
        try {
            return messageQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }

}
启动一个消息分发线程 .在这个线程里子队列自动去总队列里获取消息 .

/**
     * 分发消息,负责把消息从大队列塞到小队列里
     * 
     * @author tengfei.fangtf
     */
    static class DispatchMessageTask implements Runnable {
        @Override
        public void run() {
            BlockingQueue<Message> subQueue;
            for (;;) {
                //若是没有数据,则阻塞在这里
                Message msg = MsgQueueFactory.getMessageQueue().take();
                //若是为空,则表示没有Session机器链接上来,
须要等待,直到有Session机器链接上来
                while ((subQueue = getInstance().getSubQueue()) == null) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                //把消息放到小队列里
                try {
                    subQueue.put(msg);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
使用 Hash算法获取一个子队列 .
/**
     * 均衡获取一个子队列.
     * 
     * @return
     */
    public BlockingQueue<Message> getSubQueue() {
        int errorCount = 0;
        for (;;) {
            if (subMsgQueues.isEmpty()) {
                return null;
            }
            int index = (int) (System.nanoTime() % subMsgQueues.size());
            try {
                return subMsgQueues.get(index);
            } catch (Exception e) {
                //出现错误表示,在获取队列大小以后,队列进行了一次删除操做
                LOGGER.error("获取子队列出现错误", e);
                if ((++errorCount) < 3) {
                    continue;
                }
            }
        }
    }

使用的时候咱们只须要往总队列里发消息.

//往消息队列里添加一条消息
        IMsgQueue messageQueue = MsgQueueFactory.getMessageQueue();
        Packet msg = Packet.createPacket(Packet64FrameType.
TYPE_DATA, "{}".getBytes(), (short) 1);
        messageQueue.put(msg);
小结

本章讲解了生产者消费者模式,并给出了实例.读者能够在平时的工做中思考下哪些场景可使用生产者消费者模式,我相信这种场景应该很是之多,特别是须要处理任务时间比较长的场景,好比上传附件并处理,用户把文件上传到系统后,系统把文件丢到队列里,而后马上返回告诉用户上传成功,最后消费者再去队列里取出文件处理.好比调用一个远程接口查询数据,若是远程服务接口查询时须要几十秒的时间,那么它能够提供一个申请查询的接口,这个接口把要申请查询任务放数据库中,而后该接口马上返回.而后服务器端用线程轮询并获取申请任务进行处理,处理完以后发消息给调用方,让调用方再来调用另一个接口拿数据.

另外Java中的线程池类其实就是一种生产者和消费者模式的实现方式,可是实现方法更高明.生产者把任务丢给线程池,线程池建立线程并处理任务,若是将要运行的任务数大于线程池的基本线程数就把任务扔到阻塞队列里,这种作法比只使用一个阻塞队列来实现生产者和消费者模式显然要高明不少,由于消费者可以处理直接就处理掉了,这样速度更快,而生产者先存,消费者再取这种方式显然慢一些.

咱们的系统也可使用线程池来实现多生产者消费者模式.好比建立N个不一样规模的Java线程池来处理不一样性质的任务,好比线程池1将数据读到内存以后,交给线程池2里的线程继续处理压缩数据.线程池1主要处理IO密集型任务,线程池2主要处理CPU密集型任务.

第3部分参考自:http://www.infoq.com/cn/articles/producers-and-consumers-mode/



20150419


JAVA学习笔记系列

--------------------------------------------

                    联系方式

--------------------------------------------

        Weibo: ARESXIONG

        E-Mail: aresxdy@gmail.com

------------------------------------------------
相关文章
相关标签/搜索