Creating a NotifyingBlockingThreadPoolExecutor








Article Posted On Date : Wednesday, May 20, 2009



A thread pool is a useful tool for performing a collection of tasks in parallel. This becomes more and more relevant as CPUs adopt multi-core architectures that reward parallelized programs. Java 5 introduced a thread pool framework as part of its new concurrency support, built around the ThreadPoolExecutor class and its supporting classes. The framework is both powerful and flexible: it allows user-specific configuration and provides hooks and saturation strategies for dealing with a full queue. To best follow this article, you may find it useful to open the ThreadPoolExecutor Java API in a parallel tab.

The Need for a Blocking Thread Pool

Recently, my colleague Yaneeve Shekel needed a thread pool that would work on several tasks in parallel but would wait to add new tasks until a free thread was available to handle them. This is nothing bizarre; in fact, the need is quite common. Yaneeve needed it to analyze a huge directory with a very long list of files, where there was no point in piling up more and more FileAnalyzeTask instances without a free thread to handle them. The analyze operation takes some time, while files can be queued for analysis at a much higher rate. Thus, submitting tasks without regard to thread availability would create a huge queue and a possible memory problem, for no benefit.

Other cases in which you'd need a thread pool that can wait to add new tasks:

  • Performing some in-memory task on a long list of database records. You would not want to turn every record into a task in the ThreadPoolExecutor queue while the threads are busy with long operations on previous records, as doing so would exhaust your memory. The right way is to query the database, iterate over the result set, create just enough tasks to fill a fixed-size queue, and then wait until there is room in the queue. You can use a cursor to represent the result set, but even with a dynamic result set the database does not reply with the entire bulk of records; it sends a limited number of records and updates the result set object as you move forward through it. When the queue is ready for more tasks, the next records are read from the database.
  • Analyzing a long file with "independent lines": each line can be analyzed separately by a different thread. Again, there is no sense in reading the entire file into LineTask objects if there is no available thread to handle them. This scenario is in fact a true need raised in a forum asking for a recommended solution.
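
The memory concern behind these scenarios can be made concrete with a small experiment. The following is a minimal sketch, not code from the original article: the class name UnboundedQueueGrowthDemo and its method are made up for illustration, and it uses Java 8 lambdas for brevity. It holds the single worker thread busy and shows that every further task simply accumulates in an unbounded queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class UnboundedQueueGrowthDemo {

    // Submits taskCount tasks to a single-worker pool whose worker is held
    // busy, and returns how many tasks are waiting in the queue afterwards.
    static int queuedWhileWorkerBusy(int taskCount) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()); // unbounded queue
        CountDownLatch workerStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task occupies the only worker until we release it.
            pool.execute(() -> {
                workerStarted.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workerStarted.await();

            // The producer is far faster than the (blocked) consumer.
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> { });
            }
            return pool.getQueue().size();
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("queued tasks: " + queuedWhileWorkerBusy(10_000));
    }
}
```

Every queued Runnable stays in memory until a worker picks it up, so with real tasks that carry file names or database rows the queue grows as fast as the producer can run.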

The problem is that ThreadPoolExecutor doesn't give you the required behavior -- blocking when the queue is full -- out of the box. A feature request was even submitted to the Java Bug database (Bug Id 6648211, "Need for blocking ThreadPoolExecutor"), but it was put on "very low priority," as the user is supposedly able to quite easily implement this behavior.

At first glance this looks odd: you would expect a ThreadPoolExecutor with a bounded BlockingQueue to give you exactly this behavior. It does not. By default it throws RejectedExecutionException if a task is submitted while the queue is full. This happens because ThreadPoolExecutor.execute(Runnable) does not call the blocking method BlockingQueue.put(...) when queuing a task, but rather the non-blocking BlockingQueue.offer(...), which means "try, but do not wait." If the result is false (the offer failed) and no additional worker thread may be started, it invokes the saturation policy, that is, the RejectedExecutionHandler assigned to this thread pool, and the default handler throws an exception. Though this may seem illogical, it is in fact a design decision: it lets the user react to a rejected task rather than having the framework decide to wait or block.
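
The default behavior is easy to reproduce. The sketch below is illustrative only (the class name DefaultRejectionDemo is an assumption, and lambdas require Java 8): it builds a pool with one thread and a queue of capacity one, keeps the worker busy, fills the single queue slot, and then checks whether a third submission is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class DefaultRejectionDemo {

    // Returns true if the third task is rejected while the only worker is
    // busy and the single queue slot is already taken.
    static boolean thirdTaskIsRejected() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1)); // default handler: AbortPolicy
        CountDownLatch workerStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {             // runs on the only worker
                workerStarted.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workerStarted.await();
            pool.execute(() -> { });         // fills the single queue slot
            try {
                pool.execute(() -> { });     // no free worker, no room in the queue
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("third task rejected: " + thirdTaskIsRejected());
    }
}
```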

Suggested Solutions

There are several ways to allow blocking on a full queue:

  • We may implement our own BlockingThreadPoolExecutor and override the execute(...) method, so it will call the BlockingQueue.put(...) instead of BlockingQueue.offer(...). But this may not be so elegant as we interfere quite brutally in how execute() works (and we cannot call super.execute(...) since we do the queuing).
  • We can create a ThreadPoolExecutor with the CallerRunsPolicy rejection strategy. When the queue is full, this strategy hands the excess task to the thread that called execute() (the producer), thus killing two birds with one stone: the task is handled, and the producer is busy handling it rather than overloading the queue with additional tasks. There are, however, two flaws in this strategy. First, the task is not handled in the order in which it was produced; this is usually not a problem, as there is no real guarantee anyway on the order of context switches between worker threads, which influences task progress and order. Second, while the producer is working on its task, no one fills the queue. So if one or more worker threads finish their tasks while the producer is still working, they become idle. Minimizing this requires fine tuning of the queue size, and you can never guarantee that the situation is avoided. It would have been nice if there were a way to set up the ThreadPoolExecutor in a true Leader-Followers manner (a design pattern in which the producer gets to run the task while a thread from the pool becomes the new producer), but the CallerRunsPolicy strategy does not work like that. (The C++ ACE framework, for example, implements the Leader-Followers pattern.)
  • One can implement a simple "counting" ThreadPoolExecutor that uses a Semaphore initialized to the bound that we want to set, decremented, by calling acquire() at execute(...), and increased back, by calling release() at the afterExecute() hook method, as well as in a catch at the end of execute(...) for the reject scenario. The semaphore is acting in this way as a block on the call to execute(...) and you can in fact use an unbounded BlockingQueue in this case.
      public class BlockingThreadPoolExecutor
              extends ThreadPoolExecutor {

          private Semaphore semaphore;

          public BlockingThreadPoolExecutor(..., int bound, ...) {
              super(...);
              this.semaphore = new Semaphore(bound);
          }

          @Override
          public void execute(Runnable task) {
              boolean acquired = false;
              do {
                  try {
                      semaphore.acquire();
                      acquired = true;
                  } catch (InterruptedException e) {
                      // wait forever!
                  }
              } while (!acquired);

              try {
                  super.execute(task);
              } catch (RuntimeException e) {
                  // specifically, handle RejectedExecutionException
                  semaphore.release();
                  throw e;
              } catch (Error e) {
                  semaphore.release();
                  throw e;
              }
          }

          @Override
          protected void afterExecute(Runnable r, Throwable t) {
              semaphore.release();
          }
      }

    This is a nice solution. A useful adaptation would be to use tryAcquire(...) with a timeout, as it is always better practice to allow a timeout on blocking operations. Still, I personally don't like managing the blocking myself when ThreadPoolExecutor can have its own bounded queue; it doesn't make sense to me. I prefer the following solution, which relies on the bounded queue for blocking and on the saturation policy.

  • The fourth solution is to create a ThreadPoolExecutor with a bounded queue and our own RejectedExecutionHandler that blocks on the queue, waiting for it to be ready to take new tasks. We prefer to wait on the queue with a timeout and to notify the user if the timeout occurs, so that we do not wait forever in case of some problem in pulling tasks from the queue. For most reasonable scenarios, however, the caller will not have to take any action when the queue is full, as the producer thread will just wait on the queue. I prefer this approach, as it seems the simplest one that still follows the original design of ThreadPoolExecutor.
      public class BlockingThreadPoolExecutor
              extends ThreadPoolExecutor {

          public BlockingThreadPoolExecutor(
                  int poolSize,
                  int queueSize,
                  long keepAliveTime,
                  TimeUnit keepAliveTimeUnit,
                  long maxBlockingTime,
                  TimeUnit maxBlockingTimeUnit,
                  Callable<Boolean> blockingTimeCallback) {

              super(
                      poolSize, // Core size
                      poolSize, // Max size
                      keepAliveTime,
                      keepAliveTimeUnit,
                      new ArrayBlockingQueue<Runnable>(
                              // to avoid redundant threads
                              Math.max(poolSize, queueSize)
                      ),
                      // our own RejectedExecutionHandler - see below
                      new BlockThenRunPolicy(
                              maxBlockingTime,
                              maxBlockingTimeUnit,
                              blockingTimeCallback
                      )
              );

              super.allowCoreThreadTimeOut(true);
          }

          @Override
          public void setRejectedExecutionHandler(RejectedExecutionHandler h) {
              throw new UnsupportedOperationException(
                      "setRejectedExecutionHandler is not allowed on this class.");
          }

          // ...
      }

This is our new blocking thread pool. But as you can see, the real thing is still missing: our own RejectedExecutionHandler. In the constructor we pass parameters to our superclass, ThreadPoolExecutor. We use the constructor variant that accepts a RejectedExecutionHandler, since that is the most important parameter we wish to pass to the base class; it is the last one. We create a new object of type BlockThenRunPolicy, our own class (presented in a moment). The name of this saturation policy says exactly what it does: if a task is rejected due to saturation, block the task submission in the producer thread's context, and accept the task once there is enough capacity for it. We implement BlockThenRunPolicy as a private nested class inside our BlockingThreadPoolExecutor, as no one else needs to know about it.

    // --------------------------------------------------
    // Private nested class of BlockingThreadPoolExecutor
    // A reject policy that waits on the queue
    // --------------------------------------------------
    private static class BlockThenRunPolicy
            implements RejectedExecutionHandler {

        private long blockTimeout;
        private TimeUnit blockTimeoutUnit;
        private Callable<Boolean> blockTimeoutCallback;

        // Straightforward constructor
        public BlockThenRunPolicy(...){...}

        // --------------------------------------------------

        @Override
        public void rejectedExecution(
                Runnable task,
                ThreadPoolExecutor executor) {

            BlockingQueue<Runnable> queue = executor.getQueue();
            boolean taskSent = false;

            while (!taskSent) {

                if (executor.isShutdown()) {
                    throw new RejectedExecutionException(
                            "ThreadPoolExecutor has shutdown while attempting to offer a new task.");
                }

                try {
                    // offer the task to the queue, for a blocking-timeout
                    if (queue.offer(task, blockTimeout, blockTimeoutUnit)) {
                        taskSent = true;
                    }
                    else {
                        // task was not accepted - call the user's Callback
                        Boolean result = null;
                        try {
                            result = blockTimeoutCallback.call();
                        }
                        catch (Exception e) {
                            // wrap the Callback exception and re-throw
                            throw new RejectedExecutionException(e);
                        }
                        // check the Callback result
                        if (result == false) {
                            throw new RejectedExecutionException(
                                    "User decided to stop waiting for task insertion");
                        }
                        else {
                            // user decided to keep waiting (may log it)
                            continue;
                        }
                    }
                }
                catch (InterruptedException e) {
                    // we need to go back to the offer call...
                }

            } // end of while for InterruptedException

        } // end of method rejectedExecution

        // --------------------------------------------------

    } // end of private nested class BlockThenRunPolicy

Note that we may get a timeout when waiting on the queue, on the call to queue.offer(...). It is always good practice to use a timeout-enabled version of a blocking call rather than a "wait-forever" version. This makes it easier to notice and troubleshoot cases of thread starvation and deadlock. In this case, we do not log the timeout event ourselves, as we do not have a logger at hand. Still, it is a major event, especially if we set a long timeout that we do not expect to expire. This is why we ask the user to provide a callback: we can report the event and let the user decide whether to just log it and keep waiting, or to stop the wait.
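
To see a policy of this kind in action without the surrounding class, here is a compact, self-contained sketch. It is not the article's BlockThenRunPolicy: the names BlockOnFullQueueSketch, blockingHandler and runAll are hypothetical, it uses Java 8 lambdas, and unlike the code above it restores the thread's interrupt flag once the task has been queued. The usage method pushes many tasks through a deliberately tiny queue and counts how many complete.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, for illustration only.
class BlockOnFullQueueSketch {

    // A handler that retries offer() with a timeout and, after each timeout,
    // asks keepWaiting whether the producer should continue to wait.
    static RejectedExecutionHandler blockingHandler(
            long timeout, TimeUnit unit, Callable<Boolean> keepWaiting) {
        return (task, executor) -> {
            boolean interrupted = false;
            try {
                while (true) {
                    if (executor.isShutdown()) {
                        throw new RejectedExecutionException("executor has been shut down");
                    }
                    try {
                        if (executor.getQueue().offer(task, timeout, unit)) {
                            return; // the task is queued; the producer may go on
                        }
                    } catch (InterruptedException e) {
                        interrupted = true; // remember it and retry the offer
                        continue;
                    }
                    boolean goOn;
                    try {
                        goOn = Boolean.TRUE.equals(keepWaiting.call());
                    } catch (Exception e) {
                        throw new RejectedExecutionException(e);
                    }
                    if (!goOn) {
                        throw new RejectedExecutionException("gave up waiting for queue space");
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        };
    }

    // Usage: two workers, a queue of two, and far more tasks than fit at once.
    static int runAll(int taskCount) throws InterruptedException {
        AtomicInteger completed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                blockingHandler(1, TimeUnit.SECONDS, () -> true));
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(2); // simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed tasks: " + runAll(50));
    }
}
```

The callback here always answers "keep waiting"; a real caller would more likely log the timeout and perhaps give up after a number of attempts.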

Our solution preserves the default behavior of ThreadPoolExecutor, except for the saturation policy. Since we use inheritance, any setter or getter of the original ThreadPoolExecutor can be used, except for setRejectedExecutionHandler, which we forbid by throwing an exception if it is called. Prometheus, another open source project that tackles the blocking thread pool problem, takes a more straightforward wrapper approach. However, a wrapper must implement every ExecutorService method in order to serve as a general-purpose ExecutorService, which results in a rather cumbersome solution compared to our more organic extension.

We have a BlockingThreadPoolExecutor. But bear with me for a few more moments, as we are about to ask for more.

Remember our problem. We have a huge directory filled with files and we wanted to block on the queue if it is full. But we need something more. When all files are sent to the queue, the producer thread knows it is done sending all the files, but it still needs to wait for the worker threads to finish. And we do not want to shut down the thread pool and wait for it to finish that way, as we are going to use it in a few moments again. What we need is a way to wait for the final tasks sent to the thread pool to complete.

To do that we add a "synchronizer" object for the producer to wait on. The producer will wait on a new method we create, which we called await(), but there is an underlying condition inside that waits for a signal, and this is our Synchronizer. The thread pool signals the Synchronizer when it is idle; that is, all worker threads are idle. To have this info we simply count the number of currently working threads. We do not rely on the getActiveCount() method, as its contract and definition are not clear enough; we prefer to simply do it ourselves using an AtomicInteger to make sure that increment and decrement operations are done atomically, without a need to synchronize around ++ or --.
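
As a small illustration of why AtomicInteger is enough for the counting, the following sketch (the class name AtomicCounterDemo is hypothetical, and the lambda requires Java 8) lets several threads increment one shared counter with no synchronized block and returns the final value, which equals the total number of increments.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, for illustration only.
class AtomicCounterDemo {

    // Starts `threads` threads that each increment a shared counter
    // `incrementsPerThread` times, and returns the final count.
    static int countWithThreads(int threads, int incrementsPerThread)
            throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("final count: " + countWithThreads(8, 10_000));
    }
}
```

With a plain int and ++ the same loop could lose updates, because ++ is a separate read, add and write.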

Here we use the beforeExecute() and afterExecute() hook methods, but we must also take care of tasks that fail at the execute point, before taking a position in the queue; in that case the counter must be decremented as well. Our Synchronizer class manages the blocking wait in the await() method by waiting on a Condition that is signaled only when there are no tasks in process.

The resulting code is this:

    public class NotifyingThreadPoolExecutor
            extends ThreadPoolExecutor {

        private AtomicInteger tasksInProcess = new AtomicInteger();
        // using our own private inner class, see below
        private Synchronizer synchronizer = new Synchronizer();

        @Override
        public void execute(Runnable task) {
            // count a new task in process
            tasksInProcess.incrementAndGet();

            try {
                super.execute(task);
            } catch (RuntimeException e) {
                // specifically, handle RejectedExecutionException
                tasksInProcess.decrementAndGet();
                throw e;
            } catch (Error e) {
                tasksInProcess.decrementAndGet();
                throw e;
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {

            super.afterExecute(r, t);

            // synchronizing on the pool (and all its threads)
            // we need the synchronization to avoid more than one signal
            // if two or more threads decrement almost together and come
            // to the if with 0 tasks together
            synchronized (this) {
                tasksInProcess.decrementAndGet();
                if (tasksInProcess.intValue() == 0) {
                    synchronizer.signalAll();
                }
            }
        }

        public void await() throws InterruptedException {
            synchronizer.await();
        }

        // (there is also an await with timeout, see the full source code)
    }

We now need to provide the Synchronizer class that does the actual locking and synchronization work. We implement Synchronizer as a private inner class inside our NotifyingThreadPoolExecutor, as no one else needs to know about it.

    //--------------------------------------------------------------
    // Inner private class of NotifyingThreadPoolExecutor
    // for signaling when queue is idle
    //--------------------------------------------------------------
    private class Synchronizer {

        private final Lock lock = new ReentrantLock();
        private final Condition done = lock.newCondition();
        private boolean isDone = false;

        // called from the containing class NotifyingThreadPoolExecutor
        private void signalAll() {

            lock.lock(); // MUST lock!
            try {
                isDone = true;
                done.signalAll();
            }
            finally {
                lock.unlock(); // unlock even in case of an exception
            }
        }

        public void await() throws InterruptedException {

            lock.lock(); // MUST lock!
            try {
                while (!isDone) { // guard against spurious wake-ups
                    done.await();
                }
            }
            finally {
                isDone = false; // for next call to await
                lock.unlock();  // unlock even in case of an exception
            }
        }

        // (there is also an await with timeout, see the full source code)

    } // end of private inner class Synchronizer
    //--------------------------------------------------------------
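
To show how a producer would use such a pool for repeated batches, here is a self-contained sketch. It is a simplified variation rather than the class above: the names IdleAwarePoolSketch, awaitIdle and twoBatches are hypothetical, the in-process counter is kept under the same lock as the Condition instead of using an AtomicInteger with a separate isDone flag, and it relies on Java 7 multi-catch and Java 8 method references.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch, for illustration only.
class IdleAwarePoolSketch extends ThreadPoolExecutor {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition idle = lock.newCondition();
    private int tasksInProcess = 0; // guarded by lock

    IdleAwarePoolSketch(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public void execute(Runnable task) {
        adjust(+1); // count the task before it is handed over
        try {
            super.execute(task);
        } catch (RuntimeException | Error e) {
            adjust(-1); // the task never made it into the pool
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        adjust(-1);
    }

    private void adjust(int delta) {
        lock.lock();
        try {
            tasksInProcess += delta;
            if (tasksInProcess == 0) {
                idle.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    // Blocks until every task submitted so far has finished.
    public void awaitIdle() throws InterruptedException {
        lock.lock();
        try {
            while (tasksInProcess > 0) { // loop guards against spurious wake-ups
                idle.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Usage: run two batches on the same pool, waiting for each to finish.
    static int[] twoBatches() throws InterruptedException {
        IdleAwarePoolSketch pool = new IdleAwarePoolSketch(4);
        AtomicInteger done = new AtomicInteger();
        int[] completedAfterBatch = new int[2];
        try {
            for (int batch = 0; batch < 2; batch++) {
                for (int i = 0; i < 100; i++) {
                    pool.execute(done::incrementAndGet);
                }
                pool.awaitIdle(); // the pool stays alive for the next batch
                completedAfterBatch[batch] = done.get();
            }
        } finally {
            pool.shutdown();
        }
        return completedAfterBatch;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = twoBatches();
        System.out.println(result[0] + " tasks done after batch 1, "
                + result[1] + " after batch 2");
    }
}
```

The pool is shut down only at the very end; between batches the producer simply waits for the pool to become idle and then reuses it.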





