最近在一些服务中使用了akka,主要用来作异步解耦和本地消息分发(路由),这里简单总结一下用法.java
网上有很多集成的例子,要使用到spring的扩展.
我这边没有这样处理,而是简单把ActorSystem
建立的actor的过程放在了spring configuration里,把ActorRef
做为bean,毕竟actor自己不能为单例可是ref能够.spring
actor要使用一些bean的话就所有都由构造函数传入.
以下:数据库
public class UserActor extends AbstractLoggingActor { private UserService userService; public static Props props(UserService userService) { return Props.create(UserActor.class, () -> new UserActor(userService)); } public UserActor(UserService userService) { this.userService = userService; }
使用中能感受和java仍是有些隔阂的...
特别是容器(Collection)之间的转换和处理.
给出一些例子,多是我没发现更方便的写法..网络
List<Routee> routees = new ArrayList<>(); senders.forEach(e -> { try { ActorRef ref = getContext().actorOf(createdProps.apply(e), prefix + "-" + e.getId()); routees.add(new ActorRefRoutee(ref)); } catch (InvalidActorNameException ex) { log().error(ex, "name already in use"); } }); router = new Router(new RoundRobinRoutingLogic(), routees);
这边原本用一个map就能解决了,但会要求类型强转Router的第二个参数是java.lang.Iterable<Routee>
而不是java.lang.Iterable<? extends Routee>
(ActorRefRoutee是其子类),再加上还要处理重名错误,仍是用了这种错误的forEach使用方式.多线程
private void stopRoutee(Router router) { List<Routee> routees = new ArrayList<>(seqAsJavaList(router.routees())); routees.forEach(e -> { ActorRefRoutee ae = (ActorRefRoutee) e; ActorRef ref = ae.ref(); // 移除这个routee router.removeRoutee(ae); // 中止这个ref context().stop(ref); }); }
router.routees()返回的是个IndexSeq...和java的没有兼容和对应.
最开始找了一些,后来发现有scala.collection.JavaConversions
能够作兼容...感觉到了隔阂.app
此外ask模式,akka为java提供了PatternsCS
.这个类返回的是java的CompletionStage
类型相比Patterns
,比较友好.异步
最容易误用的一点就是消息处理中有阻塞操做,好比直接在actor中进行数据库操做和处理网络请求,而使用的actor也没有和线程绑定,这种状况须要使用额外的线程池,这个其实和用netty是同样的状况,用来进行应用调度的线程数有限.函数
因为上面提到的点,因而在actor内可能就会存在一些回调,一不当心就可能直接调用/改变actor内部的状态(例如在回调直接访问field).ui
private List<String> list; ... ... public Receive createReceive() { return receiveBuilder() .match(Message.class, e -> xxxx.onSuccess(r -> list.add(r))) ... ...
这样等于内部状态被多线程访问,破坏了actor内部的状态,正确的作法是在回调中给本身发送消息.this