[feat] 初始化数据中台框架_02

This commit is contained in:
Kris 2024-12-17 17:51:53 +08:00
parent 5775e932a1
commit ee5cc55a76
53 changed files with 0 additions and 4240 deletions

View File

@ -1,67 +0,0 @@
package com.czsj.rpc.registry;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: service registry
*
* /xxl-rpc/dev/
* - key01(service01)
* - value01 (ip:port01)
* - value02 (ip:port02)
**/
public abstract class ServiceRegistry {
/**
* start
*/
public abstract void start(Map<String, String> param);
/**
* start
*/
public abstract void stop();
/**
* registry service, for mult
*
* @param keys service key
* @param value service value/ip:port
* @return
*/
public abstract boolean registry(Set<String> keys, String value);
/**
* remove service, for mult
*
* @param keys
* @param value
* @return
*/
public abstract boolean remove(Set<String> keys, String value);
/**
* discovery services, for mult
*
* @param keys
* @return
*/
public abstract Map<String, TreeSet<String>> discovery(Set<String> keys);
/**
* discovery service, for one
*
* @param key service key
* @return service value/ip:port
*/
public abstract TreeSet<String> discovery(String key);
}

View File

@ -1,88 +0,0 @@
package com.czsj.rpc.registry.impl;
import com.czsj.rpc.registry.ServiceRegistry;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: service registry for "local"
**/
public class LocalServiceRegistry extends ServiceRegistry {
/**
* registry data
*/
private Map<String, TreeSet<String>> registryData;
/**
* @param param ignore, not use
*/
@Override
public void start(Map<String, String> param) {
registryData = new HashMap<>();
}
@Override
public void stop() {
registryData.clear();
}
@Override
public boolean registry(Set<String> keys, String value) {
if (keys == null || keys.size() == 0 || value == null || value.trim().length() == 0) {
return false;
}
for (String key : keys) {
TreeSet<String> values = registryData.get(key);
if (values == null) {
values = new TreeSet<>();
registryData.put(key, values);
}
values.add(value);
}
return true;
}
@Override
public boolean remove(Set<String> keys, String value) {
if (keys == null || keys.size() == 0 || value == null || value.trim().length() == 0) {
return false;
}
for (String key : keys) {
TreeSet<String> values = registryData.get(key);
if (values != null) {
values.remove(value);
}
}
return true;
}
@Override
public Map<String, TreeSet<String>> discovery(Set<String> keys) {
if (keys == null || keys.size() == 0) {
return null;
}
Map<String, TreeSet<String>> registryDataTmp = new HashMap<String, TreeSet<String>>();
for (String key : keys) {
TreeSet<String> valueSetTmp = discovery(key);
if (valueSetTmp != null) {
registryDataTmp.put(key, valueSetTmp);
}
}
return registryDataTmp;
}
@Override
public TreeSet<String> discovery(String key) {
return registryData.get(key);
}
}

View File

