#onenote# akka

Tips

Still error:

C:\_nCWD_\DevTool\typesafe-activator-dist-1.3.10\bin\test>activator -Dactivator

proxyDebug=true  -Dactivator.proxyDebug=true ui

 

Lost or failed sbt connection: Failed to start server, error output was: ‘Error: Could not find or load main class # Proxy settings’ directory ‘C:\_nCWD_\Work\Eclipse_Workspace\hello-akka’ command ‘java -jar C:\Users\pdclzj\.sbt\launchers\sbt-launch-57d0f04f4b48b11ef7e764f4cea58dee4e806ffd.jar –locate @file:/C:/Users/pdclzj/AppData/Local/Temp/boot1876359105027878154properties’

 

 

Command to start activator in proxy env.

activator -Dhttp.proxyUser=34069108 -Dhttp.proxyPassword=<pwd> -Dhttp.proxyHost=intpxy1.hk.hsbc -Dhttp.proxyPort=8080 ui

 

Convert  gradle project to maven

 

Add below config to build.gradle

 

apply plugin: ‘maven'”

..

task writeNewPom << {

pom {

project {

inceptionYear ‘2014’

licenses {

license {

name ‘The Apache Software License, Version 2.0’

url ‘http://www.apache.org/licenses/LICENSE-2.0.txt

distribution ‘repo’

}

}

}

}.writeTo(“pom.xml”)

}

 

 

 

Akka + GPU = ?

http://www.cakesolutions.net/blog/teamblogs/2013/02/13/akka-and-cuda

 

 

      • Set window path in command line

set PATH=%PATH%;C:\_nCWD_\DevTool\gradle-2.3-all\gradle-2.3\bin

set PATH=%PATH%;C:\_nCWD_\DevTool\sbt-0.13.6\sbt\bin

 

Override PATH

set PATH=C:\Program Files (x86)\Java\jdk1.6.0_20\bin;C:\_nCWD_\DevTool\Maven\apache-maven-3.2.5\bin

set PATH=C:\_nCWD_\DevTool\jdk1.7.0_17\bin;C:\_nCWD_\DevTool\Maven\apache-maven-3.2.5\bin

 

Override JAVA_HOME

SET PATH=C:\_nCWD_\DevTool\jdk1.8.0_5-x64\bin;C:\_nCWD_\DevTool\gradle-2.3-all\gradle-2.3\bin

SET JAVA_HOME=C:\_nCWD_\DevTool\jdk1.8.0_5-x64

 

 

Akka Jar files:

      • akka-actor – 经典角色、类型角色、IO角色等。
      • akka-agent – 代理、整合了Scala的STM特性
      • akka-camel – 整合Apache的Camel
      • akka-cluster – 集群成员管理、弹性路由
      • akka-kernel – AKKA微内核,运行着一个极简应用服务器
      • akka-osgi – 在OSG容器里使用AKKA的基本bundle,包括akka-actor的类
      • akka-osgi-aries – Aries——服务提供角色系统的蓝图
      • akka-remote – 远程角色
      • akka-slf4j – SLF4J Logger (事件总线监听器)
      • akka-testkit – 测试角色系统的工具包Toolkit for testing Actor systems
      • akka-zeromq – 整合ZeroMQ

 

 

 

STM

Akka的软件事务内存 (STM)

Actors模型是基于独立程序的前提下,可以隔离更新状态,状态更新只能只能通过消息传递实现。Actors内部保持住状态,异步消息传递意味着没有可以提供给组件一致性的调用过程。对于交易系统,如银行账户存款和提款的操作过程是需要是原子的,这可能需要跨两个账户调用过程是一个原子过程,软件事务内存(STM)提供了答案。

STM提供了并发访问共享内存的管理控制机制。 STM利用两个概念 – 乐观和交易来管理共享的并发控制。

(1). 乐观表示我们运行多个原子的平行块,假设将不会有错误。当我们完成后,我们检查是否有任何问题。如果没有发现任何问题,我们在原子块更新状态变量。如果我们发现问题,然后我们回滚,然后重试。乐观并发通常比任何其他的替代方法提供了更好的可伸缩性选项。

 

(2).其次, STM提供模拟数据库事务处理类似的方式。 Java堆在STM的情况下,开始/提交和回滚数据集的事务。作为对象保存在内存中的状态,交易只实现了ACID中几个特点 – 原子性,一致性,隔离。

要管理多个分别运行在不同线程上交易作为一个原子块,使用概念CommitBarrier 。 CommitBarrier是一个跨多个线程之间的交易实现同步synchronization,一旦达到屏障,所有的交易自动提交。它是基于Java的CountDownLatch的。

基于CommitBarrier的事务原理如下:每个Actor将阻塞等待彼此,直到每个Actor都参与的交易全部已完成。这意味着,即使成员可能分散在多个线程,但是也是作为CommitBarrier的成员,是一个单一的原子操作的一部分,如果任何一个堵塞的原子块抛出一个异常,或发生冲突,所有CommitBarrier成员将回滚。

Akka提供一个结构协调actor之间的事务称为coordinated.coordinated,它是用来定义事务边界在交易事务开始时,coordinated.coordinate的()方法被用来增加所有成员将参加同一个事务上下文。

