1 package com.trendmicro.grid.acl.ds.cache.commands;
2
3 import com.trendmicro.grid.acl.ds.cache.CacheAdapter;
4 import com.trendmicro.grid.acl.ds.cache.CacheDestination;
5 import com.trendmicro.grid.acl.ds.cache.CacheSettings;
6 import com.trendmicro.grid.acl.ds.cache.CacheSource;
7 import com.trendmicro.grid.acl.l0.datatypes.FileIdentifier;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10
11 import java.util.*;
12 import java.util.concurrent.Callable;
13
14
15
16
17
18
19
20 public class GetFromCacheCommand<K, V> extends AbstractCommand<K, V> {
21
22 private static final Logger log = LoggerFactory.getLogger(GetFromCacheCommand.class);
23
24 private CacheSource<K, V>[] cacheSource;
25
26 private CacheDestination<K, V> cacheDestination;
27 private CacheDestination.Mode mode;
28
29 private CacheAdapter<K, Object> unknownKeys;
30 private CacheDestination.Mode unknownKeysPutMode;
31
32
33
34
35
36
37
38 public GetFromCacheCommand(Collection<K> keys, CacheSource<K, V>... cacheSource) {
39 super(keys);
40
41 if (cacheSource.length == 0)
42 throw new IllegalArgumentException("At least one cache source must be specified.");
43
44 this.cacheSource = cacheSource;
45 }
46
47
48
49
50
51
52
53
54 public GetFromCacheCommand<K, V> useDestination(CacheDestination<K, V> cacheDestination, CacheDestination.Mode mode) {
55 this.cacheDestination = cacheDestination;
56 this.mode = mode;
57 return this;
58 }
59
60
61
62
63
64
65
66
67 @SuppressWarnings("unchecked")
68 public GetFromCacheCommand<K, V> useUnknownKeysLookup(CacheAdapter<K, ?> unknownKeysCache, CacheDestination.Mode mode) {
69 unknownKeys = (CacheAdapter<K, Object>) unknownKeysCache;
70 unknownKeysPutMode = mode;
71 return this;
72 }
73
74
75
76
77
78
79
80 protected Callable<Map<K, V>> createGetFromSourceCommand(Collection<K> keys) {
81 return null;
82 }
83
84
85
86
87
88
89
90
91
92 @SuppressWarnings("unchecked")
93 protected Callable<Map<K, V>> createRetainMissingKeysCommand(Collection<K> keys) {
94 if (unknownKeys == null)
95 return new RetainMissingKeysCommand<K, V>(keys, cacheSource[0]);
96 else
97 return (Callable<Map<K, V>>) new RetainMissingKeysCommand<K, Object>(keys, unknownKeys);
98 }
99
100
101
102
103
104
105
106 protected Callable<Map<K, V>> createCacheResultsCommand(Map<K, V> unCachedResults) {
107 Callable<Map<K, V>> cmd = new CacheResultsCommand<K, V>(unCachedResults, cacheDestination, mode);
108 cmd = new BatchCommand<K, V>(keys, cmd, cacheDestination);
109 return cmd;
110 }
111
112
113
114
115 @Override
116 public Map<K, V> call() throws Exception {
117 if (CacheSettings.DISABLED) {
118 return createGetFromSourceCommand(keys).call();
119 } else {
120 final List<K> keys = this.keys instanceof List ? (List<K>) this.keys : new ArrayList<K>(this.keys);
121 final List<V> results = cacheSource[0].getAll(keys);
122
123 replaceNullsWithResults(keys, results);
124
125 return new MapAdapter<K, V>(keys, results);
126 }
127 }
128
129
130
131
132
133
134
135
136 protected void replaceNullsWithResults(List<K> keys, List<V> results) throws Exception {
137
138 NonUniqueMap<K, Integer> pendingResults = null;
139 for (int i = 0; i < results.size(); i++) {
140 if (results.get(i) == null) {
141 if (pendingResults == null) pendingResults = new NonUniqueMap<K, Integer>();
142
143 pendingResults.put(keys.get(i), i);
144 }
145 }
146
147 if (log.isTraceEnabled()) {
148 log.trace("Primary cache source '{}' returned '{}' results of {} requested keys.", new Object[]{
149 cacheSource[0], results.size() - (pendingResults == null ? 0 : pendingResults.size()), results.size()});
150 }
151
152 if (pendingResults != null) {
153
154 for (int i = 1; i < cacheSource.length; i++) {
155 final CacheSource<K, V> source = cacheSource[i];
156 final List<V> nextResults = source.getAll(pendingResults.keySet());
157 applyReturnedResults(results, pendingResults, nextResults, source);
158 }
159
160
161 if (!pendingResults.isEmpty()) fetchAndCachePendingResults(results, pendingResults);
162 }
163 }
164
165
166
167
168
169
170
171
172 protected void fetchAndCachePendingResults(List<V> results, Map<K, Integer> pendingResults) throws Exception {
173 final Set<K> pendingKeys = pendingResults.keySet();
174 final Callable<Map<K, V>> retainCommand = createRetainMissingKeysCommand(pendingKeys);
175
176 if (!retainCommand.call().isEmpty()) {
177 final Callable<Map<K, V>> sourceCommand = createGetFromSourceCommand(pendingKeys);
178 if (sourceCommand == null) {
179 if (log.isTraceEnabled())
180 log.trace("No regular source specified, cannot fetch un-cached pending results {}", pendingResults);
181 } else
182 fetchAndCachePendingResults(results, pendingResults, sourceCommand);
183 }
184 }
185
186
187
188
189
190
191
192
193
194 protected void fetchAndCachePendingResults(List<V> results, Map<K, Integer> pendingResults,
195 Callable<Map<K, V>> regularSourceCommand) throws Exception {
196 final Map<K, V> unCachedResults = regularSourceCommand.call();
197 try {
198 cacheResults(unCachedResults);
199 applyReturnedResults(results, pendingResults, unCachedResults.values(), regularSourceCommand);
200 } catch (Exception e) {
201 if (log.isDebugEnabled()) {
202 log.debug("Failed to apply un-cached results '" + unCachedResults + "', indexes '" +
203 pendingResults + "', keys '" + keys + "', results '" + results + '\'', e);
204 }
205 throw e;
206 }
207 }
208
209
210
211
212
213
214
215
216
217
218 protected void applyReturnedResults(List<V> results, Map<K, Integer> pendingResults,
219 Collection<V> returnedResults, Object resultsSource) {
220 if (returnedResults.size() != pendingResults.size()) {
221 throw new IllegalStateException("The returned list size was " +
222 returnedResults.size() + " , though " + pendingResults.size() + " entries were expected.\n" +
223 "Keys: " + pendingResults.keySet() + "\n" +
224 "Returned: " + returnedResults + "");
225 }
226
227 final int previousPendingSize = pendingResults.size();
228 final Iterator<Map.Entry<K, Integer>> pendingIterator = pendingResults.entrySet().iterator();
229 for (V result : returnedResults) {
230 results.set(pendingIterator.next().getValue(), result);
231
232 if (result != null) pendingIterator.remove();
233 }
234
235 if (log.isTraceEnabled()) {
236 log.trace("Additional source '{}' returned '{}' results of {} requested keys.", new Object[]{
237 resultsSource, previousPendingSize - pendingResults.size(), previousPendingSize});
238 }
239 }
240
241
242
243
244
245
246
247 protected void cacheResults(Map<K, V> unCachedResults) throws Exception {
248 if (!CacheSettings.NO_NEGATIVE_CACHING && unknownKeys != null) {
249 final Map<K, Boolean> nonExisting = new NonUniqueMap<K, Boolean>();
250 for (Iterator<Map.Entry<K, V>> iterator = unCachedResults.entrySet().iterator(); iterator.hasNext(); ) {
251 final Map.Entry<K, V> entry = iterator.next();
252 if (entry.getValue() == null) {
253 nonExisting.put(entry.getKey(), Boolean.FALSE);
254 iterator.remove();
255 }
256 }
257
258 if (!nonExisting.isEmpty()) {
259 if (log.isTraceEnabled()) log.trace("Caching non-existing file identifiers {}", nonExisting);
260 unknownKeys.putAll(nonExisting, unknownKeysPutMode);
261 }
262 }
263
264 if (cacheDestination == null) {
265 if (log.isTraceEnabled()) log.trace("Not caching {} results, as no cache destination was specified.", unCachedResults.size());
266 } else {
267 if (log.isTraceEnabled())
268 log.trace("Adding {} un-cached results to the cache destination {}.", unCachedResults.size(), cacheDestination);
269
270 createCacheResultsCommand(unCachedResults).call();
271 }
272 }
273 }