Skip to content

Commit

Permalink
Resend persisted messages even with clean sessions. (#87)
Browse files Browse the repository at this point in the history
Co-authored-by: Kasper Lund <[email protected]>
  • Loading branch information
floitsch and kasperl authored Mar 4, 2024
1 parent b9cd4cd commit b0183e5
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 37 deletions.
69 changes: 34 additions & 35 deletions src/full_client.toit
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ interface ReconnectionStrategy:
/**
Is called when the client wants to establish a connection through the given $transport.
Returns null if the strategy has been closed.
Otherwise, returns whether the broker had a session for this client (the result of $receive-connect-ack).
The strategy should first call $send-connect, followed by a $receive-connect-ack. If
the connection is unsuccessful, it may retry.
Expand All @@ -211,7 +214,7 @@ interface ReconnectionStrategy:
The $is-initial-connection is true if this is the first time the client connects to the broker.
*/
connect -> none
connect -> bool?
transport/ActivityMonitoringTransport
--is-initial-connection /bool
[--reconnect-transport]
Expand Down Expand Up @@ -287,7 +290,8 @@ abstract class ReconnectionStrategyBase implements ReconnectionStrategy:
Returns null if the strategy was closed.
Returns whether the broker had a session for this client, otherwise.
*/
do-connect transport/ActivityMonitoringTransport
do-connect -> bool?
transport/ActivityMonitoringTransport
--reuse-connection/bool=false
[--reconnect-transport]
[--disconnect-transport]
Expand Down Expand Up @@ -335,7 +339,7 @@ abstract class ReconnectionStrategyBase implements ReconnectionStrategy:

unreachable

abstract connect -> none
abstract connect -> bool?
transport/ActivityMonitoringTransport
--is-initial-connection /bool
[--reconnect-transport]
Expand Down Expand Up @@ -399,17 +403,18 @@ class NoReconnectionStrategy extends ReconnectionStrategyBase:
--delay-lambda=delay-lambda
--attempt-delays=attempt-delays

connect -> none
connect -> bool?
transport/ActivityMonitoringTransport
--is-initial-connection /bool
[--reconnect-transport]
[--disconnect-transport]
[--send-connect]
[--receive-connect-ack]
[--disconnect]:
if is-closed: return null
// No reconnect.
if not is-initial-connection: throw "NO RECONNECT STRATEGY"
session-exists := do-connect transport
return do-connect transport
--reuse-connection = is-initial-connection
--reconnect-transport = reconnect-transport
--disconnect-transport = disconnect-transport
Expand Down Expand Up @@ -462,28 +467,21 @@ class RetryReconnectionStrategy extends ReconnectionStrategyBase:
--delay-lambda=delay-lambda
--attempt-delays=attempt-delays

connect -> none
connect -> bool?
transport/ActivityMonitoringTransport
--is-initial-connection /bool
[--reconnect-transport]
[--disconnect-transport]
[--send-connect]
[--receive-connect-ack]
[--disconnect]:
session-exists := do-connect transport
return do-connect transport
--reuse-connection = is-initial-connection
--reconnect-transport = reconnect-transport
--disconnect-transport = disconnect-transport
--send-connect = send-connect
--receive-connect-ack = receive-connect-ack

if is-initial-connection or session-exists: return

// The session was reset.
// Disconnect and throw. If the user wants to, they should create a new client.
catch: disconnect.call
throw "SESSION_EXPIRED"

should-try-reconnect transport/ActivityMonitoringTransport -> bool:
return transport.supports-reconnect

Expand Down Expand Up @@ -544,25 +542,21 @@ class TenaciousReconnectionStrategy extends ReconnectionStrategyBase:
--receive-connect-timeout=receive-connect-timeout
--delay-lambda=delay-lambda

connect -> none
connect -> bool?
transport/ActivityMonitoringTransport
--is-initial-connection /bool
[--reconnect-transport]
[--disconnect-transport]
[--send-connect]
[--receive-connect-ack]
[--disconnect]:
session-exists := do-connect transport
return do-connect transport
--reuse-connection = is-initial-connection
--reconnect-transport = reconnect-transport
--disconnect-transport = disconnect-transport
--send-connect = send-connect
--receive-connect-ack = receive-connect-ack

// We don't care if the session exists or not.
// This strategy just wants to reconnect.
return

should-try-reconnect transport/ActivityMonitoringTransport -> bool:
return transport.supports-reconnect

Expand Down Expand Up @@ -1261,8 +1255,9 @@ class FullClient:
--keep-alive=session_.options.keep-alive
--logger=logger_

should-force-close := true
try:
reconnection-strategy_.connect transport_
had-session := reconnection-strategy_.connect transport_
--is-initial-connection = is-initial-connection
--reconnect-transport = :
transport_.reconnect
Expand Down Expand Up @@ -1296,24 +1291,28 @@ class FullClient:
ack.session-present
--disconnect = :
connection_.write DisconnectPacket
finally: | is-exception exception |
if is-exception:
close --force

// If we are here, then the reconnection succeeded.
// If we are here, then the reconnection succeeded.
// If we throw now, then we should go through the standard way of dealing with a disconnect.
should-force-close = false

// Make sure the connection sends pings so the broker doesn"t drop us.
connection_.keep-alive --background
// Make sure the connection sends pings so the broker doesn"t drop us.
connection_.keep-alive --background

// Resend the pending messages.
session_.do --pending: | persisted/PersistedPacket |
packet-id := persisted.packet-id
topic := persisted.topic
payload := persisted.payload
retain := persisted.retain
packet := PublishPacket topic payload --packet-id=packet-id --qos=1 --retain=retain --duplicate
connection_.write packet
// Resend the pending messages.
// If we didn't have a session, then we just send the data again. We might send the data too
// often, but there isn't much we can do.
session_.do --pending: | persisted/PersistedPacket |
packet-id := persisted.packet-id
topic := persisted.topic
payload := persisted.payload
retain := persisted.retain
packet := PublishPacket topic payload --packet-id=packet-id --qos=1 --retain=retain --duplicate=had-session
connection_.write packet

finally: | is-exception exception |
if is-exception and should-force-close:
close --force
/** Tears down the client. */
tear-down_:
critical-do:
Expand Down
9 changes: 7 additions & 2 deletions tests/persistence_test.toit
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import .packet-test-client
Tests that the persistence store stores unsent packets, and that a new
client can reuse that persistence store.
*/
test create-transport/Lambda --logger/log.Logger:
test create-transport/Lambda --clean-session/bool --logger/log.Logger:
persistence-store := mqtt.MemoryPersistenceStore
id := "persistence_client_id"

Expand All @@ -34,6 +34,7 @@ test create-transport/Lambda --logger/log.Logger:

with-packet-client create-transport
--client-id = id
--clean-session = clean-session
--write-filter = write-filter
--persistence-store = persistence-store
--logger=logger: | client/mqtt.FullClient wait-for-idle/Lambda _ _ |
Expand Down Expand Up @@ -66,6 +67,7 @@ test create-transport/Lambda --logger/log.Logger:
// We reconnect with a new client reusing the same persistence store.
with-packet-client create-transport
--client-id = id
--clean-session = clean-session
--persistence-store = persistence-store
--read-filter = read-filter
--logger=logger: | client/mqtt.FullClient wait-for-idle/Lambda _ get-activity/Lambda |
Expand All @@ -87,12 +89,15 @@ test create-transport/Lambda --logger/log.Logger:
publish-packets := (activity.filter: it[0] == "write" and it[1] is mqtt.PublishPacket).map: it[1]
publish-packets.filter --in-place: it.topic == "to_be_intercepted"
expect-equals 1 publish-packets.size
expect-equals (not clean-session) publish-packets[0].duplicate

main args:
test-with-mosquitto := args.contains "--mosquitto"
log-level := log.ERROR-LEVEL
logger := log.default.with-level log-level

run-test := : | create-transport/Lambda | test create-transport --logger=logger
run-test := : | create-transport/Lambda |
test create-transport --logger=logger --clean-session
test create-transport --logger=logger --no-clean-session
if test-with-mosquitto: with-mosquitto --logger=logger run-test
else: with-internal-broker --logger=logger run-test

0 comments on commit b0183e5

Please sign in to comment.