Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge #9

Merged
merged 4 commits into from
Oct 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hulk-common/src/main/java/com/mtl/hulk/common/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 6,6 @@ public interface Resource {

void destroyNow();

void closeFuture();

}
8 changes: 4 additions & 4 deletions hulk-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 20,12 @@
</dependency>
<dependency>
<groupId>com.mtl.hulk</groupId>
<artifactId>hulk-extension</artifactId>
<artifactId>hulk-extensions</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.mtl.hulk</groupId>
<artifactId>hulk-db</artifactId>
<artifactId>hulk-database</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand All @@ -47,11 47,11 @@
</dependency>
<dependency>
<groupId>com.mtl.hulk</groupId>
<artifactId>hulk-extension</artifactId>
<artifactId>hulk-extensions</artifactId>
</dependency>
<dependency>
<groupId>com.mtl.hulk</groupId>
<artifactId>hulk-db</artifactId>
<artifactId>hulk-database</artifactId>
</dependency>
<dependency>
<groupId>com.mtl.hulk</groupId>
Expand Down
4 changes: 4 additions & 0 deletions hulk-core/src/main/java/com/mtl/hulk/AbstractHulk.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 35,8 @@ public void setProperties(HulkProperties properties) {
this.properties = properties;
}

public HulkProperties getProperties() {
return properties;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 29,7 @@ public BusinessActivityLogger(HulkDataSource ds, HulkSerializer serializer) {

public abstract int updateBusinessActivityState(String businessActivityId, BusinessActivityStatus businessActivityStatus) throws SQLException;

public abstract HulkTransactionActivity getTranactionBusinessActivity(BusinessActivityId businessActivityId);
public abstract HulkTransactionActivity getTranactionBusinessActivity(BusinessActivityId businessActivityId) throws SQLException;

public static String getBusinessActivityIdStr(BusinessActivityId businessActivityId) {
if (null == businessActivityId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 63,8 @@ public void run() {
businessActivityLogger.remove(businessActivityIds);
} catch (SQLException ex) {
logger.error("Hulk Retry Exception", ex);
} catch (Exception ex) {
logger.error("Hulk Retry Exception", ex);
}
}

Expand All @@ -74,4 76,5 @@ private int getRetryCount(String businessActivityIdStr) {
}
return map.get(businessActivityIdStr).incrementAndGet();
}

}
2 changes: 1 addition & 1 deletion hulk-core/src/main/java/com/mtl/hulk/HulkListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 18,6 @@ public HulkListener(HulkProperties properties, ApplicationContext apc) {
this.action = null;
}

public abstract boolean process();
public abstract boolean process() throws Exception;

}
4 changes: 2 additions & 2 deletions hulk-core/src/main/java/com/mtl/hulk/HulkResponseFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 5,8 @@

