最近作一个带屏的冰箱项目,其中有个文件上传的功能。基本的思路是在一个局域网中的设备端起一个服务,这样局域网中的其余设备就能够经过Http访问的方式,实现文件上传的功能了。在设备端起一个服务,这里使用了一个开源的微服务项目NanoHTTPD. 虽然只有一个java文件,可是里面包含了不少网络处理方面的细节。所谓麻雀虽小,五脏俱全。这篇文章会介绍NanoHTTPD的源码,但又不只如此。我但愿这篇文章会把socket和http方面的基础概念介绍一下,由于我在阅读NanoHTTPD源码的时候发现,这些概念对于理解NanoHTTPD很是的重要。另外NanoHTTPD包含了网络处理的一些细节,当你以前没有深刻的研究过这些细节的时候,你就很难系统清晰的理解网络传输。html
下面这段代码是官网提供的Sample,其实挺简单,指定一个端口,调用start方法就能够了。serve方法的做用就是处理请求做出响应,sample中返回了一个html页面。java
public class App extends NanoHTTPD { public App() throws IOException { super(8080); start(NanoHTTPD.SOCKET_READ_TIMEOUT, false); System.out.println("\nRunning! Point your browsers to http://localhost:8080/ \n"); } public static void main(String[] args) { try { new App(); } catch (IOException ioe) { System.err.println("Couldn't start server:\n" + ioe); } } @Override public Response serve(IHTTPSession session) { String msg = "<html><body><h1>Hello server</h1>\n"; Map<String, String> parms = session.getParms(); if (parms.get("username") == null) { msg += "<form action='?' method='get'>\n <p>Your name: <input type='text' name='username'></p>\n" + "</form>\n"; } else { msg += "<p>Hello, " + parms.get("username") + "!</p>"; } return newFixedLengthResponse(msg + "</body></html>\n");//返回html页面 } } 复制代码
调用start方法后,服务就起来了。下面咱们深刻其内部看看这个start作了那些的操做。android
/** * Start the server. * * @param timeout timeout to use for socket connections. * @param daemon start the thread daemon or not. * @throws IOException if the socket is in use. */ public void start(final int timeout, boolean daemon) throws IOException { this.myServerSocket = this.getServerSocketFactory().create(); this.myServerSocket.setReuseAddress(true); ServerRunnable serverRunnable = createServerRunnable(timeout); this.myThread = new Thread(serverRunnable); this.myThread.setDaemon(daemon);//线程分为User线程和Daemon线程,当用户线程结束时,jvm也会推出, //Daemon线程也就结束了,可是只要有User线程在, Jvm就不会退出。 this.myThread.setName("NanoHttpd Main Listener"); this.myThread.start(); while (!serverRunnable.hasBinded && serverRunnable.bindException == null) { try { Thread.sleep(10L); } catch (Throwable e) { // on android this may not be allowed, that's why we // catch throwable the wait should be very short because we are // just waiting for the bind of the socket } } if (serverRunnable.bindException != null) { throw serverRunnable.bindException; } } 复制代码
首先建立了一个ServerSocket实例,而后起了一个线程,在这个线程中进行操做,至于进行了什么操做,它的逻辑在ServerRunnable中。git
/** * The runnable that will be used for the main listening thread. */ public class ServerRunnable implements Runnable { private final int timeout; private IOException bindException; private boolean hasBinded = false; private ServerRunnable(int timeout) { this.timeout = timeout; } @Override public void run() { try { myServerSocket.bind(hostname != null ? new InetSocketAddress(hostname, myPort) : new InetSocketAddress(myPort)); hasBinded = true; } catch (IOException e) { this.bindException = e; return; } do { try { final Socket finalAccept = NanoHTTPD.this.myServerSocket.accept(); if (this.timeout > 0) { finalAccept.setSoTimeout(this.timeout); } final InputStream inputStream = finalAccept.getInputStream(); NanoHTTPD.this.asyncRunner.exec(createClientHandler(finalAccept, inputStream)); } catch (IOException e) { NanoHTTPD.LOG.log(Level.FINE, "Communication with the client broken", e); } } while (!NanoHTTPD.this.myServerSocket.isClosed()); } } 复制代码
咱们看一下ServerRunnable的run方法,咱们建立了一个ServerSocket实例,如今咱们调用它的bind方法, 在Java层面只给我暴露了一个bind方法,可是咱们要知道,Java底层也是要调用系统提供的接口的,就是所谓的系统调用, Java中的bind方法其实对应的是系统调用的bind和listen两个方法。那咱们看看系统调用的bind和listen是作什么的。你们查看这些系统调用的api的文档可使用下面的命令:github
man 2 bind 复制代码
bind()函数把一个地址族中的特定地址赋给socket。例如对应AF_INET、AF_INET6就是把一个ipv4或ipv6地址和端口号组合赋给socket。至关于将ip地址和端口与socket创建了联系。编程
其实在真正的系统层级的Socket接口中还有一个listen方法。它的做用让其余进程的socket能够访问当前的socket.api
当ServerSocket调用accept方法的时候,就能够获取到客户端的请求了,这个方法是个阻塞方法(所谓阻塞,就是进程或是线程执行到这个函数时必须等待某个事件的发生,若是这个事件没有发生,则进程和线程就会阻塞在这个地方,不能继续往下执行了。),当有客户端connect时,这个方法就会被调用。它返回了一个Socket,这个Socket其实既包含了服务端的Socket描述符,也包含了客户端返回的地址信息等。从这个Socket有getInputStream和getOutputStream两个方法,分别表明从客户端发送过来的数据流和咱们返回给客户端的数据流。浏览器
上面的部分咱们已经在服务端建立了一个Socket而且调用了它的bind、listen等方法。可是到底什么是Socket呢,咱们如今讨论一下,在讨论什么是Socket以前,咱们先了解一下什么是网络协议,咱们人交谈时,说出的语言要符合语法和用语规范。机器之间的通话也要符合必定的协议。不然,鸡同鸭讲,没法相互理解。咱们平时所用的网络是由:缓存
这四层协议组成。它们的顺序为由上到下,也就是说上层协议要依赖下层协议。好比HTTP协议它要依赖TCP协议。 了解了这些内容以后咱们讲什么是Socket,它是系统对TCP/IP协议的封装的接口,也就是说Socket不是协议,它只是对TCP/IP协议的实现,方便开发者对网络进行开发而已。在Unix中一切皆文件,其实Socket也是一种文件,换句话说网络链接就是一个文件。由于它有对数据流有读写和关闭功能。当咱们创建一个网络链接时,就是建立了一个socket文件,这样咱们就能够read从别的计算器传输过来的数据,write数据给别的计算机。bash
做为服务端,面临的一种状况就是并发访问。NanoHTTPD其实在AsyncRunner的exec方法中作的处理,在DefaultAsyncRunner的exec方法中,启动了一个线程处理每个的访问链接。链接默认的上限是50.而真正处理请求的方法的地方在ClientHandler中。
客户端的请求处理都是在这个方法中完成的。
/** * The runnable that will be used for every new client connection. */ public class ClientHandler implements Runnable { ... @Override public void run() { OutputStream outputStream = null; try { outputStream = this.acceptSocket.getOutputStream(); TempFileManager tempFileManager = NanoHTTPD.this.tempFileManagerFactory.create(); HTTPSession session = new HTTPSession(tempFileManager, this.inputStream, outputStream, this.acceptSocket.getInetAddress()); while (!this.acceptSocket.isClosed()) { session.execute(); } } catch (Exception e) { // When the socket is closed by the client, // we throw our own SocketException // to break the "keep alive" loop above. If // the exception was anything other // than the expected SocketException OR a // SocketTimeoutException, print the // stacktrace if (!(e instanceof SocketException && "NanoHttpd Shutdown".equals(e.getMessage())) && !(e instanceof SocketTimeoutException)) { NanoHTTPD.LOG.log(Level.FINE, "Communication with the client broken", e); } } finally { safeClose(outputStream); safeClose(this.inputStream); safeClose(this.acceptSocket); NanoHTTPD.this.asyncRunner.closed(this); } } } 复制代码
从上面的逻辑能够发现,这里有建立了一个HTTPSession用于处理每一次的http请求,那咱们就看看HTTPSession.execute这个方法。
在这个方法中获取到InputStream,这个就是客户端请求数据流,经过它咱们就能够拿到客户端的请求数据了。咱们常常据说的HTTP协议这个时候就有用处了,Socket封装了TCP/IP协议,可是没有封装应用层协议。这一层的协议须要咱们本身处理。这个execute方法里的逻辑就是咱们对Http协议的实现。咱们的服务在一个机器上开始运转了,这个时候在另外一台机器的浏览器里输入了前面机器的Ip和端口。我知道这就是一个Http请求了,那么请求数据就到了inputStream这个输入流中。那么接下来就根据http协议规定的内容来解析数据了。
@Override public void execute() throws IOException { Response r = null; try { //读取前8192字节的数据,其实就是header,Apache默认的header限制大小为8KB byte[] buf = new byte[HTTPSession.BUFSIZE]; this.splitbyte = 0; this.rlen = 0; int read = -1; this.inputStream.mark(HTTPSession.BUFSIZE); try { read = this.inputStream.read(buf, 0, HTTPSession.BUFSIZE); } catch (Exception e) { safeClose(this.inputStream); safeClose(this.outputStream); throw new SocketException("NanoHttpd Shutdown"); } if (read == -1) { // socket was been closed safeClose(this.inputStream); safeClose(this.outputStream); throw new SocketException("NanoHttpd Shutdown"); } while (read > 0) { this.rlen += read; this.splitbyte = findHeaderEnd(buf, this.rlen); if (this.splitbyte > 0) { break; } read = this.inputStream.read(buf, this.rlen, HTTPSession.BUFSIZE - this.rlen); } if (this.splitbyte < this.rlen) { this.inputStream.reset(); this.inputStream.skip(this.splitbyte); } this.parms = new HashMap<String, String>(); if (null == this.headers) { this.headers = new HashMap<String, String>(); //建立header用于存储咱们解析出来的请求头 } else { this.headers.clear(); } // 建立BufferedReader用于解析header BufferedReader hin = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buf, 0, this.rlen))); // Decode the header into parms and header java properties Map<String, String> pre = new HashMap<String, String>(); decodeHeader(hin, pre, this.parms, this.headers);//咱们将解析的header存入map中 //打印解析出来的header数据,笔者加的日志 for (Map.Entry<String, String> entry : headers.entrySet()) { Log.d(TAG, "header key = " + entry.getKey() + " value = " + entry.getValue()); } if (null != this.remoteIp) { this.headers.put("remote-addr", this.remoteIp); this.headers.put("http-client-ip", this.remoteIp); } this.method = Method.lookup(pre.get("method"));//获取请求方法 get、post、put if (this.method == null) { throw new ResponseException(Response.Status.BAD_REQUEST, "BAD REQUEST: Syntax error."); } this.uri = pre.get("uri");//获取请求的uri this.cookies = new CookieHandler(this.headers);//处理cookie String connection = this.headers.get("connection"); boolean keepAlive = protocolVersion.equals("HTTP/1.1") && (connection == null || !connection.matches("(?i).*close.*"));//判断是否支持keepAlive // Ok, now do the serve() // TODO: long body_size = getBodySize(); // TODO: long pos_before_serve = this.inputStream.totalRead() // (requires implementaion for totalRead()) r = serve(this);//这个serve就是咱们在开篇的实例代码中实现的方法,在这个方法中咱们要建立返回给客户端的响应数据。 // TODO: this.inputStream.skip(body_size - // (this.inputStream.totalRead() - pos_before_serve)) if (r == null) { throw new ResponseException(Response.Status.INTERNAL_ERROR, "SERVER INTERNAL ERROR: Serve() returned a null response."); } else { //r为响应数据,这里是添加一些公共的响应头 String acceptEncoding = this.headers.get("accept-encoding"); this.cookies.unloadQueue(r); r.setRequestMethod(this.method); r.setGzipEncoding(useGzipWhenAccepted(r) && acceptEncoding != null && acceptEncoding.contains("gzip")); r.setKeepAlive(keepAlive); r.send(this.outputStream);//将数据响应给客户端 } if (!keepAlive || "close".equalsIgnoreCase(r.getHeader("connection"))) { throw new SocketException("NanoHttpd Shutdown"); } } catch (SocketException e) { // throw it out to close socket object (finalAccept) throw e; } catch (SocketTimeoutException ste) { // treat socket timeouts the same way we treat socket exceptions // i.e. close the stream & finalAccept object by throwing the // exception up the call stack. throw ste; } catch (IOException ioe) { Response resp = newFixedLengthResponse(Response.Status.INTERNAL_ERROR, NanoHTTPD.MIME_PLAINTEXT, "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage()); resp.send(this.outputStream); safeClose(this.outputStream); } catch (ResponseException re) { Response resp = newFixedLengthResponse(re.getStatus(), NanoHTTPD.MIME_PLAINTEXT, re.getMessage()); resp.send(this.outputStream); safeClose(this.outputStream); } finally { safeClose(r); this.tempFileManager.clear(); } } 复制代码
我介绍一下这段代码的功能:
下面看看serve方法,这个方法是须要咱们实现的,可是其内部是有个默认实现,那就是处理文件上传的一个实现
public Response serve(IHTTPSession session) { Map<String, String> files = new HashMap<String, String>(); Method method = session.getMethod(); if (Method.PUT.equals(method) || Method.POST.equals(method)) { try { session.parseBody(files);//重要的逻辑在parseBody中 } catch (IOException ioe) { return newFixedLengthResponse(Response.Status.INTERNAL_ERROR, NanoHTTPD.MIME_PLAINTEXT, "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage()); } catch (ResponseException re) { return newFixedLengthResponse(re.getStatus(), NanoHTTPD.MIME_PLAINTEXT, re.getMessage()); } } Map<String, String> parms = session.getParms(); parms.put(NanoHTTPD.QUERY_STRING_PARAMETER, session.getQueryParameterString()); return serve(session.getUri(), method, session.getHeaders(), parms, files); } 复制代码
对就是这个session.parseBody()方法。
@Override public void parseBody(Map<String, String> files) throws IOException, ResponseException { RandomAccessFile randomAccessFile = null; try { long size = getBodySize(); ByteArrayOutputStream baos = null; DataOutput request_data_output = null; // Store the request in memory or a file, depending on size if (size < MEMORY_STORE_LIMIT) { baos = new ByteArrayOutputStream(); request_data_output = new DataOutputStream(baos); } else { randomAccessFile = getTmpBucket(); request_data_output = randomAccessFile; } // Read all the body and write it to request_data_output byte[] buf = new byte[REQUEST_BUFFER_LEN]; while (this.rlen >= 0 && size > 0) { this.rlen = this.inputStream.read(buf, 0, (int) Math.min(size, REQUEST_BUFFER_LEN)); size -= this.rlen; if (this.rlen > 0) { request_data_output.write(buf, 0, this.rlen); } } ByteBuffer fbuf = null; if (baos != null) { fbuf = ByteBuffer.wrap(baos.toByteArray(), 0, baos.size()); } else { fbuf = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, randomAccessFile.length()); randomAccessFile.seek(0); } // If the method is POST, there may be parameters // in data section, too, read it: if (Method.POST.equals(this.method)) { String contentType = ""; String contentTypeHeader = this.headers.get("content-type"); Log.d(TAG, "contentTypeHeader = " + contentTypeHeader); StringTokenizer st = null; if (contentTypeHeader != null) { st = new StringTokenizer(contentTypeHeader, ",; "); if (st.hasMoreTokens()) { contentType = st.nextToken(); } } if ("multipart/form-data".equalsIgnoreCase(contentType)) {//文件上传 // Handle multipart/form-data if (!st.hasMoreTokens()) { throw new ResponseException(Response.Status.BAD_REQUEST, "BAD REQUEST: Content type is multipart/form-data but boundary missing. Usage: GET /example/file.html"); } //处理文件上传的方法 decodeMultipartFormData(getAttributeFromContentHeader(contentTypeHeader, BOUNDARY_PATTERN, null), // getAttributeFromContentHeader(contentTypeHeader, CHARSET_PATTERN, "US-ASCII"), fbuf, this.parms, files); } else { byte[] postBytes = new byte[fbuf.remaining()]; fbuf.get(postBytes); String postLine = new String(postBytes).trim(); // Handle application/x-www-form-urlencoded if ("application/x-www-form-urlencoded".equalsIgnoreCase(contentType)) { decodeParms(postLine, this.parms); } else if (postLine.length() != 0) { // Special case for raw POST data => create a // special files entry "postData" with raw content // data files.put("postData", postLine); } } } else if (Method.PUT.equals(this.method)) { files.put("content", saveTmpFile(fbuf, 0, fbuf.limit(), null)); } } finally { safeClose(randomAccessFile); } } 复制代码
这段代码的功能:
这个方法主要处理文件上传逻辑
/** * Decodes the Multipart Body data and put it into Key/Value pairs. */ private void decodeMultipartFormData(String boundary, String encoding, ByteBuffer fbuf, Map<String, String> parms, Map<String, String> files) throws ResponseException { try { int[] boundary_idxs = getBoundaryPositions(fbuf, boundary.getBytes()); if (boundary_idxs.length < 2) { throw new ResponseException(Response.Status.BAD_REQUEST, "BAD REQUEST: Content type is multipart/form-data but contains less than two boundary strings."); } byte[] part_header_buff = new byte[MAX_HEADER_SIZE]; for (int bi = 0; bi < boundary_idxs.length - 1; bi++) { fbuf.position(boundary_idxs[bi]); int len = (fbuf.remaining() < MAX_HEADER_SIZE) ? fbuf.remaining() : MAX_HEADER_SIZE; fbuf.get(part_header_buff, 0, len); BufferedReader in = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(part_header_buff, 0, len), Charset.forName(encoding)), len); int headerLines = 0; // First line is boundary string String mpline = in.readLine(); headerLines++; if (!mpline.contains(boundary)) { throw new ResponseException(Response.Status.BAD_REQUEST, "BAD REQUEST: Content type is multipart/form-data but chunk does not start with boundary."); } String part_name = null, file_name = null, content_type = null; // Parse the reset of the header lines mpline = in.readLine(); headerLines++; while (mpline != null && mpline.trim().length() > 0) {//经过正则的方式获取文件名称 Matcher matcher = CONTENT_DISPOSITION_PATTERN.matcher(mpline); if (matcher.matches()) { String attributeString = matcher.group(2); matcher = CONTENT_DISPOSITION_ATTRIBUTE_PATTERN.matcher(attributeString); while (matcher.find()) { String key = matcher.group(1); if (key.equalsIgnoreCase("name")) { part_name = matcher.group(2); } else if (key.equalsIgnoreCase("filename")) { file_name = matcher.group(2); } } } matcher = CONTENT_TYPE_PATTERN.matcher(mpline); if (matcher.matches()) { content_type = matcher.group(2).trim(); } mpline = in.readLine(); headerLines++; } int part_header_len = 0; while (headerLines-- > 0) { part_header_len = scipOverNewLine(part_header_buff, part_header_len); } // Read the part data if (part_header_len >= len - 4) { throw new ResponseException(Response.Status.INTERNAL_ERROR, "Multipart header size exceeds MAX_HEADER_SIZE."); } int part_data_start = boundary_idxs[bi] + part_header_len; int part_data_end = boundary_idxs[bi + 1] - 4; fbuf.position(part_data_start); if (content_type == null) { // Read the part into a string byte[] data_bytes = new byte[part_data_end - part_data_start]; fbuf.get(data_bytes); parms.put(part_name, new String(data_bytes, encoding)); } else { // Read it into a file String path = saveTmpFile(fbuf, part_data_start, part_data_end - part_data_start, file_name);//将文件存储在一个临时目录 if (!files.containsKey(part_name)) { files.put(part_name, path); } else { int count = 2; while (files.containsKey(part_name + count)) { count++; } files.put(part_name + count, path); } if (!parms.containsKey(part_name)) { parms.put(part_name, file_name); } else { int count = 2; while (parms.containsKey(part_name + count)) { count++; } parms.put(part_name + count, file_name); } } } } catch (ResponseException re) { throw re; } catch (Exception e) { throw new ResponseException(Response.Status.INTERNAL_ERROR, e.toString()); } } 复制代码
上面的代码就是将客户端上传的文件解析出来,而后存储在服务端。在解析数据的时候用了不少的正则匹配,这也从侧面印证了Http协议,若是没有Http协议也就无法使用正则去解析数据。
以上咱们已经把NanoHTTPD中最主要的逻辑介绍完了,NanoHTTPD经过Socket实现了一个服务端,而咱们的客户端则是由浏览器实现的。下面放一张完整的Socket实现客户端和服务端的流程图,帮助你们理解。
好了,NanoHTTPD的源码就解析完了,经过上面的介绍,咱们对Socket, HTTP协议,TCP/IP协议等有了一个更为深入的认识。不在那么迷茫。固然了,这之中涉及到了一些系统socket的api,经过这些我想说的是java层的api和系统层的api,原理是同样的,可是咱们应该更关心底层的实现原理和本质。
在前面建立ServerSocket的时候,设计到了IPV6,下面就介绍一些IPV6的格式。
IPv4地址大小是32位的,好比192.168.0.1, 每一个小圆点之间为8位。 IPv6的地址为128位。使用:分割,分红8个部分,每一个部分为16位,因此大多数IPv6的地址使用16进制表示,好比ffff:ffff:ffff:ffff:ffff:ffff:fff.
IPv6 地址大小为 128 位。首选 IPv6 地址表示法为 x:x:x:x:x:x:x:x,其中每一个 x 是地址的 8 个 16 位部分的十六进制值。IPv6 地址范围从 0000:0000:0000:0000:0000:0000:0000:0000 至 ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff。 这种是完整的表示格式,其实还有两种咱们比较常见的简写格式: