1 package com.trendmicro.grid.acl.ds.cache;
2
3 import org.infinispan.Cache;
4 import org.infinispan.util.concurrent.NotifyingFuture;
5 import org.slf4j.Logger;
6 import org.slf4j.LoggerFactory;
7
8 import java.util.*;
9 import java.util.concurrent.ExecutionException;
10
11
12
13
14
15
16
17 public class CacheAdapter<K, V> implements CacheSource<K, V>, CacheDestination<K, V> {
18
19 private static final Logger log = LoggerFactory.getLogger(CacheAdapter.class);
20
21 protected boolean sync;
22 protected Cache<K, V> delegate;
23
24
25
26
27
28
29
30 public CacheAdapter(Cache<K, V> delegate, boolean sync) {
31 this.delegate = delegate;
32 this.sync = sync;
33 }
34
35
36
37
38 @Override
39 @SuppressWarnings("unchecked")
40 public List<V> getAll(Collection<K> keys) {
41 if (sync) {
42 final List<V> result = new ArrayList<V>(keys.size());
43 for (K key : keys)
44 result.add(key == null ? null : delegate.get(key));
45
46 return result;
47 } else {
48 final NotifyingFuture<V>[] futures = new NotifyingFuture[keys.size()];
49 int i = 0;
50 for (K key : keys)
51 futures[i++] = key == null ? null : delegate.getAsync(key);
52
53 return waitFor(Arrays.asList(futures));
54 }
55 }
56
57
58
59
60 @Override
61 public boolean containsKey(K key) {
62 return key != null && delegate.containsKey(key);
63 }
64
65
66
67
68 @Override
69 public int size() {
70 return delegate.size();
71 }
72
73
74
75
76 @Override
77 public boolean isEmpty() {
78 return delegate.isEmpty();
79 }
80
81
82
83
84
85
86
87 protected <V> List<V> waitFor(Collection<NotifyingFuture<V>> futures) {
88 final List<V> results = new ArrayList<V>(futures.size());
89 try {
90 for (NotifyingFuture<V> future : futures)
91 results.add(future == null ? null : future.get());
92
93 return results;
94
95 } catch (InterruptedException e) {
96 Thread.currentThread().interrupt();
97 throw new RuntimeException(e);
98 } catch (ExecutionException e) {
99 throw new RuntimeException(e);
100 }
101 }
102
103
104
105
106 @Override
107 public void removeAll(Collection<K> keys) {
108 final Collection<NotifyingFuture<V>> futures = new ArrayList<NotifyingFuture<V>>(keys.size());
109 for (K key : keys) {
110 if (!skipKey(key, null)) futures.add(delegate.removeAsync(key));
111 }
112
113 if (sync) waitFor(futures);
114 }
115
116
117
118
119 @Override
120 public void clear() {
121 if (sync)
122 delegate.clear();
123 else
124 delegate.clearAsync();
125 }
126
127
128
129
130 @Override
131 public boolean supportsBatching() {
132 return delegate.getConfiguration().isInvocationBatchingEnabled();
133 }
134
135
136
137
138 @Override
139 public boolean startBatch() {
140 return delegate.startBatch();
141 }
142
143
144
145
146 @Override
147 public void endBatch(boolean success) {
148 delegate.endBatch(success);
149 }
150
151
152
153
154 @Override
155 @SuppressWarnings("unchecked")
156 public void putAll(Map<? extends K, ? extends V> values, Mode mode) {
157 switch (mode) {
158 case PutIfAbsent:
159 putIfAbsent((Map<K, V>) values);
160 break;
161
162 case PutIfValueDiffers:
163 putIfValueDiffers((Map<K, V>) values);
164 break;
165
166 case ReplaceValue:
167 replaceValue((Map<K, V>) values);
168 break;
169
170 default:
171 Map<? extends K, ? extends V> valuesToPut = null;
172 for (Map.Entry<? extends K, ? extends V> entry : values.entrySet()) {
173 if (skipKey(entry.getKey(), entry.getValue())) {
174 if (valuesToPut == null) valuesToPut = new LinkedHashMap<K, V>(values);
175
176 valuesToPut.remove(entry.getKey());
177 }
178 }
179
180 if (valuesToPut == null) valuesToPut = values;
181
182 if (sync)
183 delegate.putAll(valuesToPut);
184 else
185 delegate.putAllAsync(valuesToPut);
186 }
187
188 logCacheWrites((Map<K, V>) values, mode);
189 }
190
191
192
193
194
195
196 protected void putIfAbsent(Map<K, V> values) {
197 for (Map.Entry<K, V> entry : values.entrySet()) {
198 final K key = entry.getKey();
199 final V value = entry.getValue();
200
201 if (skipKey(key, value)) continue;
202
203 if (sync)
204 delegate.putIfAbsent(key, value);
205 else
206 delegate.putForExternalRead(key, value);
207 }
208 }
209
210
211
212
213
214
215 protected void replaceValue(Map<K, V> values) {
216 final Collection<NotifyingFuture<V>> futures = new ArrayList<NotifyingFuture<V>>(values.size());
217 for (Map.Entry<K, V> entry : values.entrySet()) {
218 final K key = entry.getKey();
219 final V value = entry.getValue();
220
221 if (skipKey(key, value)) continue;
222
223 futures.add(delegate.replaceAsync(key, value));
224 }
225
226 if (sync) waitFor(futures);
227 }
228
229
230
231
232
233
234 @SuppressWarnings("unchecked")
235 protected void putIfValueDiffers(Map<K, V> values) {
236 final Collection<NotifyingFuture<Object>> futures = new ArrayList<NotifyingFuture<Object>>(values.size());
237 for (Map.Entry<K, V> entry : values.entrySet()) {
238 final K key = entry.getKey();
239 final V value = entry.getValue();
240
241 if (skipKey(key, value)) continue;
242
243 final V oldValue = delegate.get(key);
244 if (oldValue == null)
245 futures.add((NotifyingFuture<Object>) delegate.putAsync(key, value));
246 else if (!oldValue.equals(value))
247 futures.add((NotifyingFuture) delegate.replaceAsync(key, oldValue, value));
248 }
249
250 if (sync) waitFor(futures);
251 }
252
253
254
255
256
257
258
259
260 protected boolean skipKey(K key, V value) {
261 if (key == null) {
262 if (log.isDebugEnabled()) log.debug("Skipping request to map the value '{}' against key 'null'. This is not supported!", value);
263 return true;
264 }
265 return false;
266 }
267
268 private void logCacheWrites(Map<K, V> values, Mode mode) {
269 if (log.isTraceEnabled()) {
270 for (Map.Entry<K, V> entry : values.entrySet()) {
271 final K key = entry.getKey();
272 final V value = entry.getValue();
273
274 if (skipKey(key, value)) continue;
275
276 String v = value == null ? "<null>" : value.getClass().getSimpleName();
277 log.trace("Cached '{}' for key '{}' (mode '{}')", new Object[]{v, key, mode});
278 }
279 }
280 }
281
282
283
284
285 @Override
286 public String toString() {
287 return "CacheAdapter{" +
288 "sync=" + sync +
289 ", delegate=" + delegate +
290 '}';
291 }
292 }