车联网项目架构设计

系统概述

本系统主要使用Spring Cloud技术,包含的模块有:Gateway(网关),Nacos(微服务注册与发现),OpenFeign(HTTP+RESTful客户端),Hystrix(断路器),Ribbon(负载均衡),Security+OAuth2(安全认证),Kafka(消息队列),MybatisPlus(对象关系映射),Redis(缓存数据库),Netty等组件。

系统架构图

车联网系统架构图

组件功能简介

Gateway:Web服务的统一入口,进行消息的转发和限流操作
Nacos:微服务的注册中心和数据中心,提供服务的发现和注册,数据的统一配置
OpenFeign:HTTP+RESTful客户端,实现服务之间的调用
Hystrix:断路器,实现服务的熔断和降级
Ribbon:微服务调用的负载均衡
Security+OAuth2:提供微服务调用的认证和授权
Kafka:消息中间件,缓存终端数据,支持大并发
MybatisPlus:对象关系映射,用于访问数据库
Redis:数据缓存服务器,内存数据库,并发量大
Netty:Java NIO框架,实现终端的Socket连接,支持更大的连接数








vue

项目工程结构

在这里插入图片描述

  • obd车联网项目
    • auth 认证服务器
    • cloudcore Spring cloud 核心项目
    • common 工具类项目
    • nettysocket 终端数据接收项目
    • obd-feign-api OpenFeign接口项目
    • obd-feign-client OpenFeign客户端项目
    • obd-gateway 网关
    • obd-member-auth app帐户认证中心
    • obd-task 定时任务
    • obd-terminal-simulator 终端模拟器
    • obd-third-park 三方服务项目
    • obd-zhb 真惠保项目
      • 真惠保APP服务项目
      • 真惠保后台管理项目
    • portal 车联网后台项目
    • protoolanalysis 协议分析器

Auth认证服务器

  • 客户端认证配置
/**
 * OAuth2 authorization-server configuration.
 *
 * <p>Client registrations are read over JDBC, issued tokens are stored in Redis and
 * access tokens are converted by a {@link JwtAccessTokenConverter}.
 */
@EnableAuthorizationServer
@Configuration
public class AuthServerConfig extends AuthorizationServerConfigurerAdapter {
	@Autowired
	AuthenticationManager authenticationManager;
	@Autowired
	RedisConnectionFactory connectionFactory;
	@Autowired
	private DataSource dataSource;

	/** Registers a JDBC-backed client details service on the injected data source. */
	@Override
	public void configure(ClientDetailsServiceConfigurer clients) throws Exception {
		clients.jdbc(dataSource);
	}

	/** Wires the token store, the token converter and the authentication manager into the endpoints. */
	@Override
	public void configure(AuthorizationServerEndpointsConfigurer endpoints) throws Exception {
		endpoints.tokenStore(redisTokenStore());
		endpoints.accessTokenConverter(accessTokenConverter());
		endpoints.authenticationManager(authenticationManager);
		// Turns off refresh-token *reuse*; note that this does not disable refresh tokens themselves.
		endpoints.reuseRefreshTokens(false);
	}

	/** Token store that keeps tokens in Redis. */
	@Bean
	public TokenStore redisTokenStore() {
		RedisTokenStore store = new RedisTokenStore(connectionFactory);
		// The authentication key is a fresh random UUID on every call
		// (presumably so that repeated logins never share a stored token - confirm).
		store.setAuthenticationKeyGenerator(authentication -> UUID.randomUUID().toString());
		return store;
	}

	/** Converter that signs access tokens with the configured signing key. */
	@Bean
	public JwtAccessTokenConverter accessTokenConverter() {
		JwtAccessTokenConverter jwtConverter = new JwtAccessTokenConverter();
		jwtConverter.setSigningKey("******");
		return jwtConverter;
	}

	/** Access rules of the authorization server's own endpoints. */
	@Override
	public void configure(AuthorizationServerSecurityConfigurer security) throws Exception {
		// Token-key access is open to everyone.
		security.tokenKeyAccess("permitAll()");
		// check_token access requires an authenticated caller.
		security.checkTokenAccess("isAuthenticated()");
		// Clients may authenticate with form parameters.
		security.allowFormAuthenticationForClients();
	}
}
  • 登录账户认证设置
/**
 * Web security configuration of the authorization server: authenticates login accounts
 * through a DAO provider backed by the injected {@link UserDetailsService}.
 */
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
	@Autowired
	private UserDetailsService userDetailsService;

	/** Registers the DAO authentication provider defined below. */
	@Override
	protected void configure(AuthenticationManagerBuilder auth) throws Exception {
		auth.authenticationProvider(authenticationProvider());
	}

	/**
	 * @return authentication provider that loads users through {@code userDetailsService}
	 *         and verifies passwords with {@link #passwordEncoder()}
	 */
	@Bean
	public DaoAuthenticationProvider authenticationProvider() {
		DaoAuthenticationProvider provider = new DaoAuthenticationProvider();
		provider.setUserDetailsService(userDetailsService);
		provider.setPasswordEncoder(passwordEncoder());
		return provider;
	}

	/** Exposes the parent's authentication manager as a bean. */
	@Bean
	@Override
	public AuthenticationManager authenticationManagerBean() throws Exception {
		return super.authenticationManagerBean();
	}

	/** HTTP rules: CSRF off, stateless sessions, form login and the {@code /oauth/*} endpoints open. */
	@Override
	protected void configure(HttpSecurity http) throws Exception {
		http.csrf().disable()
			// no HttpSession is used
			.sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS)
			.and()
			.requestMatchers().anyRequest()
			.and()
			.formLogin().permitAll()
			.and()
			.authorizeRequests()
			// the OAuth endpoints themselves need no prior authentication
			.antMatchers("/oauth/*").permitAll();
	}

	/** Password encoder used by the authentication provider. */
	@Bean
	public BCryptPasswordEncoder passwordEncoder() {
		return new MyPasswordEncoder();
	}

}
  • 用户加载服务类设置
