Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel formatting to increase speed #6095

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 49,7 @@ regex = "1.7"
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0"
term = "0.7"
termcolor = "1.2.0"
thiserror = "1.0.40"
toml = "0.7.4"
tracing = "0.1.37"
Expand Down
11 changes: 9 additions & 2 deletions src/attr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 147,7 @@ fn format_derive(
.tactic(tactic)
.trailing_separator(trailing_separator)
.ends_with_newline(false);
let item_str = write_list(&all_items, &fmt)?;
let item_str = write_list(&all_items, &fmt, context.printer)?;

debug!("item_str: '{}'", item_str);

Expand Down Expand Up @@ -235,6 235,7 @@ fn rewrite_initial_doc_comments(
&snippet,
shape.comment(context.config),
context.config,
context.printer,
)?),
));
}
Expand Down Expand Up @@ -318,7 319,12 @@ impl Rewrite for ast::Attribute {
fn rewrite(&self, context: &RewriteContext<'_>, shape: Shape) -> Option<String> {
let snippet = context.snippet(self.span);
if self.is_doc_comment() {
rewrite_doc_comment(snippet, shape.comment(context.config), context.config)
rewrite_doc_comment(
snippet,
shape.comment(context.config),
context.config,
context.printer,
)
} else {
let should_skip = self
.ident()
Expand Down Expand Up @@ -347,6 353,7 @@ impl Rewrite for ast::Attribute {
&doc_comment,
shape.comment(context.config),
context.config,
context.printer,
);
}
}
Expand Down
209 changes: 166 additions & 43 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 9,19 @@ use rustfmt_nightly as rustfmt;
use tracing_subscriber::EnvFilter;

use std::collections::HashMap;
use std::env;
use std::fs::File;
use std::io::{self, stdout, Read, Write};
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::str::FromStr;

use getopts::{Matches, Options};
use std::thread::JoinHandle;
use std::{env, panic};

use crate::rustfmt::{
load_config, CliOptions, Color, Config, Edition, EmitMode, FileLines, FileName,
FormatReportFormatterBuilder, Input, Session, Verbosity,
buf_eprintln, buf_println, load_config, print::Printer, CliOptions, Color, Config, Edition,
EmitMode, FileLines, FileName, FormatReportFormatterBuilder, Input, Session, Verbosity,
};
use getopts::{Matches, Options};

const BUG_REPORT_URL: &str = "https://github.com/rust-lang/rustfmt/issues/new?labels=bug";

Expand Down Expand Up @@ -289,8 290,10 @@ fn format_string(input: String, options: GetOptsOptions) -> Result<i32> {
}

let out = &mut stdout();
let mut session = Session::new(config, Some(out));
let printer = Printer::new(config.color());
let mut session = Session::new(config, Some(out), &printer);
format_and_emit_report(&mut session, Input::Text(input));
printer.write_to_outputs()?;

let exit_code = if session.has_operational_errors() || session.has_parsing_errors() {
1
Expand All @@ -300,50 303,153 @@ fn format_string(input: String, options: GetOptsOptions) -> Result<i32> {
Ok(exit_code)
}

struct ThreadedFileOutput {
session_result: Result<Vec<u8>, std::io::Error>,
id: i32,
exit_code: i32,
printer: Printer,
}

fn format(
files: Vec<PathBuf>,
minimal_config_path: Option<String>,
options: &GetOptsOptions,
) -> Result<i32> {
options.verify_file_lines(&files);
let (config, config_path) = load_config(None, Some(options.clone()))?;
let cfg_path_is_none = config_path.is_none();

if config.verbose() == Verbosity::Verbose {
if let Some(path) = config_path.as_ref() {
println!("Using rustfmt config file {}", path.display());
}
}

let out = &mut stdout();
let mut session = Session::new(config, Some(out));

let parallelism = std::thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);
// Use a channel map to get 'next-completed' thread, rather than
// waiting on the chronologically first handle to join, impactful if there are more files
// than available parallelism.
let (send, recv) = std::sync::mpsc::channel();
let mut handles: HashMap<i32, JoinHandle<_>> = HashMap::new();

let mut exit_code = 0;
let mut outstanding = 0;
// If the thread panics, the channel will just be dropped,
// so keep track of the spinning threads to get a stacktrace from it later
let mut id = 0;
let check = options.check;
let color = config.color();
for file in files {
if !file.exists() {
eprintln!("Error: file `{}` does not exist", file.to_str().unwrap());
session.add_operational_error();
} else if file.is_dir() {
eprintln!("Error: `{}` is a directory", file.to_str().unwrap());
session.add_operational_error();
} else {
// Check the file directory if the config-path could not be read or not provided
if config_path.is_none() {
let (local_config, config_path) =
load_config(Some(file.parent().unwrap()), Some(options.clone()))?;
if local_config.verbose() == Verbosity::Verbose {
if let Some(path) = config_path {
println!(
"Using rustfmt config file {} for {}",
path.display(),
file.display()
);
let cfg = config.clone();
let opts = options.clone();
let s = send.clone();
let handle = std::thread::spawn(move || {
let my_id = id;
let mut session_out = Vec::new();
let printer = Printer::new(color);
let mut session = Session::new(cfg, Some(&mut session_out), &printer);
if !file.exists() {
buf_eprintln!(
printer,
"Error: file `{}` does not exist",
file.to_str().unwrap()
);
session.add_operational_error();
} else if file.is_dir() {
buf_eprintln!(
printer,
"Error: `{}` is a directory",
file.to_str().unwrap()
);
session.add_operational_error();
} else {
// Check the file directory if the config-path could not be read or not provided
if cfg_path_is_none {
let (local_config, config_path) =
match load_config(Some(file.parent().unwrap()), Some(opts)) {
Ok((lc, cf)) => (lc, cf),
Err(e) => {
drop(session);
let _ = s.send(ThreadedFileOutput {
session_result: Err(e),
id: my_id,
exit_code: 1,
printer,
});
return Ok::<_, std::io::Error>(());
}
};
if local_config.verbose() == Verbosity::Verbose {
if let Some(path) = config_path {
buf_println!(
printer,
"Using rustfmt config file {} for {}",
path.display(),
file.display()
);
}
}
}

session.override_config(local_config, |sess| {
format_and_emit_report(sess, Input::File(file))
});
session.override_config(local_config, |sess| {
format_and_emit_report(sess, Input::File(file))
});
} else {
format_and_emit_report(&mut session, Input::File(file));
}
}
let exit_code = if session.has_operational_errors()
|| session.has_parsing_errors()
|| ((session.has_diff() || session.has_check_errors()) && check)
{
1
} else {
0
};
drop(session);
let _ = s.send(ThreadedFileOutput {
session_result: Ok(session_out),
id: my_id,
exit_code,
printer,
});
Ok(())
});
handles.insert(id, handle);
id = 1;
outstanding = 1;
if outstanding >= parallelism.get() {
if let Ok(thread_out) = recv.recv() {
let exit = join_thread_reporting_back(&mut handles, thread_out)?;
if exit != 0 {
exit_code = exit;
}
outstanding -= 1;
} else {
format_and_emit_report(&mut session, Input::File(file));
break;
}
}
}
// Drop sender, or this will deadlock
drop(send);

// Drain running threads
while let Ok(thread_out) = recv.recv() {
let exit = join_thread_reporting_back(&mut handles, thread_out)?;
if exit != 0 {
exit_code = exit;
}
}

// All successful threads have been removed from `handles` only errors are left
for (_id, jh) in handles {
match jh.join() {
Ok(res) => {
res?;
}
Err(panicked) => {
// Propagate the thread's panic, not much to do here
// if the error should be preserved (which it should)
panic::resume_unwind(panicked)
}
}
}
Expand All @@ -352,26 458,43 @@ fn format(
// that were used during formatting as TOML.
if let Some(path) = minimal_config_path {
let mut file = File::create(path)?;
let toml = session.config.used_options().to_toml()?;
let toml = config.used_options().to_toml()?;
file.write_all(toml.as_bytes())?;
}

let exit_code = if session.has_operational_errors()
|| session.has_parsing_errors()
|| ((session.has_diff() || session.has_check_errors()) && options.check)
{
1
} else {
0
};
Ok(exit_code)
}

fn join_thread_reporting_back(
handles: &mut HashMap<i32, JoinHandle<core::result::Result<(), std::io::Error>>>,
threaded_file_output: ThreadedFileOutput,
) -> Result<i32> {
let handle = handles
.remove(&threaded_file_output.id)
.expect("Join thread not found by id");
match handle.join() {
Ok(result) => {
result?;
}
Err(panicked) => {
// Should never end up here logically, since the thread reported back
panic::resume_unwind(panicked)
}
}
let output = threaded_file_output.session_result?;
if !output.is_empty() {
stdout().write_all(&output).unwrap();
}
threaded_file_output.printer.write_to_outputs()?;
Ok(threaded_file_output.exit_code)
}

fn format_and_emit_report<T: Write>(session: &mut Session<'_, T>, input: Input) {
match session.format(input) {
Ok(report) => {
if report.has_warnings() {
eprintln!(
buf_eprintln!(
session.printer,
"{}",
FormatReportFormatterBuilder::new(&report)
.enable_colors(should_print_with_colors(session))
Expand All @@ -380,7 503,7 @@ fn format_and_emit_report<T: Write>(session: &mut Session<'_, T>, input: Input)
}
}
Err(msg) => {
eprintln!("Error writing files: {msg}");
buf_eprintln!(session.printer, "Error writing files: {msg}");
session.add_operational_error();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/chains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 293,7 @@ impl Rewrite for ChainItem {
),
ChainItemKind::Await => ".await".to_owned(),
ChainItemKind::Comment(ref comment, _) => {
rewrite_comment(comment, false, shape, context.config)?
rewrite_comment(comment, false, shape, context.config, context.printer)?
}
};
Some(format!("{rewrite}{}", "?".repeat(self.tries)))
Expand Down
2 changes: 1 addition & 1 deletion src/closures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 323,7 @@ fn rewrite_closure_fn_decl(
let fmt = ListFormatting::new(param_shape, context.config)
.tactic(tactic)
.preserve_newline(true);
let list_str = write_list(&item_vec, &fmt)?;
let list_str = write_list(&item_vec, &fmt, context.printer)?;
let mut prefix = format!("{binder}{const_}{immovable}{coro}{mover}|{list_str}|");

if !ret_str.is_empty() {
Expand Down
Loading
Loading