changed CHANGELOG.md
 
@@ -1,3 1,33 @@
1
# 2.1.0
2
3
Version 2.1 add support for requeing jobs, fixes to the job reservation algorithm and dialyzer warnings.
4
5
[#34](https://github.com/mbuhot/ecto_job/pull/34) - [mkorszun](https://github.com/mkorszun) New API to requeue a failed job :
6
7
Requeuing will:
8
9
* set `state` to `SCHEDULED`
10
* set `attempt` to `0`
11
* set `expires` to `nil`
12
13
```elixir
14
Ecto.Multi.new()
15
|> MyApp.Job.requeue("requeue_job", failed_job)
16
|> MyApp.Repo.transaction()
17
```
18
19
[#43](https://github.com/mbuhot/ecto_job/pull/43) - [mbuhot](https://github.com/mbuhot), [seangeo](https://github.com/seangeo) - Fixed issue where too many rows would be locked, causing negative demand in GenStage producer. See [this document](https://github.com/feikesteenbergen/demos/blob/master/bugs/update_from_correlated.adoc) for additional details.
20
21
[#41](https://github.com/mbuhot/ecto_job/pull/41) - [mbuhot](https://github.com/mbuhot) - Fixed dialyzer warnings in `JobQueue` modules
22
23
[#42](https://github.com/mbuhot/ecto_job/pull/42) - [sneako](https://github.com/sneako) - Improved documentation
24
25
26
[#48](https://github.com/mbuhot/ecto_job/pull/48) - [darksheik](https://github.com/darksheik) - Improved documentation
27
28
Thankyou contributors!
29
30
1
31
# 2.0.0
2
32
3
33
EctoJob 2.0 adds support for Ecto 3.0.
changed README.md
 
@@ -24,7 24,7 @@ A transactional job queue built with Ecto, PostgreSQL and GenStage
24
24
Add `:ecto_job` to your `dependencies`
25
25
26
26
```elixir
27
- {:ecto_job, "~> 2.0"}
27
{:ecto_job, "~> 2.1"}
28
28
```
29
29
30
30
## Installation
 
@@ -125,7 125,7 @@ defmodule MyApp.JobQueue do
125
125
126
126
def perform(multi = �to.Multi{}, job = %{"type" => "SendEmail", "recipient" => recipient, "body" => body}) do
127
127
multi
128
- |> Ecto.Multi.run(:send, fn _ -> EmailService.send(recipient, body) end)
128
|> Ecto.Multi.run(:send, fn _repo, _changes -> EmailService.send(recipient, body) end)
129
129
|> Ecto.Multi.insert(:stats, %EmailSendStats{recipient: recipient})
130
130
|> MyApp.Repo.transaction()
131
131
end
 
@@ -138,9 138,9 @@ When a queue handles multiple job types, it is useful to pattern match on the jo
138
138
defmodule MyApp.JobQueue do
139
139
use EctoJob.JobQueue, table_name: "jobs"
140
140
141
- def perform(multi = �to.Multi{}, job = %{"type" => "SendEmail"}, do: MyApp.SendEmail.perform(multi, job)
142
- def perform(multi = �to.Multi{}, job = %{"type" => "CustomerReport"}, do: MyApp.CustomerReport.perform(multi, job)
143
- def perform(multi = �to.Multi{}, job = %{"type" => "SyncWithCRM"}, do: MyApp.CRMSync.perform(multi, job)
141
def perform(multi = �to.Multi{}, job = %{"type" => "SendEmail"}), do: MyApp.SendEmail.perform(multi, job)
142
def perform(multi = �to.Multi{}, job = %{"type" => "CustomerReport"}), do: MyApp.CustomerReport.perform(multi, job)
143
def perform(multi = �to.Multi{}, job = %{"type" => "SyncWithCRM"}), do: MyApp.CRMSync.perform(multi, job)
144
144
...
145
145
end
146
146
```
changed hex_metadata.config
 
@@ -11,7 11,6 @@
11
11
<<"mix.exs">>,<<"README.md">>,<<"LICENSE">>,<<"CHANGELOG.md">>]}.
12
12
{<<"licenses">>,[<<"MIT">>]}.
13
13
{<<"links">>,[{<<"Github">>,<<"https://github.com/mbuhot/ecto_job">>}]}.
14
- {<<"maintainers">>,[<<"Mike Buhot ([email protected])">>]}.
15
14
{<<"name">>,<<"ecto_job">>}.
16
15
{<<"requirements">>,
17
16
[[{<<"app">>,<<"gen_stage">>},
 
@@ -34,4 33,4 @@
34
33
{<<"optional">>,false},
35
34
{<<"repository">>,<<"hexpm">>},
36
35
{<<"requirement">>,<<"~> 3.0">>}]]}.
37
- {<<"version">>,<<"2.0.0">>}.
36
{<<"version">>,<<"2.1.0">>}.
changed lib/ecto_job/job_queue.ex
 
@@ -45,11 45,12 @@ defmodule EctoJob.JobQueue do
45
45
"""
46
46
@type job :: %{
47
47
__struct__: module,
48
__meta__: Ecto.Schema.Metadata.t(),
48
49
id: integer | nil,
49
50
state: state,
50
51
expires: DateTime.t() | nil,
51
52
schedule: DateTime.t() | nil,
52
- attempt: integer(),
53
attempt: integer,
53
54
max_attempts: integer | nil,
54
55
params: map(),
55
56
notify: String.t() | nil,
 
@@ -175,6 176,28 @@ defmodule EctoJob.JobQueue do
175
176
def enqueue(multi, name, params, opts \\ []) do
176
177
Multi.insert(multi, name, new(params, opts))
177
178
end
179
180
@doc """
181
Requeues failed job by adding to an `Ecto.Multi` update statement,
182
which will:
183
* set `state` to `SCHEDULED`
184
* set `attempt` to `0`
185
* set `expires` to `nil`
186
187
## Example:
188
189
Ecto.Multi.new()
190
|> MyApp.Job.requeue("requeue_job", failed_job)
191
|> MyApp.Repo.transaction()
192
"""
193
@spec requeue(Multi.t(), term, EctoJob.JobQueue.job()) ::
194
Multi.t() | {:error, :non_failed_job}
195
def requeue(multi, name, job = %__MODULE__{state: "FAILED"}) do
196
job_to_requeue = Changeset.change(job, %{state: "SCHEDULED", attempt: 0, expires: nil})
197
Multi.update(multi, name, job_to_requeue)
198
end
199
200
def requeue(_, _, _), do: {:error, :non_failed_job}
178
201
end
179
202
end
180
203
 
@@ -248,10 271,20 @@ defmodule EctoJob.JobQueue do
248
271
"""
249
272
@spec reserve_available_jobs(repo, schema, integer, DateTime.t(), integer) :: {integer, [job]}
250
273
def reserve_available_jobs(repo, schema, demand, now = �teTime{}, timeout_ms) do
251
- repo.update_all(
252
- available_jobs(schema, demand),
253
- set: [state: "RESERVED", expires: reservation_expiry(now, timeout_ms), updated_at: now]
254
- )
274
{:ok, result} =
275
repo.transaction(fn ->
276
# Force the materialization of the claimed Job IDs,
277
# otherwise postgres may return more rows than intended.
278
# see (https://github.com/feikesteenbergen/demos/blob/master/bugs/update_from_correlated.adoc)
279
job_ids = repo.all(available_jobs(schema, demand))
280
281
repo.update_all(
282
Query.from(job in schema, where: job.id in ^job_ids, select: job),
283
set: [state: "RESERVED", expires: reservation_expiry(now, timeout_ms), updated_at: now]
284
)
285
end)
286
287
result
255
288
end
256
289
257
290
@doc """
 
@@ -261,18 294,14 @@ defmodule EctoJob.JobQueue do
261
294
"""
262
295
@spec available_jobs(schema, integer) :: Ecto.Query.t()
263
296
def available_jobs(schema, demand) do
264
- query =
265
- Query.from(
266
- job in schema,
267
- where: job.state == "AVAILABLE",
268
- order_by: [asc: job.schedule, asc: job.id],
269
- lock: "FOR UPDATE SKIP LOCKED",
270
- limit: ^demand,
271
- select: [:id]
272
- )
273
-
274
- # Ecto doesn't support subquery in where clause, so use join as workaround
275
- Query.from(job in schema, join: x in subquery(query), on: job.id == x.id, select: job)
297
Query.from(
298
job in schema,
299
where: job.state == "AVAILABLE",
300
order_by: [asc: job.schedule, asc: job.id],
301
lock: "FOR UPDATE SKIP LOCKED",
302
limit: ^demand,
303
select: job.id
304
)
276
305
end
277
306
278
307
@doc """
changed lib/ecto_job/worker.ex
 
@@ -31,7 31,7 @@ defmodule EctoJob.Worker do
31
31
32
32
@spec log_duration(Config.t, EctoJob.JobQueue.job(), DateTime.t()) :: :ok
33
33
defp log_duration(%Config{log: true, log_level: log_level}, _job = %queue{id: id}, start = �teTime{}) do
34
- duration = DateTime.diff(DateTime.utc_now(), start, :microseconds)
34
duration = DateTime.diff(DateTime.utc_now(), start, :microsecond)
35
35
Logger.log(log_level, fn -> "#{queue}[#{id}] done: #{duration} µs" end)
36
36
end
37
37
defp log_duration(_config, _job, _start), do: :ok
changed mix.exs
 
@@ -1,7 1,7 @@
1
1
defmodule EctoJob.Mixfile do
2
2
use Mix.Project
3
3
4
- @version "2.0.0"
4
@version "2.1.0"
5
5
@url "https://github.com/mbuhot/ecto_job"
6
6
7
7
def project do
 
@@ -39,10 39,7 @@ defmodule EctoJob.Mixfile do
39
39
40
40
defp dialyzer do
41
41
[
42
- plt_add_apps: [:mix],
43
- plt_add_deps: :apps_direct,
44
- flags: ["-Werror_handling", "-Wno_unused", "-Wunmatched_returns", "-Wunderspecs"],
45
- remove_defaults: [:unknown]
42
flags: ["-Werror_handling", "-Wno_unused", "-Wunmatched_returns", "-Wunderspecs"]
46
43
]
47
44
end
48
45
 
@@ -63,7 60,7 @@ defmodule EctoJob.Mixfile do
63
60
{:jason, "~> 1.0"},
64
61
{:gen_stage, "~> 0.13"},
65
62
{:credo, "~> 1.0", only: :dev, runtime: false},
66
- {:dialyxir, "~> 0.5", only: :dev, runtime: false},
63
{:dialyxir, "~> 1.0.0-rc.6", only: :dev, runtime: false},
67
64
{:ex_doc, "~> 0.12", only: :dev, runtime: false},
68
65
{:inch_ex, ">= 0.4.0", only: :dev, runtime: false}
69
66
]