最近作一个带屏的冰箱项目,其中有个文件上传的功能。基本的思路是在一个局域网中的设备端起一个服务,这样局域网中的其余设备就能够经过Http访问的方式,实现文件上传的功能了。在设备端起一个服务,这里使用了一个开源的微服务项目NanoHTTPD. 虽然只有一个java文件,可是里面包含了不少网络处理方面的细节。所谓麻雀虽小,五脏俱全。这篇文章会介绍NanoHTTPD的源码,但又不只如此。我但愿这篇文章会把socket和http方面的基础概念介绍一下,由于我在阅读NanoHTTPD源码的时候发现,这些概念对于理解NanoHTTPD很是的重要。另外NanoHTTPD包含了网络处理的一些细节,当你以前没有深刻的研究过这些细节的时候,你就很难系统清晰的理解网络传输。html
下面这段代码是官网提供的Sample,其实挺简单,指定一个端口,调用start方法就能够了。serve方法的做用就是处理请求做出响应,sample中返回了一个html页面。java
public class App extends NanoHTTPD {
public App() throws IOException {
super(8080);
start(NanoHTTPD.SOCKET_READ_TIMEOUT, false);
System.out.println("\nRunning! Point your browsers to http://localhost:8080/ \n");
}
public static void main(String[] args) {
try {
new App();
} catch (IOException ioe) {
System.err.println("Couldn't start server:\n" + ioe);
}
}
@Override
public Response serve(IHTTPSession session) {
String msg = "<html><body><h1>Hello server</h1>\n";
Map<String, String> parms = session.getParms();
if (parms.get("username") == null) {
msg += "<form action='?' method='get'>\n <p>Your name: <input type='text' name='username'></p>\n" + "</form>\n";
} else {
msg += "<p>Hello, " + parms.get("username") + "!</p>";
}
return newFixedLengthResponse(msg + "</body></html>\n");//返回html页面
}
}
复制代码
调用start方法后,服务就起来了。下面咱们深刻其内部看看这个start作了那些的操做。android
/**
* Start the server.
*
* @param timeout timeout to use for socket connections.
* @param daemon start the thread daemon or not.
* @throws IOException if the socket is in use.
*/
public void start(final int timeout, boolean daemon) throws IOException {
this.myServerSocket = this.getServerSocketFactory().create();
this.myServerSocket.setReuseAddress(true);
ServerRunnable serverRunnable = createServerRunnable(timeout);
this.myThread = new Thread(serverRunnable);
this.myThread.setDaemon(daemon);//线程分为User线程和Daemon线程,当用户线程结束时,jvm也会推出,
//Daemon线程也就结束了,可是只要有User线程在, Jvm就不会退出。
this.myThread.setName("NanoHttpd Main Listener");
this.myThread.start();
while (!serverRunnable.hasBinded && serverRunnable.bindException == null) {
try {
Thread.sleep(10L);
} catch (Throwable e) {
// on android this may not be allowed, that's why we // catch throwable the wait should be very short because we are // just waiting for the bind of the socket } } if (serverRunnable.bindException != null) { throw serverRunnable.bindException; } } 复制代码
首先建立了一个ServerSocket实例,而后起了一个线程,在这个线程中进行操做,至于进行了什么操做,它的逻辑在ServerRunnable中。git
/**
* The runnable that will be used for the main listening thread.
*/
public class ServerRunnable implements Runnable {
private final int timeout;
private IOException bindException;
private boolean hasBinded = false;
private ServerRunnable(int timeout) {
this.timeout = timeout;
}
@Override
public void run() {
try {
myServerSocket.bind(hostname != null ? new InetSocketAddress(hostname, myPort) : new InetSocketAddress(myPort));
hasBinded = true;
} catch (IOException e) {
this.bindException = e;
return;
}
do {
try {
final Socket finalAccept = NanoHTTPD.this.myServerSocket.accept();
if (this.timeout > 0) {
finalAccept.setSoTimeout(this.timeout);
}
final InputStream inputStream = finalAccept.getInputStream();
NanoHTTPD.this.asyncRunner.exec(createClientHandler(finalAccept, inputStream));
} catch (IOException e) {
NanoHTTPD.LOG.log(Level.FINE, "Communication with the client broken", e);
}
} while (!NanoHTTPD.this.myServerSocket.isClosed());
}
}
复制代码
咱们看一下ServerRunnable的run方法,咱们建立了一个ServerSocket实例,如今咱们调用它的bind方法, 在Java层面只给我暴露了一个bind方法,可是咱们要知道,Java底层也是要调用系统提供的接口的,就是所谓的系统调用, Java中的bind方法其实对应的是系统调用的bind和listen两个方法。那咱们看看系统调用的bind和listen是作什么的。你们查看这些系统调用的api的文档可使用下面的命令:github
man 2 bind
复制代码
bind()函数把一个地址族中的特定地址赋给socket。例如对应AF_INET、AF_INET6就是把一个ipv4或ipv6地址和端口号组合赋给socket。至关于将ip地址和端口与socket创建了联系。编程
其实在真正的系统层级的Socket接口中还有一个listen方法。它的做用让其余进程的socket能够访问当前的socket.api
当ServerSocket调用accept方法的时候,就能够获取到客户端的请求了,这个方法是个阻塞方法(所谓阻塞,就是进程或是线程执行到这个函数时必须等待某个事件的发生,若是这个事件没有发生,则进程和线程就会阻塞在这个地方,不能继续往下执行了。),当有客户端connect时,这个方法就会被调用。它返回了一个Socket,这个Socket其实既包含了服务端的Socket描述符,也包含了客户端返回的地址信息等。从这个Socket有getInputStream和getOutputStream两个方法,分别表明从客户端发送过来的数据流和咱们返回给客户端的数据流。浏览器
上面的部分咱们已经在服务端建立了一个Socket而且调用了它的bind、listen等方法。可是到底什么是Socket呢,咱们如今讨论一下,在讨论什么是Socket以前,咱们先了解一下什么是网络协议,咱们人交谈时,说出的语言要符合语法和用语规范。机器之间的通话也要符合必定的协议。不然,鸡同鸭讲,没法相互理解。咱们平时所用的网络是由:缓存
这四层协议组成。它们的顺序为由上到下,也就是说上层协议要依赖下层协议。好比HTTP协议它要依赖TCP协议。 了解了这些内容以后咱们讲什么是Socket,它是系统对TCP/IP协议的封装的接口,也就是说Socket不是协议,它只是对TCP/IP协议的实现,方便开发者对网络进行开发而已。在Unix中一切皆文件,其实Socket也是一种文件,换句话说网络链接就是一个文件。由于它有对数据流有读写和关闭功能。当咱们创建一个网络链接时,就是建立了一个socket文件,这样咱们就能够read从别的计算器传输过来的数据,write数据给别的计算机。bash
做为服务端,面临的一种状况就是并发访问。NanoHTTPD其实在AsyncRunner的exec方法中作的处理,在DefaultAsyncRunner的exec方法中,启动了一个线程处理每个的访问链接。链接默认的上限是50.而真正处理请求的方法的地方在ClientHandler中。
客户端的请求处理都是在这个方法中完成的。
/**
* The runnable that will be used for every new client connection.
*/
public class ClientHandler implements Runnable {
...
@Override
public void run() {
OutputStream outputStream = null;
try {
outputStream = this.acceptSocket.getOutputStream();
TempFileManager tempFileManager = NanoHTTPD.this.tempFileManagerFactory.create();
HTTPSession session = new HTTPSession(tempFileManager, this.inputStream, outputStream, this.acceptSocket.getInetAddress());
while (!this.acceptSocket.isClosed()) {
session.execute();
}
} catch (Exception e) {
// When the socket is closed by the client,
// we throw our own SocketException
// to break the "keep alive" loop above. If
// the exception was anything other
// than the expected SocketException OR a
// SocketTimeoutException, print the
// stacktrace
if (!(e instanceof SocketException && "NanoHttpd Shutdown".equals(e.getMessage())) && !(e instanceof SocketTimeoutException)) {
NanoHTTPD.LOG.log(Level.FINE, "Communication with the client broken", e);
}
} finally {
safeClose(outputStream);
safeClose(this.inputStream);
safeClose(this.acceptSocket);
NanoHTTPD.this.asyncRunner.closed(this);
}
}
}
复制代码
从上面的逻辑能够发现,这里有建立了一个HTTPSession用于处理每一次的http请求,那咱们就看看HTTPSession.execute这个方法。
在这个方法中获取到InputStream,这个就是客户端请求数据流,经过它咱们就能够拿到客户端的请求数据了。咱们常常据说的HTTP协议这个时候就有用处了,Socket封装了TCP/IP协议,可是没有封装应用层协议。这一层的协议须要咱们本身处理。这个execute方法里的逻辑就是咱们对Http协议的实现。咱们的服务在一个机器上开始运转了,这个时候在另外一台机器的浏览器里输入了前面机器的Ip和端口。我知道这就是一个Http请求了,那么请求数据就到了inputStream这个输入流中。那么接下来就根据http协议规定的内容来解析数据了。
@Override
public void execute() throws IOException {
Response r = null;
try {
//读取前8192字节的数据,其实就是header,Apache默认的header限制大小为8KB
byte[] buf = new byte[HTTPSession.BUFSIZE];
this.splitbyte = 0;
this.rlen = 0;
int read = -1;
this.inputStream.mark(HTTPSession.BUFSIZE);
try {
read = this.inputStream.read(buf, 0, HTTPSession.BUFSIZE);
} catch (Exception e) {
safeClose(this.inputStream);
safeClose(this.outputStream);
throw new SocketException("NanoHttpd Shutdown");
}
if (read == -1) {
// socket was been closed
safeClose(this.inputStream);
safeClose(this.outputStream);
throw new SocketException("NanoHttpd Shutdown");
}
while (read > 0) {
this.rlen += read;
this.splitbyte = findHeaderEnd(buf, this.rlen);
if (this.splitbyte > 0) {
break;
}
read = this.inputStream.read(buf, this.rlen, HTTPSession.BUFSIZE - this.rlen);
}
if (this.splitbyte < this.rlen) {
this.inputStream.reset();
this.inputStream.skip(this.splitbyte);
}
this.parms = new HashMap<String, String>();
if (null == this.headers) {
this.headers = new HashMap<String, String>(); //建立header用于存储咱们解析出来的请求头
} else {
this.headers.clear();
}
// 建立BufferedReader用于解析header
BufferedReader hin = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buf, 0, this.rlen)));
// Decode the header into parms and header java properties
Map<String, String> pre = new HashMap<String, String>();
decodeHeader(hin, pre, this.parms, this.headers);//咱们将解析的header存入map中
//打印解析出来的header数据,笔者加的日志
for (Map.Entry<String, String> entry : headers.entrySet()) {
Log.d(TAG, "header key = " + entry.getKey() + " value = " + entry.getValue());
}
if (null != this.remoteIp) {
this.headers.put("remote-addr", this.remoteIp);
this.headers.put("http-client-ip", this.remoteIp);
}
this.method = Method.lookup(pre.get("method"));//获取请求方法 get、post、put
if (this.method == null) {
throw new ResponseException(Response.Status.BAD_REQUEST, "BAD REQUEST: Syntax error.");
}
this.uri = pre.get("uri");//获取请求的uri
this.cookies = new CookieHandler(this.headers);//处理cookie
String connection = this.headers.get("connection");
boolean keepAlive = protocolVersion.equals("HTTP/1.1") && (connection == null || !connection.matches("(?i).*close.*"));//判断是否支持keepAlive
// Ok, now do the serve()
// TODO: long body_size = getBodySize();
// TODO: long pos_before_serve = this.inputStream.totalRead()
// (requires implementaion for totalRead())
r = serve(this);//这个serve就是咱们在开篇的实例代码中实现的方法,在这个方法中咱们要建立返回给客户端的响应数据。
// TODO: this.inputStream.skip(body_size -
// (this.inputStream.totalRead() - pos_before_serve))
if (r == null) {
throw new ResponseException(Response.Status.INTERNAL_ERROR, "SERVER INTERNAL ERROR: Serve() returned a null response.");
} else {
//r为响应数据,这里是添加一些公共的响应头
String acceptEncoding = this.headers.get("accept-encoding");
this.cookies.unloadQueue(r);
r.setRequestMethod(this.method);
r.setGzipEncoding(useGzipWhenAccepted(r) && acceptEncoding != null && acceptEncoding.contains("gzip"));
r.setKeepAlive(keepAlive);
r.send(this.outputStream);//将数据响应给客户端
}
if (!keepAlive || "close".equalsIgnoreCase(r.getHeader("connection"))) {
throw new SocketException("NanoHttpd Shutdown");
}
} catch (SocketException e) {
// throw it out to close socket object (finalAccept)
throw e;
} catch (SocketTimeoutException ste) {
// treat socket timeouts the same way we treat socket exceptions
// i.e. close the stream & finalAccept object by throwing the
// exception up the call stack.
throw ste;
} catch (IOException ioe) {
Response resp = newFixedLengthResponse(Response.Status.INTERNAL_ERROR, NanoHTTPD.MIME_PLAINTEXT, "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage());
resp.send(this.outputStream);
safeClose(this.outputStream);
} catch (ResponseException re) {
Response resp = newFixedLengthResponse(re.getStatus(), NanoHTTPD.MIME_PLAINTEXT, re.getMessage());
resp.send(this.outputStream);
safeClose(this.outputStream);
} finally {
safeClose(r);
this.tempFileManager.clear();
}
}
复制代码
我介绍一下这段代码的功能:
下面看看serve方法,这个方法是须要咱们实现的,可是其内部是有个默认实现,那就是处理文件上传的一个实现
public Response serve(IHTTPSession session) {
Map<String, String> files = new HashMap<String, String>();
Method method = session.getMethod();
if (Method.PUT.equals(method) || Method.POST.equals(method)) {
try {
session.parseBody(files);//重要的逻辑在parseBody中
} catch (IOException ioe) {
return newFixedLengthResponse(Response.Status.INTERNAL_ERROR, NanoHTTPD.MIME_PLAINTEXT, "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage());
} catch (ResponseException re) {
return newFixedLengthResponse(re.getStatus(), NanoHTTPD.MIME_PLAINTEXT, re.getMessage());
}
}
Map<String, String> parms = session.getParms();
parms.put(NanoHTTPD.QUERY_STRING_PARAMETER, session.getQueryParameterString());
return serve(session.getUri(), method, session.getHeaders(), parms, files);
}
复制代码
对就是这个session.parseBody()方法。
@Override
public void parseBody(Map<String, String> files) throws IOException, ResponseException {
RandomAccessFile randomAccessFile = null;
try {
long size = getBodySize();
ByteArrayOutputStream baos = null;
DataOutput request_data_output = null;
// Store the request in memory or a file, depending on size
if (size < MEMORY_STORE_LIMIT) {
baos = new ByteArrayOutputStream();
request_data_output = new DataOutputStream(baos);
} else {
randomAccessFile = getTmpBucket();
request_data_output = randomAccessFile;
}
// Read all the body and write it to request_data_output
byte[] buf = new byte[REQUEST_BUFFER_LEN];
while (this.rlen >= 0 && size > 0) {
this.rlen = this.inputStream.read(buf, 0, (int) Math.min(size, REQUEST_BUFFER_LEN));
size -= this.rlen;
if (this.rlen > 0) {
request_data_output.write(buf, 0, this.rlen);
}
}
ByteBuffer fbuf = null;
if (baos != null) {
fbuf = ByteBuffer.wrap(baos.toByteArray(), 0, baos.size());
} else {
fbuf = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, randomAccessFile.length());
randomAccessFile.seek(0);
}
// If the method is POST, there may be parameters
// in data section, too, read it:
if (Method.POST.equals(this.method)) {
String contentType = "";
String contentTypeHeader = this.headers.get("content-type");
Log.d(TAG, "contentTypeHeader = " + contentTypeHeader);
StringTokenizer st = null;
if (contentTypeHeader != null) {
st = new StringTokenizer(contentTypeHeader, ",; ");
if (st.hasMoreTokens()) {
contentType = st.nextToken();
}
}
if ("multipart/form-data".equalsIgnoreCase(contentType)) {//文件上传
// Handle multipart/form-data
if (!st.hasMoreTokens()) {
throw new ResponseException(Response.Status.BAD_REQUEST,
"BAD REQUEST: Content type is multipart/form-data but boundary missing. Usage: GET /example/file.html");
}
//处理文件上传的方法
decodeMultipartFormData(getAttributeFromContentHeader(contentTypeHeader, BOUNDARY_PATTERN, null), //
getAttributeFromContentHeader(contentTypeHeader, CHARSET_PATTERN, "US-ASCII"), fbuf, this.parms, files);
} else {
byte[] postBytes = new byte[fbuf.remaining()];
fbuf.get(postBytes);
String postLine = new String(postBytes).trim();
// Handle application/x-www-form-urlencoded
if ("application/x-www-form-urlencoded".equalsIgnoreCase(contentType)) {
decodeParms(postLine, this.parms);
} else if (postLine.length() != 0) {
// Special case for raw POST data => create a
// special files entry "postData" with raw content
// data
files.put("postData", postLine);
}
}
} else if (Method.PUT.equals(this.method)) {
files.put("content", saveTmpFile(fbuf, 0, fbuf.limit(), null));
}
} finally {
safeClose(randomAccessFile);
}
}
复制代码
这段代码的功能:
这个方法主要处理文件上传逻辑
/**
* Decodes the Multipart Body data and put it into Key/Value pairs.
*/
private void decodeMultipartFormData(String boundary, String encoding, ByteBuffer fbuf, Map<String, String> parms, Map<String, String> files) throws ResponseException {
try {
int[] boundary_idxs = getBoundaryPositions(fbuf, boundary.getBytes());
if (boundary_idxs.length < 2) {
throw new ResponseException(Response.Status.BAD_REQUEST, "BAD REQUEST: Content type is multipart/form-data but contains less than two boundary strings.");
}
byte[] part_header_buff = new byte[MAX_HEADER_SIZE];
for (int bi = 0; bi < boundary_idxs.length - 1; bi++) {
fbuf.position(boundary_idxs[bi]);
int len = (fbuf.remaining() < MAX_HEADER_SIZE) ? fbuf.remaining() : MAX_HEADER_SIZE;
fbuf.get(part_header_buff, 0, len);
BufferedReader in = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(part_header_buff, 0, len), Charset.forName(encoding)), len);
int headerLines = 0;
// First line is boundary string
String mpline = in.readLine();
headerLines++;
if (!mpline.contains(boundary)) {
throw new ResponseException(Response.Status.BAD_REQUEST, "BAD REQUEST: Content type is multipart/form-data but chunk does not start with boundary.");
}
String part_name = null, file_name = null, content_type = null;
// Parse the reset of the header lines
mpline = in.readLine();
headerLines++;
while (mpline != null && mpline.trim().length() > 0) {//经过正则的方式获取文件名称
Matcher matcher = CONTENT_DISPOSITION_PATTERN.matcher(mpline);
if (matcher.matches()) {
String attributeString = matcher.group(2);
matcher = CONTENT_DISPOSITION_ATTRIBUTE_PATTERN.matcher(attributeString);
while (matcher.find()) {
String key = matcher.group(1);
if (key.equalsIgnoreCase("name")) {
part_name = matcher.group(2);
} else if (key.equalsIgnoreCase("filename")) {
file_name = matcher.group(2);
}
}
}
matcher = CONTENT_TYPE_PATTERN.matcher(mpline);
if (matcher.matches()) {
content_type = matcher.group(2).trim();
}
mpline = in.readLine();
headerLines++;
}
int part_header_len = 0;
while (headerLines-- > 0) {
part_header_len = scipOverNewLine(part_header_buff, part_header_len);
}
// Read the part data
if (part_header_len >= len - 4) {
throw new ResponseException(Response.Status.INTERNAL_ERROR, "Multipart header size exceeds MAX_HEADER_SIZE.");
}
int part_data_start = boundary_idxs[bi] + part_header_len;
int part_data_end = boundary_idxs[bi + 1] - 4;
fbuf.position(part_data_start);
if (content_type == null) {
// Read the part into a string
byte[] data_bytes = new byte[part_data_end - part_data_start];
fbuf.get(data_bytes);
parms.put(part_name, new String(data_bytes, encoding));
} else {
// Read it into a file
String path = saveTmpFile(fbuf, part_data_start, part_data_end - part_data_start, file_name);//将文件存储在一个临时目录
if (!files.containsKey(part_name)) {
files.put(part_name, path);
} else {
int count = 2;
while (files.containsKey(part_name + count)) {
count++;
}
files.put(part_name + count, path);
}
if (!parms.containsKey(part_name)) {
parms.put(part_name, file_name);
} else {
int count = 2;
while (parms.containsKey(part_name + count)) {
count++;
}
parms.put(part_name + count, file_name);
}
}
}
} catch (ResponseException re) {
throw re;
} catch (Exception e) {
throw new ResponseException(Response.Status.INTERNAL_ERROR, e.toString());
}
}
复制代码
上面的代码就是将客户端上传的文件解析出来,而后存储在服务端。在解析数据的时候用了不少的正则匹配,这也从侧面印证了Http协议,若是没有Http协议也就无法使用正则去解析数据。
以上咱们已经把NanoHTTPD中最主要的逻辑介绍完了,NanoHTTPD经过Socket实现了一个服务端,而咱们的客户端则是由浏览器实现的。下面放一张完整的Socket实现客户端和服务端的流程图,帮助你们理解。
好了,NanoHTTPD的源码就解析完了,经过上面的介绍,咱们对Socket, HTTP协议,TCP/IP协议等有了一个更为深入的认识。不在那么迷茫。固然了,这之中涉及到了一些系统socket的api,经过这些我想说的是java层的api和系统层的api,原理是同样的,可是咱们应该更关心底层的实现原理和本质。
在前面建立ServerSocket的时候,设计到了IPV6,下面就介绍一些IPV6的格式。
IPv4地址大小是32位的,好比192.168.0.1, 每一个小圆点之间为8位。 IPv6的地址为128位。使用:分割,分红8个部分,每一个部分为16位,因此大多数IPv6的地址使用16进制表示,好比ffff:ffff:ffff:ffff:ffff:ffff:fff.
IPv6 地址大小为 128 位。首选 IPv6 地址表示法为 x:x:x:x:x:x:x:x,其中每一个 x 是地址的 8 个 16 位部分的十六进制值。IPv6 地址范围从 0000:0000:0000:0000:0000:0000:0000:0000 至 ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff。 这种是完整的表示格式,其实还有两种咱们比较常见的简写格式: