全栈开发——动手打造属于本身的直播间(Vue+SpringBoot+Nginx)

前言

大学的学习时光临近尾声,感叹时光匆匆,三年一晃而过。同窗们都忙着找工做,我也在这里抛一份简历吧,欢迎各位老板和猎手诚邀。咱们进入正题。直播行业是当前火热的行业,谁都想从中分得一杯羹,直播养活了一大批人,一个平台主播粗略估计就有几千号人,可是实时在线观看量有的竟然到了惊人的百万级别,特别是游戏主播,可想而知,直播间是一个磁铁式的广告传播媒介,也难怪这么多巨头公司都抢着作直播。我不太清楚直播行业技术有多深,毕竟本身没作过,可是我们能够本身实现一个知足几百号人同时观看的直播间呀。css


最终成果

手机端效果java

动图

这个场景很熟悉吧~~ 经过obs推流软件来推流。 mysql

图片描述

户外直播,经过yasea手机端推流软件,使用手机摄像头推流。react

图片描述

电脑端效果linux

播放香港卫视webpack

图片描述

直播画面ios

图片描述

项目总览

项目分为三个部分:

  1. 客户端
    直播间视频拉流、播放和聊天室,炫酷的弹幕以及直播间信息

  2. 服务端
    处理直播间、用户的数据业务,聊天室消息的处理

  3. 服务器部署
    视频服务器和web服务器

技术栈

移动客户端

  • VUE全家桶

  • UI层vonic

  • axios

  • 视频播放器: vue-video-player + videojs-contrib-hls

  • websocket客户端: vue-stomp

  • 弹幕插件: vue-barrage

  • 打包工具:webpack

电脑端客户端

  • 项目架构: Jquery + BootStrap

  • 视频播放器: video.js

  • websocket客户端: stomp.js + sockjs.js

  • 弹幕插件: Jquery.danmu.js

  • 模版引擎: thymeleaf

服务端

  • IDE: IntelliJ IDEA

  • 项目架构: SpringBoot1.5.4 +Maven3.0

  • 主数据库: Mysql5.7

  • 辅数据库: redis3.2

  • 数据库访问层: spring-boot-starter-data-jpa + spring-boot-starter-data-redis

  • websocket: spring-boot-starter-websocket

  • 消息中间件: RabbitMQ/3.6.10

服务器部署

  • 视频直播模块: nginx-rtmp-module

  • web应用服务器: tomcat8.0

  • 服务器: 腾讯云centos6.5

技术点讲解

直播间主要涉及到两个主要功能:第一是视频直播、第二是聊天室。这两个都是很是讲究实时性。

  • 视频直播

说到直播咱们先了解下几个经常使用的直播流协议,看了挺多的流媒体协议文章博客,但都是很是粗略,这里有个比较详细的 流媒体协议介绍,若是想详细了解协议内容估计去要看看专业书籍了。这里咱们用到的只是rtmp和hls,实践后发现:rtmp只可以在电脑端播放,hls只可以在手机端播放。并且rtmp是至关快的尽管没有rtsp那么快,延迟只有几秒,我测试的就差很少2-5秒,可是hls大概有10几秒。因此若是你体验过demo,就会发现手机延迟比较多。

直播的流程:
直播分为推流和拉流两个过程,那么流推向哪里,拉流又从哪里拉取呢?那固然须要视频服务器啦,千万不要觉得视频直播服务器很复杂,其实在nginx服务器中一切都变得简单。后面我会讲解如何部署Nginx服务器并配置视频模块(nginx-rtmp-module).

首先主播经过推流软件,好比OBS Studio推流软件,这个是比较专业级别的,不少直播平台的推荐主播使用这个软件来推送视频流,这里我也推荐一个开源的安卓端推流工具Yasea,下载地址,文件很小,可是很强大。
直播内容推送到服务器后,就能够在服务器端使用视频编码工具进行转码了,能够转换成各类高清,标清,超清的分辨率视频,也就是为何咱们在各个视频网站均可以选择视频清晰度。这里咱们没有转码,只是经过前端视频播放器(video.js)来拉取视频.这样整个视频推流拉流过程就完成了。

  • 聊天室