@ -1,179 +0,0 @@
package com.czsj.rpc.remoting.invoker;
import com.czsj.rpc.registry.ServiceRegistry;
import com.czsj.rpc.registry.impl.LocalServiceRegistry;
import com.czsj.rpc.remoting.net.params.BaseCallback;
import com.czsj.rpc.remoting.net.params.XxlRpcFutureResponse;
import com.czsj.rpc.remoting.net.params.XxlRpcResponse;
import com.czsj.rpc.util.XxlRpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
* xxl-rpc invoker factory, init service-registry
*
* @author xuxueli 2018-10-19
*/
public class XxlRpcInvokerFactory {
private static Logger logger = LoggerFactory.getLogger(XxlRpcInvokerFactory.class);
// ---------------------- default instance ----------------------
private static volatile XxlRpcInvokerFactory instance = new XxlRpcInvokerFactory(LocalServiceRegistry.class, null);
public static XxlRpcInvokerFactory getInstance() {
return instance;
}
// ---------------------- config ----------------------
private Class<? extends ServiceRegistry> serviceRegistryClass; // class.forname
private Map<String, String> serviceRegistryParam;
public XxlRpcInvokerFactory() {
}
public XxlRpcInvokerFactory(Class<? extends ServiceRegistry> serviceRegistryClass, Map<String, String> serviceRegistryParam) {
this.serviceRegistryClass = serviceRegistryClass;
this.serviceRegistryParam = serviceRegistryParam;
}
// ---------------------- start / stop ----------------------
public void start() throws Exception {
// start registry
if (serviceRegistryClass != null) {
serviceRegistry = serviceRegistryClass.newInstance();
serviceRegistry.start(serviceRegistryParam);
}
}
public void stop() throws Exception {
// stop registry
if (serviceRegistry != null) {
serviceRegistry.stop();
}
// stop callback
if (stopCallbackList.size() > 0) {
for (BaseCallback callback : stopCallbackList) {
try {
callback.run();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
// stop CallbackThreadPool
stopCallbackThreadPool();
}
// ---------------------- service registry ----------------------
private ServiceRegistry serviceRegistry;
public ServiceRegistry getServiceRegistry() {
return serviceRegistry;
}
// ---------------------- service registry ----------------------
private List<BaseCallback> stopCallbackList = new ArrayList<BaseCallback>();
public void addStopCallBack(BaseCallback callback) {
stopCallbackList.add(callback);
}
// ---------------------- future-response pool ----------------------
// XxlRpcFutureResponseFactory
private ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();
public void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse) {
futureResponsePool.put(requestId, futureResponse);
}
public void removeInvokerFuture(String requestId) {
futureResponsePool.remove(requestId);
}
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {
// get
final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
if (futureResponse == null) {
return;
}
// notify
if (futureResponse.getInvokeCallback() != null) {
// callback type
try {
executeResponseCallback(() -> {
if (xxlRpcResponse.getErrorMsg() != null) {
futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
} else {
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
});
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else {
// other nomal type
futureResponse.setResponse(xxlRpcResponse);
}
// do remove
futureResponsePool.remove(requestId);
}
// ---------------------- response callback ThreadPool ----------------------
private ThreadPoolExecutor responseCallbackThreadPool = null;
public void executeResponseCallback(Runnable runnable) {
if (responseCallbackThreadPool == null) {
synchronized (this) {
if (responseCallbackThreadPool == null) {
responseCallbackThreadPool = new ThreadPoolExecutor(
10,
100,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
r -> new Thread(r, "xxl-rpc, XxlRpcInvokerFactory-responseCallbackThreadPool-" + r.hashCode()),
(r, executor) -> {
throw new XxlRpcException("xxl-rpc Invoke Callback Thread pool is EXHAUSTED!");
}); // default maxThreads 300, minThreads 60
}
}
}
responseCallbackThreadPool.execute(runnable);
}
public void stopCallbackThreadPool() {
if (responseCallbackThreadPool != null) {
responseCallbackThreadPool.shutdown();
}
}
}

View File

@ -1,42 +0,0 @@
package com.czsj.rpc.remoting.invoker.annotation;
import com.czsj.rpc.remoting.invoker.call.CallType;
import com.czsj.rpc.remoting.invoker.route.LoadBalance;
import com.czsj.rpc.remoting.net.Client;
import com.czsj.rpc.remoting.net.impl.netty.client.NettyClient;
import com.czsj.rpc.serialize.Serializer;
import com.czsj.rpc.serialize.impl.HessianSerializer;
import java.lang.annotation.*;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: rpc service annotation, skeleton of stub ("@Inherited" allow service use "Transactional")
**/
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface XxlRpcReference {
Class<? extends Client> client() default NettyClient.class;
Class<? extends Serializer> serializer() default HessianSerializer.class;
CallType callType() default CallType.SYNC;
LoadBalance loadBalance() default LoadBalance.ROUND;
//Class<?> iface;
String version() default "";
long timeout() default 1000;
String address() default "";
String accessToken() default "";
//XxlRpcInvokeCallback invokeCallback() ;
}

View File

@ -1,30 +0,0 @@
package com.czsj.rpc.remoting.invoker.call;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: 远程调用的类型
**/
public enum CallType {
SYNC,
FUTURE,
CALLBACK,
ONEWAY;
public static CallType match(String name, CallType defaultCallType) {
for (CallType item : CallType.values()) {
if (item.name().equals(name)) {
return item;
}
}
return defaultCallType;
}
}

View File

@ -1,49 +0,0 @@
package com.czsj.rpc.remoting.invoker.call;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description:
**/
public abstract class XxlRpcInvokeCallback<T> {
public abstract void onSuccess(T result);
public abstract void onFailure(Throwable exception);
// ---------------------- thread invoke callback ----------------------
private static ThreadLocal<XxlRpcInvokeCallback> threadInvokerFuture = new ThreadLocal<XxlRpcInvokeCallback>();
/**
* get callback
*
* @return
*/
public static XxlRpcInvokeCallback getCallback() {
XxlRpcInvokeCallback invokeCallback = threadInvokerFuture.get();
threadInvokerFuture.remove();
return invokeCallback;
}
/**
* set future
*
* @param invokeCallback
*/
public static void setCallback(XxlRpcInvokeCallback invokeCallback) {
threadInvokerFuture.set(invokeCallback);
}
/**
* remove future
*/
public static void removeCallback() {
threadInvokerFuture.remove();
}
}

View File

@ -1,106 +0,0 @@
package com.czsj.rpc.remoting.invoker.call;
import com.czsj.rpc.remoting.net.params.XxlRpcFutureResponse;
import com.czsj.rpc.remoting.net.params.XxlRpcResponse;
import com.czsj.rpc.util.XxlRpcException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description:
**/
public class XxlRpcInvokeFuture implements Future {
private XxlRpcFutureResponse futureResponse;
public XxlRpcInvokeFuture(XxlRpcFutureResponse futureResponse) {
this.futureResponse = futureResponse;
}
public void stop() {
// remove-InvokerFuture
futureResponse.removeInvokerFuture();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return futureResponse.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return futureResponse.isCancelled();
}
@Override
public boolean isDone() {
return futureResponse.isDone();
}
@Override
public Object get() throws ExecutionException, InterruptedException {
try {
return get(-1, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new XxlRpcException(e);
}
}
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
try {
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, unit);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} finally {
stop();
}
}
// ---------------------- thread invoke future ----------------------
private static ThreadLocal<XxlRpcInvokeFuture> threadInvokerFuture = new ThreadLocal<XxlRpcInvokeFuture>();
/**
* get future
*
* @param type
* @param <T>
* @return
*/
public static <T> Future<T> getFuture(Class<T> type) {
Future<T> future = (Future<T>) threadInvokerFuture.get();
threadInvokerFuture.remove();
return future;
}
/**
* set future
*
* @param future
*/
public static void setFuture(XxlRpcInvokeFuture future) {
threadInvokerFuture.set(future);
}
/**
* remove future
*/
public static void removeFuture() {
threadInvokerFuture.remove();
}
}

View File

@ -1,23 +0,0 @@
package com.czsj.rpc.remoting.invoker.generic;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description:
**/
public interface XxlRpcGenericService {
/**
* generic invoke
*
* @param iface iface name
* @param version iface version
* @param method method name
* @param parameterTypes parameter types, limit base type like "int、java.lang.Integer、java.util.List、java.util.Map ..."
* @param args
* @return
*/
Object invoke(String iface, String version, String method, String[] parameterTypes, Object[] args);
}

View File

@ -1,142 +0,0 @@
package com.czsj.rpc.remoting.invoker.impl;
import com.czsj.rpc.registry.ServiceRegistry;
import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.czsj.rpc.remoting.invoker.annotation.XxlRpcReference;
import com.czsj.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory;
import com.czsj.rpc.util.XxlRpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessorAdapter;
import org.springframework.util.ReflectionUtils;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
*
* @author xuxueli 2018-10-19
*/
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description:
**/
public class XxlRpcSpringInvokerFactory extends InstantiationAwareBeanPostProcessorAdapter implements InitializingBean, DisposableBean, BeanFactoryAware {
private Logger logger = LoggerFactory.getLogger(XxlRpcSpringInvokerFactory.class);
// ---------------------- config ----------------------
private Class<? extends ServiceRegistry> serviceRegistryClass; // class.forname
private Map<String, String> serviceRegistryParam;
public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
this.serviceRegistryClass = serviceRegistryClass;
}
public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
this.serviceRegistryParam = serviceRegistryParam;
}
// ---------------------- util ----------------------
private XxlRpcInvokerFactory xxlRpcInvokerFactory;
@Override
public void afterPropertiesSet() throws Exception {
// start invoker factory
xxlRpcInvokerFactory = new XxlRpcInvokerFactory(serviceRegistryClass, serviceRegistryParam);
xxlRpcInvokerFactory.start();
}
@Override
public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {
// collection
final Set<String> serviceKeyList = new HashSet<>();
// parse XxlRpcReferenceBean
ReflectionUtils.doWithFields(bean.getClass(), field -> {
if (field.isAnnotationPresent(XxlRpcReference.class)) {
// valid
Class iface = field.getType();
if (!iface.isInterface()) {
throw new XxlRpcException("xxl-rpc, reference(XxlRpcReference) must be interface.");
}
XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class);
// init reference bean
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
referenceBean.setClient(rpcReference.client());
referenceBean.setSerializer(rpcReference.serializer());
referenceBean.setCallType(rpcReference.callType());
referenceBean.setLoadBalance(rpcReference.loadBalance());
referenceBean.setIface(iface);
referenceBean.setVersion(rpcReference.version());
referenceBean.setTimeout(rpcReference.timeout());
referenceBean.setAddress(rpcReference.address());
referenceBean.setAccessToken(rpcReference.accessToken());
referenceBean.setInvokeCallback(null);
referenceBean.setInvokerFactory(xxlRpcInvokerFactory);
// get proxyObj
Object serviceProxy;
try {
serviceProxy = referenceBean.getObject();
} catch (Exception e) {
throw new RuntimeException(e);
}
// set bean
field.setAccessible(true);
field.set(bean, serviceProxy);
logger.info(">>>>>>>>>>> xxl-rpc, invoker factory init reference bean success. serviceKey = {}, bean.field = {}.{}",
XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version()), beanName, field.getName());
// collection
String serviceKey = XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version());
serviceKeyList.add(serviceKey);
}
});
// mult discovery
if (xxlRpcInvokerFactory.getServiceRegistry() != null) {
try {
xxlRpcInvokerFactory.getServiceRegistry().discovery(serviceKeyList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
return super.postProcessAfterInstantiation(bean, beanName);
}
@Override
public void destroy() throws Exception {
// stop invoker factory
xxlRpcInvokerFactory.stop();
}
private BeanFactory beanFactory;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
}

View File

@ -1,320 +0,0 @@
package com.czsj.rpc.remoting.invoker.reference;
import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.czsj.rpc.remoting.invoker.call.CallType;
import com.czsj.rpc.remoting.invoker.call.XxlRpcInvokeCallback;
import com.czsj.rpc.remoting.invoker.call.XxlRpcInvokeFuture;
import com.czsj.rpc.remoting.invoker.generic.XxlRpcGenericService;
import com.czsj.rpc.remoting.invoker.route.LoadBalance;
import com.czsj.rpc.remoting.net.Client;
import com.czsj.rpc.remoting.net.impl.netty.client.NettyClient;
import com.czsj.rpc.remoting.net.params.XxlRpcFutureResponse;
import com.czsj.rpc.remoting.net.params.XxlRpcRequest;
import com.czsj.rpc.remoting.net.params.XxlRpcResponse;
import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory;
import com.czsj.rpc.serialize.Serializer;
import com.czsj.rpc.serialize.impl.HessianSerializer;
import com.czsj.rpc.util.ClassUtil;
import com.czsj.rpc.util.XxlRpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Proxy;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* rpc reference bean, use by api
*
* @author xuxueli 2015-10-29 20:18:32
*/
public class XxlRpcReferenceBean {
private static final Logger logger = LoggerFactory.getLogger(XxlRpcReferenceBean.class);
// [tips01: save 30ms/100invoke. why why why??? with this logger, it can save lots of time.]
// ---------------------- config ----------------------
private Class<? extends Client> client = NettyClient.class;
private Class<? extends Serializer> serializer = HessianSerializer.class;
private CallType callType = CallType.SYNC;
private LoadBalance loadBalance = LoadBalance.ROUND;
private Class<?> iface = null;
private String version = null;
private long timeout = 10000;
private String address = null;
private String accessToken = null;
private XxlRpcInvokeCallback invokeCallback = null;
private XxlRpcInvokerFactory invokerFactory = null;
// set
public void setClient(Class<? extends Client> client) {
this.client = client;
}
public void setSerializer(Class<? extends Serializer> serializer) {
this.serializer = serializer;
}
public void setCallType(CallType callType) {
this.callType = callType;
}
public void setLoadBalance(LoadBalance loadBalance) {
this.loadBalance = loadBalance;
}
public void setIface(Class<?> iface) {
this.iface = iface;
}
public void setVersion(String version) {
this.version = version;
}
public void setTimeout(long timeout) {
this.timeout = timeout;
}
public void setAddress(String address) {
this.address = address;
}
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public void setInvokeCallback(XxlRpcInvokeCallback invokeCallback) {
this.invokeCallback = invokeCallback;
}
public void setInvokerFactory(XxlRpcInvokerFactory invokerFactory) {
this.invokerFactory = invokerFactory;
}
// get
public Serializer getSerializerInstance() {
return serializerInstance;
}
public long getTimeout() {
return timeout;
}
public XxlRpcInvokerFactory getInvokerFactory() {
return invokerFactory;
}
public Class<?> getIface() {
return iface;
}
// ---------------------- initClient ----------------------
private Client clientInstance = null;
private Serializer serializerInstance = null;
public XxlRpcReferenceBean initClient() throws Exception {
// valid
if (this.client == null) {
throw new XxlRpcException("xxl-rpc reference client missing.");
}
if (this.serializer == null) {
throw new XxlRpcException("xxl-rpc reference serializer missing.");
}
if (this.callType == null) {
throw new XxlRpcException("xxl-rpc reference callType missing.");
}
if (this.loadBalance == null) {
throw new XxlRpcException("xxl-rpc reference loadBalance missing.");
}
if (this.iface == null) {
throw new XxlRpcException("xxl-rpc reference iface missing.");
}
if (this.timeout < 0) {
this.timeout = 0;
}
if (this.invokerFactory == null) {
this.invokerFactory = XxlRpcInvokerFactory.getInstance();
}
// init serializerInstance
this.serializerInstance = serializer.newInstance();
// init Client
clientInstance = client.newInstance();
clientInstance.init(this);
return this;
}
// ---------------------- util ----------------------
public Object getObject() throws Exception {
// initClient
initClient();
// newProxyInstance
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[]{iface},
(proxy, method, args) -> {
// method param
String className = method.getDeclaringClass().getName(); // iface.getName()
String varsion_ = version;
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Object[] parameters = args;
// filter for generic
if (className.equals(XxlRpcGenericService.class.getName()) && methodName.equals("invoke")) {
Class<?>[] paramTypes = null;
if (args[3] != null) {
String[] paramTypes_str = (String[]) args[3];
if (paramTypes_str.length > 0) {
paramTypes = new Class[paramTypes_str.length];
for (int i = 0; i < paramTypes_str.length; i++) {
paramTypes[i] = ClassUtil.resolveClass(paramTypes_str[i]);
}
}
}
className = (String) args[0];
varsion_ = (String) args[1];
methodName = (String) args[2];
parameterTypes = paramTypes;
parameters = (Object[]) args[4];
}
// filter method like "Object.toString()"
if (className.equals(Object.class.getName())) {
logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}#{}]", className, methodName);
throw new XxlRpcException("xxl-rpc proxy class-method not support");
}
// address
String finalAddress = address;
if (finalAddress == null || finalAddress.trim().length() == 0) {
if (invokerFactory != null && invokerFactory.getServiceRegistry() != null) {
// discovery
String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(serviceKey);
// load balance
if (addressSet == null || addressSet.size() == 0) {
// pass
} else if (addressSet.size() == 1) {
finalAddress = addressSet.first();
} else {
finalAddress = loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);
}
}
}
if (finalAddress == null || finalAddress.trim().length() == 0) {
throw new XxlRpcException("xxl-rpc reference bean[" + className + "] address empty");
}
// request
XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
xxlRpcRequest.setAccessToken(accessToken);
xxlRpcRequest.setClassName(className);
xxlRpcRequest.setMethodName(methodName);
xxlRpcRequest.setParameterTypes(parameterTypes);
xxlRpcRequest.setParameters(parameters);
xxlRpcRequest.setVersion(version);
// send
if (CallType.SYNC == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// do invoke
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
} finally {
// future-response remove
futureResponse.removeInvokerFuture();
}
} else if (CallType.FUTURE == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// invoke future set
XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
XxlRpcInvokeFuture.setFuture(invokeFuture);
// do invoke
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
return null;
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
}
} else if (CallType.CALLBACK == callType) {
// get callback
XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
if (threadInvokeCallback != null) {
finalInvokeCallback = threadInvokeCallback;
}
if (finalInvokeCallback == null) {
throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallbackCallType=" + CallType.CALLBACK.name() + " cannot be null.");
}
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
try {
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
}
return null;
} else if (CallType.ONEWAY == callType) {
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
return null;
} else {
throw new XxlRpcException("xxl-rpc callType[" + callType + "] invalid");
}
});
}
}

View File

@ -1,51 +0,0 @@
package com.czsj.rpc.remoting.invoker.reference.impl;
import com.czsj.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
/**
* rpc reference bean, use by spring xml and annotation (for spring)
*
* @author xuxueli 2015-10-29 20:18:32
*/
public class XxlRpcSpringReferenceBean implements FactoryBean<Object>, InitializingBean {
// ---------------------- util ----------------------
private XxlRpcReferenceBean xxlRpcReferenceBean;
@Override
public void afterPropertiesSet() {
// init config
this.xxlRpcReferenceBean = new XxlRpcReferenceBean();
}
@Override
public Object getObject() throws Exception {
return xxlRpcReferenceBean.getObject();
}
@Override
public Class<?> getObjectType() {
return xxlRpcReferenceBean.getIface();
}
@Override
public boolean isSingleton() {
return false;
}
/**
* public static <T> ClientProxy ClientProxy<T> getFuture(Class<T> type) {
* <T> ClientProxy proxy = (<T>) new ClientProxy();
* return proxy;
* }
*/
}

