www.javatips.net
Open in
urlscan Pro
2606:4700:3030::ac43:8105
Public Scan
URL:
https://www.javatips.net/api/-master/RxJava-2.x/src/main/java/io/reactivex/processors/PublishProcessor.java
Submission: On November 13 via manual from AR — Scanned from DE
Submission: On November 13 via manual from AR — Scanned from DE
Form analysis
0 forms found in the DOMText Content
WE VALUE YOUR PRIVACY We and our partners store and/or access information on a device, such as cookies and process personal data, such as unique identifiers and standard information sent by a device for personalised ads and content, ad and content measurement, and audience insights, as well as to develop and improve products. With your permission we and our partners may use precise geolocation data and identification through device scanning. You may click to consent to our and our partners’ processing as described above. Alternatively you may access more detailed information and change your preferences before consenting or to refuse consenting. Please note that some processing of your personal data may not require your consent, but you have a right to object to such processing. Your preferences will apply to this website only. You can change your preferences at any time by returning to this site or visit our privacy policy. MORE OPTIONSAGREE * Home * Java * JSTL * Struts * Spring * Hibernate * Webservice * Eclipse * API * Guest Post Menu * Explorer * -master * RxJava-2.x * src * main * java * io * reactivex * BackpressureOverflowStrategy.java * BackpressureStrategy.java * Completable.java * CompletableEmitter.java * CompletableObserver.java * CompletableOnSubscribe.java * CompletableOperator.java * CompletableSource.java * CompletableTransformer.java * Emitter.java * Flowable.java * FlowableEmitter.java * FlowableOnSubscribe.java * FlowableOperator.java * FlowableSubscriber.java * FlowableTransformer.java * Maybe.java * MaybeEmitter.java * MaybeObserver.java * MaybeOnSubscribe.java * MaybeOperator.java * MaybeSource.java * MaybeTransformer.java * Notification.java * Observable.java * ObservableEmitter.java * ObservableOnSubscribe.java * ObservableOperator.java * ObservableSource.java * ObservableTransformer.java * Observer.java * Scheduler.java * Single.java * SingleEmitter.java * SingleObserver.java * SingleOnSubscribe.java * SingleOperator.java * SingleSource.java * SingleTransformer.java * annotations * disposables * exceptions * flowables * functions * internal * observables * observers * package-info.java * parallel * plugins * processors * AsyncProcessor.java * BehaviorProcessor.java * FlowableProcessor.java * PublishProcessor.java * ReplayProcessor.java * SerializedProcessor.java * UnicastProcessor.java * package-info.java * schedulers * subjects * subscribers * perf * test /** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */ package io.reactivex.processors; import java.util.concurrent.atomic.*; import org.reactivestreams.*; import io.reactivex.annotations.*; import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; import io.reactivex.plugins.RxJavaPlugins; /** * A Subject that multicasts events to Subscribers that are currently subscribed to it. * * <p> * <img width="640" height="405" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.PublishSubject.png" alt=""> * * <p>The subject does not coordinate backpressure for its subscribers and implements a weaker onSubscribe which * calls requests Long.MAX_VALUE from the incoming Subscriptions. This makes it possible to subscribe the PublishSubject * to multiple sources (note on serialization though) unlike the standard contract on Subscriber. Child subscribers, however, are not overflown but receive an * IllegalStateException in case their requested amount is zero. * * <p>The implementation of onXXX methods are technically thread-safe but non-serialized calls * to them may lead to undefined state in the currently subscribed Subscribers. * * <p>Due to the nature Flowables are constructed, the PublishProcessor can't be instantiated through * {@code new} but must be created via the {@link #create()} method. * * Example usage: * <p> * <pre> {@code PublishProcessor<Object> processor = PublishProcessor.create(); // subscriber1 will receive all onNext and onComplete events processor.subscribe(subscriber1); processor.onNext("one"); processor.onNext("two"); // subscriber2 will only receive "three" and onComplete processor.subscribe(subscriber2); processor.onNext("three"); processor.onComplete(); } </pre> * @param <T> the value type multicast to Subscribers. */ public final class PublishProcessor<T> extends FlowableProcessor<T> { /** The terminated indicator for the subscribers array. */ @SuppressWarnings("rawtypes") static final PublishSubscription[] TERMINATED = new PublishSubscription[0]; /** An empty subscribers array to avoid allocating it all the time. */ @SuppressWarnings("rawtypes") static final PublishSubscription[] EMPTY = new PublishSubscription[0]; /** The array of currently subscribed subscribers. */ final AtomicReference<PublishSubscription<T>[]> subscribers; /** The error, write before terminating and read after checking subscribers. */ Throwable error; /** * Constructs a PublishProcessor. * @param <T> the value type * @return the new PublishProcessor */ @CheckReturnValue public static <T> PublishProcessor<T> create() { return new PublishProcessor<T>(); } /** * Constructs a PublishProcessor. * @since 2.0 */ @SuppressWarnings("unchecked") PublishProcessor() { subscribers = new AtomicReference<PublishSubscription<T>[]>(EMPTY); } @Override public void subscribeActual(Subscriber<? super T> t) { PublishSubscription<T> ps = new PublishSubscription<T>(t, this); t.onSubscribe(ps); if (add(ps)) { // if cancellation happened while a successful add, the remove() didn't work // so we need to do it again if (ps.isCancelled()) { remove(ps); } } else { Throwable ex = error; if (ex != null) { t.onError(ex); } else { t.onComplete(); } } } /** * Tries to add the given subscriber to the subscribers array atomically * or returns false if the subject has terminated. * @param ps the subscriber to add * @return true if successful, false if the subject has terminated */ boolean add(PublishSubscription<T> ps) { for (;;) { PublishSubscription<T>[] a = subscribers.get(); if (a == TERMINATED) { return false; } int n = a.length; @SuppressWarnings("unchecked") PublishSubscription<T>[] b = new PublishSubscription[n + 1]; System.arraycopy(a, 0, b, 0, n); b[n] = ps; if (subscribers.compareAndSet(a, b)) { return true; } } } /** * Atomically removes the given subscriber if it is subscribed to the subject. * @param ps the subject to remove */ @SuppressWarnings("unchecked") void remove(PublishSubscription<T> ps) { for (;;) { PublishSubscription<T>[] a = subscribers.get(); if (a == TERMINATED || a == EMPTY) { return; } int n = a.length; int j = -1; for (int i = 0; i < n; i++) { if (a[i] == ps) { j = i; break; } } if (j < 0) { return; } PublishSubscription<T>[] b; if (n == 1) { b = EMPTY; } else { b = new PublishSubscription[n - 1]; System.arraycopy(a, 0, b, 0, j); System.arraycopy(a, j + 1, b, j, n - j - 1); } if (subscribers.compareAndSet(a, b)) { return; } } } @Override public void onSubscribe(Subscription s) { if (subscribers.get() == TERMINATED) { s.cancel(); return; } // PublishSubject doesn't bother with request coordination. s.request(Long.MAX_VALUE); } @Override public void onNext(T t) { if (subscribers.get() == TERMINATED) { return; } if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } for (PublishSubscription<T> s : subscribers.get()) { s.onNext(t); } } @SuppressWarnings("unchecked") @Override public void onError(Throwable t) { if (subscribers.get() == TERMINATED) { RxJavaPlugins.onError(t); return; } if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } error = t; for (PublishSubscription<T> s : subscribers.getAndSet(TERMINATED)) { s.onError(t); } } @SuppressWarnings("unchecked") @Override public void onComplete() { if (subscribers.get() == TERMINATED) { return; } for (PublishSubscription<T> s : subscribers.getAndSet(TERMINATED)) { s.onComplete(); } } /** * Tries to emit the item to all currently subscribed Subscribers if all of them * has requested some value, returns false otherwise. * <p> * This method should be called in a sequential manner just like the onXXX methods * of the PublishProcessor. * <p> * Calling with null will terminate the PublishProcessor and a NullPointerException * is signalled to the Subscribers. * @param t the item to emit, not null * @return true if the item was emitted to all Subscribers * @since 2.0.8 - experimental */ @Experimental public boolean offer(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return true; } PublishSubscription<T>[] array = subscribers.get(); for (PublishSubscription<T> s : array) { if (s.isFull()) { return false; } } for (PublishSubscription<T> s : array) { s.onNext(t); } return true; } @Override public boolean hasSubscribers() { return subscribers.get().length != 0; } @Override public Throwable getThrowable() { if (subscribers.get() == TERMINATED) { return error; } return null; } @Override public boolean hasThrowable() { return subscribers.get() == TERMINATED && error != null; } @Override public boolean hasComplete() { return subscribers.get() == TERMINATED && error == null; } /** * Wraps the actual subscriber, tracks its requests and makes cancellation * to remove itself from the current subscribers array. * * @param <T> the value type */ static final class PublishSubscription<T> extends AtomicLong implements Subscription { private static final long serialVersionUID = 3562861878281475070L; /** The actual subscriber. */ final Subscriber<? super T> actual; /** The subject state. */ final PublishProcessor<T> parent; /** * Constructs a PublishSubscriber, wraps the actual subscriber and the state. * @param actual the actual subscriber * @param parent the parent PublishProcessor */ PublishSubscription(Subscriber<? super T> actual, PublishProcessor<T> parent) { this.actual = actual; this.parent = parent; } public void onNext(T t) { long r = get(); if (r == Long.MIN_VALUE) { return; } if (r != 0L) { actual.onNext(t); if (r != Long.MAX_VALUE) { decrementAndGet(); } } else { cancel(); actual.onError(new MissingBackpressureException("Could not emit value due to lack of requests")); } } public void onError(Throwable t) { if (get() != Long.MIN_VALUE) { actual.onError(t); } else { RxJavaPlugins.onError(t); } } public void onComplete() { if (get() != Long.MIN_VALUE) { actual.onComplete(); } } @Override public void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.addCancel(this, n); } } @Override public void cancel() { if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { parent.remove(this); } } public boolean isCancelled() { return get() == Long.MIN_VALUE; } boolean isFull() { return get() == 0L; } } }