1   package com.trendmicro.grid.acl.ds.cache.commands;
2   
3   import com.trendmicro.grid.acl.ds.cache.CacheAdapter;
4   import com.trendmicro.grid.acl.ds.cache.CacheDestination;
5   import com.trendmicro.grid.acl.ds.cache.CacheSettings;
6   import com.trendmicro.grid.acl.ds.cache.CacheSource;
7   import com.trendmicro.grid.acl.l0.datatypes.FileIdentifier;
8   import org.slf4j.Logger;
9   import org.slf4j.LoggerFactory;
10  
11  import java.util.*;
12  import java.util.concurrent.Callable;
13  
14  /**
15   * Implements the command used to fetch from caches or the original source and cache values that were missing.
16   *
17   * @author juergen_kellerer, 2011-03-24
18   * @version 1.0
19   */
20  public class GetFromCacheCommand<K, V> extends AbstractCommand<K, V> {
21  
22  	private static final Logger log = LoggerFactory.getLogger(GetFromCacheCommand.class);
23  
24  	private CacheSource<K, V>[] cacheSource;
25  
26  	private CacheDestination<K, V> cacheDestination;
27  	private CacheDestination.Mode mode;
28  
29  	private CacheAdapter<K, Object> unknownKeys;
30  	private CacheDestination.Mode unknownKeysPutMode;
31  
32  	/**
33  	 * Constructs a fetch command that is reads from the cache source but does not cache elements.
34  	 *
35  	 * @param keys        the keys to read.
36  	 * @param cacheSource the cache source or sources to read the values for the keys from.
37  	 */
38  	public GetFromCacheCommand(Collection<K> keys, CacheSource<K, V>... cacheSource) {
39  		super(keys);
40  
41  		if (cacheSource.length == 0)
42  			throw new IllegalArgumentException("At least one cache source must be specified.");
43  
44  		this.cacheSource = cacheSource;
45  	}
46  
47  	/**
48  	 * Adds a cache destination to the command allowing to cache missing values into that.
49  	 *
50  	 * @param cacheDestination the destination used for caching missing key and value pairs.
51  	 * @param mode             the mode to apply when writing to the cache.
52  	 * @return this instance.
53  	 */
54  	public GetFromCacheCommand<K, V> useDestination(CacheDestination<K, V> cacheDestination, CacheDestination.Mode mode) {
55  		this.cacheDestination = cacheDestination;
56  		this.mode = mode;
57  		return this;
58  	}
59  
60  	/**
61  	 * Adds an unknown key cache to the command allowing to lookup and store values that are known to be unknown.
62  	 *
63  	 * @param unknownKeysCache the key cache containing keys that do not exist in the data storage backend.
64  	 * @param mode             the mode to apply when writing to the cache.
65  	 * @return this instance.
66  	 */
67  	@SuppressWarnings("unchecked")
68  	public GetFromCacheCommand<K, V> useUnknownKeysLookup(CacheAdapter<K, ?> unknownKeysCache, CacheDestination.Mode mode) {
69  		unknownKeys = (CacheAdapter<K, Object>) unknownKeysCache;
70  		unknownKeysPutMode = mode;
71  		return this;
72  	}
73  
74  	/**
75  	 * Creates a callable that looks after the given keys using a regular source.
76  	 *
77  	 * @param keys the keys to look after.
78  	 * @return a callable that looks after the given keys using a regular source.
79  	 */
80  	protected Callable<Map<K, V>> createGetFromSourceCommand(Collection<K> keys) {
81  		return null;
82  	}
83  
84  	/**
85  	 * Creates a callable that normalizes the given keys by retaining only those that
86  	 * are not known in the cache source.
87  	 *
88  	 * @param keys the keys to normalize.
89  	 * @return a callable that normalizes the given keys by retaining only those that
90  	 *         are not known in the cache source.
91  	 */
92  	@SuppressWarnings("unchecked")
93  	protected Callable<Map<K, V>> createRetainMissingKeysCommand(Collection<K> keys) {
94  		if (unknownKeys == null)
95  			return new RetainMissingKeysCommand<K, V>(keys, cacheSource[0]);
96  		else
97  			return (Callable<Map<K, V>>) new RetainMissingKeysCommand<K, Object>(keys, unknownKeys);
98  	}
99  
100 	/**
101 	 * Creates a callable that writes un-cached results to the cache destination.
102 	 *
103 	 * @param unCachedResults the un-cached results to write.
104 	 * @return the command that writes un-cached results.
105 	 */
106 	protected Callable<Map<K, V>> createCacheResultsCommand(Map<K, V> unCachedResults) {
107 		Callable<Map<K, V>> cmd = new CacheResultsCommand<K, V>(unCachedResults, cacheDestination, mode);
108 		cmd = new BatchCommand<K, V>(keys, cmd, cacheDestination);
109 		return cmd;
110 	}
111 
112 	/**
113 	 * {@inheritDoc}
114 	 */
115 	@Override
116 	public Map<K, V> call() throws Exception {
117 		if (CacheSettings.DISABLED) {
118 			return createGetFromSourceCommand(keys).call();
119 		} else {
120 			final List<K> keys = this.keys instanceof List ? (List<K>) this.keys : new ArrayList<K>(this.keys);
121 			final List<V> results = cacheSource[0].getAll(keys);
122 
123 			replaceNullsWithResults(keys, results);
124 
125 			return new MapAdapter<K, V>(keys, results);
126 		}
127 	}
128 
129 	/**
130 	 * Adds any missing entries to the results list (replaces 'null' values if possible).
131 	 *
132 	 * @param keys    the keys of the request.
133 	 * @param results the results returned by the cache.
134 	 * @throws Exception in case of the operation fails.
135 	 */
136 	protected void replaceNullsWithResults(List<K> keys, List<V> results) throws Exception {
137 		// Mapping all results that were set to 'null'.
138 		NonUniqueMap<K, Integer> pendingResults = null;
139 		for (int i = 0; i < results.size(); i++) {
140 			if (results.get(i) == null) {
141 				if (pendingResults == null) pendingResults = new NonUniqueMap<K, Integer>();
142 
143 				pendingResults.put(keys.get(i), i);
144 			}
145 		}
146 
147 		if (log.isTraceEnabled()) {
148 			log.trace("Primary cache source '{}' returned '{}' results of {} requested keys.", new Object[]{
149 					cacheSource[0], results.size() - (pendingResults == null ? 0 : pendingResults.size()), results.size()});
150 		}
151 
152 		if (pendingResults != null) {
153 			// Ask alternate caches for results first.
154 			for (int i = 1; i < cacheSource.length; i++) {
155 				final CacheSource<K, V> source = cacheSource[i];
156 				final List<V> nextResults = source.getAll(pendingResults.keySet());
157 				applyReturnedResults(results, pendingResults, nextResults, source);
158 			}
159 
160 			// Fetch and cache remaining results.
161 			if (!pendingResults.isEmpty()) fetchAndCachePendingResults(results, pendingResults);
162 		}
163 	}
164 
165 	/**
166 	 * Fetches the pending results from the given source and caches them afterwards if a destination was specified.
167 	 *
168 	 * @param results        the results to build.
169 	 * @param pendingResults a map of pending results mapped against the list index in results.
170 	 * @throws Exception in case of the operation fails.
171 	 */
172 	protected void fetchAndCachePendingResults(List<V> results, Map<K, Integer> pendingResults) throws Exception {
173 		final Set<K> pendingKeys = pendingResults.keySet();
174 		final Callable<Map<K, V>> retainCommand = createRetainMissingKeysCommand(pendingKeys);
175 
176 		if (!retainCommand.call().isEmpty()) {
177 			final Callable<Map<K, V>> sourceCommand = createGetFromSourceCommand(pendingKeys);
178 			if (sourceCommand == null) {
179 				if (log.isTraceEnabled())
180 					log.trace("No regular source specified, cannot fetch un-cached pending results {}", pendingResults);
181 			} else
182 				fetchAndCachePendingResults(results, pendingResults, sourceCommand);
183 		}
184 	}
185 
186 	/**
187 	 * Fetches the pending results from the given source and caches them afterwards if a destination was specified.
188 	 *
189 	 * @param results              the results to build.
190 	 * @param pendingResults       a map of pending results mapped against the list index in results.
191 	 * @param regularSourceCommand the command used to fetch the data from the regular source.
192 	 * @throws Exception in case of the operation fails.
193 	 */
194 	protected void fetchAndCachePendingResults(List<V> results, Map<K, Integer> pendingResults,
195 	                                           Callable<Map<K, V>> regularSourceCommand) throws Exception {
196 		final Map<K, V> unCachedResults = regularSourceCommand.call();
197 		try {
198 			cacheResults(unCachedResults);
199 			applyReturnedResults(results, pendingResults, unCachedResults.values(), regularSourceCommand);
200 		} catch (Exception e) {
201 			if (log.isDebugEnabled()) {
202 				log.debug("Failed to apply un-cached results '" + unCachedResults + "', indexes '" +
203 						pendingResults + "', keys '" + keys + "', results '" + results + '\'', e);
204 			}
205 			throw e;
206 		}
207 	}
208 
209 	/**
210 	 * Applies the returned results in the results list and removes the corresponding pending entries if the value
211 	 * of the returned result was not 'null'.
212 	 *
213 	 * @param results         the commands result list.
214 	 * @param pendingResults  the results that are still 'null'.
215 	 * @param returnedResults a newly returned result map.
216 	 * @param resultsSource   The instance that generated the results.
217 	 */
218 	protected void applyReturnedResults(List<V> results, Map<K, Integer> pendingResults,
219 	                                    Collection<V> returnedResults, Object resultsSource) {
220 		if (returnedResults.size() != pendingResults.size()) {
221 			throw new IllegalStateException("The returned list size was " +
222 					returnedResults.size() + " , though " + pendingResults.size() + " entries were expected.\n" +
223 					"Keys: " + pendingResults.keySet() + "\n" +
224 					"Returned: " + returnedResults + "");
225 		}
226 
227 		final int previousPendingSize = pendingResults.size();
228 		final Iterator<Map.Entry<K, Integer>> pendingIterator = pendingResults.entrySet().iterator();
229 		for (V result : returnedResults) {
230 			results.set(pendingIterator.next().getValue(), result);
231 
232 			if (result != null) pendingIterator.remove();
233 		}
234 
235 		if (log.isTraceEnabled()) {
236 			log.trace("Additional source '{}' returned '{}' results of {} requested keys.", new Object[]{
237 					resultsSource, previousPendingSize - pendingResults.size(), previousPendingSize});
238 		}
239 	}
240 
241 	/**
242 	 * Caches the given results if a destination was added to this command.
243 	 *
244 	 * @param unCachedResults the cache results to cache.
245 	 * @throws Exception in case of the write command fails.
246 	 */
247 	protected void cacheResults(Map<K, V> unCachedResults) throws Exception {
248 		if (!CacheSettings.NO_NEGATIVE_CACHING && unknownKeys != null) {
249 			final Map<K, Boolean> nonExisting = new NonUniqueMap<K, Boolean>();
250 			for (Iterator<Map.Entry<K, V>> iterator = unCachedResults.entrySet().iterator(); iterator.hasNext(); ) {
251 				final Map.Entry<K, V> entry = iterator.next();
252 				if (entry.getValue() == null) {
253 					nonExisting.put(entry.getKey(), Boolean.FALSE);
254 					iterator.remove();
255 				}
256 			}
257 
258 			if (!nonExisting.isEmpty()) {
259 				if (log.isTraceEnabled())  log.trace("Caching non-existing file identifiers {}", nonExisting);
260 				unknownKeys.putAll(nonExisting, unknownKeysPutMode);
261 			}
262 		}
263 
264 		if (cacheDestination == null) {
265 			if (log.isTraceEnabled()) log.trace("Not caching {} results, as no cache destination was specified.", unCachedResults.size());
266 		} else {
267 			if (log.isTraceEnabled())
268 				log.trace("Adding {} un-cached results to the cache destination {}.", unCachedResults.size(), cacheDestination);
269 
270 			createCacheResultsCommand(unCachedResults).call();
271 		}
272 	}
273 }