# Join coordinator that forwards items from an array of inner subscribers
# (JoinInnerSubscriber) to a single downstream consumer (Lok/d;).
#
# The object itself is used as an AtomicInteger work-in-progress counter
# (get / compareAndSet / getAndIncrement / addAndGet are invoked on p0), so
# that only one caller at a time runs the emission code.
#
# NOTE(review): Lok/d;, Lrh/n;, Lrh/o; and Lwh/a; are obfuscated names. From
# their usage here they look like a subscriber interface, a queue interface
# pair and a global error-hook utility respectively - confirm against the
# obfuscation mapping.
.class final Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscription;
.super Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;
.source "ParallelJoin.java"


# annotations
.annotation system Ldalvik/annotation/EnclosingClass;
    value = Lio/reactivex/internal/operators/parallel/ParallelJoin;
.end annotation

.annotation system Ldalvik/annotation/InnerClass;
    accessFlags = 0x19
    name = "JoinSubscription"
.end annotation

# NOTE(review): the first element is an empty string although the signature
# refers to the type variable T. The formal type-parameter declaration may have
# been lost when this listing was extracted - confirm against the original dex.
.annotation system Ldalvik/annotation/Signature;
    value = {
        "",
        "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase<",
        "TT;>;"
    }
.end annotation


# static fields
.field private static final serialVersionUID:J = 0x579a0f4f7e6bd483L


# direct methods

# Constructor: passes the downstream consumer (p1) and the two int arguments
# (p2, p3) straight to the JoinSubscriptionBase constructor.
.method public constructor <init>(Lok/d;II)V
    .locals 0
    .annotation system Ldalvik/annotation/Signature;
        value = {
            "(",
            "Lok/d<",
            "-TT;>;II)V"
        }
    .end annotation

    .line 1
    invoke-direct {p0, p1, p2, p3}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;-><init>(Lok/d;II)V

    return-void
.end method


# virtual methods

# Increments the work-in-progress counter and runs drainLoop() only when the
# previous value was 0; otherwise returns immediately.
.method public drain()V
    .locals 1

    .line 1
    invoke-virtual {p0}, Ljava/util/concurrent/atomic/AtomicInteger;->getAndIncrement()I

    move-result v0

    # previous counter value == 0 -> this caller runs the loop
    if-eqz v0, :cond_0

    return-void

    .line 2
    :cond_0
    invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscription;->drainLoop()V

    return-void
.end method

