[Git][java-team/mqtt-client][upstream] 2 commits: New upstream version 1.15

Emmanuel Bourg gitlab at salsa.debian.org
Thu Sep 10 14:57:07 BST 2020



Emmanuel Bourg pushed to branch upstream at Debian Java Maintainers / mqtt-client


Commits:
289bbf16 by Emmanuel Bourg at 2020-09-10T15:51:28+02:00
New upstream version 1.15
- - - - -
05b56bde by Emmanuel Bourg at 2020-09-10T15:51:46+02:00
New upstream version 1.16
- - - - -


6 changed files:

- mqtt-client-java1.4-uber/pom.xml
- mqtt-client-website/pom.xml
- mqtt-client/pom.xml
- mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java
- mqtt-client/src/main/java/org/fusesource/mqtt/codec/MessageSupport.java
- pom.xml


Changes:

=====================================
mqtt-client-java1.4-uber/pom.xml
=====================================
@@ -25,13 +25,13 @@
   <parent>
     <groupId>org.fusesource.mqtt-client</groupId>
     <artifactId>mqtt-client-project</artifactId>
-    <version>1.14</version>
+    <version>1.16</version>
   </parent>
   
   
   <groupId>org.fusesource.mqtt-client</groupId>
   <artifactId>mqtt-client-java1.4-uber</artifactId>
-  <version>1.14</version>
+  <version>1.16</version>
   
   <name>${project.artifactId}</name>
   <description>
@@ -150,12 +150,12 @@
       <plugin>
         <groupId>com.github.wvengen</groupId>
         <artifactId>proguard-maven-plugin</artifactId>
-        <version>2.0.5</version>
+        <version>2.0.14</version>
         <dependencies>
            <dependency>
                <groupId>net.sf.proguard</groupId>
                <artifactId>proguard-base</artifactId>
-               <version>4.8</version>
+               <version>6.0.3</version>
                <scope>runtime</scope>
            </dependency>
         </dependencies>
@@ -168,7 +168,7 @@
           </execution>
         </executions>
         <configuration>
-          <proguardVersion>4.8</proguardVersion>
+          <proguardVersion>6.0.3</proguardVersion>
           <injar>${project.build.finalName}.jar</injar>
           <outjar>${project.build.finalName}.jar</outjar>          
           <libs>


=====================================
mqtt-client-website/pom.xml
=====================================
@@ -26,12 +26,12 @@
   <parent>
     <groupId>org.fusesource.mqtt-client</groupId>
     <artifactId>mqtt-client-project</artifactId>
-    <version>1.14</version>
+    <version>1.16</version>
   </parent>
   
   <groupId>org.fusesource.mqtt-client</groupId>
   <artifactId>mqtt-client-website</artifactId>
-  <version>1.14</version>
+  <version>1.16</version>
   <packaging>war</packaging>
 
   <name>${project.artifactId}</name>


=====================================
mqtt-client/pom.xml
=====================================
@@ -25,12 +25,12 @@
   <parent>
     <groupId>org.fusesource.mqtt-client</groupId>
     <artifactId>mqtt-client-project</artifactId>
-    <version>1.14</version>
+    <version>1.16</version>
   </parent>
   
   <groupId>org.fusesource.mqtt-client</groupId>
   <artifactId>mqtt-client</artifactId>
-  <version>1.14</version>
+  <version>1.16</version>
   <packaging>bundle</packaging>
   
   <name>${project.artifactId}</name>
@@ -131,13 +131,13 @@
         
         <groupId>com.github.wvengen</groupId>
         <artifactId>proguard-maven-plugin</artifactId>
-        <version>2.0.5</version>
+        <version>2.0.14</version>
  
         <dependencies>
            <dependency>
                <groupId>net.sf.proguard</groupId>
                <artifactId>proguard-base</artifactId>
-               <version>4.8</version>
+               <version>6.0.3</version>
                <scope>runtime</scope>
            </dependency>
         </dependencies>
@@ -151,7 +151,7 @@
            </execution>
         </executions>
         <configuration>
-          <proguardVersion>4.8</proguardVersion>
+          <proguardVersion>6.0.3</proguardVersion>
           <injar>${project.build.finalName}.jar</injar>
           <outjar>${project.build.finalName}-uber.jar</outjar>          
           <attach>true</attach>