/**
 * Loads login accounts from the user table for Spring Security.
 */
@Service
public class MyUserDetailsService implements UserDetailsService {
	@Autowired
	private SUserInfoMapper userInfoMapper;

	/**
	 * Looks up the account whose {@code userName} equals {@code username} and whose
	 * {@code status} is not 9 (presumably 9 marks a removed account - confirm).
	 *
	 * @param username login name to look up
	 * @return user details wrapping the stored account, with an empty authority list
	 * @throws UsernameNotFoundException when no matching account exists
	 */
	@Override
	public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
		QueryWrapper<SUserInfo> criteria = new QueryWrapper<>();
		criteria.eq("userName", username);
		criteria.ne("status", 9);

		SUserInfo account = userInfoMapper.selectOne(criteria);
		if (account == null) {
			throw new UsernameNotFoundException("用户不存在");
		}

		User details = new User();
		details.setSysUser(account);
		// no authorities are granted at this stage
		details.setAuthorities(new ArrayList<>());
		return details;
	}
}

后台服务器配置

  • 资源服务器设置
/**
 * Resource-server configuration of the back-office service: tokens are verified against the
 * authorization server's {@code /oauth/check_token} endpoint and method-level security is enabled.
 */
@Configuration
@EnableResourceServer
// method-level checks, see https://www.cnblogs.com/felordcn/p/12142497.html
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class ResourceServerConfig extends ResourceServerConfigurerAdapter {
	@Autowired
	RestTemplate resourceRestTemplate;
	/** Local authority lookup handed to the remote token services. */
	@Autowired
	MyLocalUserAuthoritiesService userAuthoritiesService;

	/** Configures token store, remote token verification and the resource id. */
	@Override
	public void configure(ResourceServerSecurityConfigurer resources) {
		resources.tokenStore(new JwtTokenStore(accessTokenConverter()));
		resources.stateless(true);

		// Remote token services that verify tokens against the authorization server.
		MyRemoteTokenServices remoteTokenServices = new MyRemoteTokenServices(userAuthoritiesService);
		remoteTokenServices.setAccessTokenConverter(accessTokenConverter());
		resourceRestTemplate.setErrorHandler(badRequestTolerantHandler());
		remoteTokenServices.setRestTemplate(resourceRestTemplate);

		// Address of the authorization server's check_token endpoint.
		String checkTokenUrl = "http://" + NacosServerHostConstant.AUTH_SERVER_NAME
				+ NacosServerHostConstant.AUTH_SERVER_ADDRESS + "/oauth/check_token";
		remoteTokenServices.setCheckTokenEndpointUrl(checkTokenUrl);
		// Client credentials used for the check_token call.
		remoteTokenServices.setClientId(OAuth2ClientEnum.OBD_PORTAL.getClientId());
		remoteTokenServices.setClientSecret(OAuth2ClientEnum.OBD_PORTAL.getPassword());

		resources.tokenServices(remoteTokenServices);
		resources.stateless(true);
		resources.resourceId(OAuth2ClientEnum.OBD_PORTAL.getClientId());
	}

	/** Error handler that lets HTTP 400 responses through and delegates everything else. */
	private DefaultResponseErrorHandler badRequestTolerantHandler() {
		return new DefaultResponseErrorHandler() {
			@Override
			public void handleError(ClientHttpResponse response) throws IOException {
				if (response.getRawStatusCode() == 400) {
					return;
				}
				super.handleError(response);
			}
		};
	}

	/** Converter holding the signing key used for access tokens. */
	@Bean
	public JwtAccessTokenConverter accessTokenConverter() {
		JwtAccessTokenConverter jwtConverter = new JwtAccessTokenConverter();
		jwtConverter.setSigningKey("*****");
		return jwtConverter;
	}

	/** Load-balanced REST template used for the check_token call. */
	@LoadBalanced
	@Bean
	public RestTemplate resourceRestTemplate() {
		return new RestTemplate();
	}

	/** Request rules: CORS on, stateless, login paths open, everything else authenticated. */
	@Override
	public void configure(HttpSecurity http) throws Exception {
		http.cors();
		http.sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS)
			.and()
			.requestMatchers().anyRequest()
			.and()
			.anonymous()
			.and()
			.authorizeRequests()
			// paths that can be called without a token
			.antMatchers("/login/loginByPassword.vue", "/**/*.vueNologin").permitAll()
			.anyRequest().authenticated()
			.and()
			.exceptionHandling().accessDeniedHandler(new OAuth2AccessDeniedHandler());
	}
}
  • 本地授权服务
/**
 * Resolves a user's permission strings, using Redis as a cache keyed by access token.
 */
