使用ThreadPoolExecutor和Semaphore限制任务提交率

使用ThreadPoolExecutorSemaphore限制任务提交率

原文: https://howtodoinjava.com/java/multi-threading/throttling-task-submission-rate-using-threadpoolexecutor-and-semaphore/

如果您知道在 Web 服务器中,则可以配置到服务器的最大并发连接数。 如果有更多连接超出此限制,则它们必须等待直到释放或关闭某些其他连接。 此限制可以视为节流。 节流是为输出速率比输入速率慢的系统调节输入速率的能力。 必须停止系统崩溃或资源耗尽。

在与BlockingQueueThreadPoolExecutor相关的上一篇文章中,我们了解了如何创建具有以下能力的CustomThreadPoolExecutor

1)提交到阻塞队列 的任务,2)一个执行器,从队列中拾取任务并执行它们,3)已在ExecuteGate之后覆盖了Execute()方法以执行一些必要的额外活动,4)附加了一个RejectedExecutionHandler,用于处理由于队列已满而被拒绝的任务

我们的方法已经足够好,并且能够处理大多数实际情况。 现在,我们再添加一个概念,在某些情况下可能会证明是有益的。 这个概念是围绕队列中任务提交的限制。

在此示例中,节流将有助于使队列中的任务数保持在限制范围内,从而使任何任务都不会被拒绝。 它从本质上也消除了RejectedExecutionHandler的必要性。

使用CustomThreadPoolExecutorRejectedExecutionHandler的先前解决方案

在此解决方案中,我们有以下类:

DemoTask.java

public class DemoTask implements Runnable
{
   private String name = null;

   public DemoTask(String name) {
      this.name = name;
   }

   public String getName() {
      return this.name;
   }

   @Override
   public void run(){
      try {
         Thread.sleep(1000);
      } catch (InterruptedException e){
         e.printStackTrace();
      }
      System.out.println("Executing : " + name);
   }
}

CustomThreadPoolExecutor.java

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutor extends ThreadPoolExecutor
{
   public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
									TimeUnit unit, BlockingQueue<Runnable> workQueue)
   {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
   }

   @Override
   protected void beforeExecute(Thread t, Runnable r)
   {
      super.beforeExecute(t, r);
   }

   @Override
   protected void afterExecute(Runnable r, Throwable t)
   {
      super.afterExecute(r, t);
      if (t != null)
      {
         t.printStackTrace();
      }
   }
}

DemoExecutor.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DemoExecutor
{
   public static void main(String[] args)
   {
      Integer threadCounter = 0;
      BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);
      CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);
      executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
         {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
            {
               System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName());
               try
               {
                  Thread.sleep(1000);
               } catch (InterruptedException e)
               {
                  e.printStackTrace();
               }
               System.out.println("Lets add another time : " + ((DemoTask) r).getName());
               executor.execute(r);
            }
         });
      // Let start all core threads initially
      executor.prestartAllCoreThreads();
      while (true)
      {
         threadCounter++;
         // Adding threads one by one
         //System.out.println("Adding DemoTask : " + threadCounter);
         executor.execute(new DemoTask(threadCounter.toString()));
         if (threadCounter == 1000)
            break;
      }
   }
}

如果运行上述程序,则将获得输出,如下所示:

DemoTask Rejected : 71
Executing : 3
Executing : 5
...
...

将出现多次“DemoTask Rejected”。 在下一个解决方案中,我们将使用节流技术,以使任何任务都不会被拒绝。

使用ThreadPoolExecutorSemaphore限制任务的提交率

在此解决方案中,我们将创建一个Semaphore,其编号必须等于在任何给定时间点阻塞队列中的最大任务数。 因此该方法如下所示:

1)在执行任务之前,要求锁定信号量 2)如果获取了锁定,则执行正常。 否则,将重试直到获得锁 3)任务完成后; 锁被释放到信号量

我们启用节流的新BlockingThreadPoolExecutor如下所示:

package threadpoolDemo;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingThreadPoolExecutor extends ThreadPoolExecutor
{
   private final Semaphore semaphore;

   public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
   {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
      semaphore = new Semaphore(corePoolSize + 50);
   }

   @Override
   protected void beforeExecute(Thread t, Runnable r)
   {
      super.beforeExecute(t, r);
   }

   @Override
   public void execute(final Runnable task)
   {
      boolean acquired = false;
      do
      {
         try
         {
            semaphore.acquire();
            acquired = true;
         } catch (final InterruptedException e)
         {
            //LOGGER.warn("InterruptedException whilst aquiring semaphore", e);
         }
      } while (!acquired);
      try
      {
         super.execute(task);
      } catch (final RejectedExecutionException e)
      {
         System.out.println("Task Rejected");
         semaphore.release();
         throw e;
      }
   }

   @Override
   protected void afterExecute(Runnable r, Throwable t)
   {
      super.afterExecute(r, t);
      if (t != null)
      {
         t.printStackTrace();
      }
      semaphore.release();
   }
}

现在,如下测试代码。

package threadpoolDemo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DemoExecutor
{
   public static void main(String[] args)
   {
      Integer threadCounter = 0;
      BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);
      BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);
      executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
         {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
            {
               System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName());
               try
               {
                  Thread.sleep(1000);
               } catch (InterruptedException e)
               {
                  e.printStackTrace();
               }
               System.out.println("Lets add another time : " + ((DemoTask) r).getName());
               executor.execute(r);
            }
         });
      // Let start all core threads initially
      executor.prestartAllCoreThreads();
      while (true)
      {
         threadCounter++;
         // Adding threads one by one
         System.out.println("Adding DemoTask : " + threadCounter);
         executor.execute(new DemoTask(threadCounter.toString()));
         if (threadCounter == 1000)
            break;
      }
   }
}

当使用BlockingThreadPoolExecutor代替CustomThreadPoolExecutor运行DemoExecutor程序时,您不会看到任何任务被拒绝,并且所有任务都将成功执行。

您可以控制在任何时候通过Semaphore构造函数传递参数的任务数量。

这就是这篇文章的全部内容。 您应该阅读有关并发的更多信息,以提高信心。

学习愉快!