View File

@ -1,58 +0,0 @@
package com.czsj.rpc.remoting.invoker.route;
import com.czsj.rpc.remoting.invoker.route.impl.*;
/**
* @author xuxueli 2018-12-04
*/
public enum LoadBalance {
RANDOM(new XxlRpcLoadBalanceRandomStrategy()),
ROUND(new XxlRpcLoadBalanceRoundStrategy()),
LRU(new XxlRpcLoadBalanceLRUStrategy()),
LFU(new XxlRpcLoadBalanceLFUStrategy()),
CONSISTENT_HASH(new XxlRpcLoadBalanceConsistentHashStrategy());
public final XxlRpcLoadBalance xxlRpcInvokerRouter;
private LoadBalance(XxlRpcLoadBalance xxlRpcInvokerRouter) {
this.xxlRpcInvokerRouter = xxlRpcInvokerRouter;
}
public static LoadBalance match(String name, LoadBalance defaultRouter) {
for (LoadBalance item : LoadBalance.values()) {
if (item.equals(name)) {
return item;
}
}
return defaultRouter;
}
/*public static void main(String[] args) {
String serviceKey = "service";
TreeSet<String> addressSet = new TreeSet<String>(){{
add("1");
add("2");
add("3");
add("4");
add("5");
}};
for (LoadBalance item : LoadBalance.values()) {
long start = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
String address = LoadBalance.LFU.xxlRpcInvokerRouter.route(serviceKey, addressSet);
//System.out.println(address);;
}
long end = System.currentTimeMillis();
System.out.println(item.name() + " --- " + (end-start));
}
}*/
}

View File

@ -1,16 +0,0 @@
package com.czsj.rpc.remoting.invoker.route;
import java.util.TreeSet;
/**
* 分组下机器地址相同不同JOB均匀散列在不同机器上保证分组下机器分配JOB平均且每个JOB固定调度其中一台机器
* avirtual node解决不均衡问题
* bhash method replace hashCodeString的hashCode可能重复需要进一步扩大hashCode的取值范围
*
* @author xuxueli 2018-12-04
*/
public abstract class XxlRpcLoadBalance {
public abstract String route(String serviceKey, TreeSet<String> addressSet);
}

View File

@ -1,86 +0,0 @@
package com.czsj.rpc.remoting.invoker.route.impl;
import com.czsj.rpc.remoting.invoker.route.XxlRpcLoadBalance;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
/**
* consustent hash
*
* 单个JOB对应的每个执行器使用频率最低的优先被选举
* a(*)LFU(Least Frequently Used)最不经常使用频率/次数
* bLRU(Least Recently Used)最近最久未使用时间
*
* @author xuxueli 2018-12-04
*/
public class XxlRpcLoadBalanceConsistentHashStrategy extends XxlRpcLoadBalance {
private int VIRTUAL_NODE_NUM = 5;
/**
* get hash code on 2^32 ring (md5散列的方式计算hash值)
* @param key
* @return
*/
private long hash(String key) {
// md5 byte
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 not supported", e);
}
md5.reset();
byte[] keyBytes = null;
try {
keyBytes = key.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Unknown string :" + key, e);
}
md5.update(keyBytes);
byte[] digest = md5.digest();
// hash code, Truncate to 32-bits
long hashCode = ((long) (digest[3] & 0xFF) << 24)
| ((long) (digest[2] & 0xFF) << 16)
| ((long) (digest[1] & 0xFF) << 8)
| (digest[0] & 0xFF);
long truncateHashCode = hashCode & 0xffffffffL;
return truncateHashCode;
}
public String doRoute(String serviceKey, TreeSet<String> addressSet) {
// ------A1------A2-------A3------
// -----------J1------------------
TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
for (String address: addressSet) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHARD-" + address + "-NODE-" + i);
addressRing.put(addressHash, address);
}
}
long jobHash = hash(serviceKey);
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return addressRing.firstEntry().getValue();
}
@Override
public String route(String serviceKey, TreeSet<String> addressSet) {
String finalAddress = doRoute(serviceKey, addressSet);
return finalAddress;
}
}

View File

@ -1,76 +0,0 @@
package com.czsj.rpc.remoting.invoker.route.impl;
import com.czsj.rpc.remoting.invoker.route.XxlRpcLoadBalance;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* lru
*
* @author xuxueli 2018-12-04
*/
public class XxlRpcLoadBalanceLFUStrategy extends XxlRpcLoadBalance {
private ConcurrentMap<String, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<String, HashMap<String, Integer>>();
private long CACHE_VALID_TIME = 0;
public String doRoute(String serviceKey, TreeSet<String> addressSet) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLfuMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// lfu item init
HashMap<String, Integer> lfuItemMap = jobLfuMap.get(serviceKey); // Key排序可以用TreeMap+构造入参CompareValue排序暂时只能通过ArrayList
if (lfuItemMap == null) {
lfuItemMap = new HashMap<String, Integer>();
jobLfuMap.putIfAbsent(serviceKey, lfuItemMap); // 避免重复覆盖
}
// put new
for (String address: addressSet) {
if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
lfuItemMap.put(address, 0);
}
}
// remove old
List<String> delKeys = new ArrayList<>();
for (String existKey: lfuItemMap.keySet()) {
if (!addressSet.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lfuItemMap.remove(delKey);
}
}
// load least userd count address
List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
@Override
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return o1.getValue().compareTo(o2.getValue());
}
});
Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
String minAddress = addressItem.getKey();
addressItem.setValue(addressItem.getValue() + 1);
return minAddress;
}
@Override
public String route(String serviceKey, TreeSet<String> addressSet) {
String finalAddress = doRoute(serviceKey, addressSet);
return finalAddress;
}
}

View File

@ -1,79 +0,0 @@
package com.czsj.rpc.remoting.invoker.route.impl;
import com.czsj.rpc.remoting.invoker.route.XxlRpcLoadBalance;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* lru
*
* @author xuxueli 2018-12-04
*/
public class XxlRpcLoadBalanceLRUStrategy extends XxlRpcLoadBalance {
private ConcurrentMap<String, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<String, LinkedHashMap<String, String>>();
private long CACHE_VALID_TIME = 0;
public String doRoute(String serviceKey, TreeSet<String> addressSet) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLRUMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// init lru
LinkedHashMap<String, String> lruItem = jobLRUMap.get(serviceKey);
if (lruItem == null) {
/**
* LinkedHashMap
* aaccessOrderture=访问顺序排序get/put时排序/ACCESS-LASTfalse=插入顺序排期/FIFO
* bremoveEldestEntry新增元素时将会调用返回true时会删除最老元素可封装LinkedHashMap并重写该方法比如定义最大容量超出是返回true即可实现固定长度的LRU算法
*/
lruItem = new LinkedHashMap<String, String>(16, 0.75f, true){
@Override
protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
if(super.size() > 1000){
return true;
}else{
return false;
}
}
};
jobLRUMap.putIfAbsent(serviceKey, lruItem);
}
// put new
for (String address: addressSet) {
if (!lruItem.containsKey(address)) {
lruItem.put(address, address);
}
}
// remove old
List<String> delKeys = new ArrayList<>();
for (String existKey: lruItem.keySet()) {
if (!addressSet.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lruItem.remove(delKey);
}
}
// load
String eldestKey = lruItem.entrySet().iterator().next().getKey();
String eldestValue = lruItem.get(eldestKey);
return eldestValue;
}
@Override
public String route(String serviceKey, TreeSet<String> addressSet) {
String finalAddress = doRoute(serviceKey, addressSet);
return finalAddress;
}
}

View File

@ -1,27 +0,0 @@
package com.czsj.rpc.remoting.invoker.route.impl;
import com.czsj.rpc.remoting.invoker.route.XxlRpcLoadBalance;
import java.util.Random;
import java.util.TreeSet;
/**
* random
*
* @author xuxueli 2018-12-04
*/
public class XxlRpcLoadBalanceRandomStrategy extends XxlRpcLoadBalance {
private Random random = new Random();
@Override
public String route(String serviceKey, TreeSet<String> addressSet) {
// arr
String[] addressArr = addressSet.toArray(new String[addressSet.size()]);
// random
String finalAddress = addressArr[random.nextInt(addressSet.size())];
return finalAddress;
}
}

View File

@ -1,43 +0,0 @@
package com.czsj.rpc.remoting.invoker.route.impl;
import com.czsj.rpc.remoting.invoker.route.XxlRpcLoadBalance;
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* round
*
* @author xuxueli 2018-12-04
*/
public class XxlRpcLoadBalanceRoundStrategy extends XxlRpcLoadBalance {
private ConcurrentMap<String, Integer> routeCountEachJob = new ConcurrentHashMap<String, Integer>();
private long CACHE_VALID_TIME = 0;
private int count(String serviceKey) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
routeCountEachJob.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 24*60*60*1000;
}
// count++
Integer count = routeCountEachJob.get(serviceKey);
count = (count==null || count>1000000)?(new Random().nextInt(100)):++count; // 初始化时主动Random一次缓解首次压力
routeCountEachJob.put(serviceKey, count);
return count;
}
@Override
public String route(String serviceKey, TreeSet<String> addressSet) {
// arr
String[] addressArr = addressSet.toArray(new String[addressSet.size()]);
// round
String finalAddress = addressArr[count(serviceKey)%addressArr.length];
return finalAddress;
}
}

View File

@ -1,37 +0,0 @@
package com.czsj.rpc.remoting.net;
import com.czsj.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.czsj.rpc.remoting.net.params.XxlRpcRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* i client
* @author xuxueli 2015-11-24 22:18:10
*/
public abstract class Client {
protected static final Logger logger = LoggerFactory.getLogger(Client.class);
// ---------------------- init ----------------------
protected volatile XxlRpcReferenceBean xxlRpcReferenceBean;
public void init(XxlRpcReferenceBean xxlRpcReferenceBean) {
this.xxlRpcReferenceBean = xxlRpcReferenceBean;
}
// ---------------------- send ----------------------
/**
* async send, bind requestId and future-response
*
* @param address
* @param xxlRpcRequest
* @return
* @throws Exception
*/
public abstract void asyncSend(String address, XxlRpcRequest xxlRpcRequest) throws Exception;
}

View File

@ -1,70 +0,0 @@
package com.czsj.rpc.remoting.net;
import com.czsj.rpc.remoting.net.params.BaseCallback;
import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* server
*
* @author xuxueli 2015-11-24 20:59:49
*/
public abstract class Server {
protected static final Logger logger = LoggerFactory.getLogger(Server.class);
private BaseCallback startedCallback;
private BaseCallback stopedCallback;
public void setStartedCallback(BaseCallback startedCallback) {
this.startedCallback = startedCallback;
}
public void setStopedCallback(BaseCallback stopedCallback) {
this.stopedCallback = stopedCallback;
}
/**
* start server
*
* @param xxlRpcProviderFactory
* @throws Exception
*/
public abstract void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception;
/**
* callback when started
*/
public void onStarted() {
if (startedCallback != null) {
try {
startedCallback.run();
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-rpc, server startedCallback error.", e);
}
}
}
/**
* stop server
*
* @throws Exception
*/
public abstract void stop() throws Exception;
/**
* callback when stoped
*/
public void onStopped() {
if (stopedCallback != null) {
try {
stopedCallback.run();
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-rpc, server stopedCallback error.", e);
}
}
}
}

