Pergunta

Estou tentando usar uma abordagem de divisão e conquista (também conhecida como fork/junção) para um problema de trituração numérica. Aqui está o código:

import scala.actors.Futures.future

private def compute( input: Input ):Result = {
  if( pairs.size < SIZE_LIMIT ) {
    computeSequential()
  } else {
    val (input1,input2) = input.split
    val f1 = future( compute(input1) )
    val f2 = future( compute(input2) )
    val result1 = f1()
    val result2 = f2()
    merge(result1,result2)
  }
}

Ele é executado (com uma boa aceleração), mas o método Apply Future Apply parece bloquear um fio e o pool de threads aumenta tremendamente. E quando muitos threads são criados, os cálculos são presos.

Existe algum tipo de reagir Método para futuros que libera o tópico? Ou qualquer outra maneira de alcançar esse comportamento?

EDITAR: Estou usando o Scala 2.8.0.Final

Foi útil?

Solução

Não reivindique (aplique) o seu Futures, já que isso os obriga a bloquear e aguardar uma resposta; Como você viu, isso pode levar a deadlocks. Em vez disso, use -os monadicamente para dizer o que fazer quando eles concluírem. Ao invés de:

val result1 = f1()
val result2 = f2()
merge(result1,result2)

Experimente isso:

for {
  result1 <- f1
  result2 <- f2
} yield merge(result1, result2)

O resultado disso será um Responder[Result] (essencialmente a Future[Result]) contendo os resultados mesclados; Você pode fazer algo eficaz com este valor final usando respond() ou foreach(), ou você pode map() ou flatMap() para outro Responder[T]. Não é necessário bloquear, continue agendando cálculos para o futuro!

Editar 1:

OK, a assinatura do compute a função terá que mudar para Responder[Result] Agora, então como isso afeta as chamadas recursivas? Vamos tentar isso:

private def compute( input: Input ):Responder[Result] = {
  if( pairs.size < SIZE_LIMIT ) {
    future(computeSequential())
  } else {
    val (input1,input2) = input.split
    for {
      result1 <- compute(input1)
      result2 <- compute(input2)
    } yield merge(result1, result2)
  }
}

Agora você não precisa mais embrulhar as chamadas para compute com future(...) Porque eles já estão voltando Responder (uma superclasse de Future).

Editar 2:

Um resultado de usar esse estilo de passagem de continuação é que o seu código de nível superior-seja o que compute Originalmente-não é mais bloqueado. Se estiver sendo chamado de main(), e isso é tudo o que o programa faz, isso será um problema, porque agora ele gera um monte de futuros e depois desligou imediatamente, tendo terminado tudo o que foi instruído a fazer. O que você precisa fazer é block em todos esses futuros, mas apenas uma vez, no nível superior, e apenas nos resultados de tudo os cálculos, não nenhum intermediário.

Infelizmente, isso Responder coisa que está sendo devolvida por compute() não tem mais um bloqueio apply() método como o Future fez. Não sei por que por que plana Futures produz um genérico Responder em vez de um Future; Parece um erro da API. Mas, de qualquer forma, você deve ser capaz de fazer o seu próprio:

def claim[A](r:Responder[A]):A = {
  import java.util.concurrent.ArrayBlockingQueue
  import scala.actors.Actor.actor

  val q = new ArrayBlockingQueue[A](1)
  // uses of 'respond' need to be wrapped in an actor or future block
  actor { r.respond(a => q.put(a)) } 
  return q.take
}

Então agora você pode criar uma chamada de bloqueio para calcular em seu main Método como assim:

val finalResult = claim(compute(input))
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top