我有一个也使用 Akka 的 Play 框架 2 应用程序。我有一个 Actor 从远程系统接收消息时,此类消息的数量可能非常巨大。收到消息后,我将其记录到数据库中(使用内置的 Ebean ORM),然后继续处理它。我不在乎这个数据库日志记录的运行速度有多快,但它绝对不应该阻止进一步的处理。这是一个简化的代码示例:

public class MessageReceiver extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof ServerMessage) {
            ServerMessage serverMessage = (ServerMessage) message;
            ServerMessageModel serverMessageModel = new ServerMessageModel(serverMessage);
            serverMessageModel.save();
            //now send the message to another actor for further processing
        } else {
            unhandled(message);
        }
    }
}

据我了解,数据库插入在此实现中是阻塞的,因此它不能满足我的需求。但我不知道如何让它解锁。我读过有关 Future 类,但我无法让它工作,因为它应该返回一些值,并且 serverMessageModel.save(); 回报 void.我知道将大量消息逐一写入数据库效率很低,但这不是目前的问题。

我说的对吗?这个实现是阻塞的吗?如果是的话,如何让它异步运行?

有帮助吗?

解决方案

如果你想使用 Future - 使用 Callable (匿名类)构造一个 Akka Future,其 apply() 将实际实现数据库保存代码。实际上,您可以将所有这些(将来的创建和 apply())放入 ServerMessageModel 类中——也许可以将其称为 asynchSave()。你的未来也许是未来,其中状态是 asynchSave 的结果...

public Future<Status> asyncSave(...) { /* should the params be ServerMessageModel? */
  return future(new Callable<Status>() {
     public Status call() {
        /* do db work here */
     }
 }

在你的 onReceive 中,你可以继续告诉另一个演员。笔记:如果你想确保在 future 返回后向另一个参与者发送通知,那么你可以使用 Future 的 onSuccess。

Future<Status> f = serverMessageModel.asyncSave();
f.onSuccess(otherActor.tell(serverMessage, self());

您还可以进行故障处理...看 http://doc.akka.io/docs/akka/2.3.4/java/futures.html 了解更多详情。

希望有帮助。

其他提示

未来的解决方案对我来说似乎不错。我没有使用过 Java 中的 Futures,但是如果您确实需要一些返回值,您可以返回任意整数或字符串。

另一种选择是将该消息发送给其他某个参与者,由该参与者将消息保存到数据库中。然后你应该确保该演员的邮箱不会太满。

你有没有考虑过 akka-持久性 为了这?也许这适合您的用例。

使用 Martin Krassers akka-persistence 扩展和我的 jdbc 持久性提供程序 akka persistence jdbc 持久保留 actor 状态 https://github.com/dnvriend/akka-persistence-jdbc

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top