001
014
015 package com.liferay.util.transport;
016
017 import java.io.IOException;
018
019 import java.net.DatagramPacket;
020 import java.net.InetAddress;
021 import java.net.MulticastSocket;
022
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025
026
035 public class MulticastTransport extends Thread implements Transport {
036
037 public MulticastTransport(DatagramHandler handler, String host, int port) {
038 super("MulticastListener-" + host + port);
039
040 setDaemon(true);
041 _handler = handler;
042 _host = host;
043 _port = port;
044 }
045
046 public synchronized void connect() throws IOException {
047 if (_socket == null) {
048 _socket = new MulticastSocket(_port);
049 }
050 else if (_socket.isConnected() && _socket.isBound()) {
051 return;
052 }
053
054 _address = InetAddress.getByName(_host);
055
056 _socket.joinGroup(_address);
057
058 _connected = true;
059
060 start();
061 }
062
063 public synchronized void disconnect() {
064
065
066
067 if (_address != null) {
068 try {
069 _socket.leaveGroup(_address);
070 _address = null;
071 }
072 catch (IOException e) {
073 _log.error("Unable to leave group", e);
074 }
075 }
076
077 _connected = false;
078
079 interrupt();
080
081 _socket.close();
082 }
083
084 public boolean isConnected() {
085 return _connected;
086 }
087
088 @Override
089 public void run() {
090 try {
091 while (_connected) {
092 _socket.receive(_inboundPacket);
093 _handler.process(_inboundPacket);
094 }
095 }
096 catch (IOException e) {
097 _log.error("Unable to process ", e);
098
099 _socket.disconnect();
100
101 _connected = false;
102
103 _handler.errorReceived(e);
104 }
105 }
106
107 public synchronized void sendMessage(String msg) throws IOException {
108 _outboundPacket.setData(msg.getBytes());
109 _outboundPacket.setAddress(_address);
110 _outboundPacket.setPort(_port);
111
112 _socket.send(_outboundPacket);
113 }
114
115 private static Log _log = LogFactory.getLog(MulticastTransport.class);
116
117 private InetAddress _address;
118 private boolean _connected;
119 private DatagramHandler _handler;
120 private String _host;
121 private byte[] _inboundBuffer = new byte[4096];
122 private DatagramPacket _inboundPacket = new DatagramPacket(
123 _inboundBuffer, _inboundBuffer.length);
124 private byte[] _outboundBuffer = new byte[4096];
125 private DatagramPacket _outboundPacket = new DatagramPacket(
126 _outboundBuffer, _outboundBuffer.length);
127 private int _port;
128 private MulticastSocket _socket;
129
130 }