[Git][java-team/mqtt-client][upstream] 2 commits: New upstream version 1.15
Emmanuel Bourg
gitlab at salsa.debian.org
Thu Sep 10 14:57:07 BST 2020
Emmanuel Bourg pushed to branch upstream at Debian Java Maintainers / mqtt-client
Commits:
289bbf16 by Emmanuel Bourg at 2020-09-10T15:51:28+02:00
New upstream version 1.15
- - - - -
05b56bde by Emmanuel Bourg at 2020-09-10T15:51:46+02:00
New upstream version 1.16
- - - - -
6 changed files:
- mqtt-client-java1.4-uber/pom.xml
- mqtt-client-website/pom.xml
- mqtt-client/pom.xml
- mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java
- mqtt-client/src/main/java/org/fusesource/mqtt/codec/MessageSupport.java
- pom.xml
Changes:
=====================================
mqtt-client-java1.4-uber/pom.xml
=====================================
@@ -25,13 +25,13 @@
<parent>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client-project</artifactId>
- <version>1.14</version>
+ <version>1.16</version>
</parent>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client-java1.4-uber</artifactId>
- <version>1.14</version>
+ <version>1.16</version>
<name>${project.artifactId}</name>
<description>
@@ -150,12 +150,12 @@
<plugin>
<groupId>com.github.wvengen</groupId>
<artifactId>proguard-maven-plugin</artifactId>
- <version>2.0.5</version>
+ <version>2.0.14</version>
<dependencies>
<dependency>
<groupId>net.sf.proguard</groupId>
<artifactId>proguard-base</artifactId>
- <version>4.8</version>
+ <version>6.0.3</version>
<scope>runtime</scope>
</dependency>
</dependencies>
@@ -168,7 +168,7 @@
</execution>
</executions>
<configuration>
- <proguardVersion>4.8</proguardVersion>
+ <proguardVersion>6.0.3</proguardVersion>
<injar>${project.build.finalName}.jar</injar>
<outjar>${project.build.finalName}.jar</outjar>
<libs>
=====================================
mqtt-client-website/pom.xml
=====================================
@@ -26,12 +26,12 @@
<parent>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client-project</artifactId>
- <version>1.14</version>
+ <version>1.16</version>
</parent>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client-website</artifactId>
- <version>1.14</version>
+ <version>1.16</version>
<packaging>war</packaging>
<name>${project.artifactId}</name>
=====================================
mqtt-client/pom.xml
=====================================
@@ -25,12 +25,12 @@
<parent>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client-project</artifactId>
- <version>1.14</version>
+ <version>1.16</version>
</parent>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
- <version>1.14</version>
+ <version>1.16</version>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
@@ -131,13 +131,13 @@
<groupId>com.github.wvengen</groupId>
<artifactId>proguard-maven-plugin</artifactId>
- <version>2.0.5</version>
+ <version>2.0.14</version>
<dependencies>
<dependency>
<groupId>net.sf.proguard</groupId>
<artifactId>proguard-base</artifactId>
- <version>4.8</version>
+ <version>6.0.3</version>
<scope>runtime</scope>
</dependency>
</dependencies>
@@ -151,7 +151,7 @@
</execution>
</executions>
<configuration>
- <proguardVersion>4.8</proguardVersion>
+ <proguardVersion>6.0.3</proguardVersion>
<injar>${project.build.finalName}.jar</injar>
<outjar>${project.build.finalName}-uber.jar</outjar>
<attach>true</attach>
=====================================
mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java
=====================================
@@ -57,6 +57,7 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.fusesource.hawtbuf.Buffer.utf8;
@@ -71,7 +72,7 @@ import static org.fusesource.hawtdispatch.Dispatch.createQueue;
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class CallbackConnection {
-
+
private static class Request {
private final MQTTFrame frame;
private final short id;
@@ -113,11 +114,13 @@ public class CallbackConnection {
private HeartBeatMonitor heartBeatMonitor;
private long pingedAt;
private long reconnects = 0;
+ private AtomicBoolean isReconnecting = new AtomicBoolean(false);
private final AtomicInteger suspendCount = new AtomicInteger(0);
private final AtomicInteger suspendChanges = new AtomicInteger(0);
private final HashMap<UTF8Buffer, QoS> activeSubs = new HashMap<UTF8Buffer, QoS>();
+ private final Object nextMessageIdLock = new Object();
public CallbackConnection(MQTT mqtt) {
this.mqtt = mqtt;
@@ -144,7 +147,29 @@ public class CallbackConnection {
}
}
+ private long calculateDelay() {
+ long reconnectDelay = mqtt.reconnectDelay;
+ if( reconnectDelay > 0 && mqtt.reconnectBackOffMultiplier > 1.0 ) {
+ reconnectDelay = (long)Math.pow(mqtt.reconnectDelay * reconnects, mqtt.reconnectBackOffMultiplier);
+ }
+
+ reconnectDelay = Math.min(reconnectDelay, mqtt.reconnectDelayMax);
+ reconnects += 1;
+ return reconnectDelay;
+ }
+
void reconnect() {
+ if( isReconnecting.getAndSet(true) ) {
+ return;
+ }
+
+ final long reconnectDelay = calculateDelay();
+ try {
+ Thread.sleep(reconnectDelay);
+ } catch (final InterruptedException e1) {
+ // ignore it
+ }
+
try {
// And reconnect.
createTransport(new LoginHandler(new Callback<Void>() {
@@ -169,7 +194,7 @@ public class CallbackConnection {
// Replay any un-acked requests..
for (Map.Entry<Short, Request> entry : originalRequests.entrySet()) {
MQTTFrame frame = entry.getValue().frame;
- frame.dup(true); // set the dup flag as these frames were previously transmitted.
+ frame.dup(frame.messageType() == PUBLISH.TYPE); // set the dup flag as these frames were previously transmitted.
send(entry.getValue());
}
@@ -179,13 +204,17 @@ public class CallbackConnection {
send(request);
}
+ reconnects = 0;
+ isReconnecting.set(false);
}
public void onFailure(Throwable value) {
+ isReconnecting.set(false);
handleFatalFailure(value);
}
}, false));
} catch (Throwable e) {
+ isReconnecting.set(false);
handleFatalFailure(e);
}
}
@@ -221,13 +250,7 @@ public class CallbackConnection {
}
void reconnect(final Callback<Transport> onConnect) {
- long reconnectDelay = mqtt.reconnectDelay;
- if( reconnectDelay> 0 && mqtt.reconnectBackOffMultiplier > 1.0 ) {
- reconnectDelay = (long) Math.pow(mqtt.reconnectDelay*reconnects, mqtt.reconnectBackOffMultiplier);
- }
- reconnectDelay = Math.min(reconnectDelay, mqtt.reconnectDelayMax);
- reconnects += 1;
- queue.executeAfter(reconnectDelay, TimeUnit.MILLISECONDS, new Task() {
+ queue.executeAfter(calculateDelay(), TimeUnit.MILLISECONDS, new Task() {
@Override
public void run() {
if(disconnected) {
@@ -391,12 +414,12 @@ public class CallbackConnection {
mqtt.tracer.debug("Logging in");
assert accepted: "First frame should always be accepted by the transport";
}
-
+
private boolean tryReconnect() {
if(initialConnect) {
return mqtt.connectAttemptsMax<0 || reconnects < mqtt.connectAttemptsMax;
}
-
+
return mqtt.reconnectAttemptsMax<0 || reconnects < mqtt.reconnectAttemptsMax;
}
@@ -447,7 +470,7 @@ public class CallbackConnection {
// Don't care if the offer is rejected, just means we have data outbound.
if(!disconnected && pingedAt==0) {
MQTTFrame encoded = new PINGREQ().encode();
- if(CallbackConnection.this.transport.offer(encoded)) {
+ if(CallbackConnection.this.transport != null && CallbackConnection.this.transport.offer(encoded)) {
mqtt.tracer.onSend(encoded);
final long now = System.currentTimeMillis();
final long suspends = suspendChanges.get();
@@ -471,7 +494,7 @@ public class CallbackConnection {
});
}
}
-
+
}
});
heartBeatMonitor.start();
@@ -575,19 +598,21 @@ public class CallbackConnection {
heartBeatMonitor.stop();
heartBeatMonitor = null;
}
- transport.stop(new Task() {
- @Override
- public void run() {
- listener.onDisconnected();
- if (onComplete != null) {
- onComplete.onSuccess(null);
+ if(transport != null) {
+ transport.stop(new Task() {
+ @Override
+ public void run() {
+ listener.onDisconnected();
+ if (onComplete != null) {
+ onComplete.onSuccess(null);
+ }
}
- }
- });
+ });
+ }
}
}
};
-
+
Callback<Void> cb = new Callback<Void>() {
public void onSuccess(Void v) {
// To make sure DISCONNECT has been flushed out to the socket
@@ -607,7 +632,7 @@ public class CallbackConnection {
stop.run();
}
};
-
+
// Pop the frame into a request so it we get notified
// of any failures so we continue to stop the transport.
if(transport!=null) {
@@ -720,7 +745,7 @@ public class CallbackConnection {
request.cb.onFailure(failure);
}
} else {
- // Put the request in the map before sending it over the wire.
+ // Put the request in the map before sending it over the wire.
if(request.id!=0) {
this.requests.put(request.id, request);
}
@@ -731,7 +756,7 @@ public class CallbackConnection {
if( request.cb!=null ) {
((Callback<Void>)request.cb).onSuccess(null);
}
-
+
}
} else {
// Remove it from the request.
@@ -743,12 +768,14 @@ public class CallbackConnection {
private short nextMessageId = 1;
private short getNextMessageId() {
- short rc = nextMessageId;
- nextMessageId++;
- if(nextMessageId==0) {
- nextMessageId=1;
+ synchronized(nextMessageIdLock) {
+ short rc = nextMessageId;
+ nextMessageId++;
+ if(nextMessageId==0) {
+ nextMessageId=1;
+ }
+ return rc;
}
- return rc;
}
private void drainOverflow() {
@@ -918,7 +945,7 @@ public class CallbackConnection {
private void handleFatalFailure(Throwable error) {
if( failure == null ) {
failure = error;
-
+
mqtt.tracer.debug("Fatal connection failure: %s", error);
// Fail incomplete requests.
ArrayList<Request> values = new ArrayList(requests.values());
@@ -936,7 +963,7 @@ public class CallbackConnection {
entry.cb.onFailure(failure);
}
}
-
+
if( listener !=null && !disconnected ) {
try {
listener.onFailure(failure);
=====================================
mqtt-client/src/main/java/org/fusesource/mqtt/codec/MessageSupport.java
=====================================
@@ -62,6 +62,9 @@ public final class MessageSupport {
static protected UTF8Buffer readUTF(DataByteArrayInputStream is) throws ProtocolException {
int size = is.readUnsignedShort();
+ if (size < 0) {
+ throw new ProtocolException("Invalid message encoding");
+ }
Buffer buffer = is.readBuffer(size);
if (buffer == null || buffer.length != size) {
throw new ProtocolException("Invalid message encoding");
=====================================
pom.xml
=====================================
@@ -30,7 +30,7 @@
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client-project</artifactId>
- <version>1.14</version>
+ <version>1.16</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
@@ -94,7 +94,7 @@
<connection>scm:git:git://forge.fusesource.com/${forge-project-id}.git</connection>
<developerConnection>scm:git:ssh://git@forge.fusesource.com/${forge-project-id}.git</developerConnection>
<url>http://github.com/fusesource/mqtt-client/tree/master</url>
- <tag>mqtt-client-project-1.14</tag>
+ <tag>mqtt-client-project-1.16</tag>
</scm>
<distributionManagement>
View it on GitLab: https://salsa.debian.org/java-team/mqtt-client/-/compare/c755197bb63cfcd574f7128671df7c962c6fef22...05b56bde8d12ef05246d5751884bb230e55bffb4
--
View it on GitLab: https://salsa.debian.org/java-team/mqtt-client/-/compare/c755197bb63cfcd574f7128671df7c962c6fef22...05b56bde8d12ef05246d5751884bb230e55bffb4
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20200910/dcc6c412/attachment.html>
More information about the pkg-java-commits
mailing list