直播间里面的聊天室跟咱们的群聊天差很少,只不过它变成了web端,web端的即时通讯方案有不少,这里咱们选择websocket协议来与服务端通讯,websocket是基于http之上的传输协议,客户端向服务端发送http请求,并携带Upgrade:websocket升级头信息表示转换websocket协议,经过与服务端握手成功后就能够创建tcp通道,由此来传递消息,它与http最大的差异就是,服务端能够主动向客户端发送消息。

既然创建了消息通道,那咱们就须要往通道里发消息,可是总得须要一个东西来管控消息该发给谁吧,要否则全乱套了,因此咱们选择了消息中间件RabbitMQ.使用它来负责消息的路由去向。


理论知识都讲完啦,实操时间到!

移动客户端实操

源码地址

工程结构

|—— build                        构建服务和webpack配置        
|—— congfig                      项目不一样环境的配置
|—— dist                         build生成生产目录
|—— static                       静态资源
|—— package.json                 项目配置文件
|—— src                          开发源代码目录
    |—— api                      经过axios导出的api目录
    |—— components               页面和组件
    |—— public                   公有组件
    |—— vuex                     全局状态
    |—— main.js                  应用启动配置点

功能模块

  • 拉取服务器的直播视频流(hls)并播放直播画面

  • 与服务端建立websocket链接,收发聊天室消息

  • 经过websocket获取消息并发送到弹幕

  • 经过websocket实时更新在线用户

  • 结合服务端获取访问历史记录

  • 问题反馈模块

效果图

全局功能

项目说明

请参考源码

服务端实操

源码地址

因为我的比较喜欢接触新的东西,因此后端选择了springboot,前端选择了Vue.js年轻人嘛总得跟上潮流。SpringBoot实践事后发现真的太省心了,不用再理会各类配置文件,全自动化装配。
这里贴一下pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hushangjie</groupId>
    <artifactId>rtmp-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rtmp-demo</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-actuator-docs</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <!--非严格模式解析HTML5-->
        <dependency>
            <groupId>net.sourceforge.nekohtml</groupId>
            <artifactId>nekohtml</artifactId>
            <version>1.9.22</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <!-- 打包成war时能够移除嵌入式tomcat插件 -->
            <!--<exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>-->
        </dependency>
        <!--<dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>vue</artifactId>
            <version>2.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- RabbitMQ相关配置-->
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>2.0.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-net</artifactId>
            <version>2.0.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.6.Final</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <fork>true</fork>
                </configuration>
            </plugin>
        </plugins>
    </build>


</project>

application.properties文件

spring.datasource.url=jdbc:mysql://host:3306/database?characterEncoding=utf8&amp;useSSL=false
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.thymeleaf.mode=LEGACYHTML5
server.port=8085
# REDIS (RedisProperties)
# Redis数据库索引(默认为0)
spring.redis.database=0  
# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器链接端口
spring.redis.port=6379  
# Redis服务器链接密码(默认为空)
spring.redis.password=
# 链接池最大链接数(使用负值表示没有限制)
spring.redis.pool.max-active=8  
# 链接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.pool.max-wait=-1  
# 链接池中的最大空闲链接
spring.redis.pool.max-idle=8  
# 链接池中的最小空闲链接
spring.redis.pool.min-idle=0  
# 链接超时时间(毫秒)
spring.redis.timeout=0

websocket配置

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    //拦截器注入service失败解决办法
    @Bean
    public MyChannelInterceptor myChannelInterceptor(){
        return new MyChannelInterceptor();
    }
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //添加访问域名限制能够防止跨域socket链接
        //setAllowedOrigins("http://localhost:8085")
        registry.addEndpoint("/live").setAllowedOrigins("*").addInterceptors(new HandShkeInceptor()).withSockJS();

    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        /*.enableSimpleBroker("/topic","/queue");*/
        //假如须要第三方消息代理,好比rabitMQ,activeMq,在这里配置
        registry.setApplicationDestinationPrefixes("/demo")
                .enableStompBrokerRelay("/topic","/queue")
                .setRelayHost("127.0.0.1")
                .setRelayPort(61613)
                .setClientLogin("guest")
                .setClientPasscode("guest")
                .setSystemLogin("guest")
                .setSystemPasscode("guest")
                .setSystemHeartbeatSendInterval(5000)
                .setSystemHeartbeatReceiveInterval(4000);
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        ChannelRegistration channelRegistration = registration.setInterceptors(myChannelInterceptor());
        super.configureClientInboundChannel(registration);
    }

    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        super.configureClientOutboundChannel(registration);
    }

}