@Component
public class MyLocalUserAuthoritiesServiceImpl implements MyLocalUserAuthoritiesService {
	@Autowired
	private SUserInfoMapper sysUserMapper;
	@Autowired
	private SMenuMapper sysMenuMapper;
	/** Redis client */
	@Autowired
	private RedisClient redisClient;

	/**
	 * Returns the permission list cached for {@code accessToken}; on a cache miss the list is
	 * loaded from the database for {@code userName} and written back to the cache.
	 *
	 * @param accessToken token the permissions are cached under
	 * @param userName    login name used for the database lookup on a cache miss
	 * @return permission strings of the user
	 * @throws UsernameNotFoundException when the cache misses and the user does not exist
	 */
	@SuppressWarnings("unchecked")
	@Override
	public List<String> loadUserAuthoritiesByUserName(String accessToken ,String userName) {
		List<String> cached = (List<String>) redisClient.getValue(RedisKeyPreConstant.USER_LOGIN_OAUTH, accessToken);
		if (cached != null) {
			return cached;
		}

		// Cache miss: load the user and the permissions from the database.
		QueryWrapper<SUserInfo> byName = new QueryWrapper<>();
		byName.eq("userName", userName);
		SUserInfo sysUser = sysUserMapper.selectOne(byName);
		if (sysUser == null) {
			throw new UsernameNotFoundException("用户不存在");
		}
		List<String> loaded = sysMenuMapper.getPermsListByUserId(sysUser.getUserId());

		AuthAccessToken saveToken = (AuthAccessToken) redisClient.getValue(RedisKeyPreConstant.USER_LOGIN_TOKEN, accessToken);
		if (saveToken == null) {
			// No stored token: fall back to the default cache lifetime.
			redisClient.setValue(RedisKeyPreConstant.USER_LOGIN_OAUTH, accessToken,loaded,Constant.AUNTH_MESSAGE_IN_REDIS_TIME);
		} else {
			// Cache entry lives for the token's remaining lifetime.
			// NOTE(review): assumes expires_in and create_time are both in milliseconds - confirm.
			redisClient.setValue(RedisKeyPreConstant.USER_LOGIN_OAUTH, accessToken,loaded,saveToken.getExpires_in()-(System.currentTimeMillis()-saveToken.getCreate_time()));
		}
		return loaded;
	}
}
  • 控制类设置
/**
 * REST endpoints for managing system users.
 */
@RestController
@RequestMapping("sys/user")
public class UserController extends BaseController{
	@Autowired
	private UserService userService;

	/**
	 * Paged user query. Requires the {@code user:view} authority and runs in a read-only transaction.
	 *
	 * @param loginUser logged-in user, bound from the {@code loginUser} model attribute
	 * @param userName  user-name filter
	 * @param useMan    user's real-name filter
	 * @param linkPhone phone filter
	 * @param beginTime start of the time filter
	 * @param endTime   end of the time filter
	 * @param pageModel paging parameters
	 * @return result model whose data is the page returned by the service
	 */
	@RequestMapping("getSysUserPageList.vue")
	@PreAuthorize("hasAuthority('user:view')")
	@Transactional(readOnly = true)
	public ReturnModel getUserPageList(@ModelAttribute("loginUser") LoginUser loginUser,String userName,String useMan,String linkPhone,Date beginTime,Date endTime,PageModel pageModel) {
		ReturnModel response = new ReturnModel();
		response.setData(
				userService.getPageList(
						getLoginUserOrganizationFullId(loginUser),
						userName,
						useMan,
						linkPhone,
						beginTime,
						endTime,
						pageModel));
		return response;
	}

	/**
	 * Creates a user. Requires the {@code user:add} authority and runs in a transaction.
	 * The request is rejected with {@code EXISTED} when the user name, or the
	 * use-man/organization pair, is already taken.
	 *
	 * @param request   current HTTP request (not read by this method)
	 * @param user      user to create
	 * @param loginUser logged-in user, bound from the {@code loginUser} model attribute
	 * @return result model; carries an error code and message when the user is rejected
	 */
	@PostMapping("addSysUser.vue")
	@Transactional
	@PreAuthorize("hasAuthority('user:add')")
	public ReturnModel addRole(HttpServletRequest request,SUserInfo user,@ModelAttribute("loginUser") LoginUser loginUser) {
		ReturnModel response = new ReturnModel();

		SUserInfo sameName = userService.getUserByUserName(user.getUserName());
		if (sameName != null) {
			response.setResultCode(ResponseCodeConstant.EXISTED);
			response.setResultMessage("用户名名称不能重复");
			return response;
		}

		SUserInfo sameUseMan = userService.getUser(user.getUseMan(), user.getOrganizeId());
		if (sameUseMan != null) {
			response.setResultCode(ResponseCodeConstant.EXISTED);
			response.setResultMessage("使用人不能重复");
			return response;
		}

		// status 1 = normal
		user.setStatus(1);
		// the logged-in user is recorded as both creator and last operator
		user.setCreaterId(loginUser.getUser().getUserId());
		user.setCreaterName(loginUser.getUser().getUseMan());
		user.setOperatorId(loginUser.getUser().getUserId());
		user.setOperatorName(loginUser.getUser().getUseMan());
		userService.addUser(user);
		return response;
	}
}
  • 服务类设置