View File

@ -1,124 +0,0 @@
package com.czsj.rpc.remoting.net.common;
import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.czsj.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.czsj.rpc.remoting.net.params.BaseCallback;
import com.czsj.rpc.remoting.net.params.XxlRpcRequest;
import com.czsj.rpc.serialize.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @author xuxueli 2018-10-19
*/
public abstract class ConnectClient {
protected static transient Logger logger = LoggerFactory.getLogger(ConnectClient.class);
// ---------------------- iface ----------------------
public abstract void init(String address, final Serializer serializer, final XxlRpcInvokerFactory xxlRpcInvokerFactory) throws Exception;
public abstract void close();
public abstract boolean isValidate();
public abstract void send(XxlRpcRequest xxlRpcRequest) throws Exception;
// ---------------------- client pool map ----------------------
/**
* async send
*/
public static void asyncSend(XxlRpcRequest xxlRpcRequest, String address,
Class<? extends ConnectClient> connectClientImpl,
final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception {
// client pool [tips03 : may save 35ms/100invoke if move it to constructor, but it is necessary. cause by ConcurrentHashMap.get]
ConnectClient clientPool = ConnectClient.getPool(address, connectClientImpl, xxlRpcReferenceBean);
try {
// do invoke
clientPool.send(xxlRpcRequest);
} catch (Exception e) {
throw e;
}
}
private static volatile ConcurrentMap<String, ConnectClient> connectClientMap; // (static) alread addStopCallBack
private static volatile ConcurrentMap<String, Object> connectClientLockMap = new ConcurrentHashMap<>();
private static ConnectClient getPool(String address, Class<? extends ConnectClient> connectClientImpl,
final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception {
// init base compont, avoid repeat init
if (connectClientMap == null) {
synchronized (ConnectClient.class) {
if (connectClientMap == null) {
// init
connectClientMap = new ConcurrentHashMap<String, ConnectClient>();
// stop callback
xxlRpcReferenceBean.getInvokerFactory().addStopCallBack(new BaseCallback() {
@Override
public void run() throws Exception {
if (connectClientMap.size() > 0) {
for (String key : connectClientMap.keySet()) {
ConnectClient clientPool = connectClientMap.get(key);
clientPool.close();
}
connectClientMap.clear();
}
}
});
}
}
}
// get-valid client
ConnectClient connectClient = connectClientMap.get(address);
if (connectClient != null && connectClient.isValidate()) {
return connectClient;
}
// lock
Object clientLock = connectClientLockMap.get(address);
if (clientLock == null) {
connectClientLockMap.putIfAbsent(address, new Object());
clientLock = connectClientLockMap.get(address);
}
// remove-create new client
synchronized (clientLock) {
// get-valid client, avlid repeat
connectClient = connectClientMap.get(address);
if (connectClient != null && connectClient.isValidate()) {
return connectClient;
}
// remove old
if (connectClient != null) {
connectClient.close();
connectClientMap.remove(address);
}
// set pool
ConnectClient connectClient_new = connectClientImpl.newInstance();
try {
connectClient_new.init(address, xxlRpcReferenceBean.getSerializerInstance(), xxlRpcReferenceBean.getInvokerFactory());
connectClientMap.put(address, connectClient_new);
} catch (Exception e) {
connectClient_new.close();
throw e;
}
return connectClient_new;
}
}
}

View File

@ -1,6 +0,0 @@
package com.czsj.rpc.remoting.net.common;
public class NettyConstant {
public static int MAX_LENGTH = 20 * 1024 * 1024;
}

View File

@ -1,21 +0,0 @@
package com.czsj.rpc.remoting.net.impl.netty.client;
import com.czsj.rpc.remoting.net.Client;
import com.czsj.rpc.remoting.net.common.ConnectClient;
import com.czsj.rpc.remoting.net.params.XxlRpcRequest;
/**
* netty client
*
* @author xuxueli 2015-11-24 22:25:15
*/
public class NettyClient extends Client {
private Class<? extends ConnectClient> connectClientImpl = NettyConnectClient.class;
@Override
public void asyncSend(String address, XxlRpcRequest xxlRpcRequest) throws Exception {
ConnectClient.asyncSend(xxlRpcRequest, address, connectClientImpl, xxlRpcReferenceBean);
}
}

View File

@ -1,56 +0,0 @@
package com.czsj.rpc.remoting.net.impl.netty.client;
import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.czsj.rpc.remoting.net.params.Beat;
import com.czsj.rpc.remoting.net.params.XxlRpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* rpc netty client handler
*
* @author xuxueli 2015-10-31 18:00:27
*/
public class NettyClientHandler extends SimpleChannelInboundHandler<XxlRpcResponse> {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
private XxlRpcInvokerFactory xxlRpcInvokerFactory;
private NettyConnectClient nettyConnectClient;
public NettyClientHandler(final XxlRpcInvokerFactory xxlRpcInvokerFactory, NettyConnectClient nettyConnectClient) {
this.xxlRpcInvokerFactory = xxlRpcInvokerFactory;
this.nettyConnectClient = nettyConnectClient;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, XxlRpcResponse xxlRpcResponse) throws Exception {
// notify response
xxlRpcInvokerFactory.notifyInvokerFuture(xxlRpcResponse.getRequestId(), xxlRpcResponse);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error(">>>>>>>>>>> xxl-rpc netty client caught exception", cause);
ctx.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
/*ctx.channel().close(); // close idle channel
logger.debug(">>>>>>>>>>> xxl-rpc netty client close an idle channel.");*/
nettyConnectClient.send(Beat.BEAT_PING); // beat N, close if fail(may throw error)
logger.debug(">>>>>>>>>>> xxl-rpc netty client send beat-ping.");
} else {
super.userEventTriggered(ctx, evt);
}
}
}

View File

@ -1,98 +0,0 @@
package com.czsj.rpc.remoting.net.impl.netty.client;
import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.czsj.rpc.remoting.net.common.ConnectClient;
import com.czsj.rpc.remoting.net.impl.netty.codec.NettyDecoder;
import com.czsj.rpc.remoting.net.impl.netty.codec.NettyEncoder;
import com.czsj.rpc.remoting.net.params.Beat;
import com.czsj.rpc.remoting.net.params.XxlRpcRequest;
import com.czsj.rpc.remoting.net.params.XxlRpcResponse;
import com.czsj.rpc.serialize.Serializer;
import com.czsj.rpc.util.IpUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* netty pooled client
*
* @author xuxueli
*/
public class NettyConnectClient extends ConnectClient {
private EventLoopGroup group;
private Channel channel;
@Override
public void init(String address, final Serializer serializer, final XxlRpcInvokerFactory xxlRpcInvokerFactory) throws Exception {
final NettyConnectClient thisClient = this;
Object[] array = IpUtil.parseIpPort(address);
String host = (String) array[0];
int port = (int) array[1];
this.group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0,0, Beat.BEAT_INTERVAL, TimeUnit.SECONDS)) // beat N, close if fail
.addLast(new NettyEncoder(XxlRpcRequest.class, serializer))
.addLast(new NettyDecoder(XxlRpcResponse.class, serializer))
.addLast(new NettyClientHandler(xxlRpcInvokerFactory, thisClient));
}
})
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
this.channel = bootstrap.connect(host, port).sync().channel();
// valid
if (!isValidate()) {
close();
return;
}
logger.debug(">>>>>>>>>>> xxl-rpc netty client proxy, connect to server success at host:{}, port:{}", host, port);
}
@Override
public boolean isValidate() {
if (this.channel != null) {
return this.channel.isActive();
}
return false;
}
@Override
public void close() {
if (this.channel != null && this.channel.isActive()) {
this.channel.close(); // if this.channel.isOpen()
}
if (this.group != null && !this.group.isShutdown()) {
this.group.shutdownGracefully();
}
logger.debug(">>>>>>>>>>> xxl-rpc netty client close.");
}
@Override
public void send(XxlRpcRequest xxlRpcRequest) throws Exception {
this.channel.writeAndFlush(xxlRpcRequest).sync();
}
}

View File

@ -1,45 +0,0 @@
package com.czsj.rpc.remoting.net.impl.netty.codec;
import com.czsj.rpc.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* decoder
*
* @author xuxueli 2015-10-29 19:02:36
*/
public class NettyDecoder extends ByteToMessageDecoder {
private Class<?> genericClass;
private Serializer serializer;
public NettyDecoder(Class<?> genericClass, final Serializer serializer) {
this.genericClass = genericClass;
this.serializer = serializer;
}
@Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (dataLength < 0) {
ctx.close();
}
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return; // fix 1024k buffer splice limix
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = serializer.deserialize(data, genericClass);
out.add(obj);
}
}

View File

@ -1,31 +0,0 @@
package com.czsj.rpc.remoting.net.impl.netty.codec;
import com.czsj.rpc.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* encoder
*
* @author xuxueli 2015-10-29 19:43:00
*/
public class NettyEncoder extends MessageToByteEncoder<Object> {
private Class<?> genericClass;
private Serializer serializer;
public NettyEncoder(Class<?> genericClass, final Serializer serializer) {
this.genericClass = genericClass;
this.serializer = serializer;
}
@Override
public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
if (genericClass.isInstance(in)) {
byte[] data = serializer.serialize(in);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}

View File

@ -1,117 +0,0 @@
package com.czsj.rpc.remoting.net.impl.netty.server;
import com.czsj.rpc.remoting.net.Server;
import com.czsj.rpc.remoting.net.impl.netty.codec.NettyDecoder;
import com.czsj.rpc.remoting.net.impl.netty.codec.NettyEncoder;
import com.czsj.rpc.remoting.net.params.Beat;
import com.czsj.rpc.remoting.net.params.XxlRpcRequest;
import com.czsj.rpc.remoting.net.params.XxlRpcResponse;
import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory;
import com.czsj.rpc.util.ThreadPoolUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* netty rpc server
*
* @author xuxueli 2015-10-29 18:17:14
*/
public class NettyServer extends Server {
private Thread thread;
@Override
public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {
thread = new Thread(new Runnable() {
@Override
public void run() {
// param
final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool(
NettyServer.class.getSimpleName(),
xxlRpcProviderFactory.getCorePoolSize(),
xxlRpcProviderFactory.getMaxPoolSize());
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0,0, Beat.BEAT_INTERVAL*3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new NettyDecoder(XxlRpcRequest.class, xxlRpcProviderFactory.getSerializerInstance()))
.addLast(new NettyEncoder(XxlRpcResponse.class, xxlRpcProviderFactory.getSerializerInstance()))
.addLast(new NettyServerHandler(xxlRpcProviderFactory, serverHandlerPool));
}
})
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();
logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyServer.class.getName(), xxlRpcProviderFactory.getPort());
onStarted();
// wait util stop
future.channel().closeFuture().sync();
} catch (Exception e) {
if (e instanceof InterruptedException) {
logger.info(">>>>>>>>>>> xxl-rpc remoting server stop.");
} else {
logger.error(">>>>>>>>>>> xxl-rpc remoting server error.", e);
}
} finally {
// stop
try {
serverHandlerPool.shutdown(); // shutdownNow
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
thread.setDaemon(true);
thread.start();
}
@Override
public void stop() throws Exception {
// destroy server thread
if (thread != null && thread.isAlive()) {
thread.interrupt();
}
// on stop
onStopped();
logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success.");
}
}

