TP-66092 | add mqtt clean session and qos backend driven (#10905)
This commit is contained in:
@@ -48,19 +48,22 @@ constructor(
|
||||
clientId = deviceId,
|
||||
brokerIP = data.brokerIP.orEmpty(),
|
||||
port =
|
||||
data.port
|
||||
?: com.navi.common.utils.Constants.MQTT_DEFAULT_PORT,
|
||||
data.port
|
||||
?: com.navi.common.utils.Constants.MQTT_DEFAULT_PORT,
|
||||
scheme = data.scheme.orEmpty(),
|
||||
keepAlive =
|
||||
data.keepAlive
|
||||
?: com.navi.common.utils.Constants.MQTT_KEEP_ALIVE,
|
||||
topics = clientIdWithTopics
|
||||
data.keepAlive
|
||||
?: com.navi.common.utils.Constants.MQTT_KEEP_ALIVE,
|
||||
topics = clientIdWithTopics,
|
||||
subscribeQos = data.subscribeQos,
|
||||
cleanSession = data.cleanSession
|
||||
)
|
||||
)
|
||||
MqttManager.subscribe(
|
||||
clientIdWithTopics,
|
||||
PAGE_HOME,
|
||||
mqttMessageProviderImpl
|
||||
mqttMessageProviderImpl,
|
||||
data.subscribeQos
|
||||
)
|
||||
NaviTrackEvent.trackEvent("mqtt_sdk_init_triggered")
|
||||
} else {
|
||||
|
||||
@@ -267,6 +267,9 @@ object Constants {
|
||||
const val PAGE_LOAD_FAILED = "PAGE_LOAD_FAILED"
|
||||
const val WIFI_ERROR_ICON = "WIFI_ERROR_ICON"
|
||||
const val RESPONSE = "response"
|
||||
const val ZERO = "ZERO"
|
||||
const val ONE = "ONE"
|
||||
const val TWO = "TWO"
|
||||
/* Cta_Data */
|
||||
const val KEY_CTA_DATA = "CtaData"
|
||||
const val REACT_NATIVE = "react_native"
|
||||
|
||||
@@ -8,15 +8,11 @@
|
||||
package com.navi.mqtt
|
||||
|
||||
import com.gojek.courier.QoS
|
||||
import com.gojek.courier.annotation.Path
|
||||
import com.gojek.courier.annotation.Subscribe
|
||||
import com.gojek.courier.annotation.SubscribeMultiple
|
||||
import com.gojek.courier.annotation.TopicMap
|
||||
import io.reactivex.Observable
|
||||
|
||||
interface CourierService {
|
||||
@Subscribe(topic = "{topic}", qos = QoS.TWO)
|
||||
fun subscribe(@Path("topic") topic: String): Observable<String>
|
||||
|
||||
@SubscribeMultiple fun subscribeAll(@TopicMap topicMap: Map<String, QoS>): Observable<String>
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import com.gojek.mqtt.model.ServerUri
|
||||
import com.navi.common.utils.Constants.MQTT_DEFAULT_PORT
|
||||
import com.navi.common.utils.Constants.MQTT_KEEP_ALIVE
|
||||
import com.navi.mqtt.model.MqttSdkInitParams
|
||||
import com.navi.uitron.utils.orFalse
|
||||
|
||||
class MqttConnector(private val mqttClient: MqttClient) {
|
||||
|
||||
@@ -36,7 +37,7 @@ class MqttConnector(private val mqttClient: MqttClient) {
|
||||
.clientId(params.clientId.orEmpty())
|
||||
.userName(params.username.orEmpty())
|
||||
.password(params.password.orEmpty())
|
||||
.cleanSession(false)
|
||||
.cleanSession(params.cleanSession.orFalse())
|
||||
.keepAlive(KeepAlive(timeSeconds = params.keepAlive ?: MQTT_KEEP_ALIVE))
|
||||
.build()
|
||||
}
|
||||
|
||||
@@ -28,6 +28,9 @@ import com.gojek.timer.pingsender.TimerPingSenderConfig
|
||||
import com.gojek.timer.pingsender.TimerPingSenderFactory
|
||||
import com.google.gson.Gson
|
||||
import com.navi.common.utils.Constants.COMMA
|
||||
import com.navi.common.utils.Constants.ONE
|
||||
import com.navi.common.utils.Constants.TWO
|
||||
import com.navi.common.utils.Constants.ZERO
|
||||
import com.navi.common.utils.log
|
||||
import com.navi.mqtt.analytics.MqttAnalytics
|
||||
import com.navi.mqtt.model.MqttMessageResponse
|
||||
@@ -40,6 +43,7 @@ object MqttManager {
|
||||
private val mqttListeners = hashMapOf<String, ArrayList<MqttMessageProvider>>()
|
||||
private val mqttListenersScreen = hashSetOf<String>()
|
||||
private val mqttSubscribeTopics = hashSetOf<String>()
|
||||
private var subscribeQos: String? = TWO
|
||||
private val analyticsEventTracker by lazy { MqttAnalytics.mqttAnalytics }
|
||||
|
||||
fun init(context: Context, mqttSdkInitParams: MqttSdkInitParams?) {
|
||||
@@ -101,10 +105,18 @@ object MqttManager {
|
||||
|
||||
private fun connectMqtt(mqttSdkInitParams: MqttSdkInitParams?) {
|
||||
val mqttConnector = MqttConnector(mqttClient)
|
||||
mqttSdkInitParams?.let { params -> mqttConnector.connectMqtt(params) }
|
||||
mqttSdkInitParams?.let { params ->
|
||||
subscribeQos = params.subscribeQos
|
||||
mqttConnector.connectMqtt(params)
|
||||
}
|
||||
}
|
||||
|
||||
fun subscribe(topic: String, screenName: String, listener: MqttMessageProvider) {
|
||||
fun subscribe(
|
||||
topic: String,
|
||||
screenName: String,
|
||||
listener: MqttMessageProvider,
|
||||
subscribeQos: String?
|
||||
) {
|
||||
var listeners = mqttListeners[topic]
|
||||
if (listeners.isNullOrEmpty()) {
|
||||
listeners = arrayListOf(listener)
|
||||
@@ -117,7 +129,7 @@ object MqttManager {
|
||||
mqttListenersScreen.add(screenName)
|
||||
if (isConnected()) {
|
||||
if (mqttSubscribeTopics.contains(topic).not()) {
|
||||
subscribeToTopics(topic)
|
||||
subscribeToTopics(topic, subscribeQos)
|
||||
mqttSubscribeTopics.add(topic)
|
||||
}
|
||||
} else {
|
||||
@@ -140,9 +152,7 @@ object MqttManager {
|
||||
override fun onEvent(mqttEvent: MqttEvent) {
|
||||
when (mqttEvent) {
|
||||
is MqttConnectSuccessEvent -> {
|
||||
analyticsEventTracker.onMqttEventReceived(
|
||||
mqttEvent::class.java.name.orEmpty()
|
||||
)
|
||||
analyticsEventTracker.onMqttEventReceived(mqttEvent.toString())
|
||||
subscribePendingTopics()
|
||||
}
|
||||
is MqttEvent.MqttSubscribeFailureEvent,
|
||||
@@ -164,21 +174,22 @@ object MqttManager {
|
||||
val pendingTopics = ArrayList(addToQueue)
|
||||
pendingTopics.forEach {
|
||||
if (mqttSubscribeTopics.contains(it).not()) {
|
||||
subscribeToTopics(it)
|
||||
subscribeToTopics(it, subscribeQos)
|
||||
mqttSubscribeTopics.add(it)
|
||||
}
|
||||
addToQueue.remove(it)
|
||||
}
|
||||
}
|
||||
|
||||
private fun subscribeToTopics(topics: String) {
|
||||
private fun subscribeToTopics(topics: String, subscribeQos: String?) {
|
||||
topics
|
||||
.takeIf { it.isNotEmpty() }
|
||||
?.run {
|
||||
val stream =
|
||||
courierService.run {
|
||||
if (COMMA !in topics) subscribe(topic = topics)
|
||||
else subscribeAll(topicMap = split(COMMA).associateWith { QoS.TWO })
|
||||
subscribeAll(
|
||||
topicMap = topics.split(COMMA).associateWith { getQos(subscribeQos) }
|
||||
)
|
||||
}
|
||||
stream.subscribe { message ->
|
||||
try {
|
||||
@@ -198,6 +209,15 @@ object MqttManager {
|
||||
}
|
||||
}
|
||||
|
||||
private fun getQos(subscribeQos: String?): QoS {
|
||||
return when (subscribeQos) {
|
||||
ONE -> QoS.ONE
|
||||
TWO -> QoS.TWO
|
||||
ZERO -> QoS.ZERO
|
||||
else -> QoS.TWO
|
||||
}
|
||||
}
|
||||
|
||||
private fun isConnected(): Boolean {
|
||||
return mqttClient.getCurrentState() == ConnectionState.CONNECTED
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package com.navi.mqtt.model
|
||||
|
||||
import com.google.gson.annotations.SerializedName
|
||||
import com.navi.common.utils.Constants
|
||||
|
||||
data class MqttSdkInitParams(
|
||||
val username: String? = null,
|
||||
@@ -19,4 +20,6 @@ data class MqttSdkInitParams(
|
||||
@SerializedName("brokerIP") val brokerIP: String? = null,
|
||||
@SerializedName("topics") val topics: String? = null,
|
||||
@SerializedName("enable") val enable: Boolean? = null,
|
||||
@SerializedName("subscribeQos") val subscribeQos: String? = Constants.TWO,
|
||||
@SerializedName("cleanSession") val cleanSession: Boolean? = null,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user