akkaflow源码分析报告
1、背景知识
1.1 Actor模型
1.1.1简介
Actors模型(Actor model)首先是由Carl Hewitt在1973定义, 由Erlang OTP (Open Telecom Platform) 推广,其消息传递更加符合面向对象的原始意图。 Actors属于并发组件模型 ,通过组件方式定义并发编程范式的高级阶段,避免使用者直接接触多线程并发或线程池等基础概念。
传统多数流行的语言并发是基于多线程之间的共享内存,使用同步方法防止写争夺,Actors使用消息模型,每个Actors在同一时间处理最多一个消息,可以发送消息给其他Actors,保证了单独写原则(单独写原则 -解道Jdon)。从而巧妙避免了多线程写争夺。
1.1.2原理
在使用Java进行并发编程时需要特别的关注锁和内存原子性等一系列线程问题,而Actor模型内部的状态由它自己维护即它内部数据只能由它自己修改(通过消息传递来进行状态修改),所以使用Actors模型进行并发编程可以很好地避免这些问题,Actor由状态(state)、行为(Behavior)和邮箱(mailBox)三部分组成。
1)状态(state):Actor中的状态指的是Actor对象的变量信息,状态由Actor自己管理,避免了并发环境下的锁和内存原子性等问题
2)行为(Behavior):行为指定的是Actor中计算逻辑,通过Actor接收到消息来改变Actor的状态
3)邮箱(mailBox):邮箱是Actor和Actor之间的通信桥梁,邮箱内部通过FIFO消息队列来存储发送方Actor消息,接受方Actor从邮箱队列中获取消息
1.1.3特点
Actors模型的特点是:
1)隔离计算实体
2)"Share nothing"
3)没有任何地方同步
4)异步消息传递
5)不可变的消息 消息模型类似mailbox / queue
1.1.4好处
1)事件模型驱动–Actor之间的通信是异步的,即使Actor在发送消息后也无需阻塞或者等待就能够处理其他事情
2)强隔离性–Actor中的方法不能由外部直接调用,所有的一切都通过消息传递进行的,从而避免了Actor之间的数据共享,想要观察到另一个Actor的状态变化只能通过消息传递进行询问
3)位置透明–无论Actor地址是在本地还是在远程机上对于代码来说都是一样的。
4)轻量性–Actor是非常轻量的计算单机,单个Actor仅占400多字节,只需少量内存就能达到高并发
1.2 Akka框架
1.2.1简介
AKKA框架是一个平台,灵感来自ERlang,能更轻松地开发可扩展,实现多线程安全应用。虽然在大多数流行的语言并发是基于多线程之间的共享内存,使用同步方法防止写争夺,Akka提供的并发模型基于Actors。
Akka是JAVA虚拟机平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言编写,同时提供了Scala和Java的开发接口。Akka处理并发的方法基于Actor模型,Actor之间通信的唯一机制就是消息传递。
1.2.3体系组成
1)akka-actors
akka的核心,一个用于并发和分发的模型,没有线程原语的所有痛苦
2)akka-stream
一种直观而安全的方式来实现异步、非阻塞的回压流处理。
3)akka-http
现代的、快速的、异步的、流的HTTP服务器和客户端。
4)akka-cluster
通过在多个节点上分布您的系统来获得弹性和弹性。
5)akka-sharding
根据用户的身份,在集群中分配您的参与者。
6)Distributed Data
最终一致,高度读取和写入可用,低延迟数据
7)Akka Persistence
为参与者的事件包允许他们在重新启动后到达相同的状态。
8)Akka Management
在云系统上运行Akka系统的扩展(k8s,aws,…)
9)Alpakka
Akka流连接器用于集成其他技术
1.2.4特点
1)对并发模型进行了更高的抽象
2)是异步、非阻塞、高性能的事件驱动编程模型
3)是轻量级事件处理(1GB内存可容纳百万级别个Actor)
4)它提供了一种称为Actor的并发模型,其粒度比线程更小,你可以在系统中启用大量的Actor。
5)它提供了一套容错机制,允许在Actor出现异常时进行一些恢复或重置操作。
6)Akka既可以在单机上构建高并发程序,也可以在网络中构建分布式程序,并提供位置透明的Actor定位服务。
1.2.5特性
1)易于构建并行和分布式应用 (Simple Concurrency & Distribution)
Akka在设计时采用了异步通讯和分布式架构,并对上层进行抽象,如Actors、Futures ,STM等。
2)可靠性(Resilient by Design)
系统具备自愈能力,在本地/远程都有监护。
3)高性能(High Performance)
在单机中每秒可发送50000000个消息。内存占用小,1GB内存中可保存2500000个actors。
4)弹性,无中心(Elastic — Decentralized)
自适应的负责均衡,路由,分区,配置
5)可扩展(Extensible)
1.3 Akka的actor生命周期
Actor生命周期图
在上图中,Actor系统中的路径代表一个地方,其可能会被活着的Actor占据。最初路径都是空的。在调用actorOf()时,将会为指定的路径分配根据传入Props创建的一个Actor引用。该Actor引用是由路径和一个Uid标识的。重启时只会替换有Props定义的Actor示例,但不会替换引用,因此Uid保持不变。
当Actor停止时,其引用的生命周期结束。在这一时间点上相关的生命周期事件被调用,监视该Actor的Actor都会获得终止通知。当引用停止后,路径可以重复使用,通过actorOf()创建一个Actor。在这种情况下,除了UID不同外,新引用与老引用是相同的。
ActorRef始终表示引用(路径和UID)而不只是一个给定的路径。因此如果Actor停止,并且创建一个新的具有相同名称的Actor,则指向老化身的ActorRef将不会指向新的化身。
相对地,ActorSelection指向路径(或多个路径,如果使用了通配符),且完全不关注有没有引用占据它。因此ActorSelection 不能被监视。获取某路径下的当前化身ActorRef是可能的,只要向该ActorSelection发送Identify,如果收到ActorIdentity回应,则正确的引用就包含其中。也可以使用ActorSelection的resolveOne方法,它会返回一个包含匹配ActorRef的Future。
从上图我们可以发现Actor的生命周期主要包含三个状态:开始、终止和重启。下面分别就 这三个状态进行说明。
1.3.1开始
其实Actor的生命周期是使用Hooks体现和控制的,我们可以重新相关的hooks,从而实现对Actor生命周期各环节的细粒度控制。而当Akka通过Props构建一个Actor后,这个Actor可以立即开始处理消息,进入开始(started)状态。Akka提供了针对开始状态的事件接口(event hooks)preStart方法,因此,我们可以重写该方法进行一些操作,例如:
override def preStart={
log.info ("Starting storage actor...")
initDB
}
1.3.2终止
一个Actor可能因为完成运算、发生异常又或者人为通过发送Kill,PoisonPill强行终止等而进入停止(stopping)状态。而这个终止过程分为两步:
第一步:Actor将挂起对邮箱的处理,并向所有子Actor发送终止命令,然后处理来自子Actor的终止消息直到所有的子Actor都完成终止。
第二步:终止自己,调用postStop方法,清空邮箱,向DeathWatch发布Terminated,通知其监管者。
整个人过程保证Actor系统中的子树以一种有序的方式终止,将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个Actor没有响应(即由于处理消息用了太长时间以至于没有收到终止命令),整个过程将会被阻塞。
因此,我们可以再最后调用postStop方法,来进行一些资源清理等工作,例如:
override def postStop={
log.info ("Stopping storage actor...")
db.release
}
1.3.3重启
重启是Actor生命周期里一个最重要的环节。在一个Actor的生命周期里可能因为多种原因发生重启(Restart)。造成一个Actor需要重启的原因可能有下面几个:
(1)在处理某特定消息时造成了系统性的异常,必须通过重启来清理系统错误
(2)内部状态毁坏,必须通过重启来重新构建状态
(3)在处理消息时无法使用到一些依赖资源,需要重启来重新配置资源
其实,Actor的重启过程也是一个递归的过程,由于其比较复杂,先上个图:
在默认情况下 ,重启过程主要分为以下几步:
(1)该Actor将被挂起
(2)调用旧实例的
supervisionStrategy.handleSupervisorFailing 方法 (缺省实现为挂起所有的子Actor)
(3)调用preRestart方法,preRestart方法将所有的children Stop掉了!(Stop动作,大家注意!),并调用postStop回收资源
(4)调用旧实例的
supervisionStrategy.handleSupervisorRestarted 方法 (缺省实现为向所有剩下的子Actor发送重启请求)
(5)等待所有子Actor终止直到 preRestart 最终结束
(6)再次调用之前提供的actor工厂创建新的actor实例
(7)对新实例调用 postRestart(默认postRestart是调用preStart方法)
(8)恢复运行新的actor
1.4 Akka cluster原理及应用
1.4.1 介绍
Akka集群支持去中心化的基于P2P的集群服务,没有单点故障(SPOF)问题,它主要是通过Gossip协议来实现。对于集群成员的状态,Akka提供了一种故障检测机制,能够自动发现出现故障而离开集群的成员节点,通过事件驱动的方式,将状态传播到整个集群的其它成员节点。
1.4.2 状态转移与故障检测
Akka内部为集群成员定义了一组有限状态(6种状态),并给出了一个状态转移矩阵,代码如下所示:
private[cluster] val allowedTransitions: Map[MemberStatus, Set[MemberStatus]] =
Map(
Joining -> Set(Up, Down, Removed),
Up -> Set(Leaving, Down, Removed),
Leaving -> Set(Exiting, Down, Removed),
Down -> Set(Removed),
Exiting -> Set(Removed, Down),
Removed -> Set.empty[MemberStatus])
}
Akka集群中的每个成员节点,都有可能处于上面的一种状态,在发生某些事件以后,会发生状态转移。需要注意的是,除了Down和Removed状态以外,节点处于其它任何一个状态时都有可能变成Down状态,即节点故障而无法提供服务,而在变成Down状态之前有一个虚拟的Unreachable状态,因为在Gossip收敛过程中,是无法到达或者经由Unreachable状态的节点,这个状态是由Akka实现的故障探测器(Failure Detector)来检测到的。处于Down状态的节点如果想要再次加入Akka集群,需要重新启动,并进入Joining状态,然后才能进行后续状态的转移变化。Akka集群成员节点状态及其转移情况,如下图所示:
我们说明一下Akka中的故障检测机制。在Akka中,集群中每一个成员节点M会被集群中的其他另一组节点(默认是5个)G监控,这一组节点G并不是整个集群中的其他所有节点,只是整个集群全部节点的一个子集,组G中的节点会检测节点M是否处于Unreachable状态,这是通过发送心跳来确认节点M是否可达,如果不可达则组G中的节点会将节点M的Unreachable状态向集群中组G之外的其它节点传播,最终使得集群中的每个成员节点都知道节点M故障。
1.4.3 Akka事件集合
节点状态发生转移会触发某个事件,我们可以根据不同类型的事件来进行相应的处理,为了能够详细捕获到各种事件,我们先看一下Akka定义的事件集合,如图所示:
通常,在基于Akka Cluster的应用中实现Actor时,可以重写Actor的preStart方法,通过Cluster来订阅集群事件,代码示例如下所示:
val cluster = Cluster(context.system)
override def preStart(): Unit = {
cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
classOf[MemberUp], classOf[MemberRemoved], classOf[UnreachableMember])
}
例如,对于MemberUp事件,我们可以获取到对应Actor的引用ActorRef,然后通过与其进行消息交换,一起协同完成特定任务。
1.4.4 Akka成员角色(Node Role)
Akka支持在每个成员节点加入集群的时候,设置成员自己的角色。通过角色划分,可以将使用Akka集群处理业务的系统划分为多个处理逻辑独立的子系统,每个子系统处理自己的业务逻辑,而且,划分得到的多个子系统都处于一个统一的Akka集群中。因此,每个子系统也具备了Akka集群所具有的特性,如故障检测、状态转移、状态传播等等。
2、akkaflow角色及守护actor
角色及守护actor类图
(1)集群角色(ClusterRole)actor包括:HttpServer、Master(包括MasterStandBy)、Worker
(2)守护(Daemon)Actor包括:CronRunner、EmailSender、XmlLoader、HaDataStorager、WorkFlowManager、LogRecorder
2.1 集群及高可用
(1)Master: 活动主节点,调度触发工作流实例,分发子任务
(2)Master-Standby: 热备份主节点,当主节点宕机,立刻切换为活动主节点
(3)Http-Server:http服务节点,接受请求
(4)Worker:任务节点,可部署在多个机器上,运行节点任务
2.1.1 Master
Master类图
Master启动执行时序
2.1.2 Master-Standby
MasterStandby类图
MasterStandby启动执行时序图
2.1.3 Http-Server
HttpServer类图
HttpServer启动时序图
2.1.4 Worker
Worker类图
Worker启动执行时序
2.2 守护Actor
2.2.1 CronRunner
CronRunner类图
2.2.2 EmailSender
EmailSender类图
EmailSender启动时序图
2.2.3 HaDataStorager
HaDataStorager
HaDataStorager启动执行时序图
2.2.4 LogRecorder
LogRecorder类图
LogRecorder启动执行时序图
2.2.5 WorkFlowManager
WorkFlowManager类图
2.2.6 XmlLoader
XmlLoader类图
XmlLoader启动执行时序图
akkaflow工作流及其实例
3.1 行动actor及其节点实例
ActionActor类图
3.2 工作流actor及其实例
WorkflowActor类图
WorkflowActor启动执行实例