很久没有更新博客了,一个是由于太忙,业务是在太多,另一个就是也比较懒,因此拖着就有接近两个月的时间没有写博客了,实在是罪过。今天分享一个开源库 AndroidVideoCache 。这个库主要是作视频缓存管理功能,支持边下边播,离线播放,缓存管理等。用过MediaPlayer的小伙伴都知道,能够支持在线播放和播放本地资源,可是不支持缓存,这样很消耗用户流量,这个时候AndroidVideoCache就派上用场了。java
AndroidVideoCache 经过代理的策略将咱们的网络请求代理到本地服务,本地服务再决定是从本地缓存拿仍是发起网络请求,若是须要发起网络请求就先向本地写入数据,再从本地提供数据给视频播放器。这样就作到了数据的复用。git
借用一张AndroidVideoCache - 视频边播放边缓存的代理策略里面的图片看的比较清楚: github
在视频播放器,好比VideoView发起一个urlA,经过HttpProxyCacheServer
转成一个本地host和端口的urlB,这样视频播放器发起请求就是向HttpProxyCacheServer
请求,返回视频播放器的Socket,Server再创建一个HttpProxyCacheServerClients
来发起网络请求处理缓存等工做,而后把数据经过前面的Socket返回给视频播放器。算法
了解了基本原理,再看下代码结构。数据库
整个代码结构仍是比较清晰,涉及到的类比较多,这里只画出了一些主要的相关类,看下个人手绘图😢: 缓存
HttpProxyCacheServer
是库对外的接口,经过这个和视频播放器联系,判断本地是否有缓存,有的话直接返回本地文件;没有就创建一个和url对应的HttpProxyCacheServerClients
处理本次请求,请求工做是交给Source接口,缓存工做是经过Cache接口。文件缓存是用LRU算法实现,能够根据文件大小或者文件个数管理缓存。bash
CacheListener
是缓存本地成功后回调接口,能够用于更新视频进度条等UI需求。服务器
上面总体介绍了下原理和代码结构,接下来是时候看下使用方法了,暴露出来的接口比较少,因此使用起来也简单。网络
首先是导包,截止到写这边博客,最新的版本是2.7.1:session
dependencies {
compile 'com.danikula:videocache:2.7.1'
}
复制代码
而后在全局初始化一个本地代理服务器,这里选择在 Application 的实现类中
public class App extends Application {
private HttpProxyCacheServer proxy;
public static HttpProxyCacheServer getProxy(Context context) {
App app = (App) context.getApplicationContext();
return app.proxy == null ? (app.proxy = app.newProxy()) : app.proxy;
}
private HttpProxyCacheServer newProxy() {
return new HttpProxyCacheServer(this);
}
}
复制代码
有了代理服务器,咱们就可使用了,把本身的网络视频 url 用提供的方法替换成另外一个 URL
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
HttpProxyCacheServer proxy = getProxy();
String proxyUrl = proxy.getProxyUrl(VIDEO_URL);
videoView.setVideoPath(proxyUrl);
}
复制代码
提供了更多的能够自定义的地方,好比缓存的文件最大大小,以及文件个数,缓存采起的是 LruCache 的方法,对于老文件在达到上限后会自动清理。
private HttpProxyCacheServer newProxy() {
return new HttpProxyCacheServer.Builder(this)
.maxCacheSize(1024 * 1024 * 1024) // 1 Gb for cache
.build();
}
private HttpProxyCacheServer newProxy() {
return new HttpProxyCacheServer.Builder(this)
.maxCacheFilesCount(20)
.build();
}
复制代码
除了这个,还有一个就是生成的文件名,默认是使用的 MD5 方式生成 key,考虑到一些业务逻辑,咱们也能够继承一个 FileNameGenerator 来实现本身的策略
public class MyFileNameGenerator implements FileNameGenerator {
// Urls contain mutable parts (parameter 'sessionToken') and stable video's id (parameter 'videoId'). // e. g. http://example.com?videoId=abcqaz&sessionToken=xyz987 public String generate(String url) { Uri uri = Uri.parse(url); String videoId = uri.getQueryParameter("videoId"); return videoId + ".mp4"; } } ... HttpProxyCacheServer proxy = HttpProxyCacheServer.Builder(context) .fileNameGenerator(new MyFileNameGenerator()) .build() 复制代码
很明显,构造Server是经过建造者的模式,看下Builder的代码就知道支持哪些配置和默认配置是什么了。
private static final long DEFAULT_MAX_SIZE = 512 * 1024 * 1024;
private File cacheRoot;
private FileNameGenerator fileNameGenerator;
private DiskUsage diskUsage;
private SourceInfoStorage sourceInfoStorage;
private HeaderInjector headerInjector;
public Builder(Context context) {
this.sourceInfoStorage = SourceInfoStorageFactory.newSourceInfoStorage(context);
this.cacheRoot = StorageUtils.getIndividualCacheDirectory(context);
this.diskUsage = new TotalSizeLruDiskUsage(DEFAULT_MAX_SIZE);
this.fileNameGenerator = new Md5FileNameGenerator();
this.headerInjector = new EmptyHeadersInjector();
}
复制代码
cacheRoot
就是缓存默认的文件夹,若是有sd卡而且申请了权限,会放到下面的目录<i>("/Android/data/[app_package_name]/cache")</i>
复制代码
不然放到手机的内部存储
cacheDirPath = "/data/data/" + context.getPackageName() + "/cache/";
复制代码
FileNameGenerator
用于生成文件名,默认是 Md5FileNameGenerator
,生成MD5串做为文件名。
DiskUsage
是用于管理本地缓存,默认是经过文件大小进行管理,大小默认是512M
private static final long DEFAULT_MAX_SIZE = 512 * 1024 * 1024;
this.diskUsage = new TotalSizeLruDiskUsage(DEFAULT_MAX_SIZE);
复制代码
SourceInfoStorage
是用于存储SourInfo
,默认是数据库存储this.sourceInfoStorage = SourceInfoStorageFactory.newSourceInfoStorage(context);
public static SourceInfoStorage newSourceInfoStorage(Context context) {
return new DatabaseSourceInfoStorage(context);
}
复制代码
那SourInfo
是什么?主要用于存储http请求源的一些信息,好比url,数据长度length,请求资源的类型mime:
public final String url;
public final long length;
public final String mime;
复制代码
HeaderInjector
主要用于添加一些自定义的头部字段,默认是空this.headerInjector = new EmptyHeadersInjector();
复制代码
最后把这些字段构形成Config,构造HttpProxyCacheServer
须要,后面会再传给HttpProxyCacheServerClients
用于发起请求(url,length,mime)等,和本地缓存(DiskUsage,SourceInfoStorage,cacheRoot)等。
/**
* Builds new instance of {@link HttpProxyCacheServer}.
*
* @return proxy cache. Only single instance should be used across whole app.
*/
public HttpProxyCacheServer build() {
Config config = buildConfig();
return new HttpProxyCacheServer(config);
}
private Config buildConfig() {
return new Config(cacheRoot, fileNameGenerator, diskUsage, sourceInfoStorage, headerInjector);
}
复制代码
从上面分析知道入口是HttpProxyCacheServer
,因此咱们先看下它:
HttpProxyCacheServer.java
private static final Logger LOG = LoggerFactory.getLogger("HttpProxyCacheServer");
private static final String PROXY_HOST = "127.0.0.1";
private final Object clientsLock = new Object();
private final ExecutorService socketProcessor = Executors.newFixedThreadPool(8);
private final Map<String, HttpProxyCacheServerClients> clientsMap = new ConcurrentHashMap<>();
private final ServerSocket serverSocket;
private final int port;
private final Thread waitConnectionThread;
private final Config config;
private final Pinger pinger;
public HttpProxyCacheServer(Context context) {
this(new Builder(context).buildConfig());
}
private HttpProxyCacheServer(Config config) {
this.config = checkNotNull(config);
try {
InetAddress inetAddress = InetAddress.getByName(PROXY_HOST);
this.serverSocket = new ServerSocket(0, 8, inetAddress);
this.port = serverSocket.getLocalPort();
IgnoreHostProxySelector.install(PROXY_HOST, port);
CountDownLatch startSignal = new CountDownLatch(1);
this.waitConnectionThread = new Thread(new WaitRequestsRunnable(startSignal));
this.waitConnectionThread.start();
startSignal.await(); // freeze thread, wait for server starts
this.pinger = new Pinger(PROXY_HOST, port);
LOG.info("Proxy cache server started. Is it alive? " + isAlive());
} catch (IOException | InterruptedException e) {
socketProcessor.shutdown();
throw new IllegalStateException("Error starting local proxy server", e);
}
}
复制代码
首先是构造一个本地127.0.0.1
的ServerSocker
,随机分配了一个端口,而后启动一个线程去执行WaitRequestsRunnable
,在这里面执行 waitForRequest
,经过 accept() 方法监听这个服务器 socket 的入站链接,accept() 方法会一直阻塞,直到有一个客户端尝试创建链接。
private void waitForRequest() {
try {
while (!Thread.currentThread().isInterrupted()) {
Socket socket = serverSocket.accept();
LOG.debug("Accept new socket " + socket);
socketProcessor.submit(new SocketProcessorRunnable(socket));
}
} catch (IOException e) {
onError(new ProxyCacheException("Error during waiting connection", e));
}
}
复制代码
再回到前面的构造函数中,有个信号量用来保证Server
启动后再走往下的流程,Server
启动后会构造一个pinger,用来看服务是否可用。
CountDownLatch startSignal = new CountDownLatch(1);
this.waitConnectionThread = new Thread(new WaitRequestsRunnable(startSignal));
this.waitConnectionThread.start();
startSignal.await(); // freeze thread, wait for server starts
this.pinger = new Pinger(PROXY_HOST, port);
复制代码
经过上面几步,HttpProxyCacheServer
就已经启动起来了,在等待客户端的链接,那客户端怎么链接到服务?再看下第三章节使用里面提到的另一个方法getProxyUrl
,看下官方解释,若是本地有缓存那么会返回本地地址的 Uri,file:// uri,不然返回一个代理的url。
/**
* Returns url that wrap original url and should be used for client (MediaPlayer, ExoPlayer, etc).
* <p>
* If parameter {@code allowCachedFileUri} is {@code true} and file for this url is fully cached
* (it means method {@link #isCached(String)} returns {@code true}) then file:// uri to cached file will be returned.
*
* @param url a url to file that should be cached.
* @param allowCachedFileUri {@code true} if allow to return file:// uri if url is fully cached
* @return a wrapped by proxy url if file is not fully cached or url pointed to cache file otherwise (if {@code allowCachedFileUri} is {@code true}).
*/
复制代码
再看下代码就很简单了, 若是本地已经缓存了,就直接拿本地地址的 Uri,而且 touch 一下文件,把时间更新后最新,由于后面 LruCache 是根据文件被访问的时间进行排序的。
public String getProxyUrl(String url, boolean allowCachedFileUri) {
if (allowCachedFileUri && isCached(url)) {
File cacheFile = getCacheFile(url);
touchFileSafely(cacheFile);
return Uri.fromFile(cacheFile).toString();
}
return isAlive() ? appendToProxyUrl(url) : url;
}
复制代码
若是文件没有被缓存那么就会先走一下 isAlive() 方法,这里会ping一下Server,确保是通的。若是不通就直接返回原url,通的话就返回代理url:
private static final String PROXY_HOST = "127.0.0.1";
private String appendToProxyUrl(String url) {
return String.format(Locale.US, "http://%s:%d/%s", PROXY_HOST, port, ProxyCacheUtils.encode(url));
}
复制代码
因此在视频播放器拿着这个代理url发起请求会和Server进行链接,而后前面提到的waitForRequest
会返回一个客户端的Socket,用于和客户端通讯。而后会用线程池处理这个请求,能够看到最多支持8个并发链接。
private final ExecutorService socketProcessor = Executors.newFixedThreadPool(8);
socketProcessor.submit(new SocketProcessorRunnable(socket));
复制代码
SocketProcessorRunnable
请求会经过processSocket
进行处理,前面 ping 的过程其实也被会这个 socket 监听而且走进来这一段。资源请求会走到else逻辑里面。
private void processSocket(Socket socket) {
try {
GetRequest request = GetRequest.read(socket.getInputStream());
LOG.debug("Request to cache proxy:" + request);
String url = ProxyCacheUtils.decode(request.uri);
if (pinger.isPingRequest(url)) {
pinger.responseToPing(socket);
} else {
HttpProxyCacheServerClients clients = getClients(url);
clients.processRequest(request, socket);
}
} catch (SocketException e) {
// There is no way to determine that client closed connection http://stackoverflow.com/a/10241044/999458
// So just to prevent log flooding don't log stacktrace LOG.debug("Closing socket… Socket is closed by client."); } catch (ProxyCacheException | IOException e) { onError(new ProxyCacheException("Error processing request", e)); } finally { releaseSocket(socket); LOG.debug("Opened connections: " + getClientsCount()); } } 复制代码
首先在内存缓存,其实就是ConcurrentHashMap
,看看有没有url对应的HttpProxyCacheServerClients
,没有的话构造一个。HttpProxyCacheServerClients
就是用来处理一个请求url对应的工做。
public void processRequest(GetRequest request, Socket socket) throws ProxyCacheException, IOException {
startProcessRequest();
try {
clientsCount.incrementAndGet();
proxyCache.processRequest(request, socket);
} finally {
finishProcessRequest();
}
}
复制代码
经过startProcessRequest()
构造HttpProxyCache
:
private synchronized void startProcessRequest() throws ProxyCacheException {
proxyCache = proxyCache == null ? newHttpProxyCache() : proxyCache;
}
private HttpProxyCache newHttpProxyCache() throws ProxyCacheException {
HttpUrlSource source = new HttpUrlSource(url, config.sourceInfoStorage, config.headerInjector);
FileCache cache = new FileCache(config.generateCacheFile(url), config.diskUsage);
HttpProxyCache httpProxyCache = new HttpProxyCache(source, cache);
httpProxyCache.registerCacheListener(uiCacheListener);
return httpProxyCache;
}
复制代码
在前面第二章节代码结构中能够看到无论网络请求HttpUrlSource
仍是缓存FileCache
都是经过HttpProxyCache
管理。而后注册一个回调CacheListener
,在HttpProxyCache
缓存可用的时候会回调通知HttpProxyCacheServerClients
,Clients就能够通知监听者:
httpProxyCache.registerCacheListener(uiCacheListener);
this.uiCacheListener = new UiListenerHandler(url, listeners);
private static final class UiListenerHandler extends Handler implements CacheListener {
private final String url;
private final List<CacheListener> listeners;
public UiListenerHandler(String url, List<CacheListener> listeners) {
super(Looper.getMainLooper());
this.url = url;
this.listeners = listeners;
}
@Override
public void onCacheAvailable(File file, String url, int percentsAvailable) {
Message message = obtainMessage();
message.arg1 = percentsAvailable;
message.obj = file;
sendMessage(message);
}
@Override
public void handleMessage(Message msg) {
for (CacheListener cacheListener : listeners) {
cacheListener.onCacheAvailable((File) msg.obj, url, msg.arg1);
}
}
}
复制代码
再回到HttpProxyCacheServerClients
构造函数中,接下来会调用proxyCache.processRequest(request, socket)
:
public void processRequest(GetRequest request, Socket socket) throws IOException, ProxyCacheException {
OutputStream out = new BufferedOutputStream(socket.getOutputStream());
String responseHeaders = newResponseHeaders(request);
out.write(responseHeaders.getBytes("UTF-8"));
long offset = request.rangeOffset;
if (isUseCache(request)) {
responseWithCache(out, offset);
} else {
responseWithoutCache(out, offset);
}
}
复制代码
首先经过Socket回消息给视频播放器头部信息,接下来判断是否须要走缓存,不走缓存就直接经过HttpUrlSource
发起HttpURLConnection
,读取数据经过Socket返回给播放器。若是须要走缓存,会走下面代码,先调用read读取8k的数据,读取成功经过Socket先返回给播放器,再重复读直到完成。
HttpProxyCache.java
static final int DEFAULT_BUFFER_SIZE = 8 * 1024;
private void responseWithCache(OutputStream out, long offset) throws ProxyCacheException, IOException {
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
int readBytes;
while ((readBytes = read(buffer, offset, buffer.length)) != -1) {
out.write(buffer, 0, readBytes);
offset += readBytes;
}
out.flush();
}
复制代码
read方法是调用的父类ProxyCache
的read方法:
public int read(byte[] buffer, long offset, int length) throws ProxyCacheException {
ProxyCacheUtils.assertBuffer(buffer, offset, length);
while (!cache.isCompleted() && cache.available() < (offset + length) && !stopped) {
readSourceAsync();
waitForSourceData();
checkReadSourceErrorsCount();
}
int read = cache.read(buffer, offset, length);
if (cache.isCompleted() && percentsAvailable != 100) {
percentsAvailable = 100;
onCachePercentsAvailableChanged(100);
}
return read;
}
复制代码
经过循环不断读取数据,直到下面其中一个条件知足:
读取数据会启动一个新的线程去读取:
private synchronized void readSourceAsync() throws ProxyCacheException {
boolean readingInProgress = sourceReaderThread != null && sourceReaderThread.getState() != Thread.State.TERMINATED;
if (!stopped && !cache.isCompleted() && !readingInProgress) {
sourceReaderThread = new Thread(new SourceReaderRunnable(), "Source reader for " + source);
sourceReaderThread.start();
}
}
复制代码
在SourceReaderRunnable
中主要就是调用readSource
,这里主要是经过HttpUrlSource.read
读取网络数据,而后经过FileCache
写入到本地缓存,在缓存结束后一样也会发送一个通知通知本身已经缓存完了,回调由外界控制。
private void readSource() {
long sourceAvailable = -1;
long offset = 0;
try {
offset = cache.available();
source.open(offset);
sourceAvailable = source.length();
byte[] buffer = new byte[ProxyCacheUtils.DEFAULT_BUFFER_SIZE];
int readBytes;
while ((readBytes = source.read(buffer)) != -1) {
synchronized (stopLock) {
if (isStopped()) {
return;
}
cache.append(buffer, readBytes);
}
offset += readBytes;
notifyNewCacheDataAvailable(offset, sourceAvailable);
}
tryComplete();
onSourceRead();
} catch (Throwable e) {
readSourceErrorsCount.incrementAndGet();
onError(e);
} finally {
closeSource();
notifyNewCacheDataAvailable(offset, sourceAvailable);
}
}
复制代码
同时调用ProxyCache.read
的线程如今在作什么?在看下read方法里面的代码:
public int read(byte[] buffer, long offset, int length) throws ProxyCacheException {
ProxyCacheUtils.assertBuffer(buffer, offset, length);
while (!cache.isCompleted() && cache.available() < (offset + length) && !stopped) {
readSourceAsync();
waitForSourceData();
checkReadSourceErrorsCount();
}
int read = cache.read(buffer, offset, length);
if (cache.isCompleted() && percentsAvailable != 100) {
percentsAvailable = 100;
onCachePercentsAvailableChanged(100);
}
return read;
}
复制代码
当readSourceAsync
启动另一个线程(为了方便这里简称为ThreadB)后,本线程(为了方便这里简称为ThreadA)会接下来执行 waitForSourceData
, 先得到wc这个锁,而后调用ThreadA会挂起1s的时间或者ThreadB已经写完缓存,经过notifyAll
通知。
private void waitForSourceData() throws ProxyCacheException {
synchronized (wc) {
try {
wc.wait(1000);
} catch (InterruptedException e) {
throw new ProxyCacheException("Waiting source data is interrupted!", e);
}
}
}
private void notifyNewCacheDataAvailable(long cacheAvailable, long sourceAvailable) {
onCacheAvailable(cacheAvailable, sourceAvailable);
synchronized (wc) {
wc.notifyAll();
}
}
复制代码
接下来ThreadA会继续执行checkReadSourceErrorsCount
方法,若是ThreadB在readSource
出现异常,会增长一次错误次数,而后会抛出异常。
ProxyCache.java
private static final int MAX_READ_SOURCE_ATTEMPTS = 1;
private void checkReadSourceErrorsCount() throws ProxyCacheException {
int errorsCount = readSourceErrorsCount.get();
if (errorsCount >= MAX_READ_SOURCE_ATTEMPTS) {
readSourceErrorsCount.set(0);
throw new ProxyCacheException("Error reading source " + errorsCount + " times");
}
}
复制代码
线程ThreadA会在while循环中继续判断条件,若是知足会跳出,而后从FileCache
中读取length字节的数据返回到HttpProxyCache
的responseWithCache
方法中,经过Socket写回给播放器。
到此整个读取数据,缓存数据的流程就结束了。
写的比较长,先介绍了下AndroidVideoCache
的基本原理,而后手绘了张代码框架图,方便全局了解,而后看了下使用方法,最后分析了主要流程的源码。简单提及来就是经过代理策略,拦截网络请求,从本地拿出数据给到播放器。后面若是有时间能够再简单说下本地缓存的一些代码。
若是本文对你有帮助,欢迎关注哈。
感谢@右倾倾,但愿你能少点痛苦,平平安安,快快乐乐。
下车了,提早祝你们新年快乐!