1:关于smack与tigase的用法跟做用请你们本身去网上查看相关资料,这里就不作描述了。java
PS:这篇文章主要是说明在客户端jvm建立的最大线程数的大小。多线程
以前公司要求作一个客户端用于测试刚刚部署的tigase的性能,因此项目经理就安排了一个事情就是本身动手在客户端写一个基于smack长链接的压力测试工具。并发
初期的要求是这样的:jvm
一、并发注册10000个用户ide
二、用户之间相互收发消息工具
三、能够调整并发数量性能
用smack 作客户端链接测试
因为以前对多线程这块懂的不是很深,在这个过程当中碰了走了不少弯路(总觉得是代码的问题,实际上不是),个人机子的配置是:32位,xp,4G内存。本身写了个程序不管怎么跑在线数都不能突破2000,离要求还差一大截呢!本身反反复复检查程序,没啥不对啊!后来使用jconsole工具观察发现线程数6k一直是最高峰,难怪用户量上不去,要知道要保证10000个用户同时在线至少客户端要启动10000个线程吧!问题总算找到了,因而百度了一下,发现了一个jvm启动线程数的公式:this
(MaxProcessMemory - JVMMemory - ReservedOsMemory) / (ThreadStackSize) = Number of threads
MaxProcessMemory 指的是一个进程的最大内存
JVMMemory JVM内存
ReservedOsMemory 保留的操做系统内存
ThreadStackSize 线程栈的大小spa
原来:在java语言里, 当你建立一个线程的时候,虚拟机会在JVM内存建立一个Thread对象同时建立一个操做系统线程,而这个系统线程的内存用的不是JVMMemory,而是系统中剩下的内存(MaxProcessMemory - JVMMemory - ReservedOsMemory)
OK问题找到了有了解决问题的思路:根据本身的测试发现当给jvm设置的内存数为512m时建立的线程数是最多的,以下:
2*1024*1024-1024*64-512*1024)/128=11776
又经过观察发现要保持一个长链接本地至少了启动四个线程因而支持的在线用户数为:11776/4
OK:测试结果想以下:
看图说话,哈哈 问题解决:
代码以下:
public class SmackConf {
public static String server="XXXX";
public static int port=5222;
private SmackConf(){};
//初始化配置
private static class ConnectionConfig{
private static ConnectionConfiguration config;
static{
config = new ConnectionConfiguration(server, 5222, "XXX");
config.setSASLAuthenticationEnabled(false);
config.setSecurityMode(ConnectionConfiguration.SecurityMode.disabled);
//config.setSendPresence(false);
System.out.println("加载配置文件类完毕");
}
}
public static ConnectionConfiguration getConnectionConfiguration(){
return ConnectionConfig.config;
}
}
public class SmackUtils {
private static Logger log = Logger.getLogger(SmackUtils.class);
private static int sleepTime = 1000;
public static void chart(User user) {
Connection connection = MapsUtils.getConnection(user.getUsername());
try {
if(connection==null||!connection.isConnected()){
initConnection(user);
}
if (!connection.isAuthenticated()) {
initLogin(user);
}
synchronized (connection) {
PacketFilter filter = new MessageTypeFilter(Message.Type.chat);
connection.addPacketListener(new PacketListener() {
@Override
public void processPacket(Packet packet) {
Message message = (Message) packet;
if (message.getBody() != null) {
String fromName = (String) message
.getProperty("fromId");
System.out.println("Got text [" + message.getBody()
+ "] from [" + fromName + "]");
}
}
}, filter);
}
} catch (Exception e) {
System.out.println("chart===="+e.getMessage());
log.info("chart===="+e.getMessage());
}
}
/*
* 得到链接
*/
private static void getConnection(User user, int i) throws Exception {
Connection connection = null;
try {
log.info("第" + i + "次开始申请链接,用户:" + user.getUsername());
connection = MapsUtils.getConnection(user.getUsername());
if (connection == null) {
connection = new XMPPConnection(SmackConf.getConnectionConfiguration());
MapsUtils.put(user.getUsername(), connection);
}
if(connection.isConnected()){
connection.disconnect();
}
connection.connect();
user.setConnectionFa(true);
log.info("第" + i + "次申请链接,用户:" + user.getUsername() + "成功");
} catch (Exception e) {
log.info("第" + i + "次申请链接,用户:" + user.getUsername() + "失败");
System.out.println("得到链接的err:"+e.getMessage());
log.info("得到链接的err:"+e.getMessage());
throw e;
}
}
public static void getConnection(User user) {
/** 创建链接 */
try {
getConnection(user, 0);
} catch (Exception e) {
while (user.getiConnent() < MapsUtils.count
&& !user.isConnectionFa()) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e1) {
}
user.setiConnent(user.getiConnent() + 1);
try {
getConnection(user, user.getiConnent());
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
private static void initConnection(User user) throws Exception {
try {
log.info("开始初始化建立链接:" + user.getUsername());
user.setConnectionFa(false);
user.setiConnent(0);
getConnection(user);
} catch (Exception e) {
throw e;
}
}
/**
* 注册新用户
*
* @param i
* @throws Exception
*/
private static void createAccount(User user, int i) throws Exception {
Connection connection = MapsUtils.getConnection(user.getUsername());
try {
if (connection == null || !connection.isConnected()) {
initConnection(user);
}
synchronized (connection) {
AccountManager am = null;
log.info("第" + i + "次开始注册用户:" + user.getUsername());
am = connection.getAccountManager();
am.createAccount(user.getUsername(), user.getPassword());
user.setCreateFa(true);
log.info("第" + i + "次开始注册用户:" + user.getUsername() + "成功");
}
} catch (Exception e) {
log.info("第" + i + "次开始注册用户:" + user.getUsername() + "失败");
String message=e.getMessage();
log.info("注册用户的err:"+e.getMessage());
reInit(message, user);
throw e;
}
}
public static void createAccount(User user) {
try {
createAccount(user, 0);
} catch (Exception e) {
while (user.getiCreate() < MapsUtils.count && !user.isCreateFa()) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e2) {
}
user.setiCreate(user.getiCreate() + 1);
try {
createAccount(user, user.getiCreate());
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
// 登录
public static void login(User user) {
try {
login(user, 0);
} catch (Exception e) {
while (user.getiLogin() < MapsUtils.count && !user.isLoginFa()) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e1) {
}
user.setiLogin(user.getiLogin() + 1);
try {
login(user, user.getiLogin());
} catch (Exception e2) {
e2.printStackTrace();
}
}
}
}
private static void initSignUser(User user) throws Exception {
try {
log.info("开始初始化注册:" + user.getUsername());
user.setiCreate(0);
user.setCreateFa(false);
createAccount(user);
} catch (Exception e) {
throw e;
}
}
// 登录
private static void login(User user, int i) throws Exception {
Connection connection = MapsUtils.getConnection(user.getUsername());
try {
if(connection==null){
initConnection(user);
}
if(!connection.isConnected()){
initConnection(user);
}
if (!user.isCreateFa()) {
initSignUser(user);
}
if(connection.isAuthenticated()){
return;
}
synchronized (connection) {
log.info("第" + i + "次开始登录,用户:" + user.getUsername());
connection.login(user.getUsername(), user.getPassword());
user.setLoginFa(true);
/** 设置状态 */
Presence presence = new Presence(Presence.Type.available);
presence.setStatus("Q我吧");
connection.sendPacket(presence);
log.info("第" + i + "次开始登录,用户:" + user.getUsername() + "成功");
}
} catch (Exception e) {
log.info("第" + i + "次开始登录,用户:" + user.getUsername() + "失败");
String message = e.getMessage();
log.error("登录异常信息:" + message);
System.out.println("用户登录的err:"+e.getMessage());
log.info("用户登录的err:"+e.getMessage());
reInit(message, user);
throw e;
}
}
private static void initLogin(User user) throws Exception {
try {
log.info("开始初始化登录:" + user.getUsername());
user.setiLogin(0);
user.setLoginFa(false);
login(user);
} catch (Exception e) {
throw e;
}
}
public static void sendMessage(String from, String to, String message) {
/** 获取当前登录用户的聊天管理器 */
int messageIndex = 0;
try {
sendMessage(from, to, message, messageIndex);
} catch (Exception e) {
e.printStackTrace();
while (messageIndex < MapsUtils.count) {
System.err.println("消息发送次数:" + messageIndex + "消息来源:" + from
+ "\t消息目的地:" + to + "\t消息内容:" + message);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
messageIndex++;
try {
sendMessage(from, to, message, messageIndex);
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
}
private static void sendMessage(String from, String to, String message,
int count) throws Exception {
/** 获取当前登录用户的聊天管理器 */
Connection connection = MapsUtils.getConnection(from);
try {
if (!connection.isAuthenticated()) {
log.info(from + "==没有登录");
return;
}
synchronized (connection) {
/** 发送消息 */
ChatManager chatManager = connection.getChatManager();
Chat chat = chatManager.createChat(to + "@tt.com", null);
chat.sendMessage("消息来源:" + from + "\t消息目的地:" + to + "\t消息内容:"
+ message);
System.err.println("消息来源:" + from + "\t消息目的地:" + to + "\t消息内容:"
+ message);
}
} catch (Exception e) {
throw e;
}
}
private static void reInit(String message,User user) throws Exception{
if(message==null){
throw new IllegalArgumentException("message参数不能为空");
}
try {
if(message.indexOf("Not connected")>-1){
//没有链接
initConnection(user);
}else if(message.indexOf("not-authorized(401)")>-1){
//没有注册
initSignUser(user);
}else if(message.indexOf("conflict(409)")>-1){
//重复注册
user.setCreateFa(true);
}else if(message.indexOf("No response")>-1){
//没有返回
initConnection(user);
}
} catch (Exception e) {
throw e;
}
}
}
private ExecutorService executorService=null;
public ThreadSysUilt(ExecutorService executorService) {
super();
this.executorService = executorService;
}
/**
* 执行线程任务
* @param rs
* @param fa true(表明所有线程任务须要执行完才能返回),false(与true相反)
* @return
*/
public void execute(List<Runnable> rs){
for(Runnable r:rs){
executorService.execute(r);
}
executorService.shutdown();
}
/**
* 线程执行任务
* @param cs
* @return
*/
public <V> List<Future<V>> submit(List<Callable<V>> cs){
List<Future<V>> fLists=null;
try {
fLists = executorService.invokeAll(cs);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
executorService.shutdown();
return fLists;
}
public class ConnThreadMore implements Callable<MoreUser> {
private Logger log = Logger.getLogger(ConnThreadSpe.class.getName());
private MoreUser moreUser = null;
public ConnThreadMore(MoreUser moreUser) {
super();
this.moreUser = moreUser;
}
@Override
public MoreUser call() {
// TODO Auto-generated method stub
List<User> users=moreUser.getUsers();
for(User user:users){
log.info("开始新建线程:"+Thread.currentThread().getName()+"==执行:"+user.getUsername()+"==建立链接");
try {
SmackUtils.getConnection(user);
Thread.sleep(1000);
} catch (Exception e) {
log.error("申请链接异常", e);
}
try {
SmackUtils.createAccount(user);
Thread.sleep(1000);
} catch (Exception e) {
log.error("申请链接异常", e);
}
}
return moreUser;
}
}
public class LoginThreadMore implements Runnable {
private Logger log = Logger.getLogger(LoginThreadSpe.class.getName());
private MoreUser moreUser = null;
public LoginThreadMore(MoreUser moreUser) {
super();
this.moreUser = moreUser;
}
@Override
public void run() {
// TODO Auto-generated method stub
List<User> users=moreUser.getUsers();
for(User user:users){
log.info("开始新建线程:"+Thread.currentThread().getName()+"==执行:"+user.getUsername()+"==注册并登陆");
try {
SmackUtils.login(user);//登陆
Thread.sleep(100);
} catch (InterruptedException e) {
log.info("登录异常", e);
e.printStackTrace();
}
try {
SmackUtils.chart(user);//启动消息
Thread.sleep(100);
} catch (InterruptedException e) {
log.info("启动消息", e);
e.printStackTrace();
}
}
}
}
总结:这个实现上是比较简单的,关键的问题是培养本身解决问题的能力,有不对之处但愿你们多多指正。