Saltar al contenido

¿Cómo pausar / reanudar todos los hilos en un ExecutorService en Java?

Después de de una larga recopilación de datos dimos con la solución este contratiempo que pueden tener ciertos de nuestros usuarios. Te brindamos la solución y esperamos que te sea de mucha ayuda.

Solución:

Para responder a mi propia pregunta, encontré un ejemplo de PausableThreadPoolExecutor en los javadocs de ThreadPoolExecutor sí mismo. Aquí está mi versión usando los monitores de Guava:

import com.google.common.util.concurrent.Monitor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class PausableExecutor extends ScheduledThreadPoolExecutor 

    private boolean isPaused;

    private final Monitor monitor = new Monitor();
    private final Monitor.Guard paused = new Monitor.Guard(monitor) 
        @Override
        public boolean isSatisfied() 
            return isPaused;
        
    ;

    private final Monitor.Guard notPaused = new Monitor.Guard(monitor) 
        @Override
        public boolean isSatisfied() 
            return !isPaused;
        
    ;

    public PausableExecutor(int corePoolSize, ThreadFactory threadFactory) 
        super(corePoolSize, threadFactory);
    

    protected void beforeExecute(Thread t, Runnable r) 
        super.beforeExecute(t, r);
        monitor.enterWhenUninterruptibly(notPaused);
        try 
            monitor.waitForUninterruptibly(notPaused);
         finally 
            monitor.leave();
        
    

    public void pause() 
        monitor.enterIf(notPaused);
        try 
            isPaused = true;
         finally 
            monitor.leave();
        
    

    public void resume() 
        monitor.enterIf(paused);
        try 
            isPaused = false;
         finally 
            monitor.leave();
        
    

Hice algunas críticas sobre su respuesta aceptada, pero no fueron muy constructivas … Así que aquí está mi solución. Usaría una clase como esta y luego llamaría checkIn donde / cuando quiera la funcionalidad de pausa. ¡Encuéntrelo en GitHub!

import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Provides a mechanism to pause multiple threads.
 * If wish your thread to participate, then it must regularly check in with an instance of this object.
 * 
 * @author Corin Lawson <[email protected]>
 */
public class Continue 
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public void checkIn() throws InterruptedException 
        if (isPaused) 
            pauseLock.lock();
            try 
                while (isPaused)
                    unpaused.await();
             finally 
                pauseLock.unlock();
            
        
    

    public void checkInUntil(Date deadline) throws InterruptedException 
        if (isPaused) 
            pauseLock.lock();
            try 
                while (isPaused)
                    unpaused.awaitUntil(deadline);
             finally 
                pauseLock.unlock();
            
        
    

    public void checkIn(long nanosTimeout) throws InterruptedException 
        if (isPaused) 
            pauseLock.lock();
            try 
                while (isPaused)
                    unpaused.awaitNanos(nanosTimeout);
             finally 
                pauseLock.unlock();
            
        
    

    public void checkIn(long time, TimeUnit unit) throws InterruptedException 
        if (isPaused) 
            pauseLock.lock();
            try 
                while (isPaused)
                    unpaused.await(time, unit);
             finally 
                pauseLock.unlock();
            
        
    

    public void checkInUninterruptibly() 
        if (isPaused) 
            pauseLock.lock();
            try 
                while (isPaused)
                    unpaused.awaitUninterruptibly();
             finally 
                pauseLock.unlock();
            
        
    

    public boolean isPaused() 
        return isPaused;
    

    public void pause() 
        pauseLock.lock();
        try 
            isPaused = true;
         finally 
            pauseLock.unlock();
        
    

    public void resume() 
        pauseLock.lock();
        try 
            if (isPaused) 
                isPaused = false;
                unpaused.signalAll();
            
         finally 
            pauseLock.unlock();
        
    

Por ejemplo:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class PausableExecutor extends ScheduledThreadPoolExecutor 
    private Continue cont;

    public PausableExecutor(int corePoolSize, ThreadFactory threadFactory, Continue c) 
        super(corePoolSize, threadFactory);
        cont = c;
    

    protected void beforeExecute(Thread t, Runnable r) 
        cont.checkIn();
        super.beforeExecute(t, r);
    

Esto tiene el beneficio adicional de que puede pausar muchos hilos con una sola llamada a Continue‘s pause.

Estaba buscando la funcionalidad de pausa / reanudación en el ejecutor, pero con la capacidad adicional de esperar cualquier tarea que se esté procesando actualmente. A continuación se muestra una variante de otras grandes implementaciones de este SO con la adición de funciones de espera. Lo estaba probando en el ejecutor con un solo hilo. Entonces el uso básico es:

executor.pause();
executor.await(10000); // blocks till current tasks processing ends

código de clase:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class PausableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor       
  public boolean isPaused;
  private ReentrantLock pauseLock = new ReentrantLock();
  private Condition unpaused = pauseLock.newCondition();
  private Latch activeTasksLatch = new Latch();

  private class Latch 
    private final Object synchObj = new Object();
    private int count;

    public boolean awaitZero(long waitMS) throws InterruptedException 
      long startTime = System.currentTimeMillis();
      synchronized (synchObj) 
        while (count > 0) 
          if ( waitMS != 0) 
            synchObj.wait(waitMS);
            long curTime = System.currentTimeMillis();
            if ( (curTime - startTime) > waitMS )                 
              return count <= 0;
            
          
          else
            synchObj.wait();
        
        return count <= 0; 
      
    
    public void countDown() 
      synchronized (synchObj) 
        if (--count <= 0) 
          // assert count >= 0;              
          synchObj.notifyAll();
        
      
    
    public void countUp() 
      synchronized (synchObj) 
        count++;
      
        
  

  /**
   * Default constructor for a simple fixed threadpool
   */
  public PausableScheduledThreadPoolExecutor(int corePoolSize) 
    super(corePoolSize);
  

  /**
   * Executed before a task is assigned to a thread.
   */
  @Override
  protected void beforeExecute(Thread t, Runnable r) 
    pauseLock.lock();
    try 
      while (isPaused)
        unpaused.await();
     catch (InterruptedException ie) 
      t.interrupt();
     finally 
      pauseLock.unlock();
    

    activeTasksLatch.countUp();
    super.beforeExecute(t, r);
  

  @Override
  protected void afterExecute(Runnable r, Throwable t) 
    try 
      super.afterExecute(r, t);
    
    finally 
      activeTasksLatch.countDown();
    
  

  /**
   * Pause the threadpool. Running tasks will continue running, but new tasks
   * will not start untill the threadpool is resumed.
   */
  public void pause() 
    pauseLock.lock();
    try 
      isPaused = true;
     finally 
      pauseLock.unlock();
    
  

  /**
   * Wait for all active tasks to end.
   */ 
  public boolean await(long timeoutMS) 
    // assert isPaused;
    try 
      return activeTasksLatch.awaitZero(timeoutMS);
     catch (InterruptedException e) 
      // log e, or rethrow maybe
    
    return false;
  

  /**
   * Resume the threadpool.
   */
  public void resume() 
    pauseLock.lock();
    try 
      isPaused = false;
      unpaused.signalAll();
     finally 
      pauseLock.unlock();
    
  


Agradecemos que quieras apoyar nuestra labor fijando un comentario o valorándolo te damos las gracias.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *