640.47K
Категория: ПрограммированиеПрограммирование

Технология программирования. Акторы

1.

Технология
программирования
Акторы

2.

Модель акторов
Использование модели Акторов позволяет решать проблемы параллельных
вычислений
Свойства:
Управляемая событиями модель – акторы выполняют работу в ответ на
сообщения. Взаимодействие между акторами асинхронное, позволяет
акторам посылать сообщения и продолжать свою работу без блокировки для
ожидания ответа.
Принцип строгой изоляции – в отличие от обычных объектов Scala, актор
не имеет публичного API в терминах методов, которые могут быть вызываны.
Публичный API определен через сообщения, которые обрабатываются
акторами. Это препятствует любому разделению состояния между акторами.
Единственный способ наблюдения состояния другого актора – послать
сообщение, запрашивающее его об этом.
Прозрачность расположения – система конструирует акторв при помощи
фабрики и возвращает ссылки на экземпляры. Расположение не имеет
значения, поэтому экземпляр актора может запускаться, останавливаться,
перезапускаться, восстанавливаться из сбойного состояния.
Легковесность – каждый экземпляр использует только несколько сотен
байтов, которые делают возможным существование миллионов
параллельных акторов в одном приложении.

3.

Модель акторов
Актор ничего не делает, пока не получит сообщение. Акторы
взаимодействуют используя асинхронные сообщения. Отправитель кладет
сообщение в почтовый ящик получателя и далее выполняет свою работу.
Почтовый ящик актора является упорядоченной очередью. Очередность
получения многочисленных сообщений от одного актора остается
неизменной, но может перемежаться с сообщениями от других акторов.
Когда он находится в приостановленном состоянии, то не потребляет никаких
ресурсов кроме памяти.

4.

Пример. Множественные акторы

5.

Пример. Множественные акторы
package com.lightbend.akka.sample
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
// Компаньон для приветствующего
object Greeter {
// Типовой метод создания экземпляров актора, возвращает ссылку.
Физически актор может быть в том же процессе,
// а может быть удаленным.
def props(message: String, printerActor: ActorRef): Props = Props(new
Greeter(message, printerActor))
// Классы для принимаемых сообщений:
// Подготовить сообщение с заданной строкой
final case class WhoToGreet(who: String)
// Отправить ранее подготовленное приветствие
case object Greet
}

6.

Пример. Множественные акторы
// Класс акторов для посылки сообщений, начинающихся текстом message актору
printerActor
// Актор имеет состояние.
// Актор выполняется в одном потоке - потокобезопасен.
class Greeter(message: String, printerActor: ActorRef) extends Actor {
var greeting = ""
// Метод получения и обработки сообщений из почтового ящика
def receive = {
case Greeter.WhoToGreet(who) =>
// сформировать и запомнить сообщение со своим полем message и указанием
кому who из полученного сообщения
greeting = s"$message, $who"
case Greeter.Greet =>
// послать ранее сформированное сообщение в виде объекта "понятного"
целевому актору
printerActor ! Printer.Greeting(greeting)
}
}

7.

Пример. Множественные акторы
// Компаньон для Принтера
object Printer {
// Типовой метод создания актора
def props: Props = Props[Printer]
// Класс объектов для входного сообщения для вывода приветствия
final case class Greeting(greeting: String)
}
// Актор для вывода информации. В данном случе в журнал.
class Printer extends Actor with ActorLogging {
// Метод получения и обработки сообщений из почтового ящика
def receive = {
// Актор "понимает" только такие сообщения (см. компаньона):
case Printer.Greeting(greeting) =>
// sender() возвращает отправителя последнего сообщения. Текст - из самого
сообщения
log.info(s"Greeting received (from ${sender()}): $greeting")
}
}

8.

Пример. Множественные акторы
// Главный класс - приложение
object AkkaQuickstart extends App {
// Создать систему акторов 'helloAkka' - контейнер для акторов, управляющий их
жизненным циклом.
val system: ActorSystem = ActorSystem("helloAkka")
// Создать экземпляры акторов
// Создать экземпляр актора Printer.
// Системе передается конфигурационный объект и имя актора (учитывается в
системе и должно быть уникальным).
val printer: ActorRef = system.actorOf(Printer.props, "printerActor")
// Создать несколько акторов класса Greeter с разными началами сообщений
val howdyGreeter: ActorRef =
system.actorOf(Greeter.props("Howdy", printer), "howdyGreeter")
val helloGreeter: ActorRef =
system.actorOf(Greeter.props("Hello", printer), "helloGreeter")
val goodDayGreeter: ActorRef =
system.actorOf(Greeter.props("Good day", printer), "goodDayGreeter ")

9.

