Revert v2 zipkin support due to pantsbuild#7415. (pantsbuild#7773)
### Problem

As described in pantsbuild#7415, background workunits fail when zipkin is enabled. 

### Solution

Revert pantsbuild#7342 until that can be fixed.
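
As context, a minimal, hypothetical sketch (not Pants code) of what the revert does to the session-creation API: `new_session()` drops the `zipkin_trace_v2` argument and takes only the v2 UI flag, mirroring the `scheduler.py` and `engine_initializer.py` hunks below.

```python
# Illustration only: a stand-in for pants.engine.scheduler.Scheduler showing the
# post-revert signature. The real method builds a SchedulerSession via the native
# engine; this stub just echoes its argument.
class FakeScheduler:
    def new_session(self, v2_ui=False):
        # Before this commit the signature was: new_session(self, zipkin_trace_v2, v2_ui=False)
        return {"v2_ui": v2_ui}


session = FakeScheduler().new_session(v2_ui=True)
print(session)  # {'v2_ui': True}
```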
stuhood authored May 20, 2019
1 parent d4d8f4c commit 58706c1
Showing 17 changed files with 27 additions and 226 deletions.
8 changes: 3 additions & 5 deletions src/python/pants/bin/local_pants_runner.py
@@ -89,7 +89,7 @@ def parse_options(args, env, options_bootstrapper=None):
return options, build_config, options_bootstrapper

@staticmethod
- def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config, options):
+ def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config, global_options):
if graph_session:
return graph_session

@@ -101,9 +101,7 @@ def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config,
build_config
)

- v2_ui = options.for_global_scope().v2_ui
- zipkin_trace_v2 = options.for_scope('reporting').zipkin_trace_v2
- return graph_scheduler_helper.new_session(zipkin_trace_v2, v2_ui)
+ return graph_scheduler_helper.new_session(global_options.v2_ui)

@staticmethod
def _maybe_init_target_roots(target_roots, graph_session, options, build_root):
@@ -158,7 +156,7 @@ def create(cls, exiter, args, env, target_roots=None, daemon_graph_session=None,
daemon_graph_session,
options_bootstrapper,
build_config,
- options
+ global_options
)

target_roots = cls._maybe_init_target_roots(
4 changes: 2 additions & 2 deletions src/python/pants/engine/native.py
@@ -747,8 +747,8 @@ def new_execution_request(self):
self.lib.execution_request_create(),
self.lib.execution_request_destroy)

- def new_session(self, scheduler, should_record_zipkin_spans, should_render_ui, ui_worker_count):
- return self.gc(self.lib.session_create(scheduler, should_record_zipkin_spans, should_render_ui, ui_worker_count), self.lib.session_destroy)
+ def new_session(self, scheduler, should_render_ui, ui_worker_count):
+ return self.gc(self.lib.session_create(scheduler, should_render_ui, ui_worker_count), self.lib.session_destroy)

def new_scheduler(self,
tasks,
10 changes: 2 additions & 8 deletions src/python/pants/engine/scheduler.py
@@ -343,11 +343,9 @@ def lease_files_in_graph(self):
def garbage_collect_store(self):
self._native.lib.garbage_collect_store(self._scheduler)

- def new_session(self, zipkin_trace_v2, v2_ui=False):
+ def new_session(self, v2_ui=False):
"""Creates a new SchedulerSession for this Scheduler."""
- return SchedulerSession(self, self._native.new_session(
- self._scheduler, zipkin_trace_v2, v2_ui, multiprocessing.cpu_count())
- )
+ return SchedulerSession(self, self._native.new_session(self._scheduler, v2_ui, multiprocessing.cpu_count()))


_PathGlobsAndRootCollection = Collection.of(PathGlobsAndRoot)
@@ -435,10 +433,6 @@ def metrics(self):
"""Returns metrics for this SchedulerSession as a dict of metric name to metric value."""
return self._scheduler._metrics(self._session)

- @staticmethod
- def engine_workunits(metrics):
- return metrics.get("engine_workunits")

def with_fork_context(self, func):
return self._scheduler.with_fork_context(func)

6 changes: 1 addition & 5 deletions src/python/pants/goal/context.py
@@ -156,11 +156,7 @@ def executing(self):
"""A contextmanager that sets metrics in the context of a (v1) engine execution."""
self._set_target_root_count_in_runtracker()
yield
- metrics = self._scheduler.metrics()
- self.run_tracker.pantsd_stats.set_scheduler_metrics(metrics)
- engine_workunits = self._scheduler.engine_workunits(metrics)
- if engine_workunits:
- self.run_tracker.report.bulk_record_workunits(engine_workunits)
+ self.run_tracker.pantsd_stats.set_scheduler_metrics(self._scheduler.metrics())
self._set_affected_target_count_in_runtracker()

def _set_target_root_count_in_runtracker(self):
4 changes: 2 additions & 2 deletions src/python/pants/init/engine_initializer.py
@@ -154,8 +154,8 @@ class GlobsHandlingTargetAdaptor(base_class):
class LegacyGraphScheduler(datatype(['scheduler', 'build_file_aliases', 'goal_map'])):
"""A thin wrapper around a Scheduler configured with @rules for a symbol table."""

- def new_session(self, zipkin_trace_v2, v2_ui=False):
- session = self.scheduler.new_session(zipkin_trace_v2, v2_ui)
+ def new_session(self, v2_ui=False):
+ session = self.scheduler.new_session(v2_ui)
return LegacyGraphSession(session, self.build_file_aliases, self.goal_map)


3 changes: 1 addition & 2 deletions src/python/pants/pantsd/service/scheduler_service.py
@@ -180,8 +180,7 @@ def prefork(self, options, options_bootstrapper):
self._logger.debug('graph len was {}, waiting for initial watchman event'.format(graph_len))
self._watchman_is_running.wait()
v2_ui = options.for_global_scope().v2_ui
- zipkin_trace_v2 = options.for_scope('reporting').zipkin_trace_v2
- session = self._graph_helper.new_session(zipkin_trace_v2, v2_ui)
+ session = self._graph_helper.new_session(v2_ui)

if options.for_global_scope().loop:
prefork_fn = self._prefork_loop
5 changes: 0 additions & 5 deletions src/python/pants/reporting/report.py
@@ -140,8 +140,3 @@ def _notify(self):
if len(s) > 0:
for reporter in self._reporters.values():
reporter.handle_output(workunit, label, s)

- def bulk_record_workunits(self, engine_workunits):
- with self._lock:
- for reporter in self._reporters.values():
- reporter.bulk_record_workunits(engine_workunits)
4 changes: 0 additions & 4 deletions src/python/pants/reporting/reporter.py
@@ -47,10 +47,6 @@ def end_workunit(self, workunit):
"""A workunit has finished."""
pass

- def bulk_record_workunits(self, engine_workunits):
- """Handle a collection of workunits reported by the v2 engine."""
- pass

def handle_log(self, workunit, level, *msg_elements):
"""Handle a message logged by pants code.
2 changes: 0 additions & 2 deletions src/python/pants/reporting/reporting.py
@@ -62,8 +62,6 @@ def register_options(cls, register):
'or not set when running a Pants command.')
register('--zipkin-sample-rate', advanced=True, default=100.0,
help='Rate at which to sample Zipkin traces. Value 0.0 - 100.0.')
- register('--zipkin-trace-v2', advanced=True, type=bool, default=False,
- help='If enabled, the zipkin spans are tracked for v2 engine execution progress.')

def initialize(self, run_tracker, all_options, start_time=None):
"""Initialize with the given RunTracker.
37 changes: 3 additions & 34 deletions src/python/pants/reporting/zipkin_reporter.py
@@ -7,7 +7,7 @@
import logging

import requests
- from py_zipkin import Encoding, storage
+ from py_zipkin import Encoding
from py_zipkin.transport import BaseTransportHandler
from py_zipkin.util import generate_random_64bit_string
from py_zipkin.zipkin import ZipkinAttrs, create_attrs_for_span, zipkin_span
@@ -64,7 +64,6 @@ def __init__(self, run_tracker, settings, endpoint, trace_id, parent_id, sample_
self.parent_id = parent_id
self.sample_rate = float(sample_rate)
self.endpoint = endpoint
- self.span_storage = storage.default_span_storage()

def start_workunit(self, workunit):
"""Implementation of Reporter callback."""
@@ -97,24 +96,19 @@ def start_workunit(self, workunit):
sample_rate=self.sample_rate, # Value between 0.0 and 100.0
)
self.trace_id = zipkin_attrs.trace_id
- # TODO delete this line when parent_id will be passed in v2 engine:
- # - with ExecutionRequest when Nodes from v2 engine are called by a workunit;
- # - when a v2 engine Node is called by another v2 engine Node.
- self.parent_id = zipkin_attrs.span_id


span = zipkin_span(
service_name=service_name,
span_name=workunit.name,
transport_handler=self.handler,
encoding=Encoding.V1_THRIFT,
- zipkin_attrs=zipkin_attrs,
- span_storage=self.span_storage,
+ zipkin_attrs=zipkin_attrs
)
else:
span = zipkin_span(
service_name=service_name,
span_name=workunit.name,
- span_storage=self.span_storage,
)
self._workunits_to_spans[workunit] = span
span.start()
@@ -135,28 +129,3 @@ def close(self):
endpoint = self.endpoint.replace("/api/v1/spans", "")

logger.debug("Zipkin trace may be located at this URL {}/traces/{}".format(endpoint, self.trace_id))

- def bulk_record_workunits(self, engine_workunits):
- """Record a collection of workunits reported by the v2 engine."""
- for workunit in engine_workunits:
- duration = workunit['end_timestamp'] - workunit['start_timestamp']
-
- span = zipkin_span(
- service_name="pants",
- span_name=workunit['name'],
- duration=duration,
- span_storage=self.span_storage,
- )
- span.zipkin_attrs = ZipkinAttrs(
- trace_id=self.trace_id,
- span_id=workunit['span_id'],
- # TODO change it when we properly pass parent_id to the v2 engine Nodes
- # TODO Pass parent_id with ExecutionRequest when v2 engine is called by a workunit
- # TODO pass parent_id when v2 engine Node is called by another v2 engine Node
- parent_span_id=workunit.get("parent_id", self.parent_id),
- flags='0', # flags: stores flags header. Currently unused
- is_sampled=True,
- )
- span.start()
- span.start_timestamp = workunit['start_timestamp']
- span.stop()
6 changes: 1 addition & 5 deletions src/rust/engine/src/context.rs
@@ -16,7 +16,6 @@ use crate::core::{Failure, TypeId};
use crate::handles::maybe_drop_handles;
use crate::nodes::{NodeKey, WrappedNode};
use crate::rule_graph::RuleGraph;
- use crate::scheduler::Session;
use crate::tasks::Tasks;
use crate::types::Types;
use boxfuture::{BoxFuture, Boxable};
@@ -275,15 +274,13 @@ impl Core {
pub struct Context {
pub entry_id: EntryId,
pub core: Arc<Core>,
- pub session: Session,
}

impl Context {
- pub fn new(entry_id: EntryId, core: Arc<Core>, session: Session) -> Context {
+ pub fn new(entry_id: EntryId, core: Arc<Core>) -> Context {
Context {
entry_id: entry_id,
core: core,
- session: session,
}
}

@@ -317,7 +314,6 @@ impl NodeContext for Context {
Context {
entry_id: entry_id,
core: self.core.clone(),
- session: self.session.clone(),
}
}

26 changes: 1 addition & 25 deletions src/rust/engine/src/lib.rs
@@ -337,33 +337,11 @@ pub extern "C" fn scheduler_metrics(
) -> Handle {
with_scheduler(scheduler_ptr, |scheduler| {
with_session(session_ptr, |session| {
- let mut values = scheduler
+ let values = scheduler
.metrics(session)
.into_iter()
.flat_map(|(metric, value)| vec![externs::store_utf8(metric), externs::store_i64(value)])
.collect::<Vec<_>>();
- if session.should_record_zipkin_spans() {
- let workunits = session
- .get_workunits()
- .lock()
- .iter()
- .map(|workunit| {
- let workunit_zipkin_trace_info = vec![
- externs::store_utf8("name"),
- externs::store_utf8(&workunit.name),
- externs::store_utf8("start_timestamp"),
- externs::store_f64(workunit.start_timestamp),
- externs::store_utf8("end_timestamp"),
- externs::store_f64(workunit.end_timestamp),
- externs::store_utf8("span_id"),
- externs::store_utf8(&workunit.span_id),
- ];
- externs::store_dict(&workunit_zipkin_trace_info)
- })
- .collect::<Vec<_>>();
- values.push(externs::store_utf8("engine_workunits"));
- values.push(externs::store_tuple(&workunits));
- };
externs::store_dict(&values).into()
})
})
@@ -569,14 +547,12 @@ pub extern "C" fn nodes_destroy(raw_nodes_ptr: *mut RawNodes) {
#[no_mangle]
pub extern "C" fn session_create(
scheduler_ptr: *mut Scheduler,
- should_record_zipkin_spans: bool,
should_render_ui: bool,
ui_worker_count: u64,
) -> *const Session {
with_scheduler(scheduler_ptr, |scheduler| {
Box::into_raw(Box::new(Session::new(
scheduler,
- should_record_zipkin_spans,
should_render_ui,
ui_worker_count as usize,
)))
45 changes: 0 additions & 45 deletions src/rust/engine/src/nodes.rs
@@ -29,10 +29,7 @@ use fs::{
use hashing;
use process_execution::{self, CommandRunner};

- use crate::scheduler::WorkUnit;
use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer};
- use rand::thread_rng;
- use rand::Rng;

pub type NodeFuture<T> = BoxFuture<T, Failure>;

@@ -1088,17 +1085,6 @@ impl Node for NodeKey {
type Error = Failure;

fn run(self, context: Context) -> NodeFuture<NodeResult> {
- let node_name_and_start_timestamp = if context.session.should_record_zipkin_spans() {
- let node_name = format!("{}", self);
- let start_timestamp_duration = std::time::SystemTime::now()
- .duration_since(std::time::SystemTime::UNIX_EPOCH)
- .unwrap();
- let start_timestamp = duration_as_float_secs(&start_timestamp_duration);
- Some((node_name, start_timestamp))
- } else {
- None
- };
- let context2 = context.clone();
match self {
NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).to_boxed(),
@@ -1109,22 +1095,6 @@
NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Task(n) => n.run(context).map(NodeResult::from).to_boxed(),
}
- .inspect(move |_: &NodeResult| {
- if let Some((node_name, start_timestamp)) = node_name_and_start_timestamp {
- let end_timestamp_duration = std::time::SystemTime::now()
- .duration_since(std::time::SystemTime::UNIX_EPOCH)
- .unwrap();
- let end_timestamp = duration_as_float_secs(&end_timestamp_duration);
- let workunit = WorkUnit {
- name: node_name,
- start_timestamp: start_timestamp,
- end_timestamp: end_timestamp,
- span_id: generate_random_64bit_string(),
- };
- context2.session.add_workunit(workunit)
- };
- })
- .to_boxed()
}

fn digest(res: NodeResult) -> Option<hashing::Digest> {
@@ -1148,21 +1118,6 @@
}
}

- fn duration_as_float_secs(duration: &Duration) -> f64 {
- // The returned value is formed by representing the duration as a whole number of seconds (u64) plus
- // a whole number of microseconds (u32), turned into an f64.
- // Converting the duration to an f64 decreases precision.
- let whole_secs_in_duration = duration.as_secs() as f64;
- let fract_part_of_duration_in_micros = f64::from(duration.subsec_micros());
- whole_secs_in_duration + fract_part_of_duration_in_micros / 1_000_000.0
- }

- fn generate_random_64bit_string() -> String {
- let mut rng = thread_rng();
- let random_u64: u64 = rng.gen();
- format!("{:16.x}", random_u64)
- }

impl Display for NodeKey {
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
match self {