以转账为案例:

为了确保帐户中的资金转移是发生在一个同步的方式下1,我们需要执行以下:

      • 在帐户对象中有一个状态变量需要参与交易的,应该是交易的引用类型。
      • 信用卡和借记卡的帐户对象中操作需要是原子的。
      • 在传输对象,需要定义事务边界的,帐户对象必须参加同一个事务上下文。
      • 此外,我们定义了主管政策在TransferActor和BankActor处理交易异常:

对上面一张图进行如下划分:

相关源码见:Akka框架STM

Agent使用

akka还提供Agent的原子操作使用,Agent STM源码下载

Akka STM 提供两个 Ref和 Agent:. Ref (Transactional References事务引用)提供访问多个实体协同的 同步. Agent提供对单个实体访问非协同的异步访问。Agent提供不协调异步访问,将意味着我们需要等待,直到所有的更新已经完成,Agents提供了非阻塞的读操作。

如果我们需要多个操作读写一个共享变量,使用Agent标识如下:

 

public class Stock {

private String symbol;

private Agent<Float> price;

价格的更改是需要锁的,这里是Agent。

读操作:

 

Float stockTicker = stock.getPrice().get();

写操作:

 

stock.getPrice().send(new Function<Float, Float>() {

public Float apply(Float i) {

return i + 10;

}

});

 

 

STM是一个针对并发问题的非常强大的编程模型,该模型有很多优点:

      • STM可以根据应用程序的行为来充分挖掘出其最大的并发潜力。也就是说,用了STM之后,我们可以无需使用过度保守的、需要预先定义的同步操作,而是让STM动态地管理竞争冲突。
      • STM是一种锁无关的编程模型,该模型可以提供良好的线程安全性和很高的并发性能。
      • STM可以保证实体仅能在事务内被更改。
      • STM没有显式锁意味着我们从此无需担心加锁顺序及其他相关问题。
      • STM可以帮助我们减轻前期设计的决策负担,有了它我们就无需关心谁对什么东西上了锁,而只需放心地把这些工作交给动态隐式组合锁(implicit lock composition)。

该模型适用于对相同数据存在并发读且写冲突不频繁的应用场景。

如果应用程序的数据访问方式符合STM的适用范畴,则STM就为我们提供了一种处理共享可变性的高效解决方案。而如果我们的应用场景里写冲突非常多,我们可能就会更倾向于使用

 

STM and the Java Memory Model

Akka’s Software Transactional Memory (STM) also provides a “happens before” rule:

  • The transactional reference rule: a successful write during commit, on an transactional reference, happens

before every subsequent read of the same transactional reference.

This rule looks a lot like the ‘volatile variable’ rule from the JMM. Currently the Akka STM only supports  deferred writes, so the actual writing to shared memory is deferred until the transaction commits. Writes during

the transaction are placed in a local buffer (the writeset of the transaction) and are not visible to other transactions.  That is why dirty reads are not possible.

How these rules are realized in Akka is an implementation detail and can change over time, and the exact details could even depend on the used configuration. But they will build on the other JMM rules like the monitor lock rule

or the volatile variable rule. This means that you, the Akka user, do not need to worry about adding synchronization to provide such a “happens before” relation, because it is the responsibility of Akka. So you have your hands free to deal with your business logic, and the Akka framework makes sure that those rules are guaranteed on your

behalf.

 

 

Akka使用Multiverse作为其STM的实现并提供了ACI(ACID的子集)特性。

使用Akka引用和事务

Clojure中的ref是在语言层定义的,而 Akka是一个公共类库所以不能依赖任何现有语言的支持。所以Akka在其akka.stm包中提供了一个托管事务引用(managed transactional reference)Ref和一些为原始类型而设的特殊类,如IntRef、LongRef等。Ref(以及所有原始类型的特化引用)代表指向类型T的一个不可变值的托管可变实体(managed mutable identity)。像Integer、Long、Double、String这些类型以及其他不可变类型都符合作为值对象的(value object)条件。如果我们用了自己定义的类,则必须保证这个类是不可变的。也就是说,这个自定义的类只能包含final字段。

我们可以创建一个Ref的实例作为托管事务引用,其值可以在初始化时指定或干脆不指定(默认为null)。如果想获得引用的当前值,可以使用get()函数。如果要使引用指向另一个可变实体,则可以使用swap()函数。这些调用可以在我们提供的事务里执行,但如果我们没提供事务的话,它们也可以在其各自的事务中运行。

当多个线程都试图更改同一个托管引用时,Akka可以保证只有一个变更可以写入内存而其他变更将全部重做。Akka有专门的事务工具负责管理事务跨越内存栅栏的过程。也就是说,Akka(通过Multiverse)保证了在事务中一个托管ref变更的提交会先于后续所有其他事务对该ref的读操作,即该变更对所有其他事务可见。

 

Pasted from <http://ifeve.com/stm-3/>

 

UntypedActor

UntypedActor 应用程序接口

UntypedActor 类仅仅定义一个抽象方法,就是上面提到onReceive(Object message)方法,该方法实现了角色行为。如果当前角色行为不匹配一个接收信息,建议调用unhandled方法,该方法默认将发出一个new akka.actor.UnhandledMessage(message, sender, recipient)在系统角色事件流中(设置配置项akka.actor.debug.unhandled on 让它们转化为实际调试信息)。另外,它提供:

