Skip to content

源码之组件通信

1. 通信框架的发展

Spark早期版本中采用Akka作为内部通信部件。
➢ Spark1.3中引入Netty通信框架,为了解决Shuffle的大数据传输问题使用
➢ Spark1.6中Akka和Netty可以配置使用。Netty完全实现了Akka在Spark中的功能。
➢ Spark2系列中,Spark抛弃Akka,使用Netty。

Netty介绍

Netty是一个基于AIO的通信框架,在Linux中对AIO支持不够好,Windows原生支持AIO, 在Linux中底层使用Epllo方式模仿AIO操作。

2. Spark中通信设计

Spark2.x版本使用Netty通讯框架作为内部通讯组件。Spark基于Netty新的RPC框架。借鉴了Akka的中的设计,它是基于Actor模型,如下图所示:
Alt text Spark通讯框架中各个组件(Client/Master/Worker)可以认为是一个个独立的实体,各个实体之间通过消息来进行通信。具体各个组件之间的关系图如下:
Alt text Endpoint(Client/Master/Worker)有1个InBox和N个OutBox(N>=1,N 取决于当前Endpoint与多少其他的Endpoint进行通信,一个与其通讯的其他Endpoint对应一个OutBox),Endpoint接收到的消息被写入InBox,发送出去的消息写入OutBox并被发送到其他Endpoint的InBox中。
Driver: class DriverEndpoint extends IsolatedRpcEndpoint
Executor: class CoarseGrainedExecutorBackend extends IsolatedRpcEndpoint

3. Spark通讯架构

Alt text

  1. RpcEndpoint:RPC通信终端。Spark针对每个节点(Client/Master/Worker)都称之为一个RPC终端,且都实现 RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用 Dispatcher。在Spark中,所有的终端都存在生命周期:Constructor->onStart->receive*->onStop
  2. RpcEnv:RPC上下文环境,每个RPC终端运行时依赖的上下文环境称为RpcEnv;在把当前Spark版本中使用的是NettyRpcEnv。
  3. Dispatcher:消息调度(分发)器,针对于RPC终端需要发送远程消息或者从远程RPC接收到的消息,分发至对应的指令收件箱(发件箱)。如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱;
  4. Inbox:指令消息收件箱。一个本地RpcEndpoint对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部ReceiverQueue中,另外Dispatcher创建时会启动一个单独线程进行轮询ReceiverQueue,进行收件箱消息消费;
  5. RpcEndpointRef:RpcEndpointRef是对远程RpcEndpoint的一个引用。当我们需要向一个具体的RpcEndpoint发送消息时,一般我们需要获取到该RpcEndpoint的引用,然后通过该应用发送消息。
  6. OutBox:指令消息发件箱。对于当前RpcEndpoint来说,一个目标RpcEndpoint对应一个发件箱,如果向多个目标RpcEndpoint发送信息,则有多个OutBox。当消息放入Outbox后,紧接着通过TransportClient将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行;
  7. RpcAddress:表示远程的RpcEndpointRef的地址,Host + Port。
  8. TransportClient:Netty通信客户端,一个OutBox对应一个TransportClient,TransportClient不断轮询 OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer;
  9. TransportServer:Netty通信服务端,一个RpcEndpoint对应一个TransportServer,接受远程消息后调用 Dispatcher分发消息至对应收发件箱;

4. 源码之创建Driver端通信

点击SparkContxt.scala, SparkContext有一个_env属性: Alt text _env通过createSparkEnv()方法初始化: Alt text createSparkEnv()方法里面就有创建Driver端环境的方法:
Alt text 在SparkEnv.scala中查看createDriverEnv():

scala
private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      sparkContext: SparkContext,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    .......// 省略地址端口等信息逻辑

    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER,
      bindAddress,
      advertiseAddress,
      Option(port),
      isLocal,
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
      Option(sparkContext),
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }

点击进入create()方法: Alt text 点击RPCEnv的create()方法,可以看到底层就是NettyRpcEnvFactory创建的NettyRpcEnv对象:

scala
def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      numUsableCores: Int,
      clientMode: Boolean): RpcEnv = {
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      numUsableCores, clientMode)
    new NettyRpcEnvFactory().create(config)
}

点击NettyRpcEnvFactory的create()方法: Alt text 实际执行nettyenv.startServer()方法,进入startServer()方法

scala
def startServer(bindAddress: String, port: Int): Unit = {
    val bootstraps: java.util.List[TransportServerBootstrap] =
      if (securityManager.isAuthenticationEnabled()) {
        java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
      } else {
        java.util.Collections.emptyList()
      }
      // 创建TransportServer
    server = transportContext.createServer(bindAddress, port, bootstraps)
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}