View File

@ -1,81 +0,0 @@
package com.czsj.rpc.remoting.net.impl.netty.server;
import com.czsj.rpc.remoting.net.params.Beat;
import com.czsj.rpc.remoting.net.params.XxlRpcRequest;
import com.czsj.rpc.remoting.net.params.XxlRpcResponse;
import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory;
import com.czsj.rpc.util.ThrowableUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadPoolExecutor;
/**
* netty server handler
*
* @author xuxueli 2015-10-29 20:07:37
*/
public class NettyServerHandler extends SimpleChannelInboundHandler<XxlRpcRequest> {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private XxlRpcProviderFactory xxlRpcProviderFactory;
private ThreadPoolExecutor serverHandlerPool;
public NettyServerHandler(final XxlRpcProviderFactory xxlRpcProviderFactory, final ThreadPoolExecutor serverHandlerPool) {
this.xxlRpcProviderFactory = xxlRpcProviderFactory;
this.serverHandlerPool = serverHandlerPool;
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, final XxlRpcRequest xxlRpcRequest) throws Exception {
// filter beat
if (Beat.BEAT_ID.equalsIgnoreCase(xxlRpcRequest.getRequestId())){
logger.debug(">>>>>>>>>>> xxl-rpc provider netty server read beat-ping.");
return;
}
// do invoke
try {
serverHandlerPool.execute(new Runnable() {
@Override
public void run() {
// invoke + response
XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
ctx.writeAndFlush(xxlRpcResponse);
}
});
} catch (Exception e) {
// catch error
XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(e));
ctx.writeAndFlush(xxlRpcResponse);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error(">>>>>>>>>>> xxl-rpc provider netty server caught exception", cause);
ctx.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
ctx.channel().close(); // beat 3N, close if idle
logger.debug(">>>>>>>>>>> xxl-rpc provider netty server close an idle channel.");
} else {
super.userEventTriggered(ctx, evt);
}
}
}

View File

@ -1,21 +0,0 @@
package com.czsj.rpc.remoting.net.impl.netty_http.client;
import com.czsj.rpc.remoting.net.Client;
import com.czsj.rpc.remoting.net.common.ConnectClient;
import com.czsj.rpc.remoting.net.params.XxlRpcRequest;
/**
* netty_http client
*
* @author xuxueli 2015-11-24 22:25:15
*/
public class NettyHttpClient extends Client {
private Class<? extends ConnectClient> connectClientImpl = NettyHttpConnectClient.class;
@Override
public void asyncSend(String address, XxlRpcRequest xxlRpcRequest) throws Exception {
ConnectClient.asyncSend(xxlRpcRequest, address, connectClientImpl, xxlRpcReferenceBean);
}
}

View File

@ -1,85 +0,0 @@
package com.czsj.rpc.remoting.net.impl.netty_http.client;
import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.czsj.rpc.remoting.net.params.Beat;
import com.czsj.rpc.remoting.net.params.XxlRpcResponse;
import com.czsj.rpc.serialize.Serializer;
import com.czsj.rpc.util.XxlRpcException;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* netty_http
*
* @author xuxueli 2015-11-24 22:25:15
*/
public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
private static final Logger logger = LoggerFactory.getLogger(NettyHttpClientHandler.class);
private XxlRpcInvokerFactory xxlRpcInvokerFactory;
private Serializer serializer;
private NettyHttpConnectClient nettyHttpConnectClient;
public NettyHttpClientHandler(final XxlRpcInvokerFactory xxlRpcInvokerFactory, Serializer serializer, final NettyHttpConnectClient nettyHttpConnectClient) {
this.xxlRpcInvokerFactory = xxlRpcInvokerFactory;
this.serializer = serializer;
this.nettyHttpConnectClient = nettyHttpConnectClient;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
// valid status
if (!HttpResponseStatus.OK.equals(msg.status())) {
throw new XxlRpcException("xxl-rpc response status invalid.");
}
// response parse
byte[] responseBytes = ByteBufUtil.getBytes(msg.content());
// valid length
if (responseBytes.length == 0) {
throw new XxlRpcException("xxl-rpc response data empty.");
}
// response deserialize
XxlRpcResponse xxlRpcResponse = (XxlRpcResponse) serializer.deserialize(responseBytes, XxlRpcResponse.class);
// notify response
xxlRpcInvokerFactory.notifyInvokerFuture(xxlRpcResponse.getRequestId(), xxlRpcResponse);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//super.exceptionCaught(ctx, cause);
logger.error(">>>>>>>>>>> xxl-rpc netty_http client caught exception", cause);
ctx.close();
}
/*@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// retry
super.channelInactive(ctx);
}*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
/*ctx.channel().close(); // close idle channel
logger.debug(">>>>>>>>>>> xxl-rpc netty_http client close an idle channel.");*/
nettyHttpConnectClient.send(Beat.BEAT_PING); // beat N, close if fail(may throw error)
logger.debug(">>>>>>>>>>> xxl-rpc netty_http client send beat-ping.");
} else {
super.userEventTriggered(ctx, evt);
}
}
}

View File

@ -1,116 +0,0 @@
package com.czsj.rpc.remoting.net.impl.netty_http.client;
import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.czsj.rpc.remoting.net.common.ConnectClient;
import com.czsj.rpc.remoting.net.common.NettyConstant;
import com.czsj.rpc.remoting.net.params.Beat;
import com.czsj.rpc.remoting.net.params.XxlRpcRequest;
import com.czsj.rpc.serialize.Serializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.URI;
import java.net.URL;
import java.util.concurrent.TimeUnit;
/**
* netty_http
*
* @author xuxueli 2015-11-24 22:25:15
*/
public class NettyHttpConnectClient extends ConnectClient {
private EventLoopGroup group;
private Channel channel;
private Serializer serializer;
private String address;
private String host;
private DefaultFullHttpRequest beatRequest;
@Override
public void init(String address, final Serializer serializer, final XxlRpcInvokerFactory xxlRpcInvokerFactory) throws Exception {
final NettyHttpConnectClient thisClient = this;
if (!address.toLowerCase().startsWith("http")) {
address = "http://" + address; // IP:PORT, need parse to url
}
this.address = address;
URL url = new URL(address);
this.host = url.getHost();
int port = url.getPort() > -1 ? url.getPort() : 80;
this.group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, Beat.BEAT_INTERVAL, TimeUnit.SECONDS)) // beat N, close if fail
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(NettyConstant.MAX_LENGTH))
.addLast(new NettyHttpClientHandler(xxlRpcInvokerFactory, serializer, thisClient));
}
})
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
this.channel = bootstrap.connect(host, port).sync().channel();
this.serializer = serializer;
// valid
if (!isValidate()) {
close();
return;
}
logger.debug(">>>>>>>>>>> xxl-rpc netty client proxy, connect to server success at host:{}, port:{}", host, port);
}
@Override
public boolean isValidate() {
if (this.channel != null) {
return this.channel.isActive();
}
return false;
}
@Override
public void close() {
if (this.channel != null && this.channel.isActive()) {
this.channel.close(); // if this.channel.isOpen()
}
if (this.group != null && !this.group.isShutdown()) {
this.group.shutdownGracefully();
}
logger.debug(">>>>>>>>>>> xxl-rpc netty client close.");
}
@Override
public void send(XxlRpcRequest xxlRpcRequest) throws Exception {
byte[] requestBytes = serializer.serialize(xxlRpcRequest);
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, new URI(address).getRawPath(), Unpooled.wrappedBuffer(requestBytes));
request.headers().set(HttpHeaderNames.HOST, host);
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
this.channel.writeAndFlush(request).sync();
}
}

View File

@ -1,109 +0,0 @@
package com.czsj.rpc.remoting.net.impl.netty_http.server;
import com.czsj.rpc.remoting.net.Server;
import com.czsj.rpc.remoting.net.common.NettyConstant;
import com.czsj.rpc.remoting.net.params.Beat;
import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory;
import com.czsj.rpc.util.ThreadPoolUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* netty_http
*
* @author xuxueli 2015-11-24 22:25:15
*/
public class NettyHttpServer extends Server {
private Thread thread;
@Override
public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) {
thread = new Thread(() -> {
// param
final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool(
NettyHttpServer.class.getSimpleName(),
xxlRpcProviderFactory.getCorePoolSize(),
xxlRpcProviderFactory.getMaxPoolSize());
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, Beat.BEAT_INTERVAL * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(NettyConstant.MAX_LENGTH)) // merge request & reponse to FULL
.addLast(new NettyHttpServerHandler(xxlRpcProviderFactory, serverHandlerPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();
logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyHttpServer.class.getName(), xxlRpcProviderFactory.getPort());
onStarted();
// wait util stop
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
if (e instanceof InterruptedException) {
logger.info(">>>>>>>>>>> xxl-rpc remoting server stop.");
} else {
logger.error(">>>>>>>>>>> xxl-rpc remoting server error.", e);
}
} finally {
// stop
try {
serverHandlerPool.shutdown(); // shutdownNow
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}
@Override
public void stop() {
// destroy server thread
if (thread != null && thread.isAlive()) {
thread.interrupt();
}
// on stop
onStopped();
logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success.");
}
}

View File

@ -1,150 +0,0 @@
package com.czsj.rpc.remoting.net.impl.netty_http.server;
import com.czsj.rpc.remoting.net.params.Beat;
import com.czsj.rpc.remoting.net.params.XxlRpcRequest;
import com.czsj.rpc.remoting.net.params.XxlRpcResponse;
import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory;
import com.czsj.rpc.util.ThrowableUtil;
import com.czsj.rpc.util.XxlRpcException;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadPoolExecutor;
/**
* netty_http
*
* @author xuxueli 2015-11-24 22:25:15
*/
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Logger logger = LoggerFactory.getLogger(NettyHttpServerHandler.class);
private XxlRpcProviderFactory xxlRpcProviderFactory;
private ThreadPoolExecutor serverHandlerPool;
public NettyHttpServerHandler(final XxlRpcProviderFactory xxlRpcProviderFactory, final ThreadPoolExecutor serverHandlerPool) {
this.xxlRpcProviderFactory = xxlRpcProviderFactory;
this.serverHandlerPool = serverHandlerPool;
}
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// request parse
final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
final String uri = msg.uri();
final boolean keepAlive = HttpUtil.isKeepAlive(msg);
// do invoke
serverHandlerPool.execute(new Runnable() {
@Override
public void run() {
process(ctx, uri, requestBytes, keepAlive);
}
});
}
private void process(ChannelHandlerContext ctx, String uri, byte[] requestBytes, boolean keepAlive){
String requestId = null;
try {
if ("/services".equals(uri)) { // services mapping
// request
StringBuilder stringBuffer = new StringBuilder("<ui>");
for (String serviceKey: xxlRpcProviderFactory.getServiceData().keySet()) {
stringBuffer.append("<li>").append(serviceKey).append(": ").append(xxlRpcProviderFactory.getServiceData().get(serviceKey)).append("</li>");
}
stringBuffer.append("</ui>");
// response serialize
byte[] responseBytes = stringBuffer.toString().getBytes("UTF-8");
// response-write
writeResponse(ctx, keepAlive, responseBytes);
} else {
System.out.println("=================");
// valid
if (requestBytes.length == 0) {
throw new XxlRpcException("xxl-rpc request data empty.");
}
// request deserialize
XxlRpcRequest xxlRpcRequest = (XxlRpcRequest) xxlRpcProviderFactory.getSerializerInstance().deserialize(requestBytes, XxlRpcRequest.class);
requestId = xxlRpcRequest.getRequestId();
// filter beat
if (Beat.BEAT_ID.equalsIgnoreCase(xxlRpcRequest.getRequestId())){
logger.debug(">>>>>>>>>>> xxl-rpc provider netty_http server read beat-ping.");
return;
}
// invoke + response
XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
// response serialize
byte[] responseBytes = xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse);
// response-write
writeResponse(ctx, keepAlive, responseBytes);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
// response error
XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
xxlRpcResponse.setRequestId(requestId);
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(e));
// response serialize
byte[] responseBytes = xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse);
// response-write
writeResponse(ctx, keepAlive, responseBytes);
}
}
/**
* write response
*/
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, byte[] responseBytes){
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(responseBytes));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8"); // HttpHeaderValues.TEXT_PLAIN.toString()
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
if (keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.writeAndFlush(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error(">>>>>>>>>>> xxl-rpc provider netty_http server caught exception", cause);
ctx.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
ctx.channel().close(); // beat 3N, close if idle
logger.debug(">>>>>>>>>>> xxl-rpc provider netty_http server close an idle channel.");
} else {
super.userEventTriggered(ctx, evt);
}
}
}

