如何在Actor的onReceive方法中执行异步数据库插入?
-
21-12-2019 - |
题
我有一个也使用 Akka 的 Play 框架 2 应用程序。我有一个 Actor
从远程系统接收消息时,此类消息的数量可能非常巨大。收到消息后,我将其记录到数据库中(使用内置的 Ebean ORM),然后继续处理它。我不在乎这个数据库日志记录的运行速度有多快,但它绝对不应该阻止进一步的处理。这是一个简化的代码示例:
public class MessageReceiver extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof ServerMessage) {
ServerMessage serverMessage = (ServerMessage) message;
ServerMessageModel serverMessageModel = new ServerMessageModel(serverMessage);
serverMessageModel.save();
//now send the message to another actor for further processing
} else {
unhandled(message);
}
}
}
据我了解,数据库插入在此实现中是阻塞的,因此它不能满足我的需求。但我不知道如何让它解锁。我读过有关 Future
类,但我无法让它工作,因为它应该返回一些值,并且 serverMessageModel.save();
回报 void
.我知道将大量消息逐一写入数据库效率很低,但这不是目前的问题。
我说的对吗?这个实现是阻塞的吗?如果是的话,如何让它异步运行?
解决方案
如果你想使用 Future - 使用 Callable (匿名类)构造一个 Akka Future,其 apply() 将实际实现数据库保存代码。实际上,您可以将所有这些(将来的创建和 apply())放入 ServerMessageModel 类中——也许可以将其称为 asynchSave()。你的未来也许是未来,其中状态是 asynchSave 的结果...
public Future<Status> asyncSave(...) { /* should the params be ServerMessageModel? */
return future(new Callable<Status>() {
public Status call() {
/* do db work here */
}
}
在你的 onReceive 中,你可以继续告诉另一个演员。笔记:如果你想确保在 future 返回后向另一个参与者发送通知,那么你可以使用 Future 的 onSuccess。
Future<Status> f = serverMessageModel.asyncSave();
f.onSuccess(otherActor.tell(serverMessage, self());
您还可以进行故障处理...看 http://doc.akka.io/docs/akka/2.3.4/java/futures.html 了解更多详情。
希望有帮助。
其他提示
未来的解决方案对我来说似乎不错。我没有使用过 Java 中的 Futures,但是如果您确实需要一些返回值,您可以返回任意整数或字符串。
另一种选择是将该消息发送给其他某个参与者,由该参与者将消息保存到数据库中。然后你应该确保该演员的邮箱不会太满。
你有没有考虑过 akka-持久性 为了这?也许这适合您的用例。
使用 Martin Krassers akka-persistence 扩展和我的 jdbc 持久性提供程序 akka persistence jdbc 持久保留 actor 状态 https://github.com/dnvriend/akka-persistence-jdbc