配置类继承了消息代理配置类,意味着咱们将使用消息代理rabbitmq.使用registerStompEndpoints方法注册一个websocket终端链接。这里咱们须要了解两个东西,第一个是stomp和sockjs,sockjs是啥呢,其实它是对于websocket的封装,由于若是单纯使用websocket的话效率会很是低,咱们须要的编码量也会增多,并且若是浏览器不支持websocket,sockjs会自动降级为轮询策略,并模拟websocket,保证客户端和服务端能够通讯。
stomp有是什么看这里

stomp是一种简单(流)文本定向消息协议,它提供了一个可互操做的链接格式,容许STOMP客户端与任意STOMP消息代理(Broker)进行交互,也就是咱们上面的RabbbitMQ,它就是一个消息代理。
咱们能够经过configureMessageBroker来配置消息代理,须要注意的是咱们将要部署的服务器也应该要有RabbitMQ,由于它是一个中间件,安装很是容易,这里就不说明了。这里咱们配置了“/topic,/queue”两个代理转播策略,就是说客户端订阅了前缀为“/topic,/queue”频道都会经过消息代理(RabbitMQ)来转发。跟spring没啥关系啦,彻底解耦。

websocke如何保证安全

一开始接触 stomp的时候一直有个问题困扰我,客户端只要与服务端经过websocket创建了链接,那么他就能够订阅任何内容,意味着能够接受任何消息,这样岂不是乱了套啦,因而我翻阅了大量博客文章,不少都是官方的例子并无解决实际问题。通过琢磨,其实websocket是要考虑安全性的。具体在如下几个方面

  1. 跨域websocket链接

  2. 协议升级前握手拦截器

  3. 消息信道拦截器

对于跨域问题,咱们能够经过setAllowedOrigins方法来设置可链接的域名,防止跨站链接。

对于站内用户是否容许链接咱们能够以下配置

public class HandShkeInceptor extends HttpSessionHandshakeInterceptor {
    private static final Set<UserEntity> ONLINE_USERS = new HashSet<>();
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {

        System.out.println("握手前"+request.getURI());
        //http协议转换websoket协议进行前,一般这个拦截器能够用来判断用户合法性等
        //鉴别用户
       if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
           //这句话很重要若是getSession(true)会致使移动端没法握手成功
           //request.getSession(true):若存在会话则返回该会话,不然新建一个会话。
           //request.getSession(false):若存在会话则返回该会话,不然返回NULL
           //HttpSession session = servletRequest.getServletRequest().getSession(false);
            HttpSession session = servletRequest.getServletRequest().getSession();
            UserEntity user = (UserEntity) session.getAttribute("user");
            if (user != null) {
                //这里只使用简单的session来存储用户,若是使用了springsecurity能够直接使用principal
                return super.beforeHandshake(request, response, wsHandler, attributes);
            }else {
                System.out.println("用户未登陆,握手失败!");
                return false;
            }
        }
        return false;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
        //握手成功后,一般用来注册用户信息
        System.out.println("握手后");
        super.afterHandshake(request, response, wsHandler, ex);
    }
}

HttpSessionHandshakeInterceptor 这个拦截器用来管理握手和握手后的事情,咱们能够经过请求信息,好比token、或者session判用户是否能够链接,这样就可以防范非法用户。

那如何限制用户只能订阅指定内容呢?咱们接着往下看