Пример. Множественные акторы
// послать сообщения - поместить объекты в "почтовые ящики". ! - это метод ссылки
на актора.
howdyGreeter ! Greeter.WhoToGreet("Akka")
howdyGreeter ! Greeter.Greet
howdyGreeter ! Greeter.WhoToGreet("Lightbend")
howdyGreeter ! Greeter.Greet
helloGreeter ! Greeter.WhoToGreet("Scala")
helloGreeter ! Greeter.Greet
goodDayGreeter ! Greeter.WhoToGreet("Play")
goodDayGreeter ! Greeter.Greet
}[INFO] [12/16/2022 04:11:06.205] [helloAkka-akka.actor.default-dispatcher-4] [akka://helloAkka/user/printerActor] Greeting received (from
Actor[akka://helloAkka/user/helloGreeter#-1014516836]): Hello, Scala
[INFO] [12/16/2022 04:11:06.207] [helloAkka-akka.actor.default-dispatcher-4] [akka://helloAkka/user/printerActor] Greeting received (from
Actor[akka://helloAkka/user/howdyGreeter#540458060]): Howdy, Akka
[INFO] [12/16/2022 04:11:06.207] [helloAkka-akka.actor.default-dispatcher-4] [akka://helloAkka/user/printerActor] Greeting received (from
Actor[akka://helloAkka/user/goodDayGreeter#1195948635]): Good day, Play
[INFO] [12/16/2022 04:11:06.207] [helloAkka-akka.actor.default-dispatcher-4] [akka://helloAkka/user/printerActor] Greeting received (from
Actor[akka://helloAkka/user/howdyGreeter#540458060]): Howdy, Lightbend

10.

Пример. Асинхронный ввод/вывод
import akka.actor.{Actor, ActorSystem, Props}
import scala.io.StdIn.readLine
sealed abstract class Messages
case class Request(val prompt: String) extends Messages
case class Response(val response: String) extends Messages
// Актор ожидающий ввода сивола из консоли
class InputReader extends Actor {
def receive = {
case Request(_) => {
// Блокирование потока ожиданием ввода
val input = readLine()
// Ответить тому, кто прислал запрос
sender() ! Response(input)
}
}
}

11.

Пример. Асинхронный ввод/вывод
// Актор выводит в консоль повторяющуюся строку
class PromptDisplay extends Actor {
def receive = {
case Request(prompt) => {
println(prompt);
Thread.sleep(2000);
// Зацикливание
self ! Request(prompt);
}
}
}

12.

Пример. Асинхронный ввод/вывод
// Актор порождает другие акторы. Он содержит на них ссылки, поэтому знает об их
существовании
class InputRequest extends Actor {
val printer = context.actorOf(Props[PromptDisplay])
val reader = context.actorOf(Props[InputReader])
def receive = {
case Request(prompt) => {
// Посылает запрос, но не ждет синхронного ответа
reader ! Request(prompt)
// Инициирует повторяющийся вывод на экран
printer ! Request(prompt)
}
case Response(resp) => {
// Асинхронно дождались ответа и завершили работу.
println(" Input was : " + resp)
context.system.terminate()
}
}
}

13.

Пример. Асинхронный ввод/вывод
object HelloActor {
def main(args: Array[String]) {
val system = ActorSystem("HelloSystem")
val inputRequest = system.actorOf(Props(new InputRequest), name = "actor1")
inputRequest ! Request("Press Enter to terminate")
}
}

14.

Future
Future (фьючерс) – структура данных для синхронного (блокирующего) и
асинхронного (не блокирующего) извлечения результата параллельной операции.
По сути Future – это анонимная функция, которая должна быть выполнена в
потоке. Исполняющая система имеет пул потоков. По очереди выполняются Future
в освобождаемых потоках. Future назначают обработчик событий (функция), в том
числе обработчик, который выполняется при завершении выполнения основной
функции Future.

15.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
// Частичная сумма разделяется на фрагменты для параллельного вычисления
object IoTest {
def calc(i: Int, nrOfElements: Int): Future[Double] =
// Требует ExecutionContext. Он определен в import
scala.concurrent.ExecutionContext.Implicits.global
Future {
val start = i * nrOfElements
var acc = 0.0
for (i <- start until (start + nrOfElements))
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
println(" branch " + i + " done ")
acc
}
def main(args: Array[String]) {
val n = 10
val elements = 1000
val futures = for (i <- 0 until n) yield calc(i, elements)
val result = Future.foldLeft(futures)(0.0)(_ + _)
result.onSuccess{
case pi => println(pi)
}
}
}
Пример. Вычисление
«Пи»

16.

Пример. Запрос результата у актора
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
case class Compute(number : String)
class FancyActor extends Actor {
def receive = {
case Compute(input) => {
val result = "555" + input // Do some fancy computation
sender ! result
}
}
}

17.

Пример. Запрос результата у актора
object IoTest {
def main(args: Array[String]) {
import scala.concurrent.Future
val system = ActorSystem("System")
val actor = system.actorOf(Props[FancyActor], "FancyActor1")
// needed for ask
implicit val timeout = akka.util.Timeout(1. second)
val data = "222"
// Some input data
val msg = Compute(data)
val future: Future[String] = ask(actor, msg).mapTo[String]
implicit val ec = system.dispatcher
// Блокирование потока в ожидании ответа (так лучше не делать)
val result = Await.result(future, timeout.duration)
println(result)
}
// Возможен такой вариант записи
}
val future = actor ? msg
val result =
Await.result(future, timeout.duration)
.asInstanceOf[String]
English     Русский Правила