Sunday, May 13, 2012

Solving a Producer-Consumer Problem in Java

The producer-consumer problem is one of the most frequently encountered problems when we attempt multi threaded programming. While not as challenging as some of the other problems in multi-threaded programming, an incorrect implementation of this problem can create a mess of your application. Produced items will be left unconsumed, starting items will be skipped, consumption depends on whether the production began earlier or later than consumption attempts etc. To add to this you might notice the anomalies long after it has actually happened and most importantly like almost all multi-threaded programs, this one is hard to debug and reproduce too.

So in this post I thought I would attempt to solve this problem in Java with the help of Java' awesome java.util.concurrent package and its classes.

First of all, let us see the characteristics of the producer-consumer problem:
  • Producer(s) produce items.
  • Consumer(s) consume the items produced by the producer(s).
  • Producer(s) finish production and let the consumers know that they are done.
Note that in this producer-consumer problem the producer is running on a different thread than the ones on consumer. This setup makes sense in two cases:
  • The steps to do the consumption of the item produced in independent and not dependent on other items.
  • The time to process the items is larger that the time to produce them.
The term "larger" in the second point is used a bit loosely. Consider the case where producer reads a line from a file and the "consumption and processing" is just to log the line in a special format back to a file then the use of a producer consumer problem solution can be considered a case of over-engineering a solution. However if for each of those lines the "consumption and processing" step is to make a HTTP GET/POST request to a web-server and then dump the result somewhere then we should opt for a producer-consumer solution. In this case I am assuming that all the data to do a GET/POST is available in the line (item) itself and we are not dependent on previous/next lines.

So let us first take a look at the characteristics of the producer-consumer problem solution that I have posted below:
  • There can be multiple producer.
  • There will be multiple consumers.
  • Once the production of new items is done the producer(s) will let the consumers know so that the consumer will exit after the last item is consumed and processed.
It is interesting to note that to solve this problem at a generic level we can address only the consumer side and not the producer side. This is because the production of items might be done at any time and there is very little that we can do in a generic way to control the production of items. We can, however control the consumer's behaviour while accepting items from producer(s). Having laid out the rules let us take a look at the consumer contract:


package com.maximus.producerconsumer;

public interface Consumer
{
 public boolean consume(Item j);
 
 public void finishConsumption();
} 

Here the consumer can be shared between multiple producers of similar items; by similar items I mean producer that produces objects of type "Item". The definition if Item is as follows:

package com.maximus.consumer;

public interface Item
{
 public void process();
}

Now we take a look at an implementation of the Consumer interface:

package com.maximus.consumer;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ConsumerImpl implements Consumer
{
 private BlockingQueue< Item > itemQueue = 
  new LinkedBlockingQueue< Item >();
 
 private ExecutorService executorService = 
  Executors.newCachedThreadPool();
 
 private List< ItemProcessor > jobList = 
  new LinkedList< ItemProcessor >();
 
 private volatile boolean shutdownCalled = false;
  
 public ConsumerImpl(int poolSize)
 {
  for(int i = 0; i < poolSize; i++)
  {
   ItemProcessor jobThread = 
    new ItemProcessor(itemQueue);
   
   jobList.add(jobThread);
   executorService.submit(jobThread);
  }
 }
 
 public boolean consume(Item j)
 {
  if(!shutdownCalled)
  {
   try
   {
    itemQueue.put(j);
   }
   catch(InterruptedException ie)
   {
    Thread.currentThread().interrupt();
    return false;
   }
   return true;
  }
  else
  {
   return false;
  }
 }
 
 public void finishConsumption()
 {
  shutdownCalled = true;
  
  for(ItemProcessor j : jobList)
  {
   j.cancelExecution();
  }
  
  executorService.shutdown();
 }
}

Now the only point of interest is the ItemProcessor that the consumer internally uses to process the incoming items. ItemProcessor is coded as follows:

package com.maximus.consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ItemProcessor implements Runnable
{
 private BlockingQueue< Item> jobQueue;
 
 private volatile boolean keepProcessing;
  
 public ItemProcessor(BlockingQueue< Item > queue)
 {
  jobQueue = queue;
  keepProcessing = true;
 }
 
 public void run()
 {
  while(keepProcessing || !jobQueue.isEmpty())
  {
   try
   {
    Item j = jobQueue.poll(10, TimeUnit.SECONDS);
    
    if(j != null)
    {
     j.process();
    }
   }
   catch(InterruptedException ie)
   {
    Thread.currentThread().interrupt();
    return;
   }
  }
 }
 
 public void cancelExecution()
 {
  this.keepProcessing = false;
 }
} 
The only challenge above is the condition in the while loop. The while loop is so written to support the continuation of the consumption of items even after the producer(s) have finished production and has notified the consumer that production is finished. The above while loop ensures that consumption of all the items is done before the threads exit.This will be the case when producers run faster that consumers.

The above consumer is thread-safe and can be shared multiple producers such that each producer may concurrently call consumer.consume() without bothering about synchronization and other multi-threading caveats. Producers just need to submit an implementation of the Item interface whose process() method will contain the logic of how the consumption will be done.

As a bonus for reading the post I put forward a test program that demonstrates how to use the above classes:


package com.maximus.consumer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;

public class Test
{
 public static void main(String[] args) throws Exception
        {
         Consumer consumer = new ConsumerImpl(10);
         
         BufferedReader br = 
          new BufferedReader(
          new InputStreamReader(
          new FileInputStream(
          new File(args[0]))));
         
         String line = "";
         
         while((line = br.readLine()) != null)
         {
          System.out.println(
           "Producer producing: " + line);
          consumer.consume(new PrintJob(line));
         }
         
         consumer.finishConsumption();
        }
}