public class MyChannelInterceptor extends ChannelInterceptorAdapter {
    @Autowired
    private StatDao statDao;
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @Override
    public boolean preReceive(MessageChannel channel) {
        System.out.println("preReceive");
        return super.preReceive(channel);
    }

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        //检测用户订阅内容(防止用户订阅不合法频道)
        if (StompCommand.SUBSCRIBE.equals(command)) {
            //从数据库获取用户订阅频道进行对比(这里为了演示直接使用set集合代替)
            Set<String> subedChannelInDB = new HashSet<>();
            subedChannelInDB.add("/topic/group");
            subedChannelInDB.add("/topic/online_user");
            if (subedChannelInDB.contains(accessor.getDestination())) {
                //该用户订阅的频道合法
                return super.preSend(message, channel);
            } else {
                //该用户订阅的频道不合法直接返回null前端用户就接受不到该频道信息。
                return null;
            }
        } else {
            return super.preSend(message, channel);
        }

    }
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        //System.out.println("afterSendCompletion");
        //检测用户是否链接成功,搜集在线的用户信息若是数据量过大咱们能够选择使用缓存数据库好比redis,
        //这里因为须要频繁的删除和增长集合内容,咱们选择set集合来存储在线用户
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        if (StompCommand.SUBSCRIBE.equals(command)){
            Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes");
            //ONLINE_USERS.add(map.get("user"));
            UserEntity user = map.get("user");
            if(user != null){
                statDao.pushOnlineUser(user);
                Guest guest = new Guest();
                guest.setUserEntity(user);
                guest.setAccessTime(Calendar.getInstance().getTimeInMillis());
                statDao.pushGuestHistory(guest);
                //经过websocket实时返回在线人数
                this.simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline());
            }

        }
        //若是用户断开链接,删除用户信息
        if (StompCommand.DISCONNECT.equals(command)){
            Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes");
            //ONLINE_USERS.remove(map.get("user"));
            UserEntity user = map.get("user");
            if (user != null){
                statDao.popOnlineUser(user);
                simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline());
            }

        }
        super.afterSendCompletion(message, channel, sent, ex);
    }

}

在stomp里面,Channel信道就是消息传送的通道,客户端与服务端创建了链接就至关于创建了通道,之后的信息就是经过这个通道来传输。全部的消息都有消息头,被封装在了spring 的messag接口中,好比创建链接时候消息头就含有CONNECT,固然还有一些其余的信息。客户端订阅的时候也有订阅头信息SUBSCRIBE,那么我是否是能够在这个拦截器ChannelInterceptorAdapter 中拦截每一个人的订阅信息,而后与数据库的信息做比对,最后决定这个用户是否能够订阅这个频道的信息呢,对的,这是个人想法,按照这样的思路,作单聊不是迎刃而解了吗。
那客户端经过websocket发送的消息如何到达订阅者手中呢,按照rabbitmq的规则,订阅者属于消费者,发送消息的一方属于生产者,生产者经过websocket把消息发送到服务端,服务端经过转发给消息代理(rabbitmq),消息代理负责存储消息,管理发送规则,推送消息给订阅者,看下面的代码

@MessageMapping(value = "/chat")
    @SendTo("/topic/group")
    public MsgEntity testWst(String message , @Header(value = "simpSessionAttributes") Map<String,Object> session){
        UserEntity user = (UserEntity) session.get("user");
        String username = user.getRandomName();
        MsgEntity msg = new MsgEntity();
        msg.setCreator(username);
        msg.setsTime(Calendar.getInstance());
        msg.setMsgBody(message);
        return msg;
    }

@MessageMapping看起来跟springmvc方法特别像,它便可以用在类级别上也能够用在方法级别上
当发送者往‘/chat’发送消息后,服务端接受到消息,再发送给“/topic/group”的订阅者,@SendTo就是发送给谁,这里须要注意的有,若是咱们没有配置消息代理,只使用了enableSimpleBroker("/topic","/queue")简单消息代理,那么就是直接发送到消息订阅者,若是配置了消息代理,那还要经过消息代理,由它来转发。

若是咱们想在服务端随时发送消息,而不是在客户端发送(这样的场景很常见,好比发送全局通知),可使用SimpMessagingTemplate类,经过注入该bean,在合适的业务场景中发送消息。

Redis统计数据

