参考插件安装的官方说明。该插件安装完成后,需要重启ES。
在此处要注意一点:在HDFS上创建的目录的所有者必须与ES的启动用户一致,否则HDFS会拒绝连接(原因请自行Google)。
PUT _snapshot/test
{
  "type": "hdfs",
  "settings": {
    "uri": "hdfs://x.x.x.32:9000/",
    "path": "/es_back",
    "load_defaults": "true"
  }
}
未完待续
具体的问题描述已经记不清了,大概是指ES在读取HDFS时没有访问权限。这应该是该插件在实现Java权限控制时引入的bug,我是通过修改源码解决的。
grant {
  // Hadoop UserGroupInformation, HdfsConstants, PipelineAck clinit
  permission java.lang.RuntimePermission "getClassLoader";

  // UserGroupInformation (UGI) Metrics clinit
  permission java.lang.RuntimePermission "accessDeclaredMembers";
  permission java.lang.reflect.ReflectPermission "suppressAccessChecks";

  // org.apache.hadoop.util.StringUtils clinit
  permission java.util.PropertyPermission "*", "read,write";

  // org.apache.hadoop.util.ShutdownHookManager clinit
  permission java.lang.RuntimePermission "shutdownHooks";

  // JAAS is used always, we use a fake subject, hurts nobody
  permission javax.security.auth.AuthPermission "getSubject";
  permission javax.security.auth.AuthPermission "doAs";
  permission javax.security.auth.AuthPermission "modifyPrivateCredentials";

  // Read access to the Hadoop Credentials objects held as private credentials.
  permission javax.security.auth.PrivateCredentialPermission "org.apache.hadoop.security.Credentials * \"*\"", "read";

  // NOTE(review): AllPermission implies every other permission, so it makes the
  // entries above redundant and effectively disables the security manager for
  // the code this policy applies to. It is kept here because it appears to be
  // the workaround for the HDFS access-denied error -- confirm, and narrow it
  // to the specific missing permission(s) if possible.
  permission java.security.AllPermission;
};
解决方案二:将 repository-hdfs/plugin-security.policy 添加到ES的启动配置中。在ES的 config/jvm.options 中添加:
-Djava.security.policy=file:///data/soft/elasticsearch-5.0.1/plugins/repository-hdfs/plugin-security.policy
然后重启ES集群就可以了。
参看:
@Repository public class SnapshotAndRestoreDao { private Logger logger = LoggerFactory.getLogger(SnapshotAndRestoreDao.class); @Autowired private Client client; /** * @return */ public List<RepositoryMetaData> queryRepositoryList() { List<RepositoryMetaData> repositories = this.client.admin().cluster() .prepareGetRepositories().get().repositories(); return repositories; } public boolean isRepositoryExist(String repositoryName) { boolean result = false; try { List<RepositoryMetaData> repositories = this.queryRepositoryList(); if (repositories.size() > 0) { result = repositories.stream().filter(repositoryMetaData -> StringUtils.equals(repositoryName, repositoryMetaData.name())) .findAny().isPresent(); } } catch (Exception ex) { logger.error("Exception in getRepository method: " + ex.toString()); } return result; } /** * 建立仓库 * * @param repositoryName * @param path * @param compress * @return */ public boolean createRepository(String repositoryName, String path, boolean compress) { boolean result = false; PutRepositoryResponse putRepositoryResponse = null; try { if (!isRepositoryExist(repositoryName)) { Settings settings = Settings.builder() .put("path", path) .put("compress", compress) .put("uri", "hdfs://10.94.128.32:9000/") .build(); putRepositoryResponse = client.admin().cluster().preparePutRepository(repositoryName) .setType("hdfs").setSettings(settings).get(); logger.info("Repository was created."); return (putRepositoryResponse != null && putRepositoryResponse.isAcknowledged()); } else { logger.info(repositoryName + " repository already exists"); } } catch (Exception ex) { logger.error("Exception in createRepository method", ex); } return result; } /** * 删除仓库 * * @param repositoryName * @return */ public DeleteRepositoryResponse deleteRepository(String repositoryName) { DeleteRepositoryResponse deleteRepositoryResponse = null; try { if (isRepositoryExist(repositoryName)) { deleteRepositoryResponse = this.client.admin().cluster() 
.prepareDeleteRepository(repositoryName).execute().actionGet(); logger.info("{}, repository has been deleted.", repositoryName); } } catch (Exception ex) { logger.error("Exception in deleteRepository method: " + ex.toString()); } return deleteRepositoryResponse; } public List<SnapshotInfo> querySnapShotInfoListByName(String repositoryName) { List<SnapshotInfo> snapshotInfoList = client.admin().cluster() .prepareGetSnapshots(repositoryName).get().getSnapshots(); return snapshotInfoList; } /** * 判断快照名称是否在仓库中存在 * * @param repositoryName * @param snapshotName * @return */ public boolean isSnapshotExist(String repositoryName, String snapshotName) { boolean result = false; try { List<SnapshotInfo> snapshotInfoList = this.querySnapShotInfoListByName(repositoryName); if (null != snapshotInfoList && !snapshotInfoList.isEmpty()) { result = snapshotInfoList.stream().filter( snapshotInfo -> StringUtils.equals(snapshotName, snapshotInfo.snapshotId().getName())).findAny().isPresent(); } } catch (Exception ex) { logger.error("Exception in getSnapshot method: " + ex.toString()); } return result; } /** * 建立快照 * * @param repositoryName * @param snapshotName * @param indexName * @return */ public CreateSnapshotResponse createSnapshot(String repositoryName, String snapshotName, String indexName) { CreateSnapshotResponse createSnapshotResponse = null; // String snapshotName = snapshotPrefix + "-" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")); try { if (isSnapshotExist(repositoryName, snapshotName)) logger.info(snapshotName + " snapshot already exists"); else { createSnapshotResponse = this.client.admin().cluster() .prepareCreateSnapshot(repositoryName, snapshotName) .setWaitForCompletion(true) .setIndices(indexName).get(); logger.info("Snapshot was created."); } } catch (Exception ex) { logger.error("Exception in createSnapshot method: " + ex.toString()); } return createSnapshotResponse; } /** * 删除快照 * * @param repositoryName * @param snapshotName * 
@return */ public DeleteSnapshotResponse deleteSnapshot(String repositoryName, String snapshotName) { DeleteSnapshotResponse deleteSnapshotResponse = null; try { if (isSnapshotExist(repositoryName, snapshotName)) { deleteSnapshotResponse = this.client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName) .get(); logger.info(snapshotName + " snapshot has been deleted."); } } catch (Exception ex) { logger.error("Exception in deleteSnapshot method: " + ex.toString()); } return deleteSnapshotResponse; } /** * @param repositoryName * @return */ public List<SnapshotState> querySnapshotStatus(String repositoryName) { List<SnapshotState> snapshotStatses = new ArrayList<>(); SnapshotsStatusRequest snapshotsStatusRequest = new SnapshotsStatusRequest(repositoryName); try { snapshotStatses = this.client.admin().cluster() .snapshotsStatus(snapshotsStatusRequest) .get().getSnapshots() .stream().map(SnapshotState::from).collect(Collectors.toList()); } catch (Exception e) { logger.error(String.format("获取仓库%s中快照信息失败", repositoryName), e); } return snapshotStatses; } /** * @param repositoryName * @param snapshotName * @return */ public Optional<SnapshotState> querySnapshotStatus(String repositoryName, String snapshotName) { List<SnapshotState> snapshotStates = this.querySnapshotStatus(repositoryName); return snapshotStates.stream().filter( snapshotState -> { return StringUtils.equals(snapshotName, snapshotState.getSnapShotName()); } ).findFirst(); } /** * 备份恢复 * 注意,备份恢复时,待备份的index不能打开 * * @param repositoryName * @param snapshotName * @return */ public RestoreSnapshotResponse restoreSnapshot(String repositoryName, String snapshotName) { RestoreSnapshotResponse restoreSnapshotResponse = null; try { if (isRepositoryExist(repositoryName) && isSnapshotExist(repositoryName, snapshotName)) { //查询该快照下全部的index信息 List<SnapshotInfo> snapshotInfoList = this.queryIndicsByRepoAndSnapshot(repositoryName, snapshotName); if (!snapshotInfoList.isEmpty()) { List<String> indicesList = 
snapshotInfoList.get(0).indices(); indicesList = indicesList.stream().filter( indice -> this.isIndexExists(indice) ).collect(Collectors.toList()); if (!indicesList.isEmpty()) { CloseIndexRequestBuilder closeIndexRequestBuilder = new CloseIndexRequestBuilder(client.admin().indices(), CloseIndexAction.INSTANCE); closeIndexRequestBuilder.setIndices(indicesList.toArray(new String[indicesList.size()])); CloseIndexResponse closeIndexResponse = closeIndexRequestBuilder.get(); if (null != closeIndexResponse && closeIndexResponse.isAcknowledged()) { return this.restore(repositoryName, snapshotName); } } else { return this.restore(repositoryName, snapshotName); } } } } catch (Exception e) { logger.error("Exception in restoreSnapshot method", e); } return null; } /** * @param repositoryName * @param snapshotName * @return * @throws ExecutionException * @throws InterruptedException */ private RestoreSnapshotResponse restore(String repositoryName, String snapshotName) throws ExecutionException, InterruptedException { RestoreSnapshotRequest restoreSnapshotRequest = new RestoreSnapshotRequest(repositoryName, snapshotName); RestoreSnapshotResponse restoreSnapshotResponse = this.client.admin().cluster().restoreSnapshot(restoreSnapshotRequest).get(); logger.info("Snapshot was restored."); return restoreSnapshotResponse; } /** * @param repositoryName * @param snapshotName * @return */ public List<SnapshotInfo> queryIndicsByRepoAndSnapshot(String repositoryName, String snapshotName) { GetSnapshotsAction getSnapshotsAction = GetSnapshotsAction.INSTANCE; GetSnapshotsRequestBuilder builder = new GetSnapshotsRequestBuilder(this.client.admin().cluster(), getSnapshotsAction, repositoryName); builder.setSnapshots(snapshotName); GetSnapshotsResponse getSnapshotsResponse = builder.get(); if (getSnapshotsAction != null) { return getSnapshotsResponse.getSnapshots(); } return Lists.newArrayList(); } /** * 查询索引是否存在 * * @param indexname * @return */ public boolean isIndexExists(String indexname) { 
IndicesExistsRequestBuilder existsRequestBuilder = new IndicesExistsRequestBuilder(this.client.admin().indices() , IndicesExistsAction.INSTANCE, indexname); IndicesExistsResponse indicesExistsResponse = existsRequestBuilder.get(); return indicesExistsResponse != null && indicesExistsResponse.isExists(); } }