changed CHANGELOG.md
 
@@ -6,6 6,25 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
6
6
## [Unreleased]
7
7
8
8
9
## [0.18.0] - 2023-06-01
10
11
### Added
12
- Add enqueue_all for enqueuing a batch of jobs atomically #483 by @meysius
13
14
### Fixed
15
- Fix namespaced keys in scripts for ACL redis #481 by @korialis
16
17
## [0.17.0] - 2022-11-25
18
19
### Added
20
- Add Exq unique job feature, allow to enforce a single instance of job! #469 by @ananthakumaran
21
22
### Fixed
23
- Setup default redis timeout config #475 by @ananthakumaran
24
25
### Changed
26
- Use erlef/setup-beam and bump versions #476 by @kianmeng
27
9
28
## [0.16.2] - 2022-04-15
10
29
11
30
### Added
changed README.md
 
@@ -71,7 71,7 @@ Add `:exq` to your `mix.exs` deps (replace version with the latest hex.pm packag
71
71
defp deps do
72
72
[
73
73
# ... other deps
74
- {:exq, "~> 0.16.2"}
74
{:exq, "~> 0.18.0"}
75
75
]
76
76
end
77
77
```
 
@@ -527,6 527,35 @@ feature depends on `Exq.Middleware.Unique` middleware. If you override
527
527
`:middleware` configuration, make sure to include it.
528
528
529
529
530
## Enqueuing Many Jobs Atomically
531
532
Similar to database transactions, there are cases where you may want to
533
enqueue/schedule many jobs atomically. A common usecase of this would be when you
534
have a computationally heavy job and you want to break it down to multiple smaller jobs so they
535
can be run concurrently. If you use a loop to enqueue/schedule these jobs,
536
and a network, connectivity, or application error occurs while passing these jobs to Exq,
537
you will end up in a situation where you have to roll back all the jobs that you may already
538
have scheduled/enqueued which will be a complicated process. In order to avoid this problem,
539
Exq comes with an `enqueue_all` method which guarantees atomicity.
540
541
542
```elixir
543
{:ok, [{:ok, jid_1}, {:ok, jid_2}, {:ok, jid_3}]} = Exq.enqueue_all(Exq, [
544
[job_1_queue, job_1_worker, job_1_args, job_1_options],
545
[job_2_queue, job_2_worker, job_2_args, job_2_options],
546
[job_3_queue, job_3_worker, job_3_args, job_3_options]
547
])
548
```
549
550
`enqueue_all` also supports scheduling jobs via `schedule` key in the `options` passed for each job:
551
```elixir
552
{:ok, [{:ok, jid_1}, {:ok, jid_2}, {:ok, jid_3}]} = Exq.enqueue_all(Exq, [
553
[job_1_queue, job_1_worker, job_1_args, [schedule: {:in, 60 * 60}]],
554
[job_2_queue, job_2_worker, job_2_args, [schedule: {:at, midnight}]],
555
[job_3_queue, job_3_worker, job_3_args, []] # no schedule key is present, it is enqueued immediately
556
])
557
```
558
530
559
## Web UI
531
560
532
561
Exq has a separate repo, exq_ui which provides with a Web UI to monitor your workers:
changed hex_metadata.config
 
@@ -5,54 5,53 @@
5
5
{<<"elixir">>,<<"~> 1.6">>}.
6
6
{<<"files">>,
7
7
[<<"lib">>,<<"lib/mix">>,<<"lib/mix/tasks">>,<<"lib/mix/tasks/exq.run.ex">>,
8
- <<"lib/exq.ex">>,<<"lib/exq">>,<<"lib/exq/enqueuer.ex">>,
9
- <<"lib/exq/manager">>,<<"lib/exq/manager/server.ex">>,
10
- <<"lib/exq/worker_drainer">>,<<"lib/exq/worker_drainer/server.ex">>,
11
- <<"lib/exq/mock.ex">>,<<"lib/exq/dequeue">>,
12
- <<"lib/exq/dequeue/behaviour.ex">>,<<"lib/exq/dequeue/local.ex">>,
13
- <<"lib/exq/stats">>,<<"lib/exq/stats/server.ex">>,<<"lib/exq/heartbeat">>,
14
- <<"lib/exq/heartbeat/server.ex">>,<<"lib/exq/heartbeat/monitor.ex">>,
15
- <<"lib/exq/api.ex">>,<<"lib/exq/node_identifier">>,
8
<<"lib/exq">>,<<"lib/exq/enqueue_api.ex">>,<<"lib/exq/worker">>,
9
<<"lib/exq/worker/server.ex">>,<<"lib/exq/worker/metadata.ex">>,
10
<<"lib/exq/worker/supervisor.ex">>,<<"lib/exq/mock.ex">>,
11
<<"lib/exq/support">>,<<"lib/exq/support/time.ex">>,
12
<<"lib/exq/support/process.ex">>,<<"lib/exq/support/coercion.ex">>,
13
<<"lib/exq/support/binary.ex">>,<<"lib/exq/support/opts.ex">>,
14
<<"lib/exq/support/mode.ex">>,<<"lib/exq/support/node.ex">>,
15
<<"lib/exq/support/redis.ex">>,<<"lib/exq/support/randomize.ex">>,
16
<<"lib/exq/support/job.ex">>,<<"lib/exq/support/config.ex">>,
17
<<"lib/exq/redis">>,<<"lib/exq/redis/job_queue.ex">>,
18
<<"lib/exq/redis/connection.ex">>,<<"lib/exq/redis/script.ex">>,
19
<<"lib/exq/redis/job_stat.ex">>,<<"lib/exq/redis/heartbeat.ex">>,
20
<<"lib/exq/node_identifier">>,
16
21
<<"lib/exq/node_identifier/hostname_identifier.ex">>,
17
- <<"lib/exq/node_identifier/behaviour.ex">>,<<"lib/exq/support">>,
18
- <<"lib/exq/support/binary.ex">>,<<"lib/exq/support/job.ex">>,
19
- <<"lib/exq/support/node.ex">>,<<"lib/exq/support/coercion.ex">>,
20
- <<"lib/exq/support/opts.ex">>,<<"lib/exq/support/randomize.ex">>,
21
- <<"lib/exq/support/config.ex">>,<<"lib/exq/support/time.ex">>,
22
- <<"lib/exq/support/process.ex">>,<<"lib/exq/support/mode.ex">>,
23
- <<"lib/exq/support/redis.ex">>,<<"lib/exq/adapters">>,
22
<<"lib/exq/node_identifier/behaviour.ex">>,<<"lib/exq/backoff">>,
23
<<"lib/exq/backoff/sidekiq_default.ex">>,<<"lib/exq/backoff/behaviour.ex">>,
24
<<"lib/exq/api">>,<<"lib/exq/api/server.ex">>,<<"lib/exq/enqueuer.ex">>,
25
<<"lib/exq/worker_drainer">>,<<"lib/exq/worker_drainer/server.ex">>,
26
<<"lib/exq/node">>,<<"lib/exq/node/server.ex">>,<<"lib/exq/serializers">>,
27
<<"lib/exq/serializers/json_serializer.ex">>,
28
<<"lib/exq/serializers/behaviour.ex">>,<<"lib/exq/adapters">>,
24
29
<<"lib/exq/adapters/queue.ex">>,<<"lib/exq/adapters/queue">>,
25
30
<<"lib/exq/adapters/queue/mock.ex">>,<<"lib/exq/adapters/queue/redis.ex">>,
26
- <<"lib/exq/node">>,<<"lib/exq/node/server.ex">>,<<"lib/exq/enqueuer">>,
27
- <<"lib/exq/enqueuer/server.ex">>,<<"lib/exq/worker">>,
28
- <<"lib/exq/worker/metadata.ex">>,<<"lib/exq/worker/server.ex">>,
29
- <<"lib/exq/worker/supervisor.ex">>,<<"lib/exq/api">>,
30
- <<"lib/exq/api/server.ex">>,<<"lib/exq/serializers">>,
31
- <<"lib/exq/serializers/behaviour.ex">>,
32
- <<"lib/exq/serializers/json_serializer.ex">>,<<"lib/exq/scheduler">>,
33
- <<"lib/exq/scheduler/server.ex">>,<<"lib/exq/enqueue_api.ex">>,
34
- <<"lib/exq/middleware">>,<<"lib/exq/middleware/pipeline.ex">>,
35
- <<"lib/exq/middleware/job.ex">>,<<"lib/exq/middleware/behaviour.ex">>,
36
- <<"lib/exq/middleware/server.ex">>,<<"lib/exq/middleware/unique.ex">>,
37
- <<"lib/exq/middleware/logger.ex">>,<<"lib/exq/middleware/telemetry.ex">>,
38
- <<"lib/exq/middleware/manager.ex">>,<<"lib/exq/middleware/stats.ex">>,
39
- <<"lib/exq/redis">>,<<"lib/exq/redis/heartbeat.ex">>,
40
- <<"lib/exq/redis/job_queue.ex">>,<<"lib/exq/redis/script.ex">>,
41
- <<"lib/exq/redis/job_stat.ex">>,<<"lib/exq/redis/connection.ex">>,
42
- <<"lib/exq/backoff">>,<<"lib/exq/backoff/behaviour.ex">>,
43
- <<"lib/exq/backoff/sidekiq_default.ex">>,<<"test">>,
31
<<"lib/exq/dequeue">>,<<"lib/exq/dequeue/local.ex">>,
32
<<"lib/exq/dequeue/behaviour.ex">>,<<"lib/exq/heartbeat">>,
33
<<"lib/exq/heartbeat/server.ex">>,<<"lib/exq/heartbeat/monitor.ex">>,
34
<<"lib/exq/middleware">>,<<"lib/exq/middleware/server.ex">>,
35
<<"lib/exq/middleware/manager.ex">>,<<"lib/exq/middleware/logger.ex">>,
36
<<"lib/exq/middleware/unique.ex">>,<<"lib/exq/middleware/job.ex">>,
37
<<"lib/exq/middleware/behaviour.ex">>,<<"lib/exq/middleware/telemetry.ex">>,
38
<<"lib/exq/middleware/pipeline.ex">>,<<"lib/exq/middleware/stats.ex">>,
39
<<"lib/exq/stats">>,<<"lib/exq/stats/server.ex">>,<<"lib/exq/scheduler">>,
40
<<"lib/exq/scheduler/server.ex">>,<<"lib/exq/manager">>,
41
<<"lib/exq/manager/server.ex">>,<<"lib/exq/api.ex">>,<<"lib/exq/enqueuer">>,
42
<<"lib/exq/enqueuer/server.ex">>,<<"lib/exq.ex">>,<<"test">>,
43
<<"test/redis_test.exs">>,<<"test/test-sentinel.conf">>,
44
<<"test/config_test.exs">>,<<"test/inline_mode_test.exs">>,
45
<<"test/worker_test.exs">>,<<"test/exq">>,<<"test/exq/heartbeat">>,
46
<<"test/exq/heartbeat/monitor_test.exs">>,<<"test/metadata_test.exs">>,
47
<<"test/middleware_test.exs">>,<<"test/fake_mode_test.exs">>,
48
<<"test/exq_test.exs">>,<<"test/flaky_connection_test.exs">>,
49
<<"test/failure_scenarios_test.exs">>,<<"test/job_queue_test.exs">>,
50
<<"test/mode_test.exs">>,<<"test/api_test.exs">>,
51
<<"test/test-redis-replica.conf">>,<<"test/json_serializer_test.exs">>,
52
<<"test/job_stat_test.exs">>,<<"test/readonly_reconnect_test.exs">>,
44
53
<<"test/performance_test.exs">>,<<"test/test_helper.exs">>,
45
- <<"test/readonly_reconnect_test.exs">>,<<"test/job_queue_test.exs">>,
46
- <<"test/redis_test.exs">>,<<"test/middleware_test.exs">>,
47
- <<"test/test-redis-replica.conf">>,<<"test/mode_test.exs">>,
48
- <<"test/flaky_connection_test.exs">>,<<"test/exq_test.exs">>,<<"test/exq">>,
49
- <<"test/exq/heartbeat">>,<<"test/exq/heartbeat/monitor_test.exs">>,
50
- <<"test/job_stat_test.exs">>,<<"test/failure_scenarios_test.exs">>,
51
- <<"test/test-redis.conf">>,<<"test/json_serializer_test.exs">>,
52
- <<"test/test-sentinel.conf">>,<<"test/fake_mode_test.exs">>,
53
- <<"test/inline_mode_test.exs">>,<<"test/config_test.exs">>,
54
- <<"test/worker_test.exs">>,<<"test/metadata_test.exs">>,
55
- <<"test/api_test.exs">>,<<"LICENSE">>,<<"mix.exs">>,<<"CHANGELOG.md">>,
54
<<"test/test-redis.conf">>,<<"LICENSE">>,<<"mix.exs">>,<<"CHANGELOG.md">>,
56
55
<<"README.md">>]}.
57
56
{<<"licenses">>,[<<"Apache-2.0">>]}.
58
57
{<<"links">>,[{<<"GitHub">>,<<"https://github.com/akira/exq">>}]}.
 
@@ -78,4 77,4 @@
78
77
{<<"optional">>,true},
79
78
{<<"repository">>,<<"hexpm">>},
80
79
{<<"requirement">>,<<"~> 1.0">>}]]}.
81
- {<<"version">>,<<"0.17.0">>}.
80
{<<"version">>,<<"0.18.0">>}.
changed lib/exq/adapters/queue.ex
 
@@ -30,4 30,5 @@ defmodule Exq.Adapters.Queue do
30
30
@callback enqueue(server, String.t(), module(), list(), list()) :: tuple()
31
31
@callback enqueue_at(server, String.t(), DateTime.t(), module(), list(), list()) :: tuple()
32
32
@callback enqueue_in(server, String.t(), integer(), module(), list(), list()) :: tuple()
33
@callback enqueue_all(server, list()) :: tuple()
33
34
end
changed lib/exq/adapters/queue/mock.ex
 
@@ -12,4 12,6 @@ defmodule Exq.Adapters.Queue.Mock do
12
12
defdelegate enqueue_at(pid, queue, time, worker, args, options), to: Exq.Mock
13
13
14
14
defdelegate enqueue_in(pid, queue, offset, worker, args, options), to: Exq.Mock
15
16
defdelegate enqueue_all(pid, jobs), to: Exq.Mock
15
17
end
changed lib/exq/adapters/queue/redis.ex
 
@@ -23,4 23,9 @@ defmodule Exq.Adapters.Queue.Redis do
23
23
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
24
24
JobQueue.enqueue_in(redis, namespace, queue, offset, worker, args, options)
25
25
end
26
27
def enqueue_all(pid, jobs) do
28
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
29
JobQueue.enqueue_all(redis, namespace, jobs)
30
end
26
31
end
changed lib/exq/enqueue_api.ex
 
@@ -98,6 98,34 @@ defmodule Exq.Enqueuer.EnqueueApi do
98
98
queue_adapter = Config.get(:queue_adapter)
99
99
queue_adapter.enqueue_in(pid, queue, offset, worker, args, options)
100
100
end
101
102
@doc """
103
Schedule multiple jobs to be atomically enqueued at specific times
104
105
Expected args:
106
* `pid` - PID for Exq Manager or Enqueuer to handle this
107
* `jobs` - List of jobs each defined as `[queue, worker, args, options]`
108
* `queue` - Name of queue to use
109
* `worker` - Worker module to target
110
* `args` - Array of args to send to worker
111
* `options`: Following job options are supported
112
* `max_retries` (integer) - max retry count
113
* `jid` (string) - user supplied jid value
114
* `unique_for` (integer) - lock expiration duration in seconds
115
* `unique_token` (string) - unique lock token. By default the token is computed based on the queue, class and args.
116
* `unique_until` (atom) - defaults to `:success`. Supported values are
117
* `:success` - unlock on job success
118
* `:start` - unlock on job first execution
119
* `:expiry` - unlock when the lock is expired. Depends on `unique_for` value.
120
* `schedule` - (optional) - used to schedule the job for future. If not present, job will be enqueued immediately by default.
121
* `{:in, seconds_from_now}`
122
* `{:at, datetime}`
123
124
"""
125
def enqueue_all(pid, jobs) do
126
queue_adapter = Config.get(:queue_adapter)
127
queue_adapter.enqueue_all(pid, jobs)
128
end
101
129
end
102
130
end
103
131
end
changed lib/exq/mock.ex
 
@@ -98,34 98,55 @@ defmodule Exq.Mock do
98
98
runnable.()
99
99
end
100
100
101
@doc false
102
def enqueue_all(pid, jobs) do
103
{:ok, runnable} = GenServer.call(__MODULE__, {:enqueue_all, self(), pid, jobs}, @timeout)
104
runnable.()
105
end
106
101
107
@impl true
102
108
def handle_call({:enqueue, owner_pid, type, args}, _from, state) do
103
109
state = maybe_add_and_monitor_pid(state, owner_pid, state.default_mode)
110
{state, runnable} = to_runnable(owner_pid, type, args, state)
111
{:reply, {:ok, runnable}, state}
112
end
104
113
105
- case state.modes[owner_pid] do
106
- :redis ->
107
- runnable = fn -> apply(Redis, type, args) end
108
- {:reply, {:ok, runnable}, state}
114
@impl true
115
def handle_call({:enqueue_all, owner_pid, pid, jobs}, _from, state) do
116
state = maybe_add_and_monitor_pid(state, owner_pid, state.default_mode)
109
117
110
- :inline ->
111
- runnable = fn ->
112
- job = to_job(type, args)
113
- apply(Coercion.to_module(job.class), :perform, job.args)
114
- {:ok, job.jid}
115
- end
118
{state, runnable} =
119
if state.modes[owner_pid] == :redis do
120
to_runnable(owner_pid, :enqueue_all, [pid, jobs], state)
121
else
122
{state, runnables} =
123
Enum.reduce(jobs, {state, []}, fn [queue, worker, args, options], {state, runnables} ->
124
{type, args} =
125
case options[:schedule] do
126
{:at, at_time} ->
127
{:enqueue_at, [pid, queue, at_time, worker, args, options]}
116
128
117
- {:reply, {:ok, runnable}, state}
129
{:in, offset} ->
130
{:enqueue_in, [pid, queue, offset, worker, args, options]}
118
131
119
- :fake ->
120
- job = to_job(type, args)
121
- state = update_in(state.jobs[owner_pid], &((&1 || []) [job]))
132
_ ->
133
{:enqueue, [pid, queue, worker, args, options]}
134
end
135
136
{state, runnable} = to_runnable(owner_pid, type, args, state)
137
{state, [runnable | runnables]}
138
end)
139
140
runnables = Enum.reverse(runnables)
122
141
123
142
runnable = fn ->
124
- {:ok, job.jid}
143
{:ok, Enum.map(runnables, fn f -> f.() end)}
125
144
end
126
145
127
- {:reply, {:ok, runnable}, state}
128
- end
146
{state, runnable}
147
end
148
149
{:reply, {:ok, runnable}, state}
129
150
end
130
151
131
152
def handle_call({:mode, owner_pid, mode}, _from, state) do
 
@@ -145,6 166,33 @@ defmodule Exq.Mock do
145
166
{:noreply, state}
146
167
end
147
168
169
defp to_runnable(owner_pid, type, args, state) do
170
case state.modes[owner_pid] do
171
:redis ->
172
runnable = fn -> apply(Redis, type, args) end
173
{state, runnable}
174
175
:inline ->
176
runnable = fn ->
177
job = to_job(type, args)
178
apply(Coercion.to_module(job.class), :perform, job.args)
179
{:ok, job.jid}
180
end
181
182
{state, runnable}
183
184
:fake ->
185
job = to_job(type, args)
186
state = update_in(state.jobs[owner_pid], &((&1 || []) [job]))
187
188
runnable = fn ->
189
{:ok, job.jid}
190
end
191
192
{state, runnable}
193
end
194
end
195
148
196
defp to_job(_, [_pid, queue, worker, args, options]) do
149
197
%Job{
150
198
jid: Keyword.get_lazy(options, :jid, fn -> UUID.uuid4() end),
changed lib/exq/redis/job_queue.ex
 
@@ -35,12 35,14 @@ defmodule Exq.Redis.JobQueue do
35
35
try do
36
36
[unlocks_in, unique_key] = unique_args(namespace, job, options)
37
37
38
keys = keys_list([full_key(namespace, "queues"), queue_key(namespace, queue)], unique_key)
39
38
40
response =
39
41
Script.eval!(
40
42
redis,
41
43
:enqueue,
42
- [queue, full_key(namespace, ""), unique_key],
43
- [job_serialized, job.jid, unlocks_in]
44
keys,
45
[queue, job_serialized, job.jid, unlocks_in]
44
46
)
45
47
46
48
case response do
 
@@ -96,8 98,10 @@ defmodule Exq.Redis.JobQueue do
96
98
try do
97
99
[unlocks_in, unique_key] = unique_args(namespace, job, options)
98
100
101
keys = keys_list([scheduled_queue], unique_key)
102
99
103
response =
100
- Script.eval!(redis, :enqueue_at, [scheduled_queue, unique_key], [
104
Script.eval!(redis, :enqueue_at, keys, [
101
105
job_serialized,
102
106
score,
103
107
jid,
 
@@ -116,6 120,40 @@ defmodule Exq.Redis.JobQueue do
116
120
end
117
121
end
118
122
123
def enqueue_all(redis, namespace, jobs) do
124
{keys, args} = extract_enqueue_all_keys_and_args(namespace, jobs)
125
126
try do
127
response =
128
Script.eval!(
129
redis,
130
:enqueue_all,
131
[scheduled_queue_key(namespace), full_key(namespace, "queues")] keys,
132
args
133
)
134
135
case response do
136
{:ok, result} ->
137
{
138
:ok,
139
Enum.map(result, fn [status, jid] ->
140
case status do
141
0 -> {:ok, jid}
142
1 -> {:conflict, jid}
143
end
144
end)
145
}
146
147
error ->
148
error
149
end
150
catch
151
:exit, e ->
152
Logger.info("Error enqueueing - #{Kernel.inspect(e)}")
153
{:error, :timeout}
154
end
155
end
156
119
157
@doc """
120
158
Dequeue jobs for available queues
121
159
"""
 
@@ -219,6 257,9 @@ defmodule Exq.Redis.JobQueue do
219
257
end
220
258
end
221
259
260
defp keys_list([_hd | _tl] = keys, nil), do: keys
261
defp keys_list([_hd | _tl] = keys, key), do: keys [key]
262
222
263
def full_key("", key), do: key
223
264
def full_key(nil, key), do: key
224
265
 
@@ -624,4 665,50 @@ defmodule Exq.Redis.JobQueue do
624
665
[nil, nil]
625
666
end
626
667
end
668
669
# Returns
670
# {
671
# [
672
# job_1_unique_key, job_1_queue_key,
673
# job_2_unique_key, job_2_queue_key,
674
# ...
675
# ],
676
# [
677
# job_1_jid, job_1_queue, job_1_score, job_1_job_serialized, job_1_unlocks_in,
678
# job_2_jid, job_2_queue, job_2_score, job_2_job_serialized, job_2_unlocks_in,
679
# ...
680
# ]
681
# }
682
defp extract_enqueue_all_keys_and_args(namespace, jobs) do
683
{keys, job_attrs} =
684
Enum.reduce(jobs, {[], []}, fn job, {keys_acc, job_attrs_acc} ->
685
[queue, worker, args, options] = job
686
687
{score, enqueued_at} =
688
case options[:schedule] do
689
{:at, at_time} ->
690
{Time.time_to_score(at_time), Time.unix_seconds(at_time)}
691
692
{:in, offset} ->
693
at_time = Time.offset_from_now(offset)
694
{Time.time_to_score(at_time), Time.unix_seconds(at_time)}
695
696
_ ->
697
{"0", Time.unix_seconds()}
698
end
699
700
{jid, job_data, job_serialized} =
701
to_job_serialized(queue, worker, args, options, enqueued_at)
702
703
[unlocks_in, unique_key] = unique_args(namespace, job_data, unique_check: true)
704
705
# accumulating in reverse order for efficiency
706
{
707
[queue_key(namespace, queue), unique_key] keys_acc,
708
[unlocks_in, job_serialized, score, queue, jid] job_attrs_acc
709
}
710
end)
711
712
{Enum.reverse(keys), Enum.reverse(job_attrs)}
713
end
627
714
end
changed lib/exq/redis/script.ex
 
@@ -14,8 14,8 @@ defmodule Exq.Redis.Script do
14
14
@scripts %{
15
15
enqueue:
16
16
Prepare.script("""
17
- local job_queue, namespace_prefix, unique_key = KEYS[1], KEYS[2], KEYS[3]
18
- local job, jid, unlocks_in = ARGV[1], ARGV[2], tonumber(ARGV[3])
17
local queues_key, job_queue_key, unique_key = KEYS[1], KEYS[2], KEYS[3]
18
local job_queue, job, jid, unlocks_in = ARGV[1], ARGV[2], ARGV[3], tonumber(ARGV[4])
19
19
local unlocked = true
20
20
local conflict_jid = nil
21
21
 
@@ -24,8 24,8 @@ defmodule Exq.Redis.Script do
24
24
end
25
25
26
26
if unlocked then
27
- redis.call('SADD', namespace_prefix .. 'queues', job_queue)
28
- redis.call('LPUSH', namespace_prefix .. 'queue:' .. job_queue, job)
27
redis.call('SADD', queues_key, job_queue)
28
redis.call('LPUSH', job_queue_key, job)
29
29
return 0
30
30
else
31
31
conflict_jid = redis.call("get", unique_key)
 
@@ -51,6 51,45 @@ defmodule Exq.Redis.Script do
51
51
return {1, conflict_jid}
52
52
end
53
53
"""),
54
enqueue_all:
55
Prepare.script("""
56
local schedule_queue, queues_key = KEYS[1], KEYS[2]
57
local i = 1
58
local result = {}
59
60
while i <= #(ARGV) / 5 do
61
local keys_start = i * 2
62
local args_start = (i - 1) * 5
63
local unique_key, job_queue_key = KEYS[keys_start 1], KEYS[keys_start 2]
64
local jid = ARGV[args_start 1]
65
local job_queue = ARGV[args_start 2]
66
local score = tonumber(ARGV[args_start 3])
67
local job = ARGV[args_start 4]
68
local unlocks_in = tonumber(ARGV[args_start 5])
69
local unlocked = true
70
local conflict_jid = nil
71
72
if unlocks_in then
73
unlocked = redis.call("set", unique_key, jid, "px", unlocks_in, "nx")
74
end
75
76
if unlocked and score == 0 then
77
redis.call('SADD', queues_key, job_queue)
78
redis.call('LPUSH', job_queue_key, job)
79
result[i] = {0, jid}
80
elseif unlocked then
81
redis.call('ZADD', schedule_queue, score, job)
82
result[i] = {0, jid}
83
else
84
conflict_jid = redis.call("get", unique_key)
85
result[i] = {1, conflict_jid}
86
end
87
88
i = i 1
89
end
90
91
return result
92
"""),
54
93
scheduler_dequeue_jobs:
55
94
Prepare.script("""
56
95
local schedule_queue, namespace_prefix = KEYS[1], KEYS[2]
changed mix.exs
 
@@ -2,7 2,7 @@ defmodule Exq.Mixfile do
2
2
use Mix.Project
3
3
4
4
@source_url "https://github.com/akira/exq"
5
- @version "0.17.0"
5
@version "0.18.0"
6
6
7
7
def project do
8
8
[
changed test/exq_test.exs
 
@@ -138,6 138,23 @@ defmodule ExqTest do
138
138
stop_process(sup)
139
139
end
140
140
141
test "enqueue_all and run many jobs" do
142
Process.register(self(), :exqtest)
143
{:ok, sup} = Exq.start_link(scheduler_enable: true)
144
145
{:ok, [{:ok, _}, {:ok, _}, {:ok, _}]} =
146
Exq.enqueue_all(Exq, [
147
["default", ExqTest.PerformArgWorker, [1], []],
148
["default", ExqTest.PerformArgWorker, [2], [schedule: {:at, DateTime.utc_now()}]],
149
["default", ExqTest.PerformArgWorker, [3], [schedule: {:in, 0}]]
150
])
151
152
assert_receive {:worked, 1}
153
assert_receive {:worked, 2}
154
assert_receive {:worked, 3}
155
stop_process(sup)
156
end
157
141
158
test "enqueue with separate enqueuer" do
142
159
Process.register(self(), :exqtest)
143
160
{:ok, exq_sup} = Exq.start_link()
 
@@ -724,6 741,31 @@ defmodule ExqTest do
724
741
stop_process(sup)
725
742
end
726
743
744
test "prevent duplicate scheduled job while using enqueue_all" do
745
Process.register(self(), :exqtest)
746
{:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1"], scheduler_enable: true)
747
748
{:ok, [{:ok, j1}, {:conflict, j2}]} =
749
Exq.enqueue_all(Exq, [
750
["q1", PerformWorker, [], [schedule: {:in, 1}, unique_for: 60]],
751
["q1", PerformWorker, [], [schedule: {:in, 1}, unique_for: 60]]
752
])
753
754
assert j1 == j2
755
756
:timer.sleep(2000)
757
assert_received {:worked}
758
759
{:ok, [{:ok, _}]} =
760
Exq.enqueue_all(Exq, [
761
["q1", PerformWorker, [], [schedule: {:in, 1}, unique_for: 60]]
762
])
763
764
:timer.sleep(1500)
765
assert_received {:worked}
766
stop_process(sup)
767
end
768
727
769
defp enqueue_fail_job(count) do
728
770
for _ <- 0..(count - 1) do
729
771
{:ok, _} =
changed test/fake_mode_test.exs
 
@@ -47,6 47,44 @@ defmodule FakeModeTest do
47
47
assert current_seconds 310 > scheduled_seconds
48
48
end
49
49
50
test "enqueue_all" do
51
scheduled_at = DateTime.utc_now()
52
assert [] = Exq.Mock.jobs()
53
54
assert {:ok, [{:ok, _}, {:ok, _}, {:ok, _}]} =
55
Exq.enqueue_all(Exq, [
56
["low", BrokenWorker, [1], []],
57
["low", BrokenWorker, [2], [schedule: {:at, scheduled_at}]],
58
["low", BrokenWorker, [3], [schedule: {:in, 300}]]
59
])
60
61
assert [
62
%Exq.Support.Job{
63
args: [1],
64
class: FakeModeTest.BrokenWorker,
65
queue: "low"
66
},
67
%Exq.Support.Job{
68
args: [2],
69
class: FakeModeTest.BrokenWorker,
70
queue: "low",
71
enqueued_at: ^scheduled_at
72
},
73
%Exq.Support.Job{
74
args: [3],
75
class: FakeModeTest.BrokenWorker,
76
queue: "low",
77
enqueued_at: scheduled_in
78
}
79
] = Exq.Mock.jobs()
80
81
scheduled_seconds = Time.unix_seconds(scheduled_in)
82
current_seconds = Time.unix_seconds(DateTime.utc_now())
83
84
assert current_seconds 290 < scheduled_seconds
85
assert current_seconds 310 > scheduled_seconds
86
end
87
50
88
test "with predetermined job ID" do
51
89
jid = UUID.uuid4()
changed test/inline_mode_test.exs
 
@@ -23,6 23,15 @@ defmodule InlineModeTest do
23
23
assert {:ok, _} = Exq.enqueue_in(Exq, "low", 300, EchoWorker, [1])
24
24
end
25
25
26
test "enqueue_all should return the correct value" do
27
assert {:ok, [{:ok, _}, {:ok, _}, {:ok, _}]} =
28
Exq.enqueue_all(Exq, [
29
["low", EchoWorker, [1], [schedule: {:in, 300}]],
30
["low", EchoWorker, [1], [schedule: {:at, DateTime.utc_now()}]],
31
["low", EchoWorker, [1], []]
32
])
33
end
34
26
35
test "enqueue should use the provided job ID, if any" do
27
36
jid = UUID.uuid4()
28
37
assert {:ok, jid} == Exq.enqueue(Exq, "low", EchoWorker, [1], jid: jid)