public class HulkResponseFactory {

public static HulkResponse getResponse(Integer result) {
if (result == 1) {
public static HulkResponse getResponse(boolean result) {
if (result) {
return new HulkResponse(0, RuntimeContextHolder.getContext().getActivity().getStatus().getDesc(), null);
}
if (RuntimeContextHolder.getContext().getActivity().getStatus() == BusinessActivityStatus.COMMITING_FAILED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 2,14 @@

import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mtl.hulk.HulkException;
import com.mtl.hulk.HulkInterceptor;
import com.mtl.hulk.HulkResourceManager;
import com.mtl.hulk.aop.HulkAspectSupport;
import com.mtl.hulk.configuration.HulkProperties;
import com.mtl.hulk.context.*;
import com.mtl.hulk.exception.ActionException;
import com.mtl.hulk.message.HulkErrorCode;
import com.mtl.hulk.tools.ExecutorUtil;
import com.mtl.hulk.tools.FutureUtil;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.slf4j.Logger;
Expand All @@ -18,10 18,7 @@
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

public class BrokerInterceptor extends HulkAspectSupport implements HulkInterceptor, MethodInterceptor, Serializable {

Expand All @@ -31,52 28,72 @@ public class BrokerInterceptor extends HulkAspectSupport implements HulkIntercep
Integer.MAX_VALUE, 10L,
TimeUnit.SECONDS, new SynchronousQueue<>(),
(new ThreadFactoryBuilder()).setNameFormat("Try-Thread-%d").build());
private static List<String> orders;
private static final List<Future> tryFutures = new CopyOnWriteArrayList<Future>();

public BrokerInterceptor(HulkProperties properties) {
super(properties);
}

/**
* 通过发起方发起事务请求,异步远程调用Try接口
* @param methodInvocation
* @return
* @throws Throwable
*/
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
if (!orders.contains(methodInvocation.getMethod().getName())) {
orders.add(methodInvocation.getMethod().getName());
}
HulkResourceManager.getBam().setTryFuture(HulkResourceManager.getBam().getTryFuture().thenApplyAsync(am -> {
try {
String name = methodInvocation.getMethod().getName();
logger.info("Try request sending: {}", name);
Object result = methodInvocation.proceed();
HulkContext subBusinessActivity = JSONObject.parseObject((String) result, HulkContext.class);
am.put(orders.indexOf(name), subBusinessActivity);
return am;
} catch (Throwable t) {
logger.error("Broker Request Exception", t);
RuntimeContextHolder.getContext().setException(new HulkException(HulkErrorCode.TRY_FAIL.getCode(),
MessageFormat.format(HulkErrorCode.TRY_FAIL.getMessage(),
RuntimeContextHolder.getContext().getActivity().getId().formatString(), methodInvocation.getMethod().getName())));
Future<HulkContext> tryFuture = tryExecutor.submit(new Callable<HulkContext>() {
@Override
public HulkContext call() throws Exception {
logger.info("Try Request: {}", methodInvocation.getMethod().getName());
try {
return JSONObject.parseObject((String)methodInvocation.proceed(), HulkContext.class);
} catch (Throwable t) {
RuntimeContextHolder.getContext().setException(new com.mtl.hulk.HulkException(HulkErrorCode.TRY_FAIL.getCode(),
MessageFormat.format(HulkErrorCode.TRY_FAIL.getMessage(),
RuntimeContextHolder.getContext().getActivity().getId().formatString(), methodInvocation.getMethod().getName())));
throw new ActionException(methodInvocation.getMethod().getName(), t);
}
}
return null;
}, tryExecutor));
});
tryFutures.add(tryFuture);
return "ok";
}

public static void setOrders(List<String> orders) {
BrokerInterceptor.orders = orders;
}

public static List<String> getOrders() {
return orders;
public static List<Future> getTryFutures() {
return tryFutures;
}

@Override
public void destroy() {
orders.clear();
if (tryFutures.size() > 0) {
for (Future tryFuture : tryFutures) {
FutureUtil.gracefulCancel(tryFuture);
}
tryFutures.clear();
}
ExecutorUtil.gracefulShutdown(tryExecutor);
}

@Override
public void destroyNow() {
if (tryFutures.size() > 0) {
for (Future tryFuture : tryFutures) {
FutureUtil.cancelNow(tryFuture);
}
tryFutures.clear();
}
ExecutorUtil.shutdownNow(tryExecutor);
}

@Override
public void closeFuture() {
if (tryFutures.size() > 0) {
for (Future tryFuture : tryFutures) {
FutureUtil.cancelNow(tryFuture);
}
tryFutures.clear();
}
}

}
Original file line number Diff line number Diff line change
@@ -1,6 1,7 @@
package com.mtl.hulk.aop.interceptor;

import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mtl.hulk.*;
import com.mtl.hulk.annotation.MTLDTActivity;
import com.mtl.hulk.annotation.MTLTwoPhaseAction;
Expand All @@ -15,7 16,6 @@
import com.mtl.hulk.tools.FutureUtil;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,12 29,20 @@ public class TransactionInterceptor extends HulkAspectSupport implements HulkInt
private static final Logger logger = LoggerFactory.getLogger(TransactionInterceptor.class);

private final ExecutorService transactionExecutor = Executors.newFixedThreadPool(properties.getTransactionThreadPoolSize());
private Future<Integer> future;
private final ScheduledExecutorService timeoutScheduledExecutorService = Executors.newScheduledThreadPool(properties.getTransactionThreadPoolSize(),
(new ThreadFactoryBuilder()).setNameFormat("Run-Timeout-Thread-%d").build());
private Future<Boolean> future;

public TransactionInterceptor(HulkProperties properties, ApplicationContext apc) {
super(properties, apc);
}

/**
* Try请求拦截处理
* @param methodInvocation
* @return
* @throws Throwable
*/
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
if (!prepareContext(methodInvocation)) {
Expand All @@ -44,70 52,76 @@ public Object invoke(MethodInvocation methodInvocation) throws Throwable {
RuntimeContext context = RuntimeContextHolder.getContext();

HulkResponse response = null;
boolean status = true;
Integer result = 1;
ExecutorService loggerExecutor = HulkResourceManager.getBam().getLogExecutor();
try {
status = HulkResourceManager.getBam().start(methodInvocation);
if (status) {
RuntimeContextHolder.getContext().getActivity().setStatus(BusinessActivityStatus.TRIED);
future = transactionExecutor.submit(new BusinessActivityExecutor(new HulkContext(BusinessActivityContextHolder.getContext(),
RuntimeContextHolder.getContext())));
result = future.get(RuntimeContextHolder.getContext().getActivity().getTimeout(), TimeUnit.SECONDS);
} else {
RuntimeContextHolder.getContext().getActivity().setStatus(BusinessActivityStatus.TRYING_EXPT);
result = BooleanUtils.toInteger(status);
}

if (context.getActivity().getId() == null) {
HulkContext hulkContext = new HulkContext();
hulkContext.setBac(BusinessActivityContextHolder.getContext());
hulkContext.setRc(RuntimeContextHolder.getContext());
return JSONObject.toJSONString(hulkContext);
}

boolean status = HulkResourceManager.getBam().start(methodInvocation);
if (status) {
RuntimeContextHolder.getContext().getActivity().setStatus(BusinessActivityStatus.TRIED);
future = transactionExecutor.submit(new BusinessActivityExecutor(new HulkContext(BusinessActivityContextHolder.getContext(),
RuntimeContextHolder.getContext())));
timeoutScheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
if (!future.isDone()) {
logger.error("Transaction Execute Timeout!");
HulkResourceManager.getBam().getListener().closeFuture();
for (HulkInterceptor interceptor : HulkResourceManager.getInterceptors()) {
interceptor.closeFuture();
}
}
}
}, RuntimeContextHolder.getContext().getActivity().getTimeout(), TimeUnit.SECONDS);
status = future.get();
} else {
RuntimeContextHolder.getContext().getActivity().setStatus(BusinessActivityStatus.TRYING_EXPT);
}

loggerExecutor.submit(new BusinessActivityLoggerThread(properties,
new HulkContext(BusinessActivityContextHolder.getContext(), RuntimeContextHolder.getContext())));
response = HulkResponseFactory.getResponse(result);
} catch (TimeoutException ex) {
logger.error("Transaction Interceptor Error", ex);
response = processException(HulkErrorCode.COMMIT_TIMEOUT);
} catch (NullPointerException ex) {
logger.error("Transaction Interceptor Error", ex);
response = processException(HulkErrorCode.RUN_EXCEPTION);
response = HulkResponseFactory.getResponse(status);
} catch (Exception ex) {
logger.error("Transaction Interceptor Error", ex);
response = processException(HulkErrorCode.RUN_EXCEPTION);
logger.error("Transaction Execute Error", ex);
RuntimeContextHolder.getContext().setException(new HulkException(
HulkErrorCode.RUN_EXCEPTION.getCode(),
HulkErrorCode.RUN_EXCEPTION.getMessage()));
response = processException();
} finally {
BusinessActivityContextHolder.clearContext();
RuntimeContextHolder.clearContext();
}
return JSONObject.toJSONString(response);
}

@Override
public void destroy() {
FutureUtil.gracefulCancel(future);
ExecutorUtil.gracefulShutdown(transactionExecutor);
}
private HulkResponse processException() throws Exception {
boolean status = false;

@Override
public void destroyNow() {
HulkResourceManager.getBam().getListener().destroyNow();
FutureUtil.cancelNow(future);
}

private HulkResponse processException(HulkErrorCode hulkErrorCode) throws Exception {
Integer result = 1;

RuntimeContextHolder.getContext().setException(new HulkException(hulkErrorCode.getCode(),
hulkErrorCode.getMessage()));
destroyNow();
future = transactionExecutor.submit(new BusinessActivityExecutor(new HulkContext(BusinessActivityContextHolder.getContext(),
RuntimeContextHolder.getContext())));
result = future.get(RuntimeContextHolder.getContext().getActivity().getTimeout(), TimeUnit.SECONDS);

return HulkResponseFactory.getResponse(result);
try {
status = future.get();
} catch (Exception ex) {
throw ex;
}
timeoutScheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
if (!future.isDone()) {
logger.error("Transaction Execute Timeout!");
HulkResourceManager.getBam().getListener().closeFuture();
for (HulkInterceptor interceptor : HulkResourceManager.getInterceptors()) {
interceptor.closeFuture();
}
}
}
}, RuntimeContextHolder.getContext().getActivity().getTimeout(), TimeUnit.SECONDS);
return HulkResponseFactory.getResponse(status);
}

private boolean prepareContext(MethodInvocation methodInvocation) {
Expand Down Expand Up @@ -169,4 183,21 @@ private boolean prepareContext(MethodInvocation methodInvocation) {
return true;
}

@Override
public void destroy() {
FutureUtil.gracefulCancel(future);
ExecutorUtil.gracefulShutdown(transactionExecutor);
}

@Override
public void destroyNow() {
FutureUtil.cancelNow(future);
ExecutorUtil.shutdownNow(transactionExecutor);
}

@Override
public void closeFuture() {
FutureUtil.cancelNow(future);
}

}
Loading