• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

实现简易rpc框架-(4)使用nacos作注册心

武飞扬头像
qq553270311
帮助1

nacos是分布式框架里面的一个重要组件,所有的服务端在nacos中注册自己拥有的服务,客户端在进行rpc调用的时候,nacos负责返回一个可用的服务地址供客户端调用。nacos还起到负载均衡的作用,因为在部署的时候一般是分布式部署,假设某服务部署在两台服务器上,在服务调用的时候,nacos可以使用轮询的算法处理请求,不至于让某一台服务器的处理过多的远程调用。为了简单,使用单机版的nacos,当然了单机版的nacos就用不着raft算法做冗余了,nacos集群至少要三台服务器。下载好nacos并且启动,界面非常整洁。

学新通

 在第二节中,把本地服务保存到本地的类叫ServiceRegistry,这里用ServiceProvider代替,而ServiceRegistry作为远程注册表使用.

定义一个服务中心通用接口:

public interface ServiceRegistry {

    /**
     * 将一个服务注册进注册表
     *
     * @param serviceName 服务名称
     * @param inetSocketAddress 提供服务的地址
     */
    void register(String serviceName, InetSocketAddress inetSocketAddress);

    /**
     * 根据服务名称查找服务实体
     *
     * @param serviceName 服务名称
     * @return 服务实体
     */
    InetSocketAddress lookupService(String serviceName);

}
学新通

借来下是nacos作为注册中心的实现,当然了,注册中心可以用zookeeper,etcd。。。nacos提供了注册的接口namingService.registerInstance和获取服务ip地址和端口号的接口namingService.getAllInstances.在获取的服务实例中,选择instances.get(0),设计到nacos负载均衡策略之后会陈述。

public class NacosServiceRegistry implements ServiceRegistry {

    private static final Logger logger = LoggerFactory.getLogger(NacosServiceRegistry.class);

    private static final String SERVER_ADDR = "127.0.0.1:8848";
    private static final NamingService namingService;

    static {
        try {
            namingService = NamingFactory.createNamingService(SERVER_ADDR);
        } catch (NacosException e) {
            logger.error("连接到Nacos时有错误发生: ", e);
            throw new RpcException(RpcError.FAILED_TO_CONNECT_TO_SERVICE_REGISTRY);
        }
    }

    @Override
    public void register(String serviceName, InetSocketAddress inetSocketAddress) {
        try {
            namingService.registerInstance(serviceName, inetSocketAddress.getHostName(), inetSocketAddress.getPort());
        } catch (NacosException e) {
            logger.error("注册服务时有错误发生:", e);
            throw new RpcException(RpcError.REGISTER_SERVICE_FAILED);
        }
    }

    @Override
    public InetSocketAddress lookupService(String serviceName) {
        try {
            List<Instance> instances = namingService.getAllInstances(serviceName);
            Instance instance = instances.get(0);
            return new InetSocketAddress(instance.getIp(), instance.getPort());
        } catch (NacosException e) {
            logger.error("获取服务时有错误发生:", e);
        }
        return null;
    }
}
学新通

 RpcServer有向nacos注册服务的功能,所以对Rpcserver代码做出修改,新增一个方法publishService,用于向nacos注册服务, serviceProvider会把服务信息加入本地map中,而

serviceRegistry.register负责把服务注册到nacos中。

public NettyServer(String host, int port) {
    this.host = host;
    this.port = port;
    serviceRegistry = new NacosServiceRegistry();
    serviceProvider = new ServiceProviderImpl();
}

@Override
public <T> void publishService(Object service, Class<T> serviceClass) {
    if(serializer == null) {
        logger.error("未设置序列化器");
        throw new RpcException(RpcError.SERIALIZER_NOT_FOUND);
    }
    serviceProvider.addServiceProvider(service);
    serviceRegistry.register(serviceClass.getCanonicalName(), new InetSocketAddress(host, port));
    start();
}
学新通

服务发现则在RpcClient实现,之前RpcClient调用sendRequest的时候,ip地址和端口是写死的,现在ip地址和端口号从nacos中获取:

@Override
public Object sendRequest(RpcRequest rpcRequest) {
    if(serializer == null) {
        logger.error("未设置序列化器");
        throw new RpcException(RpcError.SERIALIZER_NOT_FOUND);
    }
    AtomicReference<Object> result = new AtomicReference<>(null);
    try {
        InetSocketAddress inetSocketAddress = serviceRegistry.lookupService(rpcRequest.getInterfaceName());
        Channel channel = ChannelProvider.get(inetSocketAddress, serializer);
        if(channel.isActive()) {
            channel.writeAndFlush(rpcRequest).addListener(future1 -> {
                if (future1.isSuccess()) {
                    logger.info(String.format("客户端发送消息: %s", rpcRequest.toString()));
                } else {
                    logger.error("发送消息时有错误发生: ", future1.cause());
                }
            });
            channel.closeFuture().sync();
            AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse"   rpcRequest.getRequestId());
            RpcResponse rpcResponse = channel.attr(key).get();
            RpcMessageChecker.check(rpcRequest, rpcResponse);
            result.set(rpcResponse.getData());
        } else {
            System.exit(0);
        }
    } catch (InterruptedException e) {
        logger.error("发送消息时有错误发生: ", e);
    }
    return result.get();
}
学新通

 一些其他变更:RpcServer本地调用异步化,交给线程池去调用,产生结果之后再把结果发送给客户端,对应RpcServerHandler中的改变。

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
    threadPool.execute(() -> {
        try {
            logger.info("服务器接收到请求: {}", msg);
            Object result = requestHandler.handle(msg);
            ChannelFuture future = ctx.writeAndFlush(RpcResponse.success(result, msg.getRequestId()));
            future.addListener(ChannelFutureListener.CLOSE);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    });
}

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhhhekie
系列文章
更多 icon
同类精品
更多 icon
继续加载