1
22
23 package com.liferay.portlet.messageboards.service.jms;
24
25 import com.liferay.mail.service.MailServiceUtil;
26 import com.liferay.portal.NoSuchUserException;
27 import com.liferay.portal.kernel.mail.MailMessage;
28 import com.liferay.portal.kernel.util.GetterUtil;
29 import com.liferay.portal.kernel.util.StringUtil;
30 import com.liferay.portal.model.Subscription;
31 import com.liferay.portal.model.User;
32 import com.liferay.portal.service.SubscriptionLocalServiceUtil;
33 import com.liferay.portal.service.UserLocalServiceUtil;
34 import com.liferay.portlet.messageboards.model.MBCategory;
35 import com.liferay.portlet.messageboards.model.MBThread;
36 import com.liferay.util.CollectionFactory;
37
38 import java.util.List;
39 import java.util.Set;
40
41 import javax.jms.Message;
42 import javax.jms.MessageListener;
43 import javax.jms.ObjectMessage;
44 import javax.jms.Queue;
45 import javax.jms.QueueConnection;
46 import javax.jms.QueueConnectionFactory;
47 import javax.jms.QueueReceiver;
48 import javax.jms.QueueSession;
49 import javax.jms.Session;
50
51 import javax.mail.internet.InternetAddress;
52
53 import org.apache.commons.logging.Log;
54 import org.apache.commons.logging.LogFactory;
55
56
62 public class MBMessageConsumer implements MessageListener {
63
64 public void consume() {
65 try {
66 QueueConnectionFactory qcf = MBMessageQCFUtil.getQCF();
67 QueueConnection con = qcf.createQueueConnection();
68
69 QueueSession session = con.createQueueSession(
70 false, Session.AUTO_ACKNOWLEDGE);
71 Queue queue = (Queue)MBMessageQueueUtil.getQueue();
72
73 QueueReceiver subscriber = session.createReceiver(queue);
74
75 subscriber.setMessageListener(this);
76
77 con.start();
78 }
79 catch (Exception e) {
80 _log.error(e);
81 }
82 }
83
84 public void onMessage(Message msg) {
85 try {
86 ObjectMessage objMsg = (ObjectMessage)msg;
87
88 String[] array = (String[])objMsg.getObject();
89
90 _onMessage(array);
91 }
92 catch (Exception e) {
93 _log.error("Error sending message board notifications", e);
94 }
95 }
96
97 private void _onMessage(String[] array) throws Exception {
98 long companyId = GetterUtil.getLong(array[0]);
99 long userId = GetterUtil.getLong(array[1]);
100 String[] categoryIds = StringUtil.split(array[2]);
101 String threadId = array[3];
102 String fromName = array[4];
103 String fromAddress = array[5];
104 String subject = array[6];
105 String body = array[7];
106 String replyToAddress = array[8];
107 String messageId = array[9];
108 String inReplyTo = array[10];
109
110 Set sent = CollectionFactory.getHashSet();
111
112 if (_log.isInfoEnabled()) {
113 _log.info(
114 "Sending notifications for {messageId=" + messageId +
115 ", threadId=" + threadId + ", categoryIds=" + array[2] +
116 "}");
117 }
118
119
121 List subscriptions = SubscriptionLocalServiceUtil.getSubscriptions(
122 companyId, MBThread.class.getName(), GetterUtil.getLong(threadId));
123
124 _sendEmail(
125 userId, fromName, fromAddress, subject, body, subscriptions, sent,
126 replyToAddress, messageId, inReplyTo);
127
128
130 for (int i = 0; i < categoryIds.length; i++) {
131 subscriptions = SubscriptionLocalServiceUtil.getSubscriptions(
132 companyId, MBCategory.class.getName(),
133 GetterUtil.getLong(categoryIds[i]));
134
135 _sendEmail(
136 userId, fromName, fromAddress, subject, body, subscriptions,
137 sent, replyToAddress, messageId, inReplyTo);
138 }
139
140 if (_log.isInfoEnabled()) {
141 _log.info("Finished sending notifications");
142 }
143 }
144
145 private void _sendEmail(
146 long userId, String fromName, String fromAddress, String subject,
147 String body, List subscriptions, Set sent, String replyToAddress,
148 String messageId, String inReplyTo)
149 throws Exception {
150
151 for (int i = 0; i < subscriptions.size(); i++) {
152 Subscription subscription = (Subscription)subscriptions.get(i);
153
154 Long subscribedUserId = new Long(subscription.getUserId());
155
156 if (sent.contains(subscribedUserId)) {
157 if (_log.isDebugEnabled()) {
158 _log.debug(
159 "Do not send a duplicate email to user " +
160 subscribedUserId);
161 }
162
163 continue;
164 }
165 else {
166 if (_log.isDebugEnabled()) {
167 _log.debug(
168 "Add user " + subscribedUserId +
169 " to the list of users who have received an email");
170 }
171
172 sent.add(subscribedUserId);
173 }
174
175 User user = null;
176
177 try {
178 user = UserLocalServiceUtil.getUserById(
179 subscription.getUserId());
180 }
181 catch (NoSuchUserException nsue) {
182 if (_log.isInfoEnabled()) {
183 _log.info(
184 "Subscription " + subscription.getSubscriptionId() +
185 " is stale and will be deleted");
186 }
187
188 SubscriptionLocalServiceUtil.deleteSubscription(
189 subscription.getSubscriptionId());
190
191 continue;
192 }
193
194 try {
195 InternetAddress from = new InternetAddress(
196 fromAddress, fromName);
197
198 InternetAddress to = new InternetAddress(
199 user.getEmailAddress(), user.getFullName());
200
201 String curSubject = StringUtil.replace(
202 subject,
203 new String[] {
204 "[$TO_ADDRESS$]",
205 "[$TO_NAME$]"
206 },
207 new String[] {
208 user.getFullName(),
209 user.getEmailAddress()
210 });
211
212 String curBody = StringUtil.replace(
213 body,
214 new String[] {
215 "[$TO_ADDRESS$]",
216 "[$TO_NAME$]"
217 },
218 new String[] {
219 user.getFullName(),
220 user.getEmailAddress()
221 });
222
223 InternetAddress replyTo = new InternetAddress(
224 replyToAddress, replyToAddress);
225
226 MailMessage message = new MailMessage(
227 from, to, curSubject, curBody, false);
228
229 message.setReplyTo(new InternetAddress[]{replyTo});
230 message.setMessageId(messageId);
231 message.setInReplyTo(inReplyTo);
232
233 MailServiceUtil.sendEmail(message);
234 }
235 catch (Exception e) {
236 _log.error(e);
237 }
238 }
239 }
240
241 private static Log _log = LogFactory.getLog(MBMessageConsumer.class);
242
243 }