View File

@ -1,10 +0,0 @@
package com.czsj.rpc.remoting.net.params;
/**
* @author xuxueli 2018-10-19
*/
public abstract class BaseCallback {
public abstract void run() throws Exception;
}

View File

@ -1,20 +0,0 @@
package com.czsj.rpc.remoting.net.params;
/**
* beat for keep-alive
*
* @author xuxueli 2019-09-27
*/
public final class Beat {
public static final int BEAT_INTERVAL = 30;
public static final String BEAT_ID = "BEAT_PING_PONG";
public static XxlRpcRequest BEAT_PING;
static {
BEAT_PING = new XxlRpcRequest(){};
BEAT_PING.setRequestId(BEAT_ID);
}
}

View File

@ -1,127 +0,0 @@
package com.czsj.rpc.remoting.net.params;
import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.czsj.rpc.remoting.invoker.call.XxlRpcInvokeCallback;
import com.czsj.rpc.util.XxlRpcException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* call back future
*
* @author xuxueli 2015-11-5 14:26:37
*/
public class XxlRpcFutureResponse implements Future<XxlRpcResponse> {
private XxlRpcInvokerFactory invokerFactory;
// net data
private XxlRpcRequest request;
private XxlRpcResponse response;
// future lock
private boolean done = false;
private Object lock = new Object();
// callback, can be null
private XxlRpcInvokeCallback invokeCallback;
public XxlRpcFutureResponse(final XxlRpcInvokerFactory invokerFactory, XxlRpcRequest request, XxlRpcInvokeCallback invokeCallback) {
this.invokerFactory = invokerFactory;
this.request = request;
this.invokeCallback = invokeCallback;
// set-InvokerFuture
setInvokerFuture();
}
// ---------------------- response pool ----------------------
public void setInvokerFuture() {
this.invokerFactory.setInvokerFuture(request.getRequestId(), this);
}
public void removeInvokerFuture() {
this.invokerFactory.removeInvokerFuture(request.getRequestId());
}
// ---------------------- get ----------------------
public XxlRpcRequest getRequest() {
return request;
}
public XxlRpcInvokeCallback getInvokeCallback() {
return invokeCallback;
}
// ---------------------- for invoke back ----------------------
public void setResponse(XxlRpcResponse response) {
this.response = response;
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
// ---------------------- for invoke ----------------------
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// TODO
return false;
}
@Override
public boolean isCancelled() {
// TODO
return false;
}
@Override
public boolean isDone() {
return done;
}
@Override
public XxlRpcResponse get() throws InterruptedException {
try {
return get(-1, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new XxlRpcException(e);
}
}
@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
if (!done) {
synchronized (lock) {
try {
if (timeout < 0) {
lock.wait();
} else {
long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
lock.wait(timeoutMillis);
}
} catch (InterruptedException e) {
throw e;
}
}
}
if (!done) {
throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
}
return response;
}
}

View File

@ -1,104 +0,0 @@
package com.czsj.rpc.remoting.net.params;
import java.io.Serializable;
import java.util.Arrays;
/**
* request
*
* @author xuxueli 2015-10-29 19:39:12
*/
public class XxlRpcRequest implements Serializable {
private static final long serialVersionUID = 42L;
private String requestId;
private long createMillisTime;
private String accessToken;
private String className;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;
private String version;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public long getCreateMillisTime() {
return createMillisTime;
}
public void setCreateMillisTime(long createMillisTime) {
this.createMillisTime = createMillisTime;
}
public String getAccessToken() {
return accessToken;
}
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getParameters() {
return parameters;
}
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
@Override
public String toString() {
return "XxlRpcRequest{" +
"requestId='" + requestId + '\'' +
", createMillisTime=" + createMillisTime +
", accessToken='" + accessToken + '\'' +
", className='" + className + '\'' +
", methodName='" + methodName + '\'' +
", parameterTypes=" + Arrays.toString(parameterTypes) +
", parameters=" + Arrays.toString(parameters) +
", version='" + version + '\'' +
'}';
}
}

View File

@ -1,52 +0,0 @@
package com.czsj.rpc.remoting.net.params;
import java.io.Serializable;
/**
* response
*
* @author xuxueli 2015-10-29 19:39:54
*/
public class XxlRpcResponse implements Serializable {
private static final long serialVersionUID = 42L;
private String requestId;
private String errorMsg;
private Object result;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getErrorMsg() {
return errorMsg;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
@Override
public String toString() {
return "XxlRpcResponse{" +
"requestId='" + requestId + '\'' +
", errorMsg='" + errorMsg + '\'' +
", result=" + result +
'}';
}
}

View File

@ -1,256 +0,0 @@
package com.czsj.rpc.remoting.provider;
import com.czsj.rpc.registry.ServiceRegistry;
import com.czsj.rpc.remoting.net.Server;
import com.czsj.rpc.remoting.net.impl.netty.server.NettyServer;
import com.czsj.rpc.remoting.net.params.BaseCallback;
import com.czsj.rpc.remoting.net.params.XxlRpcRequest;
import com.czsj.rpc.remoting.net.params.XxlRpcResponse;
import com.czsj.rpc.serialize.Serializer;
import com.czsj.rpc.serialize.impl.HessianSerializer;
import com.czsj.rpc.util.IpUtil;
import com.czsj.rpc.util.NetUtil;
import com.czsj.rpc.util.ThrowableUtil;
import com.czsj.rpc.util.XxlRpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
* provider
*
* @author xuxueli 2015-10-31 22:54:27
*/
public class XxlRpcProviderFactory {
private static final Logger logger = LoggerFactory.getLogger(XxlRpcProviderFactory.class);
// ---------------------- config ----------------------
private Class<? extends Server> server = NettyServer.class;
private Class<? extends Serializer> serializer = HessianSerializer.class;
private int corePoolSize = 60;
private int maxPoolSize = 300;
private String ip = null; // for registry
private int port = 7080; // default port
private String accessToken = null;
private Class<? extends ServiceRegistry> serviceRegistry = null;
private Map<String, String> serviceRegistryParam = null;
// set
public void setServer(Class<? extends Server> server) {
this.server = server;
}
public void setSerializer(Class<? extends Serializer> serializer) {
this.serializer = serializer;
}
public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public void setIp(String ip) {
this.ip = ip;
}
public void setPort(int port) {
this.port = port;
}
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public void setServiceRegistry(Class<? extends ServiceRegistry> serviceRegistry) {
this.serviceRegistry = serviceRegistry;
}
public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
this.serviceRegistryParam = serviceRegistryParam;
}
// get
public Serializer getSerializerInstance() {
return serializerInstance;
}
public int getPort() {
return port;
}
public int getCorePoolSize() {
return corePoolSize;
}
public int getMaxPoolSize() {
return maxPoolSize;
}
// ---------------------- start / stop ----------------------
private Server serverInstance;
private Serializer serializerInstance;
private ServiceRegistry serviceRegistryInstance;
private String serviceAddress;
public void start() throws Exception {
// valid
if (this.server == null) {
throw new XxlRpcException("xxl-rpc provider server missing.");
}
if (this.serializer==null) {
throw new XxlRpcException("xxl-rpc provider serializer missing.");
}
if (!(this.corePoolSize>0 && this.maxPoolSize>0 && this.maxPoolSize>=this.corePoolSize)) {
this.corePoolSize = 60;
this.maxPoolSize = 300;
}
if (this.ip == null) {
this.ip = IpUtil.getIp();
}
if (this.port <= 0) {
this.port = 7080;
}
if (NetUtil.isPortUsed(this.port)) {
throw new XxlRpcException("xxl-rpc provider port["+ this.port +"] is used.");
}
// init serializerInstance
this.serializerInstance = serializer.newInstance();
// start server
serviceAddress = IpUtil.getIpPort(this.ip, port);
serverInstance = server.newInstance();
serverInstance.setStartedCallback(new BaseCallback() { // serviceRegistry started
@Override
public void run() throws Exception {
// start registry
if (serviceRegistry != null) {
serviceRegistryInstance = serviceRegistry.newInstance();
serviceRegistryInstance.start(serviceRegistryParam);
if (serviceData.size() > 0) {
serviceRegistryInstance.registry(serviceData.keySet(), serviceAddress);
}
}
}
});
serverInstance.setStopedCallback(new BaseCallback() { // serviceRegistry stoped
@Override
public void run() {
// stop registry
if (serviceRegistryInstance != null) {
if (serviceData.size() > 0) {
serviceRegistryInstance.remove(serviceData.keySet(), serviceAddress);
}
serviceRegistryInstance.stop();
serviceRegistryInstance = null;
}
}
});
serverInstance.start(this);
}
public void stop() throws Exception {
// stop server
serverInstance.stop();
}
// ---------------------- server invoke ----------------------
/**
* init local rpc service map
*/
private Map<String, Object> serviceData = new HashMap<String, Object>();
public Map<String, Object> getServiceData() {
return serviceData;
}
/**
* make service key
*
* @param iface
* @param version
* @return
*/
public static String makeServiceKey(String iface, String version){
String serviceKey = iface;
if (version!=null && version.trim().length()>0) {
serviceKey += "#".concat(version);
}
return serviceKey;
}
/**
* add service
*
* @param iface
* @param version
* @param serviceBean
*/
public void addService(String iface, String version, Object serviceBean){
String serviceKey = makeServiceKey(iface, version);
serviceData.put(serviceKey, serviceBean);
logger.info(">>>>>>>>>>> xxl-rpc, provider factory add service success. serviceKey = {}, serviceBean = {}", serviceKey, serviceBean.getClass());
}
/**
* invoke service
*
* @param xxlRpcRequest
* @return
*/
public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
// make response
XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
// match service bean
String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
Object serviceBean = serviceData.get(serviceKey);
// valid
if (serviceBean == null) {
xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
return xxlRpcResponse;
}
if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
return xxlRpcResponse;
}
if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
return xxlRpcResponse;
}
try {
// invoke
Class<?> serviceClass = serviceBean.getClass();
String methodName = xxlRpcRequest.getMethodName();
Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
Object[] parameters = xxlRpcRequest.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
Object result = method.invoke(serviceBean, parameters);
/*FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
Object result = serviceFastMethod.invoke(serviceBean, parameters);*/
xxlRpcResponse.setResult(result);
} catch (Throwable t) {
// catch error
logger.error("xxl-rpc provider invokeService error.", t);
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
}
return xxlRpcResponse;
}
}