# Emission loop. Repeatedly polls every inner subscriber's queue and hands the
# items to the downstream until the requested amount has been emitted or all
# queues are empty, then settles the work-in-progress counter.
#
# Register roles:
#   v1  = subscribers array          v2  = its length
#   v3  = downstream                 v5  = "missed" (counter value to subtract)
#   v6  = requested snapshot (wide)  v10 = emitted count (wide)
#   v12 = "all sources done" flag    v15 = "nothing polled this pass" flag
.method public drainLoop()V
    .locals 18

    move-object/from16 v0, p0

    .line 1
    iget-object v1, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->subscribers:[Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;

    .line 2
    array-length v2, v1

    .line 3
    iget-object v3, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->downstream:Lok/d;

    # missed = 1
    const/4 v5, 0x1

    # ---- outer loop: one iteration per settled batch of work ----
    .line 4
    :goto_0
    iget-object v6, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->requested:Ljava/util/concurrent/atomic/AtomicLong;

    invoke-virtual {v6}, Ljava/util/concurrent/atomic/AtomicLong;->get()J

    move-result-wide v6

    const-wide/16 v8, 0x0

    # emitted = 0
    move-wide v10, v8

    # ---- emission loop: runs while emitted != requested ----
    :cond_0
    cmp-long v12, v10, v6

    if-eqz v12, :cond_8

    # cancelled -> cleanup() and stop
    .line 5
    iget-boolean v12, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->cancelled:Z

    if-eqz v12, :cond_1

    .line 6
    invoke-virtual/range {p0 .. p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->cleanup()V

    return-void

    # stored error present -> cleanup(), signal onError and stop
    .line 7
    :cond_1
    iget-object v12, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->errors:Lio/reactivex/internal/util/AtomicThrowable;

    invoke-virtual {v12}, Ljava/util/concurrent/atomic/AtomicReference;->get()Ljava/lang/Object;

    move-result-object v12

    check-cast v12, Ljava/lang/Throwable;

    if-eqz v12, :cond_2

    .line 8
    invoke-virtual/range {p0 .. p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->cleanup()V

    .line 9
    invoke-interface {v3, v12}, Lok/d;->onError(Ljava/lang/Throwable;)V

    return-void

    # v12 = (done.get() == 0); onComplete() below decrements this counter
    .line 10
    :cond_2
    iget-object v12, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->done:Ljava/util/concurrent/atomic/AtomicInteger;

    invoke-virtual {v12}, Ljava/util/concurrent/atomic/AtomicInteger;->get()I

    move-result v12

    if-nez v12, :cond_3

    const/4 v12, 0x1

    goto :goto_1

    :cond_3
    const/4 v12, 0x0

    # index = 0, empty = true
    :goto_1
    const/4 v14, 0x0

    const/4 v15, 0x1

    # ---- one pass over all inner subscribers, polling at most one item each ----
    .line 11
    :goto_2
    array-length v4, v1

    if-ge v14, v4, :cond_6

    .line 12
    aget-object v4, v1, v14

    .line 13
    iget-object v13, v4, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->queue:Lrh/n;

    # a null queue field is skipped
    if-eqz v13, :cond_5

    .line 14
    invoke-interface {v13}, Lrh/n;->poll()Ljava/lang/Object;

    move-result-object v13

    # a null poll result is skipped
    if-eqz v13, :cond_5

    .line 15
    invoke-interface {v3, v13}, Lok/d;->onNext(Ljava/lang/Object;)V

    # ask the inner subscriber for a replacement item
    .line 16
    invoke-virtual {v4}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->requestOne()V

    const-wide/16 v16, 0x1

    add-long v10, v10, v16

    cmp-long v4, v10, v6

    if-nez v4, :cond_4

    # emitted == requested -> leave the emission loop entirely
    goto :goto_3

    # something was emitted in this pass -> empty = false
    :cond_4
    const/4 v15, 0x0

    :cond_5
    add-int/lit8 v14, v14, 0x1

    goto :goto_2

    # pass finished: complete when all sources are done and nothing was polled
    :cond_6
    if-eqz v12, :cond_7

    if-eqz v15, :cond_7

    .line 17
    invoke-interface {v3}, Lok/d;->onComplete()V

    return-void

    # at least one item was polled -> try another pass; otherwise fall through
    :cond_7
    if-eqz v15, :cond_0

    # ---- after the emission loop: terminal checks when emitted == requested ----
    :cond_8
    :goto_3
    cmp-long v4, v10, v6

    if-nez v4, :cond_e

    .line 18
    iget-boolean v4, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->cancelled:Z

    if-eqz v4, :cond_9

    .line 19
    invoke-virtual/range {p0 .. p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->cleanup()V

    return-void

    .line 20
    :cond_9
    iget-object v4, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->errors:Lio/reactivex/internal/util/AtomicThrowable;

    invoke-virtual {v4}, Ljava/util/concurrent/atomic/AtomicReference;->get()Ljava/lang/Object;

    move-result-object v4

    check-cast v4, Ljava/lang/Throwable;

    if-eqz v4, :cond_a

    .line 21
    invoke-virtual/range {p0 .. p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->cleanup()V

    .line 22
    invoke-interface {v3, v4}, Lok/d;->onError(Ljava/lang/Throwable;)V

    return-void

    # v4 = (done.get() == 0)
    .line 23
    :cond_a
    iget-object v4, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->done:Ljava/util/concurrent/atomic/AtomicInteger;

    invoke-virtual {v4}, Ljava/util/concurrent/atomic/AtomicInteger;->get()I

    move-result v4

    if-nez v4, :cond_b

    const/4 v4, 0x1

    goto :goto_4

    :cond_b
    const/4 v4, 0x0

    # scan the queues without consuming: v13 = true when every queue is null or empty
    :goto_4
    const/4 v12, 0x0

    :goto_5
    if-ge v12, v2, :cond_d

    .line 24
    aget-object v13, v1, v12

    .line 25
    iget-object v13, v13, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->queue:Lrh/n;

    if-eqz v13, :cond_c

    .line 26
    invoke-interface {v13}, Lrh/o;->isEmpty()Z

    move-result v13

    if-nez v13, :cond_c

    # found a non-empty queue
    const/4 v13, 0x0

    goto :goto_6

    :cond_c
    add-int/lit8 v12, v12, 0x1

    goto :goto_5

    :cond_d
    const/4 v13, 0x1

    :goto_6
    if-eqz v4, :cond_e

    if-eqz v13, :cond_e

    .line 27
    invoke-interface {v3}, Lok/d;->onComplete()V

    return-void

    # ---- bookkeeping: subtract the emitted count from requested, unless
    #      nothing was emitted or the snapshot was Long.MAX_VALUE ----
    :cond_e
    cmp-long v4, v10, v8

    if-eqz v4, :cond_f

    const-wide v8, 0x7fffffffffffffffL

    cmp-long v4, v6, v8

    if-eqz v4, :cond_f

    .line 28
    iget-object v4, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->requested:Ljava/util/concurrent/atomic/AtomicLong;

    neg-long v6, v10

    invoke-virtual {v4, v6, v7}, Ljava/util/concurrent/atomic/AtomicLong;->addAndGet(J)J

    # ---- settle the work-in-progress counter ----
    # If the counter still equals "missed", subtract it; a result of 0 means no
    # further work arrived and the loop ends. Otherwise loop again with the new
    # value as "missed".
    .line 29
    :cond_f
    invoke-virtual/range {p0 .. p0}, Ljava/util/concurrent/atomic/AtomicInteger;->get()I

    move-result v4

    if-ne v4, v5, :cond_10

    neg-int v4, v5

    .line 30
    invoke-virtual {v0, v4}, Ljava/util/concurrent/atomic/AtomicInteger;->addAndGet(I)I

    move-result v4

    if-nez v4, :cond_10

    return-void

    :cond_10
    move v5, v4

    goto/16 :goto_0
.end method

# Decrements the 'done' counter and triggers a drain.
.method public onComplete()V
    .locals 1

    .line 1
    iget-object v0, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->done:Ljava/util/concurrent/atomic/AtomicInteger;

    invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicInteger;->decrementAndGet()I

    .line 2
    invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscription;->drain()V

    return-void
.end method

# Records the first error via compareAndSet(null, p1) on 'errors'. On success
# calls cancelAll() and drain(). When another error is already stored and it is
# not the same instance as p1, p1 is passed to the static hook Lwh/a;->Y.
.method public onError(Ljava/lang/Throwable;)V
    .locals 2

    .line 1
    iget-object v0, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->errors:Lio/reactivex/internal/util/AtomicThrowable;

    const/4 v1, 0x0

    invoke-virtual {v0, v1, p1}, Ljava/util/concurrent/atomic/AtomicReference;->compareAndSet(Ljava/lang/Object;Ljava/lang/Object;)Z

    move-result v0

    if-eqz v0, :cond_0

    .line 2
    invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->cancelAll()V

    .line 3
    invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscription;->drain()V

    goto :goto_0

    # an error was already stored: report p1 only if it is a different instance
    .line 4
    :cond_0
    iget-object v0, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->errors:Lio/reactivex/internal/util/AtomicThrowable;

    invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicReference;->get()Ljava/lang/Object;

    move-result-object v0

    if-eq p1, v0, :cond_1

    .line 5
    invoke-static {p1}, Lwh/a;->Y(Ljava/lang/Throwable;)V

    :cond_1
    :goto_0
    return-void
.end method

# Receives one item (p2) from an inner subscriber (p1).
#
# Fast path: when the work-in-progress counter can be moved 0 -> 1, the item is
# sent directly downstream if 'requested' is non-zero; otherwise it is offered
# to the inner subscriber's queue.
# Slow path: the item is offered to the queue and the counter is incremented.
# Either path ends in drainLoop() when more work remains for this caller.
# A rejected offer produces MissingBackpressureException("Queue full?!").
.method public onNext(Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;Ljava/lang/Object;)V
    .locals 6
    .annotation system Ldalvik/annotation/Signature;
        value = {
            "(",
            "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber<",
            "TT;>;TT;)V"
        }
    .end annotation

    .line 1
    invoke-virtual {p0}, Ljava/util/concurrent/atomic/AtomicInteger;->get()I

    move-result v0

    # message shared by both queue-full branches (v1 stays live on those paths)
    const-string v1, "Queue full?!"

    if-nez v0, :cond_4

    const/4 v0, 0x0

    const/4 v2, 0x1

    invoke-virtual {p0, v0, v2}, Ljava/util/concurrent/atomic/AtomicInteger;->compareAndSet(II)Z

    move-result v0

    if-eqz v0, :cond_4

    # ---- fast path: counter moved 0 -> 1 ----
    .line 2
    iget-object v0, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->requested:Ljava/util/concurrent/atomic/AtomicLong;

    invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicLong;->get()J

    move-result-wide v2

    const-wide/16 v4, 0x0

    cmp-long v0, v2, v4

    # requested == 0 -> queue the item instead
    if-eqz v0, :cond_1

    .line 3
    iget-object v0, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->downstream:Lok/d;

    invoke-interface {v0, p2}, Lok/d;->onNext(Ljava/lang/Object;)V

    # decrement 'requested' unless it reads Long.MAX_VALUE
    .line 4
    iget-object p2, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->requested:Ljava/util/concurrent/atomic/AtomicLong;

    invoke-virtual {p2}, Ljava/util/concurrent/atomic/AtomicLong;->get()J

    move-result-wide v0

    const-wide v2, 0x7fffffffffffffffL

    cmp-long p2, v0, v2

    if-eqz p2, :cond_0

    .line 5
    iget-object p2, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->requested:Ljava/util/concurrent/atomic/AtomicLong;

    invoke-virtual {p2}, Ljava/util/concurrent/atomic/AtomicLong;->decrementAndGet()J

    :cond_0
    const-wide/16 v0, 0x1

    # request one replacement item from the inner subscriber
    .line 6
    invoke-virtual {p1, v0, v1}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->request(J)V

    goto :goto_1

    # no outstanding request: offer the item to the inner queue
    .line 7
    :cond_1
    invoke-virtual {p1}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->getQueue()Lrh/n;

    move-result-object p1

    .line 8
    invoke-interface {p1, p2}, Lrh/o;->offer(Ljava/lang/Object;)Z

    move-result p1

    if-nez p1, :cond_3

    # offer rejected: cancel everything and report the exception
    .line 9
    invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->cancelAll()V

    .line 10
    new-instance p1, Lio/reactivex/exceptions/MissingBackpressureException;

    invoke-direct {p1, v1}, Lio/reactivex/exceptions/MissingBackpressureException;-><init>(Ljava/lang/String;)V

    # first error wins: deliver it downstream, otherwise pass it to Lwh/a;->Y
    .line 11
    iget-object p2, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->errors:Lio/reactivex/internal/util/AtomicThrowable;

    const/4 v0, 0x0

    invoke-virtual {p2, v0, p1}, Ljava/util/concurrent/atomic/AtomicReference;->compareAndSet(Ljava/lang/Object;Ljava/lang/Object;)Z

    move-result p2

    if-eqz p2, :cond_2

    .line 12
    iget-object p2, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->downstream:Lok/d;

    invoke-interface {p2, p1}, Lok/d;->onError(Ljava/lang/Throwable;)V

    goto :goto_0

    .line 13
    :cond_2
    invoke-static {p1}, Lwh/a;->Y(Ljava/lang/Throwable;)V

    :goto_0
    return-void

    # end of fast path: counter back to 0 -> done; otherwise drain remaining work
    .line 14
    :cond_3
    :goto_1
    invoke-virtual {p0}, Ljava/util/concurrent/atomic/AtomicInteger;->decrementAndGet()I

    move-result p1

    if-nez p1, :cond_6

    return-void

    # ---- slow path: counter was non-zero or the compareAndSet failed ----
    .line 15
    :cond_4
    invoke-virtual {p1}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->getQueue()Lrh/n;

    move-result-object p1

    .line 16
    invoke-interface {p1, p2}, Lrh/o;->offer(Ljava/lang/Object;)Z

    move-result p1

    if-nez p1, :cond_5

    # offer rejected: cancel everything and route the exception through onError
    .line 17
    invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;->cancelAll()V

    .line 18
    new-instance p1, Lio/reactivex/exceptions/MissingBackpressureException;

    invoke-direct {p1, v1}, Lio/reactivex/exceptions/MissingBackpressureException;-><init>(Ljava/lang/String;)V

    invoke-virtual {p0, p1}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscription;->onError(Ljava/lang/Throwable;)V

    return-void

    # drain only when the previous counter value was 0
    .line 19
    :cond_5
    invoke-virtual {p0}, Ljava/util/concurrent/atomic/AtomicInteger;->getAndIncrement()I

    move-result p1

    if-eqz p1, :cond_6

    return-void

    .line 20
    :cond_6
    invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscription;->drainLoop()V

    return-void
.end method