      • getSelf 角色ActorRef的引用
      • getSender 最后接收到信息角色发送者角色引用,典型使用方法将在Reply to messages里面介绍。
      • supervisorStrategy 用户可重写定义的策略,用来监管子角色。这种策略通常是角色中声明,以便在裁决者函数中使用了角色的内部状态:由于一个消息未能传达发送到监管者并处理,像其他的消息(尽管是正常的行为之外),在角色中所有的值和变量都是可用的,就像getSender引用(这将由直接的子类报告故障,如果在一个深层次后代类发生原始故障,则仍立即向上一级报告故障)。
      • getContext 对角色与当前信息暴露上下文信息
      • 创建一个子角色工厂方法actorOf
      • 属于角色的系统
      • 父级监管者
      • 监管的子类
      • 监控生命周期
      • 热插拔行为栈,在热插拔将介绍

Pasted from <http://ifeve.com/akka-doc-java-untyped-actors/#more-15358>

Supervision

The supervisor has a choice of the following four options:

1. Resume the subordinate, keeping its accumulated internal state

2. Restart the subordinate, clearing out its accumulated internal state

3. Stop the subordinate permanently

4. Escalate the failure, thereby failing itself

监测者可以做出如下四种响应:

1.重启子角色,并保持它失败前的内部状态

2.重启子角色,并清空它的内部状态

3.永久性的停止子角色

4.向它的监管者报告错误信息,并终止自己

There are two classes of supervision strategies which come with Akka: OneForOneStrategy and

AllForOneStrategy. Both are configured with a mapping from exception type to supervision directive (see

above) and limits on how often a child is allowed to fail before terminating it. The difference between them is that the former applies the obtained directive only to the failed child, whereas the latter applies it to all siblings as well.

Normally, you should use the OneForOneStrategy, which also is the default if none is specified explicitly.

What is the Difference Between Actor Reference and Path?

An actor reference designates a single actor and the life-cycle of the reference matches that actor’s life-cycle; an actor path represents a name which may or may not be inhabited by an actor and the path itself does not have a

life-cycle, it never becomes invalid. You can create an actor path without creating an actor, but you cannot create

an actor reference without creating corresponding actor.

Akka实现了一种特殊的形式叫“父层监控”。角色只能被其它角色创建——顶级的角色由程序库创建——并且被创建的角色将被它的父节点监控。这个限制隐式的形成了角色监控体系的形式,并且鼓励健全的设计决策。同时,这样也保证了角色不会成为孤儿或者依附于系统外的监控者,否则那样将会很难监控它们。另外,这种方式也使得(子)角色应用的关闭变的非常容易和彻底。

警告:父子角色监管相关的通信是通过一种特殊的系统消息来实现的。并且每个角色都有一个与普通用户消息分开的信箱来存放该系统消息。这也就暗示着,监管事件和普通消息之间并没有一个确定的处理顺序。通常,用户是无法影响普通消息和失败消息的顺序的。

Machine generated alternative text:
2?2 
LE  梩   L
o
/user 
actor support
hierarchy hierarchy

/user: 守护角色

与这个角色打交道最多的就是用户创建角色的父角色。我们将其命名为“/user”。所有以system.actorOf()创建的角色都是它的子角色。这意味着当这个角色终止时,系统中所有的普通角色也都将被关闭。同时,这也意味着这个守护者的监管策略决定了最顶层的普通角色是怎样被监管的。从Akka版本2.1以后,可以通过配置akka.actor.guardian-supervisor-strategy来设置它。它的值必须是一个SupervisorStrategyConfigurator类的全路径。当该守护者向上汇报失败时,根守护者的响应就是终止该守护者,即停止整个角色系统

/system:系统守护者

这个特殊的守护者是为了实现有序的关闭一系列的角色引入的。当所有的普通角色终止时,它会记录那些仍然活着的角色。日志的记录也是通过使用这类角色实现的。它的实现是通过让系统守护者监控用户守护者,并在收到Terminated消息时关闭自己。顶层的系统角色通过这样一种策略监控:当遇到Exceptoin时,它将无限次的重启角色。但ActorInitialzationException和ActorKilledException例外,遇到这两种异常时,它将终止有问题的子角色。对于那些被抛出去的异常将会被升级,从而导致整个角色系统被关闭。

/:根守护者

根守护者是那些所谓的“顶层”角色的祖父。它通过SupervisorStrategy.stoppingStrategy策略来监控所有在角色路径的顶级范围一节中提到的特殊角色。该策略的目的是当遇到任何异常时终止它的子角色。所有被抛出去的异常将会被升级…但是升级给谁呢?因为每个真正的角色都会有一个监控者,而根守护者的监控者不可能是一个真正的角色。这意味着“走到了气泡以外”,所以我们将其称作“气泡-行走者”。这是一个综合的角色引用,它可以在收到第一个错误信号的时候停止它的子角色,并且在根守护者终止时,将系统角色的isTerminated状态置为真(所有的子角色递归的停止)

 一对一策略 VS. 多对一策略

Akka有两种类监管策略:一对一策略和多对一策略。它们都配置了从异常类型到监管命令的一个映射(参考上文),并且限制子角色在终止之前允许失败的次数。它们之间的不同是,前者只可以将指令实施在其失败的子角色上,而后者还可以将指令实施在它的兄弟姐妹之上。通常,你应该使用一对一策略,它也是默认的选项。

多对一策略(AllForOneStrategy)策略适用于子角色之间有相互依赖关系的场景,当一个子角色失败时,会影响到其它子角色的功能。例如,它们难见难分的链接在一起。因为重启并不会清空信箱,所以最好的选择通常是,终止失败的子角色,然后由监管者显式的重新创建它们(通过观察它们的生命周期);否则你必须能够确保每个重启的角色都能正确的处理重启之前入队的消息。

通常,在多对一策略中,停止一个子角色(例如,不响应失败)并不会自动的终止其他子角色;这个可以通过观察它们的生命周期很容易的实现:如果Terminated消息没有被监管者处理,它将会抛出一个DeathPactException(决定于它的监管者)来重启自己,默认的preStart行为将会终止它所有的子角色。当然这些也可以显式的实现。

需要注意的是,对于多对一监管者来说创建一次性的角色将会导致零时角色的失败被扩大化,从而影响永久性角色。如果这个不是你希望的,那么请创建一个中间监管者;这个可以通过创建一个大小为1的路由器来

Pasted from <http://ifeve.com/akka-doc-java-supervision-and-monitoring/>

 

Actors and the Java Memory Model

 

With the Actors implementation in Akka, there are two ways multiple threads can execute actions on shared

memory:

  • if a message is sent to an actor (e.g. by another actor). In most cases messages are immutable, but if

that message is not a properly constructed immutable object, without a “happens before” rule, it would be possible for the receiver to see partially initialized data structures and possibly even values out of thin air

(longs/doubles).

  • if an actor makes changes to its internal state while processing a message, and accesses that state while

processing another message moments later. It is important to realize that with the actor model you don’t get  any guarantee that the same thread will be executing the same actor for different messages.

To prevent visibility and reordering problems on actors, Akka guarantees the following two “happens before” rules:

  • The actor send rule: the send of the message to an actor happens before the receive of that message by the same actor.
  • The actor subsequent processing rule: processing of one message happens before processing of the next message by the same actor.

Note: In layman’s terms this means that changes to internal fields of the actor are visible when the next message  is processed by that actor. So fields in your actor need not be volatile or equivalent.

Both rules only apply for the same actor instance and are not valid if different actors are used.

Message Delivery Reliability

The General Rules These are the rules for message sends (i.e. the tell or ! method, which also underlies the ask pattern):

  • at-most-once delivery, i.e. no guaranteed delivery
  • message ordering per sender–receiver pair

The first rule is typically found also in other actor implementations while the second is  specific to Akka.

 

 

When it comes to describing the semantics of a delivery mechanism, there are three basic categories:

  • at-most-once delivery means that for each message handed to the mechanism, that message is delivered zero or one times; in more casual terms it means that messages may be lost.
  • at-least-once delivery means that for ea chmessage handed to the mechanism potentially multiple attempts

are made at delivering it, such that at least one succeeds; again, in more casual terms this means that  messages may be duplicated but not lost.

