1
22
23 package com.liferay.portlet.messageboards.service.jms;
24
25 import com.liferay.mail.service.MailServiceUtil;
26 import com.liferay.portal.NoSuchUserException;
27 import com.liferay.portal.kernel.mail.MailMessage;
28 import com.liferay.portal.kernel.util.GetterUtil;
29 import com.liferay.portal.kernel.util.StringUtil;
30 import com.liferay.portal.model.Subscription;
31 import com.liferay.portal.model.User;
32 import com.liferay.portal.service.SubscriptionLocalServiceUtil;
33 import com.liferay.portal.service.UserLocalServiceUtil;
34 import com.liferay.portlet.messageboards.model.MBCategory;
35 import com.liferay.portlet.messageboards.model.MBThread;
36 import com.liferay.portlet.messageboards.util.BBCodeUtil;
37
38 import java.util.ArrayList;
39 import java.util.HashSet;
40 import java.util.List;
41 import java.util.Set;
42
43 import javax.jms.Message;
44 import javax.jms.MessageListener;
45 import javax.jms.ObjectMessage;
46 import javax.jms.Queue;
47 import javax.jms.QueueConnection;
48 import javax.jms.QueueConnectionFactory;
49 import javax.jms.QueueReceiver;
50 import javax.jms.QueueSession;
51 import javax.jms.Session;
52
53 import javax.mail.internet.InternetAddress;
54
55 import org.apache.commons.logging.Log;
56 import org.apache.commons.logging.LogFactory;
57
58
64 public class MBMessageConsumer implements MessageListener {
65
66 public void consume() {
67 try {
68 QueueConnectionFactory qcf = MBMessageQCFUtil.getQCF();
69 QueueConnection con = qcf.createQueueConnection();
70
71 QueueSession session = con.createQueueSession(
72 false, Session.AUTO_ACKNOWLEDGE);
73 Queue queue = MBMessageQueueUtil.getQueue();
74
75 QueueReceiver subscriber = session.createReceiver(queue);
76
77 subscriber.setMessageListener(this);
78
79 con.start();
80 }
81 catch (Exception e) {
82 _log.error(e);
83 }
84 }
85
86 public void onMessage(Message msg) {
87 try {
88 ObjectMessage objMsg = (ObjectMessage)msg;
89
90 String[] array = (String[])objMsg.getObject();
91
92 _onMessage(array);
93 }
94 catch (Exception e) {
95 _log.error("Error sending message board notifications", e);
96 }
97 }
98
99 private void _onMessage(String[] array) throws Exception {
100 long companyId = GetterUtil.getLong(array[0]);
101 long userId = GetterUtil.getLong(array[1]);
102 String[] categoryIds = StringUtil.split(array[2]);
103 String threadId = array[3];
104 String fromName = array[4];
105 String fromAddress = array[5];
106 String subject = array[6];
107 String body = array[7];
108 String replyToAddress = array[8];
109 String mailId = array[9];
110 String inReplyTo = array[10];
111 boolean htmlFormat = GetterUtil.getBoolean(array[11]);
112
113 Set<Long> sent = new HashSet<Long>();
114
115 if (_log.isInfoEnabled()) {
116 _log.info(
117 "Sending notifications for {mailId=" + mailId + ", threadId=" +
118 threadId + ", categoryIds=" + array[2] + "}");
119 }
120
121
123 List<Subscription> subscriptions =
124 SubscriptionLocalServiceUtil.getSubscriptions(
125 companyId, MBThread.class.getName(),
126 GetterUtil.getLong(threadId));
127
128 _sendEmail(
129 userId, fromName, fromAddress, subject, body, subscriptions, sent,
130 replyToAddress, mailId, inReplyTo, htmlFormat);
131
132
134 for (int i = 0; i < categoryIds.length; i++) {
135 subscriptions = SubscriptionLocalServiceUtil.getSubscriptions(
136 companyId, MBCategory.class.getName(),
137 GetterUtil.getLong(categoryIds[i]));
138
139 _sendEmail(
140 userId, fromName, fromAddress, subject, body, subscriptions,
141 sent, replyToAddress, mailId, inReplyTo, htmlFormat);
142 }
143
144 if (_log.isInfoEnabled()) {
145 _log.info("Finished sending notifications");
146 }
147 }
148
149 private void _sendEmail(
150 long userId, String fromName, String fromAddress, String subject,
151 String body, List<Subscription> subscriptions, Set<Long> sent,
152 String replyToAddress, String mailId, String inReplyTo,
153 boolean htmlFormat)
154 throws Exception {
155
156 List<InternetAddress> addresses =
157 new ArrayList<InternetAddress>();
158
159 for (Subscription subscription : subscriptions) {
160 long subscribedUserId = subscription.getUserId();
161
162 if (sent.contains(subscribedUserId)) {
163 if (_log.isDebugEnabled()) {
164 _log.debug(
165 "Do not send a duplicate email to user " +
166 subscribedUserId);
167 }
168
169 continue;
170 }
171 else {
172 if (_log.isDebugEnabled()) {
173 _log.debug(
174 "Add user " + subscribedUserId +
175 " to the list of users who have received an email");
176 }
177
178 sent.add(subscribedUserId);
179 }
180
181 User user = null;
182
183 try {
184 user = UserLocalServiceUtil.getUserById(
185 subscription.getUserId());
186 }
187 catch (NoSuchUserException nsue) {
188 if (_log.isInfoEnabled()) {
189 _log.info(
190 "Subscription " + subscription.getSubscriptionId() +
191 " is stale and will be deleted");
192 }
193
194 SubscriptionLocalServiceUtil.deleteSubscription(
195 subscription.getSubscriptionId());
196
197 continue;
198 }
199
200 InternetAddress userAddress = new InternetAddress(
201 user.getEmailAddress(), user.getFullName());
202
203 addresses.add(userAddress);
204 }
205
206 try {
207 InternetAddress[] bulkAddresses = addresses.toArray(
208 new InternetAddress[addresses.size()]);
209
210 if (bulkAddresses.length == 0) {
211 return;
212 }
213
214 InternetAddress from = new InternetAddress(fromAddress, fromName);
215
216 InternetAddress[] to = new InternetAddress[] {
217 new InternetAddress(replyToAddress, replyToAddress)};
218
219 String curSubject = StringUtil.replace(
220 subject,
221 new String[] {
222 "[$TO_ADDRESS$]",
223 "[$TO_NAME$]"
224 },
225 new String[] {
226 replyToAddress,
227 replyToAddress
228 });
229
230 String curBody = StringUtil.replace(
231 body,
232 new String[] {
233 "[$TO_ADDRESS$]",
234 "[$TO_NAME$]"
235 },
236 new String[] {
237 replyToAddress,
238 replyToAddress
239 });
240
241 InternetAddress replyTo = new InternetAddress(
242 replyToAddress, replyToAddress);
243
244 if (htmlFormat) {
245 curBody = BBCodeUtil.getHTML(curBody);
246 }
247
248 MailMessage message = new MailMessage(
249 from, to, null, null, bulkAddresses, curSubject, curBody,
250 htmlFormat);
251
252 message.setMessageId(mailId);
253 message.setInReplyTo(inReplyTo);
254 message.setReplyTo(new InternetAddress[] {replyTo});
255
256 MailServiceUtil.sendEmail(message);
257 }
258 catch (Exception e) {
259 _log.error(e);
260 }
261 }
262
263 private static Log _log = LogFactory.getLog(MBMessageConsumer.class);
264
265 }