[Git][java-team/reactive-streams][upstream] New upstream version 1.0.3

Emmanuel Bourg gitlab at salsa.debian.org
Sun Jan 26 14:08:32 GMT 2020



Emmanuel Bourg pushed to branch upstream at Debian Java Maintainers / reactive-streams


Commits:
3efe12c2 by Emmanuel Bourg at 2020-01-26T15:00:48+01:00
New upstream version 1.0.3
- - - - -


25 changed files:

- CopyrightWaivers.txt
- README.md
- RELEASE-NOTES.md
- api/build.gradle
- api/src/main/java/org/reactivestreams/Subscription.java
- flow-adapters/src/main/java/org/reactivestreams/FlowAdapters.java → api/src/main/java9/org/reactivestreams/FlowAdapters.java
- build.gradle
- − flow-adapters/.gitignore
- − flow-adapters/build.gradle
- settings.gradle
- tck-flow/README.md
- tck-flow/build.gradle
- tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java
- flow-adapters/src/test/java/org/reactivestreams/FlowAdaptersTest.java → tck-flow/src/test/java/org/reactivestreams/FlowAdaptersTest.java
- flow-adapters/src/test/java/org/reactivestreams/MulticastPublisher.java → tck-flow/src/test/java/org/reactivestreams/MulticastPublisher.java
- flow-adapters/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java → tck-flow/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java
- flow-adapters/src/test/java/org/reactivestreams/TestEitherConsumer.java → tck-flow/src/test/java/org/reactivestreams/TestEitherConsumer.java
- tck/README.md
- tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java
- tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java
- tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java
- tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
- tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java
- tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java
- tck/src/test/java/org/reactivestreams/tck/flow/support/TCKVerificationSupport.java


Changes:

=====================================
CopyrightWaivers.txt
=====================================
@@ -41,3 +41,8 @@ patriknw       | Patrik Nordwall, patrik.nordwall at gmail.com, Lightbend Inc
 angelsanz      | Ángel Sanz, angelsanz at users.noreply.github.com
 shenghaiyang   | 盛海洋, shenghaiyang at aliyun.com
 kiiadi         | Kyle Thomson, kylthoms at amazon.com, Amazon.com
+jroper         | James Roper, james at jazzy.id.au, Lightbend Inc.
+olegdokuka     | Oleh Dokuka, shadowgun at .i.ua, Netifi Inc.
+Scottmitch     | Scott Mitchell, scott_mitchell at apple.com, Apple Inc.
+retronym       | Jason Zaugg, jzaugg at gmail.com, Lightbend Inc.
+


=====================================
README.md
=====================================
@@ -8,12 +8,12 @@ The latest release is available on Maven Central as
 <dependency>
   <groupId>org.reactivestreams</groupId>
   <artifactId>reactive-streams</artifactId>
-  <version>1.0.2</version>
+  <version>1.0.3</version>
 </dependency>
 <dependency>
   <groupId>org.reactivestreams</groupId>
   <artifactId>reactive-streams-tck</artifactId>
-  <version>1.0.2</version>
+  <version>1.0.3</version>
   <scope>test</scope>
 </dependency>
 ```
@@ -79,12 +79,12 @@ followed by a possibly unbounded number of `onNext` signals (as requested by `Su
 | <a name="term_non-obstructing">Non-obstructing</a> | Quality describing a method which is as quick to execute as possible—on the calling thread. This means, for example, avoids heavy computations and other things that would stall the caller´s thread of execution. |
 | <a name="term_terminal_state">Terminal state</a> | For a Publisher: When `onComplete` or `onError` has been signalled. For a Subscriber: When an `onComplete` or `onError` has been received.|
 | <a name="term_nop">NOP</a> | Execution that has no detectable effect to the calling thread, and can as such safely be called any number of times.|
-| <a name="term_ext_sync">External synchronization</a> | Access coordination for thread safety purposes implemented outside of the constructs defined in this specification, using techniques such as, but not limited to, `atomics`, `monitors`, or `locks`. |
+| <a name="term_serially">Serial(ly)</a> | In the context of a [Signal](#term_signal), non-overlapping. In the context of the JVM, calls to methods on an object are serial if and only if there is a happens-before relationship between those calls (implying also that the calls do not overlap). When the calls are performed asynchronously, coordination to establish the happens-before relationship is to be implemented using techniques such as, but not limited to, atomics, monitors, or locks. |
 | <a name="term_thread-safe">Thread-safe</a> | Can be safely invoked synchronously, or asynchronously, without requiring external synchronization to ensure program correctness. |
 
 ### SPECIFICATION
 
-#### 1. Publisher ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/api/src/main/java/org/reactivestreams/Publisher.java))
+#### 1. Publisher ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/api/src/main/java/org/reactivestreams/Publisher.java))
 
 ```java
 public interface Publisher<T> {
@@ -98,8 +98,8 @@ public interface Publisher<T> {
 | [:bulb:](#1.1 "1.1 explained") | *The intent of this rule is to make it clear that Publishers cannot signal more elements than Subscribers have requested. There’s an implicit, but important, consequence to this rule: Since demand can only be fulfilled after it has been received, there’s a happens-before relationship between requesting elements and receiving elements.* |
 | <a name="1.2">2</a>       | A `Publisher` MAY signal fewer `onNext` than requested and terminate the `Subscription` by calling `onComplete` or `onError`. |
 | [:bulb:](#1.2 "1.2 explained") | *The intent of this rule is to make it clear that a Publisher cannot guarantee that it will be able to produce the number of elements requested; it simply might not be able to produce them all; it may be in a failed state; it may be empty or otherwise already completed.* |
-| <a name="1.3">3</a>       | `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled in a [thread-safe](#term_thread-safe) manner—and if performed by multiple threads—use [external synchronization](#term_ext_sync). |
-| [:bulb:](#1.3 "1.3 explained") | *The intent of this rule is to make it clear that [external synchronization](#term_ext_sync) must be employed if the Publisher intends to send signals from multiple/different threads.* |
+| <a name="1.3">3</a>       | `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled [serially](#term_serially). |
+| [:bulb:](#1.3 "1.3 explained") | *The intent of this rule is to permit the signalling of signals (including from multiple threads) if and only if a happens-before relation between each of the signals is established.* |
 | <a name="1.4">4</a>       | If a `Publisher` fails it MUST signal an `onError`. |
 | [:bulb:](#1.4 "1.4 explained") | *The intent of this rule is to make it clear that a Publisher is responsible for notifying its Subscribers if it detects that it cannot proceed—Subscribers must be given a chance to clean up resources or otherwise deal with the Publisher´s failures.* |
 | <a name="1.5">5</a>       | If a `Publisher` terminates successfully (finite stream) it MUST signal an `onComplete`. |
@@ -117,7 +117,7 @@ public interface Publisher<T> {
 | <a name="1.11">11</a>     | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. |
 | [:bulb:](#1.11 "1.11 explained") | *The intent of this rule is to give Publisher implementations the flexibility to decide how many, if any, Subscribers they will support, and how elements are going to be distributed.* |
 
-#### 2. Subscriber ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/api/src/main/java/org/reactivestreams/Subscriber.java))
+#### 2. Subscriber ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/api/src/main/java/org/reactivestreams/Subscriber.java))
 
 ```java
 public interface Subscriber<T> {
@@ -131,7 +131,7 @@ public interface Subscriber<T> {
 | ID                        | Rule                                                                                                   |
 | ------------------------- | ------------------------------------------------------------------------------------------------------ |
 | <a name="2.1">1</a>       | A `Subscriber` MUST signal demand via `Subscription.request(long n)` to receive `onNext` signals. |
-| [:bulb:](#2.1 "2.1 explained") | *The intent of this rule is to establish that it is the responsibility of the Subscriber to signal when, and how many, elements it is able and willing to receive.* |
+| [:bulb:](#2.1 "2.1 explained") | *The intent of this rule is to establish that it is the responsibility of the Subscriber to decide when and how many elements it is able and willing to receive. To avoid signal reordering caused by reentrant Subscription methods, it is strongly RECOMMENDED for synchronous Subscriber implementations to invoke Subscription methods at the very end of any signal processing. It is RECOMMENDED that Subscribers request the upper limit of what they are able to process, as requesting only one element at a time results in an inherently inefficient "stop-and-wait" protocol.* |
 | <a name="2.2">2</a>       | If a `Subscriber` suspects that its processing of signals will negatively impact its `Publisher`´s responsivity, it is RECOMMENDED that it asynchronously dispatches its signals. |
 | [:bulb:](#2.2 "2.2 explained") | *The intent of this rule is that a Subscriber should [not obstruct](#term_non-obstructing) the progress of the Publisher from an execution point-of-view. In other words, the Subscriber should not starve the Publisher from receiving CPU cycles.* |
 | <a name="2.3">3</a>       | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST NOT call any methods on the `Subscription` or the `Publisher`. |
@@ -139,11 +139,11 @@ public interface Subscriber<T> {
 | <a name="2.4">4</a>       | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST consider the Subscription cancelled after having received the signal. |
 | [:bulb:](#2.4 "2.4 explained") | *The intent of this rule is to make sure that Subscribers respect a Publisher’s [terminal state](#term_terminal_state) signals. A Subscription is simply not valid anymore after an onComplete or onError signal has been received.* |
 | <a name="2.5">5</a>       | A `Subscriber` MUST call `Subscription.cancel()` on the given `Subscription` after an `onSubscribe` signal if it already has an active `Subscription`. |
-| [:bulb:](#2.5 "2.5 explained") | *The intent of this rule is to prevent that two, or more, separate Publishers from thinking that they can interact with the same Subscriber. Enforcing this rule means that resource leaks are prevented since extra Subscriptions will be cancelled.* |
+| [:bulb:](#2.5 "2.5 explained") | *The intent of this rule is to prevent that two, or more, separate Publishers from trying to interact with the same Subscriber. Enforcing this rule means that resource leaks are prevented since extra Subscriptions will be cancelled. Failure to conform to this rule may lead to violations of Publisher rule 1, amongst others. Such violations can lead to hard-to-diagnose bugs.* |
 | <a name="2.6">6</a>       | A `Subscriber` MUST call `Subscription.cancel()` if the `Subscription` is no longer needed. |
 | [:bulb:](#2.6 "2.6 explained") | *The intent of this rule is to establish that Subscribers cannot just throw Subscriptions away when they are no longer needed, they have to call `cancel` so that resources held by that Subscription can be safely, and timely, reclaimed. An example of this would be a Subscriber which is only interested in a specific element, which would then cancel its Subscription to signal its completion to the Publisher.* |
-| <a name="2.7">7</a>       | A `Subscriber` MUST ensure that all calls on its `Subscription` take place from the same thread or provide for respective [external synchronization](#term_ext_sync). |
-| [:bulb:](#2.7 "2.7 explained") | *The intent of this rule is to establish that [external synchronization](#term_ext_sync) must be added if a Subscriber will be using a Subscription concurrently by two or more threads.* |
+| <a name="2.7">7</a>       | A Subscriber MUST ensure that all calls on its Subscription's request and cancel methods are performed [serially](#term_serially). |
+| [:bulb:](#2.7 "2.7 explained") | *The intent of this rule is to permit the calling of the request and cancel methods (including from multiple threads) if and only if a happens-before relation between each of the calls is established.* |
 | <a name="2.8">8</a>       | A `Subscriber` MUST be prepared to receive one or more `onNext` signals after having called `Subscription.cancel()` if there are still requested elements pending [see [3.12](#3.12)]. `Subscription.cancel()` does not guarantee to perform the underlying cleaning operations immediately. |
 | [:bulb:](#2.8 "2.8 explained") | *The intent of this rule is to highlight that there may be a delay between calling `cancel` and the Publisher observing that cancellation.* |
 | <a name="2.9">9</a>       | A `Subscriber` MUST be prepared to receive an `onComplete` signal with or without a preceding `Subscription.request(long n)` call. |
@@ -157,7 +157,7 @@ public interface Subscriber<T> {
 | <a name="2.13">13</a>     | Calling `onSubscribe`, `onNext`, `onError` or `onComplete` MUST [return normally](#term_return_normally) except when any provided parameter is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way for a `Subscriber` to signal failure is by cancelling its `Subscription`. In the case that this rule is violated, any associated `Subscription` to the `Subscriber` MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment. |
 | [:bulb:](#2.13 "2.13 explained") | *The intent of this rule is to establish the semantics for the methods of Subscriber and what the Publisher is allowed to do in which case this rule is violated. «Raise this error condition in a fashion that is adequate for the runtime environment» could mean logging the error—or otherwise make someone or something aware of the situation—as the error cannot be signalled to the faulty Subscriber.* |
 
-#### 3. Subscription ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/api/src/main/java/org/reactivestreams/Subscription.java))
+#### 3. Subscription ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/api/src/main/java/org/reactivestreams/Subscription.java))
 
 ```java
 public interface Subscription {
@@ -205,7 +205,7 @@ public interface Subscription {
 
 A `Subscription` is shared by exactly one `Publisher` and one `Subscriber` for the purpose of mediating the data exchange between this pair. This is the reason why the `subscribe()` method does not return the created `Subscription`, but instead returns `void`; the `Subscription` is only passed to the `Subscriber` via the `onSubscribe` callback.
 
-#### 4.Processor ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/api/src/main/java/org/reactivestreams/Processor.java))
+#### 4.Processor ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/api/src/main/java/org/reactivestreams/Processor.java))
 
 ```java
 public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
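As a rough illustration of the new "Serial(ly)" glossary term used by rules 1.3 and 2.7 in the README.md hunks above, the sketch below funnels signals from several threads through a lock, so that calls into the downstream never overlap and each one happens-before the next. `SerialSignaller` and `SerialSignallerDemo` are hypothetical names, not part of Reactive Streams, and a lock is only one of the techniques the glossary lists (atomics, monitors, or locks).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/** Hypothetical helper: forwards signals from any thread to one consumer, serially. */
final class SerialSignaller<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Consumer<? super T> downstream;

    SerialSignaller(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    /** lock()/unlock() establish the happens-before edge between successive signals. */
    void signal(T item) {
        lock.lock();
        try {
            downstream.accept(item);
        } finally {
            lock.unlock();
        }
    }
}

final class SerialSignallerDemo {
    /** Returns {signals delivered, overlapping deliveries observed}. */
    static int[] run(int threads, int perThread) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger overlaps = new AtomicInteger();
        int[] delivered = {0}; // plain counter: correct only because signals are serial
        SerialSignaller<Integer> signaller = new SerialSignaller<>(x -> {
            if (inFlight.incrementAndGet() != 1) {
                overlaps.incrementAndGet(); // another signal was being processed at the same time
            }
            delivered[0]++;
            inFlight.decrementAndGet();
        });
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    signaller.signal(i);
                }
                done.countDown();
            }).start();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {delivered[0], overlaps.get()};
    }
}
```

Because the lock is held while the downstream runs, a slow consumer blocks the signalling threads; this is a simplification for the sketch, and non-blocking queue-and-drain designs are a common alternative.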


=====================================
RELEASE-NOTES.md
=====================================
@@ -1,5 +1,114 @@
 # Release notes for Reactive Streams
 
+# Version 1.0.3 released on 2019-08-23
+
+## Announcement:
+
+We—the Reactive Streams community—are pleased to announce the immediate availability of `Reactive Streams 1.0.3`. This update to `Reactive Streams` brings the following improvements over `1.0.2`.
+
+## Highlights:
+
+- Specification
+  + Glossary term "External synchronization" has been [superseded](#Glossary)
+  + No breaking/semantical changes
+  + Rule [clarifications](#specification-clarifications-103-RC1)
+- Interfaces
+  + No changes
+- Technology Compatibility Kit (TCK)
+  + Improved [coverage](#tck-alterations-103-RC1)
+  + Improved JavaDoc
+- Examples
+  + No changes
+- Artifacts
+  + FlowAdapters artifact removed, FlowAdapters moved into the core jar ([#424](https://github.com/reactive-streams/reactive-streams-jvm/issues/424))
+
+## Specification clarifications 1.0.3
+
+## Glossary term "External synchronization" replaced by "Serial(ly)"
+
+**1.0.2:** Access coordination for thread safety purposes implemented outside of the constructs defined in this specification, using techniques such as, but not limited to, `atomics`, `monitors`, or `locks`.
+
+**1.0.3:** In the context of a Signal, non-overlapping. In the context of the JVM, calls to methods on an object are serial if and only if there is a happens-before relationship between those calls (implying also that the calls do not overlap). When the calls are performed asynchronously, coordination to establish the happens-before relationship is to be implemented using techniques such as, but not limited to, atomics, monitors, or locks.
+
+## Publisher Rule 3 (Rule and Intent clarified)
+
+**1.0.2:** `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled in a thread-safe manner—and if performed by multiple threads—use external synchronization.
+
+*The intent of this rule is to make it clear that external synchronization must be employed if the Publisher intends to send signals from multiple/different threads.*
+
+**1.0.3:** `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled serially.
+
+*The intent of this rule is to permit the signalling of signals (including from multiple threads) if and only if a happens-before relation between each of the signals is established.*
+
+## Subscriber Rule 1 (Intent clarified)
+
+**1.0.2:** A `Subscriber` MUST signal demand via `Subscription.request(long n)` to receive `onNext` signals.
+
+*The intent of this rule is to establish that it is the responsibility of the Subscriber to signal when, and how many, elements it is able and willing to receive.*
+
+**1.0.3:** A `Subscriber` MUST signal demand via `Subscription.request(long n)` to receive `onNext` signals.
+
+*The intent of this rule is to establish that it is the responsibility of the Subscriber to decide when and how many elements it is able and willing to receive. To avoid signal reordering caused by reentrant Subscription methods, it is strongly RECOMMENDED for synchronous Subscriber implementations to invoke Subscription methods at the very end of any signal processing. It is RECOMMENDED that Subscribers request the upper limit of what they are able to process, as requesting only one element at a time results in an inherently inefficient "stop-and-wait" protocol.*
+
+## Subscriber Rule 5 (Intent clarified)
+
+**1.0.2:** A `Subscriber` MUST call `Subscription.cancel()` on the given `Subscription` after an `onSubscribe` signal if it already has an active `Subscription`
+
+*The intent of this rule is to prevent that two, or more, separate Publishers from thinking that they can interact with the same Subscriber. Enforcing this rule means that resource leaks are prevented since extra Subscriptions will be cancelled.*
+
+**1.0.3:** A `Subscriber` MUST call `Subscription.cancel()` on the given `Subscription` after an `onSubscribe` signal if it already has an active `Subscription`
+
+*The intent of this rule is to prevent that two, or more, separate Publishers from trying to interact with the same Subscriber. Enforcing this rule means that resource leaks are prevented since extra Subscriptions will be cancelled. Failure to conform to this rule may lead to violations of Publisher rule 1, amongst others. Such violations can lead to hard-to-diagnose bugs.*
+
+## Subscriber Rule 7 (Rule and Intent clarified)
+
+**1.0.2:** A `Subscriber` MUST ensure that all calls on its `Subscription` take place from the same thread or provide for respective external synchronization.
+
+*The intent of this rule is to establish that external synchronization must be added if a Subscriber will be using a Subscription concurrently by two or more threads.*
+
+**1.0.3:** A Subscriber MUST ensure that all calls on its Subscription's request and cancel methods are performed serially.
+
+*The intent of this rule is to permit the calling of the request and cancel methods (including from multiple threads) if and only if a happens-before relation between each of the calls is established.*
+
+## TCK alterations 1.0.3
+
+- `PublisherVerification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete` fails if the publisher completes synchronously ([#422](https://github.com/reactive-streams/reactive-streams-jvm/issues/422))
+- IdentityFlowProcessorVerification throws NPE when `createFailedFlowPublisher` returns null ([#425](https://github.com/reactive-streams/reactive-streams-jvm/issues/425))
+- `required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel` does not wait for request before invoking onNext ([#277](https://github.com/reactive-streams/reactive-streams-jvm/issues/277))
+- Subscriber whitebox verification tests demand ([#280](https://github.com/reactive-streams/reactive-streams-jvm/issues/280))
+- Incomplete documentation on stochastic tests in TCK ([#278](https://github.com/reactive-streams/reactive-streams-jvm/issues/278))
+- TCK performance ([#446](https://github.com/reactive-streams/reactive-streams-jvm/issues/446))
+- TCK: Receptacle#expectError timeout approach ([#451](https://github.com/reactive-streams/reactive-streams-jvm/issues/451))
+
+
+## Contributors
+  + Roland Kuhn [(@rkuhn)](https://github.com/rkuhn)
+  + Ben Christensen [(@benjchristensen)](https://github.com/benjchristensen)
+  + Viktor Klang [(@viktorklang)](https://github.com/viktorklang)
+  + Stephane Maldini [(@smaldini)](https://github.com/smaldini)
+  + Stanislav Savulchik [(@savulchik)](https://github.com/savulchik)
+  + Konrad Malawski [(@ktoso)](https://github.com/ktoso)
+  + Slim Ouertani [(@ouertani)](https://github.com/ouertani)
+  + Martynas Mickevičius [(@2m)](https://github.com/2m)
+  + Luke Daley [(@ldaley)](https://github.com/ldaley)
+  + Colin Godsey [(@colinrgodsey)](https://github.com/colinrgodsey)
+  + David Moten [(@davidmoten)](https://github.com/davidmoten)
+  + Brian Topping [(@briantopping)](https://github.com/briantopping)
+  + Rossen Stoyanchev [(@rstoyanchev)](https://github.com/rstoyanchev)
+  + Björn Hamels [(@BjornHamels)](https://github.com/BjornHamels)
+  + Jake Wharton [(@JakeWharton)](https://github.com/JakeWharton)
+  + Anthony Vanelverdinghe [(@anthonyvdotbe)](https://github.com/anthonyvdotbe)
+  + Kazuhiro Sera [(@seratch)](https://github.com/seratch)
+  + Dávid Karnok [(@akarnokd)](https://github.com/akarnokd)
+  + Evgeniy Getman [(@egetman)](https://github.com/egetman)
+  + Ángel Sanz [(@angelsanz)](https://github.com/angelsanz)
+  + shenghaiyang [(@shenghaiyang)](https://github.com/shenghaiyang)
+  + Kyle Thomson [(@kiiadi)](https://github.com/kiiadi)
+  + (new) James Roper [(@jroper)](https://github.com/jroper)
+  + (new) Oleh Dokuka [(@olegdokuka)](https://github.com/olegdokuka)
+  + (new) Scott Mitchell [(@Scottmitch)](https://github.com/Scottmitch)
+
+
 ---
 
 # Version 1.0.2 released on 2017-12-19
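The clarified intent of Subscriber Rule 1 in the release notes above recommends requesting in bulk and invoking `Subscription` methods at the very end of signal processing. A minimal sketch of that advice follows, written against the JDK's `java.util.concurrent.Flow` interfaces so that it compiles without the reactive-streams jar. `BatchingSubscriber`, `RangeSubscription` and `BatchingDemo` are hypothetical names, and the synchronous source is deliberately simplified (single-threaded, no error signalling), so it is not a TCK-compliant Publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical subscriber that requests a whole batch instead of one element at a time. */
final class BatchingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    int requestCalls;          // number of request(n) invocations made
    boolean completed;
    private final int batch;
    private int remainingInBatch;
    private Flow.Subscription subscription;

    BatchingSubscriber(int batch) { this.batch = batch; }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        remainingInBatch = batch;
        requestCalls++;
        s.request(batch);      // last statement: state is settled before a reentrant onNext
    }

    @Override public void onNext(Integer item) {
        received.add(item);
        if (--remainingInBatch == 0) {
            remainingInBatch = batch;
            requestCalls++;
            subscription.request(batch); // invoked at the very end of signal processing
        }
    }

    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { completed = true; }
}

/** Simplified synchronous source emitting 0..count-1; single-threaded use only. */
final class RangeSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> subscriber;
    private final int count;
    private int next;
    private long demand;
    private boolean emitting;
    private boolean cancelled;

    RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int count) {
        this.subscriber = subscriber;
        this.count = count;
    }

    @Override public void request(long n) {
        if (n <= 0 || cancelled) return;   // sketch only: non-positive n is ignored here
        demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
        if (emitting) return;              // reentrant call: the outer loop sees the new demand
        emitting = true;
        while (demand > 0 && next < count && !cancelled) {
            demand--;
            subscriber.onNext(next++);
        }
        emitting = false;
        if (next == count && !cancelled) {
            cancelled = true;
            subscriber.onComplete();
        }
    }

    @Override public void cancel() { cancelled = true; }
}

final class BatchingDemo {
    static BatchingSubscriber run(int count, int batch) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batch);
        subscriber.onSubscribe(new RangeSubscription(subscriber, count));
        return subscriber;
    }
}
```

With `BatchingDemo.run(10, 4)` the subscriber receives all ten elements using three `request` calls: one in `onSubscribe` and one after each completed batch of four.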


=====================================
api/build.gradle
=====================================
@@ -1,4 +1,22 @@
 description = "reactive-streams"
+def jdkFlow = false
+try {
+    Class.forName("java.util.concurrent.Flow")
+    jdkFlow = true
+} catch (ClassNotFoundException cnfe) {
+
+}
+
+sourceSets {
+    main {
+        java {
+            if (jdkFlow)
+              srcDirs = ['src/main/java', 'src/main/java9']
+            else
+              srcDirs = ['src/main/java']
+        }
+    }
+}
 
 jar {
     manifest {
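The api/build.gradle hunk above probes for `java.util.concurrent.Flow` with `Class.forName` and adds `src/main/java9` to the source set only when the class is found. The same check can be sketched in plain Java as follows; `FlowProbe` and its method names are hypothetical and exist only to illustrate the logic of the build script.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical Java rendering of the class-presence probe used in the Gradle script. */
final class FlowProbe {
    private FlowProbe() { }

    /** Returns true if the named class can be loaded by the running JVM. */
    static boolean classPresent(String name) {
        try {
            Class.forName(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Mirrors the srcDirs selection: the java9 directory is added only when Flow exists. */
    static List<String> sourceDirs(boolean jdkFlow) {
        return jdkFlow
                ? Arrays.asList("src/main/java", "src/main/java9")
                : Collections.singletonList("src/main/java");
    }
}
```

A caller would combine the two as `FlowProbe.sourceDirs(FlowProbe.classPresent("java.util.concurrent.Flow"))`. Note that in the Gradle script the probe runs inside the JVM executing the build, so the result reflects that JDK.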


=====================================
api/src/main/java/org/reactivestreams/Subscription.java
=====================================
@@ -23,8 +23,8 @@ public interface Subscription {
     /**
      * No events will be sent by a {@link Publisher} until demand is signaled via this method.
      * <p>
-     * It can be called however often and whenever needed—but the outstanding cumulative demand must never exceed Long.MAX_VALUE.
-     * An outstanding cumulative demand of Long.MAX_VALUE may be treated by the {@link Publisher} as "effectively unbounded".
+     *  It can be called however often and whenever needed—but if the outstanding cumulative demand ever becomes Long.MAX_VALUE or more,
+     *  it may be treated by the {@link Publisher} as "effectively unbounded".
      * <p>
      * Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
      * <p>
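The reworded Javadoc above allows the outstanding cumulative demand to reach Long.MAX_VALUE or more, at which point the Publisher may treat it as effectively unbounded. One way an implementation might track that is with a saturating add, sketched below. `Demand` and its methods are hypothetical helpers, not part of the Reactive Streams API, and they assume `n` has already been validated as positive.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper for tracking outstanding demand with a cap at Long.MAX_VALUE. */
final class Demand {
    private Demand() { }

    /** Adds n (assumed > 0) to current (assumed >= 0); overflow is clamped to Long.MAX_VALUE. */
    static long addCapped(long current, long n) {
        long sum = current + n;
        // Two non-negative longs that overflow wrap around to a negative value.
        return sum < 0L ? Long.MAX_VALUE : sum;
    }

    /** Atomically adds n to the outstanding demand and returns the new, capped value. */
    static long request(AtomicLong outstanding, long n) {
        return outstanding.updateAndGet(current -> addCapped(current, n));
    }

    /** Demand that has reached the cap may be treated as "effectively unbounded". */
    static boolean isEffectivelyUnbounded(long demand) {
        return demand == Long.MAX_VALUE;
    }
}
```

For example, `Demand.addCapped(Long.MAX_VALUE - 1, 5)` yields `Long.MAX_VALUE` rather than a negative number, so repeated `request` calls never corrupt the counter.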


=====================================
flow-adapters/src/main/java/org/reactivestreams/FlowAdapters.java → api/src/main/java9/org/reactivestreams/FlowAdapters.java
=====================================
@@ -12,6 +12,7 @@
 package org.reactivestreams;
 
 import java.util.concurrent.Flow;
+import static java.util.Objects.requireNonNull;
 
 /**
  * Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API.
@@ -31,16 +32,16 @@ public final class FlowAdapters {
     @SuppressWarnings("unchecked")
     public static <T> org.reactivestreams.Publisher<T> toPublisher(
             Flow.Publisher<? extends T> flowPublisher) {
-        if (flowPublisher == null) {
-            throw new NullPointerException("flowPublisher");
-        }
+        requireNonNull(flowPublisher, "flowPublisher");
+        final org.reactivestreams.Publisher<T> publisher;
         if (flowPublisher instanceof FlowPublisherFromReactive) {
-            return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
-        }
-        if (flowPublisher instanceof org.reactivestreams.Publisher) {
-            return (org.reactivestreams.Publisher<T>)flowPublisher;
+            publisher = (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
+        } else if (flowPublisher instanceof org.reactivestreams.Publisher) {
+            publisher = (org.reactivestreams.Publisher<T>)flowPublisher;
+        } else {
+            publisher = new ReactivePublisherFromFlow<T>(flowPublisher);
         }
-        return new ReactivePublisherFromFlow<T>(flowPublisher);
+        return publisher;
     }
 
     /**
@@ -53,16 +54,16 @@ public final class FlowAdapters {
     public static <T> Flow.Publisher<T> toFlowPublisher(
             org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
     ) {
-        if (reactiveStreamsPublisher == null) {
-            throw new NullPointerException("reactiveStreamsPublisher");
-        }
+        requireNonNull(reactiveStreamsPublisher, "reactiveStreamsPublisher");
+        final Flow.Publisher<T> flowPublisher;
         if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
-            return (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
-        }
-        if (reactiveStreamsPublisher instanceof Flow.Publisher) {
-            return (Flow.Publisher<T>)reactiveStreamsPublisher;
+            flowPublisher = (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
+        } else if (reactiveStreamsPublisher instanceof Flow.Publisher) {
+            flowPublisher = (Flow.Publisher<T>)reactiveStreamsPublisher;
+        } else {
+            flowPublisher = new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
         }
-        return new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
+        return flowPublisher;
     }
 
     /**
@@ -76,16 +77,16 @@ public final class FlowAdapters {
     public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
             Flow.Processor<? super T, ? extends U> flowProcessor
     ) {
-        if (flowProcessor == null) {
-            throw new NullPointerException("flowProcessor");
-        }
+        requireNonNull(flowProcessor, "flowProcessor");
+        final org.reactivestreams.Processor<T, U> processor;
         if (flowProcessor instanceof FlowToReactiveProcessor) {
-            return (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
-        }
-        if (flowProcessor instanceof org.reactivestreams.Processor) {
-            return (org.reactivestreams.Processor<T, U>)flowProcessor;
+            processor = (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
+        } else if (flowProcessor instanceof org.reactivestreams.Processor) {
+            processor = (org.reactivestreams.Processor<T, U>)flowProcessor;
+        } else {
+            processor = new ReactiveToFlowProcessor<T, U>(flowProcessor);
         }
-        return new ReactiveToFlowProcessor<T, U>(flowProcessor);
+        return processor;
     }
 
     /**
@@ -99,16 +100,16 @@ public final class FlowAdapters {
     public static <T, U> Flow.Processor<T, U> toFlowProcessor(
             org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
         ) {
-        if (reactiveStreamsProcessor == null) {
-            throw new NullPointerException("reactiveStreamsProcessor");
-        }
+        requireNonNull(reactiveStreamsProcessor, "reactiveStreamsProcessor");
+        final Flow.Processor<T, U> flowProcessor;
         if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
-            return (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
+            flowProcessor = (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
+        } else if (reactiveStreamsProcessor instanceof Flow.Processor) {
+            flowProcessor = (Flow.Processor<T, U>)reactiveStreamsProcessor;
+        } else {
+            flowProcessor = new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
         }
-        if (reactiveStreamsProcessor instanceof Flow.Processor) {
-            return (Flow.Processor<T, U>)reactiveStreamsProcessor;
-        }
-        return new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
+        return flowProcessor;
     }
 
     /**
@@ -119,16 +120,16 @@ public final class FlowAdapters {
      */
     @SuppressWarnings("unchecked")
     public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
-        if (reactiveStreamsSubscriber == null) {
-            throw new NullPointerException("reactiveStreamsSubscriber");
-        }
+        requireNonNull(reactiveStreamsSubscriber, "reactiveStreamsSubscriber");
+        final Flow.Subscriber<T> flowSubscriber;
         if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
-            return (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
+            flowSubscriber = (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
+        } else if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
+            flowSubscriber = (Flow.Subscriber<T>)reactiveStreamsSubscriber;
+        } else {
+            flowSubscriber = new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
         }
-        if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
-            return (Flow.Subscriber<T>)reactiveStreamsSubscriber;
-        }
-        return new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
+        return flowSubscriber;
     }
 
     /**
@@ -139,16 +140,16 @@ public final class FlowAdapters {
      */
     @SuppressWarnings("unchecked")
     public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
-        if (flowSubscriber == null) {
-            throw new NullPointerException("flowSubscriber");
-        }
+        requireNonNull(flowSubscriber, "flowSubscriber");
+        final org.reactivestreams.Subscriber<T> subscriber;
         if (flowSubscriber instanceof FlowToReactiveSubscriber) {
-            return (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
+            subscriber = (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
+        } else if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
+            subscriber = (org.reactivestreams.Subscriber<T>)flowSubscriber;
+        } else {
+            subscriber = new ReactiveToFlowSubscriber<T>(flowSubscriber);
         }
-        if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
-            return (org.reactivestreams.Subscriber<T>)flowSubscriber;
-        }
-        return new ReactiveToFlowSubscriber<T>(flowSubscriber);
+        return subscriber;
     }
 
     /**
@@ -200,8 +201,7 @@ public final class FlowAdapters {
      * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
      * @param <T> the element type
      */
-    static final class FlowToReactiveSubscriber<T>
-            implements Flow.Subscriber<T> {
+    static final class FlowToReactiveSubscriber<T> implements Flow.Subscriber<T> {
         final org.reactivestreams.Subscriber<? super T> reactiveStreams;
 
         public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
@@ -231,11 +231,10 @@ public final class FlowAdapters {
     }
 
     /**
-     * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
+     * Wraps a Flow Subscriber and forwards methods of the Reactive Streams Subscriber to it.
      * @param <T> the element type
      */
-    static final class ReactiveToFlowSubscriber<T>
-            implements org.reactivestreams.Subscriber<T> {
+    static final class ReactiveToFlowSubscriber<T> implements org.reactivestreams.Subscriber<T> {
         final Flow.Subscriber<? super T> flow;
 
         public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
@@ -269,8 +268,7 @@ public final class FlowAdapters {
      * @param <T> the input type
      * @param <U> the output type
      */
-    static final class ReactiveToFlowProcessor<T, U>
-            implements org.reactivestreams.Processor<T, U> {
+    static final class ReactiveToFlowProcessor<T, U> implements org.reactivestreams.Processor<T, U> {
         final Flow.Processor<? super T, ? extends U> flow;
 
         public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
@@ -308,8 +306,7 @@ public final class FlowAdapters {
      * @param <T> the input type
      * @param <U> the output type
      */
-    static final class FlowToReactiveProcessor<T, U>
-            implements Flow.Processor<T, U> {
+    static final class FlowToReactiveProcessor<T, U> implements Flow.Processor<T, U> {
         final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;
 
         public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
@@ -347,7 +344,6 @@ public final class FlowAdapters {
      * @param <T> the element type
      */
     static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> {
-
         final Flow.Publisher<? extends T> flow;
 
         public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
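
Note on the FlowAdapters hunks above: the conversion methods now share one single-exit shape: null check, then unwrap an existing adapter, else cast an object that already implements both interfaces, else wrap. A minimal sketch of that shape using only the JDK follows. `RsSubscriber` is a hypothetical stand-in for `org.reactivestreams.Subscriber` (it reuses `Flow.Subscription` to stay short), and `AdapterSketch`, `FlowToRs`, `RsToFlow` and `noop` are illustrative names, not the library's classes.

```java
import java.util.Objects;
import java.util.concurrent.Flow;

final class AdapterSketch {

    /** Hypothetical stand-in for org.reactivestreams.Subscriber (an assumption, not the real API). */
    interface RsSubscriber<T> {
        void onSubscribe(Flow.Subscription s);
        void onNext(T item);
        void onError(Throwable t);
        void onComplete();
    }

    /** Presents an RsSubscriber as a Flow.Subscriber. */
    static final class FlowToRs<T> implements Flow.Subscriber<T> {
        final RsSubscriber<? super T> reactiveStreams;
        FlowToRs(RsSubscriber<? super T> reactiveStreams) { this.reactiveStreams = reactiveStreams; }
        @Override public void onSubscribe(Flow.Subscription s) { reactiveStreams.onSubscribe(s); }
        @Override public void onNext(T item) { reactiveStreams.onNext(item); }
        @Override public void onError(Throwable t) { reactiveStreams.onError(t); }
        @Override public void onComplete() { reactiveStreams.onComplete(); }
    }

    /** Presents a Flow.Subscriber as an RsSubscriber. */
    static final class RsToFlow<T> implements RsSubscriber<T> {
        final Flow.Subscriber<? super T> flow;
        RsToFlow(Flow.Subscriber<? super T> flow) { this.flow = flow; }
        @Override public void onSubscribe(Flow.Subscription s) { flow.onSubscribe(s); }
        @Override public void onNext(T item) { flow.onNext(item); }
        @Override public void onError(Throwable t) { flow.onError(t); }
        @Override public void onComplete() { flow.onComplete(); }
    }

    @SuppressWarnings("unchecked")
    static <T> Flow.Subscriber<T> toFlowSubscriber(RsSubscriber<T> rs) {
        Objects.requireNonNull(rs, "rs");
        final Flow.Subscriber<T> result;
        if (rs instanceof RsToFlow) {
            // Already an adapter: hand back the wrapped original instead of double-wrapping.
            result = (Flow.Subscriber<T>) ((RsToFlow<T>) rs).flow;
        } else if (rs instanceof Flow.Subscriber) {
            // Implements both interfaces: a cast is enough.
            result = (Flow.Subscriber<T>) rs;
        } else {
            result = new FlowToRs<T>(rs);
        }
        return result;
    }

    @SuppressWarnings("unchecked")
    static <T> RsSubscriber<T> toSubscriber(Flow.Subscriber<T> flow) {
        Objects.requireNonNull(flow, "flow");
        final RsSubscriber<T> result;
        if (flow instanceof FlowToRs) {
            result = (RsSubscriber<T>) ((FlowToRs<T>) flow).reactiveStreams;
        } else if (flow instanceof RsSubscriber) {
            result = (RsSubscriber<T>) flow;
        } else {
            result = new RsToFlow<T>(flow);
        }
        return result;
    }

    /** A subscriber that ignores every signal; only used to exercise the conversions. */
    static <T> Flow.Subscriber<T> noop() {
        return new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(T item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
    }

    public static void main(String[] args) {
        Flow.Subscriber<String> original = noop();
        RsSubscriber<String> wrapped = toSubscriber(original);
        // Converting back unwraps rather than stacking a second adapter.
        System.out.println(toFlowSubscriber(wrapped) == original); // prints "true"
    }
}
```

The single `return` at the end does not change behaviour compared with the early returns it replaces; it only makes the three cases read as one decision.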


=====================================
build.gradle
=====================================
@@ -3,7 +3,7 @@ subprojects {
     apply plugin: "osgi"
 
     group = "org.reactivestreams"
-    version = "1.0.2"
+    version = "1.0.3"
 
     sourceCompatibility = 1.6
     targetCompatibility = 1.6
@@ -42,17 +42,17 @@ subprojects {
             instructionReplace "Bundle-Vendor", "Reactive Streams SIG"
             instructionReplace "Bundle-Description", "Reactive Streams API"
             instructionReplace "Bundle-DocURL", "http://reactive-streams.org"
-            instructionReplace "Bundle-Version", "1.0.2"
+            instructionReplace "Bundle-Version", "1.0.3"
         }
     }
 
     if (name in ["reactive-streams",
                  "reactive-streams-tck",
                  "reactive-streams-tck-flow",
-                 "reactive-streams-examples",
-                 "reactive-streams-flow-adapters"]) {
+                 "reactive-streams-examples"]) {
         apply plugin: "maven"
         apply plugin: "signing"
+        apply plugin: "maven-publish"
 
         signing {
             sign configurations.archives
@@ -68,6 +68,17 @@ subprojects {
             from javadoc
         }
 
+        publishing {
+          publications {
+            mavenJava(MavenPublication) {
+              from components.java
+            }
+          }
+          repositories {
+            mavenLocal()
+          }
+        }
+
         artifacts {
             archives sourcesJar, javadocJar
         }


=====================================
flow-adapters/.gitignore deleted
=====================================
@@ -1 +0,0 @@
-/bin
\ No newline at end of file


=====================================
flow-adapters/build.gradle deleted
=====================================
@@ -1,20 +0,0 @@
-description = 'reactive-streams-flow-adapters'
-
-dependencies {
-    compile project(':reactive-streams')
-
-    testCompile project(':reactive-streams-tck')
-    testCompile group: 'org.testng', name: 'testng', version: '5.14.10'
-}
-
-jar {
-    manifest {
-        attributes('Automatic-Module-Name': 'org.reactivestreams.flowadapters')
-    }
-}
-
-test.useTestNG()
-
-javadoc {
-    options.links("https://docs.oracle.com/javase/9/docs/api")
-}
\ No newline at end of file


=====================================
settings.gradle
=====================================
@@ -11,12 +11,12 @@ try {
     Class.forName("java.util.concurrent.Flow")
     jdkFlow = true
     println(ANSI_GREEN + "   INFO: ------------------ JDK9 classes detected ---------------------------------" + ANSI_RESET)
-    println(ANSI_GREEN + "   INFO: Java 9 Flow API found; Including [flow-adapters, tck-flow] in build.      " + ANSI_RESET)
+    println(ANSI_GREEN + "   INFO: Java 9 Flow API found; Including [tck-flow] & FlowAdapters in build.      " + ANSI_RESET)
     println(ANSI_GREEN + "   INFO: --------------------------------------------------------------------------" + ANSI_RESET)
 } catch (Throwable ex) {
     // Flow API not available
     println(ANSI_RED + "WARNING: -------------------- JDK9 classes NOT detected -----------------------------" + ANSI_RESET)
-    println(ANSI_RED + "WARNING: Java 9 Flow API not found; Not including [flow-adapters, tck-flow] in build." + ANSI_RESET)
+    println(ANSI_RED + "WARNING: Java 9 Flow API not found; Not including [tck-flow] & FlowAdapters in build." + ANSI_RESET)
     println(ANSI_RED + "WARNING: In order to execute the complete test-suite run the build using JDK9+.      " + ANSI_RESET)
     println(ANSI_RED + "WARNING: ----------------------------------------------------------------------------" + ANSI_RESET)
 }
@@ -26,7 +26,6 @@ include ':reactive-streams-tck'
 include ':reactive-streams-examples'
 
 if (jdkFlow) {
-    include ':reactive-streams-flow-adapters'
     include ':reactive-streams-tck-flow'
 }
 
@@ -34,6 +33,5 @@ project(':reactive-streams').projectDir = "$rootDir/api" as File
 project(':reactive-streams-tck').projectDir = "$rootDir/tck" as File
 project(':reactive-streams-examples').projectDir = "$rootDir/examples" as File
 if (jdkFlow) {
-    project(':reactive-streams-flow-adapters').projectDir = "$rootDir/flow-adapters" as File
     project(':reactive-streams-tck-flow').projectDir = "$rootDir/tck-flow" as File
 }


=====================================
tck-flow/README.md
=====================================
@@ -27,7 +27,7 @@ The TCK is provided as binary artifact on [Maven Central](http://search.maven.or
 <dependency>
   <groupId>org.reactivestreams</groupId>
   <artifactId>reactive-streams-tck-flow</artifactId>
-  <version>1.0.2</version>
+  <version>1.0.3</version>
   <scope>test</scope>
 </dependency>
 ```
@@ -209,18 +209,24 @@ within the TCK which await for something to happen. The other timeout is `publis
 [Rule 3.13](https://github.com/reactive-streams/reactive-streams-jvm#3.13) which defines that `Subscriber` references MUST be dropped
 by the Publisher.
 
-Note that the TCK differenciates between timeouts for "waiting for a signal" (``defaultTimeoutMillis``),
-and "asserting no signals happen during a given amount of time" (``envDefaultNoSignalsTimeoutMillis``).
-While the latter defaults to the prior, it may be useful to tweak them independently when running on continious 
-integration servers (for example, keeping the no-signals timeout significantly lower).
+Note that the TCK differentiates between timeouts for "waiting for a signal"
+(`defaultTimeoutMillis`), and "asserting no signals happen during a given amount of time"
+(`defaultNoSignalsTimeoutMillis`). While the latter defaults to the former, it may be useful to tweak
+them independently when running on continuous integration servers (for example, keeping the
+no-signals timeout significantly lower). Another configuration option is the "poll timeout"
+(`defaultPollTimeoutMillis`): whenever an operation has to wait up to `defaultTimeoutMillis` for a
+signal to appear (most often an error), it polls and checks for the expected error in steps of
+`defaultPollTimeoutMillis` rather than blocking for the full default timeout.
 
-In order to configure these timeouts (for example when running on a slow continious integtation machine), you can either:
+In order to configure these timeouts (for example when running on a slow continuous integration
+machine), you can either:
 
 **Use env variables** to set these timeouts, in which case you can do:
 
 ```bash
 export DEFAULT_TIMEOUT_MILLIS=100
 export DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS=100
+export DEFAULT_POLL_TIMEOUT_MILLIS=20
 export PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=300
 ```
 
@@ -231,10 +237,11 @@ public class RangePublisherTest extends FlowPublisherVerification<Integer> {
 
   public static final long DEFAULT_TIMEOUT_MILLIS = 100L;
   public static final long DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS = DEFAULT_TIMEOUT_MILLIS;
-  public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 500L;
+  public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 20L;
+  public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 300L;
 
   public RangePublisherTest() {
-    super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
+    super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS, DEFAULT_POLL_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
   }
 
   // ...
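
Note on the README hunk above: the environment-variable route can be pictured as "parse the variable as a long, fall back to a default when it is unset, reject anything unparseable". The sketch below is an illustration under that assumption, not the TCK's code; `TimeoutEnvSketch`, `parseMillis` and `fromEnv` are hypothetical names. The fallbacks mirror what this diff shows: 100 ms for the default timeout, and the default timeout for the other two.

```java
final class TimeoutEnvSketch {

    /** Parses a raw env value as milliseconds, or returns the fallback when the variable is unset. */
    static long parseMillis(String name, String raw, long fallback) {
        if (raw == null) {
            return fallback;
        }
        try {
            return Long.parseLong(raw);
        } catch (NumberFormatException ex) {
            throw new IllegalArgumentException(
                String.format("Unable to parse %s env value [%s] as long!", name, raw), ex);
        }
    }

    /** Reads the named variable from the process environment. */
    static long fromEnv(String name, long fallback) {
        return parseMillis(name, System.getenv(name), fallback);
    }

    public static void main(String[] args) {
        long timeout = fromEnv("DEFAULT_TIMEOUT_MILLIS", 100L);
        // Both of these fall back to the general timeout when not set explicitly.
        long noSignals = fromEnv("DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS", timeout);
        long poll = fromEnv("DEFAULT_POLL_TIMEOUT_MILLIS", timeout);
        System.out.printf("timeout=%d noSignals=%d poll=%d%n", timeout, noSignals, poll);
    }
}
```

Keeping the parsing in a function that takes the raw string makes the fallback and error paths testable without touching the real environment.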


=====================================
tck-flow/build.gradle
=====================================
@@ -2,7 +2,6 @@ description = 'reactive-streams-tck-flow'
 dependencies {
     compile group: 'org.testng', name: 'testng', version:'5.14.10'
     compile project(':reactive-streams-tck')
-    compile project(':reactive-streams-flow-adapters')
 }
 
 jar {


=====================================
tck-flow/src/main/java/org/reactivestreams/tck/flow/IdentityFlowProcessorVerification.java
=====================================
@@ -60,7 +60,9 @@ public abstract class IdentityFlowProcessorVerification<T> extends IdentityProce
 
   @Override
   public final Publisher<T> createFailedPublisher() {
-    return FlowAdapters.toPublisher(createFailedFlowPublisher());
+    Flow.Publisher<T> failed = createFailedFlowPublisher();
+    if (failed == null) return null; // because `null` means "SKIP" in createFailedPublisher
+    else return FlowAdapters.toPublisher(failed);
   }
 
 }


=====================================
flow-adapters/src/test/java/org/reactivestreams/FlowAdaptersTest.java → tck-flow/src/test/java/org/reactivestreams/FlowAdaptersTest.java
=====================================


=====================================
flow-adapters/src/test/java/org/reactivestreams/MulticastPublisher.java → tck-flow/src/test/java/org/reactivestreams/MulticastPublisher.java
=====================================


=====================================
flow-adapters/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java → tck-flow/src/test/java/org/reactivestreams/SubmissionPublisherTckTest.java
=====================================


=====================================
flow-adapters/src/test/java/org/reactivestreams/TestEitherConsumer.java → tck-flow/src/test/java/org/reactivestreams/TestEitherConsumer.java
=====================================


=====================================
tck/README.md
=====================================
@@ -27,7 +27,7 @@ The TCK is provided as binary artifact on [Maven Central](http://search.maven.or
 <dependency>
   <groupId>org.reactivestreams</groupId>
   <artifactId>reactive-streams-tck</artifactId>
-  <version>1.0.2</version>
+  <version>1.0.3</version>
   <scope>test</scope>
 </dependency>
 ```
@@ -209,18 +209,24 @@ within the TCK which await for something to happen. The other timeout is `publis
 [Rule 3.13](https://github.com/reactive-streams/reactive-streams-jvm#3.13) which defines that `Subscriber` references MUST be dropped
 by the Publisher.
 
-Note that the TCK differenciates between timeouts for "waiting for a signal" (``defaultTimeoutMillis``),
-and "asserting no signals happen during a given amount of time" (``envDefaultNoSignalsTimeoutMillis``).
-While the latter defaults to the prior, it may be useful to tweak them independently when running on continious 
-integration servers (for example, keeping the no-signals timeout significantly lower).
+Note that the TCK differentiates between timeouts for "waiting for a signal"
+(`defaultTimeoutMillis`), and "asserting no signals happen during a given amount of time"
+(`defaultNoSignalsTimeoutMillis`). While the latter defaults to the former, it may be useful to tweak
+them independently when running on continuous integration servers (for example, keeping the
+no-signals timeout significantly lower). Another configuration option is the "poll timeout"
+(`defaultPollTimeoutMillis`): whenever an operation has to wait up to `defaultTimeoutMillis` for a
+signal to appear (most often an error), it polls and checks for the expected error in steps of
+`defaultPollTimeoutMillis` rather than blocking for the full default timeout.
 
-In order to configure these timeouts (for example when running on a slow continious integtation machine), you can either:
+In order to configure these timeouts (for example when running on a slow continuous integration
+machine), you can either:
 
 **Use env variables** to set these timeouts, in which case you can do:
 
 ```bash
 export DEFAULT_TIMEOUT_MILLIS=100
 export DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS=100
+export DEFAULT_POLL_TIMEOUT_MILLIS=20
 export PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=300
 ```
 
@@ -231,10 +237,11 @@ public class RangePublisherTest extends PublisherVerification<Integer> {
 
   public static final long DEFAULT_TIMEOUT_MILLIS = 100L;
   public static final long DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS = DEFAULT_TIMEOUT_MILLIS;
-  public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 500L;
+  public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 20L;
+  public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 300L;
 
   public RangePublisherTest() {
-    super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
+    super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS, DEFAULT_POLL_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
   }
 
   // ...
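
Note on the "poll timeout" described in the README hunk above: the idea of checking in short steps inside a longer overall deadline can be sketched as below. This is an illustration of the general technique only, not the TCK's `expectError` implementation; `PollTimeoutSketch` and `pollUntil` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class PollTimeoutSketch {

    /**
     * Checks the condition every pollTimeoutMillis until it holds or totalTimeoutMillis elapse.
     * Returns true as soon as the condition holds, false on timeout or interruption.
     */
    static boolean pollUntil(BooleanSupplier condition, long totalTimeoutMillis, long pollTimeoutMillis) {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalTimeoutMillis);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            final long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // Never sleep past the deadline, and always sleep at least 1 ms to avoid spinning.
            final long remainingMillis = Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
            final long sleepMillis = Math.min(Math.max(1L, pollTimeoutMillis), remainingMillis);
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger checks = new AtomicInteger();
        // The condition becomes true on the third check, well before the 1 s deadline.
        boolean seen = pollUntil(() -> checks.incrementAndGet() >= 3, 1000L, 20L);
        System.out.println(seen + " after " + checks.get() + " checks");
    }
}
```

With a short poll step, a signal that arrives early is noticed after at most one step instead of after the full timeout, which is the benefit the README text describes.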


=====================================
tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java
=====================================
@@ -172,7 +172,9 @@ public abstract class IdentityProcessorVerification<T> extends WithHelperPublish
 
   /**
    * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}.
-   * Such tests MAY sometimes fail even though the impl
+   * Stochastic in this case means that the Rule is impossible or infeasible to verify deterministically;
+   * usually this means that the test case can yield false positives ("be green") even if, in some cases,
+   * the given implementation violates the tested behaviour.
    */
   public boolean skipStochasticTests() {
     return false;
@@ -502,7 +504,9 @@ public abstract class IdentityProcessorVerification<T> extends WithHelperPublish
 
           @Override
           public void onSubscribe(final Subscription subscription) {
-            env.debug(String.format("whiteboxSubscriber::onSubscribe(%s)", subscription));
+            if (env.debugEnabled()) {
+              env.debug(String.format("whiteboxSubscriber::onSubscribe(%s)", subscription));
+            }
             if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification
 
             probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {
@@ -521,19 +525,25 @@ public abstract class IdentityProcessorVerification<T> extends WithHelperPublish
 
           @Override
           public void onNext(T element) {
-            env.debug(String.format("whiteboxSubscriber::onNext(%s)", element));
+            if (env.debugEnabled()) {
+              env.debug(String.format("whiteboxSubscriber::onNext(%s)", element));
+            }
             probe.registerOnNext(element);
           }
 
           @Override
           public void onComplete() {
-            env.debug("whiteboxSubscriber::onComplete()");
+            if (env.debugEnabled()) {
+              env.debug("whiteboxSubscriber::onComplete()");
+            }
             probe.registerOnComplete();
           }
 
           @Override
           public void onError(Throwable cause) {
-            env.debug(String.format("whiteboxSubscriber::onError(%s)", cause));
+            if (env.debugEnabled()) {
+              env.debug(String.format("whiteboxSubscriber::onError(%s)", cause));
+            }
             probe.registerOnError(cause);
           }
         });
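
Note on the `env.debugEnabled()` guards above: `debug(...)` already drops the message when debugging is off, but the `String.format` argument is still evaluated before the call. Guarding the call site skips that work. A minimal sketch, assuming a flag-backed environment like the one in this diff; `DebugGuardSketch` and `CountingElement` are hypothetical names used only to make the saved work observable.

```java
final class DebugGuardSketch {

    /** Element whose toString() counts how often it was formatted. */
    static final class CountingElement {
        int toStringCalls;
        @Override public String toString() {
            toStringCalls++;
            return "element";
        }
    }

    private final boolean printlnDebug;

    DebugGuardSketch(boolean printlnDebug) {
        this.printlnDebug = printlnDebug;
    }

    boolean debugEnabled() {
        return printlnDebug;
    }

    void debug(String msg) {
        if (debugEnabled()) {
            System.out.printf("[TCK-DEBUG] %s%n", msg);
        }
    }

    /** Unguarded: the message is formatted even when it will be thrown away. */
    void onNextUnguarded(Object element) {
        debug(String.format("onNext(%s)", element));
    }

    /** Guarded: formatting only happens when the message will actually be printed. */
    void onNext(Object element) {
        if (debugEnabled()) {
            debug(String.format("onNext(%s)", element));
        }
    }

    public static void main(String[] args) {
        DebugGuardSketch quietEnv = new DebugGuardSketch(false);
        CountingElement a = new CountingElement();
        CountingElement b = new CountingElement();
        quietEnv.onNextUnguarded(a);
        quietEnv.onNext(b);
        System.out.println("unguarded formats: " + a.toStringCalls + ", guarded formats: " + b.toStringCalls);
    }
}
```

In hot paths such as `onNext`, which the TCK may invoke many times per test, the guard avoids one `String.format` and one `toString()` per element when debugging is disabled.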


=====================================
tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java
=====================================
@@ -772,7 +772,9 @@ public abstract class PublisherVerification<T> implements PublisherVerificationR
 
             signalsReceived += 1;
             stackDepthCounter.set(stackDepthCounter.get() + 1);
-            env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element));
+            if (env.debugEnabled()) {
+              env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element));
+            }
 
             final long callsUntilNow = stackDepthCounter.get();
             if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) {
@@ -1082,7 +1084,9 @@ public abstract class PublisherVerification<T> implements PublisherVerificationR
 
           @Override
           public void onNext(T element) {
-            env.debug(String.format("%s::onNext(%s)", this, element));
+            if (env.debugEnabled()) {
+              env.debug(String.format("%s::onNext(%s)", this, element));
+            }
             if (subscription.isCompleted()) {
               if (callsCounter > 0) {
                 subscription.value().request(Long.MAX_VALUE - 1);


=====================================
tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java
=====================================
@@ -266,7 +266,9 @@ public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublis
       @Override
       public void run(WhiteboxTestStage stage) throws InterruptedException {
         stage.puppet().triggerRequest(1);
+        stage.expectRequest();
         stage.puppet().signalCancel();
+        stage.expectCancelling();
         stage.signalNext();
 
         stage.puppet().triggerRequest(1);
@@ -437,7 +439,12 @@ public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublis
       @Override
       public void run(WhiteboxTestStage stage) throws InterruptedException {
         stage.puppet().triggerRequest(2);
+        long requestedElements = stage.expectRequest();
         stage.probe.expectNext(stage.signalNext());
+        // Some subscribers may only request one element at a time.
+        if (requestedElements < 2) {
+          stage.expectRequest();
+        }
         stage.probe.expectNext(stage.signalNext());
 
         stage.probe.expectNone();
@@ -789,13 +796,34 @@ public abstract class SubscriberWhiteboxVerification<T> extends WithHelperPublis
    * implement these calls.
    */
   public interface SubscriberPuppet {
+
     /**
-     * Trigger {@code request(elements)} on your {@link Subscriber}
+     * Ensure that at least {@code elements} are eventually requested by your {@link Subscriber}, if it hasn't already
+     * requested that many elements.
+     * <p>
+     * This does not necessarily have to correlate 1:1 with a {@code Subscription.request(elements)} call, but the sum
+     * of the elements requested by your {@code Subscriber} must eventually be at least the sum of the elements
+     * triggered to be requested by all the invocations of this method.
+     * <p>
+     * Additionally, subscribers are permitted to delay requesting elements until previous requests for elements have
+     * been fulfilled. For example, a subscriber that only requests one element at a time may fulfill the request made
+     * by this method by requesting one element {@code elements} times, waiting for each element to arrive before the
+     * next request is made.
+     * <p>
+     * Before sending any element to the subscriber, the TCK must wait for the subscriber to request that element, and
+     * must be prepared for the subscriber to only request one element at a time; it is not enough for the TCK to
+     * simply invoke this method before sending elements.
+     * <p>
+     * An invocation of {@link #signalCancel()} may be coalesced into any elements that have not yet been requested,
+     * such that only a cancel signal is emitted.
      */
     void triggerRequest(long elements);
 
     /**
-     * Trigger {@code cancel()} on your {@link Subscriber}
+     * Trigger {@code cancel()} on your {@link Subscriber}.
+     * <p>
+     * An invocation of this method may be coalesced into any outstanding requests, as requested by
+     * {@link #triggerRequest(long)}, such that only a cancel signal is emitted.
      */
     void signalCancel();
   }
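
Note on the revised `SubscriberPuppet` contract above: a subscriber that only ever has one element outstanding can still satisfy `triggerRequest(n)` by requesting one element `n` times, each after the previous element has arrived. The sketch below shows one way such a puppet could keep its books. It is an assumption-laden illustration, not TCK code: it uses the JDK's `Flow.Subscription` in place of `org.reactivestreams.Subscription`, is single-threaded, and `OneAtATimePuppetSketch` and `RecordingSubscription` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class OneAtATimePuppetSketch {

    /** Test double that records what the puppet asks of the upstream. */
    static final class RecordingSubscription implements Flow.Subscription {
        final List<Long> requests = new ArrayList<>();
        boolean cancelled;
        @Override public void request(long n) { requests.add(n); }
        @Override public void cancel() { cancelled = true; }
    }

    private final Flow.Subscription subscription;
    private long pending;      // demand triggered by the test but not yet requested upstream
    private boolean inFlight;  // one element has been requested and has not arrived yet
    private boolean cancelled;

    OneAtATimePuppetSketch(Flow.Subscription subscription) {
        this.subscription = subscription;
    }

    /** Records the demand; only the first element is requested immediately. */
    void triggerRequest(long elements) {
        pending += elements;
        requestNextIfIdle();
    }

    /** Called by the subscriber when an element arrives; requests the next one if demand remains. */
    void onNext() {
        inFlight = false;
        requestNextIfIdle();
    }

    /** Cancels and drops any demand that was triggered but never requested. */
    void signalCancel() {
        cancelled = true;
        pending = 0;
        subscription.cancel();
    }

    private void requestNextIfIdle() {
        if (!cancelled && !inFlight && pending > 0) {
            pending--;
            inFlight = true;
            subscription.request(1);
        }
    }

    public static void main(String[] args) {
        RecordingSubscription upstream = new RecordingSubscription();
        OneAtATimePuppetSketch puppet = new OneAtATimePuppetSketch(upstream);
        puppet.triggerRequest(2);
        System.out.println(upstream.requests); // prints "[1]"
        puppet.onNext();
        System.out.println(upstream.requests); // prints "[1, 1]"
    }
}
```

This is why the revised tests call `expectRequest()` before each `signalNext()`: after `triggerRequest(2)` such a subscriber has asked for only one element, and the second request appears only once the first element has been delivered.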


=====================================
tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
=====================================
@@ -14,8 +14,8 @@ package org.reactivestreams.tck;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
-import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException;
 import org.reactivestreams.tck.flow.support.Optional;
+import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException;
 
 import java.util.Collections;
 import java.util.LinkedList;
@@ -24,7 +24,10 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -35,8 +38,10 @@ public class TestEnvironment {
   private static final long DEFAULT_TIMEOUT_MILLIS = 100;
 
   private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS";
+  private static final String DEFAULT_POLL_TIMEOUT_MILLIS_ENV = "DEFAULT_POLL_TIMEOUT_MILLIS";
 
   private final long defaultTimeoutMillis;
+  private final long defaultPollTimeoutMillis;
   private final long defaultNoSignalsTimeoutMillis;
   private final boolean printlnDebug;
 
@@ -49,14 +54,46 @@ public class TestEnvironment {
    * run the tests.
    * @param defaultTimeoutMillis default timeout to be used in all expect* methods
    * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
+   * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
+   *                                 preempted by an asynchronous event.
    * @param printlnDebug         if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
    */
-  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) {
+  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis,
+                         boolean printlnDebug) {
     this.defaultTimeoutMillis = defaultTimeoutMillis;
+    this.defaultPollTimeoutMillis = defaultPollTimeoutMillis;
     this.defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis;
     this.printlnDebug = printlnDebug;
   }
 
+  /**
+   * Tests must specify the timeout for expected outcome of asynchronous
+   * interactions. Longer timeout does not invalidate the correctness of
+   * the implementation, but can in some cases result in longer time to
+   * run the tests.
+   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
+   * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
+   * @param printlnDebug         if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
+   */
+  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) {
+    this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultTimeoutMillis, printlnDebug);
+  }
+
+  /**
+   * Tests must specify the timeout for expected outcome of asynchronous
+   * interactions. Longer timeout does not invalidate the correctness of
+   * the implementation, but can in some cases result in longer time to
+   * run the tests.
+   *
+   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
+   * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
+   * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
+   *                                 preempted by an asynchronous event.
+   */
+  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis) {
+    this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultPollTimeoutMillis, false);
+  }
+
   /**
    * Tests must specify the timeout for expected outcome of asynchronous
    * interactions. Longer timeout does not invalidate the correctness of
@@ -67,7 +104,7 @@ public class TestEnvironment {
    * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
    */
   public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis) {
-    this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, false);
+    this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultTimeoutMillis);
   }
 
   /**
@@ -79,7 +116,7 @@ public class TestEnvironment {
    * @param defaultTimeoutMillis default timeout to be used in all expect* methods
    */
   public TestEnvironment(long defaultTimeoutMillis) {
-    this(defaultTimeoutMillis, defaultTimeoutMillis, false);
+    this(defaultTimeoutMillis, defaultTimeoutMillis, defaultTimeoutMillis);
   }
 
   /**
@@ -95,7 +132,7 @@ public class TestEnvironment {
    *                     often helpful to pinpoint simple race conditions etc.
    */
   public TestEnvironment(boolean printlnDebug) {
-    this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), printlnDebug);
+    this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), envDefaultPollTimeoutMillis(), printlnDebug);
   }
 
   /**
@@ -124,6 +161,14 @@ public class TestEnvironment {
     return defaultNoSignalsTimeoutMillis;
   }
 
+  /**
+   * The default amount of time to poll for events if {@code defaultTimeoutMillis} isn't preempted by an asynchronous
+   * event.
+   */
+  public long defaultPollTimeoutMillis() {
+    return defaultPollTimeoutMillis;
+  }
+
   /**
    * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value.
    *
@@ -154,6 +199,21 @@ public class TestEnvironment {
     }
   }
 
+  /**
+   * Tries to parse the env variable {@code DEFAULT_POLL_TIMEOUT_MILLIS_ENV} as long and returns the value if present OR its default value.
+   *
+   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
+   */
+  public static long envDefaultPollTimeoutMillis() {
+    final String envMillis = System.getenv(DEFAULT_POLL_TIMEOUT_MILLIS_ENV);
+    if (envMillis == null) return envDefaultTimeoutMillis();
+    else try {
+      return Long.parseLong(envMillis);
+    } catch (NumberFormatException ex) {
+      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_POLL_TIMEOUT_MILLIS_ENV, envMillis), ex);
+    }
+  }
+
   /**
    * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
    * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
@@ -191,7 +251,7 @@ public class TestEnvironment {
       asyncErrors.add(thr);
     }
   }
-  
+
   /**
    * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
    * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
@@ -275,7 +335,7 @@ public class TestEnvironment {
   }
 
   /**
-   * Waits for {@link TestEnvironment#defaultTimeoutMillis()} and then verifies that no asynchronous errors
+   * Waits for {@link TestEnvironment#defaultNoSignalsTimeoutMillis()} and then verifies that no asynchronous errors
    * were signalled prior to, or during that time (by calling {@code flop()}).
    */
   public void verifyNoAsyncErrors() {
@@ -318,8 +378,13 @@ public class TestEnvironment {
 
   /** If {@code TestEnvironment#printlnDebug} is true, print debug message to std out. */
   public void debug(String msg) {
-    if (printlnDebug)
+    if (debugEnabled()) {
       System.out.printf("[TCK-DEBUG] %s%n", msg);
+    }
+  }
+
+  public final boolean debugEnabled() {
+    return printlnDebug;
   }
 
   /**
@@ -512,26 +577,32 @@ public class TestEnvironment {
     }
 
     public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception {
-      expectErrorWithMessage(expected, requiredMessagePart, env.defaultTimeoutMillis());
+      expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis());
     }
     public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives) throws Exception {
-      expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis());
+      expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis());
     }
 
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
     public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart, long timeoutMillis) throws Exception {
       expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), timeoutMillis);
     }
+
     public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives, long timeoutMillis) throws Exception {
-      final E err = expectError(expected, timeoutMillis);
+      expectErrorWithMessage(expected, requiredMessagePartAlternatives, timeoutMillis, timeoutMillis);
+    }
+
+    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives,
+                                                             long totalTimeoutMillis, long pollTimeoutMillis) throws Exception {
+      final E err = expectError(expected, totalTimeoutMillis, pollTimeoutMillis);
       final String message = err.getMessage();
-      
+
       boolean contains = false;
-      for (String requiredMessagePart : requiredMessagePartAlternatives) 
+      for (String requiredMessagePart : requiredMessagePartAlternatives)
         if (message.contains(requiredMessagePart)) contains = true; // not short-circuting loop, it is expected to
       assertTrue(contains,
-                 String.format("Got expected exception [%s] but missing message part [%s], was: %s",
-                               err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage()));
+              String.format("Got expected exception [%s] but missing message part [%s], was: %s",
+                      err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage()));
     }
 
     public <E extends Throwable> E expectError(Class<E> expected) throws Exception {
@@ -539,7 +610,7 @@ public class TestEnvironment {
     }
 
     public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws Exception {
-      return expectError(expected, timeoutMillis, String.format("Expected onError(%s)", expected.getName()));
+      return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis());
     }
 
     public <E extends Throwable> E expectError(Class<E> expected, String errorMsg) throws Exception {
@@ -547,7 +618,16 @@ public class TestEnvironment {
     }
 
     public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis, String errorMsg) throws Exception {
-      return received.expectError(expected, timeoutMillis, errorMsg);
+      return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis(), errorMsg);
+    }
+
+    public <E extends Throwable> E expectError(Class<E> expected, long totalTimeoutMillis, long pollTimeoutMillis) throws Exception {
+      return expectError(expected, totalTimeoutMillis, pollTimeoutMillis, String.format("Expected onError(%s)", expected.getName()));
+    }
+
+    public <E extends Throwable> E expectError(Class<E> expected, long totalTimeoutMillis, long pollTimeoutMillis,
+                                               String errorMsg) throws Exception {
+      return received.expectError(expected, totalTimeoutMillis, pollTimeoutMillis, errorMsg);
     }
 
     public void expectNone() throws InterruptedException {
@@ -576,7 +656,9 @@ public class TestEnvironment {
 
     @Override
     public void onNext(T element) {
-      env.debug(String.format("%s::onNext(%s)", this, element));
+      if (env.debugEnabled()) {
+        env.debug(String.format("%s::onNext(%s)", this, element));
+      }
       if (subscription.isCompleted()) {
         super.onNext(element);
       } else {
@@ -586,7 +668,9 @@ public class TestEnvironment {
 
     @Override
     public void onComplete() {
-      env.debug(this + "::onComplete()");
+      if (env.debugEnabled()) {
+        env.debug(this + "::onComplete()");
+      }
       if (subscription.isCompleted()) {
         super.onComplete();
       } else {
@@ -596,7 +680,9 @@ public class TestEnvironment {
 
     @Override
     public void onSubscribe(Subscription s) {
-      env.debug(String.format("%s::onSubscribe(%s)", this, s));
+      if (env.debugEnabled()) {
+        env.debug(String.format("%s::onSubscribe(%s)", this, s));
+      }
       if (!subscription.isCompleted()) {
         subscription.complete(s);
       } else {
@@ -606,7 +692,9 @@ public class TestEnvironment {
 
     @Override
     public void onError(Throwable cause) {
-      env.debug(String.format("%s::onError(%s)", this, cause));
+      if (env.debugEnabled()) {
+        env.debug(String.format("%s::onError(%s)", this, cause));
+      }
       if (subscription.isCompleted()) {
         super.onError(cause);
       } else {
@@ -629,7 +717,9 @@ public class TestEnvironment {
 
     @Override
     public void onNext(T element) {
-      env.debug(String.format("%s::onNext(%s)", this, element));
+      if (env.debugEnabled()) {
+        env.debug(String.format("%s::onNext(%s)", this, element));
+      }
       if (!subscription.isCompleted()) {
         env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
       }
@@ -801,7 +891,7 @@ public class TestEnvironment {
     public void expectCancelling(long timeoutMillis) throws InterruptedException {
       cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription");
     }
-    
+
     public boolean isCancelled() throws InterruptedException {
       return cancelled.isClosed();
     }
@@ -882,11 +972,12 @@ public class TestEnvironment {
     }
 
     private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1);
-    private volatile T _value = null;
+    private AtomicReference<T> _value = new AtomicReference<T>();
 
     public T value() {
-      if (isCompleted()) {
-        return _value;
+      final T value = _value.get();
+      if (value != null) {
+        return value;
       } else {
         env.flop("Cannot access promise value before completion");
         return null;
@@ -894,22 +985,28 @@ public class TestEnvironment {
     }
 
     public boolean isCompleted() {
-      return _value != null;
+      return _value.get() != null;
     }
 
     /**
      * Allows using expectCompletion to await for completion of the value and complete it _then_
      */
     public void complete(T value) {
-      abq.add(value);
+      if (_value.compareAndSet(null, value)) {
+        // we add the value to the queue such to wake up any expectCompletion which was triggered before complete() was called
+        abq.add(value);
+      } else {
+        env.flop(String.format("Cannot complete a promise more than once! Present value: %s, attempted to set: %s", _value.get(), value));
+      }
     }
 
     /**
-     * Completes the promise right away, it is not possible to expectCompletion on a Promise completed this way
+     * Same as complete.
+     *
+     * Keeping this method for binary compatibility.
      */
     public void completeImmediatly(T value) {
-      complete(value); // complete!
-      _value = value;  // immediatly!
+      complete(value);
     }
 
     public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
@@ -918,8 +1015,6 @@ public class TestEnvironment {
 
         if (val == null) {
           env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
-        } else {
-          _value = val;
         }
       }
     }
@@ -1003,22 +1098,44 @@ public class TestEnvironment {
       } // else, ok
     }
 
-    @SuppressWarnings("unchecked")
+    /**
+     * @deprecated Deprecated in favor of {@link #expectError(Class, long, long, String)}.
+     */
+    @Deprecated
     public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception {
-      Thread.sleep(timeoutMillis);
-
-      if (env.asyncErrors.isEmpty()) {
-        return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis));
-      } else {
-        // ok, there was an expected error
-        Throwable thrown = env.asyncErrors.remove(0);
+      return expectError(clazz, timeoutMillis, timeoutMillis, errorMsg);
+    }
 
-        if (clazz.isInstance(thrown)) {
-          return (E) thrown;
+    @SuppressWarnings("unchecked")
+    final <E extends Throwable> E expectError(Class<E> clazz, final long totalTimeoutMillis,
+                                              long pollTimeoutMillis,
+                                              String errorMsg) throws Exception {
+      long totalTimeoutRemainingNs = MILLISECONDS.toNanos(totalTimeoutMillis);
+      long timeStampANs = System.nanoTime();
+      long timeStampBNs;
+
+      for (;;) {
+        Thread.sleep(Math.min(pollTimeoutMillis, NANOSECONDS.toMillis(totalTimeoutRemainingNs)));
+
+        if (env.asyncErrors.isEmpty()) {
+          timeStampBNs = System.nanoTime();
+          totalTimeoutRemainingNs =- timeStampBNs - timeStampANs;
+          timeStampANs = timeStampBNs;
+
+          if (totalTimeoutRemainingNs <= 0) {
+            return env.flopAndFail(String.format("%s within %d ms", errorMsg, totalTimeoutMillis));
+          }
         } else {
+          // ok, there was an expected error
+          Throwable thrown = env.asyncErrors.remove(0);
 
-          return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s",
-                                               errorMsg, timeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName()));
+          if (clazz.isInstance(thrown)) {
+            return (E) thrown;
+          } else {
+
+            return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s",
+                    errorMsg, totalTimeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName()));
+          }
         }
       }
     }
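
The reworked `expectError` above replaces one long sleep with short polls against a total time budget. The following is a minimal sketch of that idea, not the TCK's implementation: `PollingSketch` and `pollUntil` are hypothetical names, a plain `Queue` stands in for `env.asyncErrors`, and the sketch checks the queue before sleeping, whereas the hunk sleeps first. Note that the added line `totalTimeoutRemainingNs =- timeStampBNs - timeStampANs;` parses in Java as assigning `-timeStampBNs - timeStampANs`, not as subtracting the elapsed time from the remaining budget. The sketch uses `-=` on the elapsed time, which may be what was intended.

```java
import java.util.Queue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of the TCK): polls a queue until an item arrives or the budget runs out. */
final class PollingSketch {

  /**
   * Returns the first queued item, or null if none arrived within totalTimeoutMillis
   * (or if the calling thread was interrupted while waiting).
   */
  static <T> T pollUntil(Queue<T> queue, long totalTimeoutMillis, long pollTimeoutMillis) {
    long remainingNs = TimeUnit.MILLISECONDS.toNanos(totalTimeoutMillis);
    long lastNs = System.nanoTime();

    for (;;) {
      T item = queue.poll();
      if (item != null) return item;
      if (remainingNs <= 0) return null;

      // Sleep for one poll interval, but never longer than the remaining budget.
      long sleepMs = Math.max(0L,
          Math.min(pollTimeoutMillis, TimeUnit.NANOSECONDS.toMillis(remainingNs)));
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return null;
      }

      // Subtract the time that actually elapsed since the previous measurement.
      long nowNs = System.nanoTime();
      remainingNs -= nowNs - lastNs;
      lastNs = nowNs;
    }
  }
}
```

With a short poll interval the caller returns soon after the item shows up instead of always waiting out the full timeout, which seems to be the motivation for the separate `pollTimeoutMillis` parameter.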

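
The `Promise` hunk above swaps a `volatile` field for an `AtomicReference` so that a second `complete()` call is detected rather than silently overwriting the value. A minimal sketch of that complete-once pattern follows. `OncePromise` is a hypothetical stand-in, and it returns `false` on a repeated completion where the TCK calls `env.flop(...)`.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the TCK's Promise; the names and the boolean result are assumptions. */
final class OncePromise<T> {
  private final AtomicReference<T> value = new AtomicReference<T>();

  /** Returns true if this call completed the promise, false if it was already completed. */
  boolean complete(T v) {
    if (v == null) throw new NullPointerException("value must not be null");
    // compareAndSet succeeds for exactly one caller, even under concurrent completion attempts.
    return value.compareAndSet(null, v);
  }

  boolean isCompleted() {
    return value.get() != null;
  }

  /** Returns the completed value, or null if the promise has not been completed yet. */
  T value() {
    return value.get();
  }
}
```

Because `null` doubles as the "not completed" marker, the sketch rejects `null` values outright. The hunk relies on the same convention in `isCompleted()`.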

=====================================
tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java
=====================================
@@ -228,6 +228,39 @@ public class PublisherVerificationTest extends TCKVerificationSupport {
     }, "Subscriber::onComplete() called before Subscriber::onSubscribe");
   }
 
+  @Test
+  public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete_shouldPass_whenOnCompleteSynchronouslyInovked() throws Throwable {
+    final Publisher<Integer> synchronousOnCompletePublisher = new Publisher<Integer>() {
+      @Override public void subscribe(final Subscriber<? super Integer> s) {
+        s.onSubscribe(new Subscription() {
+          @Override public void request(long n) { }
+          @Override public void cancel() { }
+        });
+        s.onComplete();
+      }
+    };
+
+    requireOptionalTestPass(new ThrowingRunnable() {
+      @Override public void run() throws Throwable {
+        PublisherVerification<Integer> verification = new PublisherVerification<Integer>(newTestEnvironment()) {
+          @Override public Publisher<Integer> createPublisher(long elements) {
+            return synchronousOnCompletePublisher;
+          }
+
+          @Override public long maxElementsFromPublisher() {
+            return 0; // it is an "empty" Publisher
+          }
+
+          @Override public Publisher<Integer> createFailedPublisher() {
+            return null;
+          }
+        };
+
+        verification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete();
+      }
+    });
+  }
+
   @Test
   public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled_shouldFailForNotCompletingPublisher() throws Throwable {
     requireTestFailure(new ThrowingRunnable() {


=====================================
tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java
=====================================
@@ -23,7 +23,10 @@ import org.testng.annotations.Test;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Validates that the TCK's {@link SubscriberWhiteboxVerification} fails with nice human readable errors.
@@ -31,8 +34,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class SubscriberWhiteboxVerificationTest extends TCKVerificationSupport {
 
-  private ExecutorService ex;
-  @BeforeClass void before() { ex = Executors.newFixedThreadPool(4); }
+  private ScheduledExecutorService ex;
+  @BeforeClass void before() { ex = Executors.newScheduledThreadPool(4); }
   @AfterClass void after() { if (ex != null) ex.shutdown(); }
 
   @Test
@@ -217,6 +220,51 @@ public class SubscriberWhiteboxVerificationTest extends TCKVerificationSupport {
     }, "But I thought it's cancelled!");
   }
 
+  @Test
+  public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel_shouldWaitForDemandBeforeSignalling() throws Throwable {
+    customSubscriberVerification(new Function<WhiteboxSubscriberProbe<Integer>, Subscriber<Integer>>() {
+      @Override
+      public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws Throwable {
+
+        final AtomicBoolean demandRequested = new AtomicBoolean(false);
+
+        return new SimpleSubscriberWithProbe(probe) {
+          @Override public void onSubscribe(final Subscription s) {
+            this.subscription = s;
+            probe.registerOnSubscribe(new SubscriberPuppet() {
+              @Override public void triggerRequest(final long elements) {
+                ex.schedule(new Runnable() {
+                  @Override
+                  public void run() {
+                    demandRequested.set(true);
+                    subscription.request(elements);
+                  }
+                }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
+              }
+
+              @Override public void signalCancel() {
+                // Delay this too to ensure that cancel isn't invoked before request.
+                ex.schedule(new Runnable() {
+                  @Override
+                  public void run() {
+                    subscription.cancel();
+                  }
+                }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
+              }
+            });
+          }
+
+          @Override public void onNext(Integer element) {
+            if (!demandRequested.get()) {
+              throw new RuntimeException("onNext signalled without demand!");
+            }
+            probe.registerOnNext(element);
+          }
+        };
+      }
+    }).required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel();
+  }
+
   @Test
   public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall_shouldFail() throws Throwable {
     requireTestFailure(new ThrowingRunnable() {
@@ -299,7 +347,7 @@ public class SubscriberWhiteboxVerificationTest extends TCKVerificationSupport {
   }
   
   @Test
-  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldFail() throws Throwable {
+  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldPass() throws Throwable {
     // sanity checks the "happy path", that triggerRequest() propagates the right demand
     customSubscriberVerification(new Function<WhiteboxSubscriberProbe<Integer>, Subscriber<Integer>>() {
       @Override
@@ -309,6 +357,114 @@ public class SubscriberWhiteboxVerificationTest extends TCKVerificationSupport {
     }).required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
   }
 
+  @Test
+  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldWaitForDemandBeforeSignalling() throws Throwable {
+    customSubscriberVerification(new Function<WhiteboxSubscriberProbe<Integer>, Subscriber<Integer>>() {
+      @Override
+      public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws Throwable {
+
+        final AtomicBoolean demandRequested = new AtomicBoolean(false);
+        return new SimpleSubscriberWithProbe(probe) {
+          @Override
+          public void onSubscribe(Subscription s) {
+            this.subscription = s;
+            probe.registerOnSubscribe(new SubscriberPuppet() {
+              @Override
+              public void triggerRequest(final long elements) {
+                ex.schedule(new Runnable() {
+                  @Override
+                  public void run() {
+                    demandRequested.set(true);
+                    subscription.request(elements);
+                  }
+                }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
+              }
+
+              @Override
+              public void signalCancel() {
+                // Delay this too to ensure that cancel isn't invoked before request.
+                ex.schedule(new Runnable() {
+                  @Override
+                  public void run() {
+                    subscription.cancel();
+                  }
+                }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
+              }
+            });
+          }
+
+          @Override
+          public void onNext(Integer element) {
+            if (!demandRequested.get()) {
+              throw new RuntimeException("onNext signalled without demand!");
+            }
+            probe.registerOnNext(element);
+          }
+        };
+      }
+    }).required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
+  }
+
+  @Test
+  public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldWaitForDemandTwiceForOneAtATimeSubscribers() throws Throwable {
+    customSubscriberVerification(new Function<WhiteboxSubscriberProbe<Integer>, Subscriber<Integer>>() {
+      @Override
+      public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws Throwable {
+
+        final AtomicLong outstandingRequest = new AtomicLong(0);
+        final AtomicBoolean demandRequested = new AtomicBoolean();
+        return new SimpleSubscriberWithProbe(probe) {
+          @Override
+          public void onSubscribe(Subscription s) {
+            this.subscription = s;
+            probe.registerOnSubscribe(new SubscriberPuppet() {
+              @Override
+              public void triggerRequest(final long elements) {
+                outstandingRequest.getAndAdd(elements);
+                ex.schedule(new Runnable() {
+                  @Override
+                  public void run() {
+                    demandRequested.set(true);
+                    subscription.request(1);
+                  }
+                }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
+              }
+
+              @Override
+              public void signalCancel() {
+                // Delay this too to ensure that cancel isn't invoked before request.
+                ex.schedule(new Runnable() {
+                  @Override
+                  public void run() {
+                    subscription.cancel();
+                  }
+                }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
+              }
+            });
+          }
+
+          @Override
+          public void onNext(Integer element) {
+            if (!demandRequested.getAndSet(false)) {
+              throw new RuntimeException("onNext signalled without demand!");
+            }
+            if (outstandingRequest.decrementAndGet() > 0) {
+              ex.schedule(new Runnable() {
+                @Override
+                public void run() {
+                  demandRequested.set(true);
+                  subscription.request(1);
+                }
+              }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
+            }
+            probe.registerOnNext(element);
+          }
+        };
+      }
+    }).required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
+  }
+
+
   // FAILING IMPLEMENTATIONS //
 
   /**


=====================================
tck/src/test/java/org/reactivestreams/tck/flow/support/TCKVerificationSupport.java
=====================================
@@ -81,6 +81,18 @@ public class TCKVerificationSupport {
     throw new RuntimeException("Expected TCK to SKIP this test, instead if PASSed!");
   }
 
+  public void requireOptionalTestPass(ThrowingRunnable run) {
+    try {
+      run.run();
+    } catch (SkipException skip) {
+      throw new RuntimeException("Expected TCK to PASS this test, instead it was SKIPPED", skip.getCause());
+    } catch (Throwable throwable) {
+      throw new RuntimeException(
+          String.format("Expected TCK to PASS this test, yet it threw %s(%s) instead!",
+              throwable.getClass().getName(), throwable.getMessage()), throwable);
+    }
+  }
+
   /**
    * This publisher does NOT fulfil all Publisher spec requirements.
    * It's just the bare minimum to enable this test to fail the Subscriber tests.
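
The `requireOptionalTestPass` helper added in this hunk treats a skipped optional test as a failure, so a test that should pass cannot quietly be skipped instead. A self-contained sketch of the same pattern is shown here. `OptionalPassSketch`, `SkipSignal` and `requirePass` are hypothetical names; the real helper catches `SkipException` (presumably TestNG's) and uses the TCK's own `ThrowingRunnable`.

```java
/** Hypothetical stand-ins for illustration; not the TCK's or TestNG's types. */
final class OptionalPassSketch {

  interface ThrowingRunnable {
    void run() throws Throwable;
  }

  /** Plays the role of a "test was skipped" signal in this sketch. */
  static final class SkipSignal extends RuntimeException {
    SkipSignal(String message) {
      super(message);
    }
  }

  /** Runs the body and turns both a skip and any other throwable into a descriptive failure. */
  static void requirePass(ThrowingRunnable body) {
    try {
      body.run();
    } catch (SkipSignal skip) {
      // Must come before the Throwable clause, otherwise the skip would be reported generically.
      throw new IllegalStateException(
          "Expected PASS, but the test was SKIPPED: " + skip.getMessage(), skip);
    } catch (Throwable t) {
      throw new IllegalStateException(
          String.format("Expected PASS, but it threw %s(%s)", t.getClass().getName(), t.getMessage()), t);
    }
  }
}
```

One difference from the hunk: the sketch keeps the skip itself as the cause, while the original passes `skip.getCause()`.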



View it on GitLab: https://salsa.debian.org/java-team/reactive-streams/commit/3efe12c24fd95d4c7e074a895e5031f5e067d58f


