1
22
23 package com.liferay.util.transport;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27
28 import java.io.IOException;
29
30 import java.net.DatagramPacket;
31 import java.net.InetAddress;
32 import java.net.MulticastSocket;
33
34
45 public class MulticastTransport extends Thread implements Transport {
46
47 public MulticastTransport(DatagramHandler handler, String host, int port) {
48 super("MulticastListener-" + host + port);
49
50 setDaemon(true);
51 _handler = handler;
52 _host = host;
53 _port = port;
54 }
55
56 public synchronized void connect() throws IOException {
57 if (_socket == null) {
58 _socket = new MulticastSocket(_port);
59 }
60 else if (_socket.isConnected() && _socket.isBound()) {
61 return;
62 }
63
64 _address = InetAddress.getByName(_host);
65
66 _socket.joinGroup(_address);
67
68 _connected = true;
69
70 start();
71 }
72
73 public synchronized void disconnect() {
74
75
77 if (_address != null) {
78 try {
79 _socket.leaveGroup(_address);
80 _address = null;
81 }
82 catch (IOException e) {
83 _log.error("Unable to leave group", e);
84 }
85 }
86
87 _connected = false;
88
89 interrupt();
90
91 _socket.close();
92 }
93
94 public synchronized void sendMessage(String msg) throws IOException {
95 _outboundPacket.setData(msg.getBytes());
96 _outboundPacket.setAddress(_address);
97 _outboundPacket.setPort(_port);
98
99 _socket.send(_outboundPacket);
100 }
101
102 public boolean isConnected() {
103 return _connected;
104 }
105
106 public void run() {
107 try {
108 while (_connected) {
109 _socket.receive(_inboundPacket);
110 _handler.process(_inboundPacket);
111 }
112 }
113 catch (IOException e) {
114 _log.error("Unable to process ", e);
115
116 _socket.disconnect();
117
118 _connected = false;
119
120 _handler.errorReceived(e);
121 }
122 }
123
124 private static Log _log = LogFactoryUtil.getLog(MulticastTransport.class);
125
126 private byte[] _inboundBuffer = new byte[4096];
127 private DatagramPacket _inboundPacket =
128 new DatagramPacket(_inboundBuffer, _inboundBuffer.length);
129 private byte[] _outboundBuffer = new byte[4096];
130 private DatagramPacket _outboundPacket =
131 new DatagramPacket(_outboundBuffer, _outboundBuffer.length);
132 private String _host;
133 private DatagramHandler _handler;
134 private int _port;
135 private boolean _connected;
136 private MulticastSocket _socket;
137 private InetAddress _address;
138
139 }