/**
 * Business service for system users.
 */
@Service
public class UserService {
	@Autowired
	private SUserInfoMapper userInfoMapper;

	/**
	 * Inserts a user after stamping creation and modification time with the current time.
	 * The {@code @Log} annotation records the operation; read-only operations may omit it.
	 *
	 * @param user user to insert
	 * @return the value returned by the mapper's insert
	 */
	@Log("新增用户")
	public int addUser(SUserInfo user) {
		Date timestamp = new Date();
		user.setCreateTime(timestamp);
		user.setModifiedTime(timestamp);
		int inserted = userInfoMapper.insert(user);
		return inserted;
	}
}

OpenFeign工程配置

  • 接口定义
/**
 * Feign contract for location queries, mapped under {@code feign/location}.
 */
@RequestMapping("feign/location")
public interface IFeignLocationService { 
    /**
     * Returns GPS information for the given vehicles inside a map rectangle.
     *
     * @param vehicleIds vehicle ids, sent as the request body
     * @param bounds optional rectangle, formatted as
     *               "upper-right longitude,upper-right latitude|lower-left longitude,lower-left latitude"
     * @return result model carrying the query result
     */
    @RequestMapping("getVehicleInMap")
    ReturnModel getVehicleInMap(@RequestBody String vehicleIds, @RequestParam(required=false) String bounds);
}
  • 客户端配置
/**
 * Feign client for {@link IFeignLocationService}; it adds no methods of its own.
 */
@FeignClient(/** client name, i.e. the target service name in the Nacos registry */name = NacosServerHostConstant.OBD_PORTAL_NAME,/** fallback used when a call fails */fallback= FeignLocationServiceFallback.class,/** Feign configuration class */configuration = OAuth2FeignAutoConfig.class)
// inherits the contract from IFeignLocationService
public interface FeignLocationService extends IFeignLocationService{ 
}
  • fallback类
/**
 * Fallback for {@link FeignLocationService}: every method returns {@code ReturnModel.feignFail()}.
 *
 * NOTE(review): getVehicleRealTimeStatus and getVehicleLocation are overridden here but are not
 * declared in the interface excerpt shown with this class - presumably the full interface declares them.
 */
// registered as a Spring bean
@Component
// overrides the inherited mapping path to avoid a path conflict
@RequestMapping("feign/locationback")
public class FeignLocationServiceFallback implements FeignLocationService { 
	/** Fallback result for getVehicleInMap. */
	@Override
	public ReturnModel getVehicleInMap(String vehicleIds,String bounds) { 
		return ReturnModel.feignFail();
	}
	/** Fallback result for getVehicleRealTimeStatus. */
	@Override
	public ReturnModel getVehicleRealTimeStatus(int vehicleId) { 
		return ReturnModel.feignFail();
	}
	/** Fallback result for getVehicleLocation. */
	@Override
	public ReturnModel getVehicleLocation(String vehicleIds) { 
		return ReturnModel.feignFail();
	}
}
  • config类
/**
 * Feign configuration that contributes a request interceptor built on {@link CloudTokenService}.
 */
public class OAuth2FeignAutoConfig {
	/** Service the interceptor is constructed with. */
	private final CloudTokenService cloudTokenService;

	/**
	 * @param cloudTokenService token service passed on to the request interceptor
	 */
	public OAuth2FeignAutoConfig(CloudTokenService cloudTokenService) {
		this.cloudTokenService = cloudTokenService;
	}

	/**
	 * @return Feign request interceptor created from the token service
	 */
	@Bean
	public RequestInterceptor OAuth2FeignRequestInterceptor() {
		RequestInterceptor interceptor = new OAuth2FeignRequestInterceptor(cloudTokenService);
		return interceptor;
	}
}
  • Feign控制器实现类(服务端)
/**
 * Server-side implementation of {@link IFeignLocationService}.
 */
@RestController
public class FeignLocationController extends ClientBaseController implements IFeignLocationService{
    @Autowired
    private VehicleMonitorService vehicleMonitorService;

    /**
     * Returns the vehicles located inside the given rectangle.
     *
     * @param vehicleIds JSON array text with the vehicle ids
     * @param bounds rectangle, formatted as
     *               "upper-right longitude,upper-right latitude|lower-left longitude,lower-left latitude"
     * @return result model whose data is the service's answer
     */
    @Override
    public ReturnModel getVehicleInMap(String vehicleIds, String bounds) {
        ReturnModel response = new ReturnModel();
        response.setData(
                vehicleMonitorService.getVehicleInMap(JSONArray.parseArray(vehicleIds), bounds));
        return response;
    }
}

网关项目

  • 路由配置
server:
  port: 8080 # port the gateway listens on
  tomcat:
    max-http-form-post-size: 20971520 # 20 MiB
