1
22
23 package com.liferay.util.transport;
24
25 import java.io.IOException;
26
27 import java.net.DatagramPacket;
28 import java.net.InetAddress;
29 import java.net.MulticastSocket;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33
34
46 public class MulticastTransport extends Thread implements Transport {
47
48 public MulticastTransport(DatagramHandler handler, String host, int port) {
49 super("MulticastListener-" + host + port);
50
51 setDaemon(true);
52 _handler = handler;
53 _host = host;
54 _port = port;
55 }
56
57 public synchronized void connect() throws IOException {
58 if (_socket == null) {
59 _socket = new MulticastSocket(_port);
60 }
61 else if (_socket.isConnected() && _socket.isBound()) {
62 return;
63 }
64
65 _address = InetAddress.getByName(_host);
66
67 _socket.joinGroup(_address);
68
69 _connected = true;
70
71 start();
72 }
73
74 public synchronized void disconnect() {
75
76
78 if (_address != null) {
79 try {
80 _socket.leaveGroup(_address);
81 _address = null;
82 }
83 catch (IOException e) {
84 _log.error("Unable to leave group", e);
85 }
86 }
87
88 _connected = false;
89
90 interrupt();
91
92 _socket.close();
93 }
94
95 public synchronized void sendMessage(String msg) throws IOException {
96 _outboundPacket.setData(msg.getBytes());
97 _outboundPacket.setAddress(_address);
98 _outboundPacket.setPort(_port);
99
100 _socket.send(_outboundPacket);
101 }
102
103 public boolean isConnected() {
104 return _connected;
105 }
106
107 public void run() {
108 try {
109 while (_connected) {
110 _socket.receive(_inboundPacket);
111 _handler.process(_inboundPacket);
112 }
113 }
114 catch (IOException e) {
115 _log.error("Unable to process ", e);
116
117 _socket.disconnect();
118
119 _connected = false;
120
121 _handler.errorReceived(e);
122 }
123 }
124
125 private static final Log _log = LogFactory.getLog(MulticastTransport.class);
126
127 private final byte[] _inboundBuffer = new byte[4096];
128
129 private final DatagramPacket _inboundPacket =
130 new DatagramPacket(_inboundBuffer, _inboundBuffer.length);
131
132 private final byte[] _outboundBuffer = new byte[4096];
133
134 private final DatagramPacket _outboundPacket =
135 new DatagramPacket(_outboundBuffer, _outboundBuffer.length);
136
137 private final String _host;
138 private final DatagramHandler _handler;
139 private final int _port;
140 private boolean _connected;
141 private MulticastSocket _socket;
142 private InetAddress _address;
143
144 }