源码之组件通信
1. 通信框架的发展
Spark早期版本中采用Akka作为内部通信部件。
➢ Spark1.3中引入Netty通信框架,为了解决Shuffle的大数据传输问题使用
➢ Spark1.6中Akka和Netty可以配置使用。Netty完全实现了Akka在Spark中的功能。
➢ Spark2系列中,Spark抛弃Akka,使用Netty。
Netty介绍
Netty是一个基于AIO的通信框架,在Linux中对AIO支持不够好,Windows原生支持AIO, 在Linux中底层使用Epllo方式模仿AIO操作。
2. Spark中通信设计
Spark2.x版本使用Netty通讯框架作为内部通讯组件。Spark基于Netty新的RPC框架。借鉴了Akka的中的设计,它是基于Actor模型,如下图所示: Spark通讯框架中各个组件(Client/Master/Worker)可以认为是一个个独立的实体,各个实体之间通过消息来进行通信。具体各个组件之间的关系图如下:
Endpoint(Client/Master/Worker)有1个InBox和N个OutBox(N>=1,N 取决于当前Endpoint与多少其他的Endpoint进行通信,一个与其通讯的其他Endpoint对应一个OutBox),Endpoint接收到的消息被写入InBox,发送出去的消息写入OutBox并被发送到其他Endpoint的InBox中。
Driver: class DriverEndpoint extends IsolatedRpcEndpoint
Executor: class CoarseGrainedExecutorBackend extends IsolatedRpcEndpoint
3. Spark通讯架构
- RpcEndpoint:RPC通信终端。Spark针对每个节点(Client/Master/Worker)都称之为一个RPC终端,且都实现 RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用 Dispatcher。在Spark中,所有的终端都存在生命周期:Constructor->onStart->receive*->onStop
- RpcEnv:RPC上下文环境,每个RPC终端运行时依赖的上下文环境称为RpcEnv;在把当前Spark版本中使用的是NettyRpcEnv。
- Dispatcher:消息调度(分发)器,针对于RPC终端需要发送远程消息或者从远程RPC接收到的消息,分发至对应的指令收件箱(发件箱)。如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱;
- Inbox:指令消息收件箱。一个本地RpcEndpoint对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部ReceiverQueue中,另外Dispatcher创建时会启动一个单独线程进行轮询ReceiverQueue,进行收件箱消息消费;
- RpcEndpointRef:RpcEndpointRef是对远程RpcEndpoint的一个引用。当我们需要向一个具体的RpcEndpoint发送消息时,一般我们需要获取到该RpcEndpoint的引用,然后通过该应用发送消息。
- OutBox:指令消息发件箱。对于当前RpcEndpoint来说,一个目标RpcEndpoint对应一个发件箱,如果向多个目标RpcEndpoint发送信息,则有多个OutBox。当消息放入Outbox后,紧接着通过TransportClient将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行;
- RpcAddress:表示远程的RpcEndpointRef的地址,Host + Port。
- TransportClient:Netty通信客户端,一个OutBox对应一个TransportClient,TransportClient不断轮询 OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer;
- TransportServer:Netty通信服务端,一个RpcEndpoint对应一个TransportServer,接受远程消息后调用 Dispatcher分发消息至对应收发件箱;
4. 源码之创建Driver端通信
点击SparkContxt.scala, SparkContext有一个_env属性: _env通过createSparkEnv()方法初始化:
createSparkEnv()方法里面就有创建Driver端环境的方法:
在SparkEnv.scala中查看createDriverEnv():
private[spark] def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
sparkContext: SparkContext,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
.......// 省略地址端口等信息逻辑
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
bindAddress,
advertiseAddress,
Option(port),
isLocal,
numCores,
ioEncryptionKey,
listenerBus = listenerBus,
Option(sparkContext),
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
}
点击进入create()方法: 点击RPCEnv的create()方法,可以看到底层就是NettyRpcEnvFactory创建的NettyRpcEnv对象:
def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
numUsableCores: Int,
clientMode: Boolean): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
numUsableCores, clientMode)
new NettyRpcEnvFactory().create(config)
}
点击NettyRpcEnvFactory的create()方法: 实际执行nettyenv.startServer()方法,进入startServer()方法
def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
// 创建TransportServer
server = transportContext.createServer(bindAddress, port, bootstraps)
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
进入createServer()方法
public TransportServer createServer(
String host, int port, List<TransportServerBootstrap> bootstraps) {
return new TransportServer(this, host, port, rpcHandler, bootstraps);
}
进入TransportServer的构造函数:
public TransportServer(
TransportContext context,
String hostToBind,
int portToBind,
RpcHandler appRpcHandler,
List<TransportServerBootstrap> bootstraps) {
.......
boolean shouldClose = true;
try {
init(hostToBind, portToBind);
shouldClose = false;
} finally {
if (shouldClose) {
JavaUtils.closeQuietly(this);
}
}
}
点击进入init()方法:
private void init(String hostToBind, int portToBind) {
IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
conf.getModuleName() + "-boss");
EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
conf.getModuleName() + "-server");
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
// 指定AIO模式
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, pooledAllocator)
.option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
.childOption(ChannelOption.ALLOCATOR, pooledAllocator);
......// 底层启动Socket通信服务
InetSocketAddress address = hostToBind == null ?
new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
channelFuture = bootstrap.bind(address);
channelFuture.syncUninterruptibly();
InetSocketAddress localAddress = (InetSocketAddress) channelFuture.channel().localAddress();
port = localAddress.getPort();
logger.debug("Shuffle server started on {} with port {}", localAddress.getHostString(), port);
}
点击进入NettyUtils.getServerChannelClass()方法,可以看到底层Netty就只支持NIO和EPOLL两种模式:
public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
switch (mode) {
case NIO:
return NioServerSocketChannel.class;
case EPOLL:
return EpollServerSocketChannel.class;
default:
throw new IllegalArgumentException("Unknown io mode: " + mode);
}
}
看了Netty底层实现之后,回到NettyRpcEnv.scala, 现在TransportServer有了,它会注册一个RpcEndpoint(终端): RpcEndpoint是用来干嘛的呢,可以看到它里面主要的方法就是receive()和receiveAndReply()接收消息。
除此之外可以看到self属性是RpcEndpointRef类型,相比RpcEndpoint多了Ref,查看里面的方法:
里面主要就是send()和ask()方法,也就是发送消息。话说回来点击进入registerRpcEndpoint()方法中,可以看到看到里面有一个同步代码块会被执行:
它会匹配RpcEndpoint(终端)类型,我们的终端就是IsolatedRpcEndpoint的子类,默认会创建一个DedicatedMessageLoop(消息处理类)
里面含有属性inbox就是消息的收件箱。其实Driver端也有发件箱,而且是Map结构,每个终端对应一个发件箱,在NettyRpcEnv类中:
发件箱名叫outboxes, key是外部终端的地址,value是Outbox类。这样Driver端启动Netty后有收发方法和收发件箱,功能就完整了。
5. 源码之创建Executor端通信
Executor端通信和Driver端基本创建过程类似。 在CoarseGrainedExecutorBackend.scala中的run方法中: 点击进入createExecutorEnv()方法:
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
bindAddress: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
val env = create(
conf,
executorId,
bindAddress,
hostname,
None,
isLocal,
numCores,
ioEncryptionKey
)
SparkEnv.set(env)
env
}
进入create()方法,下面就和Driver端源码就一样了,都是调用RpcEnv.create()方法 进入RpcEnv.scala的create()方法
点击NettyRpcEnvFactory的create()方法:
一样执行nettyenv.startServer()方法,进入startServer()方法
def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
// 创建TransportServer
server = transportContext.createServer(bindAddress, port, bootstraps)
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
所有的Executor也是基于Netty创建TransportServer,注册Executor的RpcEndpoint终端,收发邮件箱。值得一提的是都是发邮件通过TransportClient对象,Outbox.scala源码中: 它会在发送消息的时候初始化远程连接。