Skip to content

Commit

Permalink
[rubygems/rubygems] Move compact index concurrency to fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
martinemde authored and matzbot committed May 31, 2024
1 parent 10c256f commit 78860b8
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 51 deletions.
44 changes: 19 additions & 25 deletions lib/bundler/compact_index_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 12,13 @@ class CompactIndexClient
SUPPORTED_DIGESTS = { "sha-256" => :SHA256, "md5" => :MD5 }.freeze
DEBUG_MUTEX = Thread::Mutex.new

# info returns an Array of INFO Arrays. Each INFO Array has the following indices:
INFO_NAME = 0
INFO_VERSION = 1
INFO_PLATFORM = 2
INFO_DEPS = 3
INFO_REQS = 4

def self.debug
return unless ENV["DEBUG_COMPACT_INDEX"]
DEBUG_MUTEX.synchronize { warn("[#{self}] #{yield}") }
Expand All @@ -29,29 36,6 @@ def initialize(directory, fetcher = nil)
@parser = Parser.new(@cache)
end

def execution_mode=(block)
Bundler::CompactIndexClient.debug { "execution_mode=" }
@cache.reset!
@execution_mode = block
end

# @return [Lambda] A lambda that takes an array of inputs and a block, and
# maps the inputs with the block in parallel.
#
def execution_mode
@execution_mode || sequentially
end

def sequential_execution_mode!
self.execution_mode = sequentially
end

def sequentially
@sequentially ||= lambda do |inputs, &blk|
inputs.map(&blk)
end
end

def names
Bundler::CompactIndexClient.debug { "names" }
@parser.names
Expand All @@ -64,17 48,27 @@ def versions

def dependencies(names)
Bundler::CompactIndexClient.debug { "dependencies(#{names})" }
execution_mode.call(names) {|name| @parser.info(name) }.flatten(1)
names.map {|name| info(name) }
end

def info(name)
Bundler::CompactIndexClient.debug { "info(#{names})" }
@parser.info(name)
end

def latest_version(name)
Bundler::CompactIndexClient.debug { "latest_version(#{name})" }
@parser.info(name).map {|d| Gem::Version.new(d[1]) }.max
@parser.info(name).map {|d| Gem::Version.new(d[INFO_VERSION]) }.max
end

def available?
Bundler::CompactIndexClient.debug { "available?" }
@parser.available?
end

def reset!
Bundler::CompactIndexClient.debug { "reset!" }
@cache.reset!
end
end
end
35 changes: 14 additions & 21 deletions lib/bundler/fetcher/compact_index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 34,8 @@ def specs_for_names(gem_names)

until remaining_gems.empty?
log_specs { "Looking up gems #{remaining_gems.inspect}" }

deps = begin
parallel_compact_index_client.dependencies(remaining_gems)
rescue TooManyRequestsError
@bundle_worker&.stop
@bundle_worker = nil # reset it. Not sure if necessary
serial_compact_index_client.dependencies(remaining_gems)
end
next_gems = deps.flat_map {|d| d[3].flat_map(&:first) }.uniq
deps = fetch_gem_infos(remaining_gems).flatten(1)
next_gems = deps.flat_map {|d| d[CompactIndexClient::INFO_DEPS].flat_map(&:first) }.uniq
deps.each {|dep| gem_info << dep }
complete_gems.concat(deps.map(&:first)).uniq!
remaining_gems = next_gems - complete_gems
Expand Down Expand Up @@ -79,20 72,20 @@ def compact_index_client
end
end

def parallel_compact_index_client
compact_index_client.execution_mode = lambda do |inputs, &blk|
func = lambda {|object, _index| blk.call(object) }
worker = bundle_worker(func)
inputs.each {|input| worker.enq(input) }
inputs.map { worker.deq }
end

compact_index_client
def fetch_gem_infos(names)
in_parallel(names) {|name| compact_index_client.info(name) }
rescue TooManyRequestsError # rubygems.org is rate limiting us, slow down.
@bundle_worker&.stop
@bundle_worker = nil # reset it. Not sure if necessary
compact_index_client.reset!
names.map {|name| compact_index_client.info(name) }
end

def serial_compact_index_client
compact_index_client.sequential_execution_mode!
compact_index_client
def in_parallel(inputs, &blk)
func = lambda {|object, _index| blk.call(object) }
worker = bundle_worker(func)
inputs.each {|input| worker.enq(input) }
inputs.map { worker.deq }
end

def bundle_worker(func = nil)
Expand Down
7 changes: 2 additions & 5 deletions spec/bundler/bundler/fetcher/compact_index_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 10,12 @@
let(:remote) { double(:remote, cache_slug: "lsjdf", uri: display_uri) }
let(:gem_remote_fetcher) { nil }
let(:compact_index) { described_class.new(downloader, remote, display_uri, gem_remote_fetcher) }
let(:compact_index_client) { double(:compact_index_client, available?: true, info: [["lskdjf", "1", nil, [], []]]) }

before do
allow(response).to receive(:is_a?).with(Gem::Net::HTTPNotModified).and_return(true)
allow(compact_index).to receive(:log_specs) {}
allow(compact_index).to receive(:compact_index_client).and_return(compact_index_client)
end

describe "#specs_for_names" do
Expand All @@ -34,11 36,6 @@
end

describe "#available?" do
before do
allow(compact_index).to receive(:compact_index_client).
and_return(double(:compact_index_client, available?: true))
end

it "returns true" do
expect(compact_index).to be_available
end
Expand Down

0 comments on commit 78860b8

Please sign in to comment.