Skip to content

Commit

Permalink
reconstruct; move recover to cola/worker; add recover to wiki and gen…
Browse files Browse the repository at this point in the history
…eric
  • Loading branch information
qinxuye committed Nov 24, 2013
1 parent c333f1a commit e3f947c
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 90 deletions.
58 changes: 58 additions & 0 deletions cola/worker/recover.py
Original file line number Diff line number Diff line change
@@ -0,0 1,58 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Copyright (c) 2013 Qin Xuye <[email protected]>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Created on 2013-11-24
@author: Chine
'''

import os

from cola.core.utils import root_dir, import_job

def recover(job_path):
job = import_job(job_path)

data_path = os.path.join(root_dir(), 'data')
root = os.path.join(data_path, 'worker', 'jobs', job.real_name)
if os.path.exists(root):
lock_path = os.path.join(root, 'lock')
if os.path.exists(lock_path):
os.remove(lock_path)

def _recover_dir(dir_):
for f in os.listdir(dir_):
if f.endswith('.old'):
f_path = os.path.join(dir_, f)
os.remove(f_path)

for f in os.listdir(dir_):
if f == 'lock':
lock_f = os.path.join(dir_, f)
os.remove(lock_f)

f_path = os.path.join(dir_, f)
if os.path.isfile(f_path) and not f.endswith('.old'):
os.rename(f_path, f_path '.old')

mq_store_dir = os.path.join(root, 'store')
mq_backup_dir = os.path.join(root, 'backup')
if os.path.exists(mq_store_dir):
_recover_dir(mq_store_dir)
if os.path.exists(mq_backup_dir):
_recover_dir(mq_backup_dir)
19 changes: 10 additions & 9 deletions contrib/generic/stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 27,10 @@
from cola.core.utils import get_ip
from cola.core.config import Config
from cola.core.logs import get_logger
from cola.worker.recover import recover

logger = get_logger(name='generic_stop')

def _client_call(*args):
try:
return client_call(*args)
except socket.error:
logger.error('Cannot connect to single running worker.')
except:
pass

get_user_conf = lambda s: os.path.join(os.path.dirname(os.path.abspath(__file__)), s)
user_conf = get_user_conf('test.yaml')
if not os.path.exists(user_conf):
Expand All @@ -47,5 40,13 @@ def _client_call(*args):
if __name__ == '__main__':
ip, port = get_ip(), getattr(user_config.job, 'port')
logger.info('Trying to stop single running worker')
_client_call('%s:%s' % (ip, port), 'stop')
try:
client_call('%s:%s' % (ip, port), 'stop')
except socket.error:
stop = raw_input("Force to stop? (y or n) ").strip()
if stop == 'y' or stop == 'yes':
job_path = os.path.split(os.path.abspath(__file__))[0]
recover()
else:
print 'ignore'
logger.info('Successfully stopped single running worker')
40 changes: 4 additions & 36 deletions contrib/weibo/stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,47 24,14 @@
import os

from cola.core.rpc import client_call
from cola.core.utils import get_ip, root_dir, import_job
from cola.core.utils import get_ip
from cola.core.logs import get_logger
from cola.worker.recover import recover

from conf import user_config

logger = get_logger(name='sina_stop')

def _recover():
job_path = os.path.split(os.path.abspath(__file__))[0]
job = import_job(job_path)

data_path = os.path.join(root_dir(), 'data')
root = os.path.join(data_path, 'worker', 'jobs', job.real_name)
if os.path.exists(root):
lock_path = os.path.join(root, 'lock')
if os.path.exists(lock_path):
os.remove(lock_path)

def _recover_dir(dir_):
for f in os.listdir(dir_):
if f.endswith('.old'):
f_path = os.path.join(dir_, f)
os.remove(f_path)

for f in os.listdir(dir_):
if f == 'lock':
lock_f = os.path.join(dir_, f)
os.remove(lock_f)

f_path = os.path.join(dir_, f)
if os.path.isfile(f_path) and not f.endswith('.old'):
os.rename(f_path, f_path '.old')

mq_store_dir = os.path.join(root, 'store')
mq_backup_dir = os.path.join(root, 'backup')
if os.path.exists(mq_store_dir):
_recover_dir(mq_store_dir)
if os.path.exists(mq_backup_dir):
_recover_dir(mq_backup_dir)


if __name__ == '__main__':
ip, port = get_ip(), getattr(user_config.job, 'port')
logger.info('Trying to stop single running worker')
Expand All @@ -73,7 40,8 @@ def _recover_dir(dir_):
except socket.error:
stop = raw_input("Force to stop? (y or n) ").strip()
if stop == 'y' or stop == 'yes':
_recover()
job_path = os.path.split(os.path.abspath(__file__))[0]
recover(job_path)
else:
print 'ignore'
logger.info('Successfully stopped single running worker')
40 changes: 4 additions & 36 deletions contrib/weibosearch/stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,47 24,14 @@
import os

from cola.core.rpc import client_call
from cola.core.utils import get_ip, root_dir, import_job
from cola.core.utils import get_ip
from cola.core.logs import get_logger
from cola.worker.recover import recover

from conf import user_config

logger = get_logger(name='weibosearch_stop')

def _recover():
job_path = os.path.split(os.path.abspath(__file__))[0]
job = import_job(job_path)

data_path = os.path.join(root_dir(), 'data')
root = os.path.join(data_path, 'worker', 'jobs', job.real_name)
if os.path.exists(root):
lock_path = os.path.join(root, 'lock')
if os.path.exists(lock_path):
os.remove(lock_path)

def _recover_dir(dir_):
for f in os.listdir(dir_):
if f.endswith('.old'):
f_path = os.path.join(dir_, f)
os.remove(f_path)

for f in os.listdir(dir_):
if f == 'lock':
lock_f = os.path.join(dir_, f)
os.remove(lock_f)

f_path = os.path.join(dir_, f)
if os.path.isfile(f_path) and not f.endswith('.old'):
os.rename(f_path, f_path '.old')

mq_store_dir = os.path.join(root, 'store')
mq_backup_dir = os.path.join(root, 'backup')
if os.path.exists(mq_store_dir):
_recover_dir(mq_store_dir)
if os.path.exists(mq_backup_dir):
_recover_dir(mq_backup_dir)


if __name__ == '__main__':
ip, port = get_ip(), getattr(user_config.job, 'port')
logger.info('Trying to stop single running worker')
Expand All @@ -73,7 40,8 @@ def _recover_dir(dir_):
except socket.error:
stop = raw_input("Force to stop? (y or n) ").strip()
if stop == 'y' or stop == 'yes':
_recover()
job_path = os.path.split(os.path.abspath(__file__))[0]
recover()
else:
print 'ignore'
logger.info('Successfully stopped single running worker')
19 changes: 10 additions & 9 deletions contrib/wiki/stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 27,10 @@
from cola.core.utils import get_ip
from cola.core.config import Config
from cola.core.logs import get_logger
from cola.worker.recover import recover

logger = get_logger(name='wiki_stop')

def _client_call(*args):
try:
return client_call(*args)
except socket.error:
logger.error('Cannot connect to single running worker.')
except:
pass

get_user_conf = lambda s: os.path.join(os.path.dirname(os.path.abspath(__file__)), s)
user_conf = get_user_conf('test.yaml')
if not os.path.exists(user_conf):
Expand All @@ -47,5 40,13 @@ def _client_call(*args):
if __name__ == '__main__':
ip, port = get_ip(), getattr(user_config.job, 'port')
logger.info('Trying to stop single running worker')
_client_call('%s:%s' % (ip, port), 'stop')
try:
client_call('%s:%s' % (ip, port), 'stop')
except socket.error:
stop = raw_input("Force to stop? (y or n) ").strip()
if stop == 'y' or stop == 'yes':
job_path = os.path.split(os.path.abspath(__file__))[0]
recover()
else:
print 'ignore'
logger.info('Successfully stopped single running worker')

0 comments on commit e3f947c

Please sign in to comment.