spring:
  profiles:
    active:
    - prod
  application:
    # name under which this service registers itself
    name: obd-gateway
  cloud:
    gateway:
      # default filter applied to every route: de-duplicates the CORS response header
      default-filters:
        - DedupeResponseHeader=Access-Control-Allow-Origin, RETAIN_UNIQUE
      globalcors:
        cors-configurations:
          '[/**]':
            allowedHeaders: "*"
            allowedOrigins: "*"
            allowedMethods: "*"
      discovery:
        locator:
          enabled: true
      routes:
      # service-discovery based route: portal
      - id: portal
        # lb:// = load balanced; obd-portal is the name of the target microservice
        uri: lb://obd-portal
        predicates:
        - Path=/obd-portal/**
        filters:
        - StripPrefix=1 # strip the first path segment before forwarding
        - name: Hystrix # circuit breaking and fallback
          args:
            # NOTE(review): the zhb route uses the same command name - confirm sharing is intended
            name: fallbackcmd
            fallbackUri: forward:/hystrixFallback?a=test
        # Rate limiting (currently disabled):
        # - name: RequestRateLimiter
        #   args:
        #     redis-rate-limiter.replenishRate: 10 # rate
        #     redis-rate-limiter.burstCapacity: 20 # capacity
      # service-discovery based route: manage (no route configured here)
      # service-discovery based route: zhb
      - id: zhb
        uri: lb://obd-zhb
        predicates:
        - Path=/obd-zhb/**
        filters:
        - StripPrefix=1 # strip the first path segment before forwarding
        - name: Hystrix # circuit breaking and fallback
          args:
            name: fallbackcmd
            fallbackUri: forward:/hystrixFallback?a=test
      # service-discovery based route: zhbManage
      - id: zhbManage
        uri: lb://obd-zhb-manage
        predicates:
        - Path=/obd-zhb-manage/**
        filters:
        - StripPrefix=1 # strip the first path segment before forwarding

接收终端数据项目

  • 启动socket项目
@Component
@Sharable
@Slf4j
public class OBDServer { 
	@Autowired
	private MessageHandler messageHandler;
	@Autowired
	private TerminalAuthHandler terminalAuthHandler;
	@Autowired
	private TerminalCommonResponse terminalCommonResponse;
	@Autowired
	private TerminalRegisterHandler terminalRegisterHandler;
	public void bind(int port) { 
		log.info("OBDNettySocket启动 port="+port);
		//建立NIO线程组 其实是reactor线程组
		//用于接收客户端链接
		EventLoopGroup boosGroup = new NioEventLoopGroup();
		//用于进行SocketChnnel的网络读写
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try { 
			//启用nio服务对象
			ServerBootstrap b = new ServerBootstrap();
			//绑定nio线程
			b.group(boosGroup, workerGroup)
				//设置channel 为NioServerSocketChannel 对应java nio中的ServerSocketChannel
				.channel(NioServerSocketChannel.class)
				//设置NioServerSocketChannel的TCP参数 backlog
				.option(ChannelOption.SO_BACKLOG, 1024)
				//绑定nio事件处理类 做用相似于reactor模式中的handler
				.childHandler(new ChannelInitializer<SocketChannel>() { 
					@Override
					protected void initChannel(SocketChannel ch) throws Exception { 
						ch.pipeline().addLast(new OBDNettyMessageDecoder());
						ch.pipeline().addLast(new OBDNettyMessageEncoder());
						ch.pipeline().addLast(terminalRegisterHandler);
						ch.pipeline().addLast(terminalAuthHandler);
						ch.pipeline().addLast(terminalCommonResponse);
						ch.pipeline().addLast(new HeartBeatHandler());
						//必须放到最后
						ch.pipeline().addLast(messageHandler);
					}
				});
			//绑定端口 同步等待成功
			ChannelFuture f = b.bind(port).sync();
			//等待服务监听端口关闭
			f.channel().closeFuture().sync();
		} catch (InterruptedException e) { 
			e.printStackTrace();
		}finally { 
			log.info("OBDNettySocket关闭 port="+port);
			//优雅退出 释放线程资源
			boosGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
	
}
  • 终端注册
@Component
@Sharable
@Slf4j
public class TerminalRegisterHandler  extends BaseHandler{ 
	/** * 终端注册 256 */
	 private final int  TERMINAL_REGISTER = 0x0100;
	 /** * 终端注册 应答 33024 */
	 private final int TERMINAL_REGISTER_RESPONSE = 0x8100; 
	 @Autowired
	 private CheckTerminalIdIsCanUseService  checkTerminalIdIsCanUseService;
	/**
	 * Answers terminal-register messages and then passes every message on to the next handler.
	 *
	 * NOTE(review): unlike the authentication handler, register messages are also forwarded
	 * down the pipeline after being answered - confirm that this is intended.
	 *
	 * @param ctx channel context used for the reply and for forwarding
	 * @param msg decoded frame, expected to be a {@code byte[]}
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		byte[] frame = (byte[]) msg;
		if (OBDUtil.getIntFromODBIntArray(frame, 0, 2) == TERMINAL_REGISTER) {
			String terminalId = OBDUtil.getTerminalNumber(frame);
			byte status;
			String authCode;
			if (checkTerminalIdIsCanUseService.check(terminalId)) {
				// 0 = success
				status = 0;
				// authentication code for the terminal; not used at the moment
				authCode = OBDConstant.TERMINAL_REGISTER_AUTH_CODE;
				log.info("注册成功:terminalId="+terminalId);
			} else {
				// 1 = failure
				status = 1;
				authCode = null;
				log.info("注册失败:terminalId="+terminalId);
			}
			// reply with the register result, echoing bytes 10 and 11 of the request
			byte[] serial = new byte[] { frame[10], frame[11] };
			ctx.writeAndFlush(getTerminalRegisterResponse(frame, serial, status, authCode));
		}
		ctx.fireChannelRead(msg);
	}
  • 终端认证
/**
 * Handles terminal authentication messages; all other messages are passed on unchanged.
 */
