1   package com.trendmicro.grid.acl.ds.msmq;
2   
3   import net.sf.tinyjee.concurrent.LockingMap;
4   import org.slf4j.Logger;
5   import org.slf4j.LoggerFactory;
6   
7   import java.io.IOException;
8   import java.io.OutputStream;
9   import java.io.UnsupportedEncodingException;
10  import java.net.*;
11  import java.util.*;
12  import java.util.concurrent.atomic.AtomicInteger;
13  
14  /**
15   * Simple implementation to send the message body to the configured MSMQ machines,
16   * using a simple POST protocol (requires a custom web-app running on the MSMQ server).
17   *
18   * @author juergen_kellerer, 2010-06-17
19   * @version 1.0
20   */
21  public class PlainRoundRobinSender implements Sender {
22  
23  	private static final Logger log = LoggerFactory.getLogger(PlainRoundRobinSender.class);
24  
25  	public static final String CHARSET = "UTF-8";
26  	public static final long RETRY_AFTER_MS = 1000 * 60 * 15;
27  
28  	String queueName;
29  	List<String> endpoints;
30  
31  	final Random random = new Random();
32  
33  	final Map<String, AtomicInteger> errorCount = new LockingMap<String, AtomicInteger>();
34  	final Map<String, Date> disabledURLs = new LockingMap<String, Date>();
35  
36  	public void reset(List<URI> endpoints, String queueName) {
37  		this.queueName = queueName;
38  
39  		this.endpoints = new ArrayList<String>(endpoints.size());
40  		errorCount.clear();
41  		disabledURLs.clear();
42  
43  		for (URI endpoint : endpoints) {
44  			try {
45  				if (endpoint.getHost() == null)
46  					throw new UnknownHostException();
47  
48  				URL ep = endpoint.toURL();
49  				this.endpoints.add(ep.toExternalForm());
50  
51  				if (log.isDebugEnabled())
52  					log.debug("Configured new MSMQ endpoint '{}' for queue '{}'", ep, queueName);
53  
54  				HttpURLConnection huc = (HttpURLConnection) ep.openConnection();
55  				huc.setRequestMethod("HEAD");
56  
57  				if (huc.getResponseCode() != HttpURLConnection.HTTP_OK) {
58  					throw new IllegalStateException("Response was " + huc.getResponseCode() + " " +
59  						huc.getResponseMessage() + " for endpoint " + ep);
60  				}
61  			} catch (UnknownHostException e) {
62  				log.error("TMACL-01081:Failed contacting MSMQ endpoint '" + endpoint +
63  					"', the hostname is unknown.");
64  			} catch (IllegalStateException e) {
65  				log.error("TMACL-01082:Failed contacting MSMQ endpoint '" + endpoint + "'; " + e.getMessage());
66  			} catch (IOException e) {
67  				log.error("TMACL-01080:Failed contacting MSMQ endpoint '" + endpoint +
68  					"', message sending will not be available on this endpoint.", e);
69  			}
70  		}
71  	}
72  
73  	public void send(int priority, String messageBody) throws IOException {
74  		final long time = System.currentTimeMillis();
75  		final List<String> endpoints = getEndpoints(time);
76  
77  		if (endpoints.isEmpty()) {
78  			throw new IOException("No valid endpoints found to send message. " +
79  				"Disabled endpoints: " + disabledURLs.keySet());
80  		}
81  
82  		int responseCode = -1;
83  		String responseMessage = "";
84  		for (String endpoint : endpoints) {
85  			try {
86  				final URL url = new URL(endpoint + "?queueName=" + encode(queueName) + "&priority=" + priority);
87  
88  				if (log.isTraceEnabled())
89  					log.trace("Trying to send message to endpoint: {}", url);
90  
91  				final HttpURLConnection huc = (HttpURLConnection) url.openConnection();
92  				huc.setDoOutput(true);
93  				huc.setRequestMethod("POST");
94  				huc.setRequestProperty("Content-Type", "text/xml; charset=" + CHARSET);
95  
96  				final OutputStream out = huc.getOutputStream();
97  				out.write(messageBody.getBytes(CHARSET));
98  				out.flush();
99  
100 				responseCode = huc.getResponseCode();
101 				responseMessage = huc.getResponseMessage();
102 
103 				if (log.isTraceEnabled()) {
104 					log.trace("Server responded with '{} {}', after sending message '{}' to endpoint: {}",
105 						new Object[]{responseCode, responseMessage, messageBody, url});
106 				}
107 
108 				if (responseCode != HttpURLConnection.HTTP_OK) {
109 					AtomicInteger c = errorCount.get(endpoint);
110 					if (c == null)
111 						errorCount.put(endpoint, c = new AtomicInteger());
112 					if (c.incrementAndGet() > 1000)
113 						disabledURLs.put(endpoint, new Date(time));
114 				} else {
115 					errorCount.remove(endpoint);
116 					break;
117 				}
118 
119 			} catch (IOException e) {
120 				disabledURLs.put(endpoint, new Date(time));
121 				log.error("TMACL-01060:Failed sending message to endpoint '" + endpoint + "'", e);
122 			}
123 		}
124 
125 		if (responseCode != HttpURLConnection.HTTP_OK) {
126 			throw new IOException("Failed sending message " + messageBody +
127 				" to configured endpoints " + endpoints + " (response was: " +
128 				responseCode + " " + responseMessage + ").");
129 		}
130 	}
131 
132 	private List<String> getEndpoints(long time) {
133 		List<String> epList = new ArrayList<String>(endpoints.size());
134 		for (String endpoint : endpoints) {
135 			final Date disabledDate = disabledURLs.get(endpoint);
136 			if (disabledDate != null) {
137 				if (time - disabledDate.getTime() < RETRY_AFTER_MS)
138 					continue;
139 			}
140 			epList.add(endpoint);
141 		}
142 		Collections.shuffle(epList, random);
143 		return epList;
144 	}
145 
146 	private static String encode(String value) {
147 		try {
148 			return URLEncoder.encode(value, CHARSET);
149 		} catch (UnsupportedEncodingException e) {
150 			throw new RuntimeException(e);
151 		}
152 	}
153 }