1   package com.trendmicro.grid.acl.ds.jpa;
2   
3   import com.trendmicro.grid.acl.ds.SourceRepository;
4   import com.trendmicro.grid.acl.ds.datatypes.SharedSource;
5   import com.trendmicro.grid.acl.ds.datatypes.SharedSourceInformation;
6   import com.trendmicro.grid.acl.ds.jpa.entities.*;
7   import com.trendmicro.grid.acl.ds.jpa.util.FileQueryConfigurator;
8   import com.trendmicro.grid.acl.l0.datatypes.*;
9   import com.trendmicro.grid.acl.metadata.Metadata;
10  import net.sf.tinyjee.concurrent.LockingLRUMap;
11  import org.slf4j.Logger;
12  import org.slf4j.LoggerFactory;
13  import org.springframework.stereotype.Repository;
14  import org.springframework.transaction.annotation.Transactional;
15  
16  import javax.annotation.Resource;
17  import javax.persistence.*;
18  import java.net.URI;
19  import java.util.*;
20  
21  import static com.trendmicro.grid.acl.ds.jpa.util.JpaUtils.*;
22  import static java.lang.System.currentTimeMillis;
23  import static net.sf.tinyjee.util.Assert.assertEquals;
24  
25  /**
26   * Implements SourceRepository using JPA.
27   *
28   * @author juergen_kellerer, 2010-06-01
29   * @version 1.0
30   */
31  @Repository
32  @Transactional(readOnly = true)
33  public class JpaSourceRepository implements SourceRepository {
34  
35  	private static final Logger log = LoggerFactory.getLogger(JpaSourceRepository.class);
36  
37  	private static int pageSize = 1000;
38  
39  	public static int getPageSize() {
40  		return pageSize;
41  	}
42  
43  	public static void setPageSize(int pageSize) {
44  		JpaSourceRepository.pageSize = pageSize;
45  	}
46  
47  	static final Callback<byte[], SourceIdentifier> hashToIdentifierConverter =
48  			new Callback<byte[], SourceIdentifier>() {
49  				public SourceIdentifier call(byte[] element) {
50  					return createIdentifier(element);
51  				}
52  			};
53  
54  	static SourceIdentifier createIdentifier(byte[] hash) {
55  		return new SourceIdentifier(hash);
56  	}
57  
58  	@Resource
59  	JpaSourceDomainFetcher domainFetcher;
60  
61  	@PersistenceContext(unitName = "CoreDB")
62  	EntityManager em;
63  
64  	private final Map<Integer, Long> oversizedSourceLinks = new LockingLRUMap<Integer, Long>(64);
65  
66  	@Override
67  	public SourceIdentifier createIdentifier(URI remoteURI, URI internalURI) {
68  		return new SourceIdentifier(remoteURI, internalURI);
69  	}
70  
71  	/**
72  	 * Links (references) the given files with all given sources.
73  	 *
74  	 * @param fileReferences   the file references that should get references from the given sources.
75  	 * @param sourceReferences the source references to link all files with.
76  	 */
77  	@SuppressWarnings("unchecked")
78  	@Transactional(readOnly = false)
79  	protected void referenceFilesFromSources(final Collection<JpaFileDetails> fileReferences,
80  											 final Collection<JpaSource> sourceReferences) {
81  
82  		final long time = currentTimeMillis();
83  		final Set<JpaFileSource> newLinks = new HashSet<JpaFileSource>(fileReferences.size() * sourceReferences.size());
84  
85  		final int existsLength = StorageOptions.MAX_SOURCE_LINK_FETCH_SIZE;
86  		final Query existsQuery = em.createNamedQuery("FileSources.SelectAssignedFiles").setMaxResults(existsLength);
87  
88  		// Prepare to keep the "assigned" query when the quick check fails with "oversized".
89  		Query isAssignedQuery = null;
90  
91  		for (JpaSource sourceRef : sourceReferences) {
92  			List existingRefs = null;
93  			int sourceId = sourceRef.getPrimaryKey();
94  			Long checkTimestamp = oversizedSourceLinks.get(sourceId);
95  			boolean needsSingleFileCheck = checkTimestamp != null &&
96  					time - checkTimestamp < StorageOptions.KEEP_OVERSIZED_SOURCE_LINK_INFO_INTERVAL;
97  
98  			if (!needsSingleFileCheck) {
99  				existingRefs = existsQuery.setParameter("sourceId", sourceId).getResultList();
100 				needsSingleFileCheck = existingRefs.size() >= existsLength;
101 			}
102 
103 			if (needsSingleFileCheck) {
104 				oversizedSourceLinks.put(sourceId, time);
105 				if (isAssignedQuery == null)
106 					isAssignedQuery = em.createNamedQuery("FileSources.SelectSourceIsAssigned").setMaxResults(1);
107 
108 				isAssignedQuery.setParameter("sourceId", sourceRef.getPrimaryKey());
109 				// Insert all missing entries.
110 				for (JpaFileDetails fileDetails : fileReferences) {
111 					isAssignedQuery.setParameter("fileId", fileDetails.getPrimaryKey());
112 					if (isAssignedQuery.getResultList().isEmpty())
113 						newLinks.add(new JpaFileSource(fileDetails, sourceRef));
114 				}
115 			} else {
116 				final Set existingRefSet = new HashSet(existingRefs);
117 				for (JpaFileDetails fileDetails : fileReferences) {
118 					if (!existingRefSet.contains(fileDetails.getPrimaryKey()))
119 						newLinks.add(new JpaFileSource(fileDetails, sourceRef));
120 				}
121 			}
122 		}
123 
124 		// Persisting file sources now, to get a better chance that hibernate can use batch processing.
125 		for (JpaFileSource fileSource : newLinks)
126 			em.persist(fileSource);
127 	}
128 
129 	@Override
130 	public FileIdentiferListPage getReferencedFiles(SourceIdentifier sourceIdentifier, int pageNumber) {
131 		final TypedQuery<FileIdentifier> query = em.createNamedQuery("FileSources.SelectAssignedFileIdentifiers", FileIdentifier.class).
132 				setParameter("publicGUID", sourceIdentifier.getSHA1Hash());
133 
134 		applyPage(query, pageNumber, pageSize);
135 
136 		return toListPage(query, new FileIdentiferListPage());
137 	}
138 
139 	@Override
140 	public NameListPage getReferencedPackages(SourceIdentifier sourceIdentifier, int pageNumber) {
141 		final TypedQuery<String> query = em.createNamedQuery("FileSources.SelectAssignedPackages", String.class).
142 				setParameter("publicGUID", sourceIdentifier.getSHA1Hash());
143 
144 		applyPage(query, pageNumber, pageSize);
145 
146 		return toListPage(query, new NameListPage());
147 	}
148 
149 	@Override
150 	public SourceIdentiferListPage getReferencingSources(FileIdentifier file, int pageNumber) {
151 		final TypedQuery<byte[]> query = new FileQueryConfigurator<byte[]>(em, byte[].class, "FileSources.SelectAssignedSource").getQuery(file);
152 
153 		applyPage(query, pageNumber, pageSize);
154 		return toListPage(query, hashToIdentifierConverter, new SourceIdentiferListPage());
155 	}
156 
157 	@Override
158 	public SourceIdentiferListPage getSourcesOfDomainInRange(String domainName, Date modifiedFromDate, Date modifiedToDate, int pageNumber) {
159 
160 		final boolean rangeQuery = modifiedFromDate != null || modifiedToDate != null;
161 		final TypedQuery<byte[]> q = em.createNamedQuery(rangeQuery ?
162 				"Source.SelectSourcesOfDomainInRange" : "Source.SelectSourcesOfDomain", byte[].class);
163 
164 		q.setParameter("name", domainName);
165 
166 		if (rangeQuery) {
167 			q.setParameter("fromDate", modifiedFromDate == null ? new Date(0) : modifiedFromDate);
168 			q.setParameter("toDate", modifiedToDate == null ? new Date(currentTimeMillis() + 1000) : modifiedToDate);
169 		}
170 
171 		applyPage(q, pageNumber, pageSize);
172 		return toListPage(q, hashToIdentifierConverter, new SourceIdentiferListPage());
173 	}
174 
175 	private <T> Collection<T> toElementListById(final TypedQuery<T> q, final Collection<SourceIdentifier> identifiers) {
176 
177 		final Collection<T> results = new ArrayList<T>(identifiers.size());
178 
179 		for (SourceIdentifier identifier : identifiers) {
180 			q.setParameter("publicGUID", identifier.getSHA1Hash());
181 			List<T> result = q.getResultList();
182 			results.add(result.isEmpty() ? null : result.get(0));
183 		}
184 
185 		return results;
186 	}
187 
188 	@Override
189 	public Collection<SharedSourceInformation> getSourceInformationList(Collection<SourceIdentifier> identifiers) {
190 		return toElementListById(em.createNamedQuery("Source.SelectSourceInformationById", SharedSourceInformation.class), identifiers);
191 	}
192 
193 	@Override
194 	public Collection<SharedSource> getSources(Collection<SourceIdentifier> identifiers) {
195 		final Collection<SharedSource> sources = toElementListById(
196 				em.createNamedQuery("Source.SelectSourceById", SharedSource.class), identifiers);
197 
198 		// Fill-in the cached SourceDomains.
199 		JpaSourceDomain domain = null;
200 		if (sources != null)
201 			for (SharedSource source : sources) {
202 				URI remoteURI = source == null ? null : source.getRemoteURI();
203 				if (remoteURI == null)
204 					continue;
205 
206 				em.detach(source);
207 				String name = SourceDomain.getDomainNameFrom(remoteURI);
208 				if (domain == null || !domain.getName().equals(name))
209 					domain = domainFetcher.fetch(name);
210 
211 				((JpaSource) source).setSourceDomain(domain);
212 			}
213 
214 		return sources;
215 	}
216 
217 	/**
218 	 * Returns a modifyable source instance or 'null' if no source existis under the id.
219 	 *
220 	 * @param identifier the identifier of the source.
221 	 * @return a modifyable source instance or 'null' if no source existis under the id.
222 	 */
223 	public JpaSource getSource(SourceIdentifier identifier) {
224 		final List<JpaSource> result = em.createNamedQuery("Source.SelectSourceById", JpaSource.class).
225 				setParameter("publicGUID", identifier.getSHA1Hash()).getResultList();
226 
227 		return result.isEmpty() ? null : result.get(0);
228 	}
229 
230 	/**
231 	 * Returns a lightweight reference to the source of 'null' if the source doesn't exist.
232 	 *
233 	 * @param identifier the identifier of the source.
234 	 * @return a lightweight reference to the source of 'null' if the source doesn't exist.
235 	 */
236 	public JpaSource getSourceReference(SourceIdentifier identifier) {
237 		final List<Integer> result = em.createNamedQuery("Source.SelectSourcePrimaryKeyById", Integer.class).
238 				setParameter("publicGUID", identifier.getSHA1Hash()).getResultList();
239 
240 		return result.isEmpty() ? null : em.getReference(JpaSource.class, result.get(0));
241 	}
242 
243 	@Override
244 	@Transactional(readOnly = false)
245 	public SharedSource createSource(URI remoteURI, URI internalURI,
246 									 SourceInformation sourceInformation, Metadata metadata) {
247 		final SourceIdentifier sid = new SourceIdentifier(remoteURI, internalURI);
248 
249 		final long sourceCount = em.createNamedQuery("Source.SelectSourceExistsById", Long.class).
250 				setParameter("publicGUID", sid.getSHA1Hash()).getSingleResult();
251 
252 		if (sourceCount == 0) {
253 			final JpaSourceInformation info = sourceInformation == null ?
254 					new JpaSourceInformation(sid) :
255 					new JpaSourceInformation(sourceInformation, sid);
256 
257 			final JpaSourceDomain domain = domainFetcher.fetch(SourceDomain.getDomainNameFrom(remoteURI));
258 			final JpaSource source = new JpaSource(remoteURI, internalURI, info, domain, metadata);
259 
260 			em.persist(source);
261 
262 			// Store the new source now.
263 			em.flush();
264 
265 			return source;
266 		}
267 
268 		return null;
269 	}
270 
271 	@Override
272 	@Transactional(readOnly = false)
273 	public SharedSource updateSource(SourceInformation sourceInformation, Metadata metadata) {
274 		SourceIdentifier sid = sourceInformation.getIdentifier();
275 		try {
276 
277 			final TypedQuery<JpaSource> query = em.createNamedQuery(
278 					"Source.SelectSourceById", JpaSource.class).setParameter("publicGUID", sid.getSHA1Hash());
279 
280 			// Enable write locks and apply changes when the method is left.
281 			query.setFlushMode(FlushModeType.COMMIT);
282 			query.setLockMode(LockModeType.PESSIMISTIC_WRITE);
283 
284 			final List<JpaSource> sourceList = query.getResultList();
285 			final JpaSource source = sourceList.isEmpty() ? null : sourceList.get(0);
286 
287 			if (source == null) return null;
288 
289 			final JpaSourceInformation information = source.getSourceInformation();
290 			assertEquals("SourceInformation#isTemporary()", information.isTemporary(), sourceInformation.isTemporary());
291 
292 			information.updateFrom(sourceInformation);
293 			source.setMetadata(metadata);
294 
295 			// Inject domain from external cache instead of the JPA cache, to ensure we
296 			// have physically only as many domain instances as entries.
297 			source.setSourceDomain(domainFetcher.fetch(SourceDomain.getDomainNameFrom(source.getRemoteURI())));
298 
299 			// Store the updates now.
300 			em.flush();
301 
302 			return source;
303 		} catch (Exception e) {
304 			log.error("TMACL-00730:Failed updating the source: " + sourceInformation, e);
305 			throw new RuntimeException(e);
306 		}
307 	}
308 
309 	@Override
310 	@Transactional(readOnly = false)
311 	public void setInternalURI(SourceIdentifier identifier, URI internalURI) throws IllegalStateException {
312 		final int updateCount = em.createNamedQuery("Source.UpdateInternalURIById").
313 				setParameter("publicGUID", identifier.getSHA1Hash()).
314 				setParameter("internalURI", internalURI.toASCIIString()).executeUpdate();
315 
316 		if (updateCount == 0) {
317 			throw new IllegalStateException("Failed setting internalURI '" + internalURI + '\'' +
318 					" on source '" + identifier + "'. " +
319 					"The source did not exist or doesn't define a remoteURI.");
320 		}
321 	}
322 }