对工作中用到的 media player 播放领域中遇到的三个 cache 实现的优缺点总结:
1,Android 原生的 NuCachedSource2
2,嵌入式播放中常用的 ringBuffer cache
3,YouTube 开源播放器 ExoPlayer 中用到的 cache 策略
I. Android 原生中 HTTP + MP4 的播放模式,为缓解网络抖动、平衡下载的时机,开发了 NuCachedSource2
NuCachedSource2 继承于 DataSource 抽象类,提供缓存读写统计功能。
Android 的多媒体框架在 Source 端的模型如图:
1,DataSource 被抽象为数据源(例如 HTTP、Cache、LocalFile、Prefetching),抽象的是一种访问数据的类型。可以用 IBinder 初始化,也可以无参初始化,非常灵活。
40class DataSource : public RefBase { 41public: 42 enum Flags { 43 kWantsPrefetching = 1, 44 kStreamedFromLocalHost = 2, 45 kIsCachingDataSource = 4, 46 kIsHTTPBasedSource = 8, 47 }; 48 49 static sp<DataSource> CreateFromURI( 50 const sp<IMediaHTTPService> &httpService, 51 const char *uri, 52 const KeyedVector<String8, String8> *headers = NULL, 53 String8 *contentType = NULL, 54 HTTPBase *httpSource = NULL); 55 56 static sp<DataSource> CreateMediaHTTP(const sp<IMediaHTTPService> &httpService); 57 static sp<DataSource> CreateFromIDataSource(const sp<IDataSource> &source); 58 59 DataSource() {} 60 61 virtual status_t initCheck() const = 0; 62 63 // Returns the number of bytes read, or -1 on failure. It's not an error if 64 // this returns zero; it just means the given offset is equal to, or 65 // beyond, the end of the source. 66 virtual ssize_t readAt(off64_t offset, void *data, size_t size) = 0; 67 74 // May return ERROR_UNSUPPORTED. 75 virtual status_t getSize(off64_t *size); 76 77 virtual uint32_t flags() { 78 return 0; 79 } 80 81 virtual status_t reconnectAtOffset(off64_t offset) { 82 return ERROR_UNSUPPORTED; 83 } 84 104 virtual String8 getUri() { 105 return String8(); 106 } 107 108 virtual String8 getMIMEType() const;
2,NuPlayer::Source 是 MediaSource 的抽象,代表一种多媒体源以及对多媒体源的基本操作行为(prepare、start、pause、resume、seek、disconnect)。
// Excerpt of NuPlayer::Source. Source line numbers that had been fused into
// the text were removed, the line structure was restored, and the
// entity-corrupted constructor parameter ("¬ify") was repaired to "&notify".
struct NuPlayer::Source : public AHandler {
    enum Flags {
        FLAG_CAN_PAUSE          = 1,
        FLAG_CAN_SEEK_BACKWARD  = 2,  // the "10 sec back button"
        FLAG_CAN_SEEK_FORWARD   = 4,  // the "10 sec forward button"
        FLAG_CAN_SEEK           = 8,  // the "seek bar"
        FLAG_DYNAMIC_DURATION   = 16,
        FLAG_SECURE             = 32,
        FLAG_PROTECTED          = 64,
    };

    // Notification codes.
    enum {
        kWhatPrepared,
        kWhatFlagsChanged,
        kWhatVideoSizeChanged,
        kWhatBufferingUpdate,
        kWhatBufferingStart,
        kWhatBufferingEnd,
        kWhatPauseOnBufferingStart,
        kWhatResumeOnBufferingEnd,
        kWhatCacheStats,
        kWhatSubtitleData,
        kWhatTimedTextData,
        kWhatTimedMetaData,
        kWhatQueueDecoderShutdown,
        kWhatDrmNoLicense,
        kWhatInstantiateSecureDecoders,
    };

    // The provided message is used to notify the player about various
    // events.
    Source(const sp<AMessage> &notify)
        : mNotify(notify) {
    }

    virtual void prepareAsync() = 0;

    virtual void start() = 0;
    virtual void stop() {}
    virtual void pause() {}
    virtual void resume() {}

    // Explicitly disconnect the underlying data source
    virtual void disconnect() {}

    // Returns OK iff more data was available,
    // an error or ERROR_END_OF_STREAM if not.
    virtual status_t feedMoreTSData() = 0;

    virtual sp<AMessage> getFormat(bool audio);
    virtual sp<MetaData> getFormatMeta(bool /* audio */) { return NULL; }
    virtual sp<MetaData> getFileFormatMeta() const { return NULL; }

    virtual status_t dequeueAccessUnit(
            bool audio, sp<ABuffer> *accessUnit) = 0;

    // The defaults below report "not supported" (INVALID_OPERATION), zero
    // tracks, or NULL; subclasses override the ones they implement.
    virtual status_t getDuration(int64_t * /* durationUs */) {
        return INVALID_OPERATION;
    }

    virtual size_t getTrackCount() const {
        return 0;
    }

    virtual sp<AMessage> getTrackInfo(size_t /* trackIndex */) const {
        return NULL;
    }

    virtual ssize_t getSelectedTrack(media_track_type /* type */) const {
        return INVALID_OPERATION;
    }

    virtual status_t selectTrack(size_t /* trackIndex */, bool /* select */, int64_t /* timeUs*/) {
        return INVALID_OPERATION;
    }

    virtual status_t seekTo(int64_t /* seekTimeUs */) {
        return INVALID_OPERATION;
    }

    virtual status_t setBuffers(bool /* audio */, Vector<MediaBuffer *> &/* buffers */) {
        return INVALID_OPERATION;
    }

    virtual bool isRealTime() const {
        return false;
    }

    virtual bool isStreaming() const {
        return true;
    }

    // ... (the excerpt ends here; the rest of the struct is not shown)
};
3,NuCachedSource2 这个类重点提供两个方法 fetch、read,通过 PageCache 实现。
30struct NuCachedSource2 : public DataSource { 31 static sp<NuCachedSource2> Create( 32 const sp<DataSource> &source, 33 const char *cacheConfig = NULL, 34 bool disconnectAtHighwatermark = false); 35 36 virtual status_t initCheck() const; 38 virtual ssize_t readAt(off64_t offset, void *data, size_t size); 40 virtual void disconnect(); 42 virtual status_t getSize(off64_t *size); 43 virtual uint32_t flags(); 49 virtual String8 getMIMEType() const; 50 51 //////////////////////////////////////////////////////////////////////////// 52 53 size_t cachedSize(); 54 size_t approxDataRemaining(status_t *finalStatus) const; 56 void resumeFetchingIfNecessary(); 58 // The following methods are supported only if the 59 // data source is HTTP-based; otherwise, ERROR_UNSUPPORTED 60 // is returned. 61 status_t getEstimatedBandwidthKbps(int32_t *kbps); 62 status_t setCacheStatCollectFreq(int32_t freqMs); 64 static void RemoveCacheSpecificHeaders( 65 KeyedVector<String8, String8> *headers, 66 String8 *cacheConfig, 67 bool *disconnectAtHighwatermark); 68 69protected: 70 virtual ~NuCachedSource2(); 71 72private: 73 friend struct AHandlerReflector<NuCachedSource2>; 75 NuCachedSource2( 76 const sp<DataSource> &source, 77 const char *cacheConfig, 78 bool disconnectAtHighwatermark); 80 enum { 81 kPageSize = 65536, 82 kDefaultHighWaterThreshold = 20 * 1024 * 1024, 83 kDefaultLowWaterThreshold = 4 * 1024 * 1024, 84 85 // Read data after a 15 sec timeout whether we're actively 86 // fetching or not. 
87 kDefaultKeepAliveIntervalUs = 15000000, 88 }; 90 enum { 91 kWhatFetchMore = 'fetc', 92 kWhatRead = 'read', 93 }; 95 enum { 96 kMaxNumRetries = 10, 97 }; 99 sp<DataSource> mSource; 100 sp<AHandlerReflector<NuCachedSource2> > mReflector; 101 sp<ALooper> mLooper; 102 103 Mutex mSerializer; 104 mutable Mutex mLock; 105 Condition mCondition; 106 107 PageCache *mCache; 108 off64_t mCacheOffset; 109 status_t mFinalStatus; 110 off64_t mLastAccessPos; 111 sp<AMessage> mAsyncResult; 112 bool mFetching; 113 bool mDisconnecting; 114 int64_t mLastFetchTimeUs; 115 116 int32_t mNumRetriesLeft; 117 118 size_t mHighwaterThresholdBytes; 119 size_t mLowwaterThresholdBytes; 120 121 // If the keep-alive interval is 0, keep-alives are disabled. 122 int64_t mKeepAliveIntervalUs; 123 124 bool mDisconnectAtHighwatermark; 125 126 void onMessageReceived(const sp<AMessage> &msg); 127 void onFetch(); 128 void onRead(const sp<AMessage> &msg); 129 130 void fetchInternal(); 131 ssize_t readInternal(off64_t offset, void *data, size_t size); 132 status_t seekInternal_l(off64_t offset); 133 134 size_t approxDataRemaining_l(status_t *finalStatus) const; 135 136 void restartPrefetcherIfNecessary_l( 137 bool ignoreLowWaterThreshold = false, bool force = false); 138 139 void updateCacheParamsFromSystemProperty(); 140 void updateCacheParamsFromString(const char *s); 141 142 DISALLOW_EVIL_CONSTRUCTORS(NuCachedSource2); 143};
详解一下NuCachedSource2,PageCache;
PageCache 是一个数据结构,维护了两个 List<Page *>:Active 和 Free;Active 中是待读取(need read)的数据,Free 中是已经读取解析、需要释放的数据。数据结构简单有效,易于理解。缺点是没有 migration,容易产生页碎片。
NuCachedSource2 是一个数据缓冲策略类,如低水位时 fetching 数据、高水位时停止 buffering 数据、retry fetching、keep-alive cache;以及调用 HTTPBase 或 CURL 搜集一些码率、带宽、cacheDuration 等统计信息。
NOTICE: PageCache 是给 NuCachedSource2 的成员变量 DataSource(HTTPDataSource ---> OkHttp)填充数据用的
II. 嵌入式中常用的 cache 数据结构是 RingBuffer,如在 Android 中可以直接替代 PageCache;具体就是规定 readPointer 永远追赶 writePointer,抽象出一个无限大的 cache。好处就是在处理高帧率、高清晰度的片源时节省空间,缺点就是多线程访问比较麻烦,每次读之前都需要将 readPointer 和 writePointer 比较。
//TODO
III ,EXOPLAYER + OKIO
缓存分为两层:
1,Okio 提供了丰富的 source、sink 来访问 pipe、async;共享 Buffer(类似于 ArrayList 的结构;循环链表 + segments);而 Buffer 拥有的所有 segment 都来自于 SegmentPool 管理的单链表。segment 提供 ownership,因此可以通过调用 split 方法改变 ownership 以及 share,减少 copy;segment 提供 compact 方法减少 segment 碎片。
https://github.com/square/okio
Buffer is a mutable sequence of bytes. Like ArrayList, you don't need to size your buffer in advance. You read and write buffers as a queue: write data to the end and read it from the front. There's no obligation to manage positions, limits, or capacities.
Buffer is implemented as a linked list of segments. When you move data from one buffer to another, it reassigns ownership of the segments rather than copying the data across. This approach is particularly helpful for multithreaded programs: a thread that talks to the network can exchange data with a worker thread without any copying or ceremony.
final class Segment { | |
/** The size of all segments in bytes. */ | |
static final int SIZE = 8192; | |
/** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */ | |
static final int SHARE_MINIMUM = 1024; | |
final byte[] data; | |
/** The next byte of application data byte to read in this segment. */ | |
int pos; | |
/** The first byte of available data ready to be written to. */ | |
int limit; | |
/** True if other segments or byte strings use the same byte array. */ | |
boolean shared; | |
/** True if this segment owns the byte array and can append to it, extending {@code limit}. */ | |
boolean owner; | |
/** Next segment in a linked or circularly-linked list. */ | |
Segment next; | |
/** Previous segment in a circularly-linked list. */ | |
Segment prev; |
在segment中有几个有意思的方法。
/** * Call this when the tail and its predecessor may both be less than half * full. This will copy data so that segments can be recycled. */ public void compact() { if (prev == this) throw new IllegalStateException(); if (!prev.owner) return; // Cannot compact: prev isn't writable. int byteCount = limit - pos; int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos); if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space. writeTo(prev, byteCount); pop(); SegmentPool.recycle(this); }
将一个 Segment 的数据拆成两个。注意,这里有 trick:如果拆分出的字节数不小于 SHARE_MINIMUM(1024),那么这两个 Segment 会共享一份数据,这样就省去了开辟内存及复制内存的开销,达到了提升性能的目的。
public Segment split(int byteCount) { if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException(); Segment prefix; // We have two competing performance goals: // - Avoid copying data. We accomplish this by sharing segments. // - Avoid short shared segments. These are bad for performance because they are readonly and // may lead to long chains of short segments. // To balance these goals we only share segments when the copy will be large. if (byteCount >= SHARE_MINIMUM) { prefix = new Segment(this); } else { prefix = SegmentPool.take(); System.arraycopy(data, pos, prefix.data, 0, byteCount); } prefix.limit = prefix.pos + byteCount; pos += byteCount; prev.push(prefix); return prefix; }
这是一个回收池,目前的设计是能存放64K的字节,即8个Segment。在实际使用中,建议对其进行调整。
final class SegmentPool { /** The maximum number of bytes to pool. */ // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments? static final long MAX_SIZE = 64 * 1024; // 64 KiB. /** Singly-linked list of segments. */ static Segment next; /** Total bytes in this pool. */ static long byteCount; ... }
讲到这里,整个Buffer的实现原理也就呼之欲出了。
Buffer 的写操作,实际上就是不断增加 Segment 的一个过程;读操作,就是不断消耗 Segment 中的数据,如果数据读取完,则使用 SegmentPool 进行回收。
当复制内存数据时,使用Segment的共享机制,多个Segment共享一份data[]。
Buffer 更多的逻辑主要是跨 Segment 读取数据,需要把前一个 Segment 的尾端和后一个 Segment 的前端拼接在一起,因此看起来代码量相对多,但其实开销非常低。
在 Okio 中定义了一个类叫 Timeout,主要用于判断时间是否超过阈值,超过之后就抛出中断异常(InterruptedIOException)。
/**
 * Throws an {@link InterruptedIOException} if the current thread has been interrupted, or if a
 * deadline is set and has been reached.
 *
 * @throws IOException an {@link InterruptedIOException} in either of the cases above
 */
public void throwIfReached() throws IOException {
  // Thread.interrupted() also clears the current thread's interrupt flag.
  boolean wasInterrupted = Thread.interrupted();
  if (wasInterrupted) {
    throw new InterruptedIOException("thread interrupted");
  }

  if (!hasDeadline) {
    return;
  }

  // Compare by subtraction so the check stays correct if nanoTime() wraps around.
  long remainingNanos = deadlineNanoTime - System.nanoTime();
  if (remainingNanos <= 0) {
    throw new InterruptedIOException("deadline reached");
  }
}
2,exo 内部是一个vector 初始化大小为16M, 有高水位限制内存的增加
高水位:30s && 80% * TotalCacheSize
低水位:5s or 20% * TotalCacheSize