您好,登錄后才能下訂單哦!
[TOC]
? Scala的Actor有點類似于Java中的多線程編程。但是不同的是,Scala的Actor提供的模型與多線程有所不同。Scala的Actor盡可能地避免鎖和共享狀態,從而避免多線程并發時出現資源爭用的情況,進而提升多線程編程的性能。
Spark中使用的分布式多線程框架,是Akka,是Scala的一種多線程的類庫。Akka也實現了類似Scala Actor的模型,其核心概念同樣也是Actor。Scala Actor模型已經在2.1.0的時候還在用,但是在2.1.1的時候已經被遺棄了,Spark開始轉換用AKKA來替代Scala Actor,但是Scala Actor概念和原理都還是相同的。所以學習Scala Actor對我們學習AKKA,Spark還是有所幫助的
之所以學習Scala Actor,AKKA是因為在學習Spark源碼的時,我們能看懂Spark的源碼,因為在底層消息傳遞機制上大量使用AKKA的傳送機制。
在使用前,需要先引入maven依賴:
<!--scala actor-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>2.10.5</version>
</dependency>
測試代碼如下:
package cn.xpleaf.bigdata.p5.myactor
import scala.actors.Actor
/**
* 學習scala actor的基本操作
* 和java中的Runnable Thread幾乎一致
*
* 第一步:編寫一個類,擴展特質trait Actor(scala 的actor)
* 第二步:復寫其中的act方法
* 第三步:創建該actor的對象,調用該對象的start()方法,啟動該線程
* 第四步:通過scala的操作符"!",發送消息
* 第五步:結束的話,調用close即可
*
* 模擬單向打招呼
*/
object ActorOps {
def main(args: Array[String]): Unit = {
val mFActor = new MyFirstActor()
mFActor.start()
// 發送消息
mFActor ! "小美,睡了嗎?"
mFActor ! "我去洗澡了~"
mFActor ! "呵呵"
}
}
class MyFirstActor extends Actor {
override def act(): Unit = {
while(true) {
receive {
case str: String => println(str)
}
}
}
}
輸出結果如下:
小美,睡了嗎?
我去洗澡了~
呵呵
測試代碼如下:
package cn.xpleaf.bigdata.p5.myactor
import scala.actors.Actor
/**
*
*/
object GreetingActor {
def main(args: Array[String]): Unit = {
val ga = new GreetingActor
ga.start()
ga ! Greeting("小美")
ga ! WorkContent("裝系統")
}
}
case class Greeting(name:String)
case class WorkContent(content:String)
class GreetingActor extends Actor {
override def act(): Unit = {
while(true) {
receive {
case Greeting(name) => println(s"Hello, $name")
case WorkContent(content) => println(s"Let's talk about sth. with $content")
}
}
}
}
輸出結果如下:
Hello, 小美
Let's talk about sth. with 裝系統
測試代碼如下:
package cn.xpleaf.bigdata.p5.myactor
import scala.actors.Actor
/**
* actor之線程間,互相通信
*
* studentActor
* 向老師問了一個問題
*
* teacherActor
* 向學生做回應
*
* 通信的協議:
* 請求,使用Request(內容)來表示
* 響應,使用Response(內容)來表示
*/
object _03CommunicationActorOps {
def main(args: Array[String]): Unit = {
val teacherActor = new TeacherActor()
teacherActor.start()
val studentActor = new StudentActor(teacherActor)
studentActor.start()
studentActor ! Request("老李啊,scala學習為什么這么難啊")
}
}
case class Request(req:String)
case class Response(resp:String)
class StudentActor(teacherActor: TeacherActor) extends Actor {
override def act(): Unit = {
while(true) {
receive {
case Request(req) => {
// 向老師請求相關的問題
println("學生向老師說:" + req)
teacherActor ! Request(req)
}
case Response(resp) => {
println(resp)
println("高!")
}
}
}
}
}
class TeacherActor() extends Actor {
override def act(): Unit = {
while (true) {
receive {
case Request(req) => { // 接收到學生的請求
sender ! Response("這個問題,需要如此搞定~")
}
}
}
}
}
輸出結果如下:
學生向老師說:老李啊,scala學習為什么這么難啊
這個問題,需要如此搞定~
高!
1、Scala在默認情況下,消息都是以異步進行發送的;但是如果發送的消息是同步的,即對方接受后,一定要給自己返回結果,那么可以使用!?的方式發送消息。即:
val response= activeActor !? activeMessage
2、如果要異步發送一個消息,但是在后續要獲得消息的返回值,那么可以使用Future。即!!語法,如下:
val futureResponse = activeActor !! activeMessage
val activeReply = future()
首先需要添加akka的maven依賴:
<!--akka actor-->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.10</artifactId>
<version>2.3.16</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.10</artifactId>
<version>2.3.16</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.10</artifactId>
<version>2.3.16</version>
</dependency>
原理如下:
package cn.xpleaf.bigdata.p5.myakka.p1
import akka.actor.{Actor, ActorSystem, Props}
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}
import scala.util.Random
/**
* 基于AKKA Actor的單向通信案例
* 學生向老師發送請求
*/
object _01StudentActorOps {
def main(args: Array[String]): Unit = {
// 第一步:構建Actor操作系統
val actorSystem = ActorSystem("StudentActorSystem")
// 第二步:actorSystem創建TeacherActor的代理對象ActorRef
val teacherActorRef = actorSystem.actorOf(Props[TeacherActor])
// 第三步:發送消息
teacherActorRef ! QuoteRequest()
Thread.sleep(2000)
// 第四步:關閉
actorSystem.shutdown()
}
}
class TeacherActor extends Actor {
val quotes = List(
"Moderation is for cowards",
"Anything worth doing is worth overdoing",
"The trouble is you think you have time",
"You never gonna know if you never even try")
override def receive = {
case QuoteRequest() => {
val random = new Random()
val randomIndex = random.nextInt(quotes.size)
val randomQuote = quotes(randomIndex)
val response = QuoteResponse(randomQuote)
println(response)
}
}
}
后面akka通信的幾個測試程序都會使用到這個object,只在這里給出,后面不再給出。
package cn.xpleaf.bigdata.p5.myakka
/**
* akka actor通信協議
*/
object MessageProtocol {
case class QuoteRequest()
case class QuoteResponse(resp: String)
case class InitSign()
}
object Start extends Serializable
object Stop extends Serializable
trait Message {
val id: String
}
case class Shutdown(waitSecs: Int) extends Serializable
case class Heartbeat(id: String, magic: Int) extends Message with Serializable
case class Header(id: String, len: Int, encrypted: Boolean) extends Message with Serializable
case class Packet(id: String, seq: Long, content: String) extends Message with Serializable
輸出結果如下:
QuoteResponse(Anything worth doing is worth overdoing)
原理如下:
package cn.xpleaf.bigdata.p5.myakka.p2
import akka.actor.Actor
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}
import scala.util.Random
/**
* Teacher Actor
*/
class TeacherActor extends Actor {
val quotes = List(
"Moderation is for cowards",
"Anything worth doing is worth overdoing",
"The trouble is you think you have time",
"You never gonna know if you never even try")
override def receive = {
case QuoteRequest() => {
val random = new Random()
val randomIndex = random.nextInt(quotes.size)
val randomQuote = quotes(randomIndex)
val response = QuoteResponse(randomQuote)
// println(response)
sender ! response
}
}
}
package cn.xpleaf.bigdata.p5.myakka.p2
import akka.actor.{Actor, ActorLogging, ActorRef}
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{InitSign, QuoteRequest, QuoteResponse}
/**
* Student Actor
* 當學生接收到InitSign信號之后,便向老師發送一條Request請求的消息
*/
class StudentActor(teacherActorRef:ActorRef) extends Actor with ActorLogging {
override def receive = {
case InitSign => {
teacherActorRef ! QuoteRequest()
// println("student send request")
}
case QuoteResponse(resp) => {
log.info(s"$resp")
}
}
}
package cn.xpleaf.bigdata.p5.myakka.p2
import akka.actor.{ActorSystem, Props}
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.InitSign
object DriverApp {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("teacherStudentSystem")
// 老師的代理對象
val teacherActorRef = actorSystem.actorOf(Props[TeacherActor], "teacherActor")
// 學生的代理對象
val studentActorRef = actorSystem.actorOf(Props[StudentActor](new StudentActor(teacherActorRef)), "studentActor")
studentActorRef ! InitSign
Thread.sleep(2000)
actorSystem.shutdown()
}
}
輸出結果如下:
[INFO] [04/24/2018 10:02:19.932] [teacherStudentSystem-akka.actor.default-dispatcher-2] [akka://teacherStudentSystem/user/studentActor] Anything worth doing is worth overdoing
MyRemoteServerSideActor {
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
}
}
}
MyRemoteClientSideActor {
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
}
}
package cn.xpleaf.bigdata.p5.myakka.p3
import akka.actor.{Actor, ActorLogging}
import cn.xpleaf.bigdata.p5.myakka.{Header, Shutdown, Start, Stop}
class RemoteActor extends Actor with ActorLogging {
def receive = {
case Start => { // 處理Start消息
log.info("Remote Server Start ==>RECV Start event : " + Start)
}
case Stop => { // 處理Stop消息
log.info("Remote Server Stop ==>RECV Stop event: " + Stop)
}
case Shutdown(waitSecs) => { // 處理Shutdown消息
log.info("Remote Server Shutdown ==>Wait to shutdown: waitSecs=" + waitSecs)
Thread.sleep(waitSecs)
log.info("Remote Server Shutdown ==>Shutdown this system.")
context.system.shutdown // 停止當前ActorSystem系統
}
case Header(id, len, encrypted) => log.info("Remote Server => RECV header: " + (id, len, encrypted)) // 處理Header消息
case _ =>
}
}
package cn.xpleaf.bigdata.p5.myakka.p3
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object AkkaServerApplication extends App {
// 創建名稱為remote-system的ActorSystem:從配置文件application.conf中獲取該Actor的配置內容
val system = ActorSystem("remote-system",
ConfigFactory.load().getConfig("MyRemoteServerSideActor"))
val log = system.log
log.info("===>Remote server actor started: " + system)
// 創建一個名稱為remoteActor的Actor,返回一個ActorRef,這里我們不需要使用這個返回值
system.actorOf(Props[RemoteActor], "remoteActor")
}
package cn.xpleaf.bigdata.p5.myakka.p3
import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, ActorLogging}
import cn.xpleaf.bigdata.p5.myakka.{Header, Start}
class ClientActor extends Actor with ActorLogging {
// akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
val path = "akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor" // 遠程Actor的路徑,通過該路徑能夠獲取到遠程Actor的一個引用
val remoteServerRef = context.actorSelection(path) // 獲取到遠程Actor的一個引用,通過該引用可以向遠程Actor發送消息
@volatile var connected = false
@volatile var stop = false
def receive = {
case Start => { // 發送Start消息表示要與遠程Actor進行后續業務邏輯處理的通信,可以指示遠程Actor初始化一些滿足業務處理的操作或數據
send(Start)
if (!connected) {
connected = true
log.info("ClientActor==> Actor connected: " + this)
}
}
case Stop => {
send(Stop)
stop = true
connected = false
log.info("ClientActor=> Stopped")
}
case header: Header => {
log.info("ClientActor=> Header")
send(header)
}
case (seq, result) => log.info("RESULT: seq=" + seq + ", result=" + result) // 用于接收遠程Actor處理一個Packet消息的結果
case m => log.info("Unknown message: " + m)
}
private def send(cmd: Serializable): Unit = {
log.info("Send command to server: " + cmd)
try {
remoteServerRef ! cmd // 發送一個消息到遠程Actor,消息必須是可序列化的,因為消息對象要經過網絡傳輸
} catch {
case e: Exception => {
connected = false
log.info("Try to connect by sending Start command...")
send(Start)
}
}
}
}
package cn.xpleaf.bigdata.p5.myakka.p3
import akka.actor.{ActorSystem, Props}
import cn.xpleaf.bigdata.p5.myakka.{Header, Start}
import com.typesafe.config.ConfigFactory
object AkkaClientApplication extends App {
// 通過配置文件application.conf配置創建ActorSystem系統
val system = ActorSystem("client-system",
ConfigFactory.load().getConfig("MyRemoteClientSideActor"))
val log = system.log
val clientActor = system.actorOf(Props[ClientActor], "clientActor") // 獲取到ClientActor的一個引用
clientActor ! Start // 發送一個Start消息,第一次與遠程Actor握手(通過本地ClientActor進行轉發)
Thread.sleep(2000)
clientActor ! Header("What's your name: Can you tell me ", 20, encrypted = false) // 發送一個Header消息到遠程Actor(通過本地ClientActor進行轉發)
Thread.sleep(2000)
}
服務端輸出結果如下:
[INFO] [04/24/2018 09:39:49.271] [main] [Remoting] Starting remoting
[INFO] [04/24/2018 09:39:49.508] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://remote-system@127.0.0.1:2552]
[INFO] [04/24/2018 09:39:49.509] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://remote-system@127.0.0.1:2552]
[INFO] [04/24/2018 09:39:49.517] [main] [ActorSystem(remote-system)] ===>Remote server actor started: akka://remote-system
[INFO] [04/24/2018 09:46:01.872] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor] Remote Server Start ==>RECV Start event : cn.xpleaf.bigdata.p5.myakka.Start$@325737b3
[INFO] [04/24/2018 09:46:03.501] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor] Remote Server => RECV header: (What's your name: Can you tell me ,20,false)
客戶端輸出結果如下:
[INFO] [04/24/2018 09:46:01.274] [main] [Remoting] Starting remoting
[INFO] [04/24/2018 09:46:01.479] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://client-system@192.168.43.132:2552]
[INFO] [04/24/2018 09:46:01.480] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://client-system@192.168.43.132:2552]
[INFO] [04/24/2018 09:46:01.493] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] Send command to server: cn.xpleaf.bigdata.p5.myakka.Start$@4f00805d
[INFO] [04/24/2018 09:46:01.496] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] ClientActor==> Actor connected: cn.xpleaf.bigdata.p5.myakka.p3.ClientActor@5a85b576
[INFO] [04/24/2018 09:46:03.490] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] ClientActor=> Header
[INFO] [04/24/2018 09:46:03.491] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] Send command to server: Header(What's your name: Can you tell me ,20,false)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。