Netty聊天室(2):从0开始实战100w级流量应用

客户端 Client 登陆和响应处理

疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 17【 博客园 总入口java


源码IDEA工程获取连接Java 聊天室 实战 源码面试

写在前面

​ 你们好,我是做者尼恩。目前和几个小伙伴一块儿,组织了一个高并发的实战社群【疯狂创客圈】。正在开始高并发、亿级流程的 IM 聊天程序 学习和实战bootstrap

​ 前面,已经完成一个高性能的 Java 聊天程序的四件大事:安全

  1. 完成了协议选型,选择了性能更佳的 Protobuf协议。具体的文章为: Netty+Protobuf 整合一:实战案例,带源码服务器

  2. 介绍了 通信消息数据包的几条设计准则。具体的文章为: Netty +Protobuf 整合二:protobuf 消息通信协议设计的几个准则session

  3. 解决了一个很是基础的问题,这就是通信的 粘包和半包问题。具体的文章为:Netty 粘包/半包 全解 | 史上最全解读并发

  4. 前一篇文件,已经完成了 系统三大组成模块的组成介绍。 具体的文章为:Netty聊天程序(实战一):从0开始实战100w级流量应用分布式

    今天介绍很是重要的一个内容:ide

    客户端的通信、登陆请求和登陆响应设计

    下面,开启今天的 惊险和刺激实战之旅

客户端的会话管理

​ 什么是会话?

​ 为了方便客户端的开发,管理与服务器的链接,这里引入一个很是重要的中间角色——Session (会话)。有点儿像Web开发中的Tomcat的服务器 Session,可是又有很大的不一样。

​ 客户端的会话概念图,以下图所示:

在这里插入图片描述

​ 客户端会话有两个很重的成员,一个是user,表明了拥有会话的用户。一个是channel,表明了链接的通道。两个成员的做用是:

  • 经过user,能够得到当前的用户信息

  • 经过channel,能够向服务器发送消息

    因此,会话左拥右抱,左手用户资料,右手服务器的链接。在本例的开发中,会常常用到。

客户端的逻辑构成

从逻辑上来讲,客户端有三个子的功能模块。

在这里插入图片描述

模块一:Handler

入站处理器。

在Netty 中很是重要,负责处理入站消息。比方,服务器发送过来登陆响应,服务器发送过来的聊天消息。

模块二:MsgBuilder

消息组装器。

将 Java 内部的 消息 Bean 对象,转成发送出去的 Protobuf 消息。

模块三:Sender

消息发送器。

Handler 负责收的工做。Sender 则是负责将消息发送出去。

三大子模块的类关系图:
在这里插入图片描述

介绍完成了主要的组成部分后,开始服务器的链接和Session 的建立。

链接服务器与Session 的建立

​ 经过bootstrap 帮助类,设置完成线程组、通道类型,向管道流水线加入处理器Handler后,就能够开始链接服务器的工做。

​ 本小节须要重点介绍的,是链接成功以后,建立 Session,而且将 Session和 channel 相互绑定。

​ 代码以下:

package com.crazymakercircle.chat.client;
//...
@Data
@Service("EchoClient")
public class ChatClient
{
    static final Logger LOGGER =
            LoggerFactory.getLogger(ChatClient.class);
   //..
    private Channel channel;
    private ClientSender sender;

   public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup)
    {
        ChannelFuture f = null;
        try
        {
            if (bootstrap != null)
            {
                bootstrap.group(eventLoopGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
                bootstrap.remoteAddress(host, port);

                // 设置通道初始化
                bootstrap.handler(
                        new ChannelInitializer<SocketChannel>()
                        {
                            public void initChannel(SocketChannel ch) throws Exception
                            {
                                ch.pipeline().addLast(new ProtobufDecoder());
                                ch.pipeline().addLast(new ProtobufEncoder());
                                ch.pipeline().addLast(chatClientHandler);

                            }
                        }
                );
                LOGGER.info(new Date() + "客户端开始登陆[疯狂创客圈IM]");

                f = bootstrap.connect().addListener((ChannelFuture futureListener) ->
                {
                    final EventLoop eventLoop = futureListener.channel().eventLoop();
                    if (!futureListener.isSuccess())
                    {
                        LOGGER.info("与服务端断开链接!在10s以后准备尝试重连!");
                        eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);

                        initFalg = false;
                    }
                    else
                    {
                        initFalg = true;
                    }
                    if (initFalg)
                    {
                        LOGGER.info("EchoClient客户端链接成功!");

                        LOGGER.info(new Date() + ": 链接成功,启动控制台线程……");
                        channel = futureListener.channel();

                        // 建立会话
                        ClientSession session = new ClientSession(channel);
                        channel.attr(ClientSession.SESSION).set(session);
                        session.setUser(ChatClient.this.getUser());
                        startConsoleThread();
                    }

                });

                // 阻塞
                f.channel().closeFuture().sync();
            }
        } catch (Exception e)
        {
            LOGGER.info("客户端链接失败!" + e.getMessage());
        }

    }

 //...

}

