001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.util.ThreadLocalRegistry;
018
019 import java.util.Set;
020 import java.util.concurrent.ThreadPoolExecutor;
021
022
030 public class ParallelDestination extends BaseDestination {
031
032 public ParallelDestination() {
033 }
034
035
038 public ParallelDestination(String name) {
039 super(name);
040 }
041
042
045 public ParallelDestination(
046 String name, int workersCoreSize, int workersMaxSize) {
047
048 super(name, workersCoreSize, workersMaxSize);
049 }
050
051 protected void dispatch(
052 Set<MessageListener> messageListeners, final Message message) {
053
054 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
055
056 for (final MessageListener messageListener : messageListeners) {
057 Runnable runnable = new MessageRunnable(message) {
058
059 public void run() {
060 try {
061 messageListener.receive(message);
062 }
063 finally {
064 ThreadLocalRegistry.resetThreadLocals();
065 }
066 }
067
068 };
069
070 threadPoolExecutor.execute(runnable);
071 }
072 }
073
074 }