  • exactly-once delivery means that for each message handed to the mechanism exactly one delivery is made

to the recipient; the message can neither be lost nor duplicated

 

Dead Letters

Dead Letters

Messages which cannot be delivered (and for which this can be ascertained) will be delivered to a synthetic actor called /deadLetters. This delivery happens on a best-effort basis; it may fail even within the local JVM (e.g. during actor termination). Messages sent via unreliable network transports will be lost without turning up as dead letters.

What Should I Use Dead Letters For?

The main use of this facility is for debugging, especially if an actor send does not arrive consistently (where usually inspecting the dead letters will tell you that the sender or recipient was set wrong somewhere along the way). In order to be useful for this purpose it is good practice to avoid sending to deadLetters where possible,

什么时候角色终止

一旦角色终止,如失败后无法重启,自己停止或者被监督者停止,那么它将释放它占有的所有资源,并将它信箱中没处理的消息发送到“死信件信箱”里,然后“死信件信箱”将会把他们以DeadLetters的形式传递给EventStream. Actor引用中的信箱将会被一个系统信箱所取代,然后将所有消息以DeadLetters的形式转发给EventStream。虽然系统会尽最大努力来实现该消息传递,但是我们不能依赖它来实现“有保证的传递”。

为什么不选择悄悄的将所有消息倾倒出来,是基于我们下面的测试结果考虑:我们在事件总线(BUS)上注册了测试事件监听器(TestEventListener)。这里事件总线即死信件被发送到的地方。该监听器将会对收到的每条死信件记录一条警告日志——这个可以帮助我们更快的检测失败。所以我们有理由相信这个特征还将可以应用于其他目的。

Pasted from <http://ifeve.com/akka-doc-java-what-is-actor/>

Send messages

Send messages

Messages are sent to an Actor through one of the following methods.

• ! means “fire-and-forget”, e.g. send a message asynchronously and return immediately. Also known as tell.

• ? sends a message asynchronously and returns a Future representing a possible reply. Also known as ask.

Typed Actors

Typed Actors

Akka Typed Actors is an implementation of the Active Objects pattern. Essentially turning method invocations into asynchronous dispatch instead of synchronous that has been the default way since Smalltalk came out.

Typed Actors consist of 2 “parts”, a public interface and an implementation, and if you’ve done any work in “enterprise” Java, this will be very familiar to you. As with normal Actors you have an external API (the public interface instance) that will delegate methodcalls asynchronously to a private instance of the implementation.

The advantage of Typed Actors vs. Actors is that with TypedActors you have a static contract, and don’t need to define your own messages, the downside is that it places some limitations on what you can do and what you can’t, i.e. you cannot use become/unbecome.

Typed Actors are implemented using JDK Proxies which provide a pretty easy-worked API to intercept method calls.

 

When to use Typed Actors

Typed actors are nice for bridging between actor systems (the “inside”) and non-actor code (the “outside”), because  they allow you to write normal OO-looking code on the outside

 

3.2 类型化角色

Akka的类型化角色是活动对象(Active Object)模式的实现。Smalltalk诞生的时候,默认的方法调用由异步派发代替同步操作。

类型化角色由2部分组成,包括一个公共的接口和实现,如果你有“企业级”Java的开发经验,这对你来说会非常熟悉。与普通的角色一样,你有一个外部的API(公共接口实例),将异步的方法调用委托给实现类的一个私有实例。

类型化角色对比角色的优势是你可以有一个静态的约定,而不需要去定义你自己的消息,不好的一面就是它会限制你能做什么和不能做什么,比如你不能使用become/unbecome。

类型化角色是利用JDK Proxies 来实现的,它提供一个非常简单的API去调用拦截方法。

注意

正如普通的非类型化角色一样,类型化角色每次处理一次调用。

3.2.1 什么时候使用类型化角色

类型化角色是角色系统和非角色代码之间的美好桥梁,因为它们允许你在外部编写正常的面向对象代码。把它们想象成门:它们实际上是公共部分和私有部分之间的接口,但你并不想你的房子有很多的门,不是吗?你可以通过this blog post查看更详细的讨论。

更多的背景:TypedActor很容易作为RPC被滥用,因此TypedActor并不是我们一开始想象中的那样,能够更加容易的去正确编写高可扩展的并发软件。我们要在合适的地方使用它们。

3.2.2 工具箱

在创建第一个类型化角色之前,我们先了解一下这个工具,掌握它的功能,它位于akka.actor.TypedActor。

01 //返回类型化角色的表达式
02 TypedActorExtension extension = TypedActor.get(system); //系统是一个ActorSystem对象</p>
03 //返回引用是否是一个类型化角色代理
04 TypedActor.get(system).isTypedActor(someReference);

05

06 //返回一个外部类型化角色代理的AKKA角色
07 TypedActor.get(system).getActorRefFor(someReference);

08

09 //返回当前ActorContext
10 //方法仅在一个TypedActor的方法实现中有效
11 ActorContext context = TypedActor.context();

12

13 //返回当前类型化角色的外部代理
14 //方法只在TypedActor的方法实现中有效
15 Squarer sq = TypedActor.<Squarer>self();

16

17 //返回一个类型化角色的上下文实例
18 //这意味着如果你使用它创建了其它的类型化角色实例
19 //它们将是当前这个类型化角色的子角色
20 TypedActor.get(TypedActor.context());

警告

类型化角色和akka角色一样不暴露this引用,这一点很重要。你应该通过外部代理引用,它可以通过TypedActor.self来获得,这是你的对外身份标识,就像akka角色的对外身份标识是ActorRef一样。

3.2.3 创建类型化角色

创建类型化角色需要有一个以上的接口和一个实现接口的类。

假设入口如下所示:

001 import akka.actor.TypedActor;
002 import akka.actor.*;
003 import akka.japi.*;
004 import akka.dispatch.Futures;</p>
005 import scala.concurrent.Await;
006 import scala.concurrent.Future;
007 import scala.concurrent.duration.Duration;
008 import java.util.concurrent.TimeUnit;

009

010 import java.util.List;
011 import java.util.ArrayList;
012 import java.util.Random;
013 import akka.routing.RoundRobinGroup;
014 public class TypedActorDocTest {
015     Object someReference = null;
016     ActorSystem system = null;

017

018     static public interface Squarer {
019         void squareDontCare(int i); //fire-forget(审校者注:这个词怎么翻译?)
020         Future<Integer> square(int i); //非阻塞send-request-reply
021         Option<Integer> squareNowPlease(int i);//阻塞send-request-reply
022         int squareNow(int i); //阻塞send-request-reply
023     }

024

025     static class SquarerImpl implements Squarer {
026         private String name;
027         public SquarerImpl() {
028             this.name = “default”;
029         }

030

031         public SquarerImpl(String name) {
032             this.name = name;
033         }

034

035         public void squareDontCare(int i) {
036             int sq = i * i; //Nobody cares <img src=”http://ifeve.com/wp-includes/images/smilies/frownie.png” alt=”:(” class=”wp-smiley” style=”height: 1em; max-height: 1em;”>
037         }

038

039         public Future<Integer> square(int i) {
040             return Futures.successful(i * i);
041         }

042

043         public Option<Integer> squareNowPlease(int i) {
044             return Option.some(i * i);
045         }

046

047         public int squareNow(int i) {
048             return i * i;
049         }
050     }

051

052     @Test public void mustGetTheTypedActorExtension() {
053         try {
054             //返回类型化角色的的表达式
055             TypedActorExtension extension = TypedActor.get(system); //系统是一个ActorSystem实例

056

057             //返回引用是否是一个类型化角色代理
058             TypedActor.get(system).isTypedActor(someReference);

059

060             //返回类型化角色代理的AKKA角色
061             TypedActor.get(system).getActorRefFor(someReference);

062

063             //返回当前ActorContext
064             // 方法只在TypedActor方法实现内部有效
065             ActorContext context = TypedActor.context();

066

067             //返回当前类型化角色的外部代理
068             // 方法只在TypedActor方法实现内部有效</pre>
069             Squarer sq = TypedActor.<Squarer>self();

070

071             //返回类型化角色的上下文实例
072             //这意味着如果你用它创建了其它类型化角色
073             //它们就是当前类型化角色的子角色
074             TypedActor.get(TypedActor.context());
075         } catch (Exception e) {
076             //dun care
077         }
078      }
079      @Test public void createATypedActor() {
080          try {
081              Squarer mySquarer = TypedActor.get(system).typedActorOf(
082                  new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class));
083              Squarer otherSquarer = TypedActor.get(system).typedActorOf(
084                  new TypedProps<SquarerImpl>(Squarer.classnewCreator<SquarerImpl>() {
085                      public SquarerImpl create() { return newSquarerImpl(“foo”); }
086              }),”name”);

087

088              mySquarer.squareDontCare(10);
089              Future<Integer> fSquare = mySquarer.square(10); //A Future[Int]
090              Option<Integer> oSquare = mySquarer.squareNowPlease(10);//Option[Int]
091              int iSquare = mySquarer.squareNow(10); //Int

092

093              assertEquals(100, Await.result(fSquare, Duration.create(3, TimeUnit.SECONDS)).intValue());
094              assertEquals(100, oSquare.get().intValue());
095              assertEquals(100, iSquare);

096

097              TypedActor.get(system).stop(mySquarer);
098              TypedActor.get(system).poisonPill(otherSquarer);
099         } catch(Exception e) {
100              //忽略
101         }
102     }

103

104     @Test public void createHierarchies() {
105         try {
106             Squarer childSquarer = TypedActor.get(TypedActor.context()).typedActorOf(
107                 new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class)
108             );
109             //Use “childSquarer” as a Squarer
110         } catch (Exception e) {
111             //dun care
112         }
113     }