直播间常常须要统计数据,好比实时在线人数,访问量,贡献排行榜,订阅量。我选择的方案是使用redis来计数,尽管这个demo可能不会太多人访问,可是个人目的是学习如何使用redis
先看springboot中redis的配置

@Configuration
public class RedisConfig extends CachingConfigurerSupport{
    /**
     * 生成key的策略
     *
     * @return
     */
    @Bean
    public KeyGenerator keyGenerator() {
        return new KeyGenerator() {
            @Override
            public Object generate(Object target, Method method, Object... params) {
                StringBuilder sb = new StringBuilder();
                sb.append(target.getClass().getName());
                sb.append(method.getName());
                for (Object obj : params) {
                    sb.append(obj.toString());
                }
                return sb.toString();
            }
        };
    }

    /**
     * 管理缓存
     *
     * @param redisTemplate
     * @return
     */
    @SuppressWarnings("rawtypes")
    @Bean
    public CacheManager cacheManager(RedisTemplate redisTemplate) {
        RedisCacheManager rcm = new RedisCacheManager(redisTemplate);
        //设置缓存过时时间
        // rcm.setDefaultExpiration(60);//秒
        //设置value的过时时间
        Map<String,Long> map=new HashMap();
        map.put("test",60L);
        rcm.setExpires(map);
        return rcm;
    }

    /**
     * RedisTemplate配置
     * @param factory
     * @return
     */
    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);//若是key是String 须要配置一下StringSerializer,否则key会乱码 /XX/XX
        template.afterPropertiesSet();
        //template.setStringSerializer();
        return template;
    }
}

redis数据统计Dao的实现

@Repository
public class StatDao {
    @Autowired
    RedisTemplate redisTemplate;
    public void pushOnlineUser(UserEntity userEntity){
        redisTemplate.opsForSet().add("OnlineUser",userEntity);
    }
    public void popOnlineUser(UserEntity userEntity){
        redisTemplate.opsForSet().remove("OnlineUser" ,userEntity);
    }
    public Set getAllUserOnline(){
        return redisTemplate.opsForSet().members("OnlineUser");
    }
    public void pushGuestHistory(Guest guest){
        //最多存储指定个数的访客
        if (redisTemplate.opsForList().size("Guest") == 200l){
            redisTemplate.opsForList().rightPop("Guest");
        }
        redisTemplate.opsForList().leftPush("Guest",guest);
    }
    public List getGuestHistory(){
        return redisTemplate.opsForList().range("Guest",0,-1);
    }
}

Dao层很是简单,由于咱们只须要统计在线人数和访客。可是在线人数是实时更新的,既然咱们使用了websocket实时数据更新就很是容易了,前面咱们讲过,经过信道拦截器能够拦截链接,订阅,断开链接等等事件信息,因此咱们就能够当用户链接时存储在线用户,经过websocket返回在线用户信息。

public class MyChannelInterceptor extends ChannelInterceptorAdapter {
    @Autowired
    private StatDao statDao;
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @Override
    public boolean preReceive(MessageChannel channel) {
        System.out.println("preReceive");
        return super.preReceive(channel);
    }

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        //检测用户订阅内容(防止用户订阅不合法频道)
        if (StompCommand.SUBSCRIBE.equals(command)) {
            //从数据库获取用户订阅频道进行对比(这里为了演示直接使用set集合代替)
            Set<String> subedChannelInDB = new HashSet<>();
            subedChannelInDB.add("/topic/group");
            subedChannelInDB.add("/topic/online_user");
            if (subedChannelInDB.contains(accessor.getDestination())) {
                //该用户订阅的频道合法
                return super.preSend(message, channel);
            } else {
                //该用户订阅的频道不合法直接返回null前端用户就接受不到该频道信息。
                return null;
            }
        } else {
            return super.preSend(message, channel);
        }

    }
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        //System.out.println("afterSendCompletion");
        //检测用户是否链接成功,搜集在线的用户信息若是数据量过大咱们能够选择使用缓存数据库好比redis,
        //这里因为须要频繁的删除和增长集合内容,咱们选择set集合来存储在线用户
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        if (StompCommand.SUBSCRIBE.equals(command)){
            Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes");
            //ONLINE_USERS.add(map.get("user"));
            UserEntity user = map.get("user");
            if(user != null){
                statDao.pushOnlineUser(user);
                Guest guest = new Guest();
                guest.setUserEntity(user);
                guest.setAccessTime(Calendar.getInstance().getTimeInMillis());
                statDao.pushGuestHistory(guest);
                //经过websocket实时返回在线人数
                this.simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline());
            }

        }
        //若是用户断开链接,删除用户信息
        if (StompCommand.DISCONNECT.equals(command)){
            Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes");
            //ONLINE_USERS.remove(map.get("user"));
            UserEntity user = map.get("user");
            if (user != null){
                statDao.popOnlineUser(user);
                simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline());
            }

        }
        super.afterSendCompletion(message, channel, sent, ex);
    }

}