Session和 channel 相互绑定

Session和 channel 相互绑定,再截取出来,分析一下。

ClientSession session = new ClientSession(channel);
channel.attr(ClientSession.SESSION).set(session);
session.setUser(ChatClient.this.getUser());

​ 为何要Session和 channel 相互绑定呢?

  • 发的时候, 须要从Session 写入 Channel ,这至关于正向的绑定。
  • 收的时候,是从Channel 过来的,须要找到 Session ,这至关于反向的绑定。

​ Netty 中的 channel ,实现了AttributeMap接口 ,至关于一个 Map容器。 反向的绑定,利用了channel 的这个特色。

​ 看一下AttributeMap接口 如何使用的?

AttributeMap接口的使用

​ AttributeMap 是一个接口,而且只有一个attr()方法,接收一个AttributeKey类型的key,返回一个Attribute类型的value。按照Javadoc,AttributeMap实现必须是线程安全的。

​ AttributeMap内部结构看起来像下面这样:

img

不要被吓着了,其实很简单。

AttributeMap 的使用,主要是设置和取值。

  • 设值 Key-> Value

AttributeMap 的设值的方法,举例以下:

channel.attr(ClientSession.SESSION).set(session);

这个是链式调用,attr() 方法中的是 Key, set()方法中的是Value。 这样就完成了 Key-> Value 的设置。

  • 取值

AttributeMap 的取值的方法,举例以下:

ClientSession session =
        ctx.channel().attr(ClientSession.SESSION).get();

这个是链式调用,attr() 方法中的是 Key, get()方法返回 的是Value。 这样就完成了 取值。

关键是,这个key比较特殊

通常的Map,Key 的类型多半为字符串。可是这里的Key不行,有特殊的约定。

Key的类型必须是 AttributeKey 类型,并且这是一个泛型类,它的优点是,不须要对值进行强制的类型转换。

Key的例子以下:

public static final AttributeKey<ClientSession> SESSION = AttributeKey.valueOf("session");

客户端登陆请求

登陆的请求,大体以下:
在这里插入图片描述

ClientSender的 代码以下:

package com.crazymakercircle.chat.client;

@Service("ClientSender")
public class ClientSender
{
    static final Logger LOGGER = LoggerFactory.getLogger(ClientSender.class);


    private User user;
    private ClientSession session;

    public void sendLoginMsg()
    {
        LOGGER.info("开始登录");
        ProtoMsg.Message message = LoginMsgBuilder.buildLoginMsg(user);
        session.writeAndFlush(message);
    }

//...
    public boolean isLogin()
    {
        return  session.isLogin();
    }
}

Sender 首先经过 LoginMsgBuilder,构造一个protobuf 消息。而后调用session发送消息。

session 会经过绑定的channel ,将消息发送出去。

session的代码,以下:

public synchronized void writeAndFlush(Object pkg)
{
    channel.writeAndFlush(pkg);
}

其余的客户端请求流程,大体也是相似的。

一个客户端的请求大体的流程有三步,分别从Sender 到session到channel。

在这里插入图片描述

处理登陆成功的响应

​ 这是从服务器过来的入站消息。 若是登陆成功,服务器会发送一个登陆成功的响应过来。 这个响应,会从channel 传递到Handler。

在这里插入图片描述

处理器 LoginResponceHandler 的代码以下:

package com.crazymakercircle.chat.clientHandler;

//...

public class LoginResponceHandler extends ChannelInboundHandlerAdapter
{
    static final Logger LOGGER = LoggerFactory.getLogger(LoginResponceHandler.class);
    /**
     * 业务逻辑处理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        LOGGER.info("msg:{}", msg.toString());
        if (msg != null && msg instanceof ProtoMsg.Message)
        {
            ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
            ProtoMsg.LoginResponse info = pkg.getLoginResponse();
            ProtoInstant.ResultCodeEnum result =
                    ProtoInstant.ResultCodeEnum.values()[info.getCode()];

            if (result.equals(ProtoInstant.ResultCodeEnum.SUCCESS))
            {
                ClientSession session =
                        ctx.channel().attr(ClientSession.SESSION).get();

                session.setLogin(true);
                LOGGER.info("登陆成功");
            }
        }
    }

}

​ LoginResponceHandler 对消息类型进行判断,若是是请求响应消息,而且登陆成功。 则取出绑定的session,经过session,进一步完成登陆成功后的业务处理。

​ 好比设置成功的状态,完成一些成功的善后处理操做等等。

​ 其余的客户端响应处理流程,大体也是相似的。

在这里插入图片描述

写在最后

​ 至此为止,能够看到,客户端登陆的完整流程。

​ 下一篇:服务器的请求处理和通信的全流程闭环介绍。


疯狂创客圈 Java 死磕系列

  • Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战

  • Netty 源码、原理、JAVA NIO 原理
  • Java 面试题 一网打尽
  • 疯狂创客圈 【 博客园 总入口 】