1   package com.trendmicro.grid.acl.ds.jpa;
2   
3   import com.trendmicro.grid.acl.ds.JobRepository;
4   import com.trendmicro.grid.acl.ds.datatypes.SharedJob;
5   import com.trendmicro.grid.acl.ds.datatypes.SharedJobDetails;
6   import com.trendmicro.grid.acl.ds.datatypes.SharedSource;
7   import com.trendmicro.grid.acl.ds.jpa.entities.JpaJob;
8   import com.trendmicro.grid.acl.ds.jpa.entities.JpaJobSources;
9   import com.trendmicro.grid.acl.ds.jpa.entities.JpaSource;
10  import com.trendmicro.grid.acl.l0.datatypes.*;
11  import org.slf4j.Logger;
12  import org.slf4j.LoggerFactory;
13  import org.springframework.stereotype.Repository;
14  import org.springframework.transaction.annotation.Transactional;
15  
16  import javax.annotation.Resource;
17  import javax.persistence.EntityManager;
18  import javax.persistence.PersistenceContext;
19  import javax.persistence.TypedQuery;
20  import java.awt.*;
21  import java.util.*;
22  import java.util.List;
23  
24  import static com.trendmicro.grid.acl.ds.DSUtil.cast;
25  import static com.trendmicro.grid.acl.ds.jpa.util.JpaUtils.*;
26  import static com.trendmicro.grid.acl.ds.jpa.util.JpaUtils.toListPage;
27  import static java.lang.System.currentTimeMillis;
28  import static net.sf.tinyjee.util.Assert.assertNotNull;
29  
30  /**
31   * Implements JobRepository using JPA.
32   *
33   * @author juergen_kellerer, 2010-06-01
34   * @version 1.0
35   */
36  @Repository
37  @Transactional(readOnly = true)
38  public class JpaJobRepository implements JobRepository {
39  
40  	private static final Logger log = LoggerFactory.getLogger(JpaJobRepository.class);
41  
42  	private static int pageSize = 25;
43  
44  	public static int getPageSize() {
45  		return pageSize;
46  	}
47  
48  	public static void setPageSize(int pageSize) {
49  		JpaJobRepository.pageSize = pageSize;
50  	}
51  
52  	static final Callback<byte[], UUID> bytesToUUIDConverter = new Callback<byte[], UUID>() {
53  		public UUID call(byte[] element) {
54  			return fromBytes(element);
55  		}
56  	};
57  
58  	@PersistenceContext(unitName = "CoreDB")
59  	EntityManager em;
60  
61  	@Resource
62  	JpaSourceFetcher sourceFetcher;
63  
64  	/**
65  	 * {@inheritDoc}
66  	 */
67  	@Override
68  	@Transactional(readOnly = false)
69  	public UUID createJob() {
70  		JpaJob job = new JpaJob(UUID.randomUUID(), null);
71  		em.persist(job);
72  		em.flush();
73  		return job.getJobId();
74  	}
75  
76  	/**
77  	 * {@inheritDoc}
78  	 */
79  	@Override
80  	@Transactional(readOnly = false)
81  	public UUID createSubJob(UUID parentJobId) {
82  		JpaJob parent = getJob(parentJobId);
83  		JpaJob job = new JpaJob(UUID.randomUUID(), parent);
84  		em.persist(job);
85  		parent.getSubJobs().add(job);
86  		return job.getJobId();
87  	}
88  
89  	/**
90  	 * {@inheritDoc}
91  	 */
92  	@Override
93  	@Transactional(readOnly = false)
94  	public void addJobSources(UUID jobId, List<Source> sources) {
95  		addJobSources(getJob(jobId), sources);
96  	}
97  
98  	/**
99  	 * {@inheritDoc}
100 	 */
101 	@Override
102 	@Transactional(readOnly = false)
103 	public void addJobSources(Job job, List<Source> sources) {
104 		JpaJob jpaJob = toJpaJob(job);
105 		for (Source source : sources) {
106 			JpaSource jpaSource = sourceFetcher.fetch(source.getSourceInformation().getIdentifier());
107 			em.persist(new JpaJobSources(jpaJob, jpaSource));
108 		}
109 	}
110 
111 	/**
112 	 * {@inheritDoc}
113 	 */
114 	@Override
115 	@Transactional(readOnly = false)
116 	public void updateJob(Job job) {
117 		assertNotNull("job", job);
118 
119 		if (job instanceof JpaJob) {
120 			job.setLastUpdated(new Date());
121 			em.merge(job);
122 		} else {
123 			JpaJob j = getJob(job.getJobId());
124 			if (j == null)
125 				throw new IllegalArgumentException("Cannot update unexisting job " + job);
126 
127 			j.setLastUpdated(new Date());
128 			j.setFinished(job.getFinished());
129 			j.setMetadata(job.getMetadata());
130 			j.setStagesPassed(job.getStagesPassed());
131 			j.setState(job.getState());
132 		}
133 	}
134 
135 	/**
136 	 * {@inheritDoc}
137 	 */
138 	@Override
139 	@Transactional(readOnly = false)
140 	public void finalizeJob(UUID jobId, Job.State finalJobState) throws IllegalArgumentException {
141 		finalizeJob(getJob(jobId), finalJobState);
142 	}
143 
144 	/**
145 	 * {@inheritDoc}
146 	 */
147 	@Override
148 	@Transactional(readOnly = false)
149 	public void finalizeJob(Job job, Job.State finalJobState) throws IllegalArgumentException {
150 		JpaJob jpaJob = toJpaJob(job);
151 
152 		if (jpaJob == null)
153 			return;
154 
155 		if (jpaJob.getState() == Job.State.PREPARED) {
156 			if (finalJobState != null) {
157 				throw new IllegalArgumentException(
158 						"finalJobState is not 'null' of a job that is still in prepared state.");
159 			}
160 
161 			em.remove(jpaJob);
162 		} else {
163 			if (finalJobState == null || !Job.FINAL_STATES.contains(finalJobState))
164 				throw new IllegalArgumentException("finalJobState not in " + Job.FINAL_STATES);
165 
166 			jpaJob.setState(finalJobState);
167 			jpaJob.setLastUpdated(new Date());
168 			jpaJob.setFinished(jpaJob.getLastUpdated());
169 		}
170 	}
171 
172 	private JpaJob toJpaJob(Job job) {
173 		JpaJob jpaJob = null;
174 		if (job != null) {
175 			if (job instanceof JpaJob && em.contains(job))
176 				jpaJob = (JpaJob) job;
177 			else
178 				jpaJob = getJob(job.getJobId());
179 		}
180 		return jpaJob;
181 	}
182 
183 	/**
184 	 * {@inheritDoc}
185 	 */
186 	@Override
187 	public UUIDListPage getJobsByStateInRange(Job.State jobState,
188 											  Date updatedFromDate, Date updatedToDate, int pageNumber) {
189 		final boolean rangeQuery = updatedFromDate != null || updatedToDate != null;
190 		final TypedQuery<byte[]> q = em.createNamedQuery(rangeQuery ?
191 				"Job.SelectJobIdsByStateInRange" : "Job.SelectJobIdsByState", byte[].class);
192 
193 		if (rangeQuery) {
194 			q.setParameter("fromDate", updatedFromDate == null ? new Date(0) : updatedFromDate);
195 			q.setParameter("toDate", updatedToDate == null ? new Date(currentTimeMillis() + 1000) : updatedToDate);
196 		}
197 
198 		applyPage(q, pageNumber, pageSize).setParameter("state", jobState);
199 		return toListPage(q, bytesToUUIDConverter, new UUIDListPage());
200 	}
201 
202 	/**
203 	 * {@inheritDoc}
204 	 */
205 	public Job.State getJobState(UUID jobId) {
206 		List<Job.State> result = em.createNamedQuery("Job.SelectJobStateById", Job.State.class).
207 				setParameter("publicGUID", toBytes(jobId)).getResultList();
208 
209 		if (result.isEmpty()) {
210 			if (log.isTraceEnabled())
211 				log.trace("Queried state for non-existing job '{}'", jobId);
212 			return null;
213 		}
214 
215 		return result.get(0);
216 	}
217 
218 	JpaJob getJob(UUID jobId) {
219 		return jobId == null ? null : getJob(em.createNamedQuery("Job.SelectById", JpaJob.class), jobId);
220 	}
221 
222 	<E extends Job> E getJob(TypedQuery<E> query, UUID jobId) {
223 		List<E> result = query.setParameter("publicGUID", toBytes(jobId)).getResultList();
224 
225 		if (result.isEmpty()) {
226 			if (log.isTraceEnabled())
227 				log.trace("Queried non-existing job '{}'", jobId);
228 			return null;
229 		}
230 
231 		return result.get(0);
232 	}
233 
234 	/**
235 	 * {@inheritDoc}
236 	 */
237 	public Collection<SharedJob> getJobs(Collection<UUID> jobIds) {
238 		final Collection<SharedJob> result = new ArrayList<SharedJob>(jobIds.size());
239 		final TypedQuery<SharedJob> query = em.createNamedQuery("Job.SelectById", SharedJob.class);
240 
241 		for (UUID id : jobIds)
242 			result.add(getJob(query, id));
243 
244 		return result;
245 	}
246 
247 	/**
248 	 * {@inheritDoc}
249 	 */
250 	public SharedJobDetails getJobDetails(UUID jobId) {
251 		JpaJob job = getJob(jobId);
252 		if (job == null)
253 			return null;
254 
255 		List<byte[]> guids = em.createNamedQuery("JobSources.SelectAssignedSourceIds", byte[].class).
256 				setParameter("primaryKey", job.getPrimaryKey()).getResultList();
257 		List<SharedSource> assignedSources = new ArrayList<SharedSource>(guids.size());
258 		for (byte[] guid : guids)
259 			assignedSources.add(sourceFetcher.fetch(new SourceIdentifier(guid)));
260 
261 		return new SharedJobDetails(job, cast(job.getSubJobs(), SharedJob.class), assignedSources);
262 	}
263 }