因为这个项目有移动端和电脑端,因此须要根据请求代理UserAgent来判断客户端属于哪种类型。这个工具类在源码上有。我就不贴了。

服务器部署

说了这么多即时通讯,却没发现视频直播。不要着急咱们立刻进入视频环节。文章开头就说明了几种媒体流协议,这里不讲解详细的协议流程,只须要知道,咱们是经过推流软件采集视频信息,如何采集也不是咱们关注的。采集到信息后经过软件来推送到指定的服务器,以下图

obs推流设置

电脑端

yasea手机端推流设置

电脑端

红色部分是服务器开放的获取流接口。

Nginx-rtmp-module配置

视频服务器有不少,也支持不少媒体流协议。这里咱们选择nginx-rtmp-module来作视频服务,接下来咱们须要在linux下安装nginx,并安装rtmp模块。本人也是linux初学者,一步步摸索着把服务器搭建好,据说tomcat和nginx很配哦,因此做为免费开源的固然首选这两个。
接下来须要在linux安装一下软件和服务。

  1. Nginx以及Nginx-rtmp-module

  2. Tomcat

  3. Mysql

  4. Redis

  5. RabbitMQ

安装步骤我就不说了,你们搜索一下啦,这里贴一下nginx.conf文件配置

rtmp {
    server {
        listen 1935;
        chunk_size 4096;

        application video {
                play /yjdata/www/www/video;
        }
        application live {
                live on;
                hls on;
                hls_path /yjdata/www/www/live/hls/;
                hls_fragment 5s;
        }
    }
}

上面代码是配置rtmp模块, play /yjdata/www/www/video 指的是配置点播模块,能够直接播放/yjdata/www/www/video路径下的视频。hls_path制定hls分块存放路径,由于hls是经过获取到推送的视频流信息,分块存储在服务器。因此它的延时比rtmp要更高。

server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;
        index index.jsp index.html;
        root /yjdata/www/www;
        #access_log  logs/host.access.log  main;

        location / {
            proxy_pass  http://127.0.0.1:8080;
        }
        location ~ .*\.(gif|jpg|jpeg|png|bmp|swf|js|css|docx|pdf|doc|ppt|html|properties)$ {
                expires 30d;
                root /yjdata/www/www/static/;
        }
        location /hls {
            types {
                application/vnd.apple.mpegurl m3u8;
                #application/x-mpegURL;
                video/mp2t ts;
            }
            alias /yjdata/www/www/live/hls/;
            expires -1;
            add_header Cache-Control no-cache;
        }

        location /stat {
                 rtmp_stat all;
                 rtmp_stat_stylesheet stat.xsl;
        }

        location /stat.xsl {
                root /soft/nginx/nginx-rtmp-module/;
         }

上面配置了location 指向/hls,别名是/yjdata/www/www/live/hls/,因此能够在前端直接经过域名+/hls/+文件名.m3u8获取直播视频。
关于nginx的配置还有不少,我也在学习当中。总而言之nginx很是强大。

总结

经过从前端=>后台=>服务器,整个流程走下来仍是须要花不少心思。可是收获也是不少。本人将从大学出来,初出茅庐,文章错误之处,尽请指正。本人邮箱979783618@qq.com

相关文章
相关标签/搜索