Skip to content

Commit

Permalink
Merge pull request #9 from wchswchs/1.0.1
Browse files Browse the repository at this point in the history
merge
  • Loading branch information
wchswchs committed Oct 15, 2018
2 parents 30f512f 93c5eec commit b034094
Show file tree
Hide file tree
Showing 25 changed files with 363 additions and 285 deletions.
2 changes: 2 additions & 0 deletions hulk-common/src/main/java/com/mtl/hulk/common/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 6,6 @@ public interface Resource {

void destroyNow();

void closeFuture();

}
8 changes: 4 additions & 4 deletions hulk-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 20,12 @@
</dependency>
<dependency>
<groupId>com.mtl.hulk</groupId>
<artifactId>hulk-extension</artifactId>
<artifactId>hulk-extensions</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.mtl.hulk</groupId>
<artifactId>hulk-db</artifactId>
<artifactId>hulk-database</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand All @@ -47,11 47,11 @@
</dependency>
<dependency>
<groupId>com.mtl.hulk</groupId>
<artifactId>hulk-extension</artifactId>
<artifactId>hulk-extensions</artifactId>
</dependency>
<dependency>
<groupId>com.mtl.hulk</groupId>
<artifactId>hulk-db</artifactId>
<artifactId>hulk-database</artifactId>
</dependency>
<dependency>
<groupId>com.mtl.hulk</groupId>
Expand Down
4 changes: 4 additions & 0 deletions hulk-core/src/main/java/com/mtl/hulk/AbstractHulk.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 35,8 @@ public void setProperties(HulkProperties properties) {
this.properties = properties;
}

public HulkProperties getProperties() {
return properties;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 29,7 @@ public BusinessActivityLogger(HulkDataSource ds, HulkSerializer serializer) {

public abstract int updateBusinessActivityState(String businessActivityId, BusinessActivityStatus businessActivityStatus) throws SQLException;

public abstract HulkTransactionActivity getTranactionBusinessActivity(BusinessActivityId businessActivityId);
public abstract HulkTransactionActivity getTranactionBusinessActivity(BusinessActivityId businessActivityId) throws SQLException;

public static String getBusinessActivityIdStr(BusinessActivityId businessActivityId) {
if (null == businessActivityId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 63,8 @@ public void run() {
businessActivityLogger.remove(businessActivityIds);
} catch (SQLException ex) {
logger.error("Hulk Retry Exception", ex);
} catch (Exception ex) {
logger.error("Hulk Retry Exception", ex);
}
}

Expand All @@ -74,4 76,5 @@ private int getRetryCount(String businessActivityIdStr) {
}
return map.get(businessActivityIdStr).incrementAndGet();
}

}
2 changes: 1 addition & 1 deletion hulk-core/src/main/java/com/mtl/hulk/HulkListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 18,6 @@ public HulkListener(HulkProperties properties, ApplicationContext apc) {
this.action = null;
}

public abstract boolean process();
public abstract boolean process() throws Exception;

}
4 changes: 2 additions & 2 deletions hulk-core/src/main/java/com/mtl/hulk/HulkResponseFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 5,8 @@