class PrintJob implements Item
{
 private String line;
 
 public PrintJob(String s)
 {
  line = s;
 }
 
 public void process()
 {
  System.out.println(
   Thread.currentThread().getName() + 
   " consuming :" + line);
 }
}
The above consumer can be tweaked in a host of different ways to make it more flexible. We can define what the consumer will do when production is done. It may be tweaked to allow batch processing but I leave that to the user. Feel free to use it and twist it in whatever way you want.

Happy coding!

25 comments:

  1. I think setting shutdownCalled to true in ConsumerImpl.finishConsumption() is missing. (Otherwise what's the point of having that private instance variable?) Besides that it's a nice one.

    ReplyDelete
  2. how to Solve Producer Consumer Problem
    http://www.youtube.com/watch?v=dUwboVZ59KM

    ReplyDelete
  3. Hi, I am getting an ArrayIndexoutofbound Execption at file(args[0]). can you help?

    ReplyDelete
  4. To get the information on the error and to learn java script in depth visit Nettechindia. Which is an IT institute located in Mumbai.

    ReplyDelete
  5. Truely a very good article on how to handle the future technology. After reading your post,thanks for taking the time to discuss this, I feel happy about and I love learning more about this topic. keep sharing your information regularly for my future reference. This content creates a new hope and inspiration with in me. Thanks for sharing article like this. The way you have stated everything above is quite awesome. Keep blogging like this. Thanks.
    Android training in chennai

    ReplyDelete
  6. Wonderful blog.. Thanks for sharing informative blog.. its very useful to me..

    iOS Training in Chennai

    ReplyDelete
  7. I just see the post i am so happy to the communication science post of information's.So I have really enjoyed and reading your blogs for these posts.Any way I’ll be replay for your great thinks and I hope you post again soon.

    digital marketing course in chennai

    ReplyDelete
  8. Hi I read your post very carefully and I think you are right that a well written post should be at least a 100 words and should capture the essence of your blog, book or article.

    Car Wash Services in Mumbai

    ReplyDelete
  9. Great site for these post and i am seeing the most of contents have useful for my Carrier.Thanks to such a useful information.Any information are commands like to share him.
    GMAT coaching chennai

    ReplyDelete
  10. Thanks for this blog. provided great information. All the details are explained clearly with the great explanation. Thanks for this wonderful blog. Step by step processes execution are given clearly.Know the details about different thing.
    Web Development Company in India

    ReplyDelete
  11. Thank you for sharing such a nice and interesting blog with us. I have seen that all will say the same thing repeatedly. But in your blog, I had a chance to get some useful and unique information. I would like to suggest your blog in my dude circle.
    SEO Company in Chennai

    ReplyDelete
  12. Great and useful article. Creating content regularly is very tough. Your points are motivated me to move on
    SEO Company in Chennai

    ReplyDelete
  13. well and nice tips about web culture. it really good for everything should know before to reach a target. keep update more updates about web info. thanks.
    Best CAT Coaching in Chennai

    ReplyDelete
  14. Learning new technolgy would help oneself at hard part of their career. And staying updated is the only way to survive in current position. Your content tells the same. Thanks for sharing this information in here. Keep blogging like this. Android Training in Chennai

    ReplyDelete
  15. Finding the time and actual effort to create a superb article like this is great thing. I’ll learn many new stuff right here! Good luck for the next post buddy..
    Fresher Jobs
    Fresher Opening

    ReplyDelete
  16. Thank you for taking the time to provide us with your valuable information. We strive to provide our candidates with excellent care and we take your comments to heart.As always, we appreciate your confidence and trust in us
    Digital Marketing Company in India

    ReplyDelete
  17. Thank you for your post. I found your blog more informative and useful.

    IOS training in chennai

    ReplyDelete

  18. I am expecting more interesting topics from you. And this was nice content and definitely it will be useful for many people.

    Android App Development Company

    ReplyDelete
  19. Its very useful to me. Wonderful blog.. Thanks for sharing informative Post.

    Installment loans
    Payday loans
    Title loans

    ReplyDelete
  20. I wondered upon your blog and wanted to say that I have really enjoyed reading your blog posts. Any way I’ll be subscribing to your feed and I hope you post again soon.
    iOS App Development Company
    iOS App Development Company

    ReplyDelete
  21. This article is very much helpful and i hope this will be an useful information for the needed one. Keep on updating these kinds of informative things...
    Fitness SMS
    Fitness Text
    Salon SMS
    Salon Text
    Investor Relation SMS
    Investor Relation Text
    Mobile Marketing Services
    mobile marketing companies
    Sms API

    ReplyDelete
  22. great and nice blog thanks sharing..I just want to say that all the information you have given here is awesome...Thank you very much for this one.
    web design Company
    web development Company
    web design Company in chennai
    web development Company in chennai
    web design Company in India
    web development Company in India

    ReplyDelete
  23. I do trust all of the concepts you’ve presented on your post. They’re really convincing and will definitely work. Still, the posts are too brief for newbies. May you please extend them a little from subsequent time?Also, I’ve shared your website in my social networks.
    Office Interior Designers in Bangalore
    Office Interior Designers in Hyderabad

    ReplyDelete

  24. Its a wonderful post and very helpful, thanks for all this information. You are including better information regarding this topic in an effective way.Thank you so much

    Personal Installment Loans
    Title Car loan
    Cash Advance Loan

    ReplyDelete
  25. This is excellent information. It is amazing and wonderful to visit your site.Thanks for sharing this information,this is useful to me...
    Mobile Marketing Service
    Mobile Marketing Companies
    Texting API
    Sms API
    sms marketing

    ReplyDelete