114

115     @Test public void proxyAnyActorRef() {
116         try {
117             final ActorRef actorRefToRemoteActor = system.deadLetters();
118             Squarer typedActor = TypedActor.get(system).typedActorOf(
119             new TypedProps<Squarer>(Squarer.class),actorRefToRemoteActor);
120             //Use “typedActor” as a FooBar
121         } catch (Exception e) {
122             //dun care
123         }
124     }

125

126     interface HasName {
127         String name();
128     }

129

130    class Named implements HasName {
131         private int id = new Random().nextInt(1024);
132         @Override public String name() { return “name-” + id; }
133     }

134

135     @Test public void typedRouterPattern() {
136        try {
137            // prepare routees
138            TypedActorExtension typed = TypedActor.get(system);
139            Named named1 = typed.typedActorOf(new TypedProps<Named>(Named.class));
140            Named named2 = typed.typedActorOf(new TypedProps<Named>(Named.class));

141

142            List<Named> routees = new ArrayList<Named>();
143            routees.add(named1);
144            routees.add(named2);

145

146            List<String> routeePaths = new ArrayList<String>();
147            routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
148            routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());

149

150            // prepare untyped router
151            ActorRef router = system.actorOf(newRoundRobinGroup(routeePaths).props(), “router”);

152

153           //准备类型化代理,向“router”转发方法调用消息
154           Named typedRouter = typed.typedActorOf(new TypedProps<Named>(Named.class), router);

155

156            System.out.println(“actor was: ” + typedRouter.name()); // name-243
157            System.out.println(“actor was: ” + typedRouter.name()); // name-614
158            System.out.println(“actor was: ” + typedRouter.name()); // name-243
159            System.out.println(“actor was: ” + typedRouter.name()); // name-614

160

161            typed.poisonPill(named1);
162            typed.poisonPill(named2);
163            typed.poisonPill(typedRouter);
164        } catch (Exception e) {
165            //dun care
166        }
167     }
168 }

接口的例子:

1     public interface Squarer {
2         //类型化的角色接口方法 …
3     }

接口的实现类:

