一 项目概述1.1 角色1.2 业务术语1.3 项目效果展现二 项目需求三 项目概要3.1 项目技术架构3.2 项目目录结构3.3 项目技术选型3.4 项目总体集群规划3.5 建立项目工程四 APP 数据生成模块4.1 建立公共模块工程4.1.1 建立 Java 工程,导入 pom 文件4.1.2 建立 AppBaseLog 基类4.1.3 建立 AppErrorLog 错误日志类4.1.4 建立 AppEventLog 事件日志类4.1.5 建立 AppPageLog 页面日志类4.1.6 建立 AppStartupLog 启动日志类4.1.7 建立 AppUsageLog 使用时长日志类4.1.8 建立 AppLogEntity 日志实体类4.1.9 建立 GeoInfo 地理信息类4.1.10 建立 GeoUtil 地理信息工具类4.2 编写手机客户端工程4.2.1 建立 Java 工程,导入 pom 文件4.2.2 建立 GenerateData 数据生成类4.2.3 建立数据上传的工具类五 数据收集模块5.1 Web 数据收集模块5.1.0 数据收集模块集群部署规划5.1.1 建立 web 工程,导入 pom 文件5.1.2 在 web.xml 文件中加载 Spring 和 Springmvc 配置5.1.3 在 resources 路径下添加 Springmvc 配置文件5.1.4 在 resources 路径下添加 Spring 配置文件5.1.5 在 resources 路径下添加 log4j 文件5.1.6 网络请求处理5.1.7 修正服务器和客户端时间5.1.8 获取国家、省份、和IP地址信息5.1.9 向 Kafka 发送消息5.1.10 在 IDEA 上执行 Web 程序(测试环境)5.1.11 测试5.2 Kafka 集群模块5.2.1 配置 Zookeeper 集群5.2.2 配置 Kafka 集群5.2.3 测试5.2.4 在 Centos 上部署 Tomcat(生产环境)5.2.5 测试5.3 Flume 模块5.3.1 配置 Hadoop 集群5.3.2 Flume 安装5.3.3 建立 Flume 拦截器5.3.4 配置 Flume5.3.5 测试六 数据处理模块框架搭建6.1 数据处理模块集群部署规划6.2 配置 Hive 元数据存储到 mysql6.3 配置 Hive 支持 JSON 存储6.4 建立数据库及分区表6.5 编写 Hive 执行脚本6.6 编写 Linux 调度 crondtab6.7 测试七 业务需求处理7.1 自定义 UDF 函数7.2 新增用户统计7.2.1 任意日新增用户7.2.2 任意周新增用户7.2.3 月新增用户7.3 活跃用户统计7.3.1 日、周、月活跃用户7.3.2 指定时间内查询日活、周活、月活7.3.3 优化活跃数查询7.3.4 过去五周周活跃用户数7.3.5 过去六月活跃用户数7.3.6 连续 n 周活跃用户统计7.3.7 忠诚用户7.4 沉默用户统计7.5 启动次数统计7.6 版本分布统计7.7 留存分析统计7.7.1 本周回流用户统计7.7.2 连续n周没有启动的用户7.7.3 留存用户统计7.8 新鲜度分析八 数据展现模块8.1 建立 web 可视化工程8.1.1 建立 web 工程导入 pom.xml8.1.2 导入静态资源并调整 web 目录8.1.3 在 web.xml 文件中加载 Spring 和 Springmvc 配置8.1.4 在 resources 路径下添加 Springmvc 配置文件8.1.5 在 resources 路径下添加 Spring 配置文件8.1.6 在 resources 路径下添加 mybatis 配置文件8.1.7 在 resources 路径下添加 mybatis 的 mapper 配置文件8.1.8 在 resources 路径下添加 log4j 文件8.1.9 在 hive 端启动 hiveserver28.2 代码逻辑实现8.2.1 准备统计结果 bean8.2.2 编写 controller 逻辑8.2.3 编写 service 逻辑8.2.4 编写 dao 逻辑8.3 UI 页面数据展现8.4 在 IDEA 上执行 Web 程序(测试环境)8.5 测试九 项目总结十 问题总结javascript
1)App 开发商
每一个开发商能够有多个 App 产品。
2)App 软件
3)数据服务平台提供商
友盟:向 App 开发商提供服务。提供 App 使用状况的统计服务。
友盟官方网址:https://www.umeng.com/
4)SDK
数据服务平台提供商提供给 App 开发商的软件包。
内置 log 上报程序。
5)用户
每一个使用 App 的设备。
6)租户
购买了数据服务平台提供商服务的 App 开发商。php
1)用户
用户以设备为判断标准,在移动统计中,每一个独立设备认为是一个独立用户。Android 系统根据 IMEI 号,IOS 系统根据 OpenUDID 来标识一个独立用户,每部手机一个用户
。
2)新增用户
首次联网使用应用的用户。若是一个用户首次打开某 app,那这个用户定义为新增用户;卸载再安装的设备,不会被算做一次新增。日新增用户、周新增用户、月新增用户。
3)活跃用户
打开应用的用户即为活跃用户,不考虑用户的使用状况。天天一台设备打开屡次会被计为一个活跃用户。
4)周(月)活跃用户
某个天然周(月)内启动过应用的用户,该周(月)内的屡次启动只记一个活跃用户。
5)月活跃率
月活跃用户与截止到该月累计的用户总和之间的比例。
6)沉默用户
用户仅在安装当天(第二天)启动一次,后续时间无再启动行为。该指标能够反映新增用户质量和用户与 APP 的匹配程度。
7)版本分布
不一样版本的周内各天新增用户数,活跃用户数和启动次数。利于判断 App 各个版本之间的优劣和用户行为习惯。
8)本周回流用户
上周未启动过应用,本周启动了应用的用户。
9)连续 n 周活跃用户
连续 n 周,每周至少启动一次。
10)忠诚用户
连续活跃 5 周以上的用户
11)连续活跃用户
连续 2 周及以上活跃的用户
12)近期流失用户
连续 n(2 <= n <= 4) 周没有启动应用的用户。(第 n+1 周没有启动过)
13)留存用户
某段时间内的新增用户,通过一段时间后,仍然使用应用的被认做是留存用户;这部分用户占当时新增用户的比例便是留存率。例如,5 月份新增用户 200,这 200 人在 6 月份启动过应用的有 100 人,7 月份启动过应用的有 80 人,8 月份启动过应用的有 50 人;则 5 月份新增用户一个月后的留存率是 50%,二个月后的留存率是 40%,三个月后的留存率是 25%。
14)用户新鲜度
天天启动应用的新老用户比例。
15)单次使用时长
每次启动使用的时间长度。
16)日使用时长
累计一天内的使用时间长度。
17)启动次数计算标准
IOS 平台应用退到后台就算一次独立的启动;Android 平台咱们规定,两次启动之间的间隔小于 30 秒,被计算一次启动。用户在使用过程当中,若因收发短信或接电话等退出应用 30 秒又再次返回应用中,那这两次行为应该是延续而非独立的,因此能够被算做一次使用行为,即一次启动。业内大多使用 30 秒这个标准,但用户仍是能够自定义此时间间隔。css
一、日活用户统计html
1)实现收集手机 APP 日志。
2)按期离线分析手机 APP 新增用户、活跃用户、沉默用户、启动次数、版本分布、和留存用户等业务指标。
3)在数据展现服务上能够查询结果。java
1)Kafka_2.11-0.11.0.0
2)Zookeeper3.4.10
3)Hadoop2.7.2
4)Flume1.7.0
5)Tomcat7.0.72
6)Mysql 5.6.24
7)SSM 框架
8)echarts.jsnode
File --> New --> Project --> Java --> JDK 1.8 --> Nextmysql
点击 Nextlinux
输入项目名称和工做空间地址android
1)建立 app_logs_common
工程,主要用于编写公共的 javabean 和工具类。
2)添加 maven 支持,在 app_logs_common
模块上右键,选择 Add Framework Support… --> 勾选maven,而后点击 OK。ios
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu</groupId>
<artifactId>app_logs_common</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- 地理信息工具类 -->
<dependency>
<groupId>com.maxmind.db</groupId>
<artifactId>maxmind-db</artifactId>
<version>1.0.0</version>
</dependency>
<!-- 加载地理信息工具类 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
</dependencies>
</project>
刷新一下 maven
1)建立 package 和 AppBaseLog 基类:com.atguigu.app.common.AppBaseLog
package com.atguigu.app.common;
import java.io.Serializable;
/**
* AppBaseLog 基类
*/
public class AppBaseLog implements Serializable {
private Long createdAtMs; // 日志建立时间
private String appId; // 应用惟一标识
private String tenantId; // 租户惟一标识,企业用户
private String deviceId; // 设备惟一标识
private String appVersion; // 版本
private String appChannel; // 渠道,安装时就在清单中指定了,如 appStore 等
private String appPlatform; // 平台
private String osType; // 操做系统
private String deviceStyle; // 机型
// getter 和 setter 省略
}
package com.atguigu.app.common;
/**
* 应用上报的 app 错误日志相关信息
*/
public class AppErrorLog extends AppBaseLog {
private String errorBrief; // 错误摘要
private String errorDetail; // 错误详情
// getter 和 setter 省略
}
package com.atguigu.app.common;
import java.util.Map;
/**
* 应用上报的事件相关信息
*/
public class AppEventLog extends AppBaseLog {
private String eventId; // 事件惟一标识
private Long eventDurationSecs; // 事件持续时长
private Map<String, String> paramKeyValueMap; // 参数名/值对
// getter 和 setter 省略
}
package com.atguigu.app.common;
/**
* 应用上报的页面相关信息
*/
public class AppPageLog extends AppBaseLog {
private String pageId; // 页面 id
private int visitIndex = 0; // 访问顺序号,0 为第一个页面
private String nextPage; // 下一个访问页面,如为空则表示为退出应用的页面
private Long stayDurationSecs = (long) 0; // 当前页面停留时长
// getter 和 setter 省略
}
package com.atguigu.app.common;
/**
* app 启动日志
*/
public class AppStartupLog extends AppBaseLog {
private String country; // 国家,终端不用上报,服务器自动填充该属性
private String province; // 省份,终端不用上报,服务器自动填充该属性
private String ipAddress; // ip地址
private String network; // 网络
private String carrier; // 运营商
private String brand; // 品牌
private String screenSize; // 分辨率
// getter 和 setter 省略
}
package com.atguigu.app.common;
/**
* 应用上报的使用时长相关信息
*/
public class AppUsageLog extends AppBaseLog {
private Long singleUseDurationSecs; // 单次使用时长(秒数),指一次启动内应用在前台的持续时长
private Long singleUploadTraffic; // 单次使用过程当中的上传流量
private Long singleDownloadTraffic; // 单次使用过程当中的下载流量
// getter 和 setter 省略
}
package com.atguigu.app.common;
/**
* AppLogEntity 日志实体类
* 内部含有各类日志时间的集合
*/
public class AppLogEntity extends AppBaseLog {
private AppStartupLog[] appStartupLogs; // 启动相关信息的数组
private AppPageLog[] appPageLogs; // 页面跳转相关信息的数组
private AppEventLog[] appEventLogs; // 事件相关信息的数组
private AppUsageLog[] appUsageLogs; // app 使用状况相关信息的数组
private AppErrorLog[] appErrorLogs; // 错误相关信息的数组
// getter 和 setter 省略
}
package com.atguigu.app.common;
/**
* 地理信息类
*/
public class GeoInfo {
private String country;
private String province;
// getter 和 setter 省略
}
1)建立一个工具类包:com.atguigu.app.utils
2)编写工具类:
地理信息数据库官方地址:https://www.maxmind.com/zh/geoip2-databases
API 使用说明:http://maxmind.github.io/GeoIP2-java/
package com.atguigu.app.utils;
import com.fasterxml.jackson.databind.JsonNode;
import com.maxmind.db.Reader;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
/**
* 地理工具类,实现经过 ip 查找地址区域
*/
public class GeoUtil {
private static InputStream in;
private static Reader reader;
/**
* 得到国家数据
*/
public static String getCountry(String ip) {
try {
Resource resource = new ClassPathResource("GeoLite2-City.mmdb");
reader = new Reader(resource.getFile());
if (reader != null) {
JsonNode node = reader.get(InetAddress.getByName(ip));
if (node != null) {
JsonNode countryNode = node.get("country");
if (countryNode != null) {
JsonNode namesNode = countryNode.get("names");
if (namesNode != null) {
JsonNode zhNode = namesNode.get("zh-CN");
if (zhNode != null) {
return zhNode.textValue();
}
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return "";
}
/**
* 得到省份数据
*/
public static String getProvince(String ip) {
try {
Resource resource = new ClassPathResource("GeoLite2-City.mmdb");
reader = new Reader(resource.getFile());
if (reader != null) {
JsonNode node = reader.get(InetAddress.getByName(ip));
if (node != null) {
JsonNode subdivisionsNode = node.get("subdivisions");
if (subdivisionsNode != null) {
JsonNode areaNode = subdivisionsNode.get(0);
if (areaNode != null) {
JsonNode namesNode = areaNode.get("names");
if (namesNode != null) {
JsonNode zhNode = namesNode.get("zh-CN");
if (zhNode != null) {
return zhNode.textValue();
}
}
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return "";
}
}
1)建立 app_logs_client 模块
2)添加 maven
3)导入 pom 文件,并刷新一下 maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu</groupId>
<artifactId>app_logs_client</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>app_logs_common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- json 解析 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.6</version>
</dependency>
</dependencies>
</project>
1)建立 package:com.atguigu.app.client
2)编写代码
package com.atguigu.app.client;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.common.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
public class GenerateData {
// 0 建立随机数对象
private static Random random = new Random();
// 1 准备基类log的属性值
// 1.1 功能属性值
private static Long[] createdAtMs = initCreatedAtMs(); // 日志建立时间
private static String appId = "sdk34734"; // 应用惟一标识
private static String[] tenantIds = {"cake"}; // 租户惟一标识,企业用户
private static String[] deviceIds = initDeviceId(); // 设备惟一标识
private static String[] appVersions = {"3.2.1", "3.2.2"}; // 版本
private static String[] appChannels = {"youmeng1", "youmeng2"}; // 渠道,安装时就在清单中指定了,如 appStore 等
private static String[] appPlatforms = {"android", "ios"}; // 平台
private static String[] osTypes = {"8.3", "7.1.1"}; // 操做系统
private static String[] deviceStyles = {"iPhone 6", "iPhone 6 Plus", "红米手机1s"}; // 机型
// 1.1.1 初始化设备 id
private static String[] initDeviceId() {
String base = "device22";
String[] result = new String[100];
for (int i = 0; i < 100; i++) {
result[i] = base + i + "";
}
return result;
}
// 1.1.2 初始化建立时间
private static Long[] initCreatedAtMs() {
Long createdAtMs = System.currentTimeMillis();
Long[] result = new Long[11];
for (int i = 0; i < 10; i++) {
result[i] = createdAtMs - (long) (i * 24 * 3600 * 1000);
}
result[10] = createdAtMs;
return result;
}
// 1.2 启动日志属性值
private static String[] countrys = {"America", "china"}; // 国家,终端不用上报,服务器自动填充该属性
private static String[] provinces = {"Washington", "jiangxi", "beijing"}; // 省份,终端不用上报,服务器自动填充该属性
private static String[] networks = {"WiFi", "CellNetwork"}; // 网络
private static String[] carriers = {"中国移动", "中国电信", "EE"}; // 运营商
private static String[] brands = {"三星", "华为", "Apple", "魅族", "小米", "锤子"}; // 品牌
private static String[] screenSizes = {"1136*640", "960*640", "480*320"}; // 分辨率
// 1.3 事件日志属性值
private static String[] eventIds = {"popMenu", "autoImport", "BookStore"}; // 事件惟一标识
private static Long[] eventDurationSecsS = {new Long(25), new Long(67), new Long(45)}; // 事件持续时长
static Map<String, String> map1 = new HashMap<String, String>() {
{
put("testparam1key", "testparam1value");
put("testparam2key", "testparam2value");
}
};
static Map<String, String> map2 = new HashMap<String, String>() {
{
put("testparam3key", "testparam3value");
put("testparam4key", "testparam4value");
}
};
private static Map[] paramKeyValueMapsS = {map1, map2}; // 参数名/值对
// 1.4 使用时长日志属性值
private static Long[] singleUseDurationSecsS = initSingleUseDurationSecs(); // 单次使用时长(秒数),指一次启动内应用在前台的持续时长
// 1.4.1 单次使用时长
private static Long[] initSingleUseDurationSecs() {
Long[] result = new Long[200];
for (int i = 1; i < 200; i++) {
result[i] = (long) random.nextInt(200);
}
return result;
}
// 1.5 错误日志属性值
private static String[] errorBriefs = {"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)", "at cn.lift" +
".appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"}; // 错误摘要
private static String[] errorDetails = {"java.lang.NullPointerException\\n " + "at cn.lift.appIn.web.AbstractBaseController.validInbound" +
"(AbstractBaseController.java:72)\\n " + "at cn.lift.dfdf.web.AbstractBaseController.validInbound", "at cn.lift.dfdfdf.control" +
".CommandUtil.getInfo(CommandUtil.java:67)\\n " + "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl" +
".java:43)\\n" + " at java.lang.reflect.Method.invoke(Method.java:606)\\n"}; // 错误详情
// 1.6 页面使用状况日志属性值
private static String[] pageIds = {"list.html", "main.html", "test.html"}; // 页面 id
private static int[] visitIndexs = {0, 1, 2, 3, 4}; // 访问顺序号,0为第一个页面
private static String[] nextPages = {"list.html", "main.html", "test.html", null}; // 下一个访问页面,如为空则表示为退出应用的页面
private static Long[] stayDurationSecsS = {new Long(45), new Long(2), new Long(78)};// 当前页面停留时长
// 2 初始化五类 log 的数据
// 启动相关信息的数组
private static AppStartupLog[] appStartupLogs = initAppStartupLogs();
// 页面跳转相关信息的数组
private static AppPageLog[] appPageLogs = initAppPageLogs();
// 事件相关信息的数组
private static AppEventLog[] appEventLogs = initAppEventLogs();
// app 使用状况相关信息的数组
private static AppUsageLog[] appUsageLogs = initAppUsageLogs();
// 错误相关信息的数组
private static AppErrorLog[] appErrorLogs = initAppErrorLogs();
// 2.1 初始化每类 log 的公共属性值
private static void initLogCommon(AppBaseLog baselog) {
// 日志建立时间
baselog.setCreatedAtMs(System.currentTimeMillis());
// appid
baselog.setAppId(appId);
// 租户惟一标识,企业用户
String tenantId = tenantIds[random.nextInt(tenantIds.length)];
if (tenantId != null) {
baselog.setTenantId(tenantId);
}
baselog.setTenantId(tenantIds[random.nextInt(tenantIds.length)]);
// 设备惟一标识
baselog.setDeviceId(deviceIds[random.nextInt(deviceIds.length)]);
// 版本
baselog.setAppVersion(appVersions[random.nextInt(appVersions.length)]);
// 渠道
baselog.setAppChannel(appChannels[random.nextInt(appChannels.length)]);
// 平台
baselog.setAppPlatform(appPlatforms[random.nextInt(appPlatforms.length)]);
// 操做系统
baselog.setOsType(osTypes[random.nextInt(osTypes.length)]);
// 机型
baselog.setDeviceStyle(deviceStyles[random.nextInt(deviceStyles.length)]);
}
// 2.2 启动相关信息的数组
private static AppStartupLog[] initAppStartupLogs() {
AppStartupLog[] result = new AppStartupLog[10];
for (int i = 0; i < 10; i++) {
AppStartupLog appStartupLog = new AppStartupLog();
// 初始化公共屬性值
initLogCommon(appStartupLog);
// 国家
appStartupLog.setCountry(countrys[random.nextInt(countrys.length)]);
// 省份
appStartupLog.setProvince(provinces[random.nextInt(provinces.length)]);
// 网络
appStartupLog.setNetwork(networks[random.nextInt(networks.length)]);
// 运营商
appStartupLog.setCarrier(carriers[random.nextInt(carriers.length)]);
// 品牌
appStartupLog.setBrand(brands[random.nextInt(brands.length)]);
// 分辨率
appStartupLog.setScreenSize(screenSizes[random.nextInt(screenSizes.length)]);
result[i] = appStartupLog;
}
return result;
}
// 2.3 页面跳转相关信息的数组
private static AppPageLog[] initAppPageLogs() {
AppPageLog[] result = new AppPageLog[10];
for (int i = 0; i < 10; i++) {
AppPageLog appPageLog = new AppPageLog();
// 初始化公共屬性值
initLogCommon(appPageLog);
// 页面 id
String pageId = pageIds[random.nextInt(pageIds.length)];
appPageLog.setPageId(pageId);
// 访问頁面顺序号
int visitIndex = visitIndexs[random.nextInt(visitIndexs.length)];
appPageLog.setVisitIndex(visitIndex);
// 下一个访问页面,如为空则表示为退出应用的页面
String nextPage = nextPages[random.nextInt(nextPages.length)];
while (pageId.equals(nextPage)) {
nextPage = nextPages[random.nextInt(nextPages.length)];
}
appPageLog.setNextPage(nextPage);
// 当前页面停留时长
Long stayDurationSecs = stayDurationSecsS[random.nextInt(stayDurationSecsS.length)];
appPageLog.setStayDurationSecs(stayDurationSecs);
result[i] = appPageLog;
}
return result;
}
// 2.4 事件相关信息的数组
private static AppEventLog[] initAppEventLogs() {
AppEventLog[] result = new AppEventLog[10];
for (int i = 0; i < 10; i++) {
AppEventLog appEventLog = new AppEventLog();
// 初始化公共屬性值
initLogCommon(appEventLog);
// 事件惟一标识
appEventLog.setEventId(eventIds[random.nextInt(eventIds.length)]);
// 事件持续时长
appEventLog.setEventDurationSecs(eventDurationSecsS[random.nextInt(eventDurationSecsS.length)]);
// 事件参数
appEventLog.setParamKeyValueMap(paramKeyValueMapsS[random.nextInt(paramKeyValueMapsS.length)]);
result[i] = appEventLog;
}
return result;
}
// 2.5 app 使用状况相关信息的数组
private static AppUsageLog[] initAppUsageLogs() {
AppUsageLog[] result = new AppUsageLog[10];
for (int i = 0; i < 10; i++) {
AppUsageLog appUsageLog = new AppUsageLog();
// 初始化公共屬性值
initLogCommon(appUsageLog);
// 单次使用时长(秒数),指一次启动内应用在前台的持续时长
appUsageLog.setSingleUseDurationSecs(singleUseDurationSecsS[random.nextInt(singleUseDurationSecsS.length)]);
result[i] = appUsageLog;
}
return result;
}
// 2.6 错误相关信息的数组
private static AppErrorLog[] initAppErrorLogs() {
AppErrorLog[] result = new AppErrorLog[10];
for (int i = 0; i < 10; i++) {
AppErrorLog appErrorLog = new AppErrorLog();
initLogCommon(appErrorLog);
// 错误摘要
appErrorLog.setErrorBrief(errorBriefs[random.nextInt(errorBriefs.length)]);
// 错误详情
appErrorLog.setErrorDetail(errorDetails[random.nextInt(errorDetails.length)]);
result[i] = appErrorLog;
}
return result;
}
// 3 循环发送数据
public static void main(String[] args) {
// 发送数据
for (int i = 1; i <= 200000000; i++) {
AppLogEntity logEntity = new AppLogEntity();
// 封装5 种 log 数据
logEntity.setAppStartupLogs(new AppStartupLog[]{appStartupLogs[random.nextInt(appStartupLogs.length)]});
logEntity.setAppEventLogs(new AppEventLog[]{appEventLogs[random.nextInt(appEventLogs.length)]});
logEntity.setAppErrorLogs(new AppErrorLog[]{appErrorLogs[random.nextInt(appErrorLogs.length)]});
logEntity.setAppPageLogs(new AppPageLog[]{appPageLogs[random.nextInt(appPageLogs.length)]});
logEntity.setAppUsageLogs(new AppUsageLog[]{appUsageLogs[random.nextInt(appUsageLogs.length)]});
try {
// 将对象转换成 json string
String json = JSONObject.toJSONString(logEntity);
// 网络请求发送 json 数据
UploadUtil.upload(json);
// 每隔 5 秒发送一条数据
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
package com.atguigu.app.client;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
/**
* 数据上传的工具类
*/
public class UploadUtil {
/**
* 上传日志
*/
public static void upload(String json) throws Exception {
try {
// 1 设置请求的 URL
// URL url = new URL("http://hadoop102:8080/app_logs/coll/index"); // 生产地址
URL url = new URL("http://localhost:8080/coll/index"); // 测试地址
// 2 获取链接
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
// 2.1 设置请求方式为 post
conn.setRequestMethod("POST");
// 2.2 容许上传数据使能
conn.setDoOutput(true);
// 2.3 时间头用来供 server 进行时钟校对
conn.setRequestProperty("clientTime", System.currentTimeMillis() + "");
// 2.4 设置请求的头信息,设置内容类型
conn.setRequestProperty("Content-Type", "application/json");
// 3 获取输出流
OutputStream out = conn.getOutputStream();
// 3.1 向输出流里面写数据
out.write(json.getBytes());
out.flush();
// 3.2 关闭资源
out.close();
// 4 获取响应码
int code = conn.getResponseCode();
System.out.println(code);
} catch (Exception e) {
e.printStackTrace();
}
}
}
1)建立 web 工程:app_logs_collect_web,输入项目名称和工做空间地址,点击完成便可。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu</groupId>
<artifactId>app_logs_collect_web</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>
<!-- 日志框架版本号 -->
<properties>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.22</slf4j.version>
</properties>
<dependencies>
<!-- spring 框架 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.3.4.RELEASE</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.0.1</version>
<scope>provided</scope>
</dependency>
<!-- kafka 框架 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<!-- 打印日志框架 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- jackson json 解析框架-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.8.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.3</version>
</dependency>
<!-- fastjson json 解析框架-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.24</version>
</dependency>
<!-- 项目公共模块 -->
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>app_logs_common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 加载地理信息 -->
<dependency>
<groupId>com.maxmind.db</groupId>
<artifactId>maxmind-db</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<!-- tomcat 插件-->
<build>
<plugins>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<version>2.2</version>
<configuration>
<!-- http port -->
<port>8080</port>
<path>/</path>
</configuration>
</plugin>
</plugins>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
version="3.1">
<!-- 1 spring 加载配置 -->
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:beans.xml</param-value>
</context-param>
<!-- 2 springmvc 加载配置 -->
<servlet>
<servlet-name>dispatcher</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:dispatcher-servlet.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>dispatcher</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<!-- 3 解决传输中文乱码 -->
<filter>
<filter-name>CharacterEncodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>CharacterEncodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
</web-app>
1)dispatcher-servlet.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.2.xsd">
<!-- 配置扫描路径 -->
<context:component-scan base-package="com.atguigu.applogs.collect.web.controller"/>
<!-- 使用注解驱动 -->
<mvc:annotation-driven/>
<!-- 内部资源视图解析器 -->
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="prefix" value="/"/>
<property name="suffix" value=".jsp"/>
</bean>
</beans>
2)建立包名:com.atguigu.applogs.collect.web.controller
beans.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
">
<!--配置路径扫描-->
<context:component-scan base-package="com.atguigu.applogs.collect.web"/>
</beans>
log4j.properties
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=analysis.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
编写代码
package com.atguigu.applogs.collect.web.controller;
import com.atguigu.app.common.AppLogEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.http.HttpServletRequest;
@Controller
@RequestMapping("/coll")
public class CollectLogController {
@RequestMapping(value = "/index", method = RequestMethod.POST)
@ResponseBody
public AppLogEntity collect(@RequestBody AppLogEntity e, HttpServletRequest req) {
// 1 修正服务器和客户端时间
verifyTime(e, req);
// 2 获取国家、省份和 ip 地址信息
processIp(e, req);
// 3 向 Kafka 发送消息
sendMessage(e);
return e;
}
}
1)分析
// 修正时间
private void verifyTime(AppLogEntity e, HttpServletRequest req) {
// 1 获取服务器时间
long myTime = System.currentTimeMillis();
// 2 获取客户端时间
long clientTime = Long.parseLong(req.getHeader("clientTime"));
// 3 计算服务器和客户端时间差
long diff = myTime - clientTime;
// 4 根据时间差,修正日志中时间
for (AppStartupLog log : e.getAppStartupLogs()) {
log.setCreatedAtMs(log.getCreatedAtMs() + diff);
}
for (AppUsageLog log : e.getAppUsageLogs()) {
log.setCreatedAtMs(log.getCreatedAtMs() + diff);
}
for (AppPageLog log : e.getAppPageLogs()) {
log.setCreatedAtMs(log.getCreatedAtMs() + diff);
}
for (AppEventLog log : e.getAppEventLogs()) {
log.setCreatedAtMs(log.getCreatedAtMs() + diff);
}
for (AppErrorLog log : e.getAppErrorLogs()) {
log.setCreatedAtMs(log.getCreatedAtMs() + diff);
}
}
1)在 resource 路径下添加 GeoLite2-City.mmdb 资源
2)编码实现
根据 ip 地址查询国家和省份信息,并作缓存处理。
// 处理 ip client 地址问题
private void processIp(AppLogEntity e, HttpServletRequest req) {
// 1 获取客户端 ip 地址
String clientIP = req.getRemoteAddr();
// 2 从缓存中获取数据
GeoInfo geoInfo = cache.get(clientIP);
// 若是该客户端 ip 地址没有获取过国家和省份信息,则经过工具类获取
// 若是该客户端 ip 地址已经获取过国家和省份信息,则直接从缓存对象中获取
if (geoInfo == null) {
geoInfo = new GeoInfo();
geoInfo.setCountry(GeoUtil.getCountry(clientIP));
geoInfo.setProvince(GeoUtil.getProvince(clientIP));
// 缓存数据
cache.put(clientIP, geoInfo);
}
// 3 设置国家、省份和客户端 ip 地址信息
for (AppStartupLog log : e.getAppStartupLogs()) {
log.setCountry(geoInfo.getCountry());
log.setProvince(geoInfo.getProvince());
log.setIpAddress(clientIP);
}
}
缓存代码
// 缓存地址信息
private Map<String, GeoInfo> cache = new HashMap<String, GeoInfo>();
1)在 app_logs_common 模块中在 com.atguigu.app.common 包下添加常量(5个 topic 主题)
package com.atguigu.app.common;
/**
* 常量类
*/
public class Constants {
// 主题
public static final String TOPIC_APP_STARTUP = "topic_app_startup";
public static final String TOPIC_APP_ERRROR = "topic_app_error";
public static final String TOPIC_APP_EVENT = "topic_app_event";
public static final String TOPIC_APP_USAGE = "topic_app_usage";
public static final String TOPIC_APP_PAGE = "topic_app_page";
}
2)编写代码(app_logs_collect_web 模块)
// 发送消息给发 Kafka
private void sendMessage(AppLogEntity e) {
// 1 建立配置对象
Properties props = new Properties();
// 1.1 Kafka 服务端的主机名和端口号
props.put("bootstrap.servers", "hadoop102:9092");
// 1.2 等待全部副本节点的应答
props.put("acks", "all");
// 1.3 消息发送最大尝试次数
props.put("retries", 0);
// 1.4 一批消息处理大小
props.put("batch.size", 16384);
// 1.5 请求延时
props.put("linger.ms", 1);
// 1.6 发送缓存区内存大小
props.put("buffer.memory", 33554432);
// 1.7 key 序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 1.8 value 序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2 建立生产者
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// 3 根据日志类型分别向 5 个主题发送消息
sendSingleLog(producer, Constants.TOPIC_APP_STARTUP, e.getAppStartupLogs());
sendSingleLog(producer, Constants.TOPIC_APP_ERRROR, e.getAppErrorLogs());
sendSingleLog(producer, Constants.TOPIC_APP_EVENT, e.getAppEventLogs());
sendSingleLog(producer, Constants.TOPIC_APP_PAGE, e.getAppPageLogs());
sendSingleLog(producer, Constants.TOPIC_APP_USAGE, e.getAppUsageLogs());
// 4 关闭生产者
producer.close();
}
// 发送单个的 log 消息给 kafka
private void sendSingleLog(KafkaProducer<String, String> producer, String topic, AppBaseLog[] logs) {
for (AppBaseLog log : logs) {
// 1 将 bean 对象转换为 json
String logMsg = JSONObject.toJSONString(log);
// 2 建立待发送消息对象
ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, logMsg);
// 3 发送消息
producer.send(data);
}
}
完整示例代码以下:
com\atguigu\applogs\collect\web\controller\CollectLogController.java
package com.atguigu.applogs.collect.web.controller;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.common.*;
import com.atguigu.app.utils.GeoUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@Controller
@RequestMapping("/coll")
public class CollectLogController {
// 缓存地址信息
private Map<String, GeoInfo> cache = new HashMap<String, GeoInfo>();
@RequestMapping(value = "/index", method = RequestMethod.POST)
@ResponseBody
public AppLogEntity collect(@RequestBody AppLogEntity e, HttpServletRequest req) {
// 1 修正服务器和客户端时间
verifyTime(e, req);
// 2 获取国家、省份和 ip 地址信息
processIp(e, req);
// 3 向 Kafka 发送消息
sendMessage(e);
return e;
}
// 修正时间
private void verifyTime(AppLogEntity e, HttpServletRequest req) {
// 1 获取服务器时间
long myTime = System.currentTimeMillis();
// 2 获取客户端时间
long clientTime = Long.parseLong(req.getHeader("clientTime"));
// 3 计算服务器和客户端时间差
long diff = myTime - clientTime;
// 4 根据时间差,修正日志中时间
for (AppStartupLog log : e.getAppStartupLogs()) {
log.setCreatedAtMs(log.getCreatedAtMs() + diff);
}
for (AppUsageLog log : e.getAppUsageLogs()) {
log.setCreatedAtMs(log.getCreatedAtMs() + diff);
}
for (AppPageLog log : e.getAppPageLogs()) {
log.setCreatedAtMs(log.getCreatedAtMs() + diff);
}
for (AppEventLog log : e.getAppEventLogs()) {
log.setCreatedAtMs(log.getCreatedAtMs() + diff);
}
for (AppErrorLog log : e.getAppErrorLogs()) {
log.setCreatedAtMs(log.getCreatedAtMs() + diff);
}
}
// 处理 ip client 地址问题
private void processIp(AppLogEntity e, HttpServletRequest req) {
// 1 获取客户端 ip 地址
String clientIP = req.getRemoteAddr();
// 2 从缓存中获取数据
GeoInfo geoInfo = cache.get(clientIP);
// 若是该客户端 ip 地址没有获取过国家和省份信息,则经过工具类获取
// 若是该客户端 ip 地址已经获取过国家和省份信息,则直接从缓存对象中获取
if (geoInfo == null) {
geoInfo = new GeoInfo();
geoInfo.setCountry(GeoUtil.getCountry(clientIP));
geoInfo.setProvince(GeoUtil.getProvince(clientIP));
// 缓存数据
cache.put(clientIP, geoInfo);
}
// 3 设置国家、省份和客户端 ip 地址信息
for (AppStartupLog log : e.getAppStartupLogs()) {
log.setCountry(geoInfo.getCountry());
log.setProvince(geoInfo.getProvince());
log.setIpAddress(clientIP);
}
}
// 发送消息给发 Kafka
private void sendMessage(AppLogEntity e) {
// 1 建立配置对象
Properties props = new Properties();
// 1.1 Kafka 服务端的主机名和端口号
props.put("bootstrap.servers", "hadoop102:9092");
// 1.2 等待全部副本节点的应答
props.put("acks", "all");
// 1.3 消息发送最大尝试次数
props.put("retries", 0);
// 1.4 一批消息处理大小
props.put("batch.size", 16384);
// 1.5 请求延时
props.put("linger.ms", 1);
// 1.6 发送缓存区内存大小
props.put("buffer.memory", 33554432);
// 1.7 key 序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 1.8 value 序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2 建立生产者
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// 3 根据日志类型分别向 5 个主题发送消息
sendSingleLog(producer, Constants.TOPIC_APP_STARTUP, e.getAppStartupLogs());
sendSingleLog(producer, Constants.TOPIC_APP_ERRROR, e.getAppErrorLogs());
sendSingleLog(producer, Constants.TOPIC_APP_EVENT, e.getAppEventLogs());
sendSingleLog(producer, Constants.TOPIC_APP_PAGE, e.getAppPageLogs());
sendSingleLog(producer, Constants.TOPIC_APP_USAGE, e.getAppUsageLogs());
// 4 关闭生产者
producer.close();
}
// 发送单个的 log 消息给 kafka
private void sendSingleLog(KafkaProducer<String, String> producer, String topic, AppBaseLog[] logs) {
for (AppBaseLog log : logs) {
// 1 将 bean 对象转换为 json
String logMsg = JSONObject.toJSONString(log);
// 2 建立待发送消息对象
ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, logMsg);
// 3 发送消息
producer.send(data);
}
}
}
0)因为目前咱们 Kafka 集群尚未配置出来,因此咱们向 Kafka 发送消息,并不能获得 Kafka 的及时应答,客户端因为收不到应答,就会卡死在那,因此为了测试方便,咱们先将向 Kafka 发送消息注释掉,以下:
2)配置 web 执行程序方式
① 点击 Edit Configurations…
3)向 web 工程中添加 jar 包依赖
1)启动 tomact7
2)启动客户端日志生成程序 app_statistics_project\app_logs_client\src\main\java\com\atguigu\app\client\GenerateData.java,查看是否正确收到数据响应码 200。
1)具体配置详见:http://www.javashuo.com/article/p-gbhycygo-kb.html
2)先启动 zookeeper 集群
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh start
0)具体配置详见:http://www.javashuo.com/article/p-sewqjxhc-cz.html
1)再启动 kafka 集群
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties &
2)查看主题 topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
3)建立 kafka 的 topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_startup
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_error
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_event
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_usage
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic topic_app_page
1)建立消费者主题(主要用于测试数据是否可以接收到)
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic topic_app_startup
2)启动客户端日志生成程序。
注意:此时咱们已经配置好 Kafka 集群了,须要将向 Kafka 发送消息的代码注释取消,
代码位置:D:\learn\JetBrains\workspace_idea3\app_statistics_project\app_logs_collect_web\src\main\java\com\atguigu\applogs\collect\web\controller\CollectLogController.java)
3)观察 kafka 消费者是否正常消费到消息。
1)在 linux 上安装 Tomcat
(1)将 apache-tomcat-7.0.72.tar.gz 导入到 linux 的 /opt/software 路径下
(2)解压 apache-tomcat-7.0.72.tar.gz 到 /opt/module/ 路径下
[atguigu@hadoop102 software]$ tar -zxf apache-tomcat-7.0.72.tar.gz -C /opt/module/
(3)修更名称为 tomcat
[atguigu@hadoop102 module]$ mv apache-tomcat-7.0.72 tomcat
2)部署 Web 程序到 Tomcat
(1)将 web 工程(app_logs_collect_web)
打成 war 包,名称为app_logs.war,步骤以下:
a) 检查 pom 文件中是否添加,<packaging>war</packaging>
b) 在 maven project 中找到 app_logs_collect_web
工程,点击 clean,而后点击 package。生成 war 包。
app_logs_collect_web-1.0-SNAPSHOT.war
的名称为
app_logs.war
(2)将 app_logs.war 导入到 linux 的 /opt/module/tomcat/webapps 路径下
(3)启动 Tomcat
[atguigu@hadoop102 bin]$ pwd
/opt/module/tomcat/bin
[atguigu@hadoop102 bin]$ ./startup.sh
修改 app_logs_client
工程中的 UploadUtil.java 中的请求地址为:
URL url = new URL("http://hadoop102:8080/app_logs/coll/index");// 生产地址
执行日志生成程序,观察可否正确收到响应码 200,不然须要进一步调试。
1)配置 hadoop 集群,详见:http://www.javashuo.com/article/p-uyrkkoeu-gt.html
2)启动 hadoop 集群
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
1)将 apache-flume-1.7.0-bin.tar.gz 导入到 Linux 系统中 /opt/sotfware 目录下
2)解压 apache-flume-1.7.0-bin.tar.gz 到 /opt/module 目录下
[atguigu@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
3)修改 apache-flume-1.7.0-bin 名称为 flume
[atguigu@hadoop102 module]$ mv apache-flume-1.7.0-bin/ flume
4)使用 root 用户,配置 flume 环境变量
[root@hadoop102 flume]# vim /etc/profile
#FLUME_HOME
export FLUME_HOME=/opt/module/flume
export PATH=$PATH:$FLUME_HOME/bin
[root@hadoop102 flume]# source /etc/profile
5)验证 flume 环境变量配置
[root@hadoop102 flume]# flume-ng version
Flume 1.7.0
1)建立拦截器主要目的是区分 kafka 传递过来的日志类型。
2)自定义拦截器实操
(1)建立 Java 工程 app_logs_flume
(2)添加 maven 框架支持,并导入 pom 文件,并刷新一下 maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu</groupId>
<artifactId>app_logs_flume</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
</project>
2)代码实现
a) 建立包名:com.atguigu.app.flume.interceptor
b) 根据系统时间拦截器自定义拦截器:Ctrl + Shift + t,输入TimestampInterceptor,将系统时间拦截器中代码拷贝过来,替换 TimestampInterceptor 的名称为 LogCollInterceptor。
c) 编写链接器业务代码
package com.atguigu.app.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
import java.util.Map;
import static org.apache.flume.interceptor.TimestampInterceptor.Constants.*;
/**
* 自定义 flume 的拦截器,提取 body 中的日志类型做为header
*/
public class LogCollInterceptor implements Interceptor {
private final boolean preserveExisting;
private LogCollInterceptor(boolean preserveExisting) {
this.preserveExisting = preserveExisting;
}
public void initialize() {
}
/**
* Modifies events in-place.
*/
public Event intercept(Event event) {
// 1 获取 flume 接收消息头
Map<String, String> headers = event.getHeaders();
// 2 获取 flume 接收的 json 数据数组
byte[] json = event.getBody();
// 将 json 数组转换为字符串
String jsonStr = new String(json);
// pageLog
String logType = "";
if (jsonStr.contains("pageId")) {
logType = "page";
}
// eventLog
else if (jsonStr.contains("eventId")) {
logType = "event";
}
// usageLog
else if (jsonStr.contains("singleUseDurationSecs")) {
logType = "usage";
}
// error
else if (jsonStr.contains("errorBrief")) {
logType = "error";
}
// startup
else if (jsonStr.contains("network")) {
logType = "startup";
}
// 3 将日志类型存储到 flume 头中
headers.put("logType", logType);
return event;
}
/**
* Delegates to {@link #intercept(Event)} in a loop.
*
* @param events
* @return
*/
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}
public void close() {
}
public static class Builder implements Interceptor.Builder {
private boolean preserveExisting = PRESERVE_DFLT;
public Interceptor build() {
return new LogCollInterceptor(preserveExisting);
}
public void configure(Context context) {
preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
}
}
public static class Constants {
public static String TIMESTAMP = "timestamp";
public static String PRESERVE = "preserveExisting";
public static boolean PRESERVE_DFLT = false;
}
}
(3)打成 jar 包,修改 jar 包名称为 app_logs_flume.jar,而后将该 jar 包导入 linux 虚拟机 /opt/module/flume/lib 目录下。
1)配置需求:实现消费 kafka 的 5 个主题,并把数据导入到 HDFS 文件系统。
2)配置实现:注意本博主将 flume 的配置文件放到自定义的目录中 /opt/module/flume/job/app_logs
[atguigu@hadoop102 app_logs]$ pwd
/opt/module/flume/job/app_logs
[atguigu@hadoop102 app_logs]$ vim kafka-flume-hdfs.conf
打开文件,添加如下内容:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 新加内容 flume 拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.app.flume.interceptor.LogCollInterceptor$Builder
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop102:2181,hadoop103:2181,hadoop104:2181
a1.sources.r1.kafka.topics=topic_app_startup,topic_app_error,topic_app_event,topic_app_usage,topic_app_page
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/centos/applogs/%{logType}/%Y%m/%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second
#不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
#控制输出文件是原生文件
a1.sinks.k1.hdfs.fileType = DataStream
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
3)启动 flume
[atguigu@hadoop102 conf]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/app_logs/kafka-flume-hdfs.conf -Dflume.root.logger=INFO,console 或者
[atguigu@hadoop102 conf]$ bin/flume-ng agent -c conf/ -n a1 -f job/app_logs/kafka-flume-hdfs.conf -Dflume.root.logger=INFO,console
参数说明:
--conf conf/ :表示配置文件存储在 conf/ 目录,简写为 -c
--name a1 :表示给 agent 起名为 a1(要与配置文件一致),简写为 -n
--conf-file job/app_logs/kafka-flume-hdfs.conf :flume 本次启动读取的配置文件是在 job/app_logs 文件夹下的 kafka-flume-hdfs.conf 文件,简写为 -f
-Dflume.root.logger==INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error
1)启动客户端日志生成程序
2)查看 HDFS 的 /user/centos/applogs 路径上是否有数据收到,以下图所示:
具体安装配置,详见:http://www.javashuo.com/article/p-baxyrovp-dy.html
使用 root 用户,查看 mysql 状态
[root@hadoop102 mysql-libs]# service mysql status
使用 root 用户,启动 mysql
[root@hadoop102 mysql-libs]# service mysql start
在 Hive 中采用 Json做为存储格式,须要建表时指定 Serde。insert into 时,Hive 使用 json 格式进行保存,查询时,经过 json 库进行解析。Hive 默认输出是压缩格式,这里改为不压缩。
1)实操
(1)将json-serde-1.3.8-jar-with-dependencies.jar 导入到 hive 的 /opt/module/hive/lib 路径下。
(2)在 /opt/module/hive/conf/hive-site.xml 文件中添加以下配置:
<property>
<name>hive.aux.jars.path</name>
<value>file:///opt/module/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar</value>
</property>
<!-- 输出不压缩 -->
<property>
<name>hive.exec.compress.output</name>
<value>false</value>
</property>
0)启动 hive
[atguigu@hadoop103 hive]$ bin/hive
查看数据库
hive (default)> show databases;
若是 applogsdb 存在则删除数据库
hive (default)> drop database applogsdb;
1)建立数据库
hive (default)> create database applogsdb;
使用 applogsdb 数据库
hive (default)> use applogsdb;
2)建立分区表
--startup
CREATE external TABLE ext_startup_logs(
createdAtMs bigint,
appId string,
tenantId string,
deviceId string,
appVersion string,
appChannel string,
appPlatform string,
osType string,
deviceStyle string,
country string,
province string,
ipAddress string,
network string,
carrier string,
brand string,
screenSize string
)
PARTITIONED BY (ym string, day string,hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
--error
CREATE external TABLE ext_error_logs(
createdAtMs bigint,
appId string,
tenantId string,
deviceId string,
appVersion string,
appChannel string,
appPlatform string,
osType string,
deviceStyle string,
errorBrief string,
errorDetail string
)
PARTITIONED BY (ym string, day string, hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
--event
CREATE external TABLE ext_event_logs(
createdAtMs bigint,
appId string,
tenantId string,
deviceId string,
appVersion string,
appChannel string,
appPlatform string,
osType string,
deviceStyle string,
eventId string,
eventDurationSecs bigint,
paramKeyValueMap Map<string, string>
)
PARTITIONED BY (ym string, day string, hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
--page
CREATE external TABLE ext_page_logs(
createdAtMs bigint,
appId string,
tenantId string,
deviceId string,
appVersion string,
appChannel string,
appPlatform string,
osType string,
deviceStyle string,
pageViewCntInSession int,
pageId string,
visitIndex int,
nextPage string,
stayDurationSecs bigint
)
PARTITIONED BY (ym string, day string, hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
--usage
CREATE external TABLE ext_usage_logs(
createdAtMs bigint,
appId string,
tenantId string,
deviceId string,
appVersion string,
appChannel string,
appPlatform string,
osType string,
deviceStyle string,
singleUseDurationSecs bigint,
singleUploadTraffic bigint,
singleDownloadTraffic bigint
)
PARTITIONED BY (ym string, day string, hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
3)查看数据库中的分区表
hive (applogsdb)> show tables;
tab_name
ext_error_logs
ext_event_logs
ext_page_logs
ext_startup_logs
ext_usage_logs
4)退出 hive
hive (applogsdb)> quit;
0)需求:实现每隔一分钟将 HDFS 上的数据,导入到 Hive 对应分区中一次。
1)Date 命令
(1)明天
[atguigu@hadoop102 module]$ date -d "1 day" +%Y%m%d
20190527
(2)昨天
[atguigu@hadoop102 module]$ date -d "-1 day" +%Y%m%d
20190525
(3)上一个月
[atguigu@hadoop102 module]$ date -d "-1 month" +%Y%m%d
20190426
(4)前三分钟
[atguigu@hadoop102 module]$ date -d "-3 minute" +%Y%m-%d-%H%M
201905-26-2133
(5)用“-”分割,截取出第一个参数
[atguigu@hadoop102 module]$ date -d "-3 minute" +%Y%m-%d-%H%M | awk -F '-' '{print $1}'
201905
(6)用“-”分割,截取出第二个参数
[atguigu@hadoop102 module]$ date -d "-3 minute" +%Y%m-%d-%H%M | awk -F '-' '{print $2}'
26
(7)用“-”分割,截取出第三个参数
[atguigu@hadoop102 module]$ date -d "-3 minute" +%Y%m-%d-%H%M | awk -F '-' '{print $3}'
2135
2)在 /home/atguigu/bin/applogs 目录下编写 shell 脚本(hdfstohive.sh)
(1)建立文件夹:
[atguigu@hadoop102 bin]$ mkdir applogs
(2)建立脚本 hdfstohive.sh
[atguigu@hadoop102 applogs]$ touch hdfstohive.sh
内容以下:
#!/bin/bash
systime=`date -d "-3 minute" +%Y%m-%d-%H%M`
ym=`echo ${systime} | awk -F '-' '{print $1}'`
day=`echo ${systime} | awk -F '-' '{print $2}'`
hm=`echo ${systime} | awk -F '-' '{print $3}'`
#执行 hive 的命令
hive -e "load data inpath '/user/centos/applogs/startup/${ym}/${day}/${hm}' into table applogsdb.ext_startup_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
hive -e "load data inpath '/user/centos/applogs/error/${ym}/${day}/${hm}' into table applogsdb.ext_error_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
hive -e "load data inpath '/user/centos/applogs/event/${ym}/${day}/${hm}' into table applogsdb.ext_event_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
hive -e "load data inpath '/user/centos/applogs/usage/${ym}/${day}/${hm}' into table applogsdb.ext_usage_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
hive -e "load data inpath '/user/centos/applogs/page/${ym}/${day}/${hm}' into table applogsdb.ext_page_logs partition(ym='${ym}',day='${day}',hm='${hm}')"
(3)修改脚本 hdfstohive.sh 权限
[atguigu@hadoop102 applogs]$ chmod 777 hdfstohive.sh
(4)必须保证 hive 的环境变量已经配置
[root@hadoop102 applogs]# vim /etc/profile
#HIVE_HOME
export HIVE_HOME=/opt/module/hive
export PATH=$PATH:$HIVE_HOME/bin
[root@hadoop102 applogs]# source /etc/profile
(5)执行一下 hive 脚本
[atguigu@hadoop102 applogs]$ ./hdfstohive.sh
(6)查看 hive 数据库
[atguigu@hadoop102 hive]$ bin/hive
hive (default)> show databases;
OK
database_name
applogsdb
default
Time taken: 0.818 seconds, Fetched: 2 row(s)
hive (default)> use applogsdb;
OK
Time taken: 0.023 seconds
hive (applogsdb)> show tables;
OK
tab_name
ext_error_logs
ext_event_logs
ext_page_logs
ext_startup_logs
ext_usage_logs
Time taken: 0.022 seconds, Fetched: 5 row(s)
hive (applogsdb)> select * from ext_error_logs;
1)crontab 经常使用命令
(1)查看状态:
service crond status
(2)中止状态:
service crond stop
(3)启动状态:
service crond start
(4)编辑 crontab 定时任务
crontab -e
(5)查询 crontab 任务
crontab -l
(6)删除当前用户全部的 crontab 任务
crontab -r
2)编写 crontab 调度
(1)进入编写 crontab 调度
[atguigu@hadoop102 applogs]$ crontab -e
(2)实现每分钟执行一次
* * * * * source /etc/profile;/home/atguigu/bin/applogs/hdfstohive.sh
参数解释:
[atguigu@hadoop102 applogs]$ crontab -l
* * * * * source /etc/profile;/home/atguigu/bin/applogs/hdfstohive.sh
1)启动客户端日志生成程序
2)等待三分钟后,检查 HDFS 的 /user/hive/warehouse/applogsdb.db 路径是否有新数据产生。
3)查询全部启动日志信息
[atguigu@hadoop102 hive]$ bin/hive
hive (default)> use applogsdb;
hive (applogsdb)> select * from ext_startup_logs;
4)查询指定 app 的用户数
hive (applogsdb)> select count(distinct deviceid) from ext_startup_logs where appid='sdk34734';
5)统计新增用户
须要自定义 UDF 函数。
0)需求:
根据输入的时间信息,返回当天的起始时间;
根据输入的时间信息,返回本周的起始时间;
根据输入的时间信息,返回本月的起始时间;
根据输入的时间和时间格式化信息,返回按照格式化要求显示的信息。
1)在 jar 工程:app_logs_hive
中的 com.atguigu.hive 目录下编写获取日期开始时间、周开始时间、和月开始时间的工具类
package com.atguigu.hive;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
public class DateUtil {
/**
* 获得指定 date 所在天的的零时刻
*/
public static Date getDayBeginTime(Date d) {
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd 00:00:00");
return sdf.parse(sdf.format(d));
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 获得指定 date 所在天的零时刻(+偏移量)
*/
public static Date getDayBeginTime(Date d, int offset) {
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd 00:00:00");
Date beginDate = sdf.parse(sdf.format(d));
Calendar c = Calendar.getInstance();
c.setTime(beginDate);
c.add(Calendar.DAY_OF_MONTH, offset);
return c.getTime();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 获得指定 date 所在周的起始时刻
*/
public static Date getWeekBeginTime(Date d) {
try {
// 获得 d 的所在天的的零时刻
Date beginDate = getDayBeginTime(d);
Calendar c = Calendar.getInstance();
c.setTime(beginDate);
int n = c.get(Calendar.DAY_OF_WEEK);
c.add(Calendar.DAY_OF_MONTH, -(n - 1));
return c.getTime();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 获得指定 date 所在周的起始时刻(+偏移量)
*/
public static Date getWeekBeginTime(Date d, int offset) {
try {
// 获得 d 的所在天的的零时刻
Date beginDate = getDayBeginTime(d);
Calendar c = Calendar.getInstance();
c.setTime(beginDate);
int n = c.get(Calendar.DAY_OF_WEEK);
// 定位到本周第一天
c.add(Calendar.DAY_OF_MONTH, -(n - 1));
c.add(Calendar.DAY_OF_MONTH, offset * 7);
return c.getTime();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 获得指定 date 所在月的起始时刻
*/
public static Date getMonthBeginTime(Date d) {
try {
// 获得 d 的所在天的的零时刻
Date beginDate = getDayBeginTime(d);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/01 00:00:00");
return sdf.parse(sdf.format(beginDate));
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 获得指定 date 所在月的偏移时刻(+偏移量)
*/
public static Date getMonthBeginTime(Date d, int offset) {
try {
// 获得 d 的所在天的的零时刻
Date beginDate = getDayBeginTime(d);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/01 00:00:00");
// 获得 d 所在月的第一天的零时刻
Date firstDay = sdf.parse(sdf.format(beginDate));
Calendar c = Calendar.getInstance();
c.setTime(firstDay);
// 对月进行滚动
c.add(Calendar.MONTH, offset);
return c.getTime();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
2)编写 DayBeginUDF、WeekBeginUDF、MonthBeginUDF、FormatTimeUDF 函数
(1)建立 java 工程导入 pom.xml 文件
建立 jar 工程:app_logs_hive
添加 maven 框架支持,并导入 pom 文件,并刷新一下 maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu</groupId>
<artifactId>app_logs_hive</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
</project>
(2)编写 DayBeginUDF
建立包名:com.atguigu.hive
编写代码:
package com.atguigu.hive;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 计算 day 的起始毫秒数
*/
public class DayBeginUDF extends UDF {
// 计算如今的起始时刻(毫秒数)
public long evaluate() throws ParseException {
return evaluate(new Date());
}
// 指定天偏移量
public long evaluate(int offset) throws ParseException {
return evaluate(DateUtil.getDayBeginTime(new Date(), offset));
}
// 计算某天的起始时刻,日期类型(毫秒数)
public long evaluate(Date d) throws ParseException {
return DateUtil.getDayBeginTime(d).getTime();
}
// 计算某天的起始时刻,日期类型,带偏移量(毫秒数)
public long evaluate(Date d, int offset) throws ParseException {
return DateUtil.getDayBeginTime(d, offset).getTime();
}
// 计算某天的起始时刻,String 类型(毫秒数)
public long evaluate(String dateStr) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date d = sdf.parse(dateStr);
return evaluate(d);
}
// 计算某天的起始时刻,String 类型,带偏移量(毫秒数)
public long evaluate(String dateStr, int offset) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date d = sdf.parse(dateStr);
return DateUtil.getDayBeginTime(d, offset).getTime();
}
// 计算某天的起始时刻,String 类型,带格式化要求(毫秒数)
public long evaluate(String dateStr, String fmt) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(fmt);
Date d = sdf.parse(dateStr);
return DateUtil.getDayBeginTime(d).getTime();
}
// 计算某天的起始时刻,String 类型,带格式化,带偏移量(毫秒数)
public long evaluate(String dateStr, String fmt, int offset) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(fmt);
Date d = sdf.parse(dateStr);
return DateUtil.getDayBeginTime(d, offset).getTime();
}
}
(3)编写 WeekBeginUDF
package com.atguigu.hive;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 计算 week 的起始毫秒数
*/
public class WeekBeginUDF extends UDF {
// 计算本周的起始时间,(毫秒数)
public long evaluate() throws ParseException {
return DateUtil.getWeekBeginTime(new Date()).getTime();
}
// 指定周偏移量
public long evaluate(int offset) throws ParseException {
return DateUtil.getWeekBeginTime(new Date(), offset).getTime();
}
// 计算某周的起始时刻,日期类型(毫秒数)
public long evaluate(Date d) throws ParseException {
return DateUtil.getWeekBeginTime(d).getTime();
}
// 计算某周的起始时刻,日期类型,带偏移量(毫秒数)
public long evaluate(Date d, int offset) throws ParseException {
return DateUtil.getWeekBeginTime(d, offset).getTime();
}
// 计算某周的起始时刻,String 类型(毫秒数)
public long evaluate(String dateStr) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date d = sdf.parse(dateStr);
return DateUtil.getWeekBeginTime(d).getTime();
}
// 计算某周的起始时刻,String 类型,带偏移量(毫秒数)
public long evaluate(String dateStr, int offset) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date d = sdf.parse(dateStr);
return DateUtil.getWeekBeginTime(d, offset).getTime();
}
// 计算某周的起始时刻,String 类型,带格式化要求(毫秒数)
public long evaluate(String dateStr, String fmt) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(fmt);
Date d = sdf.parse(dateStr);
return DateUtil.getWeekBeginTime(d).getTime();
}
// 计算某周的起始时刻,String 类型,带格式化,带偏移量(毫秒数)
public long evaluate(String dateStr, String fmt, int offset) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(fmt);
Date d = sdf.parse(dateStr);
return DateUtil.getWeekBeginTime(d, offset).getTime();
}
}
(4)编写 MonthBeginUDF
package com.atguigu.hive;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 计算 month 的起始毫秒数
*/
public class MonthBeginUDF extends UDF {
// 计算本月的起始时刻(毫秒数)
public long evaluate() throws ParseException {
return DateUtil.getMonthBeginTime(new Date()).getTime();
}
// 指定月偏移量
public long evaluate(int offset) throws ParseException {
return DateUtil.getMonthBeginTime(new Date(), offset).getTime();
}
// 计算某月的起始时刻,日期类型(毫秒数)
public long evaluate(Date d) throws ParseException {
return DateUtil.getMonthBeginTime(d).getTime();
}
// 计算某月的起始时刻,日期类型,带偏移量(毫秒数)
public long evaluate(Date d, int offset) throws ParseException {
return DateUtil.getMonthBeginTime(d, offset).getTime();
}
// 计算某月的起始时刻,String 类型(毫秒数)
public long evaluate(String dateStr) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date d = sdf.parse(dateStr);
return DateUtil.getMonthBeginTime(d).getTime();
}
// 计算某月的起始时刻,String 类型,带偏移量(毫秒数)
public long evaluate(String dateStr, int offset) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date d = sdf.parse(dateStr);
return DateUtil.getMonthBeginTime(d, offset).getTime();
}
// 计算某月的起始时刻,String 类型,带格式化要求(毫秒数)
public long evaluate(String dateStr, String fmt) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(fmt);
Date d = sdf.parse(dateStr);
return DateUtil.getMonthBeginTime(d).getTime();
}
// 计算某月的起始时刻,String 类型,带格式化,带偏移量(毫秒数)
public long evaluate(String dateStr, String fmt, int offset) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(fmt);
Date d = sdf.parse(dateStr);
return DateUtil.getMonthBeginTime(d, offset).getTime();
}
}
(5)编写 FormatTimeUDF
package com.atguigu.hive;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 格式化时间
*/
public class FormatTimeUDF extends UDF {
// 根据输入的时间毫秒值(long 类型)和格式化要求,返回 String 类型时间
public String evaluate(long ms, String fmt) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(fmt);
Date d = new Date();
d.setTime(ms);
return sdf.format(d);
}
// 根据输入的时间毫秒值(String 类型)和格式化要求,返回 String 类型时间
public String evaluate(String ms, String fmt) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(fmt);
Date d = new Date();
d.setTime(Long.parseLong(ms));
return sdf.format(d);
}
// 根据输入的时间毫秒值(long 类型)、格式化要求,和区分周的任意值,返回 String 类型时间
public String evaluate(long ms, String fmt, int week) throws ParseException {
Date d = new Date();
d.setTime(ms);
// 周内第一天
Date firstDay = DateUtil.getWeekBeginTime(d);
SimpleDateFormat sdf = new SimpleDateFormat(fmt);
return sdf.format(firstDay);
}
}
3)导出 jar 包(app_logs_hive.jar
)
4)添加 app_logs_hive.jar
到类路径 /opt/module/hive/lib 下
(1)临时添加 jar 包:(每次关闭 hive 后,再次链接 hive 时须要再次添加,太麻烦!)
hive (applogsdb)> add jar /opt/module/hive/lib/app_logs_hive.jar;
(2)永久添加 jar 包:
在 hive-site.xml 文件中添加:
<property>
<name>hive.aux.jars.path</name>
<value>file:///opt/module/hive/lib/app_logs_hive.jar</value>
</property>
因为以前添加过 json 的 jar 包,因此修改成以下方式:
<!-- 能够添加支持 json 格式的 jar 包,自定义的 UDF jar 包等等 -->
<property>
<name>hive.aux.jars.path</name>
<value>file:///opt/module/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar,file:///opt/module/hive/lib/app_logs_hive.jar</value>
</property>
5)注册为永久函数(学习测试阶段为了学习的方便,注册为永久的函数)
hive (applogsdb)> create function getdaybegin AS 'com.atguigu.hive.DayBeginUDF'; # getdaybegin 是函数名称,自定义的,起什么均可以
hive (applogsdb)> create function getweekbegin AS 'com.atguigu.hive.WeekBeginUDF';
hive (applogsdb)> create function getmonthbegin AS 'com.atguigu.hive.MonthBeginUDF';
hive (applogsdb)> create function formattime AS 'com.atguigu.hive.FormatTimeUDF';
注册为临时函数
hive (applogsdb)> create temporary function getdaybegin AS 'com.atguigu.hive.DayBeginUDF';
6)验证函数
登陆 mysql
[atguigu@hadoop102 ~]$ mysql -uroot -p123456
mysql> show databases;
mysql> use metastore;
mysql> show tables;
mysql> select * from FUNCS; # 查看注册的永久函数
查看结果截图
hive (applogsdb)> drop function getdaybegin;
hive (applogsdb)> drop function getweekbegin;
hive (applogsdb)> drop function getmonthbegin;
hive (applogsdb)> drop function formattime;
8)注意:在哪一个数据库中注册的永久函数,必须在哪一个数据库下将该方法删除!!!
好比:在 applogsdb 数据库中建立的函数,必须在该数据中调用 drop 方法才能实现删除功能。
1)今天新增用户
(1)判断今天新增用户条件:
先按照设备 id 分组;
再根据建立该日志的最开始时间,是否在今天范围内。
(2)统计今天新增个数
select
count(*)
from
(select min(createdatms) as mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getdaybegin() and mintime < getdaybegin(1)
) t;
2)昨天新增用户
select
count(*)
from
(select min(createdatms) as mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getdaybegin(-1) and mintime < getdaybegin()
) t;
3)指定时间的新增用户
select
count(*)
from
(select min(createdatms) as mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getdaybegin('2019/5/26 00:00:00') and mintime < getdaybegin('2019/05/26 00:00:00',1)
) t;
1)本周新增用户
select
count(*)
from
(select min(createdatms) as mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getweekbegin() and mintime < getweekbegin(1)
) t;
2)上一周新增用户
select
count(*)
from
(select min(createdatms) as mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getweekbegin(-1) and mintime < getweekbegin()
) t;
3)指定周时间的新增用户
select
count(*)
from
(select min(createdatms) as mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getweekbegin('2019/05/26 00:00:00') and mintime < getweekbegin('2019/05/26 00:00:00',1)
) t;
select
count(*)
from
(select min(createdatms) as mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getmonthbegin() and mintime < getmonthbegin(1)
) t;
0)分析:
select
count(distinct deviceid)
from ext_startup_logs
where
appid = 'sdk34734' and createdatms >= getdaybegin() and createdatms < getdaybegin(1);
2)周活跃用户数
select
count(distinct deviceid)
from ext_startup_logs
where
appid = 'sdk34734' and createdatms >= getweekbegin() and createdatms < getweekbegin(1);
3)月活跃用户数
select
count(distinct deviceid)
from ext_startup_logs
where
appid = 'sdk34734' and createdatms >= getmonthbegin() and createdatms < getmonthbegin(1);
0)分析:
select
formattime(createdatms,'yyyy/MM/dd') as day, count(distinct deviceid) as count
from ext_startup_logs
where appid = 'sdk34734' and createdatms >= getweekbegin() and createdatms < getweekbegin(1)
group by formattime(createdatms,'yyyy/MM/dd');
2)一次查询出过去的 5 周,每周的周活跃数
select
formattime(createdatms,'yyyy/MM/dd',0) as week ,count(distinct deviceid) as count
from ext_startup_logs
where appid = 'sdk34734' and createdatms >= getweekbegin(-6) and createdatms < getweekbegin(-1)
group by formattime(createdatms,'yyyy/MM/dd',0);
3)一次查询出过去的三个月内,每周的月活跃数
select
formattime(createdatms,'yyyy/MM',0) as month ,count(distinct deviceid) as count
from ext_startup_logs
where appid = 'sdk34734' and createdatms >= getmonthbegin(-4) and createdatms < getmonthbegin(-1)
group by formattime(createdatms,'yyyy/MM',0);
根据时间分区表去查询,避免全表扫描
select
count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734' and ym = formattime(getdaybegin(),'yyyyMM') and day = formattime(getdaybegin(),'dd');
过去的五周(包含本周)某个 app 每周的周活跃用户数
链接函数测试:select concat(ym,day) from ext_startup_logs;
select
formattime(createdatms,'yyyyMMdd',0) as stdate, count(distinct deviceid) as stcount
from ext_startup_logs
where concat(ym,day) >= formattime(getweekbegin(-4),'yyyyMMdd') and appid ='sdk34734'
group by formattime(createdatms,'yyyyMMdd',0);
最近的六个月(包含本月)每个月的月活跃数。
select
formattime(createdatms,'yyyyMM') as stdate, count(distinct deviceid) as stcount
from ext_startup_logs
where ym >= formattime(getmonthbegin(-5),'yyyyMM') and appid ='sdk34734'
group by formattime(createdatms,'yyyyMM');
分析:
select
deviceid, count(distinct(formattime(createdatms,'yyyyMMdd',0))) c
from ext_startup_logs
where appid = 'sdk34734' and concat(ym,day) >= formattime(getweekbegin(-2),'yyyyMMdd')
group by deviceid
having c = 3;
忠诚用户(连续活跃5周)
select
deviceid, count(distinct(formattime(createdatms,'yyyyMMdd',0))) c
from ext_startup_logs
where appid = 'sdk34734' and concat(ym,day) >= formattime(getweekbegin(-4),'yyyyMMdd')
group by deviceid
having c = 5;
分析:
查询沉默用户数(一共只有一条日志;且安装时间超过2天)
select
count(*)
from
(select deviceid, count(createdatms) as dcount, min(createdatms) as dmin
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having dcount = 1 and dmin < getdaybegin(-1)
) t;
今天 app 的启动次数:启动次数相似于活跃用户数,活跃用户数去重,启动次数不须要去重。
select
count(deviceid)
from ext_startup_logs
where appid = 'sdk34734' and ym = formattime(getdaybegin(),'yyyyMM') and day = formattime(getdaybegin(),'dd');
1)今天 appid 为 34734 的不一样版本的活跃用户数。
select
appversion, count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734' and ym = formattime(getdaybegin(),'yyyyMM') and day = formattime(getdaybegin(),'dd')
group by appversion;
2)本周内天天各版本日活跃数
(1)本周内:where concat(ym,day) >= formattime(getweekbegin(),'yyyyMMdd')
(2)天天:group by formattime(createdatms,'yyyyMMdd')
(2)各个版本:group by appversion
(3)日活跃数:count(distinct deviceid)
select
formattime(createdatms,'yyyyMMdd'), appversion, count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734' and concat(ym,day) >= formattime(getweekbegin(),'yyyyMMdd')
group by formattime(createdatms,'yyyyMMdd'), appversion;
本周回流用户:上周没有启动过,本周启动过。
分析:
select
distinct s.deviceid
from ext_startup_logs s
where appid = 'sdk34734' and concat(ym,day) >= formattime(getweekbegin(),'yyyyMMdd') and deviceid not in
(select
distinct t.deviceid
from ext_startup_logs t
where t.appid = 'sdk34734' and concat(t.ym,t.day) >= formattime(getweekbegin(-1),'yyyyMMdd') and concat(t.ym,t.day) < formattime(getweekbegin(),'yyyyMMdd')
);
连续 2 周内没有启动过:本周和上周没启动过;大上周启动过。
sql 语句以下:
select
distinct s.deviceid
from ext_startup_logs s
where appid='sdk34734'
and concat(ym,day) >= formattime(getweekbegin(-2),'yyyyMMdd')
and concat(ym,day) < formattime(getweekbegin(-1),'yyyyMMdd')
and deviceid not in
(select
distinct(t.deviceid)
from ext_startup_logs t
where t.appid='sdk34734'
and concat(t.ym,t.day) >= formattime(getweekbegin(-1),'yyyyMMdd')
);
本周留存用户=上周新增用户和本周活跃用户的交集。
分析:
select
distinct s.deviceid
from ext_startup_logs s
where appid = 'sdk34734'
and concat(ym,day) >= formattime(getweekbegin(-1),'yyyyMMdd')
and concat(ym,day) < formattime(getweekbegin(),'yyyyMMdd')
and deviceid in (
select distinct t.deviceid
from (
select tt.deviceid , min(tt.createdatms) mintime
from ext_startup_logs tt
where tt.appid = 'sdk34734'
group by tt.deviceid having mintime >= getweekbegin(-2) and mintime < getweekbegin(-1)
) t);
用户新鲜度 = 某段时间的新增用户数/某段时间的活跃的用户数 .
1)今天新增用户(为n)
select
count(*)
from
(select min(createdatms) mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getdaybegin() and mintime < getdaybegin(1)
) t;
2)今天活跃用户(m)
select
count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734' and createdatms >= getdaybegin() and createdatms < getdaybegin(1);
3)新鲜度 = n / m
注意判断 m 等于0的状况。
1)建立一个 web 工程:app_logs_visualize_web
2)添加maven框架支持
3)导入pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu</groupId>
<artifactId>app_logs_visualize_web</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>
<!-- 日志框架版本号 -->
<properties>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.22</slf4j.version>
</properties>
<dependencies>
<!-- 日志框架 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- 单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
<!-- mybatis 框架 -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>1.3.0</version>
</dependency>
<!-- spring 框架 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.3.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>4.3.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.8.10</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jstl</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>taglibs</groupId>
<artifactId>standard</artifactId>
<version>1.1.2</version>
</dependency>
<!--jackson json 解析框架-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.8.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.3</version>
</dependency>
<!-- hive 框架 -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>jsp-api</artifactId>
</