@Component
@Sharable
@Slf4j
public class TerminalAuthHandler extends BaseHandler{
	/** Message id of a terminal authentication request (0x0102 = 258). */
	private final static int TERMINAL_AUTH = 0x0102;
	/** Service that decides whether a terminal id may be used. */
	@Autowired
	private CheckTerminalIdIsCanUseService checkTerminalIdIsCanUseService;

	/**
	 * Authenticates the terminal when {@code msg} is an authentication request and replies with
	 * the result; any other message is forwarded to the next handler.
	 *
	 * @param ctx channel context used for replying, scheduling and forwarding
	 * @param msg decoded frame, expected to be a {@code byte[]}
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		byte[] frame = (byte[]) msg;
		int messageId = OBDUtil.getIntFromODBIntArray(frame, 0, 2);
		if (messageId != TERMINAL_AUTH) {
			ctx.fireChannelRead(msg);
			return;
		}

		String terminalId = OBDUtil.getTerminalNumber(frame);
		// 1 = failure until the check passes
		byte status = 1;
		if (checkTerminalIdIsCanUseService.check(terminalId)) {
			status = 0;
			// first authentication of this terminal: schedule the heart-beat check
			if (SocketConfig.getSocketModel(terminalId) == null) {
				HeartBeatTask heartBeat = new HeartBeatTask(terminalId);
				ctx.executor().schedule(heartBeat, OBDConstant.HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
			}
			// remember the connection of this terminal
			SocketConfig.putSocket(terminalId, new SocketModel(ctx));
			log.info("认证成功:terminalId="+terminalId);
		} else {
			log.info("认证失败:terminalId="+terminalId);
		}
		// reply with the result, echoing bytes 10 and 11 of the request
		byte[] serial = new byte[] { frame[10], frame[11] };
		ctx.writeAndFlush(OBDCommonResponseUtil.getOBDCommonResponse(frame, serial, status));
	}

}
  • 消息处理
@Component
@Sharable
@Slf4j
public class MessageHandler extends BaseHandler{ 
	@Autowired
	private DealMessageService dealMessageService;
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
		byte[] array = (byte[]) msg;
		int messageId = OBDUtil.getIntFromODBIntArray(array, 0, 2);
		String terminalId = OBDUtil.getTerminalNumber(array);
		//log.info("收到终端消息:terminalId="+terminalId+" messageId="+messageId);
		SocketModel socketModel = SocketConfig.getSocketModel(terminalId);
		//有链路信息 说明终端设备认证
		if(socketModel != null) { 
			//处理消息
			dealMessageService.dealMessage(String.valueOf(messageId),array);
			socketModel.resetHeartNoRespCount();
			ctx.writeAndFlush(OBDCommonResponseUtil.getOBDCommonResponse(array, new byte[] { array[10],array[11]}, (byte)0));
		}else { 
			System.out.println(messageId);
			log.info("终端未认证 terminalId="+terminalId);
			//没有发现终端
			ctx.writeAndFlush(OBDCommonResponseUtil.getOBDCommonResponse(array,new byte[] { array[10],array[11]}, (byte)5));
		}
	}
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
		System.out.println(ctx);
		ctx.close();
		cause.printStackTrace();
		log.error("链路异常关闭");
	}
}

轨迹解析服务

  • 从Kafka中获取数据
/**
 * Kafka listeners that parse raw terminal frames and pass the result on to the
 * track service or to Redis.
 */
@Slf4j
@Component
public class KafkaConsumer {
	@Autowired
	private GpsVehicleTraceService gpsVehicleTraceService;
	@Autowired
	private TerminalNumberVehicleService terminalNumberVehicleService;
	@Autowired
	private RedisClient redisClient;

	/**
	 * Handles a single uploaded position: parses the frame, resolves the bound vehicle
	 * and forwards the position to the track service.
	 *
	 * @param record Kafka record whose value is the raw terminal frame
	 */
	@KafkaListener(id="gpsLocation1",topics= KafkaTopicConstant.GPS_LOCATION)
	public void onGPSLocation(ConsumerRecord<String, byte[]> record) {
		OBDModel obd = ProtocolAnalysis.parse(record.value());
		if (obd == null) {
			return;
		}
		String terminalNumber = obd.getTerminalNumber();
		Integer vehicleId = terminalNumberVehicleService.getVehicleIdByTerminalNumber(terminalNumber);
		if (vehicleId == null) {
			log.info("未找到绑定信息 terminalNumber ={}", terminalNumber);
			return;
		}
		PositionModel positionModel = (PositionModel) obd.getBody();
		if (positionModel != null) {
			gpsVehicleTraceService.dealGpsVehicle(vehicleId, positionModel, false);
		}
	}

