From 5cea6592734b8c148996c97a64b399daa026da34 Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Mon, 20 May 2024 20:20:13 +0530 Subject: [PATCH] TP-66092 | add mqtt clean session and qos backend driven (#10905) --- .../usecase/MqttSdkInitUseCase.kt | 15 ++++--- .../java/com/navi/common/utils/Constants.kt | 3 ++ .../main/java/com/navi/mqtt/CourierService.kt | 4 -- .../main/java/com/navi/mqtt/MqttConnector.kt | 3 +- .../main/java/com/navi/mqtt/MqttManager.kt | 40 ++++++++++++++----- .../com/navi/mqtt/model/MqttSdkInitParams.kt | 3 ++ 6 files changed, 47 insertions(+), 21 deletions(-) diff --git a/android/app/src/main/java/com/naviapp/registration/usecase/MqttSdkInitUseCase.kt b/android/app/src/main/java/com/naviapp/registration/usecase/MqttSdkInitUseCase.kt index 18d1787d0b..d4a377a330 100644 --- a/android/app/src/main/java/com/naviapp/registration/usecase/MqttSdkInitUseCase.kt +++ b/android/app/src/main/java/com/naviapp/registration/usecase/MqttSdkInitUseCase.kt @@ -48,19 +48,22 @@ constructor( clientId = deviceId, brokerIP = data.brokerIP.orEmpty(), port = - data.port - ?: com.navi.common.utils.Constants.MQTT_DEFAULT_PORT, + data.port + ?: com.navi.common.utils.Constants.MQTT_DEFAULT_PORT, scheme = data.scheme.orEmpty(), keepAlive = - data.keepAlive - ?: com.navi.common.utils.Constants.MQTT_KEEP_ALIVE, - topics = clientIdWithTopics + data.keepAlive + ?: com.navi.common.utils.Constants.MQTT_KEEP_ALIVE, + topics = clientIdWithTopics, + subscribeQos = data.subscribeQos, + cleanSession = data.cleanSession ) ) MqttManager.subscribe( clientIdWithTopics, PAGE_HOME, - mqttMessageProviderImpl + mqttMessageProviderImpl, + data.subscribeQos ) NaviTrackEvent.trackEvent("mqtt_sdk_init_triggered") } else { diff --git a/android/navi-common/src/main/java/com/navi/common/utils/Constants.kt b/android/navi-common/src/main/java/com/navi/common/utils/Constants.kt index c3c182cac5..af6057bbb1 100644 --- a/android/navi-common/src/main/java/com/navi/common/utils/Constants.kt +++ b/android/navi-common/src/main/java/com/navi/common/utils/Constants.kt @@ -267,6 +267,9 @@ object Constants { const val PAGE_LOAD_FAILED = "PAGE_LOAD_FAILED" const val WIFI_ERROR_ICON = "WIFI_ERROR_ICON" const val RESPONSE = "response" + const val ZERO = "ZERO" + const val ONE = "ONE" + const val TWO = "TWO" /* Cta_Data */ const val KEY_CTA_DATA = "CtaData" const val REACT_NATIVE = "react_native" diff --git a/android/navi-mqtt/src/main/java/com/navi/mqtt/CourierService.kt b/android/navi-mqtt/src/main/java/com/navi/mqtt/CourierService.kt index cc9998ec59..084d134bde 100644 --- a/android/navi-mqtt/src/main/java/com/navi/mqtt/CourierService.kt +++ b/android/navi-mqtt/src/main/java/com/navi/mqtt/CourierService.kt @@ -8,15 +8,11 @@ package com.navi.mqtt import com.gojek.courier.QoS -import com.gojek.courier.annotation.Path -import com.gojek.courier.annotation.Subscribe import com.gojek.courier.annotation.SubscribeMultiple import com.gojek.courier.annotation.TopicMap import io.reactivex.Observable interface CourierService { - @Subscribe(topic = "{topic}", qos = QoS.TWO) - fun subscribe(@Path("topic") topic: String): Observable @SubscribeMultiple fun subscribeAll(@TopicMap topicMap: Map): Observable } diff --git a/android/navi-mqtt/src/main/java/com/navi/mqtt/MqttConnector.kt b/android/navi-mqtt/src/main/java/com/navi/mqtt/MqttConnector.kt index fead4a8877..48726d0372 100644 --- a/android/navi-mqtt/src/main/java/com/navi/mqtt/MqttConnector.kt +++ b/android/navi-mqtt/src/main/java/com/navi/mqtt/MqttConnector.kt @@ -14,6 +14,7 @@ import com.gojek.mqtt.model.ServerUri import com.navi.common.utils.Constants.MQTT_DEFAULT_PORT import com.navi.common.utils.Constants.MQTT_KEEP_ALIVE import com.navi.mqtt.model.MqttSdkInitParams +import com.navi.uitron.utils.orFalse class MqttConnector(private val mqttClient: MqttClient) { @@ -36,7 +37,7 @@ class MqttConnector(private val mqttClient: MqttClient) { .clientId(params.clientId.orEmpty()) .userName(params.username.orEmpty()) .password(params.password.orEmpty()) - .cleanSession(false) + .cleanSession(params.cleanSession.orFalse()) .keepAlive(KeepAlive(timeSeconds = params.keepAlive ?: MQTT_KEEP_ALIVE)) .build() } diff --git a/android/navi-mqtt/src/main/java/com/navi/mqtt/MqttManager.kt b/android/navi-mqtt/src/main/java/com/navi/mqtt/MqttManager.kt index 36562eddc8..31b87dbee4 100644 --- a/android/navi-mqtt/src/main/java/com/navi/mqtt/MqttManager.kt +++ b/android/navi-mqtt/src/main/java/com/navi/mqtt/MqttManager.kt @@ -28,6 +28,9 @@ import com.gojek.timer.pingsender.TimerPingSenderConfig import com.gojek.timer.pingsender.TimerPingSenderFactory import com.google.gson.Gson import com.navi.common.utils.Constants.COMMA +import com.navi.common.utils.Constants.ONE +import com.navi.common.utils.Constants.TWO +import com.navi.common.utils.Constants.ZERO import com.navi.common.utils.log import com.navi.mqtt.analytics.MqttAnalytics import com.navi.mqtt.model.MqttMessageResponse @@ -40,6 +43,7 @@ object MqttManager { private val mqttListeners = hashMapOf>() private val mqttListenersScreen = hashSetOf() private val mqttSubscribeTopics = hashSetOf() + private var subscribeQos: String? = TWO private val analyticsEventTracker by lazy { MqttAnalytics.mqttAnalytics } fun init(context: Context, mqttSdkInitParams: MqttSdkInitParams?) { @@ -101,10 +105,18 @@ object MqttManager { private fun connectMqtt(mqttSdkInitParams: MqttSdkInitParams?) { val mqttConnector = MqttConnector(mqttClient) - mqttSdkInitParams?.let { params -> mqttConnector.connectMqtt(params) } + mqttSdkInitParams?.let { params -> + subscribeQos = params.subscribeQos + mqttConnector.connectMqtt(params) + } } - fun subscribe(topic: String, screenName: String, listener: MqttMessageProvider) { + fun subscribe( + topic: String, + screenName: String, + listener: MqttMessageProvider, + subscribeQos: String? + ) { var listeners = mqttListeners[topic] if (listeners.isNullOrEmpty()) { listeners = arrayListOf(listener) @@ -117,7 +129,7 @@ object MqttManager { mqttListenersScreen.add(screenName) if (isConnected()) { if (mqttSubscribeTopics.contains(topic).not()) { - subscribeToTopics(topic) + subscribeToTopics(topic, subscribeQos) mqttSubscribeTopics.add(topic) } } else { @@ -140,9 +152,7 @@ object MqttManager { override fun onEvent(mqttEvent: MqttEvent) { when (mqttEvent) { is MqttConnectSuccessEvent -> { - analyticsEventTracker.onMqttEventReceived( - mqttEvent::class.java.name.orEmpty() - ) + analyticsEventTracker.onMqttEventReceived(mqttEvent.toString()) subscribePendingTopics() } is MqttEvent.MqttSubscribeFailureEvent, @@ -164,21 +174,22 @@ object MqttManager { val pendingTopics = ArrayList(addToQueue) pendingTopics.forEach { if (mqttSubscribeTopics.contains(it).not()) { - subscribeToTopics(it) + subscribeToTopics(it, subscribeQos) mqttSubscribeTopics.add(it) } addToQueue.remove(it) } } - private fun subscribeToTopics(topics: String) { + private fun subscribeToTopics(topics: String, subscribeQos: String?) { topics .takeIf { it.isNotEmpty() } ?.run { val stream = courierService.run { - if (COMMA !in topics) subscribe(topic = topics) - else subscribeAll(topicMap = split(COMMA).associateWith { QoS.TWO }) + subscribeAll( + topicMap = topics.split(COMMA).associateWith { getQos(subscribeQos) } + ) } stream.subscribe { message -> try { @@ -198,6 +209,15 @@ object MqttManager { } } + private fun getQos(subscribeQos: String?): QoS { + return when (subscribeQos) { + ONE -> QoS.ONE + TWO -> QoS.TWO + ZERO -> QoS.ZERO + else -> QoS.TWO + } + } + private fun isConnected(): Boolean { return mqttClient.getCurrentState() == ConnectionState.CONNECTED } diff --git a/android/navi-mqtt/src/main/java/com/navi/mqtt/model/MqttSdkInitParams.kt b/android/navi-mqtt/src/main/java/com/navi/mqtt/model/MqttSdkInitParams.kt index 5141b301fe..d406dc7737 100644 --- a/android/navi-mqtt/src/main/java/com/navi/mqtt/model/MqttSdkInitParams.kt +++ b/android/navi-mqtt/src/main/java/com/navi/mqtt/model/MqttSdkInitParams.kt @@ -8,6 +8,7 @@ package com.navi.mqtt.model import com.google.gson.annotations.SerializedName +import com.navi.common.utils.Constants data class MqttSdkInitParams( val username: String? = null, @@ -19,4 +20,6 @@ data class MqttSdkInitParams( @SerializedName("brokerIP") val brokerIP: String? = null, @SerializedName("topics") val topics: String? = null, @SerializedName("enable") val enable: Boolean? = null, + @SerializedName("subscribeQos") val subscribeQos: String? = Constants.TWO, + @SerializedName("cleanSession") val cleanSession: Boolean? = null, )