View File

@ -1,20 +0,0 @@
package com.czsj.rpc.remoting.provider.annotation;
import java.lang.annotation.*;
/**
* rpc service annotation, skeleton of stub ("@Inherited" allow service use "Transactional")
*
* @author 2015-10-29 19:44:33
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface XxlRpcService {
/**
* @return
*/
String version() default "";
}

View File

@ -1,56 +0,0 @@
package com.czsj.rpc.remoting.provider.impl;
import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory;
import com.czsj.rpc.remoting.provider.annotation.XxlRpcService;
import com.czsj.rpc.util.XxlRpcException;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.util.Map;
/**
* xxl-rpc provider (for spring)
*
* @author xuxueli 2018-10-18 18:09:20
*/
public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory
implements ApplicationContextAware, InitializingBean,DisposableBean {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
// valid
if (serviceBean.getClass().getInterfaces().length ==0) {
throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
}
// add service
XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
String iface = serviceBean.getClass().getInterfaces()[0].getName();
String version = xxlRpcService.version();
super.addService(iface, version, serviceBean);
}
}
// TODOaddServices by api + prop
}
@Override
public void afterPropertiesSet() throws Exception {
super.start();
}
@Override
public void destroy() throws Exception {
super.stop();
}
}

View File

@ -1,45 +0,0 @@
package com.czsj.rpc.serialize;
/**
* serializer
*
* Tips模板方法模式定义一个操作中算法的骨架或称为顶级逻辑将一些步骤或称为基本方法的执行延迟到其子类中
* Tips基本方法抽象方法 + 具体方法final + 钩子方法
* TipsEnum 时最好的单例方案枚举单例会初始化全部实现此处改为托管Class避免无效的实例化
*
* @author xuxueli 2015-10-30 21:02:55
*/
public abstract class Serializer {
public abstract <T> byte[] serialize(T obj);
public abstract <T> Object deserialize(byte[] bytes, Class<T> clazz);
/*public enum SerializeEnum {
HESSIAN(HessianSerializer.class),
HESSIAN1(Hessian1Serializer.class);
private Class<? extends Serializer> serializerClass;
private SerializeEnum (Class<? extends Serializer> serializerClass) {
this.serializerClass = serializerClass;
}
public Serializer getSerializer() {
try {
return serializerClass.newInstance();
} catch (Exception e) {
throw new XxlRpcException(e);
}
}
public static SerializeEnum match(String name, SerializeEnum defaultSerializer){
for (SerializeEnum item : SerializeEnum.values()) {
if (item.name().equals(name)) {
return item;
}
}
return defaultSerializer;
}
}*/
}

View File

@ -1,67 +0,0 @@
package com.czsj.rpc.serialize.impl;
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import com.czsj.rpc.serialize.Serializer;
import com.czsj.rpc.util.XxlRpcException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* hessian serialize
* @author xuxueli 2015-9-26 02:53:29
*/
public class HessianSerializer extends Serializer {
@Override
public <T> byte[] serialize(T obj) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
Hessian2Output ho = new Hessian2Output(os);
try {
ho.writeObject(obj);
ho.flush();
byte[] result = os.toByteArray();
return result;
} catch (IOException e) {
throw new XxlRpcException(e);
} finally {
try {
ho.close();
} catch (IOException e) {
throw new XxlRpcException(e);
}
try {
os.close();
} catch (IOException e) {
throw new XxlRpcException(e);
}
}
}
@Override
public <T> Object deserialize(byte[] bytes, Class<T> clazz) {
ByteArrayInputStream is = new ByteArrayInputStream(bytes);
Hessian2Input hi = new Hessian2Input(is);
try {
Object result = hi.readObject();
return result;
} catch (IOException e) {
throw new XxlRpcException(e);
} finally {
try {
hi.close();
} catch (Exception e) {
throw new XxlRpcException(e);
}
try {
is.close();
} catch (IOException e) {
throw new XxlRpcException(e);
}
}
}
}

View File

@ -1,46 +0,0 @@
package com.czsj.rpc.util;
import java.util.HashMap;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: Class的工具类
**/
public class ClassUtil {
private static final HashMap<String, Class<?>> primClasses = new HashMap<>();
static {
primClasses.put("boolean", boolean.class);
primClasses.put("byte", byte.class);
primClasses.put("char", char.class);
primClasses.put("short", short.class);
primClasses.put("int", int.class);
primClasses.put("long", long.class);
primClasses.put("float", float.class);
primClasses.put("double", double.class);
primClasses.put("void", void.class);
}
/**
*
* @param className 类名
* @return 反射得到类的Class
* @throws ClassNotFoundException
*/
public static Class<?> resolveClass(String className) throws ClassNotFoundException {
try {
return Class.forName(className);
} catch (ClassNotFoundException ex) {
Class<?> cl = primClasses.get(className);
if (cl != null) {
return cl;
} else {
throw ex;
}
}
}
}

View File

