From ee5cc55a7640f330fc219fe3035196ff73193b89 Mon Sep 17 00:00:00 2001 From: Kris <2893855659@qq.com> Date: Tue, 17 Dec 2024 17:51:53 +0800 Subject: [PATCH] =?UTF-8?q?[feat]=20=E5=88=9D=E5=A7=8B=E5=8C=96=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E4=B8=AD=E5=8F=B0=E6=A1=86=E6=9E=B6=5F02?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ruoshui/rpc/registry/ServiceRegistry.java | 67 ---- .../registry/impl/LocalServiceRegistry.java | 88 ----- .../invoker/XxlRpcInvokerFactory.java | 179 ---------- .../invoker/annotation/XxlRpcReference.java | 42 --- .../rpc/remoting/invoker/call/CallType.java | 30 -- .../invoker/call/XxlRpcInvokeCallback.java | 49 --- .../invoker/call/XxlRpcInvokeFuture.java | 106 ------ .../invoker/generic/XxlRpcGenericService.java | 23 -- .../impl/XxlRpcSpringInvokerFactory.java | 142 -------- .../reference/XxlRpcReferenceBean.java | 320 ------------------ .../impl/XxlRpcSpringReferenceBean.java | 51 --- .../remoting/invoker/route/LoadBalance.java | 58 ---- .../invoker/route/XxlRpcLoadBalance.java | 16 - ...lRpcLoadBalanceConsistentHashStrategy.java | 86 ----- .../impl/XxlRpcLoadBalanceLFUStrategy.java | 76 ----- .../impl/XxlRpcLoadBalanceLRUStrategy.java | 79 ----- .../impl/XxlRpcLoadBalanceRandomStrategy.java | 27 -- .../impl/XxlRpcLoadBalanceRoundStrategy.java | 43 --- .../com/ruoshui/rpc/remoting/net/Client.java | 37 -- .../com/ruoshui/rpc/remoting/net/Server.java | 70 ---- .../remoting/net/common/ConnectClient.java | 124 ------- .../remoting/net/common/NettyConstant.java | 6 - .../net/impl/netty/client/NettyClient.java | 21 -- .../impl/netty/client/NettyClientHandler.java | 56 --- .../impl/netty/client/NettyConnectClient.java | 98 ------ .../net/impl/netty/codec/NettyDecoder.java | 45 --- .../net/impl/netty/codec/NettyEncoder.java | 31 -- .../net/impl/netty/server/NettyServer.java | 117 ------- .../impl/netty/server/NettyServerHandler.java | 81 ----- .../netty_http/client/NettyHttpClient.java | 21 -- .../client/NettyHttpClientHandler.java | 85 ----- .../client/NettyHttpConnectClient.java | 116 ------- .../netty_http/server/NettyHttpServer.java | 109 ------ .../server/NettyHttpServerHandler.java | 150 -------- .../rpc/remoting/net/params/BaseCallback.java | 10 - .../ruoshui/rpc/remoting/net/params/Beat.java | 20 -- .../net/params/XxlRpcFutureResponse.java | 127 ------- .../remoting/net/params/XxlRpcRequest.java | 104 ------ .../remoting/net/params/XxlRpcResponse.java | 52 --- .../provider/XxlRpcProviderFactory.java | 256 -------------- .../provider/annotation/XxlRpcService.java | 20 -- .../impl/XxlRpcSpringProviderFactory.java | 56 --- .../com/ruoshui/rpc/serialize/Serializer.java | 45 --- .../rpc/serialize/impl/HessianSerializer.java | 67 ---- .../java/com/ruoshui/rpc/util/ClassUtil.java | 46 --- .../java/com/ruoshui/rpc/util/IpUtil.java | 195 ----------- .../java/com/ruoshui/rpc/util/NetUtil.java | 69 ---- .../com/ruoshui/rpc/util/ThreadPoolUtil.java | 25 -- .../com/ruoshui/rpc/util/ThrowableUtil.java | 21 -- .../com/ruoshui/rpc/util/XxlRpcException.java | 25 -- .../com/ruoshui/rpc/util/json/BasicJson.java | 68 ---- .../rpc/util/json/BasicJsonReader.java | 197 ----------- .../rpc/util/json/BasicJsonwriter.java | 188 ---------- 53 files changed, 4240 deletions(-) delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/registry/ServiceRegistry.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/registry/impl/LocalServiceRegistry.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/XxlRpcInvokerFactory.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/annotation/XxlRpcReference.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/call/CallType.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/call/XxlRpcInvokeCallback.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/call/XxlRpcInvokeFuture.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/generic/XxlRpcGenericService.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/impl/XxlRpcSpringInvokerFactory.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/reference/XxlRpcReferenceBean.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/reference/impl/XxlRpcSpringReferenceBean.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/LoadBalance.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/XxlRpcLoadBalance.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceConsistentHashStrategy.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceLFUStrategy.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceLRUStrategy.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceRandomStrategy.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceRoundStrategy.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/Client.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/Server.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/common/ConnectClient.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/common/NettyConstant.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/client/NettyClient.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/client/NettyClientHandler.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/client/NettyConnectClient.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/codec/NettyDecoder.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/codec/NettyEncoder.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/server/NettyServer.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/server/NettyServerHandler.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/client/NettyHttpClient.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/client/NettyHttpClientHandler.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/client/NettyHttpConnectClient.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/server/NettyHttpServer.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/server/NettyHttpServerHandler.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/BaseCallback.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/Beat.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/XxlRpcFutureResponse.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/XxlRpcRequest.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/XxlRpcResponse.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/provider/XxlRpcProviderFactory.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/provider/annotation/XxlRpcService.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/provider/impl/XxlRpcSpringProviderFactory.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/serialize/Serializer.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/serialize/impl/HessianSerializer.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/util/ClassUtil.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/util/IpUtil.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/util/NetUtil.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/util/ThreadPoolUtil.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/util/ThrowableUtil.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/util/XxlRpcException.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/util/json/BasicJson.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/util/json/BasicJsonReader.java delete mode 100644 czsj-rpc/src/main/java/com/ruoshui/rpc/util/json/BasicJsonwriter.java diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/registry/ServiceRegistry.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/registry/ServiceRegistry.java deleted file mode 100644 index 94696a6..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/registry/ServiceRegistry.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.czsj.rpc.registry; - - -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: service registry - * - * /xxl-rpc/dev/ - * - key01(service01) - * - value01 (ip:port01) - * - value02 (ip:port02) - **/ -public abstract class ServiceRegistry { - - /** - * start - */ - public abstract void start(Map param); - - /** - * start - */ - public abstract void stop(); - - - /** - * registry service, for mult - * - * @param keys service key - * @param value service value/ip:port - * @return - */ - public abstract boolean registry(Set keys, String value); - - - /** - * remove service, for mult - * - * @param keys - * @param value - * @return - */ - public abstract boolean remove(Set keys, String value); - - /** - * discovery services, for mult - * - * @param keys - * @return - */ - public abstract Map> discovery(Set keys); - - /** - * discovery service, for one - * - * @param key service key - * @return service value/ip:port - */ - public abstract TreeSet discovery(String key); - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/registry/impl/LocalServiceRegistry.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/registry/impl/LocalServiceRegistry.java deleted file mode 100644 index d6e920b..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/registry/impl/LocalServiceRegistry.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.czsj.rpc.registry.impl; - -import com.czsj.rpc.registry.ServiceRegistry; - -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: service registry for "local" - **/ -public class LocalServiceRegistry extends ServiceRegistry { - - /** - * registry data - */ - private Map> registryData; - - - /** - * @param param ignore, not use - */ - @Override - public void start(Map param) { - registryData = new HashMap<>(); - } - - @Override - public void stop() { - registryData.clear(); - } - - - @Override - public boolean registry(Set keys, String value) { - if (keys == null || keys.size() == 0 || value == null || value.trim().length() == 0) { - return false; - } - for (String key : keys) { - TreeSet values = registryData.get(key); - if (values == null) { - values = new TreeSet<>(); - registryData.put(key, values); - } - values.add(value); - } - return true; - } - - @Override - public boolean remove(Set keys, String value) { - if (keys == null || keys.size() == 0 || value == null || value.trim().length() == 0) { - return false; - } - for (String key : keys) { - TreeSet values = registryData.get(key); - if (values != null) { - values.remove(value); - } - } - return true; - } - - @Override - public Map> discovery(Set keys) { - if (keys == null || keys.size() == 0) { - return null; - } - Map> registryDataTmp = new HashMap>(); - for (String key : keys) { - TreeSet valueSetTmp = discovery(key); - if (valueSetTmp != null) { - registryDataTmp.put(key, valueSetTmp); - } - } - return registryDataTmp; - } - - @Override - public TreeSet discovery(String key) { - return registryData.get(key); - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/XxlRpcInvokerFactory.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/XxlRpcInvokerFactory.java deleted file mode 100644 index d087fc8..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/XxlRpcInvokerFactory.java +++ /dev/null @@ -1,179 +0,0 @@ -package com.czsj.rpc.remoting.invoker; - -import com.czsj.rpc.registry.ServiceRegistry; -import com.czsj.rpc.registry.impl.LocalServiceRegistry; -import com.czsj.rpc.remoting.net.params.BaseCallback; -import com.czsj.rpc.remoting.net.params.XxlRpcFutureResponse; -import com.czsj.rpc.remoting.net.params.XxlRpcResponse; -import com.czsj.rpc.util.XxlRpcException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.*; - -/** - * xxl-rpc invoker factory, init service-registry - * - * @author xuxueli 2018-10-19 - */ -public class XxlRpcInvokerFactory { - private static Logger logger = LoggerFactory.getLogger(XxlRpcInvokerFactory.class); - - // ---------------------- default instance ---------------------- - - private static volatile XxlRpcInvokerFactory instance = new XxlRpcInvokerFactory(LocalServiceRegistry.class, null); - - public static XxlRpcInvokerFactory getInstance() { - return instance; - } - - - // ---------------------- config ---------------------- - - private Class serviceRegistryClass; // class.forname - private Map serviceRegistryParam; - - - public XxlRpcInvokerFactory() { - } - - public XxlRpcInvokerFactory(Class serviceRegistryClass, Map serviceRegistryParam) { - this.serviceRegistryClass = serviceRegistryClass; - this.serviceRegistryParam = serviceRegistryParam; - } - - - // ---------------------- start / stop ---------------------- - - public void start() throws Exception { - // start registry - if (serviceRegistryClass != null) { - serviceRegistry = serviceRegistryClass.newInstance(); - serviceRegistry.start(serviceRegistryParam); - } - } - - public void stop() throws Exception { - // stop registry - if (serviceRegistry != null) { - serviceRegistry.stop(); - } - - // stop callback - if (stopCallbackList.size() > 0) { - for (BaseCallback callback : stopCallbackList) { - try { - callback.run(); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } - } - - // stop CallbackThreadPool - stopCallbackThreadPool(); - } - - - // ---------------------- service registry ---------------------- - - private ServiceRegistry serviceRegistry; - - public ServiceRegistry getServiceRegistry() { - return serviceRegistry; - } - - - // ---------------------- service registry ---------------------- - - private List stopCallbackList = new ArrayList(); - - public void addStopCallBack(BaseCallback callback) { - stopCallbackList.add(callback); - } - - - // ---------------------- future-response pool ---------------------- - - // XxlRpcFutureResponseFactory - - private ConcurrentMap futureResponsePool = new ConcurrentHashMap(); - - public void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse) { - futureResponsePool.put(requestId, futureResponse); - } - - public void removeInvokerFuture(String requestId) { - futureResponsePool.remove(requestId); - } - - public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) { - - // get - final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId); - if (futureResponse == null) { - return; - } - - // notify - if (futureResponse.getInvokeCallback() != null) { - - // callback type - try { - executeResponseCallback(() -> { - if (xxlRpcResponse.getErrorMsg() != null) { - futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg())); - } else { - futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult()); - } - }); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } else { - - // other nomal type - futureResponse.setResponse(xxlRpcResponse); - } - - // do remove - futureResponsePool.remove(requestId); - - } - - - // ---------------------- response callback ThreadPool ---------------------- - - private ThreadPoolExecutor responseCallbackThreadPool = null; - - public void executeResponseCallback(Runnable runnable) { - - if (responseCallbackThreadPool == null) { - synchronized (this) { - if (responseCallbackThreadPool == null) { - responseCallbackThreadPool = new ThreadPoolExecutor( - 10, - 100, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1000), - r -> new Thread(r, "xxl-rpc, XxlRpcInvokerFactory-responseCallbackThreadPool-" + r.hashCode()), - (r, executor) -> { - throw new XxlRpcException("xxl-rpc Invoke Callback Thread pool is EXHAUSTED!"); - }); // default maxThreads 300, minThreads 60 - } - } - } - responseCallbackThreadPool.execute(runnable); - } - - public void stopCallbackThreadPool() { - if (responseCallbackThreadPool != null) { - responseCallbackThreadPool.shutdown(); - } - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/annotation/XxlRpcReference.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/annotation/XxlRpcReference.java deleted file mode 100644 index 125b6c6..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/annotation/XxlRpcReference.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.czsj.rpc.remoting.invoker.annotation; - -import com.czsj.rpc.remoting.invoker.call.CallType; -import com.czsj.rpc.remoting.invoker.route.LoadBalance; -import com.czsj.rpc.remoting.net.Client; -import com.czsj.rpc.remoting.net.impl.netty.client.NettyClient; -import com.czsj.rpc.serialize.Serializer; -import com.czsj.rpc.serialize.impl.HessianSerializer; - -import java.lang.annotation.*; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: rpc service annotation, skeleton of stub ("@Inherited" allow service use "Transactional") - **/ -@Target({ElementType.FIELD}) -@Retention(RetentionPolicy.RUNTIME) -@Inherited -public @interface XxlRpcReference { - - Class client() default NettyClient.class; - - Class serializer() default HessianSerializer.class; - - CallType callType() default CallType.SYNC; - - LoadBalance loadBalance() default LoadBalance.ROUND; - - //Class iface; - String version() default ""; - - long timeout() default 1000; - - String address() default ""; - - String accessToken() default ""; - - //XxlRpcInvokeCallback invokeCallback() ; - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/call/CallType.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/call/CallType.java deleted file mode 100644 index 2787f80..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/call/CallType.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.czsj.rpc.remoting.invoker.call; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: 远程调用的类型 - **/ -public enum CallType { - - - SYNC, - - FUTURE, - - CALLBACK, - - ONEWAY; - - - public static CallType match(String name, CallType defaultCallType) { - for (CallType item : CallType.values()) { - if (item.name().equals(name)) { - return item; - } - } - return defaultCallType; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/call/XxlRpcInvokeCallback.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/call/XxlRpcInvokeCallback.java deleted file mode 100644 index c92d11e..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/call/XxlRpcInvokeCallback.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.czsj.rpc.remoting.invoker.call; - - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: - **/ -public abstract class XxlRpcInvokeCallback { - - public abstract void onSuccess(T result); - - public abstract void onFailure(Throwable exception); - - - // ---------------------- thread invoke callback ---------------------- - - private static ThreadLocal threadInvokerFuture = new ThreadLocal(); - - /** - * get callback - * - * @return - */ - public static XxlRpcInvokeCallback getCallback() { - XxlRpcInvokeCallback invokeCallback = threadInvokerFuture.get(); - threadInvokerFuture.remove(); - return invokeCallback; - } - - /** - * set future - * - * @param invokeCallback - */ - public static void setCallback(XxlRpcInvokeCallback invokeCallback) { - threadInvokerFuture.set(invokeCallback); - } - - /** - * remove future - */ - public static void removeCallback() { - threadInvokerFuture.remove(); - } - - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/call/XxlRpcInvokeFuture.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/call/XxlRpcInvokeFuture.java deleted file mode 100644 index 0d2be97..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/call/XxlRpcInvokeFuture.java +++ /dev/null @@ -1,106 +0,0 @@ -package com.czsj.rpc.remoting.invoker.call; - -import com.czsj.rpc.remoting.net.params.XxlRpcFutureResponse; -import com.czsj.rpc.remoting.net.params.XxlRpcResponse; -import com.czsj.rpc.util.XxlRpcException; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: - **/ -public class XxlRpcInvokeFuture implements Future { - - - private XxlRpcFutureResponse futureResponse; - - public XxlRpcInvokeFuture(XxlRpcFutureResponse futureResponse) { - this.futureResponse = futureResponse; - } - - public void stop() { - // remove-InvokerFuture - futureResponse.removeInvokerFuture(); - } - - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return futureResponse.cancel(mayInterruptIfRunning); - } - - @Override - public boolean isCancelled() { - return futureResponse.isCancelled(); - } - - @Override - public boolean isDone() { - return futureResponse.isDone(); - } - - @Override - public Object get() throws ExecutionException, InterruptedException { - try { - return get(-1, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - throw new XxlRpcException(e); - } - } - - @Override - public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - try { - // future get - XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, unit); - if (xxlRpcResponse.getErrorMsg() != null) { - throw new XxlRpcException(xxlRpcResponse.getErrorMsg()); - } - return xxlRpcResponse.getResult(); - } finally { - stop(); - } - } - - - // ---------------------- thread invoke future ---------------------- - - private static ThreadLocal threadInvokerFuture = new ThreadLocal(); - - /** - * get future - * - * @param type - * @param - * @return - */ - public static Future getFuture(Class type) { - Future future = (Future) threadInvokerFuture.get(); - threadInvokerFuture.remove(); - return future; - } - - /** - * set future - * - * @param future - */ - public static void setFuture(XxlRpcInvokeFuture future) { - threadInvokerFuture.set(future); - } - - /** - * remove future - */ - public static void removeFuture() { - threadInvokerFuture.remove(); - } - -} - diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/generic/XxlRpcGenericService.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/generic/XxlRpcGenericService.java deleted file mode 100644 index 2509e53..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/generic/XxlRpcGenericService.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.czsj.rpc.remoting.invoker.generic; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: - **/ -public interface XxlRpcGenericService { - - /** - * generic invoke - * - * @param iface iface name - * @param version iface version - * @param method method name - * @param parameterTypes parameter types, limit base type like "int、java.lang.Integer、java.util.List、java.util.Map ..." - * @param args - * @return - */ - Object invoke(String iface, String version, String method, String[] parameterTypes, Object[] args); - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/impl/XxlRpcSpringInvokerFactory.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/impl/XxlRpcSpringInvokerFactory.java deleted file mode 100644 index fde9497..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/impl/XxlRpcSpringInvokerFactory.java +++ /dev/null @@ -1,142 +0,0 @@ -package com.czsj.rpc.remoting.invoker.impl; - -import com.czsj.rpc.registry.ServiceRegistry; -import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory; -import com.czsj.rpc.remoting.invoker.annotation.XxlRpcReference; -import com.czsj.rpc.remoting.invoker.reference.XxlRpcReferenceBean; -import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory; -import com.czsj.rpc.util.XxlRpcException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.BeanFactoryAware; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessorAdapter; -import org.springframework.util.ReflectionUtils; - -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * - * @author xuxueli 2018-10-19 - */ -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: - **/ -public class XxlRpcSpringInvokerFactory extends InstantiationAwareBeanPostProcessorAdapter implements InitializingBean, DisposableBean, BeanFactoryAware { - private Logger logger = LoggerFactory.getLogger(XxlRpcSpringInvokerFactory.class); - - // ---------------------- config ---------------------- - - private Class serviceRegistryClass; // class.forname - private Map serviceRegistryParam; - - - public void setServiceRegistryClass(Class serviceRegistryClass) { - this.serviceRegistryClass = serviceRegistryClass; - } - - public void setServiceRegistryParam(Map serviceRegistryParam) { - this.serviceRegistryParam = serviceRegistryParam; - } - - - // ---------------------- util ---------------------- - - private XxlRpcInvokerFactory xxlRpcInvokerFactory; - - @Override - public void afterPropertiesSet() throws Exception { - // start invoker factory - xxlRpcInvokerFactory = new XxlRpcInvokerFactory(serviceRegistryClass, serviceRegistryParam); - xxlRpcInvokerFactory.start(); - } - - @Override - public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException { - - // collection - final Set serviceKeyList = new HashSet<>(); - - // parse XxlRpcReferenceBean - ReflectionUtils.doWithFields(bean.getClass(), field -> { - if (field.isAnnotationPresent(XxlRpcReference.class)) { - // valid - Class iface = field.getType(); - if (!iface.isInterface()) { - throw new XxlRpcException("xxl-rpc, reference(XxlRpcReference) must be interface."); - } - - XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class); - - // init reference bean - XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(); - referenceBean.setClient(rpcReference.client()); - referenceBean.setSerializer(rpcReference.serializer()); - referenceBean.setCallType(rpcReference.callType()); - referenceBean.setLoadBalance(rpcReference.loadBalance()); - referenceBean.setIface(iface); - referenceBean.setVersion(rpcReference.version()); - referenceBean.setTimeout(rpcReference.timeout()); - referenceBean.setAddress(rpcReference.address()); - referenceBean.setAccessToken(rpcReference.accessToken()); - referenceBean.setInvokeCallback(null); - referenceBean.setInvokerFactory(xxlRpcInvokerFactory); - - - // get proxyObj - Object serviceProxy; - try { - serviceProxy = referenceBean.getObject(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - // set bean - field.setAccessible(true); - field.set(bean, serviceProxy); - - logger.info(">>>>>>>>>>> xxl-rpc, invoker factory init reference bean success. serviceKey = {}, bean.field = {}.{}", - XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version()), beanName, field.getName()); - - // collection - String serviceKey = XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version()); - serviceKeyList.add(serviceKey); - - } - }); - - // mult discovery - if (xxlRpcInvokerFactory.getServiceRegistry() != null) { - try { - xxlRpcInvokerFactory.getServiceRegistry().discovery(serviceKeyList); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } - - return super.postProcessAfterInstantiation(bean, beanName); - } - - - @Override - public void destroy() throws Exception { - - // stop invoker factory - xxlRpcInvokerFactory.stop(); - } - - private BeanFactory beanFactory; - - @Override - public void setBeanFactory(BeanFactory beanFactory) throws BeansException { - this.beanFactory = beanFactory; - } -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/reference/XxlRpcReferenceBean.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/reference/XxlRpcReferenceBean.java deleted file mode 100644 index bf8e64b..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/reference/XxlRpcReferenceBean.java +++ /dev/null @@ -1,320 +0,0 @@ -package com.czsj.rpc.remoting.invoker.reference; - -import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory; -import com.czsj.rpc.remoting.invoker.call.CallType; -import com.czsj.rpc.remoting.invoker.call.XxlRpcInvokeCallback; -import com.czsj.rpc.remoting.invoker.call.XxlRpcInvokeFuture; -import com.czsj.rpc.remoting.invoker.generic.XxlRpcGenericService; -import com.czsj.rpc.remoting.invoker.route.LoadBalance; -import com.czsj.rpc.remoting.net.Client; -import com.czsj.rpc.remoting.net.impl.netty.client.NettyClient; -import com.czsj.rpc.remoting.net.params.XxlRpcFutureResponse; -import com.czsj.rpc.remoting.net.params.XxlRpcRequest; -import com.czsj.rpc.remoting.net.params.XxlRpcResponse; -import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory; -import com.czsj.rpc.serialize.Serializer; -import com.czsj.rpc.serialize.impl.HessianSerializer; -import com.czsj.rpc.util.ClassUtil; -import com.czsj.rpc.util.XxlRpcException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Proxy; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -/** - * rpc reference bean, use by api - * - * @author xuxueli 2015-10-29 20:18:32 - */ -public class XxlRpcReferenceBean { - private static final Logger logger = LoggerFactory.getLogger(XxlRpcReferenceBean.class); - // [tips01: save 30ms/100invoke. why why why??? with this logger, it can save lots of time.] - - - // ---------------------- config ---------------------- - - private Class client = NettyClient.class; - private Class serializer = HessianSerializer.class; - private CallType callType = CallType.SYNC; - private LoadBalance loadBalance = LoadBalance.ROUND; - - private Class iface = null; - private String version = null; - - private long timeout = 10000; - - private String address = null; - private String accessToken = null; - - private XxlRpcInvokeCallback invokeCallback = null; - - private XxlRpcInvokerFactory invokerFactory = null; - - - // set - public void setClient(Class client) { - this.client = client; - } - - public void setSerializer(Class serializer) { - this.serializer = serializer; - } - - public void setCallType(CallType callType) { - this.callType = callType; - } - - public void setLoadBalance(LoadBalance loadBalance) { - this.loadBalance = loadBalance; - } - - public void setIface(Class iface) { - this.iface = iface; - } - - public void setVersion(String version) { - this.version = version; - } - - public void setTimeout(long timeout) { - this.timeout = timeout; - } - - public void setAddress(String address) { - this.address = address; - } - - public void setAccessToken(String accessToken) { - this.accessToken = accessToken; - } - - public void setInvokeCallback(XxlRpcInvokeCallback invokeCallback) { - this.invokeCallback = invokeCallback; - } - - public void setInvokerFactory(XxlRpcInvokerFactory invokerFactory) { - this.invokerFactory = invokerFactory; - } - - - // get - public Serializer getSerializerInstance() { - return serializerInstance; - } - - public long getTimeout() { - return timeout; - } - - public XxlRpcInvokerFactory getInvokerFactory() { - return invokerFactory; - } - - public Class getIface() { - return iface; - } - - - // ---------------------- initClient ---------------------- - - private Client clientInstance = null; - private Serializer serializerInstance = null; - - public XxlRpcReferenceBean initClient() throws Exception { - - // valid - if (this.client == null) { - throw new XxlRpcException("xxl-rpc reference client missing."); - } - if (this.serializer == null) { - throw new XxlRpcException("xxl-rpc reference serializer missing."); - } - if (this.callType == null) { - throw new XxlRpcException("xxl-rpc reference callType missing."); - } - if (this.loadBalance == null) { - throw new XxlRpcException("xxl-rpc reference loadBalance missing."); - } - if (this.iface == null) { - throw new XxlRpcException("xxl-rpc reference iface missing."); - } - if (this.timeout < 0) { - this.timeout = 0; - } - if (this.invokerFactory == null) { - this.invokerFactory = XxlRpcInvokerFactory.getInstance(); - } - - // init serializerInstance - this.serializerInstance = serializer.newInstance(); - - // init Client - clientInstance = client.newInstance(); - clientInstance.init(this); - - return this; - } - - - // ---------------------- util ---------------------- - - public Object getObject() throws Exception { - - // initClient - initClient(); - - // newProxyInstance - return Proxy.newProxyInstance(Thread.currentThread() - .getContextClassLoader(), new Class[]{iface}, - (proxy, method, args) -> { - - // method param - String className = method.getDeclaringClass().getName(); // iface.getName() - String varsion_ = version; - String methodName = method.getName(); - Class[] parameterTypes = method.getParameterTypes(); - Object[] parameters = args; - - // filter for generic - if (className.equals(XxlRpcGenericService.class.getName()) && methodName.equals("invoke")) { - - Class[] paramTypes = null; - if (args[3] != null) { - String[] paramTypes_str = (String[]) args[3]; - if (paramTypes_str.length > 0) { - paramTypes = new Class[paramTypes_str.length]; - for (int i = 0; i < paramTypes_str.length; i++) { - paramTypes[i] = ClassUtil.resolveClass(paramTypes_str[i]); - } - } - } - - className = (String) args[0]; - varsion_ = (String) args[1]; - methodName = (String) args[2]; - parameterTypes = paramTypes; - parameters = (Object[]) args[4]; - } - - // filter method like "Object.toString()" - if (className.equals(Object.class.getName())) { - logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}#{}]", className, methodName); - throw new XxlRpcException("xxl-rpc proxy class-method not support"); - } - - // address - String finalAddress = address; - if (finalAddress == null || finalAddress.trim().length() == 0) { - if (invokerFactory != null && invokerFactory.getServiceRegistry() != null) { - // discovery - String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_); - TreeSet addressSet = invokerFactory.getServiceRegistry().discovery(serviceKey); - // load balance - if (addressSet == null || addressSet.size() == 0) { - // pass - } else if (addressSet.size() == 1) { - finalAddress = addressSet.first(); - } else { - finalAddress = loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet); - } - - } - } - if (finalAddress == null || finalAddress.trim().length() == 0) { - throw new XxlRpcException("xxl-rpc reference bean[" + className + "] address empty"); - } - - // request - XxlRpcRequest xxlRpcRequest = new XxlRpcRequest(); - xxlRpcRequest.setRequestId(UUID.randomUUID().toString()); - xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis()); - xxlRpcRequest.setAccessToken(accessToken); - xxlRpcRequest.setClassName(className); - xxlRpcRequest.setMethodName(methodName); - xxlRpcRequest.setParameterTypes(parameterTypes); - xxlRpcRequest.setParameters(parameters); - xxlRpcRequest.setVersion(version); - - // send - if (CallType.SYNC == callType) { - // future-response set - XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null); - try { - // do invoke - clientInstance.asyncSend(finalAddress, xxlRpcRequest); - - // future get - XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS); - if (xxlRpcResponse.getErrorMsg() != null) { - throw new XxlRpcException(xxlRpcResponse.getErrorMsg()); - } - return xxlRpcResponse.getResult(); - } catch (Exception e) { - logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest); - - throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e); - } finally { - // future-response remove - futureResponse.removeInvokerFuture(); - } - } else if (CallType.FUTURE == callType) { - // future-response set - XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null); - try { - // invoke future set - XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse); - XxlRpcInvokeFuture.setFuture(invokeFuture); - -// do invoke - clientInstance.asyncSend(finalAddress, xxlRpcRequest); - - return null; - } catch (Exception e) { - logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest); - - // future-response remove - futureResponse.removeInvokerFuture(); - - throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e); - } - - } else if (CallType.CALLBACK == callType) { - - // get callback - XxlRpcInvokeCallback finalInvokeCallback = invokeCallback; - XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback(); - if (threadInvokeCallback != null) { - finalInvokeCallback = threadInvokeCallback; - } - if (finalInvokeCallback == null) { - throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType=" + CallType.CALLBACK.name() + ") cannot be null."); - } - - // future-response set - XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback); - try { - clientInstance.asyncSend(finalAddress, xxlRpcRequest); - } catch (Exception e) { - logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest); - - // future-response remove - futureResponse.removeInvokerFuture(); - - throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e); - } - - return null; - } else if (CallType.ONEWAY == callType) { - clientInstance.asyncSend(finalAddress, xxlRpcRequest); - return null; - } else { - throw new XxlRpcException("xxl-rpc callType[" + callType + "] invalid"); - } - - }); - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/reference/impl/XxlRpcSpringReferenceBean.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/reference/impl/XxlRpcSpringReferenceBean.java deleted file mode 100644 index f7ca938..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/reference/impl/XxlRpcSpringReferenceBean.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.czsj.rpc.remoting.invoker.reference.impl; - -import com.czsj.rpc.remoting.invoker.reference.XxlRpcReferenceBean; -import org.springframework.beans.factory.FactoryBean; -import org.springframework.beans.factory.InitializingBean; - -/** - * rpc reference bean, use by spring xml and annotation (for spring) - * - * @author xuxueli 2015-10-29 20:18:32 - */ -public class XxlRpcSpringReferenceBean implements FactoryBean, InitializingBean { - - - // ---------------------- util ---------------------- - - private XxlRpcReferenceBean xxlRpcReferenceBean; - - @Override - public void afterPropertiesSet() { - - // init config - this.xxlRpcReferenceBean = new XxlRpcReferenceBean(); - } - - - @Override - public Object getObject() throws Exception { - return xxlRpcReferenceBean.getObject(); - } - - @Override - public Class getObjectType() { - return xxlRpcReferenceBean.getIface(); - } - - @Override - public boolean isSingleton() { - return false; - } - - - /** - * public static ClientProxy ClientProxy getFuture(Class type) { - * ClientProxy proxy = () new ClientProxy(); - * return proxy; - * } - */ - - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/LoadBalance.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/LoadBalance.java deleted file mode 100644 index c1f562f..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/LoadBalance.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.czsj.rpc.remoting.invoker.route; - -import com.czsj.rpc.remoting.invoker.route.impl.*; - -/** - * @author xuxueli 2018-12-04 - */ -public enum LoadBalance { - - RANDOM(new XxlRpcLoadBalanceRandomStrategy()), - ROUND(new XxlRpcLoadBalanceRoundStrategy()), - LRU(new XxlRpcLoadBalanceLRUStrategy()), - LFU(new XxlRpcLoadBalanceLFUStrategy()), - CONSISTENT_HASH(new XxlRpcLoadBalanceConsistentHashStrategy()); - - - public final XxlRpcLoadBalance xxlRpcInvokerRouter; - - private LoadBalance(XxlRpcLoadBalance xxlRpcInvokerRouter) { - this.xxlRpcInvokerRouter = xxlRpcInvokerRouter; - } - - - public static LoadBalance match(String name, LoadBalance defaultRouter) { - for (LoadBalance item : LoadBalance.values()) { - if (item.equals(name)) { - return item; - } - } - return defaultRouter; - } - - - - /*public static void main(String[] args) { - String serviceKey = "service"; - TreeSet addressSet = new TreeSet(){{ - add("1"); - add("2"); - add("3"); - add("4"); - add("5"); - }}; - - for (LoadBalance item : LoadBalance.values()) { - long start = System.currentTimeMillis(); - for (int i = 0; i < 100000; i++) { - String address = LoadBalance.LFU.xxlRpcInvokerRouter.route(serviceKey, addressSet); - //System.out.println(address);; - } - long end = System.currentTimeMillis(); - System.out.println(item.name() + " --- " + (end-start)); - } - - }*/ - - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/XxlRpcLoadBalance.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/XxlRpcLoadBalance.java deleted file mode 100644 index fa22656..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/XxlRpcLoadBalance.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.czsj.rpc.remoting.invoker.route; - -import java.util.TreeSet; - -/** - * 分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器; - * a、virtual node:解决不均衡问题 - * b、hash method replace hashCode:String的hashCode可能重复,需要进一步扩大hashCode的取值范围 - * - * @author xuxueli 2018-12-04 - */ -public abstract class XxlRpcLoadBalance { - - public abstract String route(String serviceKey, TreeSet addressSet); - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceConsistentHashStrategy.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceConsistentHashStrategy.java deleted file mode 100644 index fcf9a47..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceConsistentHashStrategy.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.czsj.rpc.remoting.invoker.route.impl; - -import com.czsj.rpc.remoting.invoker.route.XxlRpcLoadBalance; - -import java.io.UnsupportedEncodingException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.TreeSet; - -/** - * consustent hash - * - * 单个JOB对应的每个执行器,使用频率最低的优先被选举 - * a(*)、LFU(Least Frequently Used):最不经常使用,频率/次数 - * b、LRU(Least Recently Used):最近最久未使用,时间 - * - * @author xuxueli 2018-12-04 - */ -public class XxlRpcLoadBalanceConsistentHashStrategy extends XxlRpcLoadBalance { - - private int VIRTUAL_NODE_NUM = 5; - - /** - * get hash code on 2^32 ring (md5散列的方式计算hash值) - * @param key - * @return - */ - private long hash(String key) { - - // md5 byte - MessageDigest md5; - try { - md5 = MessageDigest.getInstance("MD5"); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException("MD5 not supported", e); - } - md5.reset(); - byte[] keyBytes = null; - try { - keyBytes = key.getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException("Unknown string :" + key, e); - } - - md5.update(keyBytes); - byte[] digest = md5.digest(); - - // hash code, Truncate to 32-bits - long hashCode = ((long) (digest[3] & 0xFF) << 24) - | ((long) (digest[2] & 0xFF) << 16) - | ((long) (digest[1] & 0xFF) << 8) - | (digest[0] & 0xFF); - - long truncateHashCode = hashCode & 0xffffffffL; - return truncateHashCode; - } - - public String doRoute(String serviceKey, TreeSet addressSet) { - - // ------A1------A2-------A3------ - // -----------J1------------------ - TreeMap addressRing = new TreeMap(); - for (String address: addressSet) { - for (int i = 0; i < VIRTUAL_NODE_NUM; i++) { - long addressHash = hash("SHARD-" + address + "-NODE-" + i); - addressRing.put(addressHash, address); - } - } - - long jobHash = hash(serviceKey); - SortedMap lastRing = addressRing.tailMap(jobHash); - if (!lastRing.isEmpty()) { - return lastRing.get(lastRing.firstKey()); - } - return addressRing.firstEntry().getValue(); - } - - @Override - public String route(String serviceKey, TreeSet addressSet) { - String finalAddress = doRoute(serviceKey, addressSet); - return finalAddress; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceLFUStrategy.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceLFUStrategy.java deleted file mode 100644 index 8b0a88d..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceLFUStrategy.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.czsj.rpc.remoting.invoker.route.impl; - -import com.czsj.rpc.remoting.invoker.route.XxlRpcLoadBalance; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * lru - * - * @author xuxueli 2018-12-04 - */ -public class XxlRpcLoadBalanceLFUStrategy extends XxlRpcLoadBalance { - - private ConcurrentMap> jobLfuMap = new ConcurrentHashMap>(); - private long CACHE_VALID_TIME = 0; - - public String doRoute(String serviceKey, TreeSet addressSet) { - - // cache clear - if (System.currentTimeMillis() > CACHE_VALID_TIME) { - jobLfuMap.clear(); - CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24; - } - - // lfu item init - HashMap lfuItemMap = jobLfuMap.get(serviceKey); // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList; - if (lfuItemMap == null) { - lfuItemMap = new HashMap(); - jobLfuMap.putIfAbsent(serviceKey, lfuItemMap); // 避免重复覆盖 - } - - // put new - for (String address: addressSet) { - if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) { - lfuItemMap.put(address, 0); - } - } - - // remove old - List delKeys = new ArrayList<>(); - for (String existKey: lfuItemMap.keySet()) { - if (!addressSet.contains(existKey)) { - delKeys.add(existKey); - } - } - if (delKeys.size() > 0) { - for (String delKey: delKeys) { - lfuItemMap.remove(delKey); - } - } - - // load least userd count address - List> lfuItemList = new ArrayList>(lfuItemMap.entrySet()); - Collections.sort(lfuItemList, new Comparator>() { - @Override - public int compare(Map.Entry o1, Map.Entry o2) { - return o1.getValue().compareTo(o2.getValue()); - } - }); - - Map.Entry addressItem = lfuItemList.get(0); - String minAddress = addressItem.getKey(); - addressItem.setValue(addressItem.getValue() + 1); - - return minAddress; - } - - @Override - public String route(String serviceKey, TreeSet addressSet) { - String finalAddress = doRoute(serviceKey, addressSet); - return finalAddress; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceLRUStrategy.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceLRUStrategy.java deleted file mode 100644 index 23f84e9..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceLRUStrategy.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.czsj.rpc.remoting.invoker.route.impl; - -import com.czsj.rpc.remoting.invoker.route.XxlRpcLoadBalance; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * lru - * - * @author xuxueli 2018-12-04 - */ -public class XxlRpcLoadBalanceLRUStrategy extends XxlRpcLoadBalance { - - private ConcurrentMap> jobLRUMap = new ConcurrentHashMap>(); - private long CACHE_VALID_TIME = 0; - - public String doRoute(String serviceKey, TreeSet addressSet) { - - // cache clear - if (System.currentTimeMillis() > CACHE_VALID_TIME) { - jobLRUMap.clear(); - CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24; - } - - // init lru - LinkedHashMap lruItem = jobLRUMap.get(serviceKey); - if (lruItem == null) { - /** - * LinkedHashMap - * a、accessOrder:ture=访问顺序排序(get/put时排序)/ACCESS-LAST;false=插入顺序排期/FIFO; - * b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法; - */ - lruItem = new LinkedHashMap(16, 0.75f, true){ - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - if(super.size() > 1000){ - return true; - }else{ - return false; - } - } - }; - jobLRUMap.putIfAbsent(serviceKey, lruItem); - } - - // put new - for (String address: addressSet) { - if (!lruItem.containsKey(address)) { - lruItem.put(address, address); - } - } - // remove old - List delKeys = new ArrayList<>(); - for (String existKey: lruItem.keySet()) { - if (!addressSet.contains(existKey)) { - delKeys.add(existKey); - } - } - if (delKeys.size() > 0) { - for (String delKey: delKeys) { - lruItem.remove(delKey); - } - } - - // load - String eldestKey = lruItem.entrySet().iterator().next().getKey(); - String eldestValue = lruItem.get(eldestKey); - return eldestValue; - } - - @Override - public String route(String serviceKey, TreeSet addressSet) { - String finalAddress = doRoute(serviceKey, addressSet); - return finalAddress; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceRandomStrategy.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceRandomStrategy.java deleted file mode 100644 index 10662d7..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceRandomStrategy.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.czsj.rpc.remoting.invoker.route.impl; - -import com.czsj.rpc.remoting.invoker.route.XxlRpcLoadBalance; - -import java.util.Random; -import java.util.TreeSet; - -/** - * random - * - * @author xuxueli 2018-12-04 - */ -public class XxlRpcLoadBalanceRandomStrategy extends XxlRpcLoadBalance { - - private Random random = new Random(); - - @Override - public String route(String serviceKey, TreeSet addressSet) { - // arr - String[] addressArr = addressSet.toArray(new String[addressSet.size()]); - - // random - String finalAddress = addressArr[random.nextInt(addressSet.size())]; - return finalAddress; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceRoundStrategy.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceRoundStrategy.java deleted file mode 100644 index 5337cd3..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/invoker/route/impl/XxlRpcLoadBalanceRoundStrategy.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.czsj.rpc.remoting.invoker.route.impl; - -import com.czsj.rpc.remoting.invoker.route.XxlRpcLoadBalance; - -import java.util.Random; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * round - * - * @author xuxueli 2018-12-04 - */ -public class XxlRpcLoadBalanceRoundStrategy extends XxlRpcLoadBalance { - - private ConcurrentMap routeCountEachJob = new ConcurrentHashMap(); - private long CACHE_VALID_TIME = 0; - private int count(String serviceKey) { - // cache clear - if (System.currentTimeMillis() > CACHE_VALID_TIME) { - routeCountEachJob.clear(); - CACHE_VALID_TIME = System.currentTimeMillis() + 24*60*60*1000; - } - - // count++ - Integer count = routeCountEachJob.get(serviceKey); - count = (count==null || count>1000000)?(new Random().nextInt(100)):++count; // 初始化时主动Random一次,缓解首次压力 - routeCountEachJob.put(serviceKey, count); - return count; - } - - @Override - public String route(String serviceKey, TreeSet addressSet) { - // arr - String[] addressArr = addressSet.toArray(new String[addressSet.size()]); - - // round - String finalAddress = addressArr[count(serviceKey)%addressArr.length]; - return finalAddress; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/Client.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/Client.java deleted file mode 100644 index 8c52d22..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/Client.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.czsj.rpc.remoting.net; - -import com.czsj.rpc.remoting.invoker.reference.XxlRpcReferenceBean; -import com.czsj.rpc.remoting.net.params.XxlRpcRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * i client - * @author xuxueli 2015-11-24 22:18:10 - */ -public abstract class Client { - protected static final Logger logger = LoggerFactory.getLogger(Client.class); - - - // ---------------------- init ---------------------- - - protected volatile XxlRpcReferenceBean xxlRpcReferenceBean; - - public void init(XxlRpcReferenceBean xxlRpcReferenceBean) { - this.xxlRpcReferenceBean = xxlRpcReferenceBean; - } - - - // ---------------------- send ---------------------- - - /** - * async send, bind requestId and future-response - * - * @param address - * @param xxlRpcRequest - * @return - * @throws Exception - */ - public abstract void asyncSend(String address, XxlRpcRequest xxlRpcRequest) throws Exception; - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/Server.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/Server.java deleted file mode 100644 index 9cd35cb..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/Server.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.czsj.rpc.remoting.net; - -import com.czsj.rpc.remoting.net.params.BaseCallback; -import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * server - * - * @author xuxueli 2015-11-24 20:59:49 - */ -public abstract class Server { - protected static final Logger logger = LoggerFactory.getLogger(Server.class); - - - private BaseCallback startedCallback; - private BaseCallback stopedCallback; - - public void setStartedCallback(BaseCallback startedCallback) { - this.startedCallback = startedCallback; - } - - public void setStopedCallback(BaseCallback stopedCallback) { - this.stopedCallback = stopedCallback; - } - - - /** - * start server - * - * @param xxlRpcProviderFactory - * @throws Exception - */ - public abstract void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception; - - /** - * callback when started - */ - public void onStarted() { - if (startedCallback != null) { - try { - startedCallback.run(); - } catch (Exception e) { - logger.error(">>>>>>>>>>> xxl-rpc, server startedCallback error.", e); - } - } - } - - /** - * stop server - * - * @throws Exception - */ - public abstract void stop() throws Exception; - - /** - * callback when stoped - */ - public void onStopped() { - if (stopedCallback != null) { - try { - stopedCallback.run(); - } catch (Exception e) { - logger.error(">>>>>>>>>>> xxl-rpc, server stopedCallback error.", e); - } - } - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/common/ConnectClient.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/common/ConnectClient.java deleted file mode 100644 index 9bca432..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/common/ConnectClient.java +++ /dev/null @@ -1,124 +0,0 @@ -package com.czsj.rpc.remoting.net.common; - -import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory; -import com.czsj.rpc.remoting.invoker.reference.XxlRpcReferenceBean; -import com.czsj.rpc.remoting.net.params.BaseCallback; -import com.czsj.rpc.remoting.net.params.XxlRpcRequest; -import com.czsj.rpc.serialize.Serializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * @author xuxueli 2018-10-19 - */ -public abstract class ConnectClient { - protected static transient Logger logger = LoggerFactory.getLogger(ConnectClient.class); - - // ---------------------- iface ---------------------- - - public abstract void init(String address, final Serializer serializer, final XxlRpcInvokerFactory xxlRpcInvokerFactory) throws Exception; - - public abstract void close(); - - public abstract boolean isValidate(); - - public abstract void send(XxlRpcRequest xxlRpcRequest) throws Exception; - - - // ---------------------- client pool map ---------------------- - - /** - * async send - */ - public static void asyncSend(XxlRpcRequest xxlRpcRequest, String address, - Class connectClientImpl, - final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception { - - // client pool [tips03 : may save 35ms/100invoke if move it to constructor, but it is necessary. cause by ConcurrentHashMap.get] - ConnectClient clientPool = ConnectClient.getPool(address, connectClientImpl, xxlRpcReferenceBean); - - try { - // do invoke - clientPool.send(xxlRpcRequest); - } catch (Exception e) { - throw e; - } - - } - - private static volatile ConcurrentMap connectClientMap; // (static) alread addStopCallBack - private static volatile ConcurrentMap connectClientLockMap = new ConcurrentHashMap<>(); - - private static ConnectClient getPool(String address, Class connectClientImpl, - final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception { - - // init base compont, avoid repeat init - if (connectClientMap == null) { - synchronized (ConnectClient.class) { - if (connectClientMap == null) { - // init - connectClientMap = new ConcurrentHashMap(); - // stop callback - xxlRpcReferenceBean.getInvokerFactory().addStopCallBack(new BaseCallback() { - @Override - public void run() throws Exception { - if (connectClientMap.size() > 0) { - for (String key : connectClientMap.keySet()) { - ConnectClient clientPool = connectClientMap.get(key); - clientPool.close(); - } - connectClientMap.clear(); - } - } - }); - } - } - } - - // get-valid client - ConnectClient connectClient = connectClientMap.get(address); - if (connectClient != null && connectClient.isValidate()) { - return connectClient; - } - - // lock - Object clientLock = connectClientLockMap.get(address); - if (clientLock == null) { - connectClientLockMap.putIfAbsent(address, new Object()); - clientLock = connectClientLockMap.get(address); - } - - // remove-create new client - synchronized (clientLock) { - - // get-valid client, avlid repeat - connectClient = connectClientMap.get(address); - if (connectClient != null && connectClient.isValidate()) { - return connectClient; - } - - // remove old - if (connectClient != null) { - connectClient.close(); - connectClientMap.remove(address); - } - - // set pool - ConnectClient connectClient_new = connectClientImpl.newInstance(); - try { - connectClient_new.init(address, xxlRpcReferenceBean.getSerializerInstance(), xxlRpcReferenceBean.getInvokerFactory()); - connectClientMap.put(address, connectClient_new); - } catch (Exception e) { - connectClient_new.close(); - throw e; - } - - return connectClient_new; - } - - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/common/NettyConstant.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/common/NettyConstant.java deleted file mode 100644 index 645b875..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/common/NettyConstant.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.czsj.rpc.remoting.net.common; - -public class NettyConstant { - - public static int MAX_LENGTH = 20 * 1024 * 1024; -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/client/NettyClient.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/client/NettyClient.java deleted file mode 100644 index 7b41659..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/client/NettyClient.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.czsj.rpc.remoting.net.impl.netty.client; - -import com.czsj.rpc.remoting.net.Client; -import com.czsj.rpc.remoting.net.common.ConnectClient; -import com.czsj.rpc.remoting.net.params.XxlRpcRequest; - -/** - * netty client - * - * @author xuxueli 2015-11-24 22:25:15 - */ -public class NettyClient extends Client { - - private Class connectClientImpl = NettyConnectClient.class; - - @Override - public void asyncSend(String address, XxlRpcRequest xxlRpcRequest) throws Exception { - ConnectClient.asyncSend(xxlRpcRequest, address, connectClientImpl, xxlRpcReferenceBean); - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/client/NettyClientHandler.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/client/NettyClientHandler.java deleted file mode 100644 index 3f82070..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/client/NettyClientHandler.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.czsj.rpc.remoting.net.impl.netty.client; - -import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory; -import com.czsj.rpc.remoting.net.params.Beat; -import com.czsj.rpc.remoting.net.params.XxlRpcResponse; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.timeout.IdleStateEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * rpc netty client handler - * - * @author xuxueli 2015-10-31 18:00:27 - */ -public class NettyClientHandler extends SimpleChannelInboundHandler { - private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); - - - private XxlRpcInvokerFactory xxlRpcInvokerFactory; - private NettyConnectClient nettyConnectClient; - public NettyClientHandler(final XxlRpcInvokerFactory xxlRpcInvokerFactory, NettyConnectClient nettyConnectClient) { - this.xxlRpcInvokerFactory = xxlRpcInvokerFactory; - this.nettyConnectClient = nettyConnectClient; - } - - - @Override - protected void channelRead0(ChannelHandlerContext ctx, XxlRpcResponse xxlRpcResponse) throws Exception { - - // notify response - xxlRpcInvokerFactory.notifyInvokerFuture(xxlRpcResponse.getRequestId(), xxlRpcResponse); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.error(">>>>>>>>>>> xxl-rpc netty client caught exception", cause); - ctx.close(); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent){ - /*ctx.channel().close(); // close idle channel - logger.debug(">>>>>>>>>>> xxl-rpc netty client close an idle channel.");*/ - - nettyConnectClient.send(Beat.BEAT_PING); // beat N, close if fail(may throw error) - logger.debug(">>>>>>>>>>> xxl-rpc netty client send beat-ping."); - - } else { - super.userEventTriggered(ctx, evt); - } - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/client/NettyConnectClient.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/client/NettyConnectClient.java deleted file mode 100644 index a5f9fec..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/client/NettyConnectClient.java +++ /dev/null @@ -1,98 +0,0 @@ -package com.czsj.rpc.remoting.net.impl.netty.client; - -import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory; -import com.czsj.rpc.remoting.net.common.ConnectClient; -import com.czsj.rpc.remoting.net.impl.netty.codec.NettyDecoder; -import com.czsj.rpc.remoting.net.impl.netty.codec.NettyEncoder; -import com.czsj.rpc.remoting.net.params.Beat; -import com.czsj.rpc.remoting.net.params.XxlRpcRequest; -import com.czsj.rpc.remoting.net.params.XxlRpcResponse; -import com.czsj.rpc.serialize.Serializer; -import com.czsj.rpc.util.IpUtil; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.timeout.IdleStateHandler; - -import java.util.concurrent.TimeUnit; - -/** - * netty pooled client - * - * @author xuxueli - */ -public class NettyConnectClient extends ConnectClient { - - - private EventLoopGroup group; - private Channel channel; - - - @Override - public void init(String address, final Serializer serializer, final XxlRpcInvokerFactory xxlRpcInvokerFactory) throws Exception { - final NettyConnectClient thisClient = this; - - Object[] array = IpUtil.parseIpPort(address); - String host = (String) array[0]; - int port = (int) array[1]; - - - this.group = new NioEventLoopGroup(); - Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(group) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel channel) throws Exception { - channel.pipeline() - .addLast(new IdleStateHandler(0,0, Beat.BEAT_INTERVAL, TimeUnit.SECONDS)) // beat N, close if fail - .addLast(new NettyEncoder(XxlRpcRequest.class, serializer)) - .addLast(new NettyDecoder(XxlRpcResponse.class, serializer)) - .addLast(new NettyClientHandler(xxlRpcInvokerFactory, thisClient)); - } - }) - .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000); - this.channel = bootstrap.connect(host, port).sync().channel(); - - // valid - if (!isValidate()) { - close(); - return; - } - - logger.debug(">>>>>>>>>>> xxl-rpc netty client proxy, connect to server success at host:{}, port:{}", host, port); - } - - - @Override - public boolean isValidate() { - if (this.channel != null) { - return this.channel.isActive(); - } - return false; - } - - @Override - public void close() { - if (this.channel != null && this.channel.isActive()) { - this.channel.close(); // if this.channel.isOpen() - } - if (this.group != null && !this.group.isShutdown()) { - this.group.shutdownGracefully(); - } - logger.debug(">>>>>>>>>>> xxl-rpc netty client close."); - } - - - @Override - public void send(XxlRpcRequest xxlRpcRequest) throws Exception { - this.channel.writeAndFlush(xxlRpcRequest).sync(); - } -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/codec/NettyDecoder.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/codec/NettyDecoder.java deleted file mode 100644 index c5e9423..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/codec/NettyDecoder.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.czsj.rpc.remoting.net.impl.netty.codec; - -import com.czsj.rpc.serialize.Serializer; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; - -import java.util.List; - -/** - * decoder - * - * @author xuxueli 2015-10-29 19:02:36 - */ -public class NettyDecoder extends ByteToMessageDecoder { - - private Class genericClass; - private Serializer serializer; - - public NettyDecoder(Class genericClass, final Serializer serializer) { - this.genericClass = genericClass; - this.serializer = serializer; - } - - @Override - public final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - if (in.readableBytes() < 4) { - return; - } - in.markReaderIndex(); - int dataLength = in.readInt(); - if (dataLength < 0) { - ctx.close(); - } - if (in.readableBytes() < dataLength) { - in.resetReaderIndex(); - return; // fix 1024k buffer splice limix - } - byte[] data = new byte[dataLength]; - in.readBytes(data); - - Object obj = serializer.deserialize(data, genericClass); - out.add(obj); - } -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/codec/NettyEncoder.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/codec/NettyEncoder.java deleted file mode 100644 index 14930bd..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/codec/NettyEncoder.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.czsj.rpc.remoting.net.impl.netty.codec; - -import com.czsj.rpc.serialize.Serializer; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; - -/** - * encoder - * - * @author xuxueli 2015-10-29 19:43:00 - */ -public class NettyEncoder extends MessageToByteEncoder { - - private Class genericClass; - private Serializer serializer; - - public NettyEncoder(Class genericClass, final Serializer serializer) { - this.genericClass = genericClass; - this.serializer = serializer; - } - - @Override - public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception { - if (genericClass.isInstance(in)) { - byte[] data = serializer.serialize(in); - out.writeInt(data.length); - out.writeBytes(data); - } - } -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/server/NettyServer.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/server/NettyServer.java deleted file mode 100644 index 11d7056..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/server/NettyServer.java +++ /dev/null @@ -1,117 +0,0 @@ -package com.czsj.rpc.remoting.net.impl.netty.server; - -import com.czsj.rpc.remoting.net.Server; -import com.czsj.rpc.remoting.net.impl.netty.codec.NettyDecoder; -import com.czsj.rpc.remoting.net.impl.netty.codec.NettyEncoder; -import com.czsj.rpc.remoting.net.params.Beat; -import com.czsj.rpc.remoting.net.params.XxlRpcRequest; -import com.czsj.rpc.remoting.net.params.XxlRpcResponse; -import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory; -import com.czsj.rpc.util.ThreadPoolUtil; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.timeout.IdleStateHandler; - -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * netty rpc server - * - * @author xuxueli 2015-10-29 18:17:14 - */ -public class NettyServer extends Server { - - private Thread thread; - - @Override - public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception { - - thread = new Thread(new Runnable() { - @Override - public void run() { - - // param - final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool( - NettyServer.class.getSimpleName(), - xxlRpcProviderFactory.getCorePoolSize(), - xxlRpcProviderFactory.getMaxPoolSize()); - EventLoopGroup bossGroup = new NioEventLoopGroup(); - EventLoopGroup workerGroup = new NioEventLoopGroup(); - - try { - // start server - ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel channel) throws Exception { - channel.pipeline() - .addLast(new IdleStateHandler(0,0, Beat.BEAT_INTERVAL*3, TimeUnit.SECONDS)) // beat 3N, close if idle - .addLast(new NettyDecoder(XxlRpcRequest.class, xxlRpcProviderFactory.getSerializerInstance())) - .addLast(new NettyEncoder(XxlRpcResponse.class, xxlRpcProviderFactory.getSerializerInstance())) - .addLast(new NettyServerHandler(xxlRpcProviderFactory, serverHandlerPool)); - } - }) - .childOption(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.SO_KEEPALIVE, true); - - // bind - ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync(); - - logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyServer.class.getName(), xxlRpcProviderFactory.getPort()); - onStarted(); - - // wait util stop - future.channel().closeFuture().sync(); - - } catch (Exception e) { - if (e instanceof InterruptedException) { - logger.info(">>>>>>>>>>> xxl-rpc remoting server stop."); - } else { - logger.error(">>>>>>>>>>> xxl-rpc remoting server error.", e); - } - } finally { - - // stop - try { - serverHandlerPool.shutdown(); // shutdownNow - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - try { - workerGroup.shutdownGracefully(); - bossGroup.shutdownGracefully(); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - - } - } - }); - thread.setDaemon(true); - thread.start(); - - } - - @Override - public void stop() throws Exception { - - // destroy server thread - if (thread != null && thread.isAlive()) { - thread.interrupt(); - } - - // on stop - onStopped(); - logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success."); - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/server/NettyServerHandler.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/server/NettyServerHandler.java deleted file mode 100644 index fb7786f..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty/server/NettyServerHandler.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.czsj.rpc.remoting.net.impl.netty.server; - -import com.czsj.rpc.remoting.net.params.Beat; -import com.czsj.rpc.remoting.net.params.XxlRpcRequest; -import com.czsj.rpc.remoting.net.params.XxlRpcResponse; -import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory; -import com.czsj.rpc.util.ThrowableUtil; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.timeout.IdleStateEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ThreadPoolExecutor; - - -/** - * netty server handler - * - * @author xuxueli 2015-10-29 20:07:37 - */ -public class NettyServerHandler extends SimpleChannelInboundHandler { - private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); - - private XxlRpcProviderFactory xxlRpcProviderFactory; - private ThreadPoolExecutor serverHandlerPool; - - public NettyServerHandler(final XxlRpcProviderFactory xxlRpcProviderFactory, final ThreadPoolExecutor serverHandlerPool) { - this.xxlRpcProviderFactory = xxlRpcProviderFactory; - this.serverHandlerPool = serverHandlerPool; - } - - - @Override - public void channelRead0(final ChannelHandlerContext ctx, final XxlRpcRequest xxlRpcRequest) throws Exception { - - // filter beat - if (Beat.BEAT_ID.equalsIgnoreCase(xxlRpcRequest.getRequestId())){ - logger.debug(">>>>>>>>>>> xxl-rpc provider netty server read beat-ping."); - return; - } - - // do invoke - try { - serverHandlerPool.execute(new Runnable() { - @Override - public void run() { - // invoke + response - XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest); - - ctx.writeAndFlush(xxlRpcResponse); - } - }); - } catch (Exception e) { - // catch error - XxlRpcResponse xxlRpcResponse = new XxlRpcResponse(); - xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId()); - xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(e)); - - ctx.writeAndFlush(xxlRpcResponse); - } - - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - logger.error(">>>>>>>>>>> xxl-rpc provider netty server caught exception", cause); - ctx.close(); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent){ - ctx.channel().close(); // beat 3N, close if idle - logger.debug(">>>>>>>>>>> xxl-rpc provider netty server close an idle channel."); - } else { - super.userEventTriggered(ctx, evt); - } - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/client/NettyHttpClient.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/client/NettyHttpClient.java deleted file mode 100644 index c1c134a..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/client/NettyHttpClient.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.czsj.rpc.remoting.net.impl.netty_http.client; - -import com.czsj.rpc.remoting.net.Client; -import com.czsj.rpc.remoting.net.common.ConnectClient; -import com.czsj.rpc.remoting.net.params.XxlRpcRequest; - -/** - * netty_http client - * - * @author xuxueli 2015-11-24 22:25:15 - */ -public class NettyHttpClient extends Client { - - private Class connectClientImpl = NettyHttpConnectClient.class; - - @Override - public void asyncSend(String address, XxlRpcRequest xxlRpcRequest) throws Exception { - ConnectClient.asyncSend(xxlRpcRequest, address, connectClientImpl, xxlRpcReferenceBean); - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/client/NettyHttpClientHandler.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/client/NettyHttpClientHandler.java deleted file mode 100644 index 3402ba8..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/client/NettyHttpClientHandler.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.czsj.rpc.remoting.net.impl.netty_http.client; - -import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory; -import com.czsj.rpc.remoting.net.params.Beat; -import com.czsj.rpc.remoting.net.params.XxlRpcResponse; -import com.czsj.rpc.serialize.Serializer; -import com.czsj.rpc.util.XxlRpcException; -import io.netty.buffer.ByteBufUtil; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.timeout.IdleStateEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * netty_http - * - * @author xuxueli 2015-11-24 22:25:15 - */ -public class NettyHttpClientHandler extends SimpleChannelInboundHandler { - private static final Logger logger = LoggerFactory.getLogger(NettyHttpClientHandler.class); - - - private XxlRpcInvokerFactory xxlRpcInvokerFactory; - private Serializer serializer; - private NettyHttpConnectClient nettyHttpConnectClient; - public NettyHttpClientHandler(final XxlRpcInvokerFactory xxlRpcInvokerFactory, Serializer serializer, final NettyHttpConnectClient nettyHttpConnectClient) { - this.xxlRpcInvokerFactory = xxlRpcInvokerFactory; - this.serializer = serializer; - this.nettyHttpConnectClient = nettyHttpConnectClient; - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception { - - // valid status - if (!HttpResponseStatus.OK.equals(msg.status())) { - throw new XxlRpcException("xxl-rpc response status invalid."); - } - - // response parse - byte[] responseBytes = ByteBufUtil.getBytes(msg.content()); - - // valid length - if (responseBytes.length == 0) { - throw new XxlRpcException("xxl-rpc response data empty."); - } - - // response deserialize - XxlRpcResponse xxlRpcResponse = (XxlRpcResponse) serializer.deserialize(responseBytes, XxlRpcResponse.class); - - // notify response - xxlRpcInvokerFactory.notifyInvokerFuture(xxlRpcResponse.getRequestId(), xxlRpcResponse); - - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - //super.exceptionCaught(ctx, cause); - logger.error(">>>>>>>>>>> xxl-rpc netty_http client caught exception", cause); - ctx.close(); - } - - /*@Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - // retry - super.channelInactive(ctx); - }*/ - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent){ - /*ctx.channel().close(); // close idle channel - logger.debug(">>>>>>>>>>> xxl-rpc netty_http client close an idle channel.");*/ - - nettyHttpConnectClient.send(Beat.BEAT_PING); // beat N, close if fail(may throw error) - logger.debug(">>>>>>>>>>> xxl-rpc netty_http client send beat-ping."); - } else { - super.userEventTriggered(ctx, evt); - } - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/client/NettyHttpConnectClient.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/client/NettyHttpConnectClient.java deleted file mode 100644 index 68776bc..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/client/NettyHttpConnectClient.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.czsj.rpc.remoting.net.impl.netty_http.client; - -import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory; -import com.czsj.rpc.remoting.net.common.ConnectClient; -import com.czsj.rpc.remoting.net.common.NettyConstant; -import com.czsj.rpc.remoting.net.params.Beat; -import com.czsj.rpc.remoting.net.params.XxlRpcRequest; -import com.czsj.rpc.serialize.Serializer; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.http.*; -import io.netty.handler.timeout.IdleStateHandler; - -import java.net.URI; -import java.net.URL; -import java.util.concurrent.TimeUnit; - -/** - * netty_http - * - * @author xuxueli 2015-11-24 22:25:15 - */ -public class NettyHttpConnectClient extends ConnectClient { - - private EventLoopGroup group; - private Channel channel; - - private Serializer serializer; - private String address; - private String host; - private DefaultFullHttpRequest beatRequest; - - @Override - public void init(String address, final Serializer serializer, final XxlRpcInvokerFactory xxlRpcInvokerFactory) throws Exception { - final NettyHttpConnectClient thisClient = this; - - if (!address.toLowerCase().startsWith("http")) { - address = "http://" + address; // IP:PORT, need parse to url - } - - this.address = address; - URL url = new URL(address); - this.host = url.getHost(); - int port = url.getPort() > -1 ? url.getPort() : 80; - - - this.group = new NioEventLoopGroup(); - Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(group) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel channel) throws Exception { - channel.pipeline() - .addLast(new IdleStateHandler(0, 0, Beat.BEAT_INTERVAL, TimeUnit.SECONDS)) // beat N, close if fail - .addLast(new HttpClientCodec()) - .addLast(new HttpObjectAggregator(NettyConstant.MAX_LENGTH)) - .addLast(new NettyHttpClientHandler(xxlRpcInvokerFactory, serializer, thisClient)); - } - }) - .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000); - this.channel = bootstrap.connect(host, port).sync().channel(); - - this.serializer = serializer; - - // valid - if (!isValidate()) { - close(); - return; - } - - logger.debug(">>>>>>>>>>> xxl-rpc netty client proxy, connect to server success at host:{}, port:{}", host, port); - } - - @Override - public boolean isValidate() { - if (this.channel != null) { - return this.channel.isActive(); - } - return false; - } - - - @Override - public void close() { - if (this.channel != null && this.channel.isActive()) { - this.channel.close(); // if this.channel.isOpen() - } - if (this.group != null && !this.group.isShutdown()) { - this.group.shutdownGracefully(); - } - logger.debug(">>>>>>>>>>> xxl-rpc netty client close."); - } - - - @Override - public void send(XxlRpcRequest xxlRpcRequest) throws Exception { - byte[] requestBytes = serializer.serialize(xxlRpcRequest); - - DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, new URI(address).getRawPath(), Unpooled.wrappedBuffer(requestBytes)); - request.headers().set(HttpHeaderNames.HOST, host); - request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); - request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes()); - - this.channel.writeAndFlush(request).sync(); - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/server/NettyHttpServer.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/server/NettyHttpServer.java deleted file mode 100644 index 78f6707..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/server/NettyHttpServer.java +++ /dev/null @@ -1,109 +0,0 @@ -package com.czsj.rpc.remoting.net.impl.netty_http.server; - -import com.czsj.rpc.remoting.net.Server; -import com.czsj.rpc.remoting.net.common.NettyConstant; -import com.czsj.rpc.remoting.net.params.Beat; -import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory; -import com.czsj.rpc.util.ThreadPoolUtil; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.timeout.IdleStateHandler; - -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * netty_http - * - * @author xuxueli 2015-11-24 22:25:15 - */ -public class NettyHttpServer extends Server { - - private Thread thread; - - @Override - public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) { - - thread = new Thread(() -> { - // param - final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool( - NettyHttpServer.class.getSimpleName(), - xxlRpcProviderFactory.getCorePoolSize(), - xxlRpcProviderFactory.getMaxPoolSize()); - EventLoopGroup bossGroup = new NioEventLoopGroup(); - EventLoopGroup workerGroup = new NioEventLoopGroup(); - - try { - // start server - ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel channel) { - channel.pipeline() - .addLast(new IdleStateHandler(0, 0, Beat.BEAT_INTERVAL * 3, TimeUnit.SECONDS)) // beat 3N, close if idle - .addLast(new HttpServerCodec()) - .addLast(new HttpObjectAggregator(NettyConstant.MAX_LENGTH)) // merge request & reponse to FULL - .addLast(new NettyHttpServerHandler(xxlRpcProviderFactory, serverHandlerPool)); - } - }) - .childOption(ChannelOption.SO_KEEPALIVE, true); - - // bind - ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync(); - - logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyHttpServer.class.getName(), xxlRpcProviderFactory.getPort()); - onStarted(); - - // wait util stop - future.channel().closeFuture().sync(); - - } catch (InterruptedException e) { - if (e instanceof InterruptedException) { - logger.info(">>>>>>>>>>> xxl-rpc remoting server stop."); - } else { - logger.error(">>>>>>>>>>> xxl-rpc remoting server error.", e); - } - } finally { - - // stop - try { - serverHandlerPool.shutdown(); // shutdownNow - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - try { - workerGroup.shutdownGracefully(); - bossGroup.shutdownGracefully(); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } - - }); - thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave - thread.start(); - } - - @Override - public void stop() { - // destroy server thread - if (thread != null && thread.isAlive()) { - thread.interrupt(); - } - - // on stop - onStopped(); - logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success."); - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/server/NettyHttpServerHandler.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/server/NettyHttpServerHandler.java deleted file mode 100644 index 442c655..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/impl/netty_http/server/NettyHttpServerHandler.java +++ /dev/null @@ -1,150 +0,0 @@ -package com.czsj.rpc.remoting.net.impl.netty_http.server; - -import com.czsj.rpc.remoting.net.params.Beat; -import com.czsj.rpc.remoting.net.params.XxlRpcRequest; -import com.czsj.rpc.remoting.net.params.XxlRpcResponse; -import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory; -import com.czsj.rpc.util.ThrowableUtil; -import com.czsj.rpc.util.XxlRpcException; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.*; -import io.netty.handler.timeout.IdleStateEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ThreadPoolExecutor; - - -/** - * netty_http - * - * @author xuxueli 2015-11-24 22:25:15 - */ -public class NettyHttpServerHandler extends SimpleChannelInboundHandler { - private static final Logger logger = LoggerFactory.getLogger(NettyHttpServerHandler.class); - - - private XxlRpcProviderFactory xxlRpcProviderFactory; - private ThreadPoolExecutor serverHandlerPool; - - public NettyHttpServerHandler(final XxlRpcProviderFactory xxlRpcProviderFactory, final ThreadPoolExecutor serverHandlerPool) { - this.xxlRpcProviderFactory = xxlRpcProviderFactory; - this.serverHandlerPool = serverHandlerPool; - } - - @Override - protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { - - // request parse - final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8); - final String uri = msg.uri(); - final boolean keepAlive = HttpUtil.isKeepAlive(msg); - - // do invoke - serverHandlerPool.execute(new Runnable() { - @Override - public void run() { - process(ctx, uri, requestBytes, keepAlive); - } - }); - } - - private void process(ChannelHandlerContext ctx, String uri, byte[] requestBytes, boolean keepAlive){ - String requestId = null; - try { - if ("/services".equals(uri)) { // services mapping - - // request - StringBuilder stringBuffer = new StringBuilder(""); - for (String serviceKey: xxlRpcProviderFactory.getServiceData().keySet()) { - stringBuffer.append("
  • ").append(serviceKey).append(": ").append(xxlRpcProviderFactory.getServiceData().get(serviceKey)).append("
  • "); - } - stringBuffer.append("
    "); - - // response serialize - byte[] responseBytes = stringBuffer.toString().getBytes("UTF-8"); - - // response-write - writeResponse(ctx, keepAlive, responseBytes); - - } else { - System.out.println("================="); - // valid - if (requestBytes.length == 0) { - throw new XxlRpcException("xxl-rpc request data empty."); - } - - // request deserialize - XxlRpcRequest xxlRpcRequest = (XxlRpcRequest) xxlRpcProviderFactory.getSerializerInstance().deserialize(requestBytes, XxlRpcRequest.class); - requestId = xxlRpcRequest.getRequestId(); - - // filter beat - if (Beat.BEAT_ID.equalsIgnoreCase(xxlRpcRequest.getRequestId())){ - logger.debug(">>>>>>>>>>> xxl-rpc provider netty_http server read beat-ping."); - return; - } - - // invoke + response - XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest); - - // response serialize - byte[] responseBytes = xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse); - - // response-write - writeResponse(ctx, keepAlive, responseBytes); - } - } catch (Exception e) { - logger.error(e.getMessage(), e); - - // response error - XxlRpcResponse xxlRpcResponse = new XxlRpcResponse(); - xxlRpcResponse.setRequestId(requestId); - xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(e)); - - // response serialize - byte[] responseBytes = xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse); - - // response-write - writeResponse(ctx, keepAlive, responseBytes); - } - - } - - /** - * write response - */ - private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, byte[] responseBytes){ - FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(responseBytes)); - response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8"); // HttpHeaderValues.TEXT_PLAIN.toString() - response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); - if (keepAlive) { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); - } - ctx.writeAndFlush(response); - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - ctx.flush(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - logger.error(">>>>>>>>>>> xxl-rpc provider netty_http server caught exception", cause); - ctx.close(); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent){ - ctx.channel().close(); // beat 3N, close if idle - logger.debug(">>>>>>>>>>> xxl-rpc provider netty_http server close an idle channel."); - } else { - super.userEventTriggered(ctx, evt); - } - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/BaseCallback.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/BaseCallback.java deleted file mode 100644 index 5cc4e09..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/BaseCallback.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.czsj.rpc.remoting.net.params; - -/** - * @author xuxueli 2018-10-19 - */ -public abstract class BaseCallback { - - public abstract void run() throws Exception; - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/Beat.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/Beat.java deleted file mode 100644 index ec39f56..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/Beat.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.czsj.rpc.remoting.net.params; - -/** - * beat for keep-alive - * - * @author xuxueli 2019-09-27 - */ -public final class Beat { - - public static final int BEAT_INTERVAL = 30; - public static final String BEAT_ID = "BEAT_PING_PONG"; - - public static XxlRpcRequest BEAT_PING; - - static { - BEAT_PING = new XxlRpcRequest(){}; - BEAT_PING.setRequestId(BEAT_ID); - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/XxlRpcFutureResponse.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/XxlRpcFutureResponse.java deleted file mode 100644 index afcb043..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/XxlRpcFutureResponse.java +++ /dev/null @@ -1,127 +0,0 @@ -package com.czsj.rpc.remoting.net.params; - -import com.czsj.rpc.remoting.invoker.XxlRpcInvokerFactory; -import com.czsj.rpc.remoting.invoker.call.XxlRpcInvokeCallback; -import com.czsj.rpc.util.XxlRpcException; - -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * call back future - * - * @author xuxueli 2015-11-5 14:26:37 - */ -public class XxlRpcFutureResponse implements Future { - - private XxlRpcInvokerFactory invokerFactory; - - // net data - private XxlRpcRequest request; - private XxlRpcResponse response; - - // future lock - private boolean done = false; - private Object lock = new Object(); - - // callback, can be null - private XxlRpcInvokeCallback invokeCallback; - - - public XxlRpcFutureResponse(final XxlRpcInvokerFactory invokerFactory, XxlRpcRequest request, XxlRpcInvokeCallback invokeCallback) { - this.invokerFactory = invokerFactory; - this.request = request; - this.invokeCallback = invokeCallback; - - // set-InvokerFuture - setInvokerFuture(); - } - - - // ---------------------- response pool ---------------------- - - public void setInvokerFuture() { - this.invokerFactory.setInvokerFuture(request.getRequestId(), this); - } - - public void removeInvokerFuture() { - this.invokerFactory.removeInvokerFuture(request.getRequestId()); - } - - - // ---------------------- get ---------------------- - - public XxlRpcRequest getRequest() { - return request; - } - - public XxlRpcInvokeCallback getInvokeCallback() { - return invokeCallback; - } - - - // ---------------------- for invoke back ---------------------- - - public void setResponse(XxlRpcResponse response) { - this.response = response; - synchronized (lock) { - done = true; - lock.notifyAll(); - } - } - - - // ---------------------- for invoke ---------------------- - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - // TODO - return false; - } - - @Override - public boolean isCancelled() { - // TODO - return false; - } - - @Override - public boolean isDone() { - return done; - } - - @Override - public XxlRpcResponse get() throws InterruptedException { - try { - return get(-1, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - throw new XxlRpcException(e); - } - } - - @Override - public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - if (!done) { - synchronized (lock) { - try { - if (timeout < 0) { - lock.wait(); - } else { - long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit); - lock.wait(timeoutMillis); - } - } catch (InterruptedException e) { - throw e; - } - } - } - - if (!done) { - throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString()); - } - return response; - } - - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/XxlRpcRequest.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/XxlRpcRequest.java deleted file mode 100644 index 91c3b12..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/XxlRpcRequest.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.czsj.rpc.remoting.net.params; - -import java.io.Serializable; -import java.util.Arrays; - -/** - * request - * - * @author xuxueli 2015-10-29 19:39:12 - */ -public class XxlRpcRequest implements Serializable { - private static final long serialVersionUID = 42L; - - private String requestId; - private long createMillisTime; - private String accessToken; - - private String className; - private String methodName; - private Class[] parameterTypes; - private Object[] parameters; - - private String version; - - - public String getRequestId() { - return requestId; - } - - public void setRequestId(String requestId) { - this.requestId = requestId; - } - - public long getCreateMillisTime() { - return createMillisTime; - } - - public void setCreateMillisTime(long createMillisTime) { - this.createMillisTime = createMillisTime; - } - - public String getAccessToken() { - return accessToken; - } - - public void setAccessToken(String accessToken) { - this.accessToken = accessToken; - } - - public String getClassName() { - return className; - } - - public void setClassName(String className) { - this.className = className; - } - - public String getMethodName() { - return methodName; - } - - public void setMethodName(String methodName) { - this.methodName = methodName; - } - - public Class[] getParameterTypes() { - return parameterTypes; - } - - public void setParameterTypes(Class[] parameterTypes) { - this.parameterTypes = parameterTypes; - } - - public Object[] getParameters() { - return parameters; - } - - public void setParameters(Object[] parameters) { - this.parameters = parameters; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - @Override - public String toString() { - return "XxlRpcRequest{" + - "requestId='" + requestId + '\'' + - ", createMillisTime=" + createMillisTime + - ", accessToken='" + accessToken + '\'' + - ", className='" + className + '\'' + - ", methodName='" + methodName + '\'' + - ", parameterTypes=" + Arrays.toString(parameterTypes) + - ", parameters=" + Arrays.toString(parameters) + - ", version='" + version + '\'' + - '}'; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/XxlRpcResponse.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/XxlRpcResponse.java deleted file mode 100644 index 128de24..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/net/params/XxlRpcResponse.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.czsj.rpc.remoting.net.params; - -import java.io.Serializable; - -/** - * response - * - * @author xuxueli 2015-10-29 19:39:54 - */ -public class XxlRpcResponse implements Serializable { - private static final long serialVersionUID = 42L; - - - private String requestId; - private String errorMsg; - private Object result; - - - public String getRequestId() { - return requestId; - } - - public void setRequestId(String requestId) { - this.requestId = requestId; - } - - public String getErrorMsg() { - return errorMsg; - } - - public void setErrorMsg(String errorMsg) { - this.errorMsg = errorMsg; - } - - public Object getResult() { - return result; - } - - public void setResult(Object result) { - this.result = result; - } - - @Override - public String toString() { - return "XxlRpcResponse{" + - "requestId='" + requestId + '\'' + - ", errorMsg='" + errorMsg + '\'' + - ", result=" + result + - '}'; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/provider/XxlRpcProviderFactory.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/provider/XxlRpcProviderFactory.java deleted file mode 100644 index 354eecf..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/provider/XxlRpcProviderFactory.java +++ /dev/null @@ -1,256 +0,0 @@ -package com.czsj.rpc.remoting.provider; - -import com.czsj.rpc.registry.ServiceRegistry; -import com.czsj.rpc.remoting.net.Server; -import com.czsj.rpc.remoting.net.impl.netty.server.NettyServer; -import com.czsj.rpc.remoting.net.params.BaseCallback; -import com.czsj.rpc.remoting.net.params.XxlRpcRequest; -import com.czsj.rpc.remoting.net.params.XxlRpcResponse; -import com.czsj.rpc.serialize.Serializer; -import com.czsj.rpc.serialize.impl.HessianSerializer; -import com.czsj.rpc.util.IpUtil; -import com.czsj.rpc.util.NetUtil; -import com.czsj.rpc.util.ThrowableUtil; -import com.czsj.rpc.util.XxlRpcException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.Map; - -/** - * provider - * - * @author xuxueli 2015-10-31 22:54:27 - */ -public class XxlRpcProviderFactory { - private static final Logger logger = LoggerFactory.getLogger(XxlRpcProviderFactory.class); - - // ---------------------- config ---------------------- - - private Class server = NettyServer.class; - private Class serializer = HessianSerializer.class; - - private int corePoolSize = 60; - private int maxPoolSize = 300; - - private String ip = null; // for registry - private int port = 7080; // default port - private String accessToken = null; - - private Class serviceRegistry = null; - private Map serviceRegistryParam = null; - - // set - public void setServer(Class server) { - this.server = server; - } - public void setSerializer(Class serializer) { - this.serializer = serializer; - } - public void setCorePoolSize(int corePoolSize) { - this.corePoolSize = corePoolSize; - } - public void setMaxPoolSize(int maxPoolSize) { - this.maxPoolSize = maxPoolSize; - } - public void setIp(String ip) { - this.ip = ip; - } - public void setPort(int port) { - this.port = port; - } - public void setAccessToken(String accessToken) { - this.accessToken = accessToken; - } - - public void setServiceRegistry(Class serviceRegistry) { - this.serviceRegistry = serviceRegistry; - } - - public void setServiceRegistryParam(Map serviceRegistryParam) { - this.serviceRegistryParam = serviceRegistryParam; - } - - // get - public Serializer getSerializerInstance() { - return serializerInstance; - } - public int getPort() { - return port; - } - public int getCorePoolSize() { - return corePoolSize; - } - public int getMaxPoolSize() { - return maxPoolSize; - } - - // ---------------------- start / stop ---------------------- - - private Server serverInstance; - private Serializer serializerInstance; - private ServiceRegistry serviceRegistryInstance; - private String serviceAddress; - - public void start() throws Exception { - - // valid - if (this.server == null) { - throw new XxlRpcException("xxl-rpc provider server missing."); - } - if (this.serializer==null) { - throw new XxlRpcException("xxl-rpc provider serializer missing."); - } - if (!(this.corePoolSize>0 && this.maxPoolSize>0 && this.maxPoolSize>=this.corePoolSize)) { - this.corePoolSize = 60; - this.maxPoolSize = 300; - } - if (this.ip == null) { - this.ip = IpUtil.getIp(); - } - if (this.port <= 0) { - this.port = 7080; - } - if (NetUtil.isPortUsed(this.port)) { - throw new XxlRpcException("xxl-rpc provider port["+ this.port +"] is used."); - } - - // init serializerInstance - this.serializerInstance = serializer.newInstance(); - - // start server - serviceAddress = IpUtil.getIpPort(this.ip, port); - serverInstance = server.newInstance(); - serverInstance.setStartedCallback(new BaseCallback() { // serviceRegistry started - @Override - public void run() throws Exception { - // start registry - if (serviceRegistry != null) { - serviceRegistryInstance = serviceRegistry.newInstance(); - serviceRegistryInstance.start(serviceRegistryParam); - if (serviceData.size() > 0) { - serviceRegistryInstance.registry(serviceData.keySet(), serviceAddress); - } - } - } - }); - serverInstance.setStopedCallback(new BaseCallback() { // serviceRegistry stoped - @Override - public void run() { - // stop registry - if (serviceRegistryInstance != null) { - if (serviceData.size() > 0) { - serviceRegistryInstance.remove(serviceData.keySet(), serviceAddress); - } - serviceRegistryInstance.stop(); - serviceRegistryInstance = null; - } - } - }); - serverInstance.start(this); - } - - public void stop() throws Exception { - // stop server - serverInstance.stop(); - } - - - // ---------------------- server invoke ---------------------- - - /** - * init local rpc service map - */ - private Map serviceData = new HashMap(); - public Map getServiceData() { - return serviceData; - } - - /** - * make service key - * - * @param iface - * @param version - * @return - */ - public static String makeServiceKey(String iface, String version){ - String serviceKey = iface; - if (version!=null && version.trim().length()>0) { - serviceKey += "#".concat(version); - } - return serviceKey; - } - - /** - * add service - * - * @param iface - * @param version - * @param serviceBean - */ - public void addService(String iface, String version, Object serviceBean){ - String serviceKey = makeServiceKey(iface, version); - serviceData.put(serviceKey, serviceBean); - - logger.info(">>>>>>>>>>> xxl-rpc, provider factory add service success. serviceKey = {}, serviceBean = {}", serviceKey, serviceBean.getClass()); - } - - /** - * invoke service - * - * @param xxlRpcRequest - * @return - */ - public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) { - - // make response - XxlRpcResponse xxlRpcResponse = new XxlRpcResponse(); - xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId()); - - // match service bean - String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion()); - Object serviceBean = serviceData.get(serviceKey); - - // valid - if (serviceBean == null) { - xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found."); - return xxlRpcResponse; - } - - if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) { - xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit."); - return xxlRpcResponse; - } - if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) { - xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong."); - return xxlRpcResponse; - } - - try { - // invoke - Class serviceClass = serviceBean.getClass(); - String methodName = xxlRpcRequest.getMethodName(); - Class[] parameterTypes = xxlRpcRequest.getParameterTypes(); - Object[] parameters = xxlRpcRequest.getParameters(); - - Method method = serviceClass.getMethod(methodName, parameterTypes); - method.setAccessible(true); - Object result = method.invoke(serviceBean, parameters); - - /*FastClass serviceFastClass = FastClass.create(serviceClass); - FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes); - Object result = serviceFastMethod.invoke(serviceBean, parameters);*/ - - xxlRpcResponse.setResult(result); - } catch (Throwable t) { - // catch error - logger.error("xxl-rpc provider invokeService error.", t); - xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t)); - } - - return xxlRpcResponse; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/provider/annotation/XxlRpcService.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/provider/annotation/XxlRpcService.java deleted file mode 100644 index c0472d9..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/provider/annotation/XxlRpcService.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.czsj.rpc.remoting.provider.annotation; - -import java.lang.annotation.*; - -/** - * rpc service annotation, skeleton of stub ("@Inherited" allow service use "Transactional") - * - * @author 2015-10-29 19:44:33 - */ -@Target({ElementType.TYPE}) -@Retention(RetentionPolicy.RUNTIME) -@Inherited -public @interface XxlRpcService { - - /** - * @return - */ - String version() default ""; - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/provider/impl/XxlRpcSpringProviderFactory.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/provider/impl/XxlRpcSpringProviderFactory.java deleted file mode 100644 index 857329d..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/remoting/provider/impl/XxlRpcSpringProviderFactory.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.czsj.rpc.remoting.provider.impl; - -import com.czsj.rpc.remoting.provider.XxlRpcProviderFactory; -import com.czsj.rpc.remoting.provider.annotation.XxlRpcService; -import com.czsj.rpc.util.XxlRpcException; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; - -import java.util.Map; - -/** - * xxl-rpc provider (for spring) - * - * @author xuxueli 2018-10-18 18:09:20 - */ -public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory - implements ApplicationContextAware, InitializingBean,DisposableBean { - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - - Map serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class); - if (serviceBeanMap!=null && serviceBeanMap.size()>0) { - for (Object serviceBean : serviceBeanMap.values()) { - // valid - if (serviceBean.getClass().getInterfaces().length ==0) { - throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface."); - } - // add service - XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class); - - String iface = serviceBean.getClass().getInterfaces()[0].getName(); - String version = xxlRpcService.version(); - - super.addService(iface, version, serviceBean); - } - } - - // TODO,addServices by api + prop - - } - - @Override - public void afterPropertiesSet() throws Exception { - super.start(); - } - - @Override - public void destroy() throws Exception { - super.stop(); - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/serialize/Serializer.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/serialize/Serializer.java deleted file mode 100644 index 9a1b4df..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/serialize/Serializer.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.czsj.rpc.serialize; - -/** - * serializer - * - * Tips:模板方法模式:定义一个操作中算法的骨架(或称为顶级逻辑),将一些步骤(或称为基本方法)的执行延迟到其子类中; - * Tips:基本方法:抽象方法 + 具体方法final + 钩子方法; - * Tips:Enum 时最好的单例方案;枚举单例会初始化全部实现,此处改为托管Class,避免无效的实例化; - * - * @author xuxueli 2015-10-30 21:02:55 - */ -public abstract class Serializer { - - public abstract byte[] serialize(T obj); - - public abstract Object deserialize(byte[] bytes, Class clazz); - - /*public enum SerializeEnum { - HESSIAN(HessianSerializer.class), - HESSIAN1(Hessian1Serializer.class); - - private Class serializerClass; - private SerializeEnum (Class serializerClass) { - this.serializerClass = serializerClass; - } - - public Serializer getSerializer() { - try { - return serializerClass.newInstance(); - } catch (Exception e) { - throw new XxlRpcException(e); - } - } - - public static SerializeEnum match(String name, SerializeEnum defaultSerializer){ - for (SerializeEnum item : SerializeEnum.values()) { - if (item.name().equals(name)) { - return item; - } - } - return defaultSerializer; - } - }*/ - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/serialize/impl/HessianSerializer.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/serialize/impl/HessianSerializer.java deleted file mode 100644 index 9deff19..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/serialize/impl/HessianSerializer.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.czsj.rpc.serialize.impl; - -import com.caucho.hessian.io.Hessian2Input; -import com.caucho.hessian.io.Hessian2Output; -import com.czsj.rpc.serialize.Serializer; -import com.czsj.rpc.util.XxlRpcException; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -/** - * hessian serialize - * @author xuxueli 2015-9-26 02:53:29 - */ -public class HessianSerializer extends Serializer { - - @Override - public byte[] serialize(T obj) { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - Hessian2Output ho = new Hessian2Output(os); - try { - ho.writeObject(obj); - ho.flush(); - byte[] result = os.toByteArray(); - return result; - } catch (IOException e) { - throw new XxlRpcException(e); - } finally { - try { - ho.close(); - } catch (IOException e) { - throw new XxlRpcException(e); - } - try { - os.close(); - } catch (IOException e) { - throw new XxlRpcException(e); - } - } - - } - - @Override - public Object deserialize(byte[] bytes, Class clazz) { - ByteArrayInputStream is = new ByteArrayInputStream(bytes); - Hessian2Input hi = new Hessian2Input(is); - try { - Object result = hi.readObject(); - return result; - } catch (IOException e) { - throw new XxlRpcException(e); - } finally { - try { - hi.close(); - } catch (Exception e) { - throw new XxlRpcException(e); - } - try { - is.close(); - } catch (IOException e) { - throw new XxlRpcException(e); - } - } - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/ClassUtil.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/util/ClassUtil.java deleted file mode 100644 index cf96576..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/ClassUtil.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.czsj.rpc.util; - -import java.util.HashMap; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: Class的工具类 - **/ -public class ClassUtil { - - private static final HashMap> primClasses = new HashMap<>(); - - static { - primClasses.put("boolean", boolean.class); - primClasses.put("byte", byte.class); - primClasses.put("char", char.class); - primClasses.put("short", short.class); - primClasses.put("int", int.class); - primClasses.put("long", long.class); - primClasses.put("float", float.class); - primClasses.put("double", double.class); - primClasses.put("void", void.class); - } - - /** - * - * @param className 类名 - * @return 反射得到类的Class - * @throws ClassNotFoundException - */ - public static Class resolveClass(String className) throws ClassNotFoundException { - try { - return Class.forName(className); - } catch (ClassNotFoundException ex) { - Class cl = primClasses.get(className); - if (cl != null) { - return cl; - } else { - throw ex; - } - } - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/IpUtil.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/util/IpUtil.java deleted file mode 100644 index e7ef304..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/IpUtil.java +++ /dev/null @@ -1,195 +0,0 @@ -package com.czsj.rpc.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.Inet6Address; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.UnknownHostException; -import java.util.Enumeration; -import java.util.regex.Pattern; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: IP的工具类 - **/ -public class IpUtil { - private static final Logger logger = LoggerFactory.getLogger(IpUtil.class); - - private static final String ANYHOST_VALUE = "0.0.0.0"; - private static final String LOCALHOST_VALUE = "127.0.0.1"; - private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$"); - - - private static volatile InetAddress LOCAL_ADDRESS = null; - - - private static InetAddress toValidAddress(InetAddress address) { - if (address instanceof Inet6Address) { - Inet6Address v6Address = (Inet6Address) address; - if (isPreferIPV6Address()) { - return normalizeV6Address(v6Address); - } - } - if (isValidV4Address(address)) { - return address; - } - return null; - } - - /** - * - * @return - */ - private static boolean isPreferIPV6Address() { - return Boolean.getBoolean("java.net.preferIPv6Addresses"); - } - - /** - * - * @param address - * @return 判断是不是有效的IPV4的地址 - */ - private static boolean isValidV4Address(InetAddress address) { - if (address == null || address.isLoopbackAddress()) { - return false; - } - String name = address.getHostAddress(); - return (name != null && IP_PATTERN.matcher(name).matches() && !ANYHOST_VALUE.equals(name) && !LOCALHOST_VALUE - .equals(name)); - } - - /** - * - * @param address - * @return 返回IPV6的地址 - */ - private static InetAddress normalizeV6Address(Inet6Address address) { - String addr = address.getHostAddress(); - int i = addr.lastIndexOf('%'); - if (i > 0) { - try { - return InetAddress.getByName(addr.substring(0, i) + '%' + address.getScopeId()); - } catch (UnknownHostException e) { - logger.debug("Unknown IPV6 address: ", e); - } - } - return address; - } - - /** - * - * @return 返回本地的IP - */ - private static InetAddress getLocalAddress0() { - InetAddress localAddress = null; - try { - localAddress = InetAddress.getLocalHost(); - InetAddress addressItem = toValidAddress(localAddress); - if (addressItem != null) { - return addressItem; - } - } catch (Throwable e) { - logger.error(e.getMessage(), e); - } - - try { - Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); - if (null == interfaces) { - return localAddress; - } - while (interfaces.hasMoreElements()) { - try { - NetworkInterface network = interfaces.nextElement(); - if (network.isLoopback() || network.isVirtual() || !network.isUp()) { - continue; - } - Enumeration addresses = network.getInetAddresses(); - while (addresses.hasMoreElements()) { - try { - InetAddress addressItem = toValidAddress(addresses.nextElement()); - if (addressItem != null) { - try { - if (addressItem.isReachable(100)) { - return addressItem; - } - } catch (IOException e) { - // ignore - } - } - } catch (Throwable e) { - logger.error(e.getMessage(), e); - } - } - } catch (Throwable e) { - logger.error(e.getMessage(), e); - } - } - } catch (Throwable e) { - logger.error(e.getMessage(), e); - } - return localAddress; - } - - - /** - * - * @return 返回本地的IP - */ - public static InetAddress getLocalAddress() { - if (LOCAL_ADDRESS != null) { - return LOCAL_ADDRESS; - } - InetAddress localAddress = getLocalAddress0(); - LOCAL_ADDRESS = localAddress; - return localAddress; - } - - /** - * - * @return 返回本地的IP - */ - public static String getIp() { - return getLocalAddress().getHostAddress(); - } - - /** - * - * @param port - * @return 返回本地IP:port - */ - public static String getIpPort(int port) { - String ip = getIp(); - return getIpPort(ip, port); - } - - /** - * - * @param ip - * @param port - * @return ip:port - */ - public static String getIpPort(String ip, int port) { - if (ip == null) { - return null; - } - return ip.concat(":").concat(String.valueOf(port)); - } - - /** - * @param address - * @return ip和port的数组 - */ - public static Object[] parseIpPort(String address) { - String[] array = address.split(":"); - - String host = array[0]; - int port = Integer.parseInt(array[1]); - - return new Object[]{host, port}; - } -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/NetUtil.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/util/NetUtil.java deleted file mode 100644 index 1274f9e..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/NetUtil.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.czsj.rpc.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.ServerSocket; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: 端口的工具类 - **/ -public class NetUtil { - private static Logger logger = LoggerFactory.getLogger(NetUtil.class); - - /** - * - * @param defaultPort - * @return 可用的端口 - */ - public static int findAvailablePort(int defaultPort) { - int portTmp = defaultPort; - while (portTmp < 65535) { - if (!isPortUsed(portTmp)) { - return portTmp; - } else { - portTmp++; - } - } - portTmp = defaultPort--; - while (portTmp > 0) { - if (!isPortUsed(portTmp)) { - return portTmp; - } else { - portTmp--; - } - } - throw new XxlRpcException("no available port."); - } - - /** - * - * @param port - * @return 可用返回true, 否则返回false - */ - public static boolean isPortUsed(int port) { - boolean used; - ServerSocket serverSocket = null; - try { - serverSocket = new ServerSocket(port); - used = false; - } catch (IOException e) { - logger.info(">>>>>>>>>>> xxl-rpc, port[{}] is in use.", port); - used = true; - } finally { - if (serverSocket != null) { - try { - serverSocket.close(); - } catch (IOException e) { - logger.info(""); - } - } - } - return used; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/ThreadPoolUtil.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/util/ThreadPoolUtil.java deleted file mode 100644 index 9395739..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/ThreadPoolUtil.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.czsj.rpc.util; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: 自定义线程池 - **/ -public class ThreadPoolUtil { - - public static ThreadPoolExecutor makeServerThreadPool(final String serverType, int corePoolSize, int maxPoolSize) { - ThreadPoolExecutor serverHandlerPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1000), - r -> new Thread(r, "xxl-rpc, " + serverType + "-serverHandlerPool-" + r.hashCode()), (r, executor) -> { - throw new XxlRpcException("xxl-rpc " + serverType + " Thread pool is EXHAUSTED!"); - }); // default maxThreads 300, minThreads 60 - - return serverHandlerPool; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/ThrowableUtil.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/util/ThrowableUtil.java deleted file mode 100644 index aab8dc1..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/ThrowableUtil.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.czsj.rpc.util; - -import java.io.PrintWriter; -import java.io.StringWriter; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: 将异常转换为String - **/ -public class ThrowableUtil { - - public static String toString(Throwable e) { - StringWriter stringWriter = new StringWriter(); - e.printStackTrace(new PrintWriter(stringWriter)); - String errorMsg = stringWriter.toString(); - return errorMsg; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/XxlRpcException.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/util/XxlRpcException.java deleted file mode 100644 index 453db3e..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/XxlRpcException.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.czsj.rpc.util; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: 自定义异常类 - **/ -public class XxlRpcException extends RuntimeException { - - private static final long serialVersionUID = 42L; - - public XxlRpcException(String msg) { - super(msg); - } - - public XxlRpcException(String msg, Throwable cause) { - super(msg, cause); - } - - public XxlRpcException(Throwable cause) { - super(cause); - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/json/BasicJson.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/util/json/BasicJson.java deleted file mode 100644 index a97eb90..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/json/BasicJson.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.czsj.rpc.util.json; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: Json的父类 - **/ -public class BasicJson { - - private static final BasicJsonReader basicJsonReader = new BasicJsonReader(); - private static final BasicJsonwriter basicJsonwriter = new BasicJsonwriter(); - - /** - * object to json - * - * @param object - * @return - */ - public static String toJson(Object object) { - return basicJsonwriter.toJson(object); - } - - /** - * parse json to map - * - * @param json - * @return only for filed type "null、ArrayList、LinkedHashMap、String、Long、Double、..." - */ - public static Map parseMap(String json) { - return basicJsonReader.parseMap(json); - } - - /** - * json to List - * - * @param json - * @return - */ - public static List parseList(String json) { - return basicJsonReader.parseList(json); - } - - - public static void main(String[] args) { - Map result = new HashMap<>(); - result.put("code", 200); - result.put("msg", "success"); - result.put("arr", Arrays.asList("111", "222")); - result.put("float", 1.11f); - result.put("temp", null); - - String json = toJson(result); - System.out.println(json); - - Map mapObj = parseMap(json); - System.out.println(mapObj); - - List listInt = parseList("[111,222,33]"); - System.out.println(listInt); - - } -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/json/BasicJsonReader.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/util/json/BasicJsonReader.java deleted file mode 100644 index 5fa557c..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/json/BasicJsonReader.java +++ /dev/null @@ -1,197 +0,0 @@ -package com.czsj.rpc.util.json; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: JsonReader的父类 - **/ -public class BasicJsonReader { - private static Logger logger = LoggerFactory.getLogger(BasicJsonwriter.class); - - - /** - * - * @param json 类型{} - * @return 将JSON转换为Map - */ - public Map parseMap(String json) { - if (json != null) { - json = json.trim(); - if (json.startsWith("{")) { - return parseMapInternal(json); - } - } - throw new IllegalArgumentException("Cannot parse JSON"); - } - - /** - * - * @param json [] - * @return 将Json数组转换为List集合 - */ - public List parseList(String json) { - if (json != null) { - json = json.trim(); - if (json.startsWith("[")) { - return parseListInternal(json); - } - } - throw new IllegalArgumentException("Cannot parse JSON"); - } - - /** - * - * @param json - * @return - */ - private List parseListInternal(String json) { - List list = new ArrayList(); - json = trimLeadingCharacter(trimTrailingCharacter(json, ']'), '['); - for (String value : tokenize(json)) { - list.add(parseInternal(value)); - } - return list; - } - - private Object parseInternal(String json) { - if (json.equals("null")) { - return null; - } - if (json.startsWith("[")) { - return parseListInternal(json); - } - if (json.startsWith("{")) { - return parseMapInternal(json); - } - if (json.startsWith("\"")) { - return trimTrailingCharacter(trimLeadingCharacter(json, '"'), '"'); - } - try { - return Long.valueOf(json); - } catch (NumberFormatException ex) { - // ignore - } - try { - return Double.valueOf(json); - } catch (NumberFormatException ex) { - // ignore - } - return json; - } - - private Map parseMapInternal(String json) { - Map map = new LinkedHashMap(); - json = trimLeadingCharacter(trimTrailingCharacter(json, '}'), '{'); - for (String pair : tokenize(json)) { - String[] values = trimArrayElements(split(pair, ":")); - String key = trimLeadingCharacter(trimTrailingCharacter(values[0], '"'), '"'); - Object value = parseInternal(values[1]); - map.put(key, value); - } - return map; - } - - // append start - private static String[] split(String toSplit, String delimiter) { - if (toSplit != null && !toSplit.isEmpty() && delimiter != null && !delimiter.isEmpty()) { - int offset = toSplit.indexOf(delimiter); - if (offset < 0) { - return null; - } else { - String beforeDelimiter = toSplit.substring(0, offset); - String afterDelimiter = toSplit.substring(offset + delimiter.length()); - return new String[]{beforeDelimiter, afterDelimiter}; - } - } else { - return null; - } - } - - private static String[] trimArrayElements(String[] array) { - if (array == null || array.length == 0) { - return new String[0]; - } else { - String[] result = new String[array.length]; - - for (int i = 0; i < array.length; ++i) { - String element = array[i]; - result[i] = element != null ? element.trim() : null; - } - - return result; - } - } - - - private List tokenize(String json) { - List list = new ArrayList<>(); - int index = 0; - int inObject = 0; - int inList = 0; - boolean inValue = false; - boolean inEscape = false; - StringBuilder build = new StringBuilder(); - while (index < json.length()) { - char current = json.charAt(index); - if (inEscape) { - build.append(current); - index++; - inEscape = false; - continue; - } - if (current == '{') { - inObject++; - } - if (current == '}') { - inObject--; - } - if (current == '[') { - inList++; - } - if (current == ']') { - inList--; - } - if (current == '"') { - inValue = !inValue; - } - if (current == ',' && inObject == 0 && inList == 0 && !inValue) { - list.add(build.toString()); - build.setLength(0); - } else if (current == '\\') { - inEscape = true; - } else { - build.append(current); - } - index++; - } - if (build.length() > 0) { - list.add(build.toString()); - } - return list; - } - - // plugin util - private static String trimTrailingCharacter(String string, char c) { - if (string.length() > 0 && string.charAt(string.length() - 1) == c) { - return string.substring(0, string.length() - 1); - } - return string; - } - - private static String trimLeadingCharacter(String string, char c) { - if (string.length() > 0 && string.charAt(0) == c) { - return string.substring(1); - } - return string; - } - -} diff --git a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/json/BasicJsonwriter.java b/czsj-rpc/src/main/java/com/ruoshui/rpc/util/json/BasicJsonwriter.java deleted file mode 100644 index 1d57ad2..0000000 --- a/czsj-rpc/src/main/java/com/ruoshui/rpc/util/json/BasicJsonwriter.java +++ /dev/null @@ -1,188 +0,0 @@ -package com.czsj.rpc.util.json; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.*; - -/** - * - * @Author: czsj - * @Date: 2022/9/16 11:14 - * @Description: JsonWriter的父类 - **/ -public class BasicJsonwriter { - private static Logger logger = LoggerFactory.getLogger(BasicJsonwriter.class); - - - private static final String STR_SLASH = "\""; - private static final String STR_SLASH_STR = "\":"; - private static final String STR_COMMA = ","; - private static final String STR_OBJECT_LEFT = "{"; - private static final String STR_OBJECT_RIGHT = "}"; - private static final String STR_ARRAY_LEFT = "["; - private static final String STR_ARRAY_RIGHT = "]"; - - private static final Map cacheFields = new HashMap<>(); - - - /** - * - * @param object - * @return 将JSON转换为String字符串 - */ - public String toJson(Object object) { - StringBuilder json = new StringBuilder(); - try { - writeObjItem(null, object, json); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - - String str = json.toString(); - if (str.contains("\n")) { - str = str.replaceAll("\\n", "\\\\n"); - } - if (str.contains("\t")) { - str = str.replaceAll("\\t", "\\\\t"); - } - if (str.contains("\r")) { - str = str.replaceAll("\\r", "\\\\r"); - } - return str; - } - - /** - * 返回json格式的键值对 - * @param key :键 - * @param value :值 - * @param json :返回值 "key":value or value - */ - private void writeObjItem(String key, Object value, StringBuilder json) { - - // "key:" - if (key != null) { - json.append(STR_SLASH).append(key).append(STR_SLASH_STR); - } - - // val - if (value == null) { - json.append("null"); - } else if (value instanceof String || value instanceof Byte || value instanceof CharSequence) { - // string - json.append(STR_SLASH).append(value.toString()).append(STR_SLASH); - } else if (value instanceof Boolean || value instanceof Short || value instanceof Integer - || value instanceof Long || value instanceof Float || value instanceof Double) { - // number - json.append(value); - } else if (value instanceof Object[] || value instanceof Collection) { - // collection | array // Array.getLength(array); // Array.get(array, i); - Collection valueColl = null; - if (value instanceof Object[]) { - Object[] valueArr = (Object[]) value; - valueColl = Arrays.asList(valueArr); - } else if (value instanceof Collection) { - valueColl = (Collection) value; - } - - json.append(STR_ARRAY_LEFT); - if (valueColl.size() > 0) { - for (Object obj : valueColl) { - writeObjItem(null, obj, json); - json.append(STR_COMMA); - } - json.delete(json.length() - 1, json.length()); - } - json.append(STR_ARRAY_RIGHT); - - } else if (value instanceof Map) { - // map - - Map valueMap = (Map) value; - - json.append(STR_OBJECT_LEFT); - if (!valueMap.isEmpty()) { - Set keys = valueMap.keySet(); - for (Object valueMapItemKey : keys) { - writeObjItem(valueMapItemKey.toString(), valueMap.get(valueMapItemKey), json); - json.append(STR_COMMA); - } - json.delete(json.length() - 1, json.length()); - } - json.append(STR_OBJECT_RIGHT); - } else { - json.append(STR_OBJECT_LEFT); - Field[] fields = getDeclaredFields(value.getClass()); - if (fields.length > 0) { - for (Field field : fields) { - Object fieldObj = getFieldObject(field, value); - writeObjItem(field.getName(), fieldObj, json); - json.append(STR_COMMA); - } - json.delete(json.length() - 1, json.length()); - } - - json.append(STR_OBJECT_RIGHT); - } - } - - /** - * - * @param clazz 类 - * @return 返回类的字段名数组,并缓存 - */ - public synchronized Field[] getDeclaredFields(Class clazz) { - String cacheKey = clazz.getName(); - if (cacheFields.containsKey(cacheKey)) { - return cacheFields.get(cacheKey); - } - Field[] fields = getAllDeclaredFields(clazz); //clazz.getDeclaredFields(); - cacheFields.put(cacheKey, fields); - return fields; - } - - /** - * - * @param clazz - * @return 通过反射返回类的字段名数组 - */ - private Field[] getAllDeclaredFields(Class clazz) { - List list = new ArrayList(); - Class current = clazz; - - while (current != null && current != Object.class) { - Field[] fields = current.getDeclaredFields(); - - for (Field field : fields) { - if (Modifier.isStatic(field.getModifiers())) { - continue; - } - list.add(field); - } - - current = current.getSuperclass(); - } - - return list.toArray(new Field[list.size()]); - } - - /** - * - * @param field - * @param obj - * @return 反射获取字段对象 - */ - private synchronized Object getFieldObject(Field field, Object obj) { - try { - field.setAccessible(true); - return field.get(obj); - } catch (IllegalArgumentException | IllegalAccessException e) { - logger.error(e.getMessage(), e); - return null; - } finally { - field.setAccessible(false); - } - } -}