Pergunta
Estou tentando usar uma abordagem de divisão e conquista (também conhecida como fork/junção) para um problema de trituração numérica. Aqui está o código:
import scala.actors.Futures.future
private def compute( input: Input ):Result = {
if( pairs.size < SIZE_LIMIT ) {
computeSequential()
} else {
val (input1,input2) = input.split
val f1 = future( compute(input1) )
val f2 = future( compute(input2) )
val result1 = f1()
val result2 = f2()
merge(result1,result2)
}
}
Ele é executado (com uma boa aceleração), mas o método Apply Future Apply parece bloquear um fio e o pool de threads aumenta tremendamente. E quando muitos threads são criados, os cálculos são presos.
Existe algum tipo de reagir Método para futuros que libera o tópico? Ou qualquer outra maneira de alcançar esse comportamento?
EDITAR: Estou usando o Scala 2.8.0.Final
Solução
Não reivindique (aplique) o seu Future
s, já que isso os obriga a bloquear e aguardar uma resposta; Como você viu, isso pode levar a deadlocks. Em vez disso, use -os monadicamente para dizer o que fazer quando eles concluírem. Ao invés de:
val result1 = f1()
val result2 = f2()
merge(result1,result2)
Experimente isso:
for {
result1 <- f1
result2 <- f2
} yield merge(result1, result2)
O resultado disso será um Responder[Result]
(essencialmente a Future[Result]
) contendo os resultados mesclados; Você pode fazer algo eficaz com este valor final usando respond()
ou foreach()
, ou você pode map()
ou flatMap()
para outro Responder[T]
. Não é necessário bloquear, continue agendando cálculos para o futuro!
Editar 1:
OK, a assinatura do compute
a função terá que mudar para Responder[Result]
Agora, então como isso afeta as chamadas recursivas? Vamos tentar isso:
private def compute( input: Input ):Responder[Result] = {
if( pairs.size < SIZE_LIMIT ) {
future(computeSequential())
} else {
val (input1,input2) = input.split
for {
result1 <- compute(input1)
result2 <- compute(input2)
} yield merge(result1, result2)
}
}
Agora você não precisa mais embrulhar as chamadas para compute
com future(...)
Porque eles já estão voltando Responder
(uma superclasse de Future
).
Editar 2:
Um resultado de usar esse estilo de passagem de continuação é que o seu código de nível superior-seja o que compute
Originalmente-não é mais bloqueado. Se estiver sendo chamado de main()
, e isso é tudo o que o programa faz, isso será um problema, porque agora ele gera um monte de futuros e depois desligou imediatamente, tendo terminado tudo o que foi instruído a fazer. O que você precisa fazer é block
em todos esses futuros, mas apenas uma vez, no nível superior, e apenas nos resultados de tudo os cálculos, não nenhum intermediário.
Infelizmente, isso Responder
coisa que está sendo devolvida por compute()
não tem mais um bloqueio apply()
método como o Future
fez. Não sei por que por que plana Future
s produz um genérico Responder
em vez de um Future
; Parece um erro da API. Mas, de qualquer forma, você deve ser capaz de fazer o seu próprio:
def claim[A](r:Responder[A]):A = {
import java.util.concurrent.ArrayBlockingQueue
import scala.actors.Actor.actor
val q = new ArrayBlockingQueue[A](1)
// uses of 'respond' need to be wrapped in an actor or future block
actor { r.respond(a => q.put(a)) }
return q.take
}
Então agora você pode criar uma chamada de bloqueio para calcular em seu main
Método como assim:
val finalResult = claim(compute(input))