Skip to content

Commit

Permalink
Wrap process manager commands with :ok tagged tuple in telemetry stop
Browse files Browse the repository at this point in the history
  • Loading branch information
slashdotdash committed Jan 31, 2021
1 parent d5e45c8 commit 0292609
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 77 deletions.
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 1,2 @@
elixir 1.11.2-otp-23
erlang 23.2.1
elixir 1.11.3-otp-23
erlang 23.2.3
116 changes: 59 additions & 57 deletions lib/commanded/process_managers/process_manager_instance.ex
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 175,8 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
defp process_seen_event(%RecordedEvent{} = event, %State{} = state) do
%State{idle_timeout: idle_timeout} = state

telemetry_metadata = telemetry_metadata(event, state)
start_time = telemetry_start(telemetry_metadata)

:ok = ack_event(event, state)

telemetry_stop(start_time, telemetry_metadata, [])

{:noreply, state, idle_timeout}
end

Expand All @@ -196,51 191,64 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do

case handle_event(event, state) do
{:error, error} ->
failure_context = build_failure_context(event, context, nil, state)
failure_context = %FailureContext{
context: context,
last_event: event,
process_manager_state: state
}

telemetry_stop(start_time, telemetry_metadata, {:error, error})

handle_event_error({:error, error}, event, failure_context, state)

{:error, error, stacktrace} ->
failure_context = build_failure_context(event, context, stacktrace, state)
failure_context = %FailureContext{
context: context,
last_event: event,
process_manager_state: state,
stacktrace: stacktrace
}

telemetry_stop(start_time, telemetry_metadata, {:error, error, stacktrace})

handle_event_error({:error, error}, event, failure_context, state)

commands ->
commands = List.wrap(commands)

# Copy event id, as causation id, and correlation id from handled event.
opts = [causation_id: event_id, correlation_id: correlation_id, returning: false]

commands
|> List.wrap()
|> dispatch_commands(opts, state, event)
|> case do
:ok ->
telemetry_stop(start_time, telemetry_metadata, commands)

case mutate_state(event, state) do
{:error, error, stacktrace} ->
failure_context = build_failure_context(event, context, stacktrace, state)
with :ok <- dispatch_commands(commands, opts, state, event) do
telemetry_stop(start_time, telemetry_metadata, {:ok, commands})

handle_event_error({:error, error}, event, failure_context, state)
case mutate_state(event, state) do
{:error, error, stacktrace} ->
failure_context = %FailureContext{
context: context,
last_event: event,
process_manager_state: state,
stacktrace: stacktrace
}

process_state ->
state = %State{
state
| process_state: process_state,
last_seen_event: event_number
}
handle_event_error({:error, error}, event, failure_context, state)

:ok = persist_state(event_number, state)
:ok = ack_event(event, state)
process_state ->
state = %State{
state
| process_state: process_state,
last_seen_event: event_number
}

{:noreply, state, idle_timeout}
end
:ok = persist_state(event_number, state)
:ok = ack_event(event, state)

{:noreply, state, idle_timeout}
end
else
{:stop, reason} ->
telemetry_stop(start_time, telemetry_metadata, {:error, reason})

{:stop, reason, state}
end
end
Expand All @@ -267,16 275,6 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
end
end

defp build_failure_context(failed_event, context, stacktrace, state) do
%FailureContext{
context: context,
last_event: failed_event,
pending_commands: [],
process_manager_state: state,
stacktrace: stacktrace
}
end

defp handle_event_error(
{:error, reason} = error,
%RecordedEvent{} = failed_event,
Expand Down Expand Up @@ -352,7 350,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
end
end

# update the process instance's state by applying the event
# Update the process instance's state by applying the event.
defp mutate_state(%RecordedEvent{} = event, %State{} = state) do
%RecordedEvent{data: data} = event

Expand Down Expand Up @@ -543,6 541,12 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
event_prefix = [:commanded, :process_manager, :handle]

case handle_result do
{:ok, commands} ->
telemetry_metadata =
telemetry_metadata |> Map.put(:commands, commands) |> Map.put(:error, nil)

Telemetry.stop(event_prefix, start_time, telemetry_metadata)

{:error, error} ->
telemetry_metadata =
telemetry_metadata
Expand All @@ -560,27 564,25 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
stacktrace,
telemetry_metadata
)