public class HulkResponseFactory {

public static HulkResponse getResponse(Integer result) {
if (result == 1) {
public static HulkResponse getResponse(boolean result) {
if (result) {
return new HulkResponse(0, RuntimeContextHolder.getContext().getActivity().getStatus().getDesc(), null);
}
if (RuntimeContextHolder.getContext().getActivity().getStatus() == BusinessActivityStatus.COMMITING_FAILED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 2,14 @@

import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mtl.hulk.HulkException;
import com.mtl.hulk.HulkInterceptor;
import com.mtl.hulk.HulkResourceManager;
import com.mtl.hulk.aop.HulkAspectSupport;
import com.mtl.hulk.configuration.HulkProperties;
import com.mtl.hulk.context.*;
import com.mtl.hulk.exception.ActionException;
import com.mtl.hulk.message.HulkErrorCode;
import com.mtl.hulk.tools.ExecutorUtil;
import com.mtl.hulk.tools.FutureUtil;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.slf4j.Logger;
Expand All @@ -18,10 18,7 @@
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

public class BrokerInterceptor extends HulkAspectSupport implements HulkInterceptor, MethodInterceptor, Serializable {

Expand All @@ -31,52 28,72 @@ public class BrokerInterceptor extends HulkAspectSupport implements HulkIntercep
Integer.MAX_VALUE, 10L,
TimeUnit.SECONDS, new SynchronousQueue<>(),
(new ThreadFactoryBuilder()).setNameFormat("Try-Thread-%d").build());
private static List<String> orders;
private static final List<Future> tryFutures = new CopyOnWriteArrayList<Future>();

public BrokerInterceptor(HulkProperties properties) {
super(properties);
}

/**
* 通过发起方发起事务请求,异步远程调用Try接口
* @param methodInvocation
* @return
* @throws Throwable
*/
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
if (!orders.contains(methodInvocation.getMethod().getName())) {
orders.add(methodInvocation.getMethod().getName());
}
HulkResourceManager.getBam().setTryFuture(HulkResourceManager.getBam().getTryFuture().thenApplyAsync(am -> {
try {
String name = methodInvocation.getMethod().getName();
logger.info("Try request sending: {}", name);
Object result = methodInvocation.proceed();
HulkContext subBusinessActivity = JSONObject.parseObject((String) result, HulkContext.class);
am.put(orders.indexOf(name), subBusinessActivity);
return am;
} catch (Throwable t) {
logger.error("Broker Request Exception", t);
RuntimeContextHolder.getContext().setException(new HulkException(HulkErrorCode.TRY_FAIL.getCode(),
MessageFormat.format(HulkErrorCode.TRY_FAIL.getMessage(),
RuntimeContextHolder.getContext().getActivity().getId().formatString(), methodInvocation.getMethod().getName())));
Future<HulkContext> tryFuture = tryExecutor.submit(new Callable<HulkContext>() {
@Override
public HulkContext call() throws Exception {
logger.info("Try Request: {}", methodInvocation.getMethod().getName());
try {
return JSONObject.parseObject((String)methodInvocation.proceed(), HulkContext.class);
} catch (Throwable t) {
RuntimeContextHolder.getContext().setException(new com.mtl.hulk.HulkException(HulkErrorCode.TRY_FAIL.getCode(),
MessageFormat.format(HulkErrorCode.TRY_FAIL.getMessage(),
RuntimeContextHolder.getContext().getActivity().getId().formatString(), methodInvocation.getMethod().getName())));
throw new ActionException(methodInvocation.getMethod().getName(), t);
}
}
return null;
}, tryExecutor));
});
tryFutures.add(tryFuture);
return "ok";
}

public static void setOrders(List<String> orders) {
BrokerInterceptor.orders = orders;
}

public static List<String> getOrders() {
return orders;
public static List<Future> getTryFutures() {
return tryFutures;
}

@Override
public void destroy() {
orders.clear();
if (tryFutures.size() > 0) {
for (Future tryFuture : tryFutures) {
FutureUtil.gracefulCancel(tryFuture);
}
tryFutures.clear();
}
ExecutorUtil.gracefulShutdown(tryExecutor);
}

@Override
public void destroyNow() {
if (tryFutures.size() > 0) {
for (Future tryFuture : tryFutures) {
FutureUtil.cancelNow(tryFuture);
}
tryFutures.clear();
}
ExecutorUtil.shutdownNow(tryExecutor);
}

@Override
public void closeFuture() {
if (tryFutures.size() > 0) {
for (Future tryFuture : tryFutures) {
FutureUtil.cancelNow(tryFuture);
}
tryFutures.clear();
}
}

}
Original file line number Diff line number Diff line change
@@ -1,6 1,7 @@
package com.mtl.hulk.aop.interceptor;

import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mtl.hulk.*;
import com.mtl.hulk.annotation.MTLDTActivity;
import com.mtl.hulk.annotation.MTLTwoPhaseAction;
Expand All @@ -15,7 16,6 @@
import com.mtl.hulk.tools.FutureUtil;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,12 29,20 @@ public class TransactionInterceptor extends HulkAspectSupport implements HulkInt
private static final Logger logger = LoggerFactory.getLogger(TransactionInterceptor.class);

private final ExecutorService transactionExecutor = Executors.newFixedThreadPool(properties.getTransactionThreadPoolSize());
private Future<Integer> future;
private final ScheduledExecutorService timeoutScheduledExecutorService = Executors.newScheduledThreadPool(properties.getTransactionThreadPoolSize(),
(new ThreadFactoryBuilder()).setNameFormat("Run-Timeout-Thread-%d").build());
private Future<Boolean> future;

public TransactionInterceptor(HulkProperties properties, ApplicationContext apc) {
super(properties, apc);
}

/**
* Try请求拦截处理
* @param methodInvocation
* @return
* @throws Throwable
*/
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
if (!prepareContext(methodInvocation)) {
Expand All @@ -44,70 52,76 @@ public Object invoke(MethodInvocation methodInvocation) throws Throwable {
RuntimeContext context = RuntimeContextHolder.getContext();

HulkResponse response = null;
boolean status = true;
Integer result = 1;
ExecutorService loggerExecutor = HulkResourceManager.getBam().getLogExecutor();
try {
status = HulkResourceManager.getBam().start(methodInvocation);
if (status) {
RuntimeContextHolder.getContext().getActivity().setStatus(BusinessActivityStatus.TRIED);
future = transactionExecutor.submit(new BusinessActivityExecutor(new HulkContext(BusinessActivityContextHolder.getContext(),
RuntimeContextHolder.getContext())));
result = future.get(RuntimeContextHolder.getContext().getActivity().getTimeout(), TimeUnit.SECONDS);
} else {
RuntimeContextHolder.getContext().getActivity().setStatus(BusinessActivityStatus.TRYING_EXPT);
result = BooleanUtils.toInteger(status);
}

if (context.getActivity().getId() == null) {
HulkContext hulkContext = new HulkContext();
hulkContext.setBac(BusinessActivityContextHolder.getContext());
hulkContext.setRc(RuntimeContextHolder.getContext());
return JSONObject.toJSONString(hulkContext);
}

boolean status = HulkResourceManager.getBam().start(methodInvocation);
if (status) {
RuntimeContextHolder.getContext().getActivity().setStatus(BusinessActivityStatus.TRIED);
future = transactionExecutor.submit(new BusinessActivityExecutor(new HulkContext(BusinessActivityContextHolder.getContext(),
RuntimeContextHolder.getContext())));
timeoutScheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
if (!future.isDone()) {
logger.error("Transaction Execute Timeout!");
HulkResourceManager.getBam().getListener().closeFuture();
for (HulkInterceptor interceptor : HulkResourceManager.getInterceptors()) {
interceptor.closeFuture();
}
}
}
}, RuntimeContextHolder.getContext().getActivity().getTimeout(), TimeUnit.SECONDS);
status = future.get();
} else {
RuntimeContextHolder.getContext().getActivity().setStatus(BusinessActivityStatus.TRYING_EXPT);
}

loggerExecutor.submit(new BusinessActivityLoggerThread(properties,
new HulkContext(BusinessActivityContextHolder.getContext(), RuntimeContextHolder.getContext())));
response = HulkResponseFactory.getResponse(result);
} catch (TimeoutException ex) {
logger.error("Transaction Interceptor Error", ex);
response = processException(HulkErrorCode.COMMIT_TIMEOUT);
} catch (NullPointerException ex) {
logger.error("Transaction Interceptor Error", ex);
response = processException(HulkErrorCode.RUN_EXCEPTION);
response = HulkResponseFactory.getResponse(status);
} catch (Exception ex) {
logger.error("Transaction Interceptor Error", ex);
response = processException(HulkErrorCode.RUN_EXCEPTION);
logger.error("Transaction Execute Error", ex);
RuntimeContextHolder.getContext().setException(new HulkException(
HulkErrorCode.RUN_EXCEPTION.getCode(),
HulkErrorCode.RUN_EXCEPTION.getMessage()));
response = processException();
} finally {
BusinessActivityContextHolder.clearContext();
RuntimeContextHolder.clearContext();
}
return JSONObject.toJSONString(response);
}

@Override
public void destroy() {
FutureUtil.gracefulCancel(future);
ExecutorUtil.gracefulShutdown(transactionExecutor);
}
private HulkResponse processException() throws Exception {
boolean status = false;

@Override
public void destroyNow() {
HulkResourceManager.getBam().getListener().destroyNow();
FutureUtil.cancelNow(future);
}

private HulkResponse processException(HulkErrorCode hulkErrorCode) throws Exception {
Integer result = 1;

RuntimeContextHolder.getContext().setException(new HulkException(hulkErrorCode.getCode(),
hulkErrorCode.getMessage()));
destroyNow();
future = transactionExecutor.submit(new BusinessActivityExecutor(new HulkContext(BusinessActivityContextHolder.getContext(),
RuntimeContextHolder.getContext())));
result = future.get(RuntimeContextHolder.getContext().getActivity().getTimeout(), TimeUnit.SECONDS);

return HulkResponseFactory.getResponse(result);
try {
status = future.get();
} catch (Exception ex) {
throw ex;
}
timeoutScheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
if (!future.isDone()) {
logger.error("Transaction Execute Timeout!");
HulkResourceManager.getBam().getListener().closeFuture();
for (HulkInterceptor interceptor : HulkResourceManager.getInterceptors()) {
interceptor.closeFuture();
}
}
}
}, RuntimeContextHolder.getContext().getActivity().getTimeout(), TimeUnit.SECONDS);
return HulkResponseFactory.getResponse(status);
}

private boolean prepareContext(MethodInvocation methodInvocation) {
Expand Down Expand Up @@ -169,4 183,21 @@ private boolean prepareContext(MethodInvocation methodInvocation) {
return true;
}

@Override
public void destroy() {
FutureUtil.gracefulCancel(future);
ExecutorUtil.gracefulShutdown(transactionExecutor);
}

@Override
public void destroyNow() {
FutureUtil.cancelNow(future);
ExecutorUtil.shutdownNow(transactionExecutor);
}

@Override
public void closeFuture() {
FutureUtil.cancelNow(future);
}

}
Loading

0 comments on commit b034094

Please sign in to comment.