对象 | 明细 |
---|---|
Log | 日志、日志组表示等基本概念 |
Project | 项目 |
Config | 配置 |
LogStore | 日志库 |
Index | 索引 |
Shard | 分区 |
ConsumerGroup | 消费组 |
就如同使用 API 和日志服务服务端交互同样,使用 SDK 也须要指定一些基本配置。目前,全部语言的 SDK 都定义了一个 Client 类做为入口类,这些基本配置信息在该入口类的构造时指定。html
具体包括以下几项:java
同一个消费组下面的消费者名称必须不一样,不然相同的消费者会同时消费logstore同份数据,形成数据重复git
协同消费库(Consumer Library)是对日志服务中日志进行消费的高级模式,提供了消费组(ConsumerGroup)的概念对消费端进行抽象和管理,和直接使用SDK进行数据读取的区别在于,用户无需关心日志服务的实现细节,只须要专一于业务逻辑,另外,消费者之间的负载均衡、failover等用户也都无需关心github
消费组(ConsumerGroup)缓存
消费组(Consumer)安全
shared消费组、消费组关系网络
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>loghub-client-lib</artifactId> <version>0.6.15</version> </dependency>
阿里云client依赖log4j,若是项目中使用的logback,须要增长转换log4j到logback的转换负载均衡
<dependency> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> <version>1.7.25</version> </dependency>
public class Main { // 日志服务域名,根据实际状况填写 private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com"; // 日志服务项目名称,根据实际状况填写 private static String sProject = "ali-cn-hangzhou-sls-admin"; // 日志库名称,根据实际状况填写 private static String sLogstore = "sls_operation_log"; // 消费组名称,根据实际状况填写 private static String sConsumerGroup = "consumerGroupX"; // 消费数据的ak,根据实际状况填写 private static String sAccessKeyId = ""; private static String sAccessKey = ""; public static void main(String []args) throws LogHubClientWorkerException, InterruptedException { // 第二个参数是消费者名称,同一个消费组下面的消费者名称必须不一样,可使用相同的消费组名称,不一样的消费者名称在多台机器上启动多个进程,来均衡消费一个Logstore,这个时候消费者名称可使用机器ip来区分。第9个参数(maxFetchLogGroupSize)是每次从服务端获取的LogGroup数目,使用默认值便可,若有调整请注意取值范围(0,1000] LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR); ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config); Thread thread = new Thread(worker); //Thread运行以后,Client Worker会自动运行,ClientWorker扩展了Runnable接口。 thread.start(); Thread.sleep(60 * 60 * 1000); //调用worker的Shutdown函数,退出消费实例,关联的线程也会自动中止。 worker.shutdown(); //ClientWorker运行过程当中会生成多个异步的Task,Shutdown以后最好等待还在执行的Task安全退出,建议sleep 30s。 Thread.sleep(30 * 1000); } }
public class SampleLogHubProcessor implements ILogHubProcessor { private int mShardId; // 记录上次持久化 check point 的时间 private long mLastCheckTime = 0; public void initialize(int shardId) { mShardId = shardId; } // 消费数据的主逻辑,这里面的全部异常都须要捕获,不能抛出去。 public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) { // 这里简单的将获取到的数据打印出来 for(LogGroupData logGroup: logGroups){ FastLogGroup flg = logGroup.GetFastLogGroup(); System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s", flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID())); System.out.println("Tags"); for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) { FastLogTag logtag = flg.getLogTags(tagIdx); System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue())); } for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) { FastLog log = flg.getLogs(lIdx); System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount()); for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) { FastLogContent content = log.getContents(cIdx); System.out.println(content.getKey() + "\t:\t" + content.getValue()); } } } long curTime = System.currentTimeMillis(); // 每隔 30 秒,写一次 check point 到服务端,若是 30 秒内,worker crash, // 新启动的 worker 会从上一个 checkpoint 其消费数据,有可能有少许的重复数据 if (curTime - mLastCheckTime > 30 * 1000) { try { //参数true表示当即将checkpoint更新到服务端,为false会将checkpoint缓存在本地,后台默认隔60s会将checkpoint刷新到服务端。 checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } mLastCheckTime = curTime; } return null; } // 当 worker 退出的时候,会调用该函数,用户能够在此处作些清理工做。 public void shutdown(ILogHubCheckPointTracker checkPointTracker) { //将消费断点保存到服务端。 try { checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } } } class SampleLogHubProcessorFactory implements ILogHubProcessorFactory { public ILogHubProcessor generatorProcessor() { // 生成一个消费实例 return new SampleLogHubProcessor(); } }
SDK 可能出现的异常错误能够分红以下几类:运维
目前,各个语言 SDK 的实现都采起抛出异常的方式处理错误。具体原则以下:异步
API错误重试
原文出处:https://www.cnblogs.com/guozp/p/10327607.html