package com.lala.lala.pipe.dbinfo

import java.io.{ByteArrayOutputStream, PrintWriter}

import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializerFeature
import com.lala.lala.common.query.option.HDFSOptions
import com.lala.lala.common.visual.hdfs.{HdfsFile, HdfsFileStatusWrapper}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.AccessControlException
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._

class HdfsShow(spark: SparkSession, map: Map[String, String]) {

  private val sc = spark.sparkContext

  /** Reads the first DBShow.Num lines of the file at HDFSOptions.PATH as sample data. */
  def getSampleData: String = {
    val path = map(HDFSOptions.PATH)
    sc.textFile(path).take(DBShow.Num).mkString("\n")
  }

  /**
   * Lists the entries under HDFSOptions.PATH (defaulting to "/") and returns them as a
   * JSON-serialized HdfsFileStatusWrapper. Access-control failures set the wrapper's
   * permissionDenied flag; any exception's stack trace is captured in the msg field.
   */
  def listFiles(): String = {
    val pathStr = map.getOrElse(HDFSOptions.PATH, "/")
    val path = new Path(pathStr)
    val hdfs = HdfsShow.getHdfs(spark)

    val baos = new ByteArrayOutputStream()
    val pw = new PrintWriter(baos)
    val lf = new HdfsFileStatusWrapper

    val arr =
      try {
        Some(hdfs.listStatus(path))
      } catch {
        case ex: AccessControlException =>
          ex.printStackTrace(pw)
          lf.setPermissionDenied(true)
          None
        case ex: Exception =>
          ex.printStackTrace(pw)
          None
      } finally {
        // Closing flushes any captured stack trace into baos.
        pw.flush()
        pw.close()
      }

    lf.setPath(pathStr)

    arr.foreach { statuses =>
      lf.setSuccess(true)
      val hfarr = for (fileStatus <- statuses) yield {
        val p = fileStatus.getPath
        new HdfsFile(p.getName, s"${p.getParent}/${p.getName}", fileStatus.isDirectory, fileStatus.isFile)
      }
      lf.setFileList(hfarr.toSeq.asJava)
    }

    lf.setMsg(baos.toString)
    JSON.toJSONString(lf, SerializerFeature.WriteNullListAsEmpty)
  }
}

object HdfsShow {

  // Cached FileSystem handle, initialized lazily from the Spark Hadoop configuration.
  private var hdfs: FileSystem = null

  def getHdfs(spark: SparkSession): FileSystem = {
    if (hdfs == null) {
      val hadoopConf = spark.sparkContext.hadoopConfiguration
      hdfs = FileSystem.get(hadoopConf)
    }
    hdfs
  }
}