1   package com.trendmicro.grid.acl.ds.cache;
2   
3   import org.infinispan.Cache;
4   import org.infinispan.util.concurrent.NotifyingFuture;
5   import org.slf4j.Logger;
6   import org.slf4j.LoggerFactory;
7   
8   import java.util.*;
9   import java.util.concurrent.ExecutionException;
10  
11  /**
12   * Adapts a cache to CacheSource and CacheDestination with optional key and value translation.
13   *
14   * @author juergen_kellerer, 2011-03-24
15   * @version 1.0
16   */
17  public class CacheAdapter<K, V> implements CacheSource<K, V>, CacheDestination<K, V> {
18  
19  	private static final Logger log = LoggerFactory.getLogger(CacheAdapter.class);
20  
21  	protected boolean sync;
22  	protected Cache<K, V> delegate;
23  
24  	/**
25  	 * Constructs a new adapter for the specified delegate.
26  	 *
27  	 * @param delegate the cache to adapt.
28  	 * @param sync     specifies whether all cache operations must be synchronous.
29  	 */
30  	public CacheAdapter(Cache<K, V> delegate, boolean sync) {
31  		this.delegate = delegate;
32  		this.sync = sync;
33  	}
34  
35  	/**
36  	 * {@inheritDoc}
37  	 */
38  	@Override
39  	@SuppressWarnings("unchecked")
40  	public List<V> getAll(Collection<K> keys) {
41  		if (sync) {
42  			final List<V> result = new ArrayList<V>(keys.size());
43  			for (K key : keys)
44  				result.add(key == null ? null : delegate.get(key));
45  
46  			return result;
47  		} else {
48  			final NotifyingFuture<V>[] futures = new NotifyingFuture[keys.size()];
49  			int i = 0;
50  			for (K key : keys)
51  				futures[i++] = key == null ? null : delegate.getAsync(key);
52  
53  			return waitFor(Arrays.asList(futures));
54  		}
55  	}
56  
57  	/**
58  	 * {@inheritDoc}
59  	 */
60  	@Override
61  	public boolean containsKey(K key) {
62  		return key != null && delegate.containsKey(key);
63  	}
64  
65  	/**
66  	 * {@inheritDoc}
67  	 */
68  	@Override
69  	public int size() {
70  		return delegate.size();
71  	}
72  
73  	/**
74  	 * {@inheritDoc}
75  	 */
76  	@Override
77  	public boolean isEmpty() {
78  		return delegate.isEmpty();
79  	}
80  
81  	/**
82  	 * Waits on all notifying futures and returns their results.
83  	 *
84  	 * @param futures the futures to wait on.
85  	 * @return the results of the futures.
86  	 */
87  	protected <V> List<V> waitFor(Collection<NotifyingFuture<V>> futures) {
88  		final List<V> results = new ArrayList<V>(futures.size());
89  		try {
90  			for (NotifyingFuture<V> future : futures)
91  				results.add(future == null ? null : future.get());
92  
93  			return results;
94  
95  		} catch (InterruptedException e) {
96  			Thread.currentThread().interrupt();
97  			throw new RuntimeException(e);
98  		} catch (ExecutionException e) {
99  			throw new RuntimeException(e);
100 		}
101 	}
102 
103 	/**
104 	 * {@inheritDoc}
105 	 */
106 	@Override
107 	public void removeAll(Collection<K> keys) {
108 		final Collection<NotifyingFuture<V>> futures = new ArrayList<NotifyingFuture<V>>(keys.size());
109 		for (K key : keys) {
110 			if (!skipKey(key, null)) futures.add(delegate.removeAsync(key));
111 		}
112 
113 		if (sync) waitFor(futures);
114 	}
115 
116 	/**
117 	 * {@inheritDoc}
118 	 */
119 	@Override
120 	public void clear() {
121 		if (sync)
122 			delegate.clear();
123 		else
124 			delegate.clearAsync();
125 	}
126 
127 	/**
128 	 * {@inheritDoc}
129 	 */
130 	@Override
131 	public boolean supportsBatching() {
132 		return delegate.getConfiguration().isInvocationBatchingEnabled();
133 	}
134 
135 	/**
136 	 * {@inheritDoc}
137 	 */
138 	@Override
139 	public boolean startBatch() {
140 		return delegate.startBatch();
141 	}
142 
143 	/**
144 	 * {@inheritDoc}
145 	 */
146 	@Override
147 	public void endBatch(boolean success) {
148 		delegate.endBatch(success);
149 	}
150 
151 	/**
152 	 * {@inheritDoc}
153 	 */
154 	@Override
155 	@SuppressWarnings("unchecked")
156 	public void putAll(Map<? extends K, ? extends V> values, Mode mode) {
157 		switch (mode) {
158 			case PutIfAbsent:
159 				putIfAbsent((Map<K, V>) values);
160 				break;
161 
162 			case PutIfValueDiffers:
163 				putIfValueDiffers((Map<K, V>) values);
164 				break;
165 
166 			case ReplaceValue:
167 				replaceValue((Map<K, V>) values);
168 				break;
169 
170 			default:
171 				Map<? extends K, ? extends V> valuesToPut = null;
172 				for (Map.Entry<? extends K, ? extends V> entry : values.entrySet()) {
173 					if (skipKey(entry.getKey(), entry.getValue())) {
174 						if (valuesToPut == null) valuesToPut = new LinkedHashMap<K, V>(values);
175 
176 						valuesToPut.remove(entry.getKey());
177 					}
178 				}
179 
180 				if (valuesToPut == null) valuesToPut = values;
181 
182 				if (sync)
183 					delegate.putAll(valuesToPut);
184 				else
185 					delegate.putAllAsync(valuesToPut);
186 		}
187 
188 		logCacheWrites((Map<K, V>) values, mode);
189 	}
190 
191 	/**
192 	 * Implements the write mode "putIfAbsent".
193 	 *
194 	 * @param values the values to write to the cache.
195 	 */
196 	protected void putIfAbsent(Map<K, V> values) {
197 		for (Map.Entry<K, V> entry : values.entrySet()) {
198 			final K key = entry.getKey();
199 			final V value = entry.getValue();
200 
201 			if (skipKey(key, value)) continue;
202 
203 			if (sync)
204 				delegate.putIfAbsent(key, value);
205 			else
206 				delegate.putForExternalRead(key, value);
207 		}
208 	}
209 
210 	/**
211 	 * Implements the write mode "replaceValue".
212 	 *
213 	 * @param values the values to write to the cache.
214 	 */
215 	protected void replaceValue(Map<K, V> values) {
216 		final Collection<NotifyingFuture<V>> futures = new ArrayList<NotifyingFuture<V>>(values.size());
217 		for (Map.Entry<K, V> entry : values.entrySet()) {
218 			final K key = entry.getKey();
219 			final V value = entry.getValue();
220 
221 			if (skipKey(key, value)) continue;
222 
223 			futures.add(delegate.replaceAsync(key, value));
224 		}
225 
226 		if (sync) waitFor(futures);
227 	}
228 
229 	/**
230 	 * Implements the write mode "putIfValueDiffers".
231 	 *
232 	 * @param values the values to write to the cache.
233 	 */
234 	@SuppressWarnings("unchecked")
235 	protected void putIfValueDiffers(Map<K, V> values) {
236 		final Collection<NotifyingFuture<Object>> futures = new ArrayList<NotifyingFuture<Object>>(values.size());
237 		for (Map.Entry<K, V> entry : values.entrySet()) {
238 			final K key = entry.getKey();
239 			final V value = entry.getValue();
240 
241 			if (skipKey(key, value)) continue;
242 
243 			final V oldValue = delegate.get(key);
244 			if (oldValue == null)
245 				futures.add((NotifyingFuture<Object>) delegate.putAsync(key, value));
246 			else if (!oldValue.equals(value))
247 				futures.add((NotifyingFuture) delegate.replaceAsync(key, oldValue, value));
248 		}
249 
250 		if (sync) waitFor(futures);
251 	}
252 
253 	/**
254 	 * Returns true if the specified key should be skipped during a put operation.
255 	 *
256 	 * @param key   the key to evaluate.
257 	 * @param value the value of the key.
258 	 * @return true if the key should not be considered when writing data to the cache.
259 	 */
260 	protected boolean skipKey(K key, V value) {
261 		if (key == null) {
262 			if (log.isDebugEnabled()) log.debug("Skipping request to map the value '{}' against key 'null'. This is not supported!", value);
263 			return true;
264 		}
265 		return false;
266 	}
267 
268 	private void logCacheWrites(Map<K, V> values, Mode mode) {
269 		if (log.isTraceEnabled()) {
270 			for (Map.Entry<K, V> entry : values.entrySet()) {
271 				final K key = entry.getKey();
272 				final V value = entry.getValue();
273 
274 				if (skipKey(key, value)) continue;
275 
276 				String v = value == null ? "<null>" : value.getClass().getSimpleName();
277 				log.trace("Cached '{}' for key '{}' (mode '{}')", new Object[]{v, key, mode});
278 			}
279 		}
280 	}
281 
282 	/**
283 	 * {@inheritDoc}
284 	 */
285 	@Override
286 	public String toString() {
287 		return "CacheAdapter{" +
288 				"sync=" + sync +
289 				", delegate=" + delegate +
290 				'}';
291 	}
292 }