Android ThreadLocal+PriorityQueue构建多线程队列

1、消息队列

Android中使用了不少消息队列,如Intent,Handler,BroadcastReceiver等。使用消息队列的目的是防止线程阻塞而且将不能及时执行的任务缓存起来,从而提升系统的运行效率。java

为了使得消息队列具备全局性,而且重用性,建议定义在Application或者单例对象的类中缓存

public class QApplication extends Application{

   public static final ThreadLocal<PriorityQueue<TaskMessage>> massegQueue = new ThreadLocal<PriorityQueue<TaskMessage>>();
    @Override
    public void onCreate() {
	super.onCreate();
    }
}

TaskMessage是一个任务消息,例如以下,注意,必须实现Comparable接口并发

package com.tianwt.app;

import java.io.Serializable;


public class TaskMessage implements Serializable,Comparable
{
	private long id;
	private String name;
	private String type;
	
	
	
	public TaskMessage(long id, String name, String type) {
		super();
		this.id = id;
		this.name = name;
		this.type = type;
	}

	public void show()
	{
	     System.out.println("id="+id+" , name="+name+" , type="+type);
	}

	public long getId() {
		return id;
	}

	public void setId(long id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	@Override
	public int compareTo(Object arg0) {
		if(TaskMessage.class.isInstance(arg0))
		{
			TaskMessage tm = (TaskMessage) arg0;
			return tm.type.compareTo(type);
		}
		return 0;
	}
	
	
}

ThreadLocal是一个处理高并发问题的以线程为做用域的类。app

在Android中,消息的队列使用了ThreadLocal,读者可自行查Handler类相关的源码。dom

PriorityQueue是一个能够调整优先级的消息队列ide

使用这种队列的好处是,n个线程中能够出现n个消息队列,这个Handler相似,另外ThreadLocal是静态的,但这并不会影响同步问题,由于它是以线程为做用域。高并发

对于ThreadLocal,可参见下面的文章。this

Java中高并发任务中的ThreadLocal的用法解析spa

对于队列的用法,请参考:Java 队列协做同步.net

2、代码实战

下面给一个简单的例子

package com.tianwt.app;

import java.util.PriorityQueue;

public class ThreadLocalQueue {
	// 指定初始值
	public static final ThreadLocal<PriorityQueue<TaskMessage>> massegQueue = new ThreadLocal<PriorityQueue<TaskMessage>>();

	public static void main(String[] args) {

		TestThreadQueue t1 = new TestThreadQueue();
		TestThreadQueue t2 = new TestThreadQueue();
		TestThreadQueue t3 = new TestThreadQueue();
		t1.start();
		t2.start();
		t3.start();
		
		
		TestThreadQueue[] tset = {t1,t2,t3};
		for (int i = 0; i < 100; i++) {
			
			tset[i%3].addTask(new TaskMessage(i,(i%3+1)+ "号线程", Math.random()*10000+""));
			
		}
	}

	private static class TestThreadQueue extends Thread {

		boolean isStop;
		private PriorityQueue<TaskMessage> taskQueue = null;

		public TestThreadQueue() {

		}

		public void shutDown() {
			if (taskQueue != null) {
				isStop = true;
				synchronized (taskQueue) {
					taskQueue.notify();
				}
			}
		}

		public void addTask(TaskMessage msg) {
			if (taskQueue != null && !isStop) {
				synchronized (taskQueue) {
					taskQueue.add(msg);
					taskQueue.notify();
				}
			}
		}

		public void run() {
			PriorityQueue<TaskMessage> qm = ThreadLocalQueue.massegQueue.get();
			if (qm == null) {
				taskQueue = new PriorityQueue<TaskMessage>(5);
				ThreadLocalQueue.massegQueue.set(taskQueue);
				qm = taskQueue;
			}

			while (!isStop) {
				synchronized (qm) {
					try {
						if (qm.size() != 0) {
							TaskMessage msgTask = qm.poll();
							msgTask.show();

						} else {
							qm.wait();
						}
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}
	}

}
相关文章
相关标签/搜索