=====================================
mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java
=====================================
@@ -57,6 +57,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.fusesource.hawtbuf.Buffer.utf8;
@@ -71,7 +72,7 @@ import static org.fusesource.hawtdispatch.Dispatch.createQueue;
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class CallbackConnection {
-    
+
     private static class Request {
         private final MQTTFrame frame;
         private final short id;
@@ -113,11 +114,13 @@ public class CallbackConnection {
     private HeartBeatMonitor heartBeatMonitor;
     private long pingedAt;
     private long reconnects = 0;
+    private AtomicBoolean isReconnecting = new AtomicBoolean(false);
     private final AtomicInteger suspendCount = new AtomicInteger(0);
     private final AtomicInteger suspendChanges = new AtomicInteger(0);
 
     private final HashMap<UTF8Buffer, QoS> activeSubs = new HashMap<UTF8Buffer, QoS>();
 
+    private final Object nextMessageIdLock = new Object();
 
     public CallbackConnection(MQTT mqtt) {
         this.mqtt = mqtt;
@@ -144,7 +147,29 @@ public class CallbackConnection {
         }
     }
 
+    private long calculateDelay() {
+        long reconnectDelay = mqtt.reconnectDelay;
+        if( reconnectDelay > 0 && mqtt.reconnectBackOffMultiplier > 1.0 ) {
+            reconnectDelay = (long)Math.pow(mqtt.reconnectDelay * reconnects, mqtt.reconnectBackOffMultiplier);
+        }
+
+        reconnectDelay = Math.min(reconnectDelay, mqtt.reconnectDelayMax);
+        reconnects += 1;
+        return reconnectDelay;
+    }
+
     void reconnect() {
+        if( isReconnecting.getAndSet(true) ) {
+            return;
+        }
+
+        final long reconnectDelay = calculateDelay();
+        try {
+            Thread.sleep(reconnectDelay);
+        } catch (final InterruptedException e1) {
+            // ignore it
+        }
+
         try {
             // And reconnect.
             createTransport(new LoginHandler(new Callback<Void>() {
@@ -169,7 +194,7 @@ public class CallbackConnection {
                     // Replay any un-acked requests..
                     for (Map.Entry<Short, Request> entry : originalRequests.entrySet()) {
                         MQTTFrame frame = entry.getValue().frame;
-                        frame.dup(true); // set the dup flag as these frames were previously transmitted.
+                        frame.dup(frame.messageType() == PUBLISH.TYPE); // set the dup flag as these frames were previously transmitted.
                         send(entry.getValue());
                     }
 
@@ -179,13 +204,17 @@ public class CallbackConnection {
                         send(request);
                     }
 
+                    reconnects = 0;
+                    isReconnecting.set(false);
                 }
 
                 public void onFailure(Throwable value) {
+                    isReconnecting.set(false);
                     handleFatalFailure(value);
                 }
             }, false));
         } catch (Throwable e) {
+            isReconnecting.set(false);
             handleFatalFailure(e);
         }
     }
@@ -221,13 +250,7 @@ public class CallbackConnection {
     }
 
     void reconnect(final Callback<Transport> onConnect) {
-        long reconnectDelay = mqtt.reconnectDelay;
-        if( reconnectDelay> 0 && mqtt.reconnectBackOffMultiplier > 1.0 ) {
-            reconnectDelay = (long) Math.pow(mqtt.reconnectDelay*reconnects, mqtt.reconnectBackOffMultiplier);
-        }
-        reconnectDelay = Math.min(reconnectDelay, mqtt.reconnectDelayMax);
-        reconnects += 1;
-        queue.executeAfter(reconnectDelay, TimeUnit.MILLISECONDS, new Task() {
+        queue.executeAfter(calculateDelay(), TimeUnit.MILLISECONDS, new Task() {
             @Override
             public void run() {
                 if(disconnected) {
@@ -391,12 +414,12 @@ public class CallbackConnection {
             mqtt.tracer.debug("Logging in");
             assert accepted: "First frame should always be accepted by the transport";
         }
-        
+
         private boolean tryReconnect() {
             if(initialConnect) {
                 return mqtt.connectAttemptsMax<0 || reconnects < mqtt.connectAttemptsMax;
             }
-            
+
             return mqtt.reconnectAttemptsMax<0 || reconnects < mqtt.reconnectAttemptsMax;
         }
 
@@ -447,7 +470,7 @@ public class CallbackConnection {
                     // Don't care if the offer is rejected, just means we have data outbound.
                     if(!disconnected && pingedAt==0) {
                         MQTTFrame encoded = new PINGREQ().encode();
-                        if(CallbackConnection.this.transport.offer(encoded)) {
+                        if(CallbackConnection.this.transport != null && CallbackConnection.this.transport.offer(encoded)) {
                             mqtt.tracer.onSend(encoded);
                             final long now = System.currentTimeMillis();
                             final long suspends = suspendChanges.get();
@@ -471,7 +494,7 @@ public class CallbackConnection {
                             });
                         }
                     }
-                    
+
                 }
             });
             heartBeatMonitor.start();
@@ -575,19 +598,21 @@ public class CallbackConnection {
                         heartBeatMonitor.stop();
                         heartBeatMonitor = null;
                     }
-                    transport.stop(new Task() {
-                        @Override
-                        public void run() {
-                            listener.onDisconnected();
-                            if (onComplete != null) {
-                                onComplete.onSuccess(null);
+                    if(transport != null) {
+                        transport.stop(new Task() {
+                            @Override
+                            public void run() {
+                                listener.onDisconnected();
+                                if (onComplete != null) {
+                                    onComplete.onSuccess(null);
+                                }
                             }
-                        }
-                    });
+                        });
+                    }
                 }
             }
         };
-        
+
         Callback<Void> cb = new Callback<Void>() {
             public void onSuccess(Void v) {
                 // To make sure DISCONNECT has been flushed out to the socket
@@ -607,7 +632,7 @@ public class CallbackConnection {
                 stop.run();
             }
         };
-        
+
         // Pop the frame into a request so it we get notified
         // of any failures so we continue to stop the transport.
         if(transport!=null) {
@@ -720,7 +745,7 @@ public class CallbackConnection {
                 request.cb.onFailure(failure);
             }
         } else {
-            // Put the request in the map before sending it over the wire. 
+            // Put the request in the map before sending it over the wire.
             if(request.id!=0) {
                 this.requests.put(request.id, request);
             }
@@ -731,7 +756,7 @@ public class CallbackConnection {
                     if( request.cb!=null ) {
                         ((Callback<Void>)request.cb).onSuccess(null);
                     }
-                    
+
                 }
             } else {
                 // Remove it from the request.
@@ -743,12 +768,14 @@ public class CallbackConnection {
 
     private short nextMessageId = 1;
     private short getNextMessageId() {
-        short rc = nextMessageId;
-        nextMessageId++;
-        if(nextMessageId==0) {
-            nextMessageId=1;
+        synchronized(nextMessageIdLock) {
+            short rc = nextMessageId;
+            nextMessageId++;
+            if(nextMessageId==0) {
+                nextMessageId=1;
+            }
+            return rc;
         }
-        return rc;
     }
 
     private void drainOverflow() {
@@ -918,7 +945,7 @@ public class CallbackConnection {
     private void handleFatalFailure(Throwable error) {
         if( failure == null ) {
             failure = error;
-            
+
             mqtt.tracer.debug("Fatal connection failure: %s", error);
             // Fail incomplete requests.
             ArrayList<Request> values = new ArrayList(requests.values());
@@ -936,7 +963,7 @@ public class CallbackConnection {
                     entry.cb.onFailure(failure);
                 }
             }
-            
+
             if( listener !=null && !disconnected ) {
                 try {
                     listener.onFailure(failure);


=====================================
mqtt-client/src/main/java/org/fusesource/mqtt/codec/MessageSupport.java
=====================================
@@ -62,6 +62,9 @@ public final class MessageSupport {
 
     static protected UTF8Buffer readUTF(DataByteArrayInputStream is) throws ProtocolException {
         int size = is.readUnsignedShort();
+        if (size < 0) {
+            throw new ProtocolException("Invalid message encoding");
+        }
         Buffer buffer = is.readBuffer(size);
         if (buffer == null || buffer.length != size) {
             throw new ProtocolException("Invalid message encoding");


=====================================
pom.xml
=====================================
@@ -30,7 +30,7 @@
 
   <groupId>org.fusesource.mqtt-client</groupId>
   <artifactId>mqtt-client-project</artifactId>
-  <version>1.14</version>
+  <version>1.16</version>
   <packaging>pom</packaging>
 
   <name>${project.artifactId}</name>
@@ -94,7 +94,7 @@
     <connection>scm:git:git://forge.fusesource.com/${forge-project-id}.git</connection>
     <developerConnection>scm:git:ssh://git@forge.fusesource.com/${forge-project-id}.git</developerConnection>
     <url>http://github.com/fusesource/mqtt-client/tree/master</url>
-    <tag>mqtt-client-project-1.14</tag>
+    <tag>mqtt-client-project-1.16</tag>
   </scm>
 
   <distributionManagement>



View it on GitLab: https://salsa.debian.org/java-team/mqtt-client/-/compare/c755197bb63cfcd574f7128671df7c962c6fef22...05b56bde8d12ef05246d5751884bb230e55bffb4

-- 
View it on GitLab: https://salsa.debian.org/java-team/mqtt-client/-/compare/c755197bb63cfcd574f7128671df7c962c6fef22...05b56bde8d12ef05246d5751884bb230e55bffb4
You're receiving this email because of your account on salsa.debian.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-java-commits/attachments/20200910/dcc6c412/attachment.html>


More information about the pkg-java-commits mailing list