	/**
	 * Handles a batch of re-uploaded positions; every position of the batch is forwarded
	 * to the track service with the batch flag set.
	 *
	 * @param record Kafka record whose value is the raw terminal frame
	 */
	@KafkaListener(id="gpsLocationBeath1",topics=KafkaTopicConstant.GPS_LOCATION_BEATCH)
	public void onGPSLocationBeath(ConsumerRecord<String, byte[]> record) {
		OBDModel obd = ProtocolAnalysis.parse(record.value());
		if (obd == null) {
			return;
		}
		Integer vehicleId = terminalNumberVehicleService.getVehicleIdByTerminalNumber(obd.getTerminalNumber());
		if (vehicleId == null) {
			return;
		}
		LocationBatchModel positionBatch = (LocationBatchModel) obd.getBody();
		if (positionBatch == null) {
			return;
		}
		List<PositionModel> list = positionBatch.getList();
		if (list == null) {
			return;
		}
		for (PositionModel positionModel : list) {
			gpsVehicleTraceService.dealGpsVehicle(vehicleId, positionModel, true);
		}
	}

	/**
	 * Terminal register frames are only parsed; the result is not used.
	 *
	 * @param record Kafka record whose value is the raw terminal frame
	 */
	@KafkaListener(id="terminalRegister",topics=KafkaTopicConstant.TERMINAL_REGISTER)
	public void onTerminalRegister(ConsumerRecord<String, byte[]> record) {
		ProtocolAnalysis.parse(record.value());
	}

	/**
	 * Terminal unregister frames are only parsed; the result is not used.
	 *
	 * @param record Kafka record whose value is the raw terminal frame
	 */
	@KafkaListener(id="terminalUnRegister",topics=KafkaTopicConstant.TERMINAL_UNREGISTER)
	public void onTerminalUnRegister(ConsumerRecord<String, byte[]> record) {
		ProtocolAnalysis.parse(record.value());
	}

	/**
	 * Stores the answer to a terminal-parameter query in Redis under the terminal number.
	 * Frames without a parsed body are skipped, matching the other listeners.
	 *
	 * @param record Kafka record whose value is the raw terminal frame
	 */
	@KafkaListener(id="terminalAnswer",topics=KafkaTopicConstant.TERMINAL_ANSWER)
	public void getTerminalAnswer(ConsumerRecord<String, byte[]> record){
		OBDModel obd = ProtocolAnalysis.parse(record.value());
		if (obd == null) {
			return;
		}
		TerminalAnswerData answerData = (TerminalAnswerData) obd.getBody();
		if (answerData == null) {
			// The parser leaves the body unset for empty or truncated frames.
			return;
		}
		String terminalNumber = obd.getTerminalNumber();
		@SuppressWarnings("unchecked")
		List<TerminalAnswerValue> answerValue = (List<TerminalAnswerValue>) answerData.getValue();
		redisClient.setValue(RedisKeyPreConstant.TERMINAL_DATA, terminalNumber, answerValue);
	}
}
  • 协议解析类
public class ProtocolAnalysis { 
	// Body parsers keyed by message id; consulted by getBody(int, int[]).
	private final static Map<Integer,DealMessageService> dealMessageServiceMap = new HashMap<>();
	static { 
		// Register one parser per supported message id.
		dealMessageServiceMap.put(OBDConstant.MessageId.TERMINAL_AUTH, new TerminalAuthDealMessageServiceImpl());
		dealMessageServiceMap.put(OBDConstant.MessageId.POSSITION, new PositionBaseDealMessageServiceImpl());
		dealMessageServiceMap.put(OBDConstant.MessageId.TERMINAL_GENER_RESPONSE, new TerminalGeneralResponseDealMessageServiceImpl());
		dealMessageServiceMap.put(OBDConstant.MessageId.POSSITION_BATCH, new PossitionBatchDealMassageServiceImpl());
		dealMessageServiceMap.put(OBDConstant.MessageId.TERMINAL_REGISTER, new TerminalRegisterDealMessageServiceImpl());
		dealMessageServiceMap.put(OBDConstant.MessageId.TERMINAL_ANSWER, new TerminalAnswerDealMessageImpl());
		dealMessageServiceMap.put(OBDConstant.MessageId.ALARM_INFO, new AlarmDealMessageServiceImpl());
	}
// public static void main(String[] args) { 
// byte[] data = new byte[] {0, 1, 0, 5, 1, 65, 68, 120, 116, -107, 0, 33, 0, 1, -126, 7, 0, 60};
// 
// OBDModel obdModel =parse(data);
// System.out.println(obdModel);
// System.out.println(Integer.toHexString(obdModel.getMessageId()));
// }
	
