changed
CHANGELOG.md
|
@@ -1,3 1,33 @@
|
1
|
# 2.1.0
|
2
|
|
3
|
Version 2.1 add support for requeing jobs, fixes to the job reservation algorithm and dialyzer warnings.
|
4
|
|
5
|
[#34](https://github.com/mbuhot/ecto_job/pull/34) - [mkorszun](https://github.com/mkorszun) New API to requeue a failed job :
|
6
|
|
7
|
Requeuing will:
|
8
|
|
9
|
* set `state` to `SCHEDULED`
|
10
|
* set `attempt` to `0`
|
11
|
* set `expires` to `nil`
|
12
|
|
13
|
```elixir
|
14
|
Ecto.Multi.new()
|
15
|
|> MyApp.Job.requeue("requeue_job", failed_job)
|
16
|
|> MyApp.Repo.transaction()
|
17
|
```
|
18
|
|
19
|
[#43](https://github.com/mbuhot/ecto_job/pull/43) - [mbuhot](https://github.com/mbuhot), [seangeo](https://github.com/seangeo) - Fixed issue where too many rows would be locked, causing negative demand in GenStage producer. See [this document](https://github.com/feikesteenbergen/demos/blob/master/bugs/update_from_correlated.adoc) for additional details.
|
20
|
|
21
|
[#41](https://github.com/mbuhot/ecto_job/pull/41) - [mbuhot](https://github.com/mbuhot) - Fixed dialyzer warnings in `JobQueue` modules
|
22
|
|
23
|
[#42](https://github.com/mbuhot/ecto_job/pull/42) - [sneako](https://github.com/sneako) - Improved documentation
|
24
|
|
25
|
|
26
|
[#48](https://github.com/mbuhot/ecto_job/pull/48) - [darksheik](https://github.com/darksheik) - Improved documentation
|
27
|
|
28
|
Thankyou contributors!
|
29
|
|
30
|
|
1
31
|
# 2.0.0
|
2
32
|
|
3
33
|
EctoJob 2.0 adds support for Ecto 3.0.
|
changed
README.md
|
@@ -24,7 24,7 @@ A transactional job queue built with Ecto, PostgreSQL and GenStage
|
24
24
|
Add `:ecto_job` to your `dependencies`
|
25
25
|
|
26
26
|
```elixir
|
27
|
- {:ecto_job, "~> 2.0"}
|
27
|
{:ecto_job, "~> 2.1"}
|
28
28
|
```
|
29
29
|
|
30
30
|
## Installation
|
|
@@ -125,7 125,7 @@ defmodule MyApp.JobQueue do
|
125
125
|
|
126
126
|
def perform(multi = �to.Multi{}, job = %{"type" => "SendEmail", "recipient" => recipient, "body" => body}) do
|
127
127
|
multi
|
128
|
- |> Ecto.Multi.run(:send, fn _ -> EmailService.send(recipient, body) end)
|
128
|
|> Ecto.Multi.run(:send, fn _repo, _changes -> EmailService.send(recipient, body) end)
|
129
129
|
|> Ecto.Multi.insert(:stats, %EmailSendStats{recipient: recipient})
|
130
130
|
|> MyApp.Repo.transaction()
|
131
131
|
end
|
|
@@ -138,9 138,9 @@ When a queue handles multiple job types, it is useful to pattern match on the jo
|
138
138
|
defmodule MyApp.JobQueue do
|
139
139
|
use EctoJob.JobQueue, table_name: "jobs"
|
140
140
|
|
141
|
- def perform(multi = �to.Multi{}, job = %{"type" => "SendEmail"}, do: MyApp.SendEmail.perform(multi, job)
|
142
|
- def perform(multi = �to.Multi{}, job = %{"type" => "CustomerReport"}, do: MyApp.CustomerReport.perform(multi, job)
|
143
|
- def perform(multi = �to.Multi{}, job = %{"type" => "SyncWithCRM"}, do: MyApp.CRMSync.perform(multi, job)
|
141
|
def perform(multi = �to.Multi{}, job = %{"type" => "SendEmail"}), do: MyApp.SendEmail.perform(multi, job)
|
142
|
def perform(multi = �to.Multi{}, job = %{"type" => "CustomerReport"}), do: MyApp.CustomerReport.perform(multi, job)
|
143
|
def perform(multi = �to.Multi{}, job = %{"type" => "SyncWithCRM"}), do: MyApp.CRMSync.perform(multi, job)
|
144
144
|
...
|
145
145
|
end
|
146
146
|
```
|
changed
hex_metadata.config
|
@@ -11,7 11,6 @@
|
11
11
|
<<"mix.exs">>,<<"README.md">>,<<"LICENSE">>,<<"CHANGELOG.md">>]}.
|
12
12
|
{<<"licenses">>,[<<"MIT">>]}.
|
13
13
|
{<<"links">>,[{<<"Github">>,<<"https://github.com/mbuhot/ecto_job">>}]}.
|
14
|
- {<<"maintainers">>,[<<"Mike Buhot ([email protected])">>]}.
|
15
14
|
{<<"name">>,<<"ecto_job">>}.
|
16
15
|
{<<"requirements">>,
|
17
16
|
[[{<<"app">>,<<"gen_stage">>},
|
|
@@ -34,4 33,4 @@
|
34
33
|
{<<"optional">>,false},
|
35
34
|
{<<"repository">>,<<"hexpm">>},
|
36
35
|
{<<"requirement">>,<<"~> 3.0">>}]]}.
|
37
|
- {<<"version">>,<<"2.0.0">>}.
|
36
|
{<<"version">>,<<"2.1.0">>}.
|
changed
lib/ecto_job/job_queue.ex
|
@@ -45,11 45,12 @@ defmodule EctoJob.JobQueue do
|
45
45
|
"""
|
46
46
|
@type job :: %{
|
47
47
|
__struct__: module,
|
48
|
__meta__: Ecto.Schema.Metadata.t(),
|
48
49
|
id: integer | nil,
|
49
50
|
state: state,
|
50
51
|
expires: DateTime.t() | nil,
|
51
52
|
schedule: DateTime.t() | nil,
|
52
|
- attempt: integer(),
|
53
|
attempt: integer,
|
53
54
|
max_attempts: integer | nil,
|
54
55
|
params: map(),
|
55
56
|
notify: String.t() | nil,
|
|
@@ -175,6 176,28 @@ defmodule EctoJob.JobQueue do
|
175
176
|
def enqueue(multi, name, params, opts \\ []) do
|
176
177
|
Multi.insert(multi, name, new(params, opts))
|
177
178
|
end
|
179
|
|
180
|
@doc """
|
181
|
Requeues failed job by adding to an `Ecto.Multi` update statement,
|
182
|
which will:
|
183
|
* set `state` to `SCHEDULED`
|
184
|
* set `attempt` to `0`
|
185
|
* set `expires` to `nil`
|
186
|
|
187
|
## Example:
|
188
|
|
189
|
Ecto.Multi.new()
|
190
|
|> MyApp.Job.requeue("requeue_job", failed_job)
|
191
|
|> MyApp.Repo.transaction()
|
192
|
"""
|
193
|
@spec requeue(Multi.t(), term, EctoJob.JobQueue.job()) ::
|
194
|
Multi.t() | {:error, :non_failed_job}
|
195
|
def requeue(multi, name, job = %__MODULE__{state: "FAILED"}) do
|
196
|
job_to_requeue = Changeset.change(job, %{state: "SCHEDULED", attempt: 0, expires: nil})
|
197
|
Multi.update(multi, name, job_to_requeue)
|
198
|
end
|
199
|
|
200
|
def requeue(_, _, _), do: {:error, :non_failed_job}
|
178
201
|
end
|
179
202
|
end
|
180
203
|
|
|
@@ -248,10 271,20 @@ defmodule EctoJob.JobQueue do
|
248
271
|
"""
|
249
272
|
@spec reserve_available_jobs(repo, schema, integer, DateTime.t(), integer) :: {integer, [job]}
|
250
273
|
def reserve_available_jobs(repo, schema, demand, now = �teTime{}, timeout_ms) do
|
251
|
- repo.update_all(
|
252
|
- available_jobs(schema, demand),
|
253
|
- set: [state: "RESERVED", expires: reservation_expiry(now, timeout_ms), updated_at: now]
|
254
|
- )
|
274
|
{:ok, result} =
|
275
|
repo.transaction(fn ->
|
276
|
# Force the materialization of the claimed Job IDs,
|
277
|
# otherwise postgres may return more rows than intended.
|
278
|
# see (https://github.com/feikesteenbergen/demos/blob/master/bugs/update_from_correlated.adoc)
|
279
|
job_ids = repo.all(available_jobs(schema, demand))
|
280
|
|
281
|
repo.update_all(
|
282
|
Query.from(job in schema, where: job.id in ^job_ids, select: job),
|
283
|
set: [state: "RESERVED", expires: reservation_expiry(now, timeout_ms), updated_at: now]
|
284
|
)
|
285
|
end)
|
286
|
|
287
|
result
|
255
288
|
end
|
256
289
|
|
257
290
|
@doc """
|
|
@@ -261,18 294,14 @@ defmodule EctoJob.JobQueue do
|
261
294
|
"""
|
262
295
|
@spec available_jobs(schema, integer) :: Ecto.Query.t()
|
263
296
|
def available_jobs(schema, demand) do
|
264
|
- query =
|
265
|
- Query.from(
|
266
|
- job in schema,
|
267
|
- where: job.state == "AVAILABLE",
|
268
|
- order_by: [asc: job.schedule, asc: job.id],
|
269
|
- lock: "FOR UPDATE SKIP LOCKED",
|
270
|
- limit: ^demand,
|
271
|
- select: [:id]
|
272
|
- )
|
273
|
-
|
274
|
- # Ecto doesn't support subquery in where clause, so use join as workaround
|
275
|
- Query.from(job in schema, join: x in subquery(query), on: job.id == x.id, select: job)
|
297
|
Query.from(
|
298
|
job in schema,
|
299
|
where: job.state == "AVAILABLE",
|
300
|
order_by: [asc: job.schedule, asc: job.id],
|
301
|
lock: "FOR UPDATE SKIP LOCKED",
|
302
|
limit: ^demand,
|
303
|
select: job.id
|
304
|
)
|
276
305
|
end
|
277
306
|
|
278
307
|
@doc """
|
changed
lib/ecto_job/worker.ex
|
@@ -31,7 31,7 @@ defmodule EctoJob.Worker do
|
31
31
|
|
32
32
|
@spec log_duration(Config.t, EctoJob.JobQueue.job(), DateTime.t()) :: :ok
|
33
33
|
defp log_duration(%Config{log: true, log_level: log_level}, _job = %queue{id: id}, start = �teTime{}) do
|
34
|
- duration = DateTime.diff(DateTime.utc_now(), start, :microseconds)
|
34
|
duration = DateTime.diff(DateTime.utc_now(), start, :microsecond)
|
35
35
|
Logger.log(log_level, fn -> "#{queue}[#{id}] done: #{duration} µs" end)
|
36
36
|
end
|
37
37
|
defp log_duration(_config, _job, _start), do: :ok
|
changed
mix.exs
|
@@ -1,7 1,7 @@
|
1
1
|
defmodule EctoJob.Mixfile do
|
2
2
|
use Mix.Project
|
3
3
|
|
4
|
- @version "2.0.0"
|
4
|
@version "2.1.0"
|
5
5
|
@url "https://github.com/mbuhot/ecto_job"
|
6
6
|
|
7
7
|
def project do
|
|
@@ -39,10 39,7 @@ defmodule EctoJob.Mixfile do
|
39
39
|
|
40
40
|
defp dialyzer do
|
41
41
|
[
|
42
|
- plt_add_apps: [:mix],
|
43
|
- plt_add_deps: :apps_direct,
|
44
|
- flags: ["-Werror_handling", "-Wno_unused", "-Wunmatched_returns", "-Wunderspecs"],
|
45
|
- remove_defaults: [:unknown]
|
42
|
flags: ["-Werror_handling", "-Wno_unused", "-Wunmatched_returns", "-Wunderspecs"]
|
46
43
|
]
|
47
44
|
end
|
48
45
|
|
|
@@ -63,7 60,7 @@ defmodule EctoJob.Mixfile do
|
63
60
|
{:jason, "~> 1.0"},
|
64
61
|
{:gen_stage, "~> 0.13"},
|
65
62
|
{:credo, "~> 1.0", only: :dev, runtime: false},
|
66
|
- {:dialyxir, "~> 0.5", only: :dev, runtime: false},
|
63
|
{:dialyxir, "~> 1.0.0-rc.6", only: :dev, runtime: false},
|
67
64
|
{:ex_doc, "~> 0.12", only: :dev, runtime: false},
|
68
65
|
{:inch_ex, ">= 0.4.0", only: :dev, runtime: false}
|
69
66
|
]
|