01 class SquarerImpl implements Squarer {
02     private String name;
03     public SquarerImpl() {
04         this.name = “default”;
05     }

06

07     public SquarerImpl(String name) {
08         this.name = name;
09     }

10

11     //类型化的角色方法实现 …
12 }

创建Squarer的类型化角色最简单的方式如下:

1 Squarer mySquarer = TypedActor.get(system).typedActorOf(newTypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class));

第一种类型是代理类型,第二种是代理类型的实现。如果你需要去调用一个特殊的构造器,你可以这样做:

1 Squarer otherSquarer = TypedActor.get(system).typedActorOf(newTypedProps<SquarerImpl>(Squarer.class,new Creator<SquarerImpl>() {
2     public SquarerImpl create() { return new SquarerImpl(“foo”); }
3 }),”name”);

既然提供一个Props,你可以指定使用哪一个调度程序,应该使用缺省的timeout或者别的。目前,Squarer没有定义任何方法,我们可以添加如下这些方法。

1 public interface Squarer {
2     void squareDontCare(int i); //fire-forget
3     Future<Integer> square(int i); //non-blocking send-request-reply
4     Option<Integer> squareNowPlease(int i);//blocking send-request-reply
5     int squareNow(int i); //blocking send-request-reply
6 }

那好,现在我们可以调用这些方法了,不过他们需要在SquarerImpl中实现。

01 class SquarerImpl implements Squarer {
02     private String name;
03     public SquarerImpl() {
04         this.name = “default”;
05     }

06

07     public SquarerImpl(String name) {
08         this.name = name;
09     }

10

11     public void squareDontCare(int i) {
12         int sq = i * i; //Nobody cares <img src=”http://ifeve.com/wp-includes/images/smilies/frownie.png” alt=”:(” class=”wp-smiley” style=”height: 1em; max-height: 1em;”>
13     }

14

15     public Future<Integer> square(int i) {
16         return Futures.successful(i * i);
17     }

18

19     public Option<Integer> squareNowPlease(int i) {
20         return Option.some(i * i);
21     }

22

23     public int squareNow(int i) {
24        return i * i;
25     }
26 }

很好,现在我们已经有一个接口和它的实现类,并且知道怎么去创建一个类型化角色了,让我们了解下这些方法。

3.2.4 方法调度语义

方法返回:

  1. void会被fire-and-forget语义调度,和ActorRef.tell完全一样
  2. scala.concurrent.Future<?>会使用send-request-reply语义,和ActorRef.ask完全一样。
  3. akka.japi.Option<?>会使用send-request-reply语义,但会堵塞等待一个应答,并且如果在timeout内没有回复,就会返回akka.japi.Option.None,或者,相反的返回akka.japi.Option.Some<?>。此调用过程中被抛出的任何异常都将被重新抛出。
  4. 任何别的类型值会使用send-request-reply语义,但会阻塞等待一个回答,如果在一个timeout内抛出异常或者在调用过程中出现重新抛异常的情况,就会抛出java.util.concurrent.TimeoutException。注意,基于Java异常和反射机制,一个TimeoutException会包装在一个java.lang.reflect.UndeclaredThrowableException里,除非接口方法明确的描述TimeoutException作为一个受检异常抛出。

3.2.5 消息和不可变性

虽然Akka不能强制转换类型化角色方法的参数为不可变的,但是我们强烈建议把参数设置为不可变的。

3.2.5.1 单向(One-way)消息发送

1 mySquarer.squareDontCare(10);

就像上面这么简单,方法会在另一个线程里异步的执行。

3.2.5.2 双向(Request-reply)消息发送

1 Option<Integer> oSquare = mySquare.squareNowPlease(10);//Option[Int]

如果需要,阻塞的时长可以配置类型化角色的Props的timeout。如果超时,它会返回None。

1 int iSquare = mySquarer.squareNow(10);//Int

如果需要,阻塞的时长可以配置类型化角色的Props的timeout。如果超时,它会抛出一个java.util.concurrent.TimeoutException。这里需要注意一下,通过Java的反射机制,这样一个TimeoutException会被包装在一个java.lang.reflect.UndeclaredThrowableException中,因为接口方法没有明确描述TimeoutException作为一个受检异常抛出。为了直接得到TimeoutException,可以在接口方法中添加throws java.util.concurrent.TimeoutException。

3.2.5.3 (Request-reply-with-future) 消息发送

1 Future<Integer> fSquare = mySquarer.square(10);//一个Future对象[Int]

这个调用是异步的,并且future的返回可以用于异步成分。

 

3.2.6 停止类型化角色

一旦Akka的类型化角色被Akka角色阻塞,当不再需要它们,必须被停掉。

1 TypedActor.get(system).stop(mySquarer);
2 </span></span>

这个异步的方法会尽快的停掉类型化角色关联的代理。

1 TypedActor.get(system).poisonPill(otherSquarer);

这个异步的方法会在所有调用都完成之后停掉类型化角色关联的代理。

 

3.2.7 类型化角色层次结构

既然你可以通过传递一个ActorContext来获得一个上下文的类型化角色,你可以通过在它上面调用typeActorOf来创建子类型化角色。

