Saltar al contenido

Anidamiento en espera en Parallel.ForEach

El paso a paso o código que hallarás en este post es la resolución más sencilla y válida que hallamos a esta inquietud o dilema.

Solución:

Toda la idea detrás Parallel.ForEach() es que tienes un conjunto de hilos y cada hilo procesa parte de la colección. Como habrás notado, esto no funciona con asyncawaitdonde desea liberar el subproceso durante la duración de la llamada asíncrona.

Podrías “arreglar” eso bloqueando el ForEach() hilos, pero eso anula todo el punto de asyncawait.

Lo que podría hacer es usar TPL Dataflow en lugar de Parallel.ForEach()que soporta asíncrono Taskhinchar.

Específicamente, su código podría escribirse usando un TransformBlock que transforma cada id en un Customer utilizando el async lambda. Este bloque se puede configurar para ejecutarse en paralelo. Vincularías ese bloque a un ActionBlock que escribe cada uno Customer a la consola Después de configurar la red de bloques, puede Post() cada identificación a la TransformBlock.

En codigo:

var ids = new List  "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" ;

var getCustomerBlock = new TransformBlock(
    async i =>
    
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    , new ExecutionDataflowBlockOptions
    
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    );
var writeCustomerBlock = new ActionBlock(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    
        PropagateCompletion = true
    );

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

Aunque probablemente desee limitar el paralelismo de los TransformBlock a alguna pequeña constante. Además, podría limitar la capacidad de la TransformBlock y agregue los elementos de forma asíncrona usando SendAsync()por ejemplo, si la colección es demasiado grande.

Como beneficio adicional en comparación con su código (si funcionó) es que la escritura comenzará tan pronto como finalice un solo elemento, y no esperará hasta que finalice todo el procesamiento.

La respuesta de Svick es (como siempre) excelente.

Sin embargo, encuentro que Dataflow es más útil cuando realmente tienes grandes cantidades de datos para transferir. O cuando necesites un async-cola compatible.

En su caso, una solución más simple es simplemente usar el async-paralelismo de estilo:

var ids = new List()  "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" ;

var customerTasks = ids.Select(i =>
  
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  );
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)

  Console.WriteLine(customer.ID);


Console.ReadKey();

El uso de DataFlow como sugirió Svick puede ser excesivo, y la respuesta de Stephen no proporciona los medios para controlar la concurrencia de la operación. Sin embargo, eso se puede lograr de manera bastante simple:

public static async Task RunWithMaxDegreeOfConcurrency(
     int maxDegreeOfConcurrency, IEnumerable collection, Func taskFactory)

    var activeTasks = new List(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        
    
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    
        //observe exceptions in a manner consistent with the above   
    );

Él ToArray() las llamadas se pueden optimizar mediante el uso de un array en lugar de una lista y reemplazar las tareas completadas, pero dudo que haga una gran diferencia en la mayoría de los escenarios. Ejemplo de uso según la pregunta del OP:

RunWithMaxDegreeOfConcurrency(10, ids, async i =>

    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
);

EDITAR El compañero usuario de SO y mago de TPL, Eli Arbel, me indicó un artículo relacionado de Stephen Toub. Como de costumbre, su implementación es elegante y eficiente:

public static Task ForEachAsync(
      this IEnumerable source, int dop, Func body) 
 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate  
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          
                              //observe exceptions
                          );
                      
        )); 

Comentarios y calificaciones de la guía

Eres capaz de asentar nuestra ocupación ejecutando un comentario y valorándolo te lo agradecemos.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *