1 package com.trendmicro.grid.acl.ds.msmq;
2
3 import net.sf.tinyjee.concurrent.LockingMap;
4 import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory;
6
7 import java.io.IOException;
8 import java.io.OutputStream;
9 import java.io.UnsupportedEncodingException;
10 import java.net.*;
11 import java.util.*;
12 import java.util.concurrent.atomic.AtomicInteger;
13
14
15
16
17
18
19
20
21 public class PlainRoundRobinSender implements Sender {
22
23 private static final Logger log = LoggerFactory.getLogger(PlainRoundRobinSender.class);
24
25 public static final String CHARSET = "UTF-8";
26 public static final long RETRY_AFTER_MS = 1000 * 60 * 15;
27
28 String queueName;
29 List<String> endpoints;
30
31 final Random random = new Random();
32
33 final Map<String, AtomicInteger> errorCount = new LockingMap<String, AtomicInteger>();
34 final Map<String, Date> disabledURLs = new LockingMap<String, Date>();
35
36 public void reset(List<URI> endpoints, String queueName) {
37 this.queueName = queueName;
38
39 this.endpoints = new ArrayList<String>(endpoints.size());
40 errorCount.clear();
41 disabledURLs.clear();
42
43 for (URI endpoint : endpoints) {
44 try {
45 if (endpoint.getHost() == null)
46 throw new UnknownHostException();
47
48 URL ep = endpoint.toURL();
49 this.endpoints.add(ep.toExternalForm());
50
51 if (log.isDebugEnabled())
52 log.debug("Configured new MSMQ endpoint '{}' for queue '{}'", ep, queueName);
53
54 HttpURLConnection huc = (HttpURLConnection) ep.openConnection();
55 huc.setRequestMethod("HEAD");
56
57 if (huc.getResponseCode() != HttpURLConnection.HTTP_OK) {
58 throw new IllegalStateException("Response was " + huc.getResponseCode() + " " +
59 huc.getResponseMessage() + " for endpoint " + ep);
60 }
61 } catch (UnknownHostException e) {
62 log.error("TMACL-01081:Failed contacting MSMQ endpoint '" + endpoint +
63 "', the hostname is unknown.");
64 } catch (IllegalStateException e) {
65 log.error("TMACL-01082:Failed contacting MSMQ endpoint '" + endpoint + "'; " + e.getMessage());
66 } catch (IOException e) {
67 log.error("TMACL-01080:Failed contacting MSMQ endpoint '" + endpoint +
68 "', message sending will not be available on this endpoint.", e);
69 }
70 }
71 }
72
73 public void send(int priority, String messageBody) throws IOException {
74 final long time = System.currentTimeMillis();
75 final List<String> endpoints = getEndpoints(time);
76
77 if (endpoints.isEmpty()) {
78 throw new IOException("No valid endpoints found to send message. " +
79 "Disabled endpoints: " + disabledURLs.keySet());
80 }
81
82 int responseCode = -1;
83 String responseMessage = "";
84 for (String endpoint : endpoints) {
85 try {
86 final URL url = new URL(endpoint + "?queueName=" + encode(queueName) + "&priority=" + priority);
87
88 if (log.isTraceEnabled())
89 log.trace("Trying to send message to endpoint: {}", url);
90
91 final HttpURLConnection huc = (HttpURLConnection) url.openConnection();
92 huc.setDoOutput(true);
93 huc.setRequestMethod("POST");
94 huc.setRequestProperty("Content-Type", "text/xml; charset=" + CHARSET);
95
96 final OutputStream out = huc.getOutputStream();
97 out.write(messageBody.getBytes(CHARSET));
98 out.flush();
99
100 responseCode = huc.getResponseCode();
101 responseMessage = huc.getResponseMessage();
102
103 if (log.isTraceEnabled()) {
104 log.trace("Server responded with '{} {}', after sending message '{}' to endpoint: {}",
105 new Object[]{responseCode, responseMessage, messageBody, url});
106 }
107
108 if (responseCode != HttpURLConnection.HTTP_OK) {
109 AtomicInteger c = errorCount.get(endpoint);
110 if (c == null)
111 errorCount.put(endpoint, c = new AtomicInteger());
112 if (c.incrementAndGet() > 1000)
113 disabledURLs.put(endpoint, new Date(time));
114 } else {
115 errorCount.remove(endpoint);
116 break;
117 }
118
119 } catch (IOException e) {
120 disabledURLs.put(endpoint, new Date(time));
121 log.error("TMACL-01060:Failed sending message to endpoint '" + endpoint + "'", e);
122 }
123 }
124
125 if (responseCode != HttpURLConnection.HTTP_OK) {
126 throw new IOException("Failed sending message " + messageBody +
127 " to configured endpoints " + endpoints + " (response was: " +
128 responseCode + " " + responseMessage + ").");
129 }
130 }
131
132 private List<String> getEndpoints(long time) {
133 List<String> epList = new ArrayList<String>(endpoints.size());
134 for (String endpoint : endpoints) {
135 final Date disabledDate = disabledURLs.get(endpoint);
136 if (disabledDate != null) {
137 if (time - disabledDate.getTime() < RETRY_AFTER_MS)
138 continue;
139 }
140 epList.add(endpoint);
141 }
142 Collections.shuffle(epList, random);
143 return epList;
144 }
145
146 private static String encode(String value) {
147 try {
148 return URLEncoder.encode(value, CHARSET);
149 } catch (UnsupportedEncodingException e) {
150 throw new RuntimeException(e);
151 }
152 }
153 }