Copyright notice: please credit the source when reposting.
Source: http://bigdataer.net/?p=248
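Accuracy, stability, and timeliness are what data development has to watch most closely; together they are usually called data quality. Guaranteeing data quality often takes up a large share of a data engineer's time, so a good data monitoring system, or at least a sound monitoring scheme, matters a great deal. This article presents a data monitoring scheme that has been used in production, along with the relevant code.
The computation runs on Spark, and alerts are sent by email. The article covers three things: building a mail interface with Spring MVC that can send HTML and files; calling that interface at any point in a Spark computation; and sending result data stored on HDFS through the interface from Spark.
Note: a company's internal Hadoop/Spark cluster is usually isolated from the public internet, so sending mail directly from a Spark job is not realistic. A mail-sending service is therefore needed: it exposes an intranet interface for Spark jobs to call, and it can also reach the public internet to deliver mail to users' mailboxes.
(1) Supports a custom sender nickname, subject, body, and so on
(2) Supports sending HTML and files
Spring MVC, JavaMail
The mail-sending utility class, EmailSendUtil: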
import java.io.File;
import java.util.Date;
import java.util.Properties;

import javax.activation.CommandMap;
import javax.activation.DataHandler;
import javax.activation.FileDataSource;
import javax.activation.MailcapCommandMap;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
import javax.mail.internet.MimeUtility;

import org.springframework.stereotype.Service;

@Service
public class EmailSendUtil {

    public void sendEmail(String nick, String subject, String content, String receivers, File file) throws Exception {
        Properties proper = new Properties();
        proper.setProperty("mail.transport.protocol", "smtp");
        // Authenticate with account and password
        proper.setProperty("mail.smtp.auth", "true");
        Session session = Session.getInstance(proper);
        MimeMessage msg = new MimeMessage(session);

        try {
            // Register the content handlers JavaMail needs for each MIME type
            MailcapCommandMap mc = (MailcapCommandMap) CommandMap.getDefaultCommandMap();
            mc.addMailcap("text/html;; x-java-content-handler=com.sun.mail.handlers.text_html");
            mc.addMailcap("text/xml;; x-java-content-handler=com.sun.mail.handlers.text_xml");
            mc.addMailcap("text/plain;; x-java-content-handler=com.sun.mail.handlers.text_plain");
            mc.addMailcap("multipart/*;; x-java-content-handler=com.sun.mail.handlers.multipart_mixed");
            mc.addMailcap("message/rfc822;; x-java-content-handler=com.sun.mail.handlers.message_rfc822");
            CommandMap.setDefaultCommandMap(mc);

            // Sender: encoded nickname followed by the address (placeholder, replace with the real one)
            String nickname = MimeUtility.encodeText(nick);
            msg.setFrom(new InternetAddress(nickname + " <sender email address>"));
            // Recipients: a comma-separated list of addresses
            msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse(receivers));
            // Subject
            msg.setSubject(subject);

            MimeMultipart msgMimePart = new MimeMultipart("mixed");
            // Body
            MimeBodyPart contents = getBodyPart(content);
            msgMimePart.addBodyPart(contents);
            // Attachment (optional)
            if (file != null) {
                MimeBodyPart attachment = getAttachPart(file);
                msgMimePart.addBodyPart(attachment);
            }
            // Message content
            msg.setContent(msgMimePart);
            // Sent date
            msg.setSentDate(new Date());
            msg.saveChanges();

            Transport trans = session.getTransport();
            try {
                // Placeholders: replace with the real sender address and password
                trans.connect("smtp.exmail.qq.com", "sender email address", "password");
                trans.sendMessage(msg, msg.getRecipients(Message.RecipientType.TO));
            } finally {
                trans.close();
            }
        } catch (Exception e) {
            // Keep the original exception as the cause so the stack trace is not lost
            throw new Exception("email send error:" + e.getMessage(), e);
        } finally {
            // The uploaded file is temporary; remove it whether or not sending succeeded
            if (file != null && file.exists()) {
                file.delete();
            }
        }
    }

    // Wraps the HTML body in a "related" multipart
    private static MimeBodyPart getBodyPart(String content) throws MessagingException {
        MimeBodyPart body = new MimeBodyPart();
        MimeMultipart mmp = new MimeMultipart("related");
        MimeBodyPart contents = new MimeBodyPart();
        contents.setContent(content, "text/html;charset=utf-8");
        mmp.addBodyPart(contents);
        body.setContent(mmp);
        return body;
    }

    // Wraps a file as an attachment part
    private static MimeBodyPart getAttachPart(File file) throws MessagingException {
        MimeBodyPart attach = new MimeBodyPart();
        FileDataSource fds = new FileDataSource(file);
        attach.setDataHandler(new DataHandler(fds));
        attach.setFileName(file.getName());
        return attach;
    }
}
The controller class is fairly rough. It exposes two endpoints: one sends plain HTML, and the other sends mixed-format mail (HTML plus an attachment).
import java.io.File;

import net.bigdataer.api.weixin.utils.EmailSendUtil;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;

@Controller
@RequestMapping("/email_api")
public class EmailSendController extends DmBaseController {

    @Autowired
    private EmailSendUtil est;

    // Sends HTML only
    @RequestMapping("/send")
    public @ResponseBody String sendEmail(@RequestParam("nickname") String nickname,
                                          @RequestParam("subject") String subject,
                                          @RequestParam("content") String content,
                                          @RequestParam("receivers") String receivers) {
        String result = "{\"status\":\"0\",\"msg\":\"success\"}";
        try {
            est.sendEmail(nickname, subject, content, receivers, null);
        } catch (Exception e) {
            // Note: the message is concatenated unescaped, so quotes in it would break the JSON
            result = "{\"status\":\"-1\",\"msg\":\"" + e.getMessage() + "\"}";
        }
        return result;
    }

    // Sends mixed-format mail (HTML body plus an attachment)
    @RequestMapping("/sendattachment")
    public @ResponseBody String sendAttachment(@RequestParam("nickname") String nickname,
                                               @RequestParam("subject") String subject,
                                               @RequestParam("content") String content,
                                               @RequestParam("receivers") String receivers,
                                               @RequestParam("attachment") MultipartFile attachment) {
        String result = "{\"status\":\"0\",\"msg\":\"success\"}";
        // Save the upload to a temporary location; EmailSendUtil deletes it after sending
        File file = new File("/opt/soft/tomcat/temp/" + attachment.getOriginalFilename());
        try {
            attachment.transferTo(file);
            est.sendEmail(nickname, subject, content, receivers, file);
        } catch (Exception e) {
            result = "{\"status\":\"-1\",\"msg\":\"" + e.getMessage() + "\"}";
        }
        return result;
    }
}
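The controller builds its JSON reply by concatenating `e.getMessage()` into a string, so an error message containing a double quote or a newline would produce invalid JSON. A minimal sketch of one way around that, using only the JDK, is shown here. `JsonResult`, `escape`, and `of` are hypothetical names that do not appear in the original code; a JSON library would do the same job.

```java
/** Hypothetical helper (not part of the original article) for building the reply string. */
final class JsonResult {

    private JsonResult() { }

    /** Escapes the characters that may not appear raw inside a JSON string. */
    static String escape(String s) {
        if (s == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder(s.length() + 16);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the four-digit hex escape
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Builds the same {"status":..., "msg":...} shape the controller returns. */
    static String of(String status, String msg) {
        return "{\"status\":\"" + escape(status) + "\",\"msg\":\"" + escape(msg) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(of("-1", "email send error: \"535\" auth failed"));
    }
}
```

In the controller, the catch branch could then read `result = JsonResult.of("-1", e.getMessage());`, assuming the helper is on the classpath.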
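The HttpClientUtil class below provides access over both HTTP and HTTPS and supports GET and POST requests. GET requests are not used in this example.
The class also depends on the httpclient jars. The Maven dependencies I used are as follows: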
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.2</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpmime</artifactId>
    <version>4.5.2</version>
</dependency>
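Note: Spark's own source also uses the http-core package, so the version you reference may conflict with the jars already on the Spark cluster and lead to class-not-found or no-such-method exceptions. Adjust the versions to your actual environment. The code itself stays largely the same either way, so the code below can be used as a reference.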
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpResponseException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.List;

/**
 * Service request class.
 * Wraps POST and GET requests and supports both HTTP and HTTPS.
 * @author liuxuecheng
 */
public class HttpClientUtil {

    private static final Log logger = LogFactory.getLog(HttpClientUtil.class);

    // Timeout in milliseconds
    private static final int TIME_OUT = 3000;

    private static CloseableHttpClient client = null;

    // Trusts every certificate. Convenient on an intranet, but it disables
    // certificate validation, so do not use it against untrusted networks.
    private static TrustManager trustManager = new X509TrustManager() {
        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return new X509Certificate[0];
        }

        @Override
        public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
        }

        @Override
        public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
        }
    };

    static {
        try {
            // Request configuration
            RequestConfig config = RequestConfig.custom()
                    .setConnectTimeout(TIME_OUT)
                    .setConnectionRequestTimeout(TIME_OUT)
                    .setSocketTimeout(TIME_OUT)
                    .build();

            // HTTPS support
            SSLContext context = SSLContext.getInstance("TLS");
            context.init(null, new TrustManager[]{trustManager}, null);
            SSLConnectionSocketFactory scsf = new SSLConnectionSocketFactory(context, NoopHostnameVerifier.INSTANCE);

            // Register one socket factory per scheme
            Registry<ConnectionSocketFactory> registry = RegistryBuilder
                    .<ConnectionSocketFactory>create()
                    .register("http", PlainConnectionSocketFactory.INSTANCE)
                    .register("https", scsf)
                    .build();

            // Connection pool
            PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager(registry);

            // Build the client
            client = HttpClients.custom()
                    .setConnectionManager(manager)
                    .setDefaultRequestConfig(config)
                    .build();
        } catch (Exception e) {
            logger.error("failed to initialise the HTTP client", e);
        }
    }

    /**
     * POST request.
     * Different content types need different HttpEntity implementations,
     * so the method accepts any entity that implements the HttpEntity interface.
     * @param token bearer token, or null if no Authorization header is needed
     * @param url target URL
     * @param entity request body
     * @return the response parsed as JSON
     * @throws ClientProtocolException
     * @throws IOException
     */
    public static JSONObject post(String token, String url, HttpEntity entity) throws ClientProtocolException, IOException {
        HttpPost post = new HttpPost(url);
        if (token != null) {
            post.setHeader("Authorization", "Bearer " + token);
        }
        post.setHeader("Accept-Charset", "UTF-8");
        post.setEntity(entity);
        return client.execute(post, handler);
    }

    /**
     * GET request.
     * @param token bearer token, or null if no Authorization header is needed
     * @param url target URL without a query string
     * @param content_type content type used when encoding the parameters
     * @param params query parameters
     * @return the response parsed as JSON
     * @throws ClientProtocolException
     * @throws IOException
     * @throws URISyntaxException
     */
    public static JSONObject get(String token, String url, String content_type, List<NameValuePair> params) throws ClientProtocolException, IOException, URISyntaxException {
        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, "UTF-8");
        entity.setContentType(content_type);
        String para = EntityUtils.toString(entity);

        HttpGet get = new HttpGet(url);
        if (token != null) {
            get.setHeader("Authorization", "Bearer " + token);
        }
        get.setHeader("Accept-Charset", "UTF-8");
        // For GET, the encoded parameters are appended to the URL as the query string
        get.setURI(new URI(get.getURI().toString() + "?" + para));
        return client.execute(get, handler);
    }

    // Converts the response into a JSONObject
    public static final ResponseHandler<JSONObject> handler = new ResponseHandler<JSONObject>() {
        @Override
        public JSONObject handleResponse(HttpResponse res) throws ClientProtocolException, IOException {
            StatusLine status = res.getStatusLine();
            HttpEntity entity = res.getEntity();
            // Anything outside the 2xx range is treated as a failure
            if (status.getStatusCode() >= 300) {
                throw new HttpResponseException(status.getStatusCode(), status.getReasonPhrase());
            }
            if (entity == null) {
                throw new ClientProtocolException("response has no content");
            }
            // Fall back to UTF-8 when the response does not declare a charset
            String res_str = EntityUtils.toString(entity, "UTF-8");
            return JSONObject.parseObject(res_str);
        }
    };
}
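If the class-not-found or no-such-method exceptions mentioned in the note above do appear, it can help to check which jar a class was actually loaded from. The following is a small diagnostic sketch using only the JDK; `ClassOrigin` and `locationOf` are hypothetical names, not part of the original code, and the output depends entirely on the classpath of the JVM it runs in.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic helper: reports where a class was loaded from. */
final class ClassOrigin {

    private ClassOrigin() { }

    /** Returns the jar or directory URL of the class, or a marker for JDK/bootstrap classes. */
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // Classes loaded by the bootstrap loader (for example java.lang.String) have no code source
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass a fully qualified class name, e.g. org.apache.http.client.HttpClient
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " <- " + locationOf(Class.forName(name)));
    }
}
```

Logging `ClassOrigin.locationOf(org.apache.http.client.HttpClient.class)` from inside the Spark job would show whether the cluster's jar or the one bundled with the job was picked up.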
The following is Scala code:
import java.io.File
import java.util

import org.apache.http.NameValuePair
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.entity.ContentType
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.entity.mime.content.{FileBody, StringBody}
import org.apache.http.message.BasicNameValuePair
import org.apache.http.util.CharsetUtils

/**
  * Created by liuxuecheng on 2017/1/4.
  */
object EmailSendUtil {

  // Sends an HTML-only mail through the /send endpoint
  def sendEmail(nickname: String, subject: String, content: String, receivers: String): Unit = {
    val url = "mail-sending endpoint URL" // placeholder
    val content_type = "application/x-www-form-urlencoded"

    val params = new util.ArrayList[NameValuePair]()
    params.add(new BasicNameValuePair("nickname", nickname))
    params.add(new BasicNameValuePair("subject", subject))
    params.add(new BasicNameValuePair("content", content))
    params.add(new BasicNameValuePair("receivers", receivers))

    try {
      val entity = new UrlEncodedFormEntity(params, "UTF-8")
      entity.setContentType(content_type)
      HttpClientUtil.post(null, url, entity)
    } catch {
      // A failed alert must not bring down the Spark job, so only log the error
      case e: Throwable => e.printStackTrace()
    }
  }

  // Sends a mail with an attachment through the /sendattachment endpoint
  def sendAttachment(nickname: String, subject: String, content: String, receivers: String, file: File): Unit = {
    val url = "mail-sending endpoint URL" // placeholder
    val utf8 = CharsetUtils.get("UTF-8")
    val textUtf8 = ContentType.create("text/plain", utf8)
    val body = new FileBody(file)

    val entity = MultipartEntityBuilder.create()
      .setCharset(utf8)
      .addPart("attachment", body)
      .addPart("nickname", new StringBody(nickname, textUtf8))
      .addPart("subject", new StringBody(subject, textUtf8))
      .addPart("content", new StringBody(content, textUtf8))
      .addTextBody("receivers", receivers)
      .setContentType(ContentType.MULTIPART_FORM_DATA)
      .build()

    HttpClientUtil.post(null, url, entity)
  }
}
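To make it clearer what the form-encoded request in `sendEmail` carries, here is a rough JDK-only sketch of how the four fields turn into an `application/x-www-form-urlencoded` body. `FormBody`, `mailFields`, and `encode` are hypothetical names used for illustration; the real client relies on `UrlEncodedFormEntity`, whose output should look roughly the same but is not guaranteed to match byte for byte.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of form encoding; not part of the original code. */
final class FormBody {

    private FormBody() { }

    /** Collects the four parameters the /send endpoint expects, in a fixed order. */
    static Map<String, String> mailFields(String nickname, String subject, String content, String receivers) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("nickname", nickname);
        fields.put("subject", subject);
        fields.put("content", content);
        fields.put("receivers", receivers);
        return fields;
    }

    /** Joins the fields as key=value pairs separated by '&', percent-encoding both sides. */
    static String encode(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(enc(e.getKey())).append('=').append(enc(e.getValue()));
        }
        return sb.toString();
    }

    private static String enc(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new IllegalStateException("UTF-8 is always available", ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(encode(mailFields("etl", "daily report", "<b>ok</b>", "x@example.com")));
    }
}
```

Because the HTML body is percent-encoded in transit, markup such as `<td>` reaches the controller's `content` parameter intact after the server decodes it.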
The following is an excerpt of the code:
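import java.io.File

import org.apache.spark.{SparkContext, SparkFiles}

def getHdfsFile(sc: SparkContext, path: String): File = {
  // The full HDFS path is required: it usually starts with hdfs:// or viewfs://
  // and must go all the way down to the file name.
  // `path` is expected to end with "/" because the file name is appended directly.
  val filePath = "viewfs://xx-cluster" + path + "part-00000"
  // addFile downloads the file; SparkFiles.get resolves its local path
  sc.addFile(filePath)
  new File(SparkFiles.get(new File(filePath).getName))
}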
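Once you have the file, sending the mail is easy: just call the interface wrapped above.
Sometimes there is no need to write the result to HDFS. When the result suits a report-style display, you can collect it straight into memory, build a piece of HTML, and send that. Here is the code.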
import java.text.DecimalFormat

// group_rdd and IndexEntity are defined elsewhere in the job; this is an excerpt
val rdd = group_rdd.groupByKey(200)
  .map(x => {
    val raw_uv = x._2.flatMap(e => e._1).toSeq.distinct.size
    val raw_pv = x._2.map(e => e._2).reduce(_ + _)
    val cm_uv = x._2.flatMap(e => e._3).toSeq.distinct.size
    val cm_pv = x._2.map(e => e._4).reduce(_ + _)
    IndexEntity(x._1._1, x._1._2, x._1._3, raw_uv, raw_pv, cm_uv, cm_pv)
  })
  .collect()
  // The sorts are stable, so the result is ordered by bd, then platform, then search_flag
  .sortBy(_.search_flag).sortBy(_.platform).sortBy(_.bd)

// Build the table rows for the HTML template
// "0.00" keeps the leading zero for values below 1; a zero raw_uv or raw_pv is not guarded here
val fmt = new DecimalFormat("0.00")
val tbody = new StringBuilder()
rdd.foreach(entity => {
  val uvRate = fmt.format((entity.cm_uv.toDouble / entity.raw_uv) * 100)
  val pvRate = fmt.format((entity.cm_pv.toDouble / entity.raw_pv) * 100)
  tbody.append(s"<tr><td>${entity.bd}</td><td>${entity.platform}</td><td>${entity.search_flag}</td>" +
    s"<td>${entity.raw_uv}</td><td>${entity.cm_uv}</td><td>$uvRate%</td>" +
    s"<td>${entity.raw_pv}</td><td>${entity.cm_pv}</td><td>$pvRate%</td></tr>")
})
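The row-building step above divides by `raw_uv` and `raw_pv` and writes cell values straight into the markup. As a hedged sketch of how those two points could be handled, the helper below guards against a zero denominator and escapes cell text before wrapping it in `<td>`. `ReportHtml`, `percent`, `escapeHtml`, and `row` are hypothetical names, and the "N/A" marker for an empty denominator is an assumption rather than the author's choice.

```java
import java.util.Locale;

/** Hypothetical helper for building report rows; not part of the original job. */
final class ReportHtml {

    private ReportHtml() { }

    /** Formats part/total as a percentage with two decimals, or "N/A" when total is zero. */
    static String percent(long part, long total) {
        if (total == 0) {
            return "N/A";
        }
        // Locale.ROOT keeps the decimal separator a dot regardless of the JVM's default locale
        return String.format(Locale.ROOT, "%.2f%%", part * 100.0 / total);
    }

    /** Escapes the characters that have a special meaning in HTML text. */
    static String escapeHtml(String s) {
        return s.replace("&", "&amp;")
                .replace("<", "&lt;")
                .replace(">", "&gt;")
                .replace("\"", "&quot;");
    }

    /** Wraps each cell in <td> and the whole row in <tr>. */
    static String row(String... cells) {
        StringBuilder sb = new StringBuilder("<tr>");
        for (String cell : cells) {
            sb.append("<td>").append(escapeHtml(cell)).append("</td>");
        }
        return sb.append("</tr>").toString();
    }

    public static void main(String[] args) {
        System.out.println(row("search", "android", "1", "400", "100", percent(100, 400)));
    }
}
```

The resulting rows can be placed inside a `<table>` template and passed as the `content` parameter of the mail interface.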