commands ->
commands = List.wrap(commands)

telemetry_metadata =
telemetry_metadata |> Map.put(:commands, commands) |> Map.put(:error, nil)

Telemetry.stop(event_prefix, start_time, telemetry_metadata)
end
end

defp telemetry_metadata(%RecordedEvent{} = event, %State{} = state) do
state
|> Map.from_struct()
|> Map.take([
:application,
:process_manager_name,
:process_manager_module,
:process_state,
:process_uuid
])
|> Map.put(:recorded_event, event)
%State{
application: application,
process_manager_name: process_manager_name,
process_manager_module: process_manager_module,
process_state: process_state,
process_uuid: process_uuid
} = state

%{
application: application,
process_manager_name: process_manager_name,
process_manager_module: process_manager_module,
process_state: process_state,
process_uuid: process_uuid,
recorded_event: event
}
end
end
6 changes: 3 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 30,8 @@ defmodule Commanded.Mixfile do
]
end

defp extra_applications(:test), do: [:logger, :phoenix_pubsub]
defp extra_applications(_env), do: [:logger]
defp extra_applications(:test), do: [:crypto, :logger, :phoenix_pubsub]
defp extra_applications(_env), do: [:crypto, :logger]

defp elixirc_paths(env) when env in [:bench, :test],
do: [
Expand Down Expand Up @@ -72,7 72,7 @@ defmodule Commanded.Mixfile do
{:ex_doc, ">= 0.0.0", only: :dev},
{:local_cluster, "~> 1.1", only: :test, runtime: false},
{:mix_test_watch, "~> 1.0", only: :dev},
{:mox, "~> 0.5", only: [:bench, :test]}
{:mox, "~> 1.0", only: [:bench, :test]}
]
end

