1 package net.sf.tinyjee.jgroups.config;
2
3 import net.sf.tinyjee.Prioritizeable;
4 import net.sf.tinyjee.Server;
5 import net.sf.tinyjee.config.Configurable;
6 import net.sf.tinyjee.config.Connection;
7 import net.sf.tinyjee.config.Property;
8 import net.sf.tinyjee.jgroups.JGroupsLoggingReceiverAdapter;
9 import org.eclipse.jetty.util.component.AbstractLifeCycle;
10 import org.jgroups.Channel;
11 import org.jgroups.JChannel;
12 import org.jgroups.auth.MD5Token;
13 import org.jgroups.protocols.AUTH;
14 import org.jgroups.protocols.ENCRYPT;
15 import org.jgroups.protocols.TP;
16 import org.jgroups.stack.Protocol;
17 import org.jgroups.stack.ProtocolStack;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.net.InetAddress;
22 import java.net.UnknownHostException;
23 import java.util.*;
24 import java.util.concurrent.ConcurrentLinkedQueue;
25 import java.util.concurrent.TimeUnit;
26
27 import static net.sf.tinyjee.jgroups.config.JGroupsConnection.*;
28 import static net.sf.tinyjee.jgroups.config.JGroupsConnection.DETAULT_ENCRYPTION_ALGORITHM;
29 import static net.sf.tinyjee.jgroups.config.JGroupsConnection.KEY_JGROUPS_CLUSTER_NAME;
30 import static org.jgroups.stack.ProtocolStack.ABOVE;
31 import static org.jgroups.stack.ProtocolStack.BELOW;
32
33
34
35
36
37
38
39 public class JGroupsConfigurator extends AbstractLifeCycle implements Configurable, Prioritizeable {
40
41 private static final Logger log = LoggerFactory.getLogger(JGroupsConfigurator.class);
42
43 private static final String JGROUPS_UDP_MCAST_ADDR = "jgroups.udp.mcast_addr";
44 private static final String JGROUPS_UDP_IP_TTL = "jgroups.udp.ip_ttl";
45 private static final String DEFAULT_JGROUPS_MULTICAST_IP_TTL = System.getProperty(JGROUPS_UDP_IP_TTL, "");
46 private static final String DEFAULT_JGROUPS_MULTICAST_ADDRESS = System.getProperty(JGROUPS_UDP_MCAST_ADDR, "");
47
48 private static String machineId = System.getProperty("tinyjee.jgroups.machine.id");
49
50 static String getMachineId() {
51 if (machineId == null) {
52 try {
53 machineId = InetAddress.getLocalHost().getCanonicalHostName();
54 } catch (UnknownHostException e) {
55 log.warn("Cannot create properly named JGroups channels as 'tinyjee.jgroups.machine.id' was not " +
56 "specified as system property and auto-detection failed with.", e);
57 machineId = "";
58 }
59 }
60 return machineId;
61 }
62
63 private final static Object lock = new Object();
64 private final Queue<JChannel> pendingChannels = new ConcurrentLinkedQueue<JChannel>();
65 private final Map<JChannel, Connection> channels = new IdentityHashMap<JChannel, Connection>();
66
67
68
69
70 public int getPriority() {
71
72 return 1;
73 }
74
75
76
77
78 public void configure() {
79 for (Connection connection : JGroupsConnection.getJGroupsConnections())
80 synchronized (lock) {
81 try {
82
83
84 exportConnectionAddressAndPort(connection);
85
86
87 String properties = connection.getPropertyValue(KEY_JGROUPS_PROPERTIES, DEFAULT_PROPERTIES);
88 if (!properties.endsWith(".xml") && !properties.contains("://"))
89 properties += ".xml";
90 JChannel channel = new JChannel(properties);
91 channel.setName(connection.getName() + "@" + getMachineId());
92
93
94 channel.setReceiver(new JGroupsLoggingReceiverAdapter());
95
96
97 for (Protocol protocol : channel.getProtocolStack().getProtocols()) {
98 if (protocol instanceof TP) {
99 Property property = connection.getProperty(KEY_JGROUPS_LOG_DISCARDED_MESSAGES);
100 if (property != null)
101 ((TP) protocol).setLogDiscardMessages(property.getValue().getBoolean());
102 if (connection.getPortNumber() != -1)
103 ((TP) protocol).setBindPort(connection.getPortNumber());
104 }
105 }
106
107 try {
108 addAuthProtocolIfNeeded(connection, channel);
109 } catch (Exception e) {
110 log.error("TJEE-00840:Failed to enable pre-shared key authentication on " +
111 "jgroups connection " + toString(connection) + ". " +
112 "Not initializing the channel to prevent data integrity problems.", e);
113 continue;
114 }
115
116 try {
117 addEncryptionProtocolIfNeeded(connection, channel);
118 } catch (Exception e) {
119 log.error("TJEE-00890:Failed to enable encryption on " +
120 "jgroups connection " + toString(connection) + ". " +
121 "Not initializing the channel to prevent data integrity problems.", e);
122 continue;
123 }
124
125 channels.put(channel, connection);
126 pendingChannels.offer(channel);
127
128 connection.bindRelatedInstanceToJNDI(channel);
129
130 } catch (Exception e) {
131 log.error("TJEE-00790:Failed creating the jgroups channel for connection " +
132 toString(connection), e);
133 }
134 }
135
136
137 Server.addToLifeCycle(this);
138 }
139
140
141
142
143 public void reconfigure() throws Exception {
144 doStop();
145 configure();
146 doStart();
147 }
148
149 private void exportConnectionAddressAndPort(Connection connection) {
150 updateSystemProperty("jgroups.bind_address", connection.getAddressValue());
151 updateSystemProperty(JGROUPS_UDP_IP_TTL,
152 connection.getPropertyValue(KEY_JGROUPS_MULTICAST_IP_TTL, DEFAULT_JGROUPS_MULTICAST_IP_TTL));
153 updateSystemProperty(JGROUPS_UDP_MCAST_ADDR,
154 connection.getPropertyValue(KEY_JGROUPS_MULTICAST_ADDRESS, DEFAULT_JGROUPS_MULTICAST_ADDRESS));
155 if (connection.getPortNumber() != -1)
156 System.setProperty("jgroups.port", String.valueOf(connection.getPortNumber()));
157 }
158
159 private void updateSystemProperty(String key, String value) {
160 if (value != null && !value.isEmpty())
161 System.setProperty(key, value);
162 else
163 System.getProperties().remove(key);
164 }
165
166 private void addAuthProtocolIfNeeded(Connection connection, JChannel channel) throws Exception {
167 final String authToken = connection.getPropertyValue(KEY_JGROUPS_AUTHENTICATION_KEY, "");
168 if (!authToken.isEmpty()) {
169 final ProtocolStack stack = channel.getProtocolStack();
170
171 final boolean isAuthConfigured = null != stack.findProtocol(AUTH.class);
172 if (!isAuthConfigured) {
173 AUTH auth = new AUTH();
174 MD5Token token = new MD5Token(authToken);
175 token.setAuth(auth);
176 auth.setValue("auth_token", token);
177
178
179 Protocol anchor = findAnchorProtocol(connection, stack, "GMS");
180 stack.insertProtocol(auth, BELOW, anchor.getClass());
181 stack.init();
182 }
183
184 if (log.isDebugEnabled())
185 if (isAuthConfigured) {
186 log.debug("Not changing existing auth configuration " +
187 "used for jgroups connection '{}'", toString(connection));
188 } else {
189 log.debug("Configuring SHA1 pre-shared key authentication " +
190 "for jgroups connection '{}'", toString(connection));
191 }
192 }
193 }
194
195 private void addEncryptionProtocolIfNeeded(Connection connection, JChannel channel) throws Exception {
196 final String encryptionAlgorithm = connection.getPropertyValue(
197 KEY_JGROUPS_ENCRYPTION_ALGORITHM, DETAULT_ENCRYPTION_ALGORITHM);
198 if (!DETAULT_ENCRYPTION_ALGORITHM.equalsIgnoreCase(encryptionAlgorithm)) {
199 final ProtocolStack stack = channel.getProtocolStack();
200
201 final boolean isEncryptionConfigured = null != stack.findProtocol(ENCRYPT.class);
202 if (!isEncryptionConfigured) {
203 ENCRYPT encrypt = new ENCRYPT();
204 encrypt.setValue("symAlgorithm", encryptionAlgorithm);
205
206
207 Protocol anchor = findAnchorProtocol(connection, stack, "AUTH");
208 stack.insertProtocol(encrypt, ABOVE, anchor.getClass());
209 stack.init();
210 }
211
212 if (log.isDebugEnabled())
213 if (isEncryptionConfigured) {
214 log.debug("Not changing existing encryption configuration " +
215 "used for jgroups connection '{}'", toString(connection));
216 } else {
217 log.debug("Configuring '{}' encryption " +
218 "for jgroups connection '{}'", encryptionAlgorithm, toString(connection));
219 }
220 }
221 }
222
223 private Protocol findAnchorProtocol(
224 Connection connection, ProtocolStack stack, String protocolName) throws Exception {
225 Protocol anchor = null;
226 for (Protocol protocol : stack.getProtocols())
227 if (protocol.getName().contains(protocolName))
228 anchor = protocol;
229
230 if (anchor == null) {
231 throw new Exception("Cannot configure jgroups connection " + toString(connection) +
232 " with the configured features (auth or encryption) as no " + protocolName +
233 " protocol was specified inside the given jgroups protocol stack.");
234 }
235 return anchor;
236 }
237
238
239
240
241 @Override
242 protected void doStart() throws Exception {
243 if (connectionThread != null)
244 connectionThread.interrupt();
245
246 connectPendingChannels();
247
248 if (!pendingChannels.isEmpty()) {
249 connectionThread = new Thread(connectPendingChannelsClosure, "Asynchronous JGroups Connection Thread");
250 connectionThread.setDaemon(true);
251 connectionThread.start();
252 }
253 }
254
255 private Thread connectionThread;
256 private Runnable connectPendingChannelsClosure = new Runnable() {
257 public void run() {
258 int retryTime = 0;
259 while (!pendingChannels.isEmpty()) {
260 try {
261 Thread.sleep(TimeUnit.MINUTES.toMillis(retryTime++));
262 } catch (InterruptedException e) {
263 return;
264 }
265
266 connectPendingChannels();
267 }
268 }
269 };
270
271 private synchronized void connectPendingChannels() {
272 final List<JChannel> failed = new ArrayList<JChannel>();
273
274 JChannel channel;
275 while ((channel = pendingChannels.poll()) != null) {
276 final Connection connection = channels.get(channel);
277
278 String clusterName = connection.getPropertyValue(KEY_JGROUPS_CLUSTER_NAME, "");
279 if (clusterName.isEmpty()) {
280 log.error("TJEE-00810:Cannot start jgroups channel. " +
281 "A cluster name is missing for the connection '{}'.", toString(connection));
282 }
283
284 try {
285 if (log.isDebugEnabled())
286 log.debug("About to connect jgroups channel '{}'", toString(channel));
287
288 channel.connect(clusterName);
289 log.info("TJEE-00860:Successfully connected the jgroups channel '{}'", toString(channel));
290
291 if (channel.getReceiver() instanceof JGroupsLoggingReceiverAdapter)
292 ((JGroupsLoggingReceiverAdapter) channel.getReceiver()).setLocalAddress(channel.getAddress());
293 } catch (Exception e) {
294 log.error("TJEE-00820:Failed connecting jgroups channel '" + channel.getName() +
295 "' with cluster '" + clusterName + "'", e);
296 failed.add(channel);
297 }
298 }
299
300 pendingChannels.addAll(failed);
301 }
302
303
304
305
306 @Override
307 protected void doStop() throws Exception {
308 pendingChannels.clear();
309 for (Map.Entry<JChannel, Connection> entry : channels.entrySet()) {
310 final JChannel channel = entry.getKey();
311 final Connection connection = entry.getValue();
312
313 try {
314 channel.disconnect();
315 channel.close();
316 log.info("TJEE-00870:Successfully closed the jgroups channel '{}'", toString(channel));
317 connection.unbindRelatedInstance();
318 } catch (Exception e) {
319 log.error("TJEE-00800:Failed closing the jgroups channel '" + toString(channel) + "'.", e);
320 }
321 }
322
323 channels.clear();
324 }
325
326 private static String toString(Connection connection) {
327 return connection.getName() + "/" +
328 connection.getPropertyValue(KEY_JGROUPS_CLUSTER_NAME, "!!NO-CLUSTER-NAME!!");
329 }
330
331 private static String toString(Channel channel) {
332 return channel.getName() + "/" + (channel.isConnected() ?
333 channel.getClusterName() + "/" + channel.getAddress() :
334 "unconnected");
335 }
336 }