在并发状况下,不推荐使用一般的Subject对象,而是推荐使用SerializedSubject,并发时只容许一个线程调用onnext等方法!
官方说明:css
When you use an ordinary Subject as a Subscriber, you must take care not to call its Subscriber.onNext method (or its other on methods) from multiple threads, as this could lead to non-serialized calls, which violates the Observable contract and creates an ambiguity in the resulting Subject.
大体意思是当咱们使用普通的Subject,必需要注意不要在多线程状况下调用onNext 方法,这样是违反了Observable 协议而且会致使执行结果返回带有有歧义的值(线程并发致使返回值混淆了)!java
To protect a Subject from this danger, you can convert it into a SerializedSubject with code like the following:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
官方文档说的很明白了,只须要使用SerializedSubject封装原来的
Subject便可!!markdown
测试demo:多线程
public class MultiThread {
public static void main(String[] args) throws InterruptedException {
final PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(new Action1<Integer>() {
@Override
public void call(Integer t) {
System.out.println("======onnext===>value:" + t + ",threadId:" + Thread.currentThread().getId());
}
});
final SerializedSubject<Integer, Integer> ser = new SerializedSubject<Integer, Integer>(subject);
for (int i = 0; i < 20; i++) {
final int value = i;
new Thread() {
public void run() {
ser.onNext((int) (value * 10000 + Thread.currentThread().getId()));
};
}.start();
}
Thread.sleep(2000);
//
// for (int i = 11; i < 20; i++) {
// final int value = i;
// new Thread() {
// public void run() {
// subject.onNext(value);
// };
// }.start();
// }
}
}
执行结果:并发
======onnext===>value:10,threadId:10
======onnext===>value:10011,threadId:10
======onnext===>value:50015,threadId:10
======onnext===>value:40014,threadId:10
======onnext===>value:30013,threadId:10
======onnext===>value:20012,threadId:10
======onnext===>value:70017,threadId:10
======onnext===>value:60016,threadId:10
======onnext===>value:80018,threadId:10
======onnext===>value:100020,threadId:10
======onnext===>value:90019,threadId:10
======onnext===>value:110021,threadId:21
======onnext===>value:130023,threadId:23
======onnext===>value:120022,threadId:23
======onnext===>value:160026,threadId:26
======onnext===>value:150025,threadId:25
======onnext===>value:140024,threadId:25
======onnext===>value:170027,threadId:25
======onnext===>value:180028,threadId:25
======onnext===>value:190029,threadId:25app
上面的结果有点晕了,为何不是在一个线程上呢?和我以前觉得的SerializedSubject时将值放在一个线程上而后处理的想法有些出入了!ide
源码面前了无秘密,SerializedSubject跟进去看看测试
public SerializedSubject(final Subject<T, R> actual) {
super(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> child) {
actual.unsafeSubscribe(child);
}
});
this.actual = actual;
this.observer = new SerializedObserver<T>(actual);
}
其实SerializedSubject的处理是交给了SerializedObserver,继续跟进到SerializedObserver,类注释:ui
/**
* Enforces single-threaded, serialized, ordered execution of {@link #onNext}, {@link #onCompleted}, and
* {@link #onError}.
* <p>
* When multiple threads are emitting and/or notifying they will be serialized by:
* </p><ul>
* <li>Allowing only one thread at a time to emit</li>
* <li>Adding notifications to a queue if another thread is already emitting</li>
* <li>Not holding any locks or blocking any threads while emitting</li>
* </ul>
*
* @param <T>
* the type of items expected to be observed by the {@code Observer}
*/
这里一看就明白了,他是只保证同时只有一个线程调用 {@link #onNext}, {@link #onCompleted}, and{@link #onError}.方法,并非将全部emit的值放到一个线程上而后处理,这就解释了为何执行结果不是所有在一个线程上的缘由 了!this
再看看源码再onnext方法:
@Override
public void onNext(T t) {
FastList list;
//同步锁
synchronized (this) {
if (terminated) {
return;
}
if (emitting) {
if (queue == null) {
queue = new FastList();
}
queue.add(t != null ? t : NULL_SENTINEL);
// another thread is emitting so we add to the queue and return
return;
}
// we can emit
emitting = true;
// reference to the list to drain before emitting our value
list = queue;
queue = null;
}
// we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above
boolean skipFinal = false;
try {
int iter = MAX_DRAIN_ITERATION;
do {
drainQueue(list);
if (iter == MAX_DRAIN_ITERATION) {
// after the first draining we emit our own value
actual.onNext(t);
}
--iter;
if (iter > 0) {
synchronized (this) {
list = queue;
queue = null;
if (list == null) {
emitting = false;
skipFinal = true;
return;
}
}
}
} while (iter > 0);
} finally {
if (!skipFinal) {
synchronized (this) {
if (terminated) {
list = queue;
queue = null;
} else {
emitting = false;
list = null;
}
}
}
}
// this will only drain if terminated (done here outside of synchronized block)
drainQueue(list);
}
// another thread is emitting so we add to the queue and return 若是有其余线程正在处理,则将emit的值放到队列上,线程执行完毕后,会顺序emit队列上的值!!这样就保证了一次只会有一个线程调用!!!