Tech Whims

Spark 3.x Journey of Discovery | Spark RPC architecture design, the Akka architecture, and a communication demo built on the Spark RPC framework

张晓龙 / 2023-05-05



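1. Akka architecture and Spark RPC architecture

In the previous article we mentioned that the Spark RPC architecture is modeled on the Akka architecture. So what does Akka look like?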

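1.1 What is Akka

Akka is a library written in Scala for building fault-tolerant, highly scalable Actor-model applications in Java and Scala on the JVM. It provides both Java and Scala APIs.

Akka lets you focus on business requirements instead of writing low-level code. In Akka, message passing is the only mechanism by which actors communicate.

Akka's use of the Actor model provides a level of abstraction that makes it easier to write correct concurrent, parallel, and distributed systems.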

For more details, see the official site: akka.io

Or watch the YouTube video "The Power of Akka in 1 min".

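The three most important concepts in an Akka system are ActorSystem, Actor, and ActorRef.

ActorSystem: the system that manages the communicating actors. One such object per server node is enough; its job is to create all actors and manage their life cycles.

Actor: lives inside an ActorSystem on a server and communicates with actors on other nodes. Each Actor has a MailBox; messages sent to it by other actors are first stored in the MailBox, which is what makes asynchronous communication possible.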

ActorRef: a reference to an Actor. Other code sends messages to an Actor through its ActorRef rather than calling the Actor object directly.

Akka-system
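
The mailbox idea can be sketched without Akka. The following is only a minimal illustration under our own assumptions, not how Akka implements it: MiniActor, its ! method, and stop are hypothetical names, and we assume one dedicated thread per actor that drains a queue in arrival order.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical mini "actor": one mailbox (a queue) and one thread that drains it in order.
final class MiniActor(handler: String => Unit) {
  private val mailbox = new LinkedBlockingQueue[String]()
  @volatile private var running = true

  private val worker = new Thread(new Runnable {
    override def run(): Unit = {
      // keep going until stop() was called AND the mailbox is empty
      while (running || !mailbox.isEmpty) {
        val msg = mailbox.poll(50, TimeUnit.MILLISECONDS)
        if (msg != null) handler(msg)
      }
    }
  })
  worker.start()

  // asynchronous send: the caller only enqueues the message and returns immediately
  def !(msg: String): Unit = mailbox.put(msg)

  // let the worker finish the remaining messages, then wait for it to exit
  def stop(): Unit = { running = false; worker.join() }
}

object MailboxDemo {
  // sends two messages and returns them in the order the actor handled them
  def run(): List[String] = {
    val handled = ArrayBuffer.empty[String]
    val actor = new MiniActor(msg => handled += msg)
    actor ! "register"
    actor ! "heartbeat"
    actor.stop()
    handled.toList
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

The sender never calls the handler itself; it only puts the message into the queue, which is the property that makes the communication asynchronous.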

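When an application runs on Spark 1.x, it creates one Driver and multiple Executors. Internally there are two actors:
1) DriverActor: sends tasks to the executors on the other workers for execution; it plays the same role as DriverEndpoint in Spark 2.x.
2) ClientActor: communicates with the master; it plays the same role as ClientEndpoint in Spark 2.x.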

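2. RPC communication components in a Spark cluster

Having introduced Akka above, we can summarize three points:

In Spark 1.x the RPC communication component used the Actor model; from Spark 2.x onward it uses the RpcEndpoint model.

In Spark 1.x, uploading user files and jars was handled by HttpFileServer, which was implemented on Jetty. Spark 2.x dropped it in favor of NettyStreamManager, which is built on Spark's internal RPC framework.

The shuffle process and the replication and backup of Block data still use Netty in Spark 2.x. By redesigning interfaces and programs, message exchange between components and the upload of user files and jars were all brought into Spark's RPC framework.

Building on the previous article, we can summarize further:

As mentioned, the Netty-based RPC framework in Spark 2.x borrows Akka's design and is based on the Actor model: each component can be regarded as an independent entity, and the entities communicate with one another through messages.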

The analogy is: RpcEnv corresponds to ActorSystem, RpcEndpoint to Actor, and RpcEndpointRef to ActorRef.

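3. Example: network communication with the Akka framework

We use the Akka framework to implement a simple Spark-style RPC communication example. The demo consists of 4 Scala files in total.

You can create a simple Maven project; see the code for more details.

3.1 Key dependencies in the pom. Make sure the Scala version (scala-library and scala-actors, 2.11.8 here) matches the Scala suffix of the Akka artifacts below (_2.11), otherwise the project will not run.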

<dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-actors</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>2.4.17</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>2.4.17</version>
        </dependency>
    </dependencies>

3.2 Spark master implementation

package org.example;


import akka.actor.{Actor, ActorSystem, Props, actorRef2Scala}
import com.typesafe.config.ConfigFactory

import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration;

/**
 * @Author Allen Zhang
 * @Date 2023/5/4 17:12
 * @Description mock of the Spark master's RPC communication:
 * override receive to accept messages from other actors and handle each one by pattern matching
 **/
class AkkaSparkMaster(var hostname: String, var port:Int) extends Actor {

    // store registered worker info to map
    private var id2AkkaSparkWorkerInfoMap = new mutable.HashMap[String, AkkaSparkWorkerInfo]()

    // execute once when actor start first
    override def preStart(): Unit = {
        import context.dispatcher

        context.system.scheduler.schedule(new FiniteDuration(0,TimeUnit.MILLISECONDS), new FiniteDuration(5000,TimeUnit
          .MILLISECONDS), self, CheckTimeOut)
    }

    // core!!!
    override def receive: Receive = {
          // receive register msg
        case RegisterAkkaSparkWorker(workerId,memory,cpu) => {
            val akkaSparkWorkerInfo = new AkkaSparkWorkerInfo(workerId, memory, cpu)
            println(s"node ${workerId} online!")

            id2AkkaSparkWorkerInfoMap.put(workerId, akkaSparkWorkerInfo)

            // store msg info to zk(maybe)
            sender() ! RegisteredAkkaSparkWorker(hostname + ":" + port)
        }

        // receive heartbeat msg
        case HeartBeat(workerId) => {
            val currentTime = System.currentTimeMillis()
            // ignore heartbeats from workers that are not (or no longer) registered;
            // id2AkkaSparkWorkerInfoMap(workerId) would throw NoSuchElementException for them
            id2AkkaSparkWorkerInfoMap.get(workerId).foreach { akkaSparkWorkerInfo =>
                akkaSparkWorkerInfo.lastHeartBeatTime = currentTime
            }
        }

        // receive the periodic check signal: drop workers whose last heartbeat is more than 15 s old
        case CheckTimeOut => {
            val currentTime = System.currentTimeMillis()

            var sparkWorkerInfoSet = id2AkkaSparkWorkerInfoMap.values.toSet
            sparkWorkerInfoSet.filter(workerInfo => {
                val heartbeatTimeOut = 15000
                val bool = currentTime - workerInfo.lastHeartBeatTime > heartbeatTimeOut
                if (bool) {
                    println(s"${workerInfo.workerId} offline")
                }
                bool
            }).foreach(deadWorker =>{
                sparkWorkerInfoSet -= deadWorker
                id2AkkaSparkWorkerInfoMap.remove(deadWorker.workerId)
            })

            println("Current registered node count : " + sparkWorkerInfoSet.size + " node info : " +
              sparkWorkerInfoSet.map(x=>x.toString).mkString(","))
        }
    }
}

object AkkaResourceManager {

    def main(args: Array[String]):Unit ={
        val str =
            """
              |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
              |akka.remote.netty.tcp.hostname = localhost
              |akka.remote.netty.tcp.port = 5678
              """.stripMargin
        val conf = ConfigFactory.parseString(str)

        print(conf)

        val actorSystem = ActorSystem(Constant.SMAS, conf)

        // create an actor named SparkMasterActor and start it
        actorSystem.actorOf(Props(new AkkaSparkMaster("localhost",5678)),Constant.SMA)
    }
}
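
The bookkeeping the master performs on HeartBeat and CheckTimeOut can also be looked at in isolation. Below is a small Akka-free sketch of that logic; WorkerInfo, HeartbeatRegistry, heartbeat, and sweep are hypothetical names introduced only for this illustration, and time is passed in explicitly so the behavior is easy to follow.

```scala
// Hypothetical, Akka-free model of the master's worker bookkeeping.
final case class WorkerInfo(workerId: String, lastHeartBeatTime: Long)

object HeartbeatRegistry {
  val TimeoutMs: Long = 15000L // same 15 s threshold as in the master listing

  // record a heartbeat; heartbeats from unknown workers leave the registry unchanged
  def heartbeat(workers: Map[String, WorkerInfo], workerId: String, now: Long): Map[String, WorkerInfo] =
    workers.get(workerId) match {
      case Some(info) => workers.updated(workerId, info.copy(lastHeartBeatTime = now))
      case None       => workers
    }

  // split the registry into (alive, expired) as of time `now`
  def sweep(workers: Map[String, WorkerInfo], now: Long): (Map[String, WorkerInfo], Map[String, WorkerInfo]) =
    workers.partition { case (_, info) => now - info.lastHeartBeatTime <= TimeoutMs }
}

object HeartbeatDemo {
  def main(args: Array[String]): Unit = {
    // both workers register at t = 0 ms
    var workers = Map(
      "worker1" -> WorkerInfo("worker1", 0L),
      "worker2" -> WorkerInfo("worker2", 0L))
    // only worker1 sends a heartbeat, at t = 10 s
    workers = HeartbeatRegistry.heartbeat(workers, "worker1", 10000L)
    // the check at t = 20 s keeps worker1 (10 s silent) and expires worker2 (20 s silent)
    val (alive, expired) = HeartbeatRegistry.sweep(workers, 20000L)
    println(s"alive=${alive.keys.mkString(",")} expired=${expired.keys.mkString(",")}")
  }
}
```

Keeping the timeout rule in a pure function like sweep makes it testable without starting an ActorSystem; the actor then only has to call it when the timer message arrives.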

3.3 Spark worker implementation

package org.example

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

class AkkaSparkWorker(val workerHostName:String, val masterHostName: String, val masterPort: Int, val memory: Int,
val cpu:Int) extends Actor{

  var sparkWorkerId:String = workerHostName
  var sparkMasterRef:ActorSelection=_

  // runs once when the actor starts:
  // register this worker with the master
  override def preStart(): Unit = {
    sparkMasterRef = context.actorSelection(s"akka.tcp://${Constant.SMAS}@${masterHostName}:${masterPort}/user/${Constant.SMA}")

    // send message
    println(sparkWorkerId +" start register!")
    sparkMasterRef ! RegisterAkkaSparkWorker(workerId = sparkWorkerId, memory = memory, cpu = cpu)
  }

  // core service!
  override def receive: Receive = {
    case RegisteredAkkaSparkWorker(masterUrl)=>{
      println("masterUrl : " + masterUrl)

      import context.dispatcher
      // receiver : self , actorRef
      // message: SendMessage
      context.system.scheduler.schedule(new FiniteDuration(0,TimeUnit.MILLISECONDS),new FiniteDuration(5000,TimeUnit
        .MILLISECONDS),self,SendMessage)
    }
    case SendMessage=>{
      sparkMasterRef ! HeartBeat(workerId = sparkWorkerId)
      println("current thread id : " + Thread.currentThread().getId)
    }
  }
}

object SparkWorker{

  def main(args: Array[String]): Unit = {
    val remoteHostName = args(0)

    val sparkMasterHostname= args(1)
    val sparkMasterPort = args(2).toInt

    val sparkWorkerMemory = args(3).toInt
    val sparkWorkerCores = args(4).toInt

    val sparkWorkerPort = args(5).toInt
    val sparkWorkerHostname = args(6)

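    // build the config from the command-line arguments instead of hard-coding host and port
    val str =
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = "$remoteHostName"
        |akka.remote.netty.tcp.port = $sparkWorkerPort
        """.stripMargin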

    val conf = ConfigFactory.parseString(str)

    print(conf)
    val actorSystem = ActorSystem(Constant.SWAS, conf)

    actorSystem.actorOf(Props(new AkkaSparkWorker(sparkWorkerHostname,sparkMasterHostname,sparkMasterPort,
      sparkWorkerMemory,sparkWorkerCores)),Constant.SWA)
  }

}
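
The string the worker passes to context.actorSelection has a fixed shape: protocol, actor system name, host, port, then /user/ followed by the actor name (actors created with actorSystem.actorOf live under /user). The sketch below only takes that string apart to make the parts visible. ActorPathSketch and masterPath are hypothetical names, and java.net.URI is used purely for illustration; Akka has its own classes for addresses and paths.

```scala
import java.net.URI

object ActorPathSketch {
  // builds the same kind of string the worker passes to context.actorSelection
  def masterPath(systemName: String, host: String, port: Int, actorName: String): String =
    s"akka.tcp://$systemName@$host:$port/user/$actorName"

  def main(args: Array[String]): Unit = {
    val path = masterPath("SparkMasterActorSystem", "localhost", 5678, "SparkMasterActor")
    println(path)

    // decompose the path with a generic URI parser to show which part is which
    val uri = new URI(path)
    println(s"protocol = ${uri.getScheme}")
    println(s"system   = ${uri.getUserInfo}")
    println(s"host     = ${uri.getHost}")
    println(s"port     = ${uri.getPort}")
    println(s"actor    = ${uri.getPath}")
  }
}
```

If the system name, host, or port in this path does not match what the master actually started with, the registration message cannot reach the master.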

3.4 Constants

package org.example

// actor system and actor names shared by master and worker
object Constant {
  val SMAS = "SparkMasterActorSystem"
  val SMA = "SparkMasterActor"
  val SWAS = "SparkWorkerActorSystem"
  val SWA = "SparkWorkerActor"
}
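
The master and worker listings use several message types (RegisterAkkaSparkWorker, RegisteredAkkaSparkWorker, HeartBeat, SendMessage, CheckTimeOut) and the AkkaSparkWorkerInfo class, whose definitions are not shown in this article. A possible sketch follows, inferred only from how they are used in the listings. The field types, the initial value of lastHeartBeatTime, and the toString format (chosen to match the "worker1,512,32" lines in the master output in 3.5) are assumptions, not the original file.

```scala
// Assumed to live in package org.example next to the master and worker files.

// Worker -> master: request registration with the worker's resources.
final case class RegisterAkkaSparkWorker(workerId: String, memory: Int, cpu: Int)

// Master -> worker: registration acknowledged, carrying the master's host:port.
final case class RegisteredAkkaSparkWorker(masterUrl: String)

// Worker -> master: periodic liveness signal.
final case class HeartBeat(workerId: String)

// Timer ticks that each actor schedules to itself.
case object SendMessage   // worker: time to send the next heartbeat
case object CheckTimeOut  // master: time to look for expired workers

// Master-side record of a registered worker.
// lastHeartBeatTime is a var because the master updates it on every heartbeat;
// starting at "now" (an assumption) keeps a fresh worker from expiring at once.
class AkkaSparkWorkerInfo(val workerId: String, val memory: Int, val cpu: Int) {
  var lastHeartBeatTime: Long = System.currentTimeMillis()

  override def toString: String = s"$workerId,$memory,$cpu"
}
```

Case classes and case objects are serializable by default, which is what allows them to cross the network with the default Java serializer mentioned in the warnings of the run log.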

3.5 Start the Spark master and the Spark worker and let them communicate

1) Start the Spark master; it listens on port 5678

[INFO] [05/04/2023 22:59:08.451] [main] [akka.remote.Remoting] Starting remoting
[INFO] [05/04/2023 22:59:08.624] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SparkMasterActorSystem@localhost:5678]
[INFO] [05/04/2023 22:59:08.625] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://SparkMasterActorSystem@localhost:5678]
Current registered node count : 0 node info : 

2) Start the Spark worker with the program arguments: localhost localhost 5678 512 32 6789 worker1

[INFO] [05/04/2023 23:01:15.870] [main] [akka.remote.Remoting] Starting remoting
[INFO] [05/04/2023 23:01:16.026] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SparkWorkerActorSystem@localhost:6789]
[INFO] [05/04/2023 23:01:16.027] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://SparkWorkerActorSystem@localhost:6789]
worker1 start register!
[WARN] [SECURITY][05/04/2023 23:01:16.296] [SparkWorkerActorSystem-akka.remote.default-remote-dispatcher-13] [akka.serialization.Serialization(akka://SparkWorkerActorSystem)] Using the default Java serializer for class [org.example.RegisterAkkaSparkWorker] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
masterUrl : localhost:5678
current thread id : 22
[WARN] [SECURITY][05/04/2023 23:01:16.388] [SparkWorkerActorSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://SparkWorkerActorSystem)] Using the default Java serializer for class [org.example.HeartBeat] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
current thread id : 22

The worker is now communicating with the master and reports to it with a message every 5 s. The master prints:

Current registered node count : 0 node info : 
node worker1 online!
[WARN] [SECURITY][05/04/2023 23:01:16.367] [SparkMasterActorSystem-akka.remote.default-remote-dispatcher-17] [akka.serialization.Serialization(akka://SparkMasterActorSystem)] Using the default Java serializer for class [org.example.RegisteredAkkaSparkWorker] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
Current registered node count : 1 node info : worker1,512,32
Current registered node count : 1 node info : worker1,512,32
Current registered node count : 1 node info : worker1,512,32
Current registered node count : 1 node info : worker1,512,32

After the worker is killed manually, the master reports the node as offline:

Current registered node count : 1 node info : worker1,512,32
[WARN] [05/04/2023 23:11:21.605] [SparkMasterActorSystem-akka.remote.default-remote-dispatcher-13] [akka.tcp://SparkMasterActorSystem@localhost:5678/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FSparkWorkerActorSystem%40localhost%3A6789-0] Association with remote system [akka.tcp://SparkWorkerActorSystem@localhost:6789] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
Current registered node count : 1 node info : worker1,512,32
Current registered node count : 1 node info : worker1,512,32
worker1 offline
Current registered node count : 0 node info : 

This completes the Akka-based Spark RPC communication example!

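4. Example: network communication with Spark's own RPC framework

The Netty-based RPC framework in Spark 2.x borrows Akka's design and is based on the Actor model: each component can be regarded as an independent entity, and the entities communicate with one another through messages.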

Spark RPC structure: RpcEnv + RpcEndpoint + RpcEndpointRef (compare Akka: ActorSystem + Actor + ActorRef)
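
To make the three roles concrete, here is a tiny in-process sketch of the same division of labor. It is not Spark's implementation: MiniEnv, MiniEndpoint, MiniEndpointRef, and CollectingEndpoint are hypothetical names, there is no network involved, and delivery is a plain synchronous method call.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the endpoint role: it only reacts to messages.
trait MiniEndpoint {
  def receive: PartialFunction[Any, Unit]
}

// Hypothetical stand-in for the reference role: "environment + endpoint name".
// Callers hold a ref and never touch the endpoint object itself.
final class MiniEndpointRef(val name: String, env: MiniEnv) {
  def send(msg: Any): Unit = env.deliver(name, msg)
}

// Hypothetical stand-in for the environment role: owns endpoints, routes by name.
final class MiniEnv {
  private val endpoints = mutable.Map.empty[String, MiniEndpoint]

  def setupEndpoint(name: String, endpoint: MiniEndpoint): MiniEndpointRef = {
    endpoints(name) = endpoint
    new MiniEndpointRef(name, this)
  }

  def deliver(name: String, msg: Any): Unit =
    endpoints.get(name).foreach { endpoint =>
      // messages the endpoint does not match are dropped in this sketch
      if (endpoint.receive.isDefinedAt(msg)) endpoint.receive(msg)
    }
}

// Example endpoint that remembers every String it receives.
final class CollectingEndpoint extends MiniEndpoint {
  val seen = mutable.ArrayBuffer.empty[String]
  override def receive: PartialFunction[Any, Unit] = {
    case text: String => seen += text
  }
}

object MiniRpcDemo {
  def main(args: Array[String]): Unit = {
    val env = new MiniEnv
    val endpoint = new CollectingEndpoint
    val ref = env.setupEndpoint("say-hi-service", endpoint)
    ref.send("hi")
    ref.send(42) // not a String, so the endpoint ignores it
    println(endpoint.seen.mkString(","))
  }
}
```

The point of the indirection is that a ref only needs a name and a way to reach the environment, so the same calling code works whether the endpoint is local or on another machine.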

We design a simple example that communicates using Spark RPC.

The example is a Maven project containing the following key files.

4.1 Key pom dependencies

   <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-network-common -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-network-common_2.11</artifactId>
            <version>2.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.6</version>
        </dependency>
    </dependencies>

4.2 SparkRPC Server code

package org.apache.spark

import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
import org.apache.spark.sql.SparkSession

// Spark RPC server
object SparkRpcServerMain {

  def main(array: Array[String]): Unit= {

    val conf: SparkConf = new SparkConf()

    val sparkSession = SparkSession.builder().config(conf).master("local[*]").appName("Say Hi RPC").getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    val sparkEnv: SparkEnv = sparkContext.env

    /**
     * def create(
     * name: String,
     * bindAddress: String,
     * advertiseAddress: String,
     * port: Int,
     * conf: SparkConf,
     * securityManager: SecurityManager,
     * numUsableCores: Int,
     * clientMode: Boolean): RpcEnv
     */
    val rpcEnv = RpcEnv.create(SayHiSettings.getName(), SayHiSettings.getHostname(), SayHiSettings.getHostname(),
      SayHiSettings.getPort(), conf, sparkEnv.securityManager, 1, false)

    val sayHiEndpoint: RpcEndpoint = new SayHiEndpoint(rpcEnv)

    rpcEnv.setupEndpoint(SayHiSettings.getName(), sayHiEndpoint)

    rpcEnv.awaitTermination()
  }
}

4.3 SparkRPC Client code

package org.apache.spark

import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.sql.SparkSession

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

// Spark RPC client
object SparkRpcClientMain {

  def main(array: Array[String]): Unit= {

    val conf: SparkConf = new SparkConf()

    val sparkSession = SparkSession.builder().config(conf).master("local[*]").appName("Say Hi to RPC Server")
      .getOrCreate()

    val sparkContext: SparkContext = sparkSession.sparkContext

    val sparkEnv: SparkEnv = sparkContext.env

    /**
    def create(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean = false)
     */
    val rpcEnv = RpcEnv.create(SayHiSettings.getName(), SayHiSettings.getHostname(),
      SayHiSettings.getPort(), conf, sparkEnv.securityManager,false)

    val endpointRef:RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress(SayHiSettings.getHostname(),SayHiSettings
      .getPort()), SayHiSettings.getName())

    // async send
    endpointRef.send(SayHi("hi , i am client allen."))

    import scala.concurrent.ExecutionContext.Implicits.global
    // ask , response value before time out
    val future:Future[String] = endpointRef.ask[String](SayHi(s"ask: hi , i am client allen."))
    future.onComplete {
      case scala.util.Success(value) => println(s"get msg form server : ${value}")
      case scala.util.Failure(exception) => println(s"get msg form server error : ${exception}")
    }

    Await.result(future,Duration.apply("30s"))

    val res = endpointRef.askSync[String](SayBye("i am allen ,and bye!"))

    println(res)

    sparkSession.stop()
  }
}

4.4 SparkRPC endpoint implementation

package org.apache.spark

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// service core
class SayHiEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  override def onStart(): Unit = {
    println(rpcEnv.address)
    println("start SayHiEndpoint")
  }

  override def receive: PartialFunction[Any, Unit] = {
    case SayHi(msg) => println(s"Receive msg : ${msg}")
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case SayHi(msg) => {
      println(s"Receive msg : ${msg}")
      context.reply(s"i am SayHi Server,${msg}")
    }

    case SayBye(msg) => {
      println(s"Receive msg : ${msg}")
      context.reply(s"i am SayHi Server,${msg}")
    }
  }

  override def onStop():Unit = {
    println("Stop SayHiEndpoint")
  }
}

case class SayHi(msg: String)

case class SayBye(msg: String)
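
In the endpoint above, messages sent with send arrive in receive, while messages sent with ask or askSync arrive in receiveAndReply, where context.reply answers the caller. The difference between the two styles can be sketched in plain Scala with a Promise as the reply channel. ReplyContext, EchoEndpoint, and EchoRef are hypothetical names, and the sketch runs in one process rather than over Netty.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical reply handle, playing the role the call context plays in the endpoint.
final class ReplyContext(promise: Promise[String]) {
  def reply(value: String): Unit = promise.trySuccess(value)
}

// Hypothetical endpoint with the same two entry points as SayHiEndpoint.
final class EchoEndpoint {
  @volatile var oneWayMessages: List[String] = Nil

  // one-way messages: nothing to answer, just record them
  def receive(msg: String): Unit = {
    oneWayMessages = msg :: oneWayMessages
  }

  // request-reply messages: answer through the context
  def receiveAndReply(msg: String, context: ReplyContext): Unit =
    context.reply(s"i am SayHi Server,$msg")
}

final class EchoRef(endpoint: EchoEndpoint) {
  // fire-and-forget: no reply channel exists, so nothing can come back
  def send(msg: String): Unit = endpoint.receive(msg)

  // asynchronous request: the Promise is the reply channel, the caller gets its Future
  def ask(msg: String): Future[String] = {
    val promise = Promise[String]()
    endpoint.receiveAndReply(msg, new ReplyContext(promise))
    promise.future
  }

  // blocking request: wait for the reply, or fail once the timeout has passed
  def askSync(msg: String, timeout: FiniteDuration): String =
    Await.result(ask(msg), timeout)
}

object SendVsAskDemo {
  def main(args: Array[String]): Unit = {
    val ref = new EchoRef(new EchoEndpoint)
    ref.send("hi")                        // returns nothing
    println(ref.askSync("bye", 1.second)) // prints: i am SayHi Server,bye
  }
}
```

This is why SayHi appears in both receive and receiveAndReply of the endpoint: the client sends it once with send and once with ask, and each call style is routed to a different handler.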

4.5 Spark Common Utils

package org.apache.spark

object SayHiSettings {

  val rpcName: String = "say-hi-service"
  val port: Int = 5678
  val hostname: String = "localhost"

  def getName() = {
    rpcName
  }

  def getPort()={
    port
  }

  def getHostname()={
    hostname
  }

  override def toString = s"SayHiSettings($rpcName, $port, $hostname)"
}

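4.6 Running the example

One important point before running the code: it must live in the org.apache.spark package (as in the listings above), otherwise RpcEndpoint, RpcEnv, and related classes cannot be referenced, because the Spark source declares them private[spark].

1) Start sparkRpcServer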

23/05/05 12:17:51 INFO SparkContext: Running Spark version 2.4.6
23/05/05 12:17:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/05 12:17:52 INFO SparkContext: Submitted application: Say Hi RPC
23/05/05 12:17:52 INFO SecurityManager: Changing view acls to: wdzxl198
23/05/05 12:17:52 INFO SecurityManager: Changing modify acls to: wdzxl198
23/05/05 12:17:52 INFO SecurityManager: Changing view acls groups to: 
23/05/05 12:17:52 INFO SecurityManager: Changing modify acls groups to: 
23/05/05 12:17:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(wdzxl198); groups with view permissions: Set(); users  with modify permissions: Set(wdzxl198); groups with modify permissions: Set()
23/05/05 12:17:52 INFO Utils: Successfully started service 'sparkDriver' on port 63840.
23/05/05 12:17:52 INFO SparkEnv: Registering MapOutputTracker
23/05/05 12:17:52 INFO SparkEnv: Registering BlockManagerMaster
23/05/05 12:17:52 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/05/05 12:17:52 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/05/05 12:17:52 INFO DiskBlockManager: Created local directory at /private/var/folders/lr/dgy9yr3s759_6cpp7k28k2300000gq/T/blockmgr-e1d0eb47-0f5d-4169-af02-942649ebf366
23/05/05 12:17:52 INFO MemoryStore: MemoryStore started with capacity 4.1 GB
23/05/05 12:17:52 INFO SparkEnv: Registering OutputCommitCoordinator
23/05/05 12:17:52 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/05/05 12:17:52 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.xx.xx.xx:4040
23/05/05 12:17:53 INFO Executor: Starting executor ID driver on host localhost
23/05/05 12:17:53 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63841.
23/05/05 12:17:53 INFO NettyBlockTransferService: Server created on 172.xx.xx.xx:63841
23/05/05 12:17:53 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/05/05 12:17:53 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.xx.xx.xx, 63841, None)
23/05/05 12:17:53 INFO BlockManagerMasterEndpoint: Registering block manager 172.xx.xx.xx:63841 with 4.1 GB RAM, BlockManagerId(driver, 172.xx.xx.xx, 63841, None)
23/05/05 12:17:53 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.xx.xx.xx, 63841, None)
23/05/05 12:17:53 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.xx.xx.xx, 63841, None)
23/05/05 12:17:53 INFO Utils: Successfully started service 'say-hi-service' on port 5678.
localhost:5678
start SayHiEndpoint

2) Start sparkRpcClient

23/05/05 12:19:23 INFO SparkContext: Running Spark version 2.4.6
23/05/05 12:19:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/05 12:19:23 INFO SparkContext: Submitted application: Say Hi to RPC Server
23/05/05 12:19:23 INFO SecurityManager: Changing view acls to: wdzxl198
23/05/05 12:19:23 INFO SecurityManager: Changing modify acls to: wdzxl198
23/05/05 12:19:23 INFO SecurityManager: Changing view acls groups to: 
23/05/05 12:19:23 INFO SecurityManager: Changing modify acls groups to: 
23/05/05 12:19:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(wdzxl198); groups with view permissions: Set(); users  with modify permissions: Set(wdzxl198); groups with modify permissions: Set()
23/05/05 12:19:24 INFO Utils: Successfully started service 'sparkDriver' on port 63877.
23/05/05 12:19:24 INFO SparkEnv: Registering MapOutputTracker
23/05/05 12:19:24 INFO SparkEnv: Registering BlockManagerMaster
23/05/05 12:19:24 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/05/05 12:19:24 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/05/05 12:19:24 INFO DiskBlockManager: Created local directory at /private/var/folders/lr/dgy9yr3s759_6cpp7k28k2300000gq/T/blockmgr-c478391a-1fe2-4433-a204-5b225d360a3e
23/05/05 12:19:24 INFO MemoryStore: MemoryStore started with capacity 4.1 GB
23/05/05 12:19:24 INFO SparkEnv: Registering OutputCommitCoordinator
23/05/05 12:19:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/05/05 12:19:24 INFO Utils: Successfully started service 'SparkUI' on port 4041.
23/05/05 12:19:24 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.xx.xx.xx:4041
23/05/05 12:19:24 INFO Executor: Starting executor ID driver on host localhost
23/05/05 12:19:24 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63878.
23/05/05 12:19:24 INFO NettyBlockTransferService: Server created on 172.xx.xx.xx:63878
23/05/05 12:19:24 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/05/05 12:19:24 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.xx.xx.xx, 63878, None)
23/05/05 12:19:24 INFO BlockManagerMasterEndpoint: Registering block manager 172.xx.xx.xx:63878 with 4.1 GB RAM, BlockManagerId(driver, 172.xx.xx.xx, 63878, None)
23/05/05 12:19:24 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.xx.xx.xx, 63878, None)
23/05/05 12:19:24 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.xx.xx.xx, 63878, None)
23/05/05 12:19:24 WARN Utils: Service 'say-hi-service' could not bind on port 5678. Attempting port 5679.
23/05/05 12:19:24 INFO Utils: Successfully started service 'say-hi-service' on port 5679.
23/05/05 12:19:24 INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:5678 after 45 ms (0 ms spent in bootstraps)

3) Communication test

Server side:

Receive msg : hi , i am client allen.
Receive msg : ask: hi , i am client allen.
Receive msg : i am allen ,and bye!

Client side:

get msg form server : i am SayHi Server,ask: hi , i am client allen.
i am SayHi Server,i am allen ,and bye!

This completes the demonstration of communication using Spark's own SparkRPC framework.