Expand Down
18 changes: 9 additions & 9 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 2,19 @@
"backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"},
"benchfella": {:hex, :benchfella, "0.3.5", "b2122c234117b3f91ed7b43b6e915e19e1ab216971154acd0a80ce0e9b8c05f5", [:mix], [], "hexpm", "23f27cbc482cbac03fc8926441eb60a5e111759c17642bac005c3225f5eb809d"},
"dialyxir": {:hex, :dialyxir, "1.0.0", "6a1fa629f7881a9f5aaf3a78f094b2a51a0357c843871b8bc98824e7342d00a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "aeb06588145fac14ca08d8061a142d52753dbc2cf7f0d00fc1013f53f8654654"},
"earmark_parser": {:hex, :earmark_parser, "1.4.10", "6603d7a603b9c18d3d20db69921527f82ef09990885ed7525003c7fe7dc86c56", [:mix], [], "hexpm", "8e2d5370b732385db2c9b22215c3f59c84ac7dda7ed7e544d7c486396ae519c0"},
"earmark_parser": {:hex, :earmark_parser, "1.4.12", "b245e875ec0a311a342320da0551da407d9d2b65d98f7a9597ae078615af3449", [:mix], [], "hexpm", "711e2cc4d64abb7d566d43f54b78f7dc129308a63bc103fbd88550d2174b3160"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.22.2", "03a2a58bdd2ba0d83d004507c4ee113b9c521956938298eba16e55cc4aba4a6c", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "cf60e1b3e2efe317095b6bb79651f83a2c1b3edcb4d319c421d7fcda8b3aff26"},
"file_system": {:hex, :file_system, "0.2.8", "f632bd287927a1eed2b718f22af727c5aeaccc9a98d8c2bd7bff709e851dc986", [:mix], [], "hexpm", "97a3b6f8d63ef53bd0113070102db2ce05352ecf0d25390eb8d747c2bde98bca"},
"ex_doc": {:hex, :ex_doc, "0.23.0", "a069bc9b0bf8efe323ecde8c0d62afc13d308b1fa3d228b65bca5cf8703a529d", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f5e2c4702468b2fd11b10d39416ddadd2fcdd173ba2a0285ebd92c39827a5a16"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm", "85d944cecd0f8f96b20ce70b5b16ebccedfcd25e744376b131e89ce61ba93176"},
"jason": {:hex, :jason, "1.2.1", "12b22825e22f468c02eb3e4b9985f3d0cb8dc40b9bd704730efa11abd2708c44", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "b659b8571deedf60f79c5a608e15414085fa141344e2716fbd6988a084b5f993"},
"local_cluster": {:hex, :local_cluster, "1.1.0", "a2a0e3e965aa1549939108066bfa537ce89f0107917f5b0260153e2fdb304116", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm", "fef6476083cf6f4c0526bb682de7ff75cd8b4bd4b7e20b3be60c1dab05f28ca7"},
"makeup": {:hex, :makeup, "1.0.3", "e339e2f766d12e7260e6672dd4047405963c5ec99661abdc432e6ec67d29ef95", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "2e9b4996d11832947731f7608fed7ad2f9443011b3b479ae288011265cdd3dad"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.1", "4f0e96847c63c17841d42c08107405a005a2680eb9c7ccadfd757bd31dabccfb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f2438b1a80eaec9ede832b5c41cd4f373b38fd7aa33e3b22d9db79e640cbde11"},
"jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"},
"local_cluster": {:hex, :local_cluster, "1.2.1", "8eab3b8a387680f0872eacfb1a8bd5a91cb1d4d61256eec6a655b07ac7030c73", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm", "aae80c9bc92c911cb0be085fdeea2a9f5b88f81b6bec2ff1fec244bb0acc232c"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
"mix_test_watch": {:hex, :mix_test_watch, "1.0.2", "34900184cbbbc6b6ed616ed3a8ea9b791f9fd2088419352a6d3200525637f785", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "47ac558d8b06f684773972c6d04fcc15590abdb97aeb7666da19fcbfdc441a07"},
"mox": {:hex, :mox, "0.5.2", "55a0a5ba9ccc671518d068c8dddd20eeb436909ea79d1799e2209df7eaa98b6c", [:mix], [], "hexpm", "df4310628cd628ee181df93f50ddfd07be3e5ecc30232d3b6aadf30bdfe6092b"},
"nimble_parsec": {:hex, :nimble_parsec, "0.6.0", "32111b3bf39137144abd7ba1cce0914533b2d16ef35e8abc5ec8be6122944263", [:mix], [], "hexpm", "27eac315a94909d4dc68bc07a4a83e06c8379237c5ea528a9acff4ca1c873c52"},
"mox": {:hex, :mox, "1.0.0", "4b3c7005173f47ff30641ba044eb0fe67287743eec9bd9545e37f3002b0a9f8b", [:mix], [], "hexpm", "201b0a20b7abdaaab083e9cf97884950f8a30a1350a1da403b3145e213c6f4df"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"phoenix_pubsub": {:hex, :phoenix_pubsub, "2.0.0", "a1ae76717bb168cdeb10ec9d92d1480fec99e3080f011402c0a2d68d47395ffb", [:mix], [], "hexpm", "c52d948c4f261577b9c6fa804be91884b381a7f8f18450c5045975435350f771"},
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
"telemetry_registry": {:hex, :telemetry_registry, "0.2.1", "fe648a691f2128e4279d993cd010994c67f282354dc061e697bf070d4b87b480", [:mix, :rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4221cefbcadd0b3e7076960339223742d973f1371bc20f3826af640257bc3690"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 7,7 @@ defmodule Commanded.Commands.StronglyConsistentEventHandler do
alias Commanded.Commands.{ConsistencyAggregateRoot, ConsistencyApp}
alias ConsistencyAggregateRoot.{ConsistencyCommand, ConsistencyEvent, DispatchRequestedEvent}

@doc """
Dispatch a command with consistency `:strong` after an optional delay.
"""
# Dispatch a command with consistency `:strong` after an optional delay.
def handle(%DispatchRequestedEvent{uuid: uuid, delay: delay}, _metadata) do
:timer.sleep(delay)

Expand All @@ -18,9 16,7 @@ defmodule Commanded.Commands.StronglyConsistentEventHandler do
ConsistencyApp.dispatch(command, consistency: :strong)
end

@doc """
Block for a requested delay.
"""
# Block for a requested delay.
def handle(%ConsistencyEvent{delay: delay}, _metadata) do
:timer.sleep(delay)

Expand Down

0 comments on commit 0292609

Please sign in to comment.