Skip to content

Commit

Permalink
Under distributed mode, an instance will not terminate itself unless …
Browse files Browse the repository at this point in the history
…it is stopped by RPC; this does not affect single mode
  • Loading branch information
qinxuye committed Nov 24, 2013
1 parent e3f947c commit a254006
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions cola/worker/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 80,10 @@ def __init__(self, job, data_dir, context=None, logger=None,
# budget
self.budget = 0

# stop lock: acquired here at construction and held until stop() releases it
self.stop_lock = threading.Lock()
self.stop_lock.acquire()

self.check()
# init rpc server
self.init_rpc_server()
Expand Down Expand Up @@ -124,6 128,12 @@ def init_mq(self):
copies=self.copies)
self.mq.init_store(mq_store_dir, mq_backup_dir,
verify_exists_hook=self._init_bloom_filter())

def _release_stop_lock(self):
try:
self.stop_lock.release()
except:
pass

def check(self):
env_legal = self.check_env(force=self.force)
Expand Down Expand Up @@ -155,6 165,7 @@ def error(self, obj):

def stop(self):
    """Stop this loader.

    Puts the objects in ``self.executings`` back onto the message queue
    (with ``force=True``), releases the stop lock, and then delegates to
    the parent class's ``stop()``.
    """
    queue = self.mq
    queue.put(self.executings, force=True)
    self._release_stop_lock()
    parent = super(BasicWorkerJobLoader, self)
    parent.stop()

def signal_handler(self, signum, frame):
Expand Down Expand Up @@ -283,7 294,7 @@ def add_node(self, node):
if self.mq is not None:
self.mq.add_node(node)

def _run(self):
def _run(self, stop_when_finish=False):
def _call(opener=None):
if opener is None:
opener = self.job.opener_cls()
Expand All @@ -306,6 317,8 @@ def _call(opener=None):

try:
threads = [threading.Thread(target=_call) for _ in range(self.instances)]
if not stop_when_finish:
threads.append(threading.Thread(target=self.stop_lock.acquire))
for t in threads:
t.start()
for t in threads:
Expand Down Expand Up @@ -341,7 354,7 @@ def __init__(self, job, data_dir, master=None, local=None, nodes=None,
def finish(self):
    """Run ``finish`` of both base loaders, ``LimitionJobLoader`` first."""
    for base in (LimitionJobLoader, BasicWorkerJobLoader):
        base.finish(self)

def stop(self):
LimitionJobLoader.stop(self)
BasicWorkerJobLoader.stop(self)
Expand Down Expand Up @@ -371,7 384,7 @@ def _require_budget(self):
def run(self, put_starts=True):
if put_starts:
self.mq.put(self.job.starts)
self._run()
self._run(stop_when_finish=True)

class WorkerJobLoader(BasicWorkerJobLoader):
def __init__(self, job, data_dir, master, local=None, nodes=None,
Expand Down

0 comments on commit a254006

Please sign in to comment.