changed
CHANGELOG.md
|
@@ -6,6 6,25 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
|
6
6
|
## [Unreleased]
|
7
7
|
|
8
8
|
|
9
|
## [0.18.0] - 2023-06-01
|
10
|
|
11
|
### Added
|
12
|
- Add enqueue_all for enqueuing a batch of jobs atomically #483 by @meysius
|
13
|
|
14
|
### Fixed
|
15
|
- Fix namespaced keys in scripts for ACL redis #481 by @korialis
|
16
|
|
17
|
## [0.17.0] - 2022-11-25
|
18
|
|
19
|
### Added
|
20
|
- Add Exq unique job feature, allow to enforce a single instance of job! #469 by @ananthakumaran
|
21
|
|
22
|
### Fixed
|
23
|
- Setup default redis timeout config #475 by @ananthakumaran
|
24
|
|
25
|
### Changed
|
26
|
- Use erlef/setup-beam and bump versions #476 by @kianmeng
|
27
|
|
9
28
|
## [0.16.2] - 2022-04-15
|
10
29
|
|
11
30
|
### Added
|
changed
README.md
|
@@ -71,7 71,7 @@ Add `:exq` to your `mix.exs` deps (replace version with the latest hex.pm packag
|
71
71
|
defp deps do
|
72
72
|
[
|
73
73
|
# ... other deps
|
74
|
- {:exq, "~> 0.16.2"}
|
74
|
{:exq, "~> 0.18.0"}
|
75
75
|
]
|
76
76
|
end
|
77
77
|
```
|
|
@@ -527,6 527,35 @@ feature depends on `Exq.Middleware.Unique` middleware. If you override
|
527
527
|
`:middleware` configuration, make sure to include it.
|
528
528
|
|
529
529
|
|
530
|
## Enqueuing Many Jobs Atomically
|
531
|
|
532
|
Similar to database transactions, there are cases where you may want to
|
533
|
enqueue/schedule many jobs atomically. A common usecase of this would be when you
|
534
|
have a computationally heavy job and you want to break it down to multiple smaller jobs so they
|
535
|
can be run concurrently. If you use a loop to enqueue/schedule these jobs,
|
536
|
and a network, connectivity, or application error occurs while passing these jobs to Exq,
|
537
|
you will end up in a situation where you have to roll back all the jobs that you may already
|
538
|
have scheduled/enqueued which will be a complicated process. In order to avoid this problem,
|
539
|
Exq comes with an `enqueue_all` method which guarantees atomicity.
|
540
|
|
541
|
|
542
|
```elixir
|
543
|
{:ok, [{:ok, jid_1}, {:ok, jid_2}, {:ok, jid_3}]} = Exq.enqueue_all(Exq, [
|
544
|
[job_1_queue, job_1_worker, job_1_args, job_1_options],
|
545
|
[job_2_queue, job_2_worker, job_2_args, job_2_options],
|
546
|
[job_3_queue, job_3_worker, job_3_args, job_3_options]
|
547
|
])
|
548
|
```
|
549
|
|
550
|
`enqueue_all` also supports scheduling jobs via `schedule` key in the `options` passed for each job:
|
551
|
```elixir
|
552
|
{:ok, [{:ok, jid_1}, {:ok, jid_2}, {:ok, jid_3}]} = Exq.enqueue_all(Exq, [
|
553
|
[job_1_queue, job_1_worker, job_1_args, [schedule: {:in, 60 * 60}]],
|
554
|
[job_2_queue, job_2_worker, job_2_args, [schedule: {:at, midnight}]],
|
555
|
[job_3_queue, job_3_worker, job_3_args, []] # no schedule key is present, it is enqueued immediately
|
556
|
])
|
557
|
```
|
558
|
|
530
559
|
## Web UI
|
531
560
|
|
532
561
|
Exq has a separate repo, exq_ui which provides with a Web UI to monitor your workers:
|
changed
hex_metadata.config
|
@@ -5,54 5,53 @@
|
5
5
|
{<<"elixir">>,<<"~> 1.6">>}.
|
6
6
|
{<<"files">>,
|
7
7
|
[<<"lib">>,<<"lib/mix">>,<<"lib/mix/tasks">>,<<"lib/mix/tasks/exq.run.ex">>,
|
8
|
- <<"lib/exq.ex">>,<<"lib/exq">>,<<"lib/exq/enqueuer.ex">>,
|
9
|
- <<"lib/exq/manager">>,<<"lib/exq/manager/server.ex">>,
|
10
|
- <<"lib/exq/worker_drainer">>,<<"lib/exq/worker_drainer/server.ex">>,
|
11
|
- <<"lib/exq/mock.ex">>,<<"lib/exq/dequeue">>,
|
12
|
- <<"lib/exq/dequeue/behaviour.ex">>,<<"lib/exq/dequeue/local.ex">>,
|
13
|
- <<"lib/exq/stats">>,<<"lib/exq/stats/server.ex">>,<<"lib/exq/heartbeat">>,
|
14
|
- <<"lib/exq/heartbeat/server.ex">>,<<"lib/exq/heartbeat/monitor.ex">>,
|
15
|
- <<"lib/exq/api.ex">>,<<"lib/exq/node_identifier">>,
|
8
|
<<"lib/exq">>,<<"lib/exq/enqueue_api.ex">>,<<"lib/exq/worker">>,
|
9
|
<<"lib/exq/worker/server.ex">>,<<"lib/exq/worker/metadata.ex">>,
|
10
|
<<"lib/exq/worker/supervisor.ex">>,<<"lib/exq/mock.ex">>,
|
11
|
<<"lib/exq/support">>,<<"lib/exq/support/time.ex">>,
|
12
|
<<"lib/exq/support/process.ex">>,<<"lib/exq/support/coercion.ex">>,
|
13
|
<<"lib/exq/support/binary.ex">>,<<"lib/exq/support/opts.ex">>,
|
14
|
<<"lib/exq/support/mode.ex">>,<<"lib/exq/support/node.ex">>,
|
15
|
<<"lib/exq/support/redis.ex">>,<<"lib/exq/support/randomize.ex">>,
|
16
|
<<"lib/exq/support/job.ex">>,<<"lib/exq/support/config.ex">>,
|
17
|
<<"lib/exq/redis">>,<<"lib/exq/redis/job_queue.ex">>,
|
18
|
<<"lib/exq/redis/connection.ex">>,<<"lib/exq/redis/script.ex">>,
|
19
|
<<"lib/exq/redis/job_stat.ex">>,<<"lib/exq/redis/heartbeat.ex">>,
|
20
|
<<"lib/exq/node_identifier">>,
|
16
21
|
<<"lib/exq/node_identifier/hostname_identifier.ex">>,
|
17
|
- <<"lib/exq/node_identifier/behaviour.ex">>,<<"lib/exq/support">>,
|
18
|
- <<"lib/exq/support/binary.ex">>,<<"lib/exq/support/job.ex">>,
|
19
|
- <<"lib/exq/support/node.ex">>,<<"lib/exq/support/coercion.ex">>,
|
20
|
- <<"lib/exq/support/opts.ex">>,<<"lib/exq/support/randomize.ex">>,
|
21
|
- <<"lib/exq/support/config.ex">>,<<"lib/exq/support/time.ex">>,
|
22
|
- <<"lib/exq/support/process.ex">>,<<"lib/exq/support/mode.ex">>,
|
23
|
- <<"lib/exq/support/redis.ex">>,<<"lib/exq/adapters">>,
|
22
|
<<"lib/exq/node_identifier/behaviour.ex">>,<<"lib/exq/backoff">>,
|
23
|
<<"lib/exq/backoff/sidekiq_default.ex">>,<<"lib/exq/backoff/behaviour.ex">>,
|
24
|
<<"lib/exq/api">>,<<"lib/exq/api/server.ex">>,<<"lib/exq/enqueuer.ex">>,
|
25
|
<<"lib/exq/worker_drainer">>,<<"lib/exq/worker_drainer/server.ex">>,
|
26
|
<<"lib/exq/node">>,<<"lib/exq/node/server.ex">>,<<"lib/exq/serializers">>,
|
27
|
<<"lib/exq/serializers/json_serializer.ex">>,
|
28
|
<<"lib/exq/serializers/behaviour.ex">>,<<"lib/exq/adapters">>,
|
24
29
|
<<"lib/exq/adapters/queue.ex">>,<<"lib/exq/adapters/queue">>,
|
25
30
|
<<"lib/exq/adapters/queue/mock.ex">>,<<"lib/exq/adapters/queue/redis.ex">>,
|
26
|
- <<"lib/exq/node">>,<<"lib/exq/node/server.ex">>,<<"lib/exq/enqueuer">>,
|
27
|
- <<"lib/exq/enqueuer/server.ex">>,<<"lib/exq/worker">>,
|
28
|
- <<"lib/exq/worker/metadata.ex">>,<<"lib/exq/worker/server.ex">>,
|
29
|
- <<"lib/exq/worker/supervisor.ex">>,<<"lib/exq/api">>,
|
30
|
- <<"lib/exq/api/server.ex">>,<<"lib/exq/serializers">>,
|
31
|
- <<"lib/exq/serializers/behaviour.ex">>,
|
32
|
- <<"lib/exq/serializers/json_serializer.ex">>,<<"lib/exq/scheduler">>,
|
33
|
- <<"lib/exq/scheduler/server.ex">>,<<"lib/exq/enqueue_api.ex">>,
|
34
|
- <<"lib/exq/middleware">>,<<"lib/exq/middleware/pipeline.ex">>,
|
35
|
- <<"lib/exq/middleware/job.ex">>,<<"lib/exq/middleware/behaviour.ex">>,
|
36
|
- <<"lib/exq/middleware/server.ex">>,<<"lib/exq/middleware/unique.ex">>,
|
37
|
- <<"lib/exq/middleware/logger.ex">>,<<"lib/exq/middleware/telemetry.ex">>,
|
38
|
- <<"lib/exq/middleware/manager.ex">>,<<"lib/exq/middleware/stats.ex">>,
|
39
|
- <<"lib/exq/redis">>,<<"lib/exq/redis/heartbeat.ex">>,
|
40
|
- <<"lib/exq/redis/job_queue.ex">>,<<"lib/exq/redis/script.ex">>,
|
41
|
- <<"lib/exq/redis/job_stat.ex">>,<<"lib/exq/redis/connection.ex">>,
|
42
|
- <<"lib/exq/backoff">>,<<"lib/exq/backoff/behaviour.ex">>,
|
43
|
- <<"lib/exq/backoff/sidekiq_default.ex">>,<<"test">>,
|
31
|
<<"lib/exq/dequeue">>,<<"lib/exq/dequeue/local.ex">>,
|
32
|
<<"lib/exq/dequeue/behaviour.ex">>,<<"lib/exq/heartbeat">>,
|
33
|
<<"lib/exq/heartbeat/server.ex">>,<<"lib/exq/heartbeat/monitor.ex">>,
|
34
|
<<"lib/exq/middleware">>,<<"lib/exq/middleware/server.ex">>,
|
35
|
<<"lib/exq/middleware/manager.ex">>,<<"lib/exq/middleware/logger.ex">>,
|
36
|
<<"lib/exq/middleware/unique.ex">>,<<"lib/exq/middleware/job.ex">>,
|
37
|
<<"lib/exq/middleware/behaviour.ex">>,<<"lib/exq/middleware/telemetry.ex">>,
|
38
|
<<"lib/exq/middleware/pipeline.ex">>,<<"lib/exq/middleware/stats.ex">>,
|
39
|
<<"lib/exq/stats">>,<<"lib/exq/stats/server.ex">>,<<"lib/exq/scheduler">>,
|
40
|
<<"lib/exq/scheduler/server.ex">>,<<"lib/exq/manager">>,
|
41
|
<<"lib/exq/manager/server.ex">>,<<"lib/exq/api.ex">>,<<"lib/exq/enqueuer">>,
|
42
|
<<"lib/exq/enqueuer/server.ex">>,<<"lib/exq.ex">>,<<"test">>,
|
43
|
<<"test/redis_test.exs">>,<<"test/test-sentinel.conf">>,
|
44
|
<<"test/config_test.exs">>,<<"test/inline_mode_test.exs">>,
|
45
|
<<"test/worker_test.exs">>,<<"test/exq">>,<<"test/exq/heartbeat">>,
|
46
|
<<"test/exq/heartbeat/monitor_test.exs">>,<<"test/metadata_test.exs">>,
|
47
|
<<"test/middleware_test.exs">>,<<"test/fake_mode_test.exs">>,
|
48
|
<<"test/exq_test.exs">>,<<"test/flaky_connection_test.exs">>,
|
49
|
<<"test/failure_scenarios_test.exs">>,<<"test/job_queue_test.exs">>,
|
50
|
<<"test/mode_test.exs">>,<<"test/api_test.exs">>,
|
51
|
<<"test/test-redis-replica.conf">>,<<"test/json_serializer_test.exs">>,
|
52
|
<<"test/job_stat_test.exs">>,<<"test/readonly_reconnect_test.exs">>,
|
44
53
|
<<"test/performance_test.exs">>,<<"test/test_helper.exs">>,
|
45
|
- <<"test/readonly_reconnect_test.exs">>,<<"test/job_queue_test.exs">>,
|
46
|
- <<"test/redis_test.exs">>,<<"test/middleware_test.exs">>,
|
47
|
- <<"test/test-redis-replica.conf">>,<<"test/mode_test.exs">>,
|
48
|
- <<"test/flaky_connection_test.exs">>,<<"test/exq_test.exs">>,<<"test/exq">>,
|
49
|
- <<"test/exq/heartbeat">>,<<"test/exq/heartbeat/monitor_test.exs">>,
|
50
|
- <<"test/job_stat_test.exs">>,<<"test/failure_scenarios_test.exs">>,
|
51
|
- <<"test/test-redis.conf">>,<<"test/json_serializer_test.exs">>,
|
52
|
- <<"test/test-sentinel.conf">>,<<"test/fake_mode_test.exs">>,
|
53
|
- <<"test/inline_mode_test.exs">>,<<"test/config_test.exs">>,
|
54
|
- <<"test/worker_test.exs">>,<<"test/metadata_test.exs">>,
|
55
|
- <<"test/api_test.exs">>,<<"LICENSE">>,<<"mix.exs">>,<<"CHANGELOG.md">>,
|
54
|
<<"test/test-redis.conf">>,<<"LICENSE">>,<<"mix.exs">>,<<"CHANGELOG.md">>,
|
56
55
|
<<"README.md">>]}.
|
57
56
|
{<<"licenses">>,[<<"Apache-2.0">>]}.
|
58
57
|
{<<"links">>,[{<<"GitHub">>,<<"https://github.com/akira/exq">>}]}.
|
|
@@ -78,4 77,4 @@
|
78
77
|
{<<"optional">>,true},
|
79
78
|
{<<"repository">>,<<"hexpm">>},
|
80
79
|
{<<"requirement">>,<<"~> 1.0">>}]]}.
|
81
|
- {<<"version">>,<<"0.17.0">>}.
|
80
|
{<<"version">>,<<"0.18.0">>}.
|
changed
lib/exq/adapters/queue.ex
|
@@ -30,4 30,5 @@ defmodule Exq.Adapters.Queue do
|
30
30
|
@callback enqueue(server, String.t(), module(), list(), list()) :: tuple()
|
31
31
|
@callback enqueue_at(server, String.t(), DateTime.t(), module(), list(), list()) :: tuple()
|
32
32
|
@callback enqueue_in(server, String.t(), integer(), module(), list(), list()) :: tuple()
|
33
|
@callback enqueue_all(server, list()) :: tuple()
|
33
34
|
end
|
changed
lib/exq/adapters/queue/mock.ex
|
@@ -12,4 12,6 @@ defmodule Exq.Adapters.Queue.Mock do
|
12
12
|
defdelegate enqueue_at(pid, queue, time, worker, args, options), to: Exq.Mock
|
13
13
|
|
14
14
|
defdelegate enqueue_in(pid, queue, offset, worker, args, options), to: Exq.Mock
|
15
|
|
16
|
defdelegate enqueue_all(pid, jobs), to: Exq.Mock
|
15
17
|
end
|
changed
lib/exq/adapters/queue/redis.ex
|
@@ -23,4 23,9 @@ defmodule Exq.Adapters.Queue.Redis do
|
23
23
|
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
|
24
24
|
JobQueue.enqueue_in(redis, namespace, queue, offset, worker, args, options)
|
25
25
|
end
|
26
|
|
27
|
def enqueue_all(pid, jobs) do
|
28
|
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
|
29
|
JobQueue.enqueue_all(redis, namespace, jobs)
|
30
|
end
|
26
31
|
end
|
changed
lib/exq/enqueue_api.ex
|
@@ -98,6 98,34 @@ defmodule Exq.Enqueuer.EnqueueApi do
|
98
98
|
queue_adapter = Config.get(:queue_adapter)
|
99
99
|
queue_adapter.enqueue_in(pid, queue, offset, worker, args, options)
|
100
100
|
end
|
101
|
|
102
|
@doc """
|
103
|
Schedule multiple jobs to be atomically enqueued at specific times
|
104
|
|
105
|
Expected args:
|
106
|
* `pid` - PID for Exq Manager or Enqueuer to handle this
|
107
|
* `jobs` - List of jobs each defined as `[queue, worker, args, options]`
|
108
|
* `queue` - Name of queue to use
|
109
|
* `worker` - Worker module to target
|
110
|
* `args` - Array of args to send to worker
|
111
|
* `options`: Following job options are supported
|
112
|
* `max_retries` (integer) - max retry count
|
113
|
* `jid` (string) - user supplied jid value
|
114
|
* `unique_for` (integer) - lock expiration duration in seconds
|
115
|
* `unique_token` (string) - unique lock token. By default the token is computed based on the queue, class and args.
|
116
|
* `unique_until` (atom) - defaults to `:success`. Supported values are
|
117
|
* `:success` - unlock on job success
|
118
|
* `:start` - unlock on job first execution
|
119
|
* `:expiry` - unlock when the lock is expired. Depends on `unique_for` value.
|
120
|
* `schedule` - (optional) - used to schedule the job for future. If not present, job will be enqueued immediately by default.
|
121
|
* `{:in, seconds_from_now}`
|
122
|
* `{:at, datetime}`
|
123
|
|
124
|
"""
|
125
|
def enqueue_all(pid, jobs) do
|
126
|
queue_adapter = Config.get(:queue_adapter)
|
127
|
queue_adapter.enqueue_all(pid, jobs)
|
128
|
end
|
101
129
|
end
|
102
130
|
end
|
103
131
|
end
|
changed
lib/exq/mock.ex
|
@@ -98,34 98,55 @@ defmodule Exq.Mock do
|
98
98
|
runnable.()
|
99
99
|
end
|
100
100
|
|
101
|
@doc false
|
102
|
def enqueue_all(pid, jobs) do
|
103
|
{:ok, runnable} = GenServer.call(__MODULE__, {:enqueue_all, self(), pid, jobs}, @timeout)
|
104
|
runnable.()
|
105
|
end
|
106
|
|
101
107
|
@impl true
|
102
108
|
def handle_call({:enqueue, owner_pid, type, args}, _from, state) do
|
103
109
|
state = maybe_add_and_monitor_pid(state, owner_pid, state.default_mode)
|
110
|
{state, runnable} = to_runnable(owner_pid, type, args, state)
|
111
|
{:reply, {:ok, runnable}, state}
|
112
|
end
|
104
113
|
|
105
|
- case state.modes[owner_pid] do
|
106
|
- :redis ->
|
107
|
- runnable = fn -> apply(Redis, type, args) end
|
108
|
- {:reply, {:ok, runnable}, state}
|
114
|
@impl true
|
115
|
def handle_call({:enqueue_all, owner_pid, pid, jobs}, _from, state) do
|
116
|
state = maybe_add_and_monitor_pid(state, owner_pid, state.default_mode)
|
109
117
|
|
110
|
- :inline ->
|
111
|
- runnable = fn ->
|
112
|
- job = to_job(type, args)
|
113
|
- apply(Coercion.to_module(job.class), :perform, job.args)
|
114
|
- {:ok, job.jid}
|
115
|
- end
|
118
|
{state, runnable} =
|
119
|
if state.modes[owner_pid] == :redis do
|
120
|
to_runnable(owner_pid, :enqueue_all, [pid, jobs], state)
|
121
|
else
|
122
|
{state, runnables} =
|
123
|
Enum.reduce(jobs, {state, []}, fn [queue, worker, args, options], {state, runnables} ->
|
124
|
{type, args} =
|
125
|
case options[:schedule] do
|
126
|
{:at, at_time} ->
|
127
|
{:enqueue_at, [pid, queue, at_time, worker, args, options]}
|
116
128
|
|
117
|
- {:reply, {:ok, runnable}, state}
|
129
|
{:in, offset} ->
|
130
|
{:enqueue_in, [pid, queue, offset, worker, args, options]}
|
118
131
|
|
119
|
- :fake ->
|
120
|
- job = to_job(type, args)
|
121
|
- state = update_in(state.jobs[owner_pid], &((&1 || []) [job]))
|
132
|
_ ->
|
133
|
{:enqueue, [pid, queue, worker, args, options]}
|
134
|
end
|
135
|
|
136
|
{state, runnable} = to_runnable(owner_pid, type, args, state)
|
137
|
{state, [runnable | runnables]}
|
138
|
end)
|
139
|
|
140
|
runnables = Enum.reverse(runnables)
|
122
141
|
|
123
142
|
runnable = fn ->
|
124
|
- {:ok, job.jid}
|
143
|
{:ok, Enum.map(runnables, fn f -> f.() end)}
|
125
144
|
end
|
126
145
|
|
127
|
- {:reply, {:ok, runnable}, state}
|
128
|
- end
|
146
|
{state, runnable}
|
147
|
end
|
148
|
|
149
|
{:reply, {:ok, runnable}, state}
|
129
150
|
end
|
130
151
|
|
131
152
|
def handle_call({:mode, owner_pid, mode}, _from, state) do
|
|
@@ -145,6 166,33 @@ defmodule Exq.Mock do
|
145
166
|
{:noreply, state}
|
146
167
|
end
|
147
168
|
|
169
|
defp to_runnable(owner_pid, type, args, state) do
|
170
|
case state.modes[owner_pid] do
|
171
|
:redis ->
|
172
|
runnable = fn -> apply(Redis, type, args) end
|
173
|
{state, runnable}
|
174
|
|
175
|
:inline ->
|
176
|
runnable = fn ->
|
177
|
job = to_job(type, args)
|
178
|
apply(Coercion.to_module(job.class), :perform, job.args)
|
179
|
{:ok, job.jid}
|
180
|
end
|
181
|
|
182
|
{state, runnable}
|
183
|
|
184
|
:fake ->
|
185
|
job = to_job(type, args)
|
186
|
state = update_in(state.jobs[owner_pid], &((&1 || []) [job]))
|
187
|
|
188
|
runnable = fn ->
|
189
|
{:ok, job.jid}
|
190
|
end
|
191
|
|
192
|
{state, runnable}
|
193
|
end
|
194
|
end
|
195
|
|
148
196
|
defp to_job(_, [_pid, queue, worker, args, options]) do
|
149
197
|
%Job{
|
150
198
|
jid: Keyword.get_lazy(options, :jid, fn -> UUID.uuid4() end),
|
changed
lib/exq/redis/job_queue.ex
|
@@ -35,12 35,14 @@ defmodule Exq.Redis.JobQueue do
|
35
35
|
try do
|
36
36
|
[unlocks_in, unique_key] = unique_args(namespace, job, options)
|
37
37
|
|
38
|
keys = keys_list([full_key(namespace, "queues"), queue_key(namespace, queue)], unique_key)
|
39
|
|
38
40
|
response =
|
39
41
|
Script.eval!(
|
40
42
|
redis,
|
41
43
|
:enqueue,
|
42
|
- [queue, full_key(namespace, ""), unique_key],
|
43
|
- [job_serialized, job.jid, unlocks_in]
|
44
|
keys,
|
45
|
[queue, job_serialized, job.jid, unlocks_in]
|
44
46
|
)
|
45
47
|
|
46
48
|
case response do
|
|
@@ -96,8 98,10 @@ defmodule Exq.Redis.JobQueue do
|
96
98
|
try do
|
97
99
|
[unlocks_in, unique_key] = unique_args(namespace, job, options)
|
98
100
|
|
101
|
keys = keys_list([scheduled_queue], unique_key)
|
102
|
|
99
103
|
response =
|
100
|
- Script.eval!(redis, :enqueue_at, [scheduled_queue, unique_key], [
|
104
|
Script.eval!(redis, :enqueue_at, keys, [
|
101
105
|
job_serialized,
|
102
106
|
score,
|
103
107
|
jid,
|
|
@@ -116,6 120,40 @@ defmodule Exq.Redis.JobQueue do
|
116
120
|
end
|
117
121
|
end
|
118
122
|
|
123
|
def enqueue_all(redis, namespace, jobs) do
|
124
|
{keys, args} = extract_enqueue_all_keys_and_args(namespace, jobs)
|
125
|
|
126
|
try do
|
127
|
response =
|
128
|
Script.eval!(
|
129
|
redis,
|
130
|
:enqueue_all,
|
131
|
[scheduled_queue_key(namespace), full_key(namespace, "queues")] keys,
|
132
|
args
|
133
|
)
|
134
|
|
135
|
case response do
|
136
|
{:ok, result} ->
|
137
|
{
|
138
|
:ok,
|
139
|
Enum.map(result, fn [status, jid] ->
|
140
|
case status do
|
141
|
0 -> {:ok, jid}
|
142
|
1 -> {:conflict, jid}
|
143
|
end
|
144
|
end)
|
145
|
}
|
146
|
|
147
|
error ->
|
148
|
error
|
149
|
end
|
150
|
catch
|
151
|
:exit, e ->
|
152
|
Logger.info("Error enqueueing - #{Kernel.inspect(e)}")
|
153
|
{:error, :timeout}
|
154
|
end
|
155
|
end
|
156
|
|
119
157
|
@doc """
|
120
158
|
Dequeue jobs for available queues
|
121
159
|
"""
|
|
@@ -219,6 257,9 @@ defmodule Exq.Redis.JobQueue do
|
219
257
|
end
|
220
258
|
end
|
221
259
|
|
260
|
defp keys_list([_hd | _tl] = keys, nil), do: keys
|
261
|
defp keys_list([_hd | _tl] = keys, key), do: keys [key]
|
262
|
|
222
263
|
def full_key("", key), do: key
|
223
264
|
def full_key(nil, key), do: key
|
224
265
|
|
|
@@ -624,4 665,50 @@ defmodule Exq.Redis.JobQueue do
|
624
665
|
[nil, nil]
|
625
666
|
end
|
626
667
|
end
|
668
|
|
669
|
# Returns
|
670
|
# {
|
671
|
# [
|
672
|
# job_1_unique_key, job_1_queue_key,
|
673
|
# job_2_unique_key, job_2_queue_key,
|
674
|
# ...
|
675
|
# ],
|
676
|
# [
|
677
|
# job_1_jid, job_1_queue, job_1_score, job_1_job_serialized, job_1_unlocks_in,
|
678
|
# job_2_jid, job_2_queue, job_2_score, job_2_job_serialized, job_2_unlocks_in,
|
679
|
# ...
|
680
|
# ]
|
681
|
# }
|
682
|
defp extract_enqueue_all_keys_and_args(namespace, jobs) do
|
683
|
{keys, job_attrs} =
|
684
|
Enum.reduce(jobs, {[], []}, fn job, {keys_acc, job_attrs_acc} ->
|
685
|
[queue, worker, args, options] = job
|
686
|
|
687
|
{score, enqueued_at} =
|
688
|
case options[:schedule] do
|
689
|
{:at, at_time} ->
|
690
|
{Time.time_to_score(at_time), Time.unix_seconds(at_time)}
|
691
|
|
692
|
{:in, offset} ->
|
693
|
at_time = Time.offset_from_now(offset)
|
694
|
{Time.time_to_score(at_time), Time.unix_seconds(at_time)}
|
695
|
|
696
|
_ ->
|
697
|
{"0", Time.unix_seconds()}
|
698
|
end
|
699
|
|
700
|
{jid, job_data, job_serialized} =
|
701
|
to_job_serialized(queue, worker, args, options, enqueued_at)
|
702
|
|
703
|
[unlocks_in, unique_key] = unique_args(namespace, job_data, unique_check: true)
|
704
|
|
705
|
# accumulating in reverse order for efficiency
|
706
|
{
|
707
|
[queue_key(namespace, queue), unique_key] keys_acc,
|
708
|
[unlocks_in, job_serialized, score, queue, jid] job_attrs_acc
|
709
|
}
|
710
|
end)
|
711
|
|
712
|
{Enum.reverse(keys), Enum.reverse(job_attrs)}
|
713
|
end
|
627
714
|
end
|
changed
lib/exq/redis/script.ex
|
@@ -14,8 14,8 @@ defmodule Exq.Redis.Script do
|
14
14
|
@scripts %{
|
15
15
|
enqueue:
|
16
16
|
Prepare.script("""
|
17
|
- local job_queue, namespace_prefix, unique_key = KEYS[1], KEYS[2], KEYS[3]
|
18
|
- local job, jid, unlocks_in = ARGV[1], ARGV[2], tonumber(ARGV[3])
|
17
|
local queues_key, job_queue_key, unique_key = KEYS[1], KEYS[2], KEYS[3]
|
18
|
local job_queue, job, jid, unlocks_in = ARGV[1], ARGV[2], ARGV[3], tonumber(ARGV[4])
|
19
19
|
local unlocked = true
|
20
20
|
local conflict_jid = nil
|
21
21
|
|
|
@@ -24,8 24,8 @@ defmodule Exq.Redis.Script do
|
24
24
|
end
|
25
25
|
|
26
26
|
if unlocked then
|
27
|
- redis.call('SADD', namespace_prefix .. 'queues', job_queue)
|
28
|
- redis.call('LPUSH', namespace_prefix .. 'queue:' .. job_queue, job)
|
27
|
redis.call('SADD', queues_key, job_queue)
|
28
|
redis.call('LPUSH', job_queue_key, job)
|
29
29
|
return 0
|
30
30
|
else
|
31
31
|
conflict_jid = redis.call("get", unique_key)
|
|
@@ -51,6 51,45 @@ defmodule Exq.Redis.Script do
|
51
51
|
return {1, conflict_jid}
|
52
52
|
end
|
53
53
|
"""),
|
54
|
enqueue_all:
|
55
|
Prepare.script("""
|
56
|
local schedule_queue, queues_key = KEYS[1], KEYS[2]
|
57
|
local i = 1
|
58
|
local result = {}
|
59
|
|
60
|
while i <= #(ARGV) / 5 do
|
61
|
local keys_start = i * 2
|
62
|
local args_start = (i - 1) * 5
|
63
|
local unique_key, job_queue_key = KEYS[keys_start 1], KEYS[keys_start 2]
|
64
|
local jid = ARGV[args_start 1]
|
65
|
local job_queue = ARGV[args_start 2]
|
66
|
local score = tonumber(ARGV[args_start 3])
|
67
|
local job = ARGV[args_start 4]
|
68
|
local unlocks_in = tonumber(ARGV[args_start 5])
|
69
|
local unlocked = true
|
70
|
local conflict_jid = nil
|
71
|
|
72
|
if unlocks_in then
|
73
|
unlocked = redis.call("set", unique_key, jid, "px", unlocks_in, "nx")
|
74
|
end
|
75
|
|
76
|
if unlocked and score == 0 then
|
77
|
redis.call('SADD', queues_key, job_queue)
|
78
|
redis.call('LPUSH', job_queue_key, job)
|
79
|
result[i] = {0, jid}
|
80
|
elseif unlocked then
|
81
|
redis.call('ZADD', schedule_queue, score, job)
|
82
|
result[i] = {0, jid}
|
83
|
else
|
84
|
conflict_jid = redis.call("get", unique_key)
|
85
|
result[i] = {1, conflict_jid}
|
86
|
end
|
87
|
|
88
|
i = i 1
|
89
|
end
|
90
|
|
91
|
return result
|
92
|
"""),
|
54
93
|
scheduler_dequeue_jobs:
|
55
94
|
Prepare.script("""
|
56
95
|
local schedule_queue, namespace_prefix = KEYS[1], KEYS[2]
|
changed
mix.exs
|
@@ -2,7 2,7 @@ defmodule Exq.Mixfile do
|
2
2
|
use Mix.Project
|
3
3
|
|
4
4
|
@source_url "https://github.com/akira/exq"
|
5
|
- @version "0.17.0"
|
5
|
@version "0.18.0"
|
6
6
|
|
7
7
|
def project do
|
8
8
|
[
|
changed
test/exq_test.exs
|
@@ -138,6 138,23 @@ defmodule ExqTest do
|
138
138
|
stop_process(sup)
|
139
139
|
end
|
140
140
|
|
141
|
test "enqueue_all and run many jobs" do
|
142
|
Process.register(self(), :exqtest)
|
143
|
{:ok, sup} = Exq.start_link(scheduler_enable: true)
|
144
|
|
145
|
{:ok, [{:ok, _}, {:ok, _}, {:ok, _}]} =
|
146
|
Exq.enqueue_all(Exq, [
|
147
|
["default", ExqTest.PerformArgWorker, [1], []],
|
148
|
["default", ExqTest.PerformArgWorker, [2], [schedule: {:at, DateTime.utc_now()}]],
|
149
|
["default", ExqTest.PerformArgWorker, [3], [schedule: {:in, 0}]]
|
150
|
])
|
151
|
|
152
|
assert_receive {:worked, 1}
|
153
|
assert_receive {:worked, 2}
|
154
|
assert_receive {:worked, 3}
|
155
|
stop_process(sup)
|
156
|
end
|
157
|
|
141
158
|
test "enqueue with separate enqueuer" do
|
142
159
|
Process.register(self(), :exqtest)
|
143
160
|
{:ok, exq_sup} = Exq.start_link()
|
|
@@ -724,6 741,31 @@ defmodule ExqTest do
|
724
741
|
stop_process(sup)
|
725
742
|
end
|
726
743
|
|
744
|
test "prevent duplicate scheduled job while using enqueue_all" do
|
745
|
Process.register(self(), :exqtest)
|
746
|
{:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1"], scheduler_enable: true)
|
747
|
|
748
|
{:ok, [{:ok, j1}, {:conflict, j2}]} =
|
749
|
Exq.enqueue_all(Exq, [
|
750
|
["q1", PerformWorker, [], [schedule: {:in, 1}, unique_for: 60]],
|
751
|
["q1", PerformWorker, [], [schedule: {:in, 1}, unique_for: 60]]
|
752
|
])
|
753
|
|
754
|
assert j1 == j2
|
755
|
|
756
|
:timer.sleep(2000)
|
757
|
assert_received {:worked}
|
758
|
|
759
|
{:ok, [{:ok, _}]} =
|
760
|
Exq.enqueue_all(Exq, [
|
761
|
["q1", PerformWorker, [], [schedule: {:in, 1}, unique_for: 60]]
|
762
|
])
|
763
|
|
764
|
:timer.sleep(1500)
|
765
|
assert_received {:worked}
|
766
|
stop_process(sup)
|
767
|
end
|
768
|
|
727
769
|
defp enqueue_fail_job(count) do
|
728
770
|
for _ <- 0..(count - 1) do
|
729
771
|
{:ok, _} =
|
changed
test/fake_mode_test.exs
|
@@ -47,6 47,44 @@ defmodule FakeModeTest do
|
47
47
|
assert current_seconds 310 > scheduled_seconds
|
48
48
|
end
|
49
49
|
|
50
|
test "enqueue_all" do
|
51
|
scheduled_at = DateTime.utc_now()
|
52
|
assert [] = Exq.Mock.jobs()
|
53
|
|
54
|
assert {:ok, [{:ok, _}, {:ok, _}, {:ok, _}]} =
|
55
|
Exq.enqueue_all(Exq, [
|
56
|
["low", BrokenWorker, [1], []],
|
57
|
["low", BrokenWorker, [2], [schedule: {:at, scheduled_at}]],
|
58
|
["low", BrokenWorker, [3], [schedule: {:in, 300}]]
|
59
|
])
|
60
|
|
61
|
assert [
|
62
|
%Exq.Support.Job{
|
63
|
args: [1],
|
64
|
class: FakeModeTest.BrokenWorker,
|
65
|
queue: "low"
|
66
|
},
|
67
|
%Exq.Support.Job{
|
68
|
args: [2],
|
69
|
class: FakeModeTest.BrokenWorker,
|
70
|
queue: "low",
|
71
|
enqueued_at: ^scheduled_at
|
72
|
},
|
73
|
%Exq.Support.Job{
|
74
|
args: [3],
|
75
|
class: FakeModeTest.BrokenWorker,
|
76
|
queue: "low",
|
77
|
enqueued_at: scheduled_in
|
78
|
}
|
79
|
] = Exq.Mock.jobs()
|
80
|
|
81
|
scheduled_seconds = Time.unix_seconds(scheduled_in)
|
82
|
current_seconds = Time.unix_seconds(DateTime.utc_now())
|
83
|
|
84
|
assert current_seconds 290 < scheduled_seconds
|
85
|
assert current_seconds 310 > scheduled_seconds
|
86
|
end
|
87
|
|
50
88
|
test "with predetermined job ID" do
|
51
89
|
jid = UUID.uuid4()
|
changed
test/inline_mode_test.exs
|
@@ -23,6 23,15 @@ defmodule InlineModeTest do
|
23
23
|
assert {:ok, _} = Exq.enqueue_in(Exq, "low", 300, EchoWorker, [1])
|
24
24
|
end
|
25
25
|
|
26
|
test "enqueue_all should return the correct value" do
|
27
|
assert {:ok, [{:ok, _}, {:ok, _}, {:ok, _}]} =
|
28
|
Exq.enqueue_all(Exq, [
|
29
|
["low", EchoWorker, [1], [schedule: {:in, 300}]],
|
30
|
["low", EchoWorker, [1], [schedule: {:at, DateTime.utc_now()}]],
|
31
|
["low", EchoWorker, [1], []]
|
32
|
])
|
33
|
end
|
34
|
|
26
35
|
test "enqueue should use the provided job ID, if any" do
|
27
36
|
jid = UUID.uuid4()
|
28
37
|
assert {:ok, jid} == Exq.enqueue(Exq, "low", EchoWorker, [1], jid: jid)
|