1   package net.sf.tinyjee.jgroups.config;
2   
3   import net.sf.tinyjee.Prioritizeable;
4   import net.sf.tinyjee.Server;
5   import net.sf.tinyjee.config.Configurable;
6   import net.sf.tinyjee.config.Connection;
7   import net.sf.tinyjee.config.Property;
8   import net.sf.tinyjee.jgroups.JGroupsLoggingReceiverAdapter;
9   import org.eclipse.jetty.util.component.AbstractLifeCycle;
10  import org.jgroups.Channel;
11  import org.jgroups.JChannel;
12  import org.jgroups.auth.MD5Token;
13  import org.jgroups.protocols.AUTH;
14  import org.jgroups.protocols.ENCRYPT;
15  import org.jgroups.protocols.TP;
16  import org.jgroups.stack.Protocol;
17  import org.jgroups.stack.ProtocolStack;
18  import org.slf4j.Logger;
19  import org.slf4j.LoggerFactory;
20  
21  import java.net.InetAddress;
22  import java.net.UnknownHostException;
23  import java.util.*;
24  import java.util.concurrent.ConcurrentLinkedQueue;
25  import java.util.concurrent.TimeUnit;
26  
27  import static net.sf.tinyjee.jgroups.config.JGroupsConnection.*;
28  import static net.sf.tinyjee.jgroups.config.JGroupsConnection.DETAULT_ENCRYPTION_ALGORITHM;
29  import static net.sf.tinyjee.jgroups.config.JGroupsConnection.KEY_JGROUPS_CLUSTER_NAME;
30  import static org.jgroups.stack.ProtocolStack.ABOVE;
31  import static org.jgroups.stack.ProtocolStack.BELOW;
32  
33  /**
34   * Hooks into the Server LifeCycle and Configuration
35   *
36   * @author Juergen_Kellerer, 2011-02-10
37   * @version 1.0
38   */
39  public class JGroupsConfigurator extends AbstractLifeCycle implements Configurable, Prioritizeable {
40  
41  	private static final Logger log = LoggerFactory.getLogger(JGroupsConfigurator.class);
42  
43  	private static final String JGROUPS_UDP_MCAST_ADDR = "jgroups.udp.mcast_addr";
44  	private static final String JGROUPS_UDP_IP_TTL = "jgroups.udp.ip_ttl";
45  	private static final String DEFAULT_JGROUPS_MULTICAST_IP_TTL = System.getProperty(JGROUPS_UDP_IP_TTL, "");
46  	private static final String DEFAULT_JGROUPS_MULTICAST_ADDRESS = System.getProperty(JGROUPS_UDP_MCAST_ADDR, "");
47  
48  	private static String machineId = System.getProperty("tinyjee.jgroups.machine.id");
49  
50  	static String getMachineId() {
51  		if (machineId == null) {
52  			try {
53  				machineId = InetAddress.getLocalHost().getCanonicalHostName();
54  			} catch (UnknownHostException e) {
55  				log.warn("Cannot create properly named JGroups channels as 'tinyjee.jgroups.machine.id' was not " +
56  						"specified as system property and auto-detection failed with.", e);
57  				machineId = "";
58  			}
59  		}
60  		return machineId;
61  	}
62  
63  	private final static Object lock = new Object();
64  	private final Queue<JChannel> pendingChannels = new ConcurrentLinkedQueue<JChannel>();
65  	private final Map<JChannel, Connection> channels = new IdentityHashMap<JChannel, Connection>();
66  
67  	/**
68  	 * {@inheritDoc}
69  	 */
70  	public int getPriority() {
71  		// Make sure this configurable is scheduled before others.
72  		return 1;
73  	}
74  
75  	/**
76  	 * {@inheritDoc}
77  	 */
78  	public void configure() {
79  		for (Connection connection : JGroupsConnection.getJGroupsConnections())
80  			synchronized (lock) {
81  				try {
82  					// Export some parameters to the sys environment to allow
83  					// their usage inside the jgroups properties.
84  					exportConnectionAddressAndPort(connection);
85  
86  					// Create a new channel
87  					String properties = connection.getPropertyValue(KEY_JGROUPS_PROPERTIES, DEFAULT_PROPERTIES);
88  					if (!properties.endsWith(".xml") && !properties.contains("://"))
89  						properties += ".xml";
90  					JChannel channel = new JChannel(properties);
91  					channel.setName(connection.getName() + "@" + getMachineId());
92  
93  					// Configure a view logger (used until the receiver is changed to something else)
94  					channel.setReceiver(new JGroupsLoggingReceiverAdapter());
95  
96  					// Setup the logging of discarded messages.
97  					for (Protocol protocol : channel.getProtocolStack().getProtocols()) {
98  						if (protocol instanceof TP) {
99  							Property property = connection.getProperty(KEY_JGROUPS_LOG_DISCARDED_MESSAGES);
100 							if (property != null)
101 								((TP) protocol).setLogDiscardMessages(property.getValue().getBoolean());
102 							if (connection.getPortNumber() != -1)
103 								((TP) protocol).setBindPort(connection.getPortNumber());
104 						}
105 					}
106 
107 					try {
108 						addAuthProtocolIfNeeded(connection, channel);
109 					} catch (Exception e) {
110 						log.error("TJEE-00840:Failed to enable pre-shared key authentication on " +
111 								"jgroups connection " + toString(connection) + ". " +
112 								"Not initializing the channel to prevent data integrity problems.", e);
113 						continue;
114 					}
115 
116 					try {
117 						addEncryptionProtocolIfNeeded(connection, channel);
118 					} catch (Exception e) {
119 						log.error("TJEE-00890:Failed to enable encryption on " +
120 								"jgroups connection " + toString(connection) + ". " +
121 								"Not initializing the channel to prevent data integrity problems.", e);
122 						continue;
123 					}
124 
125 					channels.put(channel, connection);
126 					pendingChannels.offer(channel);
127 
128 					connection.bindRelatedInstanceToJNDI(channel);
129 
130 				} catch (Exception e) {
131 					log.error("TJEE-00790:Failed creating the jgroups channel for connection " +
132 							toString(connection), e);
133 				}
134 			}
135 
136 		// Make sure we connect and disconnect the jgroups channels with the Server's life-cycle.
137 		Server.addToLifeCycle(this);
138 	}
139 
140 	/**
141 	 * {@inheritDoc}
142 	 */
143 	public void reconfigure() throws Exception {
144 		doStop();
145 		configure();
146 		doStart();
147 	}
148 
149 	private void exportConnectionAddressAndPort(Connection connection) {
150 		updateSystemProperty("jgroups.bind_address", connection.getAddressValue());
151 		updateSystemProperty(JGROUPS_UDP_IP_TTL,
152 				connection.getPropertyValue(KEY_JGROUPS_MULTICAST_IP_TTL, DEFAULT_JGROUPS_MULTICAST_IP_TTL));
153 		updateSystemProperty(JGROUPS_UDP_MCAST_ADDR,
154 				connection.getPropertyValue(KEY_JGROUPS_MULTICAST_ADDRESS, DEFAULT_JGROUPS_MULTICAST_ADDRESS));
155 		if (connection.getPortNumber() != -1)
156 			System.setProperty("jgroups.port", String.valueOf(connection.getPortNumber()));
157 	}
158 
159 	private void updateSystemProperty(String key, String value) {
160 		if (value != null && !value.isEmpty())
161 			System.setProperty(key, value);
162 		else
163 			System.getProperties().remove(key);
164 	}
165 
166 	private void addAuthProtocolIfNeeded(Connection connection, JChannel channel) throws Exception {
167 		final String authToken = connection.getPropertyValue(KEY_JGROUPS_AUTHENTICATION_KEY, "");
168 		if (!authToken.isEmpty()) {
169 			final ProtocolStack stack = channel.getProtocolStack();
170 
171 			final boolean isAuthConfigured = null != stack.findProtocol(AUTH.class);
172 			if (!isAuthConfigured) {
173 				AUTH auth = new AUTH();
174 				MD5Token token = new MD5Token(authToken);
175 				token.setAuth(auth);
176 				auth.setValue("auth_token", token);
177 
178 				// Placing auth below GMS => joining a cluster is only possible when AUTH succeeded.
179 				Protocol anchor = findAnchorProtocol(connection, stack, "GMS");
180 				stack.insertProtocol(auth, BELOW, anchor.getClass());
181 				stack.init();
182 			}
183 
184 			if (log.isDebugEnabled())
185 				if (isAuthConfigured) {
186 					log.debug("Not changing existing auth configuration " +
187 							"used for jgroups connection '{}'", toString(connection));
188 				} else {
189 					log.debug("Configuring SHA1 pre-shared key authentication " +
190 							"for jgroups connection '{}'", toString(connection));
191 				}
192 		}
193 	}
194 
195 	private void addEncryptionProtocolIfNeeded(Connection connection, JChannel channel) throws Exception {
196 		final String encryptionAlgorithm = connection.getPropertyValue(
197 				KEY_JGROUPS_ENCRYPTION_ALGORITHM, DETAULT_ENCRYPTION_ALGORITHM);
198 		if (!DETAULT_ENCRYPTION_ALGORITHM.equalsIgnoreCase(encryptionAlgorithm)) {
199 			final ProtocolStack stack = channel.getProtocolStack();
200 
201 			final boolean isEncryptionConfigured = null != stack.findProtocol(ENCRYPT.class);
202 			if (!isEncryptionConfigured) {
203 				ENCRYPT encrypt = new ENCRYPT();
204 				encrypt.setValue("symAlgorithm", encryptionAlgorithm);
205 
206 				// Placing encryption above auth  => keys are exchanged only when AUTH succeeded.
207 				Protocol anchor = findAnchorProtocol(connection, stack, "AUTH");
208 				stack.insertProtocol(encrypt, ABOVE, anchor.getClass());
209 				stack.init();
210 			}
211 
212 			if (log.isDebugEnabled())
213 				if (isEncryptionConfigured) {
214 					log.debug("Not changing existing encryption configuration " +
215 							"used for jgroups connection '{}'", toString(connection));
216 				} else {
217 					log.debug("Configuring '{}' encryption " +
218 							"for jgroups connection '{}'", encryptionAlgorithm, toString(connection));
219 				}
220 		}
221 	}
222 
223 	private Protocol findAnchorProtocol(
224 			Connection connection, ProtocolStack stack, String protocolName) throws Exception {
225 		Protocol anchor = null;
226 		for (Protocol protocol : stack.getProtocols())
227 			if (protocol.getName().contains(protocolName))
228 				anchor = protocol;
229 
230 		if (anchor == null) {
231 			throw new Exception("Cannot configure jgroups connection " + toString(connection) +
232 					" with the configured features (auth or encryption) as no " + protocolName +
233 					" protocol was specified inside the given jgroups protocol stack.");
234 		}
235 		return anchor;
236 	}
237 
238 	/**
239 	 * {@inheritDoc}
240 	 */
241 	@Override
242 	protected void doStart() throws Exception {
243 		if (connectionThread != null)
244 			connectionThread.interrupt();
245 
246 		connectPendingChannels();
247 
248 		if (!pendingChannels.isEmpty()) {
249 			connectionThread = new Thread(connectPendingChannelsClosure, "Asynchronous JGroups Connection Thread");
250 			connectionThread.setDaemon(true);
251 			connectionThread.start();
252 		}
253 	}
254 
255 	private Thread connectionThread;
256 	private Runnable connectPendingChannelsClosure = new Runnable() {
257 		public void run() {
258 			int retryTime = 0;
259 			while (!pendingChannels.isEmpty()) {
260 				try {
261 					Thread.sleep(TimeUnit.MINUTES.toMillis(retryTime++));
262 				} catch (InterruptedException e) {
263 					return;
264 				}
265 
266 				connectPendingChannels();
267 			}
268 		}
269 	};
270 
271 	private synchronized void connectPendingChannels() {
272 		final List<JChannel> failed = new ArrayList<JChannel>();
273 
274 		JChannel channel;
275 		while ((channel = pendingChannels.poll()) != null) {
276 			final Connection connection = channels.get(channel);
277 
278 			String clusterName = connection.getPropertyValue(KEY_JGROUPS_CLUSTER_NAME, "");
279 			if (clusterName.isEmpty()) {
280 				log.error("TJEE-00810:Cannot start jgroups channel. " +
281 						"A cluster name is missing for the connection '{}'.", toString(connection));
282 			}
283 
284 			try {
285 				if (log.isDebugEnabled())
286 					log.debug("About to connect jgroups channel '{}'", toString(channel));
287 
288 				channel.connect(clusterName);
289 				log.info("TJEE-00860:Successfully connected the jgroups channel '{}'", toString(channel));
290 
291 				if (channel.getReceiver() instanceof JGroupsLoggingReceiverAdapter)
292 					((JGroupsLoggingReceiverAdapter) channel.getReceiver()).setLocalAddress(channel.getAddress());
293 			} catch (Exception e) {
294 				log.error("TJEE-00820:Failed connecting jgroups channel '" + channel.getName() +
295 						"' with cluster '" + clusterName + "'", e);
296 				failed.add(channel);
297 			}
298 		}
299 
300 		pendingChannels.addAll(failed);
301 	}
302 
303 	/**
304 	 * {@inheritDoc}
305 	 */
306 	@Override
307 	protected void doStop() throws Exception {
308 		pendingChannels.clear();
309 		for (Map.Entry<JChannel, Connection> entry : channels.entrySet()) {
310 			final JChannel channel = entry.getKey();
311 			final Connection connection = entry.getValue();
312 
313 			try {
314 				channel.disconnect();
315 				channel.close();
316 				log.info("TJEE-00870:Successfully closed the jgroups channel '{}'", toString(channel));
317 				connection.unbindRelatedInstance();
318 			} catch (Exception e) {
319 				log.error("TJEE-00800:Failed closing the jgroups channel '" + toString(channel) + "'.", e);
320 			}
321 		}
322 
323 		channels.clear();
324 	}
325 
326 	private static String toString(Connection connection) {
327 		return connection.getName() + "/" +
328 				connection.getPropertyValue(KEY_JGROUPS_CLUSTER_NAME, "!!NO-CLUSTER-NAME!!");
329 	}
330 
331 	private static String toString(Channel channel) {
332 		return channel.getName() + "/" + (channel.isConnected() ?
333 				channel.getClusterName() + "/" + channel.getAddress() :
334 				"unconnected");
335 	}
336 }