@ -1,195 +0,0 @@
package com.czsj.rpc.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.Enumeration;
import java.util.regex.Pattern;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: IP的工具类
**/
public class IpUtil {
private static final Logger logger = LoggerFactory.getLogger(IpUtil.class);
private static final String ANYHOST_VALUE = "0.0.0.0";
private static final String LOCALHOST_VALUE = "127.0.0.1";
private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$");
private static volatile InetAddress LOCAL_ADDRESS = null;
private static InetAddress toValidAddress(InetAddress address) {
if (address instanceof Inet6Address) {
Inet6Address v6Address = (Inet6Address) address;
if (isPreferIPV6Address()) {
return normalizeV6Address(v6Address);
}
}
if (isValidV4Address(address)) {
return address;
}
return null;
}
/**
*
* @return
*/
private static boolean isPreferIPV6Address() {
return Boolean.getBoolean("java.net.preferIPv6Addresses");
}
/**
*
* @param address
* @return 判断是不是有效的IPV4的地址
*/
private static boolean isValidV4Address(InetAddress address) {
if (address == null || address.isLoopbackAddress()) {
return false;
}
String name = address.getHostAddress();
return (name != null && IP_PATTERN.matcher(name).matches() && !ANYHOST_VALUE.equals(name) && !LOCALHOST_VALUE
.equals(name));
}
/**
*
* @param address
* @return 返回IPV6的地址
*/
private static InetAddress normalizeV6Address(Inet6Address address) {
String addr = address.getHostAddress();
int i = addr.lastIndexOf('%');
if (i > 0) {
try {
return InetAddress.getByName(addr.substring(0, i) + '%' + address.getScopeId());
} catch (UnknownHostException e) {
logger.debug("Unknown IPV6 address: ", e);
}
}
return address;
}
/**
*
* @return 返回本地的IP
*/
private static InetAddress getLocalAddress0() {
InetAddress localAddress = null;
try {
localAddress = InetAddress.getLocalHost();
InetAddress addressItem = toValidAddress(localAddress);
if (addressItem != null) {
return addressItem;
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
try {
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
if (null == interfaces) {
return localAddress;
}
while (interfaces.hasMoreElements()) {
try {
NetworkInterface network = interfaces.nextElement();
if (network.isLoopback() || network.isVirtual() || !network.isUp()) {
continue;
}
Enumeration<InetAddress> addresses = network.getInetAddresses();
while (addresses.hasMoreElements()) {
try {
InetAddress addressItem = toValidAddress(addresses.nextElement());
if (addressItem != null) {
try {
if (addressItem.isReachable(100)) {
return addressItem;
}
} catch (IOException e) {
// ignore
}
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
return localAddress;
}
/**
*
* @return 返回本地的IP
*/
public static InetAddress getLocalAddress() {
if (LOCAL_ADDRESS != null) {
return LOCAL_ADDRESS;
}
InetAddress localAddress = getLocalAddress0();
LOCAL_ADDRESS = localAddress;
return localAddress;
}
/**
*
* @return 返回本地的IP
*/
public static String getIp() {
return getLocalAddress().getHostAddress();
}
/**
*
* @param port
* @return 返回本地IP:port
*/
public static String getIpPort(int port) {
String ip = getIp();
return getIpPort(ip, port);
}
/**
*
* @param ip
* @param port
* @return ip:port
*/
public static String getIpPort(String ip, int port) {
if (ip == null) {
return null;
}
return ip.concat(":").concat(String.valueOf(port));
}
/**
* @param address
* @return ip和port的数组
*/
public static Object[] parseIpPort(String address) {
String[] array = address.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
return new Object[]{host, port};
}
}

View File

@ -1,69 +0,0 @@
package com.czsj.rpc.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ServerSocket;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: 端口的工具类
**/
public class NetUtil {
private static Logger logger = LoggerFactory.getLogger(NetUtil.class);
/**
*
* @param defaultPort
* @return 可用的端口
*/
public static int findAvailablePort(int defaultPort) {
int portTmp = defaultPort;
while (portTmp < 65535) {
if (!isPortUsed(portTmp)) {
return portTmp;
} else {
portTmp++;
}
}
portTmp = defaultPort--;
while (portTmp > 0) {
if (!isPortUsed(portTmp)) {
return portTmp;
} else {
portTmp--;
}
}
throw new XxlRpcException("no available port.");
}
/**
*
* @param port
* @return 可用返回true, 否则返回false
*/
public static boolean isPortUsed(int port) {
boolean used;
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
used = false;
} catch (IOException e) {
logger.info(">>>>>>>>>>> xxl-rpc, port[{}] is in use.", port);
used = true;
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
logger.info("");
}
}
}
return used;
}
}

View File

@ -1,25 +0,0 @@
package com.czsj.rpc.util;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: 自定义线程池
**/
public class ThreadPoolUtil {
public static ThreadPoolExecutor makeServerThreadPool(final String serverType, int corePoolSize, int maxPoolSize) {
ThreadPoolExecutor serverHandlerPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
r -> new Thread(r, "xxl-rpc, " + serverType + "-serverHandlerPool-" + r.hashCode()), (r, executor) -> {
throw new XxlRpcException("xxl-rpc " + serverType + " Thread pool is EXHAUSTED!");
}); // default maxThreads 300, minThreads 60
return serverHandlerPool;
}
}

View File

@ -1,21 +0,0 @@
package com.czsj.rpc.util;
import java.io.PrintWriter;
import java.io.StringWriter;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: 将异常转换为String
**/
public class ThrowableUtil {
public static String toString(Throwable e) {
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
return errorMsg;
}
}

View File

@ -1,25 +0,0 @@
package com.czsj.rpc.util;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: 自定义异常类
**/
public class XxlRpcException extends RuntimeException {
private static final long serialVersionUID = 42L;
public XxlRpcException(String msg) {
super(msg);
}
public XxlRpcException(String msg, Throwable cause) {
super(msg, cause);
}
public XxlRpcException(Throwable cause) {
super(cause);
}
}

View File

@ -1,68 +0,0 @@
package com.czsj.rpc.util.json;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: Json的父类
**/
public class BasicJson {
private static final BasicJsonReader basicJsonReader = new BasicJsonReader();
private static final BasicJsonwriter basicJsonwriter = new BasicJsonwriter();
/**
* object to json
*
* @param object
* @return
*/
public static String toJson(Object object) {
return basicJsonwriter.toJson(object);
}
/**
* parse json to map
*
* @param json
* @return only for filed type "null、ArrayList、LinkedHashMap、String、Long、Double、..."
*/
public static Map<String, Object> parseMap(String json) {
return basicJsonReader.parseMap(json);
}
/**
* json to List
*
* @param json
* @return
*/
public static List<Object> parseList(String json) {
return basicJsonReader.parseList(json);
}
public static void main(String[] args) {
Map<String, Object> result = new HashMap<>();
result.put("code", 200);
result.put("msg", "success");
result.put("arr", Arrays.asList("111", "222"));
result.put("float", 1.11f);
result.put("temp", null);
String json = toJson(result);
System.out.println(json);
Map<String, Object> mapObj = parseMap(json);
System.out.println(mapObj);
List<Object> listInt = parseList("[111,222,33]");
System.out.println(listInt);
}
}

View File

@ -1,197 +0,0 @@
package com.czsj.rpc.util.json;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: JsonReader的父类
**/
public class BasicJsonReader {
private static Logger logger = LoggerFactory.getLogger(BasicJsonwriter.class);
/**
*
* @param json 类型{}
* @return 将JSON转换为Map
*/
public Map<String, Object> parseMap(String json) {
if (json != null) {
json = json.trim();
if (json.startsWith("{")) {
return parseMapInternal(json);
}
}
throw new IllegalArgumentException("Cannot parse JSON");
}
/**
*
* @param json []
* @return 将Json数组转换为List集合
*/
public List<Object> parseList(String json) {
if (json != null) {
json = json.trim();
if (json.startsWith("[")) {
return parseListInternal(json);
}
}
throw new IllegalArgumentException("Cannot parse JSON");
}
/**
*
* @param json
* @return
*/
private List<Object> parseListInternal(String json) {
List<Object> list = new ArrayList<Object>();
json = trimLeadingCharacter(trimTrailingCharacter(json, ']'), '[');
for (String value : tokenize(json)) {
list.add(parseInternal(value));
}
return list;
}
private Object parseInternal(String json) {
if (json.equals("null")) {
return null;
}
if (json.startsWith("[")) {
return parseListInternal(json);
}
if (json.startsWith("{")) {
return parseMapInternal(json);
}
if (json.startsWith("\"")) {
return trimTrailingCharacter(trimLeadingCharacter(json, '"'), '"');
}
try {
return Long.valueOf(json);
} catch (NumberFormatException ex) {
// ignore
}
try {
return Double.valueOf(json);
} catch (NumberFormatException ex) {
// ignore
}
return json;
}
private Map<String, Object> parseMapInternal(String json) {
Map<String, Object> map = new LinkedHashMap<String, Object>();
json = trimLeadingCharacter(trimTrailingCharacter(json, '}'), '{');
for (String pair : tokenize(json)) {
String[] values = trimArrayElements(split(pair, ":"));
String key = trimLeadingCharacter(trimTrailingCharacter(values[0], '"'), '"');
Object value = parseInternal(values[1]);
map.put(key, value);
}
return map;
}
// append start
private static String[] split(String toSplit, String delimiter) {
if (toSplit != null && !toSplit.isEmpty() && delimiter != null && !delimiter.isEmpty()) {
int offset = toSplit.indexOf(delimiter);
if (offset < 0) {
return null;
} else {
String beforeDelimiter = toSplit.substring(0, offset);
String afterDelimiter = toSplit.substring(offset + delimiter.length());
return new String[]{beforeDelimiter, afterDelimiter};
}
} else {
return null;
}
}
private static String[] trimArrayElements(String[] array) {
if (array == null || array.length == 0) {
return new String[0];
} else {
String[] result = new String[array.length];
for (int i = 0; i < array.length; ++i) {
String element = array[i];
result[i] = element != null ? element.trim() : null;
}
return result;
}
}
private List<String> tokenize(String json) {
List<String> list = new ArrayList<>();
int index = 0;
int inObject = 0;
int inList = 0;
boolean inValue = false;
boolean inEscape = false;
StringBuilder build = new StringBuilder();
while (index < json.length()) {
char current = json.charAt(index);
if (inEscape) {
build.append(current);
index++;
inEscape = false;
continue;
}
if (current == '{') {
inObject++;
}
if (current == '}') {
inObject--;
}
if (current == '[') {
inList++;
}
if (current == ']') {
inList--;
}
if (current == '"') {
inValue = !inValue;
}
if (current == ',' && inObject == 0 && inList == 0 && !inValue) {
list.add(build.toString());
build.setLength(0);
} else if (current == '\\') {
inEscape = true;
} else {
build.append(current);
}
index++;
}
if (build.length() > 0) {
list.add(build.toString());
}
return list;
}
// plugin util
private static String trimTrailingCharacter(String string, char c) {
if (string.length() > 0 && string.charAt(string.length() - 1) == c) {
return string.substring(0, string.length() - 1);
}
return string;
}
private static String trimLeadingCharacter(String string, char c) {
if (string.length() > 0 && string.charAt(0) == c) {
return string.substring(1);
}
return string;
}
}

View File

@ -1,188 +0,0 @@
package com.czsj.rpc.util.json;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.*;
/**
*
* @Author: czsj
* @Date: 2022/9/16 11:14
* @Description: JsonWriter的父类
**/
public class BasicJsonwriter {
private static Logger logger = LoggerFactory.getLogger(BasicJsonwriter.class);
private static final String STR_SLASH = "\"";
private static final String STR_SLASH_STR = "\":";
private static final String STR_COMMA = ",";
private static final String STR_OBJECT_LEFT = "{";
private static final String STR_OBJECT_RIGHT = "}";
private static final String STR_ARRAY_LEFT = "[";
private static final String STR_ARRAY_RIGHT = "]";
private static final Map<String, Field[]> cacheFields = new HashMap<>();
/**
*
* @param object
* @return 将JSON转换为String字符串
*/
public String toJson(Object object) {
StringBuilder json = new StringBuilder();
try {
writeObjItem(null, object, json);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
String str = json.toString();
if (str.contains("\n")) {
str = str.replaceAll("\\n", "\\\\n");
}
if (str.contains("\t")) {
str = str.replaceAll("\\t", "\\\\t");
}
if (str.contains("\r")) {
str = str.replaceAll("\\r", "\\\\r");
}
return str;
}
/**
* 返回json格式的键值对
* @param key :
* @param value :
* @param json :返回值 "key":value or value
*/
private void writeObjItem(String key, Object value, StringBuilder json) {
// "key:"
if (key != null) {
json.append(STR_SLASH).append(key).append(STR_SLASH_STR);
}
// val
if (value == null) {
json.append("null");
} else if (value instanceof String || value instanceof Byte || value instanceof CharSequence) {
// string
json.append(STR_SLASH).append(value.toString()).append(STR_SLASH);
} else if (value instanceof Boolean || value instanceof Short || value instanceof Integer
|| value instanceof Long || value instanceof Float || value instanceof Double) {
// number
json.append(value);
} else if (value instanceof Object[] || value instanceof Collection) {
// collection | array // Array.getLength(array); // Array.get(array, i);
Collection valueColl = null;
if (value instanceof Object[]) {
Object[] valueArr = (Object[]) value;
valueColl = Arrays.asList(valueArr);
} else if (value instanceof Collection) {
valueColl = (Collection) value;
}
json.append(STR_ARRAY_LEFT);
if (valueColl.size() > 0) {
for (Object obj : valueColl) {
writeObjItem(null, obj, json);
json.append(STR_COMMA);
}
json.delete(json.length() - 1, json.length());
}
json.append(STR_ARRAY_RIGHT);
} else if (value instanceof Map) {
// map
Map<?, ?> valueMap = (Map<?, ?>) value;
json.append(STR_OBJECT_LEFT);
if (!valueMap.isEmpty()) {
Set<?> keys = valueMap.keySet();
for (Object valueMapItemKey : keys) {
writeObjItem(valueMapItemKey.toString(), valueMap.get(valueMapItemKey), json);
json.append(STR_COMMA);
}
json.delete(json.length() - 1, json.length());
}
json.append(STR_OBJECT_RIGHT);
} else {
json.append(STR_OBJECT_LEFT);
Field[] fields = getDeclaredFields(value.getClass());
if (fields.length > 0) {
for (Field field : fields) {
Object fieldObj = getFieldObject(field, value);
writeObjItem(field.getName(), fieldObj, json);
json.append(STR_COMMA);
}
json.delete(json.length() - 1, json.length());
}
json.append(STR_OBJECT_RIGHT);
}
}
/**
*
* @param clazz
* @return 返回类的字段名数组并缓存
*/
public synchronized Field[] getDeclaredFields(Class<?> clazz) {
String cacheKey = clazz.getName();
if (cacheFields.containsKey(cacheKey)) {
return cacheFields.get(cacheKey);
}
Field[] fields = getAllDeclaredFields(clazz); //clazz.getDeclaredFields();
cacheFields.put(cacheKey, fields);
return fields;
}
/**
*
* @param clazz
* @return 通过反射返回类的字段名数组
*/
private Field[] getAllDeclaredFields(Class<?> clazz) {
List<Field> list = new ArrayList<Field>();
Class<?> current = clazz;
while (current != null && current != Object.class) {
Field[] fields = current.getDeclaredFields();
for (Field field : fields) {
if (Modifier.isStatic(field.getModifiers())) {
continue;
}
list.add(field);
}
current = current.getSuperclass();
}
return list.toArray(new Field[list.size()]);
}
/**
*
* @param field
* @param obj
* @return 反射获取字段对象
*/
private synchronized Object getFieldObject(Field field, Object obj) {
try {
field.setAccessible(true);
return field.get(obj);
} catch (IllegalArgumentException | IllegalAccessException e) {
logger.error(e.getMessage(), e);
return null;
} finally {
field.setAccessible(false);
}
}
}