进入createServer()方法

java
public TransportServer createServer(
      String host, int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, host, port, rpcHandler, bootstraps);
}

进入TransportServer的构造函数:

java
public TransportServer(
      TransportContext context,
      String hostToBind,
      int portToBind,
      RpcHandler appRpcHandler,
      List<TransportServerBootstrap> bootstraps) {
    .......
    boolean shouldClose = true;
    try {
      init(hostToBind, portToBind);
      shouldClose = false;
    } finally {
      if (shouldClose) {
        JavaUtils.closeQuietly(this);
      }
    }
}

点击进入init()方法:

java
private void init(String hostToBind, int portToBind) {

    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
      conf.getModuleName() + "-boss");
    EventLoopGroup workerGroup =  NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
      conf.getModuleName() + "-server");

    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      // 指定AIO模式
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, pooledAllocator)
      .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
      .childOption(ChannelOption.ALLOCATOR, pooledAllocator);

    ......// 底层启动Socket通信服务
    InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
    channelFuture.syncUninterruptibly();

    InetSocketAddress localAddress = (InetSocketAddress) channelFuture.channel().localAddress();
    port = localAddress.getPort();
    logger.debug("Shuffle server started on {} with port {}", localAddress.getHostString(), port);
}

点击进入NettyUtils.getServerChannelClass()方法,可以看到底层Netty就只支持NIO和EPOLL两种模式:

java
public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
    switch (mode) {
      case NIO:
        return NioServerSocketChannel.class;
      case EPOLL:
        return EpollServerSocketChannel.class;
      default:
        throw new IllegalArgumentException("Unknown io mode: " + mode);
    }
}

看了Netty底层实现之后,回到NettyRpcEnv.scala, 现在TransportServer有了,它会注册一个RpcEndpoint(终端): Alt text RpcEndpoint是用来干嘛的呢,可以看到它里面主要的方法就是receive()和receiveAndReply()接收消息。 Alt text 除此之外可以看到self属性是RpcEndpointRef类型,相比RpcEndpoint多了Ref,查看里面的方法: Alt text 里面主要就是send()和ask()方法,也就是发送消息。话说回来点击进入registerRpcEndpoint()方法中,可以看到看到里面有一个同步代码块会被执行: Alt text 它会匹配RpcEndpoint(终端)类型,我们的终端就是IsolatedRpcEndpoint的子类,默认会创建一个DedicatedMessageLoop(消息处理类) Alt text 里面含有属性inbox就是消息的收件箱。其实Driver端也有发件箱,而且是Map结构,每个终端对应一个发件箱,在NettyRpcEnv类中: Alt text 发件箱名叫outboxes, key是外部终端的地址,value是Outbox类。这样Driver端启动Netty后有收发方法和收发件箱,功能就完整了。

5. 源码之创建Executor端通信

Executor端通信和Driver端基本创建过程类似。 在CoarseGrainedExecutorBackend.scala中的run方法中: Alt text 点击进入createExecutorEnv()方法:

scala
private[spark] def createExecutorEnv(
      conf: SparkConf,
      executorId: String,
      bindAddress: String,
      hostname: String,
      numCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      isLocal: Boolean): SparkEnv = {
    val env = create(
      conf,
      executorId,
      bindAddress,
      hostname,
      None,
      isLocal,
      numCores,
      ioEncryptionKey
    )
    SparkEnv.set(env)
    env
}

进入create()方法,下面就和Driver端源码就一样了,都是调用RpcEnv.create()方法 Alt text 进入RpcEnv.scala的create()方法 Alt text 点击NettyRpcEnvFactory的create()方法: Alt text 一样执行nettyenv.startServer()方法,进入startServer()方法

scala
def startServer(bindAddress: String, port: Int): Unit = {
    val bootstraps: java.util.List[TransportServerBootstrap] =
      if (securityManager.isAuthenticationEnabled()) {
        java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
      } else {
        java.util.Collections.emptyList()
      }
      // 创建TransportServer
    server = transportContext.createServer(bindAddress, port, bootstraps)
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}

所有的Executor也是基于Netty创建TransportServer,注册Executor的RpcEndpoint终端,收发邮件箱。值得一提的是都是发邮件通过TransportClient对象,Outbox.scala源码中:
Alt text 它会在发送消息的时候初始化远程连接。

6. 总体通信流程总结

Alt text