	/**
	 * Parses one raw terminal frame into an {@link OBDModel}.
	 *
	 * <p>Header layout: message id (bytes 0-1), body attributes (2-3), terminal number (4-9),
	 * serial number (10-11) and, for sub-packaged messages, total packages (12-13) and
	 * package number (14-15). The body follows the header.
	 *
	 * @param chars raw frame bytes; values are treated as unsigned
	 * @return the parsed model, or {@code null} when the frame is {@code null} or too short
	 *         to hold the header; the body stays unset when the frame does not contain
	 *         the announced number of body bytes
	 */
	public static OBDModel parse(byte[] chars) {
		// id(2) + attributes(2) + terminal number(6) + serial number(2)
		final int headerLength = 12;
		// total packages(2) + package number(2), present only for sub-packaged messages
		final int subpackageInfoLength = 4;
		if (chars == null || chars.length < headerLength) {
			return null;
		}

		// Widen to int; negative bytes are corrected because the protocol sends unsigned values.
		int[] dataArray = new int[chars.length];
		for (int i = 0; i < dataArray.length; i++) {
			if (chars[i] < 0) {
				dataArray[i] = OBDUtil.obdDataCorrection(chars[i]);
			} else {
				dataArray[i] = chars[i];
			}
		}

		OBDModel obdModel = new OBDModel();
		obdModel.setMessageId(OBDUtil.getIntFromODBIntArray(dataArray, 0, 2));
		obdModel.setBodyDescribe(getBodyDescribe(OBDUtil.getIntFromODBIntArray(dataArray, 2, 2)));
		obdModel.setTerminalNumber(getTerminalNumber(dataArray));
		obdModel.setSerialNumber(OBDUtil.getIntFromODBIntArray(dataArray, 10, 2));

		int bodyIndex = headerLength;
		if (obdModel.getBodyDescribe().getIsSubpackage()) {
			if (dataArray.length < headerLength + subpackageInfoLength) {
				return null;
			}
			obdModel.setTotalPackage(OBDUtil.getIntFromODBIntArray(dataArray, 12, 2));
			obdModel.setPackageNum(OBDUtil.getIntFromODBIntArray(dataArray, 14, 2));
			bodyIndex += subpackageInfoLength;
		}

		int bodyLength = obdModel.getBodyDescribe().getBodyLength();
		// The second bound uses the real body offset, so a sub-packaged frame cannot
		// make the copy below read past the end of the data.
		boolean bodyPresent = bodyLength > 0
				&& OBDConstant.TOP_MIN_LENGTH + bodyLength <= dataArray.length
				&& bodyIndex + bodyLength <= dataArray.length;
		if (bodyPresent) {
			int[] bodyIntArray = new int[bodyLength];
			System.arraycopy(dataArray, bodyIndex, bodyIntArray, 0, bodyLength);
			obdModel.setBody(getBody(obdModel.getMessageId(), bodyIntArray));
		}
		return obdModel;
	}
	/**
	 * Converts the raw body of a message into its parsed form.
	 *
	 * @param messageid message id used to select the parser
	 * @param bodyArray raw body values
	 * @return the registered parser's result, or the {@code Arrays.toString} text of the
	 *         body when no parser is registered for the message id
	 */
	public static Object getBody(int messageid,int[] bodyArray) {
		DealMessageService parser = dealMessageServiceMap.get(messageid);
		if (parser == null) {
			// unsupported message type: fall back to a printable dump
			return Arrays.toString(bodyArray);
		}
		return parser.getBody(bodyArray);
	}
	
	/**
	 * Reads the terminal number, stored as six BCD values at positions 4 to 9.
	 *
	 * @param dataArray frame values
	 * @return concatenation of the BCD text of positions 4 through 9
	 */
	public static String getTerminalNumber(int[] dataArray) {
		StringBuilder digits = new StringBuilder();
		int position = 4;
		while (position <= 9) {
			digits.append(OBDUtil.getBCDStr(dataArray[position]));
			position++;
		}
		return digits.toString();
	}
	
	/**
	 * Decodes the two-byte body attribute field.
	 *
	 * @param bodyInt attribute field as an integer
	 * @return description holding body length, encryption flag, encryption type and sub-package flag
	 */
	public static BodyDescribe getBodyDescribe(int bodyInt) {
		BodyDescribe describe = new BodyDescribe();

		// body length: the value modulo 1024, i.e. bits 0-9
		describe.setBodyLength(bodyInt % (1 << 10));

		// bits 10-12 all zero means the message is not encrypted
		int encryptionBits = bodyInt & ((1 << 10) + (1 << 11) + (1 << 12));
		describe.setIsEncryption(encryptionBits != 0);

		// bit 10 set means RSA encryption (type 1); otherwise the type stays null
		Integer encryptionType = null;
		if (OBDUtil.checkIntBitIsOne(bodyInt, 10)) {
			encryptionType = 1;
		}
		describe.setEncryptionType(encryptionType);

		// bit 13 set means the message is split into sub-packages
		describe.setIsSubpackage(OBDUtil.checkIntBitIsOne(bodyInt, 13));
		return describe;
	}
	
	/**
	 * Verifies the XOR checksum of a frame: the last element must equal the XOR of all
	 * elements before it.
	 *
	 * @param dataArray frame values with the checksum as last element
	 * @return {@code true} when the checksum matches; {@code false} when it does not, or when
	 *         the array is {@code null} or has fewer than two elements (no payload to verify)
	 */
	public static boolean check(int[] dataArray) {
		if (dataArray == null || dataArray.length < 2) {
			return false;
		}
		int expected = dataArray[dataArray.length - 1];
		// Start from 0 so that the checksum element itself is never folded into the result,
		// including for two-element frames.
		int calculated = 0;
		for (int i = 0; i < dataArray.length - 1; i++) {
			calculated ^= dataArray[i];
		}
		return calculated == expected;
	}
  • 认证消息类型解析
/**
 * Body parser for terminal authentication messages: the body is turned into text.
 */
public class TerminalAuthDealMessageServiceImpl implements DealMessageService{
	/**
	 * @param bodyArray body values, each interpreted as one Unicode code point
	 * @return string built from all values of {@code bodyArray}
	 */
	@Override
	public Object getBody(int[] bodyArray) {
		int count = bodyArray.length;
		String text = new String(bodyArray, 0, count);
		return text;
	}
}
相关文章
相关标签/搜索