书名:Akka入门与实践
ISBN:978-7-115-45354-9
本书由人民邮电出版社发行数字版。版权所有,侵权必究。
您购买的人民邮电出版社电子书仅供您个人使用,未经授权,不得以任何方式复制和传播本书内容。
我们愿意相信读者具有这样的良知和觉悟,与我们共同保护知识产权。
如果购买者有侵权行为,我们可能对该用户实施包括但不限于关闭该帐号等维权措施,并可能追究法律责任。
• 著 [加] Jason Goodwin
译 诸豪文
责任编辑 王峰松
• 人民邮电出版社出版发行 北京市丰台区成寿寺路11号
邮编 100164 电子邮件 315@ptpress.com.cn
• 读者服务热线:(010)81055410
反盗版热线:(010)81055315
Copyright © Packt Publishing 2015. First published in the English language under the title Learning Akka,ISBN 978-1-78439-300-7. All rights reserved.
本书中文简体字版由Packt Publishing公司授权人民邮电出版社出版。未经出版者书面许可,对本书的任何部分不得以任何方式或任何手段复制和传播。
版权所有,侵权必究。
本书主要面向使用Akka工具集来构建大规模分布式应用程序的Java和Scala开发者,介绍了分布式系统的基本概念以及如何使用Akka来构建容错性高、可横向扩展的分布式应用程序。书中主要内容包括:Akka工具集、Actor模型、响应式编程、Actor及Future的使用、Akka消息传递模式、Actor生命周期、监督机制、状态与错误处理、Akka并发编程、路由、阻塞IO的处理、Akka Cluster、CAP理论、Akka邮箱问题的处理、Akka Testkit、领域驱动设计等。本书贯穿使用了分布式键值存储以及文章解析服务两个实例,将原理与实践结合,介绍了使用Akka设计并实现分布式应用程序的方法。
Jason Goodwin是一个基本上通过自学成才的开发者。他颇具企业家精神,在学校学习商学。不过他从15岁起就开始学习编程,并且一直对技术保持着浓厚的兴趣。这对他的职业生涯产生了重要的影响,从商学转向了软件开发。现在他主要从事大规模分布式系统的开发。在业余时间,他喜欢自己原创电子音乐。
他在mDialog公司第一次接触到Akka项目。mDialog是一家使用Scala/Akka的公司,为主流出版商提供视频广告插入软件。这家公司最终被Google收购。他同时还是一名很有影响力的“技术控”,将Akka引入加拿大一家主要的电信公司,帮助该公司为客户提供容错性更高、响应更及时的软件。除此之外,他还为该公司中的一些团队教授Akka、函数式以及并发编程等知识。
诸豪文,网名clasnake,毕业于清华大学,现为全职软件开发工程师,常用的开发语言有Java、Scala、JavaScript和Python。其个人博客地址为http://clasnake.net。他也是开源项目Swagger的贡献者,并译有《Python网络编程》(第3版)一书。
Taylor Jones是一名全栈软件工程师,精通基于Java的Web应用程序开发,目前在Cisco Systems工作。他很喜欢使用开源技术设计并构建复杂的应用程序。
我在这里想要感谢一些人,你们对我的价值观的形成有着很大的影响,并且给予我不断的支持。
首先,要感谢我的妻子Kate,谢谢你在我撰写本书期间以及做一些疯狂项目的时候对我的支持。如果没有你不断的支持、耐心和照顾,我就不可能改变我的职业生涯去做我喜欢的事情,也不可能写出来这本书。我们终于完成了。现在是时候去粉刷一下墙壁,修缮一下屋子,看着电影休息一下了!
感谢我的祖父母和父母,你们一直告诉我,只要下定了决心,什么事都难不倒我。谢谢你们的建议。你们是对的。
感谢我的mDialog/Google团队,感谢你们的审阅和批评。能有机会和你们共事,我觉得非常幸运。特别要感谢Chris,感谢你相信我的兴趣足以帮助我成长为一个合格的工程师,也感谢你一直都希望团队能够给予我支持。
感谢Graig和Herb,感谢你们对我的启蒙。如果我没有在17岁的时候编写冒泡排序、画像素圆或是移植客户数据库,那么我自己都不确定是不是能像现在一样找到自己这么热爱的工作。
Jason Goodwin
本书将尝试帮助入门级、中级以及高级读者理解基本的分布式计算概念,并且展示如何使用Akka来构建具备高容错性、可以横向扩展的分布式网络应用程序。Akka是一个强大的工具集,提供了很多选项,可以对在本地机器上处理或网络远程机器上处理的某项工作进行抽象封装,使之对开发者不可见。本书将介绍各种概念,帮助读者理解网络上各系统进行交互的困难之处,并介绍如何使用Akka提供的解决方案来解决这些问题。
编写本书的过程也是作者自我学习、自我发现的过程。希望读者也能一起分享这些知识。作者在工作中有很多使用Java 8和Scala来编写Akka应用程序的经验,但是在编写本书的过程中,学到了很多更深入的Akka细节。这本书很好地介绍了为什么要使用Akka,以及如何使用Akka,并且展示了如何使用Akka工具集来开始构建可扩展的分布式应用程序。本书并不仅仅是官方文档的重复,更涉及了许多作为当代程序员要成功构建能够处理扩展性相关问题的系统时应该要理解的重要话题和方法。
第1章 初识Actor:Akka工具集以及Actor模型的介绍。
第2章 Actor与并发:响应式编程。Actor与Future的使用。
第3章 传递消息:消息传递模式。
第4章 Actor的生命周期—处理状态与错误:Actor生命周期、监督机制、Stash/ Unstash、Become/Unbecome以及有限自动机。
第5章 纵向扩展:并发编程、Router Group/Pool、Dispatcher、阻塞I/O的处理以及API。
第6章 横向扩展—集群化:集群、CAP理论以及Akka Cluster。
第7章 处理邮箱问题:加大邮箱负载、不同邮箱的选择、熔断机制。
第8章 测试与设计:行为说明、领域驱动设计以及Akka Testkit。
第9章 尾声:其他Akka特性。下一步需要学习的知识。
读者需要一台能够安装各种工具的计算机,比如Java JDK8(用于Java开发)或是Java JDK6(用于Scala开发)。除此之外,还需要SBT简单构建工具(Simple Build Tool)或是Typesafe Activator(已经包含SBT)。本书会介绍这些工具的安装。
本书面向想要构建满足大规模用户需求的应用程序的初级至中级Java或Scala开发者。如果应用程序在处理日益增加的用户量以及数据量时需要满足高性能要求,那么建议阅读本书。本书可以让我们只编写更少、更简单的代码就轻松构建并扩展网络应用程序,给用户提供更强大的功能。
我们欢迎读者的反馈。对本书的任何看法敬请告知我们。读者反馈对我们至关重要,可以帮助我们出版读者真正能够从中获益的书籍。
如果有普通的反馈,请直接向feedback@packtpub.com发送电子邮件,并且在邮件标题中提及书名。
如果有您精通的话题并且有兴趣撰写书籍或是向某本书做贡献,请从www.packtpub. com/authors查阅作者手册。
您是Packt书籍的拥有者,所以我们提供了很多服务,帮助您从所购买的书中获得最大的收获。
读者可以使用自己的账户从http://www.packtpub.com下载购买过的所有Packt Publishing书籍的示例代码文件。如果从别处购买了本书,那么可以访问http://www. packtpub.com/support并进行注册,我们会通过电子邮件将文件发送给您。
我们还提供了一个PDF文件,其中包含本书中使用的截屏/图表的彩色图片。这些彩色图片可以帮助读者更好地理解输出结果中的不同之处。读者可以从https://www.packtpub. com/sites/default/files/downloads/LearningAkka_ColoredImages.pdf处下载该文件。
Packt出版社为出版的每一本书都提供了PDF和ePub的电子书版本。读者可以从www.packtpub.com获取电子书版本。纸质书的客户在购买电子书时可以享受折扣优惠。请通过customercare@packtpub.com联系我们,获取更多细节。
您还可以在www.packtpub.com阅读更多免费的技术文章,订阅免费新闻,并接收到大量Packt书籍以及电子书的折扣优惠。
如果对本书有任何问题,请通过questions@packtpub.com联系我们,我们将尽最大的努力解决您的问题。
Actor模型是一种并发计算的理论模型,而Akka的核心其实是Actor模型的一种实现。在本章中,我们将通过了解Akka和Actor模型的历史来介绍Akka的核心概念。这会帮助读者更好地理解Akka到底是什么,以及Akka试图要解决什么样的问题。其次,本章中将重复使用同一个例子来阐述本书的目的。
在介绍了上面这些概念后,本章将会把篇幅放在开发环境及工具的配置方法上。我们将配置好机器的环境,集成开发环境(Integrated Development Environment,IDE)并介绍第一个Akka项目(包括该项目的单元测试)。
本节将介绍Akka和Actor模型。Akka一词据说来源于瑞典的一座山,我们说到Akka时,通常是指一个分布式工具集,用于协调远程计算资源来进行一些工作。Akka是Actor并发模型的一种现代化实现。现在的Akka可以认为是从许多其他技术发展演化而来的,它借鉴了Erlang的Actor模型实现,同时又引入了许多新特性,帮助构建能够处理如今大规模问题的应用程序。
为了更好地理解Akka的含义及其使用方法,我们将快速地了解Actor模型的历史,理解Actor模型的含义,以及它是如何一步一步发展到如今的Akka这样一个用于构建高容错性分布式系统的框架。
Actor并发模型最早出现于一篇叫作《A Universal Modular Actor Formalism for Artificial Intelligence》的论文,该论文发表于1973年,提出了一种并发计算的理论模型,Actor就源于该模型。我们将在本节中学习Actor模型的特性,理解它的优点,能够在并发计算中帮助我们解决共享状态带来的常见问题。
首先,让我们来定义什么是Actor。在Actor模型中,Actor是一个并发原语;更简单地说,可以把一个Actor看作是一个工人,就像能够工作或是处理任务的进程和线程一样。把Actor看成是某个机构中拥有特定职位及职责的员工可能会对理解有所帮助。比如说一个寿司餐馆。餐馆的职员需要做各种各样不同的工作,给客人准备餐盘就是其中之一。
在面向对象编程语言中,对象的特点之一就是能够被直接调用:一个对象可以访问或修改另一个对象的属性,也可以直接调用另一个对象的方法。这在只有一个线程进行这些操作时是没有问题的,但是如果多个线程同时读取并修改同一个值,那么可能就需要进行同步并加锁。
Actor和对象的不同之处在于其不能被直接读取、修改或是调用。反之,Actor只能通过消息传递的方式与外界进行通信。简单来说,消息传递指的是一个Actor可以接收消息(在我们的例子中该消息是一个对象),本身可以发送消息,也可以对接收到的消息作出回复。尽管我们可以将这种方式与向某个方法传递参数并接收返回值进行类比,但是消息传递与方法调用在本质上是不同的:消息传递是异步的。无论是处理消息还是回复消息,Actor对外界都没有依赖。
Actor每次只同步处理一个消息。邮箱本质上是等待Actor处理的一个工作队列,如图1-1所示。处理一个消息时,为了能够做出响应,Actor可以修改内部状态,创建更多Actor或是将消息发送给其他Actor。
图1-1
在具体实现中,我们通常使用Actor系统这个术语来表示多个Actor的集合以及所有与该Actor集合相关的东西,包括地址、邮箱以及配置。
下面再重申一下这几个重要的概念:
虽然现在看来可能还不是太明显,但是Actor模型要比命令式的面向对象并发应用程序容易理解多了。我们可以举一个现实世界中的例子来比喻使用Actor模型来建模的过程,这会帮助我们理解它带来的好处。比如有一个寿司餐馆,其中有3个Actor:客人、服务员以及厨师。
首先,客人向服务员点单。服务员将客人点的菜品写在一张纸条上,然后将这张纸条放在厨师的邮箱中(将纸条贴在厨房的窗户上)。当厨师有空闲的时候,就会获取这条消息(客人点的菜品),然后就开始制作寿司(处理消息),直至寿司制作完成。寿司准备好以后,厨师会发送一条消息(盛放寿司的盘子)到服务员的邮箱(厨房的窗户),等待服务员来获取这条消息。此时厨师可以去处理其他客人的订单。
当服务员有空闲时,就可以从厨房的窗户获取食物的消息(盛放寿司的盘子),然后将其送到客人的邮箱(比如餐桌)。当客人准备好的时候,他们就会处理消息(吃寿司),如图1-2所示。
运用餐厅的运作来理解Actor模型是很容易的。随着越来越多的客人来到餐厅,我们可以想象服务员每次接收一位客人的下单,并将订单交给厨房,接着厨师处理订单制作寿司,最后服务员将寿司交给客人。每个任务都可以并发进行。这就是Actor模型提供的最大好处之一:当每个人各司其职时,使用Actor模型分析并发事件非常容易。而使用Actor模型对真实应用程序的建模过程和本例中对寿司餐厅的建模过程并没有太大差异。
Actor模型的另一个好处就是可以消除共享状态。因为一个Actor每次只处理一条消息,所以可以在Actor内部安全地保存状态。如果读者此前没有接触过并发系统,那么可能不是很容易马上理解这一点。不过我们可以用一种简单的方式来进行说明。假设我们尝试执行两个操作,同时读取、修改并保存一个变量,那么如果我们不进行同步操作并加锁的话,其中的一个操作结果将丢失。这是一个非常容易犯的错误。
图1-2
在下面的例子中,有两个线程同时执行一个非原子的自增操作。让我们来看看在线程间共享状态会带来什么结果。会有多个线程从内存中读取一个变量值,将该变量自增,然后将结果写回内存。这就是竞态条件(Race Condition),可以通过保证某一时刻只有一个线程访问内存中的值来解决其带来的一部分问题。下面用一个Scala的例子进行说明。
如果我们尝试使用多个线程并发地对一个整型变量执行100 000次自增操作,那么极有可能会丢失掉其中一些自增操作的结果。
import concurrent.Future
import concurrent.ExecutionContext.Implicits.global
var i, j = 0
(1 to 100000).foreach(_ => Future{i = i + 1})
(1 to 100000).foreach(_ => j = j + 1)
Thread.sleep(1000)
println(s"${i} ${j}")
我们使用x = x + 1这个简单的函数将i和j都自增了100 000次,其中i的自增操作通过多个线程并发执行,而j的自增操作则只通过一个线程来执行。等待1秒钟后,我们再打印运行结果,确保所有更新都已经完成。读者可能会认为运行结果是100000 100000,然而结果却并非如此,如图1-3所示。
共享状态是不安全的。如果两个线程同时读取一个值,将该值自增,然后写回内存,那么由于该值同时被多个线程读取,其中的某些操作结果将会丢失。这就是竞态条件,也是使用共享状态的并发模型存在的最基本的问题之一。
图1-3
通过推导每一步读取和写入操作的结果,我们能够更清晰地展示出竞态条件发生时的具体情况:
[...]
Thread 2 reads value in memory - value read as 9
Thread 2 writes value in memory - value set to 10 (9 + 1)
Thread 1 reads value in memory - value read as 10
Thread 2 reads value in memory - value read as 10
Thread 1 writes value in memory - value set to 11 (10 + 1) !! LOST
INCREMENT !!
Thread 2 writes value in memory - value set to 11 (10 + 1)
Thread 1 reads value in memory - value read as 11
[...]
为了保证内存中的共享状态值不出现错误,我们可以使用锁和同步机制,防止多个线程同时读取并写入同一个值。这就会导致问题变得更为复杂,更难理解,也更难确保结果的正确。
使用共享状态带来的最大威胁就是代码在测试中看上去经常是正确的,但是一旦有多个线程并发运行时,就会时不时地出现一些错误。由于测试时通常都不会出现多线程并发的情况,因此这些Bug很容易被忽略。Dion Almaer曾经在博客中写到过,大多数Java应用程序都存在大量的并发Bug,因此有时能正确运行,有时却运行失败。Actor通过减少共享状态来解决这一问题。如果我们把状态移到Actor内部,那么只有该Actor才能访问其内部的状态(实际上只有一个线程能够访问这些内部状态)。如果把所有消息都看做是不可变的,那么我们实际上可以去除Actor系统中的所有共享状态,构建更安全的应用程序。
本小节中的概念是Actor模型中的核心。第2章Actor与并发和第3章发送消息将会更详细地介绍并发、Actor以及消息传递。
自从在前面提到的论文中第一次出现以来,Actor模型随着时间的推移不断地发展,它对程序语言设计也产生了显著影响(比如Scheme)。
20世纪80年代,爱立信在Erlang语言中实现了Actor模型,用于嵌入式电信应用程序。这一实现绝对值得一提。该实现中引入了通过监督机制(Supervision)提供的容错性概念。爱立信使用Erlang和Actor模型实现了一款日后经常被提及的应用,叫作AXD301。AXD301能够提供99.9999999%的可用性,这一点令人惊叹。相当于在100年中,AXD301只有3.1秒的时间会宕机。AXD的开发团队表示,他们通过消除共享状态(正如我们之前介绍的一样)并引入Erlang中的监督容错机制来达到如此高的可用性。
Actor模型也是通过监督机制来提供容错性的。监督机制基本上是指把处理响应错误的责任交给出错对象以外的实体。这意味着一个Actor可以负责监督它的子Actor,它会监控子Actor的运行错误,并且根据子Actor生命周期中的运行表现执行相应的操作。当一个正在运行的Actor发生错误时,监督机制提供的默认处理方式是重新启动发生错误的Actor(实际上是重新创建)。这种重新创建出错Actor的处理方式基于一种假设:意外发生的错误是由错误状态导致的,因此移除并重新创建应用程序中出错的部分可以将其修复,并恢复正常工作。我们也可以编写自定义的错误处理方式作为监督策略,这样一来基本上就可以采取任何操作将应用程序恢复至工作状态,如图1-4所示。
图1-4
在本书中,我们会将关于分布式系统的容错性作为一个通用的问题,在多个章节中交叉介绍,并且把重点放在Akka和分布式系统的容错性上。第4章“Actor的生命周期——处理状态与错误”将重点介绍这一内容。
如今的业务需求要求工程师能够设计同时响应成千上万个并发用户请求的系统,而用一台机子来运行这样的系统是绝对不够的。除此之外,多核处理器变得越来越流行,因此将任务分布到多个核上以确保有效地利用硬件资源也变得越来越重要。
Akka采用了Actor模型,并且继续对其发展演化,引入了对如今的工程师来说最重要的一个特性:网络环境下的分布式。Akka将其自己视作是一个支持容错性的分布式工具集。也就是说,Akka是一个提供了在多个服务器的物理边界之间工作的工具集。在支持高可用性的同时,几乎可以无限扩展。在最近的几个Akka发布版本中,大多数新增的特性都和解决网络系统的问题有关。最近引入的Akka Cluster(集群)允许将一个Actor系统部署到多台机器上,并且这一点对用户不可见。Akka IO和Akka HTTP也已经进入了核心库,使得系统之间的交互变得更简单。Akka对于Actor模型的重要贡献之一就是位置透明性的概念:就是说一个Actor的邮箱地址实际上可以是一个远程地址,但是这个地址对开发者来说基本上是透明的,所以无论是否是远程地址,编写的代码也基本上是相同的。
Akka沿用了Erlang的一些Actor实现,并且打破了Actor系统的物理边界。Akka添加了远程处理以及位置透明性,使得一个Actor的邮箱可以在远程机器上,而Akka会对网络上的消息传输进行抽象封装。
最近,Akka又引入了集群。读者可能知道一些基于Amazon那篇Dynamo论文的分布式系统,比如Dynamo、Cassandra和Riak。Akka Cluster也采用了类似的现代化方法。有了Cluster以后,一个Actor系统就可以运行在多台机器之上,而不同的节点之间也可以就各自的状态互相通信交互,这就实现了一个可伸缩的Akka集群,并且没有单点故障。这一机制和Dynamo风格的数据库类似,比如Riak和Cassandra。这个特性非常好,使得创建可伸缩并且具备优良容错性的系统变得相当容易。
Typesafe(提供Scala和Akka等技术的公司)[1]不断在通过提供许多网络工具(比如Akka IO和Akka HTTP)来推广分布式计算。除此之外,Typesafe已经参与了Reactive Streams提案,而Akka也实现了第一个版本,支持用于异步处理的非阻塞背压(Back-Pressure)。
在本书中,我们会详细介绍上面诸多内容。第4章Actor的生命周期——处理状态与错误和第5章纵向扩展将更详细地介绍远程处理。第6章横向扩展——集群化将介绍Cluster。第7章处理邮箱问题将介绍Reactive Streams。
在本书中,我们将主要构建两个服务,推荐读者着手操作,一起一步一步地实践。在每章的结尾都有一些课后作业,通过这些练习,读者可以将这一章学到的内容付诸实践。在开始下一章的学习之前,请完成这些课后作业。如果想和别人分享学习的进度,或是希望将这些作业的答案开源,那么可以把它们发布到GitHub上。
在本书中,我们将主要开发两个软件。第一个例子用于展示状态和分布式的处理;第二个例子用于展示如何完成工作。
我们将研究如何构建一个可扩展的分布式内存数据库。在另一个例子中,我们将把数据存储到这个数据库中。说得明确一点,我们将构建一个类似于Redis或Memcached的高可用键值存储。我们构建的数据库将处理其在真实应用场景下所需的所有并发、集群以及分布式问题。为了了解如何提供上面的诸多特性,我们应学习如何在集群中对数据库的数据和负载进行切分及分配,有效地利用硬件资源以及如何进行横向扩展利用多台机器。我们将了解面对现实问题时的设计挑战及常见解决方案。我们还将研究如何构建客户端库,与我们编写的基于Akka的数据库进行交互,并且允许JVM上的任意用户使用。我极力推荐读者自己编写一个这样的数据库,放到GitHub上面去,并且在简历里展示一下。
这看上去相当复杂,但是庆幸的是,有了Akka工具集,要完成上面所有的任务其实相当简单。我们将从零开始,很快地构建出这个完整的系统。
为了在本书中举例展示如何使用Akka完成更多的工作,我们将构建一个用于阅读文章的API,读取博客或是新闻文章,抽取出主要的文本内容,然后将其存储到我们编写的数据库中,作为其他应用的数据源。
下面是一个用例:假设有一个移动设备上的阅读器,通过我们编写的服务从流行的RSS Feed请求读取文章,然后显示主要的文本内容,能够缩放文字适应屏幕显示,提供更好的阅读体验。我们的服务负责从主要的RSS Feed解析出文本内容,使其在移动设备上显示速度更快,用户无需等待。如果想要体验这样的真实移动应用程序,可以看看iOS上的Flipboard,这是我们所编写服务的应用的一个很好的例子,如图1-5所示。
图1-5
我们已经介绍了本书涉及的内容,接下来就开始配置环境并创建一个Actor吧!
在真正深入学习Akka之前,我们将介绍开发环境以及项目基本结构的设置。在本书后面的章节中新建项目时,读者可以回过头来参考本节的内容。
Scala和Java的API基本上是一一对应的,所以读者选择自己熟悉的语言即可。如果既会Scala,又会Java,那么Scala API当然更符合推荐的风格。不过两者都是绝对可用的选择。因为我们可以通过Scala的Actor API使用Scala来访问使用Java构建的Actor,反之亦然,所以也没有必要立刻就做出决定。选择能够让自己更快上手开发的就行。现在我们的重点是学习Akka,而不是学习语言。一旦了解了Akka,要学习另一个语言的API并不需要费很大的功夫。
本书并不支持所有旧版本的Java,只支持Java 8。因此如果读者是一个Java开发者,但是对Java 8的特性并不熟悉,那么应该花点时间了解一下lambda和stream API。下面的教程对此进行了介绍:
http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/Lambda-QuickStart/ index.html。
读者将会发现,本书中大量使用了lambda。因此花时间了解一下绝对是有帮助的。
从Oracle下载并安装Windows JDK8的安装包(exe):http://www.oracle.com/ technetwork/java/javase/downloads/index.html。
按照说明安装。
从Oracle下载并安装OS X JDK8的安装包(dmg):http://www.oracle.com/ technetwork/java/javase/downloads/index.html。
按照说明安装。
有很多方法可以用于在*Nix系统上安装Java。我们可以使用通用安装包,也可以使用包管理器,比如基于Red Hat Enterprise Linux(RHEL)的发行版上的Yum和基于Debian的发行版上的Apt-Get。不同发行版上包管理器的安装方法各不相同,不过如果需要的话,可以在Google搜索引擎上找到相应的安装步骤。
通用安装包适用于所有系统,因此我们在这里介绍这种方法。这也是能够使用的最基本的安装方法。它会安装JDK,并且为当前用户启动JDK,但是不会对系统做出更改。如果想要修改系统的JDK/JRE,可以参照所运行的特定发行版的安装步骤。这一点对于服务器和桌面环境都是适用的。如果使用的是桌面环境,可以参照一下为其他用户设置默认JDK/JRE的步骤。
从Oracle下载Linux的JDK安装包(tar.gz):http://www.oracle.com/ technetwork/java/
javase/downloads/index.html
该文件的名字可能类似jdk-8u31-linux-x64.tar.gz。将tar.gz的文件解压到合适的位置,比如/opt:
sudo cp jdk-8u31-linux-x64.tar.gz /opt cd /opt sudo tar -xvf jdk-
8u31-linux-x64.tar.gz
我们需要将JAVA_HOME设置为Java8的文件夹:
echo 'export JAVA_HOME=/opt/jdk1.8.031' >> ~/.profile
并且确认PATH包含Java的bin目录:
echo 'export PATH=$PATH:/opt/jdk1.8.031' >> ~/.profile
现在,IDE和Sbt/Activator就可以使用安装的JDK来生成并运行我们所构建的应用程序了。
无论运行的是哪个操作系统,我们都需要确认设置了JAVA_HOME,并且PATH包含了Java的bin目录。只有使用通用安装包的时候才需要手动设置JAVA_HOME和PATH,但是无论使用哪种安装方式,都应该在一个新建的终端窗口中确认配置了JAVA_HOME环境变量并向PATH中添加了Java的bin文件夹。
如果使用的是Scala,那么需要在系统上安装Scala及其REPL。在本书撰写时,最新的Scala版本(2.11)会被编译成Java 1.6的字节码,因此无需安装JDK8。不过有人说未来的Scala版本可能会要求JDK8,所以这一点可能会有变化。
Scala并不一定要单独安装。Typesafe Activator包含了Scala以及我们需要的所有工具,接下来我们会进行安装。
Typesafe Activator的安装包内打包了Scala、Akka、Play,简单构建工具(Simple Build Tool,SBT)以及其他一些特性,比如项目结构与模板。
从Typesafe下载Typesafe Activator:http://www.typesafe.com/get-started
运行安装程序,按照屏幕上的步骤安装。
从Typesafe下载Typesafe Activator:
将文件解压到合适的位置,比如/opt:cd /opt sudo unzip typesafe- activator-1.2.12.zip。
赋予Activator可执行权限:sudo chmod 755 /opt/activator-1.2.12/activator。
将Activator添加至PATH:echo 'export PATH=$PATH:/opt/activator- 1.2.12'。
退出并重新登录。确认可以在命令行运行下述命令:
activator --version
这句命令会输出类似如下的结果:sbt launcher version 0.13.5
除了可以使用Linux上的方法安装Activator外,还可以使用brew。本小节介绍使用brew的安装方法。
打开一个终端窗口。
输入下述命令(拷贝自http://brew.sh)。该命令会安装Homebrew OS X包管理器。
ruby -e "$(curl –fsSL https://raw.githubusercontent.com/Homebrew/
install/master/install)"
最后,在终端窗口中输入下述命令并按回车键:
brew install typesafe-activator
确认可以从命令行访问Activator:
activator --version
在本书中,我们将使用Activator快速建立项目的结构。我们可以使用任意模板来生成一个项目。我们只会使用基本的Java和Scala模板。读者可以随意尝试其他选项。Typesafe有许多由用户提交的模板,这些模板展示了如何结合使用各种不同的技术和方法。
在终端窗口的命令行提示符中输入如下命令,创建一个新的Activator模板:
activator new
该命令的输出如下。
从中选择一个模板或者键入模板的名称:
提示
可以敲击Tab键查看所有模板
根据想要使用的语言,选择minimal-scala或者minimal-java。接着会要求输入应用程序的名称,可以叫作akkademy-db。
进入akkademy-db文件夹,运行activator test,确认项目及环境已正确配置。
cd akkademy-db activator test
在输出中可以看到,项目进行了编译,并且运行了测试。如果有任何问题,读者可能需要在继续往下阅读前查询stack-overflow,解决环境配置问题。
如果没有任何问题的话,可以看到下述输出:
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1 [success] Total
time: 3 s, completed 22-Jan-2015 9:44:21 PM
我们已经配置好了环境,并且可以运行。现在可以开始编写代码了。如果读者想使用简单的文本编辑器,那么请跳过本小节。Emacs和Sublime都是非常优秀的文本编辑器,提供语法高亮和支持自动补全的插件。不过如果读者想要使用IDE的话,本小节会介绍Eclipse和Intellij的配置。
如果选择使用IDE的话,推荐Intellij。在撰写本书的过程中,和我共事的许多Java开发者接手开发SBT项目后,他们几乎全部都转而使用Intellij,并且再也不想改回去了。
Intellij现在有内置的SBT支持,因此用于开发Akka项目的时候速度会很快。由于Intellij原生地支持本书中用到的所有技术,因此用户几乎不需要进行任何IDE配置。
创建并运行项目的步骤:
1.下载并安装Intellij CE(免费)。
2.安装完成后,选择打开项目。选择akkademy-db文件夹。
3.如果使用的是Java的话(或者如果Scala 2.12要求Java 1.8),选择Java 1.8;如果使用的是Scala 2.11,那么选择Java 1.6或Java 1.7。启用自动导入功能。单击确定按钮。
如果使用Eclipse的话,那么推荐读者下载Scala-Ide。Scala-Ide中包含了使用Java或Scala创建本书中Sbt/Akka项目所需的所有插件。即使只使用Java,读者也可能会在学习本书的过程中使用一些Scala相关的内容。
Scala-Ide是一个在Eclipse中集成了Sbt和Scala插件的安装包。可以从http://scala- ide.org下载。
解压缩下载的文件。可以把解压缩后的文件夹移动到其他位置,比如Linux下的/opt和OSX下的~/Applications。
运行Eclipse二进制可执行文件。选择一个工作目录,或是设置默认工作目录。
进入Preferences: Java | Compiler,确认选择了正确的Java JDK。
要在Eclipse中打开项目,就必须要先生成一个Eclipse项目。
首先,我们必须要将eclipse sbt插件添加到环境中。打开全局sbt插件文件(如果没有的话新建一个),该文件位于~/.sbt/{version}/plugins/plugins.sbt。其中version就是sbt的版本。在撰写本书时,sbt的版本是0.13,所以该文件为~/.sbt/0.13/ plugins/plugins.sbt。
向该文件中添加如下行,如果文件中包含多行的话,确保行与行之间以空行相隔。
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "3.0.0")
可能需要查看sbteclipse的Git项目,确保该插件仍然可用:https://github. com/typesafehub/
sbteclipse/。
一旦安装完该插件后,就可以生成eclipse项目:https://github.com/ typesafehub/ sbteclipse/。
在终端窗口中,进入之前建立的项目(akkademy-db)。在项目根目录下运行activator eclipsify,生成eclipse项目结构。
运行成功的话,将会看到如下信息:
[info] Successfully created Eclipse project files for project(s): [info]
akkademy-db
在Eclipse中,选择File | Import。
选择General | Existing Projects into Workspace,如图1-6所示。
图1-6
选择文件夹,点击下一步,如图1-7所示。
提示
要注意的是,如果修改了build.sbt,那么需要重新生成项目,并且可能需要重新导入。
图1-7
既然我们已经介绍了如何配置环境和创建项目,现在就可以开始使用Akka来编写一些Actor的代码并进行测试了。我们将使用简单构建工具(Simple Build Tool,SBT)。SBT是Scala项目的首选构建工具,也是Play框架和Activator实际使用的构建工具。SBT并不复杂,而且我们只使用它来管理依赖,构建、测试并运行应用程序,因此它不会成为我们学习Akka的障碍。
现在我们将使用IDE打开Java或Scala应用程序。Activator创建的项目结构并不是用于Akka的,因此我们首先需要添加Akka的依赖。我们将添加Akka的核心Actor模块akka-actor和akka-testkit,akka-testkit提供的工具使得我们对Actor的测试变得更加简单。
在一个Scala项目的build.sbt文件中,我们可以看到类似下面的代码。要注意的是,build.sbt中的依赖实际上是Maven的依赖。我们可以轻松地将任何Maven依赖添加到build.sbt的依赖中。接下来会对此做简要介绍。Java项目和Scala项目的build.sbt大同小异,只不过Java项目中包含了Junit依赖,而Scala的则是Scalatest。
name := """akkademy-db-java"""
version := "1.0"
scalaVersion := "2.11.1"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.3.6",
"com.typesafe.akka" %% "akka-testkit" % "2.3.6" % "test",
"junit" % "junit" % "4.11" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test"
)
要使用Akka,需要添加一条新的依赖。
Java项目的依赖如下:[2]
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.3.6",
"com.typesafe.akka" %% "akka-testkit" % "2.3.6" % "test",
"junit" % "junit" % "4.11" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test"
)
Scala项目的build.sbt如下:
name := """akkademy-db-scala"""
version := "1.0"
scalaVersion := "2.11.1"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.3.3",
"com.typesafe.akka" %% "akka-testkit" % "2.3.6" % "test",
"org.scalatest" %% "scalatest" % "2.1.6" % "test"
)
由于Scala的各个主要版本之间并不互相兼容,因此经常会使用多个Scala的版本来构建并发布库。为了能够在项目中通过SBT试图解析出使用正确Scala版本的依赖项,我们可以对build.sbt中声明的依赖稍作修改,在组ID后面使用两个%符号,而不是在库ID中给出Scala版本。
例如,在一个Scala 2.11的项目中,下面代码中的两个依赖是相同的:
"com.typesafe.akka" % "akka-actor_2.11" % "2.3.3"
"com.typesafe.akka" %% "akka-actor" % "2.3.3"
任何Maven依赖都可以添加到build.sbt中。比如http://www. mvnrepository.com中的所有依赖。在这个链接指向的页面上,每个库都有一个sbt的tab,其中包含了添加SBT依赖需要的代码。
在本小节中,我们将创建一个Actor,该Actor接收一条消息,将消息中的值存入Map以修改Actor的内部状态。这是我们构建分布式数据库的简单的开始。
我们将从构造一个SetRequest消息开始编写我们的内存数据库。该消息用于将一个键(String)值(Object)对存储到内存中。我们可以把它看成是在同一个操作中进行插入和更新,类似于Map的Set操作。
要记住的是,Actor需要从其邮箱中获取消息,并查看消息中的操作指示。我们通过消息的类/类型来决定具体的操作。消息类型的内容具体描述了如何实现API协议。在本例中,我们在消息中使用String作为键,Object作为值。
消息必须永远是不可变的,这样可以确保我们和我们的团队不通过多个执行上下文/线程来做一些不安全的操作,从而避免一些奇怪而又出人意料的行为。同样要记住这些消息除了会发送给本地的Actor以外,也可能会发送给另一台机器上的Actor。如果可能的话,把所有值都定义为val(Scala)或final(Java),并且使用不可变集合及类型,比如Google Guava(Java)和Scala标准库所提供的集合及类型。
下面使用Java将SetRequest消息实现为一个不可变对象。这是在Java中实现不可变对象的一种相当标准的方法。任何熟练的Java开发者应该都对此很熟悉。一般来说,我们应该始终在所有代码中优先使用不可变对象。
package com.akkademy.messages;
public class SetRequest {
private final String key;
private final Object value;
public SetRequest(String key, Object value) {
this.key = key;
this.value = value;
}
public String getKey() {
return key;
}
public Object getValue() {
return value;
}
}
在Scala中,我们有一种更为简洁的方式来定义不可变消息:case class。我们可以通过case class来创建不可变消息,一旦在构造函数中设置属性初值后,之后就只能够读取属性值,不能再进行修改:
package com.akkademy.messages
case class SetRequest(key: String, value: Object)
这样消息就定义完成了。
既然已经定义好了消息,现在我们就可以创建Actor,并描述Actor接收到消息后如何做出响应。作为例子的开始,在本小节中我们将在响应中做两件事:
1.将消息输出到日志。
2.将任何Set消息中的内容保存起来,以供之后使用。
在后面的章节中,我们将在这个例子的基础上不断完善,支持获取存储的消息,并将这个Actor作为一个线程安全的缓存抽象层来使用(最终实现成一个全功能的分布式键值存储)。
首先让我们看一下使用Java 8实现的Actor。
下面的代码展示了使用Java实现的Actor收到消息后的响应[3]
package com.akkademy;
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import com.akkademy.messages.SetRequest;
import java.util.HashMap;
import java.util.Map;
public class AkkademyDb extends AbstractActor {
protected final LoggingAdapter log =
Logging.getLogger(context().system(), this);
protected final Map<String, Object> map = new HashMap<>();
private AkkademyDb() {
receive(ReceiveBuilder
.match(SetRequest.class, message -> {
log.info("Received Set request: {}", message);
map.put(message.getKey(), message.getValue());
})
.matchAny(o -> log.info("received unknown message: {}", o))
.build()
);
}
}
Actor是一个继承了AbstractActor(Java8的Java Akka API)的Java类。我们在这个类里创建了log和map两个变量,并且声明为protected,这样就可以在本章后面的测试用例中访问这两个变量。
我们在构造函数中调用receive。receive方法接受ReceiveBuilder作为参数,我们连续调用ReceiveBuilder的几个方法,生成最终的ReceiveBuilder。这样就描述了Actor接收到不同类型消息时该如何做出响应。在这里我们定义两种响应,并逐一介绍。
首先,我们定义Actor接收到任何SetRequest消息时做出的响应:
match(SetRequest.class, message -> {
log.info("Received Set request: {}", message);
map.put(message.getKey(), message.getValue());
}).
在Java8的API里,ReceiveBuilder的match方法有点儿像case语句,只不过match方法可以匹配类的类型。更正式地来说,这其实就是模式匹配。
在调用的match方法中,我们定义:如果消息的类型是SetRequest.class,那么接受该消息,打印日志,并且将该Set消息的键和值作为一条新纪录插入到map中。
其次,我们捕捉其他所有未知类型的消息,直接输出到日志。
matchAny(o -> log.info("received unknown message"))
由于Scala在语言层面原生支持模式匹配,因此用来实现Actor最合适不过了。现在我们来看一下相应的Scala代码。
package com.akkademy
import akka.actor.Actor
import akka.event.Logging
import scala.collection.mutable.HashMap
import com.akkademy.messages.SetRequest
class AkkademyDb extends Actor {
val map = new HashMap[String, Object]
val log = Logging(context.system, this)
override def receive = {
case SetRequest(key, value) => {
log.info("received SetRequest - key: {} value: {}", key, value)
map.put(key, value)
}
case o => log.info("received unknown message: {}", o);
}
}
在Scala API中,我们混入了Actor Trait,和在Java中一样定义了map和log,然后实现receive方法。Actor Trait中的receive方法返回一个Receive,在Akka的源代码中,将Receive定义为一个PartialFunction,如下所示:
type Receive = scala.PartialFunction[scala.Any, scala.Unit]
我们使用模式匹配生成的PartialFunction来定义接收到SetRequest消息时的响应。通过模式匹配提供的语义,我们可以用更清晰的代码提取出表示键和值的变量:
case SetRequest(key, value)
这个响应行为就是直接将该请求输出到日志,然后在map中插入键值对。
case SetRequest(key, value) => {
log.info("received SetRequest - key: {} value: {}", key, value)
map.put(key, value)
}
最后,我们添加一种情况用于捕捉其他所有未知类型的消息,并且直接将这样的消息输出到日志。
case o => log.info("received unknown message: {}", o);
这样就定义好Actor了。接下来我们需要验证编写的Actor是否正确。
尽管介绍测试框架的书可能会直接打印到控制台,或是创建网页来证明代码可以正确运行,不过我们将使用单元测试来验证代码并阐释Actor的用法。库的代码和服务很多情况下都没有提供易于交互或理解的API,而一般来说,几乎所有项目都通过测试来对这些库和服务进行验证。对于任何严肃的开发者来说,这都是必须具备的重要技能。
Akka提供了一套测试工具集,其中几乎包含了测试Actor代码需要的所有工具。之前配置项目的时候,我们已经导入了这个测试工具集的依赖。在build.sbt中,它的SBT依赖如下所示,读者可作参考:
"com.typesafe.akka" %% "akka-testkit" % "2.3.6" % "test"
我们在这里将会使用testkit中提供的TestActorRef,而不是普通的ActorRef(下一章中将作介绍)。TestActorRef是通用的,有两个功能:首先,它提供的Actor API是同步的,这样我们就不需要在测试中考虑并发的问题;其次,我们可以通过TestActorRef访问其背后的Actor对象。
说得更清楚一点,Akka隐藏了真实的Actor(AkkademyDb),提供了指向Actor的引用,我们可以将消息发送至该引用。这样就对Actor进行了封装,保证了没有任何人可以直接访问真正的对象实例,只能进行消息传输。
接下来让我们来看看源代码,然后逐行解释。
下面是使用Akka testkit[4]的源代码:
package com.akkademy;
import static org.junit.Assert.assertEquals;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestActorRef;
import com.akkademy.messages.SetRequest;
import org.junit.Test;
public class AkkademyDbTest {
ActorSystem system = ActorSystem.create();
@Test
public void itShouldPlaceKeyValueFromSetMessageIntoMap() {
TestActorRef<AkkademyDb> actorRef =
TestActorRef.create(system, Props.create(AkkademyDb.class));
actorRef.tell(
new SetRequest("key", "value"), ActorRef.noSender());
AkkademyDb akkademyDb = actorRef.underlyingActor();
assertEquals(akkademyDb.map.get("key"), "value");
}
}
下面是与Actor进行交互的源代码:
package com.akkademy
import akka.actor.ActorSystem
import akka.testkit.TestActorRef
import akka.util.Timeout
import com.akkademy.messages.SetRequest
import org.scalatest.{FunSpecLike, Matchers}
import scala.concurrent.duration._
class AkkademyDbSpec extends FunSpecLike with Matchers {
implicit val system = ActorSystem()
implicit val timeout = Timeout(5 seconds)
describe("akkademyDb") {
describe("given SetRequest") {
it("should place key/value into map") {
val actorRef = TestActorRef(new AkkademyDb)
actorRef ! SetRequest("key", "value")
val akkademyDb = actorRef.underlyingActor
akkademyDb.map.get("key") should equal(Some("value"))
}
}
}
}
这是我们第一次讲到与Actor的交互,所以有一些之前没出现过的代码。其中一部分是专门用于测试的,另一部分是与Actor的交互相关的。
我们已经介绍过,Actor系统是包含了所有Actor及其地址的一个地方,所以在创建Actor之前,需要做的第一件事就是获取Actor系统的引用。在上面的测试示例里,我们将该引用存在一个变量中:
//Java
ActorSystem system = ActorSystem.create();
//Scala
implicit val system = ActorSystem()
在创建完Actor系统之后,我们就可以在Actor系统中创建Actor了。正如之前提到的那样,我们将使用Akka Testkit来创建一个TestActorRef,提供同步API,并且允许我们访问其指向的Actor。下面,我们就在Actor系统中创建Actor:
//Java
TestActorRef<AkkademyDb> actorRef = TestActorRef.create(system, Props.
create(AkkademyDb.class));
//Scala
val actorRef = TestActorRef(new AkkademyDb)
我们调用Akka Testkit中TestActorRef提供的create方法,传入创建的Actor系统(在Scala中是隐式传入的),以及指向Actor类的引用。在后面的章节中,我们将介绍如何创建Actor。由于真正的Actor实例被隐藏了,因此在我们的Actor系统中创建Actor返回的是一个ActorRef(在本例中是TestActorRef),我们可以将消息发送至该ActorRef。有了Actor系统和Actor的类引用后,Akka就足以在Actor系统的创建这个简单的Actor。因此,现在我们已经成功地创建了第一个Actor了。
与Actor之间的交互是通过消息传递来进行的。我们使用“tell”或是Scala中的“!”(仍然读作“tell”)将消息放入Actor的邮箱中。使用Java时,我们通过tell的第二个参数定义该消息并不需要任何响应对象。而在Scala中,这是在外部隐式定义的。
//Java
actorRef.tell(new SetRequest("key", "value"), ActorRef.noSender());
//Scala
actorRef ! SetRequest("key", "value")
因为我们使用的是TestActorRef,所以只有在tell调用请求处理完成后,才会继续执行后面的代码。对于我们的第一个Actor来说,这并没有问题,但是要注意的是,这个例子并没有展示出Actor API的异步特性,而这并不是常见的用法。通常情况下,tell是一个异步操作,调用后会立即返回。
最后,我们需要检查Actor是否将值存入了map中,确认其行为是否正确。为了进行这项检查,首先得到指向背后Actor实例的引用,然后调用get("key")检查map,确认已经将值存入map中。
//Java
AkkademyDb akkademyDb = actorRef.underlyingActor();
assertEquals(akkademyDb.map.get("key"), "value");
//Scala
val akkademyDb = actorRef.underlyingActor
akkademyDb.map.get("key") should equal(Some("value"))
这样我们就完成了第一个简单的测试用例。这个基本的模式可以用于构建同步的Actor单元测试。在阅读本书的过程中,我们会看到更多的单元测试示例以及对Actor进行异步集成测试的例子。
我们快要完成了!既然已经创建了测试用例,我们就可以打开命令行,运行“activator”,启动activator cli。接着运行“clean”,删除所有无用文件,然后运行“test”,开始执行测试用例。可以通过运行activator clean test在一条命令中执行上述所有操作。
运行Java的JUnit测试用例时,会看到如下输出:
[INFO] [01/12/2015 23:09:24.893] [pool-7-thread-1] [akka://default/
user/$$a]
Received Set request: Set{key='key', value=value}
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 7 s, completed 12-Jan-2015 11:09:25 PM
而如果使用Scala的话,scala-test的输出会更优雅一些:
[info] AkkademyDbSpec:
[info] akkademyDb
[info] - should place key/value from Set message into map
[info] Run completed in 1 second, 990 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
从输出结果中,我们可以看到运行测试用例的总数和失败的测试用例数。如果有错误的话,结果中会给出错误发生的位置,以便我们进行排查。一旦编写了测试用例并指定了其应有的行为,就不用担心对代码的修改或重构会导致错误的行为了。
为了确保读者能够很好地掌握每章的内容,在每章的结尾将会给出一些课后作业。
我们已经正式开始学习如何Akka来编写可扩展的分布式应用程序了。本章简要介绍了Actor模型的历史,以帮助我们了解Akka的含义和由来。接着,讲述了如何创建用于Akka代码的SBT项目。我们配置了开发SBT项目所需的环境,并创建了一个Actor。然后通过一个测试用例对创建的Actor进行了测试。
在接下来的章节中,我们会给示例应用程序添加一个客户端,并将其分布到多个核以及处理器上,这个程序会开始看上去像一个真正的分布式数据库。
[1] 译者注:已改名为Lightbend。
[2] 译者注:此处原书代码有误,已更正。
[3] 译者注:原书代码有误,已更正,请参照GitHub上的代码:https://github.com/ jasongoodwin/ learning-akka/blob/master/ch1/akkademy-db-java/src/main/java/com/akkademy/AkkademyDb.java。
[4] 译者注:原文为toolkit,有误。
本章将介绍进行并发编程及异步编程所需的背景知识。在继续学习后面的章节前,理解本章的内容至关重要。如果读者早已熟稔Scala的Future,Play的Promise或是Java 8的CompletableFuture,那么可以跳过本章。如果曾经使用过Guava或Spring的Listenable Future,可能需要了解本章所介绍API的不同之处。如果从来没有使用过monadic风格的Future,那就需要花点时间学习一下本章了。
本章涉及如下话题:
任何关于Akka的书籍都会有关于“响应式”这个术语的描述。Akka也被称作是一个响应式平台,更具体的说,它是Typesafe响应式平台的一部分。由此而论,这个术语变得越来越流行,尤其要感谢响应式宣言(Reactive Manifesto)。响应式宣言是一份文档,尝试分析Web应用程序要满足如今用户的需要以成功地进行扩展所必需的特性。
从这个角度来看,这个词其实是开发者的一个流行文化。当然,也有一些人在听到这个词的时候可能会对它颇有微词。
本小节将简要介绍响应式宣言中提出的4个准则。这些是我们在开发应用程序的过程中需要努力支持的特性,以提高程序的可扩展性和容错性。在介绍后面的内容时,我们也会回过头来提到这些特性。可以从http://reactivemanifesto.org/访问查看响应式宣言。
响应式宣言中包含了4个准则,或者说是设计目标:灵敏性、伸缩性、容错性以及事件驱动设计。接下来通过应用实例分别对这4个准则进行概述。
应用程序应该尽可能快地对请求做出响应。
如果可以在顺序获取数据和并行获取数据之间进行选择的话,为了尽快向用户返回响应,始终应该优先选择并行获取数据,可以同时请求互相没有关联的数据。当我们需要请求多个互相无关、没有依赖的数据时,应该考虑是否能够同时请求这些数据。
如果可能出现错误,应该立即返回,将问题通知用户,不要让用户等待直到超时。
应用程序应该能够根据不同的工作负载进行伸缩扩展(尤其是通过增加计算资源来进行扩展)。为了提供伸缩性,系统应该努力消除瓶颈。
如果在虚拟机上运行内存数据库,那么添加另一个虚拟节点就可以将所有的查询请求分布到两台虚拟服务器上,将可能的吞吐量增加至原来的两倍。添加额外的节点应该能够几乎线性地提高系统的性能。
增加一个内存数据库的节点后,还可以将数据分为两半,并将其中的一半移至新的节点,这样就能够将内存容量提高至原来的两倍。添加节点应该能够几乎线性地提高内存容量。
应用程序应该考虑到错误发生的情况,并且从容地对错误情况做出响应。如果系统的某个组件发生错误,对与该组件无关的请求不应该产生任何影响。错误是难以避免的,因此应该将错误造成的影响限制在发生错误的组件之内。如果可能的话,通过对重要组件及数据的备份和冗余,这些组件发生错误时不应该对其外部行为有任何影响。
使用消息而不直接进行方法调用提供了一种帮助我们满足另外3个响应式准则的方法。消息驱动的系统着重于控制何时、何地以及如何对请求做出响应,允许做出响应的组件进行路由以及负载均衡。
由于异步的消息驱动系统只在真正需要时才会消耗资源(比如线程),因此它对系统资源的利用更为高效。消息也可以被发送到远程机器(位置透明)。因为要发送的消息暂存在Actor外的队列中,并从该队列中发出,所以就能够通过监督机制使得发生错误的系统进行自我恢复。
4个响应式准则之间并不是完全独立的。为了满足某个准则而采取的方法通常也对满足其他准则有所帮助。例如,如果发现某个服务响应速度较慢,我们可能会在短时间内停止再向该服务发送请求,等待其恢复正常,并立即向用户返回错误信息。这样做降低了响应慢的服务不堪重负直接崩溃的风险,因此也提高了系统的容错性。除此之外,我们立即告知了用户系统发生的问题,也就改善了系统的响应速度,如图2-1所示。
图2-1
在介绍与本书要编写的分布式数据库更相关的例子之前,我们将先介绍Actor最基本的特性,理解Actor的基本结构和方法。为了展示最简单的可能情况,我们将在这个例子中构建一个简单的Actor,这个Actor接收一个字符串“Ping”,返回字符串“Pong”作为响应。
我们将首先介绍Java实现。Java和Scala的API差别很大,因此需要分别介绍。
public class JavaPongActor extends AbstractActor {
public PartialFunction receive() {
return ReceiveBuilder
.matchEquals("Ping", s ->
sender().tell("Pong", ActorRef.noSender()))
.matchAny(x ->
sender().tell(
new Status.Failure(new Exception("unknown message")), self()
))
.build();
}
}
上面的代码展示了Java的Actor API。我们将具体解释这个例子。
match函数从上至下进行模式匹配。所以可以先定义特殊情况,最后定义一般情况。
ReceiveBuilder
.matchEquals("Ping", s -> System.out.println("It's Ping: " + s))
.match(String.class, s -> System.out.println("It's a string: " + s))
.matchAny(x -> System.out.println("It's something else: " + x))
.build
和第1章初识Actor中使用过的方法类似,我们描述了接收到的消息是String时应该做出的响应。由于需要检查接收到的字符串是否为“Ping”,因此需要进行判断,所以这里使用的match方法略有不同。然后描述响应行为:通过tell()方法向sender()返回一条消息。我们返回的消息是字符串“Pong”。Java的tell方法要求提供消息发送者的身份:这里使用ActorRef.noSender()表示没有返回地址。 - 返回akka.actor.Status.Failure:为了向发送方报告错误信息,需要向其发送一条消息。如果Actor中抛出了异常,就会通知对其进行监督的Actor(将在第3章传递消息中进行介绍)。不过无论如何,如果想要报告错误消息,需要将错误发送给发送方。如果发送方使用Future来接收响应,那么返回错误消息会导致Future的结果为失败。我们将之后对此进行简要介绍。
上面几点涉及了Java AbstractActor的基本API。接下来我们将了解一下Scala API。
下面的代码展示了一个Scala Actor:
//Scala
class ScalaPongActor extends Actor {
override def receive: Receive = {
case "Ping" => sender() ! "Pong"
case _ =>
sender() ! Status.Failure(new Exception("unknown message"))
}
}
接下来我们就详细介绍这个Actor。
scala> val pf: PartialFunction[Any, Unit] = {
case _: String => println("Got a String")
}
pf: PartialFunction[Any,Unit] = <function1>
scala> pf("hey")
Got a String
def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit
implicit final val self = context.self
消息发送者是隐式传入的,无需担心,但是在对Java和Scala的API进行比较时,理解隐式的值到底从何而来对我们是很有帮助的。
我们很快就会介绍如何获取另一个Actor发回的响应。在这之前,我们首先要能创建Actor。
访问Actor的方法和访问普通对象的方法有所不同。我们从来都不会得到Actor的实例,从不调用Actor的方法,也不直接改变Actor的状态,反之,只向Actor发送消息。除此之外,我们也不会直接访问Actor的成员,而是通过消息传递来请求获取关于Actor状态的信息。使用消息传递代替直接方法调用可以加强封装性。Alan Kay是最早对面向对象编程进行描述的人,他实际上把消息传递也定义为面向对象编程的一部分。我想Alan Kay看到面向对象后来的发展情况,有时候一定会唏嘘不已。
〖〓em〓〗我发明了“面向对象”这个术语,不过我可以告诉你,这跟C++一点关系都没有。〖〓/em〓〗
——Alan Kay, OOPSLA '97
通过使用基于消息的方法,我们可以相当完整地将Actor的实例封装起来。如果只通过消息进行相互通信的话,那么永远都不会需要获取Actor的实例。我们只需要一种机制来支持向Actor发送消息并接收响应。
在Akka中,这个指向Actor实例的引用叫做ActorRef。ActorRef是一个无类型的引用,将其指向的Actor封装起来,提供了更高层的抽象,并且给用户提供了一种与Actor进行通信的机制。
上文已经介绍过,Actor系统就是包含所有Actor的地方。有一点可能相当明显:我们也正是在Actor系统中创建新的Actor并获取指向Actor的引用。actorOf方法会生成一个新的Actor,并返回指向该Actor的引用。
//Java
ActorRef actor = actorSystem.actorOf(Props.create(JavaPongActor.class));
//Scala
val actor: ActorRef =
actorSystem.actorOf(Props(classOf[ScalaPongActor]))
注意
要注意的是,这里我们实际上并没有新建Actor,例如,我们没有调用actorOf(new PongActor)。
Actor都被封装起来了,不能够被直接访问。我们不能从外部代码中直接访问Actor的状态。创建Actor的模式保证了这一点,现在我们就来一窥究竟。
为了保证能够将Actor的实例封装起来,不让其被外部直接访问,我们将所有构造函数的参数传给一个Props的实例。Props允许我们传入Actor的类型以及一个变长的参数列表。
//Java
Props.create(PongActor.class, arg1, arg2, argn);
//Scala
Props(classOf[PongActor], arg1, arg2, argn)
如果Actor的构造函数有参数,那么推荐的做法是通过一个工厂方法来创建Props。假如我们不希望Pong Actor返回“Pong”,而是希望其返回另一条消息,那么可能就会需要这样的构造参数。我们可以创建一个工厂方法,用于生成这样的Props示例:
//Java
public static Props props(String response) {
return Props.create(this.class, response);
}
//Scala
object ScalaPongActor {
def props(response: String): Props = {
Props(classOf[ScalaPongActor], response)
}
}
然后就可以使用Props的工厂方法来创建Actor:
//Java
ActorRef actor = actorSystem.actorOf(JavaPongActor.props("PongFoo"));
//Scala
val actor: ActorRef = actorSystem.actorOf(ScalaPongActor props
"PongFoo")
虽然创建Props的工厂方法并非必须,但是能够在同一个地方管理对Props对象的创建,因此所有对Actor构造参数的修改都可以与其他代码隔离,防止在代码修改的过程中引起其他模块的错误。
actorOf创建一个Actor,并返回指向该Actor的引用ActorRef。除此之外,还有另一种方法可以获取指向Actor的引用:actorSelection。
为了理解actorSelection,我们需要先来看一下Actor的路径。每个Actor在创建时都会有一个路径,我们可以通过ActorRef.path来查看该路径。该路径看上去如下所示:
该路径是一个URI,它甚至可以指向使用akka.tcp协议的远程Actor。
要注意的是,路径的前缀说明使用的协议是akka.tcp,并且指定了远程Actor系统的主机名和端口号。如果知道Actor的路径,就可以使用actorSelection来获取指向该Actor引用的ActorSelection(无论该Actor在本地还是远程)。
ActorSelection selection = system.actorSelection("akka.tcp://
actorSystem@host.jason-goodwin.com:5678/user/KeanuReeves");
这里的ActorSelection是一个指向Actor的引用,我们可以像使用ActorRef一样,使用ActorSelection进行网络间的通信。由此可以看出,使用Akka在网络上传递消息非常容易,而这也是Akka的位置透明性在实际应用中的一个例子。要对某个应用程序进行修改,使之与远程服务进行通信,几乎只需要修改对Actor位置的配置就足够了。
现在来回顾一下本小节的内容。我们可以创建一个Actor,传入一个构造参数列表构建一个Props实例,并将该Props实例作为参数传给system.actorOf并调用system.actorOf方法,得到指向该Actor的引用。要为Actor指定名字的话,只需要将该名字作为参数传给actorOf方法即可。最后,我们可以使用actorSelection在本地或远程系统上查找已有的Actor。
接下来,我们将开始学习如何编写异步和事件驱动的代码。
在继续学习更复杂的基于Actor的应用程序之前,需要了解一些事件驱动编程模型中的基本抽象概念:Promise和Future。在第一章初识Actor中,我们已经了解过如何向一个Actor发送消息,以及如何在Actor内根据接收到的事件进行不同的响应行为。但是,如果想要通过发送消息向Actor请求获取一些输出结果呢?比如说需要从内存键值存储中获取一条记录。
几乎每个开发者都很熟悉阻塞式的代码。进行IO操作时,编写的都是阻塞式的代码。当我们调用一个同步的API时,调用的方法不会立即返回:应用程序会等待该调用执行完成。例如,如果发起一个HTTP请求的话,只有在请求完成后,才会收到返回的响应对象。由于发起调用的线程会暂停执行并等待,因此等待IO操作完成的代码都是阻塞的,在IO操作完成之前,发起调用的线程无法进行任何其他操作。下面是一个阻塞式代码的例子。例子使用Java数据库连接(Java Database Connectivity,JDBC)发起一个查询:
stmt = conn.createStatement();
String sql = "select name from users where id='123'";
ResultSet rs = stmt.executeQuery(sql);
rs.next();
String name = rs.getString("name");
这里我们使用JDBC从数据库获取一个用户名。代码看上去非常简单,但是有一些不是很显然的运行行为降低了这段简单代码的可读性。
调用executeQuery时,发起调用的线程必需等待数据库查询的完成。在一个Web应用程序中,有许多用户可能会同时发起很多并发请求,线程池中的线程数很可能会达到其支持的最大值。如果这些线程都在等待IO操作完成的话,那么即使还有可用的计算资源,也没有任何线程能够使用这些资源,因此服务器也就无法再进行任何操作了。如果读者曾经对阻塞式的基于Servlet的Web应用程序做过性能调优,那么可能接触过线程池中线程数的最大限制问题。通常情况下,在服务器负载较大时,由于所有线程都只是在等待,服务器无法充分利用CPU资源。
这可能是因为线程池中的线程被耗尽,也可能是因为系统把时间都花在了需要CPU的线程之间的上下文切换上,而没有利用CPU进行实际的工作。同样,由于线程池中的线程数是有限的,因此如果所有线程都在等待的话,服务器在释放其中一个线程资源之前就无法处理接收到的其他请求,导致系统响应延时增加。
因此,读者有可能会问,为什么不直接使用没有线程数量限制的线程池呢(为每一个请求都新建一个线程)?新建线程是有开销的,维护许多运行的线程也是有开销的。在同一个CPU核心中运行多个线程时,操作系统需要不断切换线程上下文,保证所有的线程都能分配到CPU时间。CPU需要获取并存储当前线程的状态,然后载入下一个需要使用CPU的线程的上下文。如果运行了1000个线程的话,可以想象,大把的时间会耗费在上下文切换上。
总结一下,使用多线程来处理阻塞式IO时会遇到一些问题:
非阻塞、异步的消息驱动系统可以只运行少量的线程,并且不阻塞这些线程,只在需要计算资源时才使用它们。这大大提高了系统的响应速度,并且能够更高效地利用系统资源。取决于具体实现的不同,异步系统还可以在返回类型中清晰地定义错误和延时等信息,我们接下来就会看到这一点带来的好处。
其缺点在于我们需要花点时间来理解如何使用消息驱动的编程模型来编写代码。对于这两种模型,我们将各给出一个例子,帮助我们更好地理解这两种设计方法的工作原理。
首先,我们来看一个非常简单的使用阻塞IO调用数据库的例子。
//Java
String username = getUsernameFromDatabase(userId);
System.out.println(username);
//Scala
val username = getUsernameFromDatabase(userId)
println(username)
调用了方法之后,线程会进入到被调用的方法,得到结果后再返回。
如果进行调试的话,可以设置断点在该线程内进入被调用的getUsernameFrom Database方法,逐行查看该方法的具体执行情况。一旦开始执行真正的IO操作,该线程就会暂停,直到IO结果返回为止。然后,线程返回该方法的结果,跳出该方法,继续执行,打印出结果,如图2-2所示。
图2-2
基于事件驱动编写相同功能的代码时,由于需要描述事件完成时要进行的操作,而该操作在一个不同的上下文中执行,因此代码看上去会有所差别。要把上面的例子改为事件驱动,就需要在数据库返回结果之后再在代码中调用打印语句。刚开始可能需要花点时间来适应这个模型,但是一旦适应以后就会觉得很自然了。
要转而使用事件驱动的模型,我们需要在代码中用不同的方法来表示结果。我们需要用一个占位符来表示最终将会返回的结果:Future。然后注册事件完成时应该进行的操作:打印结果。我们注册的代码会在Future占位符的值真正返回可用时被调用执行。“事件驱动”这个术语正是描述了这种方法:在发生某些特定事件时,就执行某些对应的代码。
//Java
CompletableFuture<String> usernameFuture =
getUsernameFromDatabaseAsync(userId);
usernameFuture.thenRun(username ->
//executed somewhere else
System.out.println(username)
);
//Scala
val future = getUsernameFromDatabaseAsync(userId)
future.onComplete(username =>
//executed somewhere else
println(username)
)
从线程的角度来看,代码首先会调用方法,然后进入该方法内部,接着几乎立即返回一个Future/CompletableFuture。返回的这个结果只是一个占位符,真正的值在未来某个时刻最终会返回到这个占位符内。
我们不会过于详细地介绍这个方法调用本身,读者需要理解的是:该方法会立即返回,而数据库调用及结果的生成是在另一个线程上执行的。ExecutionContext表示了执行这些操作的线程,我们将在本书后面的章节中对此进行介绍。(在Akka中,可以看到ActorSystem中有一个dispatcher,就是ExecutionContext的一种实现。)
要注意的是,调试异步代码与调试同步代码有着很大的不同:我们无法在发起调用的线程上看到数据库调用的所有细节,因此无法像调试使用阻塞模型的代码一样,使用调试器在发起调用的线程内单步执行,了解数据库调用的所有细节。同样地,如果查看某个错误信息的栈追踪信息,可能找不到最开始发起调用的代码,看到的是真正执行这段代码的栈信息。
方法返回Future之后,我们只得到了一个承诺,表示真正的值最终会返回到Future中。我们并不希望发起调用的线程等待返回结果,而是希望其在真正的结果返回后再执行特定的操作(打印到控制台)。在一个事件驱动的系统中,需要做的就是描述某个事件发生时需要执行的代码。在Actor中,描述接收到某个消息时进行的操作。同样地,在Future中,我们描述Future的值真正可用时进行的操作。在Java 8中,使用thenRun来注册事件成功完成时需要执行的代码;而在Scala中,使用onComplete,如图2-3所示。
图2-3
有一点必须要再次强调:打印语句并不会运行在进行事件注册的线程上。它会运行在另一个线程上,该线程信息由ExecutionContext维护。Future永远是通过Execution Context来创建的,因此我们可以选择在哪里运行Future中真正需要执行的代码。
在注册的匿名函数中,可以访问到作用域内的所有变量。不过由于方法并不在与闭包相同的同一词法作用域内被调用,因此在调用方法时要格外小心,或者直接不要在闭包内调用方法。我们将会在下一章中介绍这一要点。
要注意的是,Future是有可能执行失败的,因此一定要给Future提供一个超时参数(在Scala API中是必须提供的),这样一来,Future就不可能一直等待结果,不管是执行成功,还是失败,可以保证Future一定会执行完成。接下来我们会更深入地介绍如何来处理Future。
Scala开发者应该对高阶函数和匿名函数的使用较为熟悉。这对于开发者能够迅速地理解本书的内容是很有帮助的。
如果读者正在使用Java 8,那么应该下载本书目前为止的代码示例,检查自己阅读这些示例时是否遇到困难。如果对匿名函数的使用尚且不太清楚,那么读者现在就应该花点时间过一遍Oracle的Java 8匿名函数简介,可以从下面的链接处找到该教程:http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/ Lambda-QuickStart/index.html
读者还应该了解一下Stream API和Optional类型,并做一些实践练习。由于Optional类型的使用在语义上和CompletableFuture类似,因此了解Optional是很有帮助的。
有了上面这些相关经验之后,再理解下面的内容就容易多了。
之前我们看到了一个模拟的异步数据库调用的例子,在这个例子中,数据库调用的结果会保证Future的完成。现在将介绍一个真实的例子,与本章之前创建的示例Actor(PongActor)进行通信。接着,我们将从Actor系统外部与Actor进行通信,这样一来,就能够使用Akka编写应用程序或库的核心部分,然后在普通的Java或是Scala代码中使用。
尽管接下来的测试用例确实引入了异步API,但是测试用例仍然会因为等待结果而阻塞。这对于展示另一种在测试用例中处理Akka的方法很有帮助。因为如果不等待结果的话,测试用例始终会立即返回,永远会通过,所以在测试用例中需要阻塞等待。
读者应该编写好这些测试用例,并且通过这些测试用例来更好地了解并理解下一小节中将介绍的Future API。有了这些例子以后,就算是将来需要更深入地学习Future的工作原理,始终可以回过来好好利用这些测试用例。这些例子也包含在本书的源代码中。
我们将首先介绍使用Java 8的例子。Akka是用Scala编写的。一般来说,Scala和Java的API是一一对应的,不过有一个重要的特例:所有返回Future的异步方法返回的都是Scala的scala.concurrent.Future。
在使用Java的例子中,我们需要某种方法来处理Scala的Future。在本书中,我们将把Scala的Future转换成Java 8的CompletableFuture。
如果在构建Play应用程序的话,Play的Promise API也是个很好的选择。相比于Java 8的CompletableFuture API,笔者个人更喜欢Play的Promise API使用的语义。不过对于还不习惯编写异步代码的人来说,可能Java 8的API可读性更高一些。如果编写的代码会成为某个库的一部分,那么推荐使用Java 8的CompletableFuture,这样在代码中就不会有对Play的外部依赖了。
首先,需要在build.sbt中加入一个Scala团队提供的依赖项,支持Scala和Java 8 Future之间的相互转换。
"org.scala-lang.modules" %% "scala-java8-compat" % "0.6.0"
下面就是完整的测试用例。接下来我们就详细介绍API的各个部分。
package pong;
//[...imports]
import static scala.compat.java8.FutureConverters.*;
public class PongActorTest {
ActorSystem system = ActorSystem.create();
ActorRef actorRef =
system.actorOf(Props.create(JavaPongActor.class));
@Test
public void shouldReplyToPingWithPong() throws Exception {
Future sFuture = ask(actorRef, "Ping", 1000);
final CompletionStage<String> cs = toJava(sFuture);
final CompletableFuture<String> jFuture =
(CompletableFuture<String>) cs;
assert (jFuture.get(1000, TimeUnit.MILLISECONDS)
.equals("Pong"));
}
@Test(expected = ExecutionException.class)
public void shouldReplyToUnknownMessageWithFailure() throws
Exception {
Future sFuture = ask(actorRef, "unknown", 1000);
final CompletionStage<String> cs = toJava(sFuture);
final CompletableFuture<String> jFuture =
(CompletableFuture<String>) cs;
jFuture.get(1000, TimeUnit.MILLISECONDS);
}
}
上面的PongActor测试有两个测试用例,一个针对成功的情况,一个针对失败的情况。
首先创建一个ActorSystem,然后通过actorOf在刚创建的Actor系统中创建一个Actor,前面的章节对此做过介绍:
ActorSystem system = ActorSystem.create();
ActorRef actorRef = system.actorOf(Props.create(JavaPongActor.class));
现在向Actor询问其对于某个消息的响应:
final Future sFuture = ask(actorRef, "Ping", 1000);
这一做法相当直接,我们调用ask方法,传入以下参数:
ask会返回一个Scala Future,作为响应的占位符。在Actor的代码中,Actor会向sender()发送回一条消息,这条消息就是在ask返回的Scala Future中将接收到的响应。
虽然我们无法在Java 8中使用Scala Future,但是可以通过之前导入的库将其转换为CompletableFuture:
final CompletionStage<String> cs = toJava(sFuture);
final CompletableFuture<String> jFuture = (CompletableFuture<String>) cs;
我们首先使用scala.compat.java8.FutureConverters. toJava对Scala Future进行转换,该方法会返回一个CompletionStage。CompletionStage是CompletableFuture实现的接口,而且这是一个只读的接口。为了调用get方法,我们将结果的类型转换为CompletableFuture。在测试用例外部,我们并不需要进行该转换。
要注意的是,我们在Future内存放的数据类型是String,而Actor是无类型的,会返回Object,因此读者可能会觉得这种无限制的类型转换有问题。当然,在ActorSystem外部与Actor进行通信的时候需要在这方面多加小心。不过在这条消息中,我们知道Actor一定会返回一个String,所以认为Future中存放String是安全的。
最后,我们调用get()方法将测试线程阻塞,并得到结果。在查询失败的例子中,get方法会抛出一个从Actor发送出的akka.status.Failure异常。
现在我们就有了一个查询成功的Future和一个查询失败的Future用来做实验了!
接下来,我们介绍使用Scala的例子。由于Akka返回的是Scala的Future,所以使用Scala的测试用例更简单一些。
下面是完整的Scala测试用例。接着我们来详细介绍这个测试:
package pong
//[...imports]
import akka.pattern.ask
import scala.concurrent.duration._
class ScalaAskExamplesTest extends FunSpecLike with Matchers {
val system = ActorSystem()
implicit val timeout = Timeout(5 seconds)
val pongActor = system.actorOf(Props(classOf[ScalaPongActor]))
describe("Pong actor") {
it("should respond with Pong") {
val future = pongActor ? "Ping" //uses the implicit timeout
val result = Await.result(future.mapTo[String], 1 second)
assert(result == "Pong")
}
it("should fail on unknown message") {
val future = pongActor ? "unknown"
intercept[Exception]{
Await.result(future.mapTo[String], 1 second)
}
}
}
}
首先创建一个ActorSystem,然后像前面介绍过的那样,使用actorOf在刚创建的Actor系统中创建一个Actor。
为了创建Future,还要创建一个隐式的Timeout(注意到我们需要引入scala. concurrent.duration,然后把一个时间长度传递给Timeout):
implicit val system = ActorSystem()
implicit val timeout = Timeout(5 seconds)
val pongActor = system.actorOf(Props(classOf[ScalaPongActor]))
现在,我们向Actor请求一条消息的响应:
val future = pongActor ? "Ping"
注意
要完成这一操作,我们需要引入akka.pattern.ask。
请求响应的调用需要引用下面的参数:
该调用会返回一个占位符,也就是一个Future,表示Actor返回的响应。在Actor的代码中,Actor会向sender()返回一个消息,这个消息就是我们会在Future中接收到的响应。
最后,我们想在真正接收到结果之前阻塞测试线程。我们使用Await.result,并传入Future和一个超时参数。
val result = Await.result(future.mapTo[String], 1 second)
Actor的返回值是没有类型的,因此我们接收到的结果是Future[AnyRef]。所以应调用future.mapTo[String]将Future的类型转换成我们需要的结果类型。
有了这个例子作为基础,在继续学习本章余下章节的内容时,就可以进行实验,使用了解Future API了。
我们向Actor请求消息会返回一个Future作为响应,这展示了从Actor系统外部与Actor进行通信的方法。在这个测试用例中,我们休眠/阻塞了调用Await.result的测试线程,这样就能同步地得到Future的结果。
注意
不要在非测试代码中休眠或阻塞线程。
在测试用例中,如果要阻塞Java 8的CompletableFuture,推荐使用的方法是调用Future的get()方法。get()方法会阻塞线程,直到返回结果。
jFuture.get().equals("Pong")
不过如果没有指定超时参数的话,get()方法是有可能使线程永远休眠的。所以在创建Scala的Future时,必须要提供超时参数,保证请求超时时Future返回失败。
可以使用scala.concurrent.Await.result来获取Scala Future中的结果。
import scala.concurrent.duration._
val result: String = Await.result(future.mapTo[String], 1 second)
尽管在ask方法中已经设置了Future的超时时间,但是在这里仍然必须要提供超时参数。
无论是在Java还是Scala的例子中,如果Future返回失败,那么阻塞线程会抛出异常。Future失败会产生一个Throwable,Java 8的CompletableFuture会对这个Throwable进行封装并抛出ExecutionException,而Scala API则会直接抛出这个Throwable。(Scala没有受检异常,所以可以直接抛出Throwable,而Java会在这里抛出一个非受检的异常类型。)
现在我们就有了一个Future的例子,可以基于这个例子来创建测试用例,研究实验结果。由于理解Future API对于编写异步代码至关重要,接下来我们就将对其进行深入介绍。极力推荐读者在这些测试用例中对Future API做一些研究。为了节省一些篇幅,所有的例子都可以在网上的第2章Actor与并发的代码中找到。
现代化的Future隐式地处理了两种情况:失败与延迟。要了解如何把阻塞式IO转化成非阻塞式IO,我们必须学习一些不同的表示失败处理和延迟处理的抽象概念。刚开始可能会显得有点困难,但是一旦真正理解了,大多数开发者就能够习惯这种编程范式了。
像ask模式这样的异步API会返回一个占位符,类似前面提到的Future类型。我们可以了解如何使用不同的方法在测试用例中与PongActor进行交互,以及如何让代码变得越来越简洁。强烈推荐读者在前面创建的测试用例的基础上学习本小节余下的内容。
首先,在Java 8的例子中,为了避免冗余,我们将ask方法简化,并封装到一个方法中。这样看上去就像一个真正的异步API了:
public CompletionStage<String> askPong(String message){
Future sFuture = ask(actorRef, "Ping", 1000);
CompletionStage<String> cs = toJava(sFuture);
return cs;
}
接着我们就可以来创建简单的测试用例:
@Test public void printToConsole() throws Exception {
askPong("Ping").
thenAccept(x -> System.out.println("replied with: " + x));
Thread.sleep(100);
}
首先定义一个简单的方法,去除冗余,稍稍增加示例的可读性:
def askPong(message: String): Future[String] = (pongActor ? message).mapTo[String]
接着使用运行在多线程上的异步操作,因此需要引入隐式的ExecutionContext。
可以创建如下的测试用例并进行实验:
describe("FutureExamples"){
import scala.concurrent.ExecutionContext.Implicits.global
it("should print to console"){
(pongActor ? "Ping").onSuccess({
case x: String => println("replied with: " + x)
})
Thread.sleep(100)
}
}
这个测试并没有进行任何断言,但是已经展示了真正的异步行为。我们可以通过观察运行效果来确认异步操作是否成功(在这个测试用例中,我们希望打印到控制台)。如果希望事件异步发生的话,我们可能时不时地需要将测试线程休眠。和阻塞一样,在测试中休眠线程是没问题的,不过在任何时候都不应该在真正的代码中休眠线程。
尽管这些测试并不进行真实的测试,但是它们对于帮助我们观察异步操作实验的效果是很有用的。在花一些时间理解Future之后,我们将学习如何在异步代码中进行断言。
Future[T]/CompletableFuture<T>成功时会返回一个类型为T的值,失败时则会返回Throwable。我们将分别学习如何处理这两种情况(成功与失败),以及如何将Future的值转换成有用的结果。
在测试中已经看到,PongActor会在接收到“Ping”时返回“Pong”。我们将使用这个例子来展示与Future进行交互的不同方法。
有时候我们需要使用返回结果做一些“简单的事情”。可能是将事件记录到日志,也可能是通过网络返回一个响应。我们可以“注册”事件,一旦Future的结果返回就执行这个事件。
就像上面的例子中那样,在Java 8中,可以使用thenAccept来操作返回结果。
askPong("Ping").thenAccept(x -> System.out.println("replied with: " + x));
而在Scala中,可以使用onSuccess:
(pongActor ? "Ping").onSuccess(){
case x: String => println("replied with: " + x)
})
注意
注意到onSuccess接受一个部分函数作为参数,所以非常适合用来处理Akka的无类型响应(通过模式匹配来判断返回结果的类型)。
最常见的一种用例就是在处理响应之前先异步地对其进行转换。例如,我们可能需要从数据库获取数据,然后将其转换成一个HTTP响应,再返回给客户端。
在大多数API中,类型转换可以通过map来完成,例如Scala的Future。
askPong("Ping").map(x => x.charAt(0))
在Java 8中,我们调用thenApply:
askPong("Ping").thenApply(x -> x.charAt(0))
上面的操作会返回一个新的Future,包含Char类型。我们可以在对返回结果进行转换后将新得到的Future再传递给其他方法,做进一步处理。
如果需要进行异步调用,那么首先要对返回结果进行另一个异步调用,这样代码就会看上去有一点乱:
//Java
CompletionStage<CompletionStage<String>> futureFuture =
askPong("Ping").thenApply(x -> askPong(x));
//Scala
val futureFuture: Future[Future[String]] =
askPong("Ping").map(x => {
askPong(x)
})
很多情况下,需要进行一个异步调用,然后像上面的例子中一样,在得到结果后进行另一个异步调用。不过这样一来,结果就会嵌套在两层Future中了。这种情况是很难处理的,要将结果扁平化,使得结果只在一个Future中,我们需要的是一个Future[String]/CompletionStage[String]。
有很多方法都可以用来做这样的链式异步操作。在Java中使用thenCompose:
CompletionStage<String> cs = askPong("Ping").thenCompose(x ->
askPong("Ping"));
不出意料地,在Scala中使用flatMap:
val f: Future[String] = askPong("Ping").flatMap(x => askPong("Ping"))
一旦对第一个“Ping”做出了响应,就发送第二个“Ping”并返回包含结果值的Future作为响应。
注意到我们可以继续像这样把异步操作连接到一起。这是一种进行流数据处理的很强大的方法。我们可以向一个远程服务发起调用,然后使用得到的结果向另一个服务发起调用。
其中任何一个调用失败都会导致整个Future失败。接下来我们就来看一下失败的情况。
失败情况是有可能发生的,而我们也需要去处理这些失败情况。所有的失败情况最终都会由一个Throwable来表示。和成功的情况类似,有许多方法可以帮助我们来处理失败情况,甚至是从失败中恢复。
很多时候,我们都想要在失败情况下做些什么。最基本的就是在失败情况下向日志中打印一些信息。
在Scala中,有一种很简单的方法支持这种需求:onFailure。这个方法接受一个部分函数作为参数,而这个部分函数接受一个Throwable。
askPong("causeError").onFailure {
case e: Exception => println("Got exception")
}
不幸的是,在Java 8中,没有面向用户的用于失败处理的方法,因此我们在这里引入handle()来处理这种情况:
askPong("cause error").handle((x, t) -> {
if(t != null){
System.out.println("Error: " + t);
}
return null;
});
handle接受一个BiFunction作为参数,该函数会对成功或失败情况进行转换。handle中的函数在成功情况下会提供结果,在失败情况下则会提供Throwable,因此需要检查Throwable是否存在(结果和Throwable中只有一个不是null)。如果Throwable存在,就向日志输出一条语句。由于我们需要在该函数中返回一个值,而失败情况下又不需要对返回值做任何操作,因此直接返回null。
很多时候,在发生错误的时候我们仍然想要使用某个结果值。如果想要从错误中恢复的话,可以对该Future进行转换,使之包含一个成功的结果值。
在Java中,可以使用exceptionally将Throwable转换为一个可用的值。
CompletionStage<String> cs = askPong("cause error")
.exceptionally(t -> {
return "default";
});
在Scala中,有一个recover方法提供相同的功能。同样地,recover方法也接受一个PartialFunction作为参数,所以我们可以对异常的类型进行模式匹配:
val f = askPong("causeError").recover {
case t: Exception => "default"
}
我们经常需要在发生错误时使用另一个异步方法来恢复,下面是两个用例。
下面展示了一个重试操作:
askPong("cause error")
.handle( (pong, ex) -> ex == null
? CompletableFuture.completedFuture(pong)
: askPong("Ping")
).thenCompose(x -> x);
我们需要分两步来完成这一操作。首先,检查exception是否为null。如果为null,就返回包含结果的Future,否则返回重试的Future。接着,调用thenCompose将CompletionStage[CompletionStage[String]]扁平化。
在Scala中,我们要调用的函数是recoverWith:类似专门用于错误情况的flatMap,所以要比Java的处理方法可读性更高,也更简洁得多。
askPong("causeError").recoverWith({
case t: Exception => askPong("Ping")
})
很多时候,我们需要执行多个操作,而且可能想要在代码库的不同位置来执行这些操作。之前介绍到的每个方法调用都会返回一个新的Future,而我们又可以对这个新的Future执行其他操作。
我们已经介绍了Future的基本使用方法。应用函数式风格来处理延迟和失败的好处之一就是可以把多个操作组合起来,而在组合的过程中无需处理异常。我们可以把注意力放在成功的情况上,在链式操作的结尾再收集错误。
之前介绍的每个用于结果转换的方法都会返回一个新的Future,可以处理这个Future,也可以将其与更多操作链接到一起。
总结一下,执行多个操作时,我们最后使用一个恢复函数来处理所有可能发生的错误。可以用我们想要的任何顺序来组合这些函数(combinators)来完成我们需要完成的工作。
在Java中:
askPong("Ping")
.thenCompose(x -> askPong("Ping" + x))
.handle((x, t) -> {
if(t != null) {
return "default";
} else {
return x;
}
});
在Scala中:
val f = askPong("Ping")
.flatMap(x => askPong("Ping" + x))
.recover({ case Exception => "There was an error" })
在上面的例子中,我们得到了一个Future,然后调用thenCompose/flatMap,在第一个操作完成时异步地发起另一个调用。接着,在发生错误时,我们使用一个String值来恢复错误,保证Future能够返回成功。
在执行操作链中的任一操作时发生的错误都可以作为链的末端发生的错误来处理。这样就形成了一个很有效的操作管道,无论是哪个操作导致了错误,都可以在最后来处理异常。我们可以集中注意力描述成功的情况,无需在链的中间做额外的错误检查。可以在最后单独处理错误。
我们经常需要访问执行的多个Future。同样有很多方法可以用来处理这些情况。在Java中,可以使用CompletableFuture的thenCompose方法,在Future的值可用时访问到这些值:
askPong("Ping")
.thenCombine(askPong("Ping"), (a,b) -> {
return a + b; //"PongPong"
});
在Scala中,也可以使用for推导式将多个Future组合起来。我们能够像处理任何其他集合一样,解析出两个Future的结果并对它们进行处理。(要注意的是,这只不过是flatMap的一个“语法糖”:相比于flatMap,我更喜欢这个语法。)
val f1 = Future {4}
val f2 = Future {5}
val futureAddition: Future[Int] =
for (
res1 <- f1;
res2 <- f2
) yield res1 + res2
这个例子展示了一种处理多个不同类型Future的机制。通过这种方法,可以并行地执行任务,同时处理多个请求,更快地将响应返回给用户。这种对并行的使用可以帮助我们提高系统的响应速度。
如果想要对集合中的每个元素执行异步方法,那么可以使用Future列表。
例如,在Scala中,如果我们有一个消息列表,对于列表中的每个消息,向PongActor发送查询,最后会得到如下的一个Future列表:
val listOfFutures: List[Future[String]] = List("Pong", "Pong",
"failed").map(x => askPong(x))
对Future列表的处理并不容易。我们希望得到的是一个结果列表,也就是要反转一下类型,把List[Future]转换成Future[List]。Future的sequence方法就是用来完成这一工作的:
val futureOfList: Future[List[String]] = Future.sequence(listOfFutures)
现在我们就有了一个可以使用的类型。例如,如果在futureOfList上调用map方法的话,就可以得到一个List[String],这就是我们想要的结果类型。不过这里有一个问题。一旦Future列表中的任何一个Future返回失败,那么sequence生成的Future也会返回失败。如果不希望这种情况发生,想要得到一些成功的结果值,那么可以在执行sequence之前将返回失败的Future逐一恢复:
Future.sequence(listOfFutures.map(future => future.recover {
case Exception => ""
}))
在Java 8的核心库中,并没有提供具备类似功能的方法。不过我们能找到一些代码示例用于实现类似的功能。
表2-1给出了本章中介绍过的一些基本操作:
表2-1
操作 |
Scala Future |
Java CompletableFuture |
---|---|---|
Transform Value |
.map(x => y) |
.thenApply(x -> y) |
Transform Value Async |
.flatMap(x => futureOfY) |
.thenCompose(x -> futureOfY) |
Return Value if Error |
.recover(t => y |
.exceptionally(t -> y) |
Return Value Async if Error |
.recoverWith(t => futureOfY) |
.handle(t,x -> futureOfY).thenCompose(x->x) |
本小节对Future和Promise的API做了比较详细的介绍。理解这些抽象概念是很必要的,因此推荐读者实际编写一些异步的代码,使用一下Future,把这个知识点掌握得更牢固一些。在下一小节中,我们将介绍如何通过返回Future来和Actor进行交互。读者应该花点时间练习本小节的内容,了解如何处理多个Future结果,确保有一个扎实的基础,帮助后面的学习。
本书的目的是教授如何构建分布式应用程序,因此我们要把本章中介绍过的所有知识点结合起来,编写一个小型分布式应用程序。尽管代码相当简单,但是这个例子直接展示了两个远程系统如何通过Akka相互通信,虽然从结构上来看还是比较先进的,不过我认为还是要马上展示一下Akka的强大之处,让读者保持对本书的兴趣。如果对Akka能提供的强大功能有些许了解,就会很有兴趣继续阅读余下的章节了。
我们将构建一个服务和一个客户端。也就是数据库和与之通信的数据库客户端。要通过网络在客户端和服务之间发送消息,我们的两个项目需要共享相同的消息。
我们也可以在两个项目中都包含消息,不过为了让例子更简短一些,我们把消息放在服务器项目中,然后在客户端项目中导入服务器项目(也就包含了消息)。
首先,我们将扩展第一章中的服务器项目,定义希望数据库接受的所有消息。接着,我们将针对这些消息分别实现数据库中的相关功能。
在构建了这些基本操作之后,我们将编写一个main()方法来运行数据库。启动应用程序后,我们将构建一个ActorSystem以及一个在该ActorSystem中的Actor,这就构成了我们的第一个Akka微服务。
我们还将创建一个数据库客户端,用于展示如何请求服务器,以及如何从远程Actor中获取Future。服务器端的服务接收到客户端的请求后将返回Future。这样我们就已经编写了一个可以使用的键值存储数据库(和redis很类似)以及一个可以使用该数据库的远程客户端,如图2-4所示。
图2-4
首先,我们要构造几种消息。
我们将在服务器端实现这些消息及其行为,以及用于启动该数据库的main函数。需要注意的是,我们将使用“第1章:初识Actor”中的项目,并在这基础上添加本章介绍的功能,比如返回响应及失败情况的处理。
由于我们将在通过网络连接的独立应用程序之间远程发送消息,因此需要能够对所有的消息进行序列化,这样Akka就能够将表示这些消息的对象转换成能够在网络应用程序之间传输的表示形式。我们将实现SetRequest,GetRequest和KeyNotFound- Exception。
使用Java实现的消息:
public class SetRequest implements Serializable {
public final String key;
public final Object value;
public SetRequest(String key, Object value) {
this.key = key;
this.value = value;
}
}
public class GetRequest implements Serializable {
public final String key;
public GetRequest(String key) {
this.key = key;
}
}
public class KeyNotFoundException extends Exception implements
Serializable {
public final String key;
public KeyNotFoundException(String key) {
this.key = key;
}
}
使用Scala实现的消息:
case class SetRequest(key: String, value: Object)
case class GetRequest(key: String)
case class KeyNotFoundException(key: String) extends Exception
这些都是很简单的类。由于消息是不可变的,所以我们没有使用Java的getter方法,而是直接将成员变量设为public。当然如果读者想要添加getter方法的话也可以。
注意
消息始终都应该是不可变的。
Scala的case class是可以被序列化的。
前文已经介绍过如何使用sender() tell/!来返回响应消息,以及如何返回Status.Failure(Exception)。接下来,我们就将实现对所有消息的响应,并且在GetRequest请求获取的键值不包含在键值存储中时返回失败响应。
下面是Java的receive语句:
receive(ReceiveBuilder
.match(SetRequest.class, message -> {
log.info("Received Set request: {}", message);
map.put(message.key, message.value);
sender().tell(new Status.Success(message.key), self());
})
.match(GetRequest.class, message -> {
log.info("Received Get request: {}", message);
String value = map.get(message.key);
Object response = (value != null)
? value
: new Status.Failure(new KeyNotFoundException(message.key));
sender().tell(response, self());
})
.matchAny(o ->
sender().tell(new Status.Failure(new ClassNotFoundException()), self())
)
.build()
);
下面是Scala的receive方法:
override def receive = {
case SetRequest(key, value) =>
log.info("received SetRequest - key: {} value: {}", key, value)
map.put(key, value)
sender() ! Status.Success
case GetRequest(key) =>
log.info("received GetRequest - key: {}", key)
val response: Option[String] = map.get(key)
response match{
case Some(x) => sender() ! x
case None => sender() ! Status.Failure(new KeyNotFoundException(key))
}
case o => Status.Failure(new ClassNotFoundException)
}
除了使用的语言不同之外,上面两者基本上等价的。如果Actor接收到一个SetRequest,就将键值存储到map中。在第1章中,只是返回了一个Success消息,这里对此做了更新。如果接收到的是GetRequest,Actor就会尝试从map中获取结果。如果找到key,就将value返回。如果没有找到,就返回一个包含KeyNotFoundException的失败消息。最后,我们也更新了第1章中接收到未知消息时的行为,会返回一个包含了ClassNotFoundException的错误消息,不过读者也可以提供一个自定义的异常,表达更清晰的异常语义。
我们需要支持远程应用程序通过网络远程访问上面定义的Actor。幸运的是,这是件很简单的事。我们只需要在build.sbt中添加用于远程访问的依赖即可:
"com.typesafe.akka" %% "akka-remote" % "2.3.6"
接着,只要添加配置文件,就可以启用Actor的远程访问功能了。在src/main/ resources文件夹中新建一个文件,命名为application.conf,然后把下面的配置内容添加到该文件中,其中包含了要监听的主机和端口。Akka负责解析application.conf。这是一个HOCON文件,是一种类型安全的配置文件,格式和JSON类似,与其他配置文件格式一样,使用方便。Akka文档中经常提到该格式的配置文件,笔者认为如果要配置多个属性,HOCON是一种相当不错的用于配置文件的格式。要注意的是,如果将其命名为application.properties并且使用属性配置文件的格式(比如keypath.key=value),Akka也是可以解析的。下面是application.conf的内容:
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
}
}
最后,我们需要为数据库添加一个main方法,启动Actor系统,并创建Actor。
在Java中,我们将添加一个类:com.akkademy.Main:
public class Main {
public static void main(String... args) {
ActorSystem system = ActorSystem.create("akkademy");
system.actorOf(Props.create(AkkademyDb.class), "akkademy-db");
}
}
在Scala中,可以把Main对象放在com.akkademy.AkkademyDb.scala文件中:
object Main extends App {
val system = ActorSystem("akkademy")
system.actorOf(Props[AkkademyDb], name = "akkademy-db")
}
我们只需要创建ActorSystem,然后在该Actor系统中创建一个Actor。注意到我们给Actor指定了一个名字:akkademy-db。有了这个名字,客户端就能够很容易地查询到Actor。同时由于Akka会在发生错误时把Actor的名字记录到日志中,指定名字也使得调试变得更容易。
现在我们就要在本地发布这些消息,这样就能够在客户端项目中使用它们了。如果想要发布到Nexus或是Artifactory,那么需要在build.sbt中设置仓库信息。不过在这个例子中,我们直接将消息发布到本地。
我们需要在build.sbt文件中添加机构和版本信息,如下所示:
name := "akkademy-db"
organization := "com.akkademy-db"
version := "0.0.1-SNAPSHOT"
“-SNAPSHOT”表示该版本还不稳定,有可能发生改变。由于可能会重新发布服务器,所以应该将这个标签添加到版本信息中。如果准备正式发布代码,就可以从版本信息中删除“-SNAPSHOT”标签,表示这是一个稳定版本。
最后,需要在发布的内容中排除application.conf,防止客户端也能够试图启动远程服务器。当然,更好的做法是把消息放在一个单独的库中,这里只是为了简单才这么做。把下面的内容添加到build.sbt文件中,就可以在发布时排除application.conf:
mappings in (Compile, packageBin) ~= { _.filterNot { case (_, name) =>
Seq("application.conf").contains(name)
}}
如果把消息放在一个单独的库中(当然可以),就不需要在build.sbt添加上面的配置将application.conf排除了。现在就已经完成了构建文件的配置。可以在命令行中进入项目的根目录,直接运行activator的publish-local任务,发布项目:
$activator publish-local
接下来我们将构建客户端,并且通过编写一些集成测试来展示如何将客户端与服务器进行集成,因此需要服务器保持运行。现在启动数据库:
$activator run
Akka会输出日志,表明其正在监听远程连接,并且给出当前服务器的地址(我们马上会在客户端中使用该地址):
[Remoting] Remoting now listens on addresses: [akka.tcp://
akkademy@127.0.0.1:2552]
我们已经发布了消息,键值数据库也已经处于运行中。现在就可以编写一个客户端,连接并使用服务器提供的服务。我们将以此来结束第一个分布式应用程序。
首先需要为客户端创建一个项目,并且导入服务器项目,得到消息的定义。我们将像“第1章:初始Actor”中一样创建项目结构,如果需要的话读者可以重新阅读相关的内容。运行activator-new,选择minimal-java或是minimal-akka项目。将该项目命名为akkademy-db-client。
我们需要在build.sbt文件中添加项目需要的依赖。在新建的项目中,在build.sbt中添加消息定义所需的下述依赖:
"com.akkademy-db" %% "akkademy-db" % "0.0.1-SNAPSHOT"
除了新建项目后已经默认添加的测试框架依赖之外,这个依赖包含了Scala项目所需要的所有依赖项。
在Java项目中,还需要添加一个scala-java8-compat库,用于将Actor生成的Scala Future转换成Java的CompletionStage:
"org.scala-lang.modules" %% "scala-java8-compat" % "0.6.0"
在这一小节中,我们将构建客户端,连接远程Actor,然后分别实现用于处理SetRequest和GetRequest消息的方法。
首先,将Java代码放在com.akkademy.JClient中:
public class JClient {
private final ActorSystem system = ActorSystem
.create("LocalSystem");
private final ActorSelection remoteDb;
public JClient(String remoteAddress) {
remoteDb = system.actorSelection("akka.tcp://akkademy@" +
remoteAddress + "/user/akkademy-db");
}
public CompletionStage set(String key, Object value) {
return toJava(ask(remoteDb, new SetRequest(key, value),
2000));
}
public CompletionStage<Object> get(String key){
return toJava(ask(remoteDb, new GetRequest(key), 2000));
}
}
将Scala代码放在com.akkademy.SClient中:
class SClient(remoteAddress: String) {
private implicit val timeout = Timeout(2 seconds)
private implicit val system = ActorSystem("LocalSystem")
private val remoteDb = system.actorSelection(
s"akka.tcp://akkademy@$remoteAddress/user/akkademy-db")
def set(key: String, value: Object) = {
remoteDb ? SetRequest(key, value)
}
def get(key: String) = {
remoteDb ? GetRequest(key)
}
}
代码相当简单。首先创建一个本地的ActorSystem,然后通过构造函数中提供的地址得到指向远程Actor的引用。接着,分别为GetRequest和SetRequest两种消息创建方法。我们向远程Actor发送本项目中导入的消息,然后得到返回的Future。注意到我们在发起的请求中随意使用了一个超时参数值。理想情况下,这个超时参数最好是可以配置的。
在Java代码中,我们将scala.concurrent.Future转换成Completion Stage,然后返回CompletionStage。这样能够给库的用户提供一个更好的Java API。
接下来,我们将编写一个简单的测试用例来进行集成测试。
由于要编写的是集成测试,所以需要数据库服务器保持运行。在这个例子中,我们将在远程数据库中创建一条记录,然后获取该记录。
Java的例子如下:
public class JClientIntegrationTest {
JClient client = new JClient("127.0.0.1:2552");
@Test
public void itShouldSetRecord() throws Exception {
client.set("123", 123);
Integer result = (Integer) ((CompletableFuture) client.
get("123")).get();
assert(result == 123);
}
}
Scala的例子如下:
class SClientIntegrationSpec extends FunSpecLike with Matchers {
val client = new SClient("127.0.0.1:2552")
describe("akkademyDbClient") {
it("should set a value") {
client.set("123", new Integer(123))
val futureResult = client.get("123")
val result = Await.result(futureResult, 10 seconds)
result should equal(123)
}
}
}
在测试我们编写的API时,需要用到在介绍Future时学到的知识。由于这只是个测试用例,所以使用了Await和get。虽然我们仅仅学完了“第2章:Actor与并发”,但是现在已经完全可以证明,使用Akka来编写分布式应用程序是切实可行的。
由于本章介绍了使用Akka所需的核心技能,因此完成本章的作业非常重要。本章的示例代码虽然简单,但是提前介绍了远程Actor的连接。在接下来的几章中,我们将只使用本地Actor系统,所以示例会更加简单。不过通过运行本章的示例程序,读者能够对Akka试图解决的问题有一些认识。所以推荐读者花上些许时间学习一下本章的例子。本章的内容(尤其是Future API)是接下来所有内容的基础。我们需要在学习了本章的知识后为后面的章节打下扎实的基础。
请完成下面的任务,确保理解如何使用本章介绍的内容进行编程。
对示例代码进行扩展。
本章介绍了使用Akka编写应用程序的基础。现在我们已经有了开始真正编写分布式应用程序所需的所有预备知识了。我们深入分析了Actor代码:创建一个Actor实例、向Actor发起查询、在Actor中返回消息的响应、从ActorSystem外部获取Actor返回的响应以及Future的使用。
现在,我们已经有了足够的知识将消息驱动的异步Akka应用程序提供给别人访问(可以使用Akka编写库和服务,并利用Scala核心库和Java 8的Future API返回结果)。这就是下一章要介绍的内容。