1 Squarer childSquarer = TypedActor.get(TypedActor.context()).typedActorOf(
2     new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class)
3 );
4 //Use “childSquarer” as a Squarer

你可以通过常规的Akka角色,把UntypedActorContext作为TypedActor.get的输入参数创建一个子类型化角色。

 

3.2.8 监管者策略

通过你的类型化角色实现类实现TypedActor.supervisor。你可以定义策略去监管子角色,就像监管与监控(Supervision and Monitoring) and 容错(Fault Tolerance)中描述的一样。

 

3.2.9 接收任意的消息

如果你的TypedActor的实现类继承akka.actor.TypedActor.Receiver,所有非方法调用的消息都会传递到onReceive方法。

这允许你处理DeathWatch的Terminated消息和别的类型的消息,例如当与非类型化角色进行交互的场景。

 

3.2.10 生命周期回调

通过你的类型化角色实现类实现如下任何一个或所有的方法:

      • TypedActor.PreStart
      • TypedActor.PostStop
      • TypedActor.PreRestart
      • TypedActor.PostRestart

你可以用钩子方法连接到你的类型化角色的生命周期。

 

3.2.11 代理

你可以使用带TypedProps和Actor引用参数的 typedActorOf以代理的方式将ActorRef引用转换成类型化角色。如果你想和其他机器上的TypedActor进行远程交互,只要把ActorRef传给typedActorOf即可。

 

3.2.12 查找和远程处理

既然TypedActor是基于Akka Actor的,你可以用typedActorOf去代理有可能在远程节点上的ActorRef。

1 Squarer typedActor = TypedActor.get(system).typedActorOf(
2     new TypedProps<Squarer>(Squarer.class),actorRefToRemoteActor);
3 //Use “typedActor” as a FooBar

3.2.13 类型化路由模式

有时候你想在多个角色之间传递消息。在Akka中最简单的实现方法是用一个路由(Router),它可以实现一个特定的路由逻辑,例如最小邮箱(smallest-mailbox)或者一致性哈希(consistent-hashing)等等。

路由没有直接提供给类型化角色,但可以通过利用一个非类型化路由和类型化代理来实现它。为了展示这一点让我们创建一些类型化角色并给他们指派随机的id,然后我们会看到路由把消息发送给了不同的角色:

01 @Test public void typedRouterPattern() {
02     try {
03         // prepare routees
04         TypedActorExtension typed = TypedActor.get(system);

05

06         Named named1 = typed.typedActorOf(new TypedProps<Named>(Named.class));
07         Named named2 = typed.typedActorOf(new TypedProps<Named>(Named.class));

08

09         List<Named> routees = new ArrayList<Named>();
10         routees.add(named1);
11         routees.add(named2);

12

13         List<String> routeePaths = new ArrayList<String>();
14         routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
15         routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());

16

17         // 准备类型化路由
18         ActorRef router = system.actorOf(newRoundRobinGroup(routeePaths).props(), “router”);

19

20         // 准备类型化代理,向路由转发MethodCall消息
21         Named typedRouter = typed.typedActorOf(new TypedProps<Named>(Named.class), router);

22

23         System.out.println(“actor was: ” + typedRouter.name()); // name-243
24         System.out.println(“actor was: ” + typedRouter.name()); // name-614
25         System.out.println(“actor was: ” + typedRouter.name()); // name-243
26         System.out.println(“actor was: ” + typedRouter.name()); // name-614

27

28         typed.poisonPill(named1);
29         typed.poisonPill(named2);
30         typed.poisonPill(typedRouter);

31

32     } catch (Exception e) {
33         //dun care
34     }
35 }

为了在这样的角色实例中间轮询,你可以创建一个简单的非类型化路由,并用一个TypedActor实现这个路由的外观模式,就像下面的例子展示的。这样会起作用是因为类型化角色利用和普通角色相同的通讯机制,以及它们的方法调用被转化为MethodCall消息。

01 // prepare routees
02 TypedActorExtension typed = TypedActor.get(system);

03

04 Named named1 = typed.typedActorOf(new TypedProps<Named>(Named.class));

05

06 Named named2 = typed.typedActorOf(new TypedProps<Named>(Named.class));

07

08 List<Named> routees = new ArrayList<Named>();
09 routees.add(named1);
10 routees.add(named2);

11

12 List<String> routeePaths = new ArrayList<String>();
13 routeePaths.add(typed.getActorRefFor(named1).path().toStringWithoutAddress());
14 routeePaths.add(typed.getActorRefFor(named2).path().toStringWithoutAddress());

15

16 // prepare untyped router
17 ActorRef router = system.actorOf(new RoundRobinGroup(routeePaths).props(),”router”);

18

19 // prepare typed proxy, forwarding MethodCall messages to `router`
20 Named typedRouter = typed.typedActorOf(new TypedProps<Named>(Named.class), router);

21

22 System.out.println(“actor was: ” + typedRouter.name()); // name-243
23 System.out.println(“actor was: ” + typedRouter.name()); // name-614
24 System.out.println(“actor was: ” + typedRouter.name()); // name-243
25 System.out.println(“actor was: ” + typedRouter.name()); // name-614
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s