1 package com.trendmicro.grid.acl.ds.jpa;
2
3 import com.trendmicro.grid.acl.ds.JobRepository;
4 import com.trendmicro.grid.acl.ds.datatypes.SharedJob;
5 import com.trendmicro.grid.acl.ds.datatypes.SharedJobDetails;
6 import com.trendmicro.grid.acl.ds.datatypes.SharedSource;
7 import com.trendmicro.grid.acl.ds.jpa.entities.JpaJob;
8 import com.trendmicro.grid.acl.ds.jpa.entities.JpaJobSources;
9 import com.trendmicro.grid.acl.ds.jpa.entities.JpaSource;
10 import com.trendmicro.grid.acl.l0.datatypes.*;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.springframework.stereotype.Repository;
14 import org.springframework.transaction.annotation.Transactional;
15
16 import javax.annotation.Resource;
17 import javax.persistence.EntityManager;
18 import javax.persistence.PersistenceContext;
19 import javax.persistence.TypedQuery;
20 import java.awt.*;
21 import java.util.*;
22 import java.util.List;
23
24 import static com.trendmicro.grid.acl.ds.DSUtil.cast;
25 import static com.trendmicro.grid.acl.ds.jpa.util.JpaUtils.*;
26 import static com.trendmicro.grid.acl.ds.jpa.util.JpaUtils.toListPage;
27 import static java.lang.System.currentTimeMillis;
28 import static net.sf.tinyjee.util.Assert.assertNotNull;
29
30
31
32
33
34
35
36 @Repository
37 @Transactional(readOnly = true)
38 public class JpaJobRepository implements JobRepository {
39
40 private static final Logger log = LoggerFactory.getLogger(JpaJobRepository.class);
41
42 private static int pageSize = 25;
43
44 public static int getPageSize() {
45 return pageSize;
46 }
47
48 public static void setPageSize(int pageSize) {
49 JpaJobRepository.pageSize = pageSize;
50 }
51
52 static final Callback<byte[], UUID> bytesToUUIDConverter = new Callback<byte[], UUID>() {
53 public UUID call(byte[] element) {
54 return fromBytes(element);
55 }
56 };
57
58 @PersistenceContext(unitName = "CoreDB")
59 EntityManager em;
60
61 @Resource
62 JpaSourceFetcher sourceFetcher;
63
64
65
66
67 @Override
68 @Transactional(readOnly = false)
69 public UUID createJob() {
70 JpaJob job = new JpaJob(UUID.randomUUID(), null);
71 em.persist(job);
72 em.flush();
73 return job.getJobId();
74 }
75
76
77
78
79 @Override
80 @Transactional(readOnly = false)
81 public UUID createSubJob(UUID parentJobId) {
82 JpaJob parent = getJob(parentJobId);
83 JpaJob job = new JpaJob(UUID.randomUUID(), parent);
84 em.persist(job);
85 parent.getSubJobs().add(job);
86 return job.getJobId();
87 }
88
89
90
91
92 @Override
93 @Transactional(readOnly = false)
94 public void addJobSources(UUID jobId, List<Source> sources) {
95 addJobSources(getJob(jobId), sources);
96 }
97
98
99
100
101 @Override
102 @Transactional(readOnly = false)
103 public void addJobSources(Job job, List<Source> sources) {
104 JpaJob jpaJob = toJpaJob(job);
105 for (Source source : sources) {
106 JpaSource jpaSource = sourceFetcher.fetch(source.getSourceInformation().getIdentifier());
107 em.persist(new JpaJobSources(jpaJob, jpaSource));
108 }
109 }
110
111
112
113
114 @Override
115 @Transactional(readOnly = false)
116 public void updateJob(Job job) {
117 assertNotNull("job", job);
118
119 if (job instanceof JpaJob) {
120 job.setLastUpdated(new Date());
121 em.merge(job);
122 } else {
123 JpaJob j = getJob(job.getJobId());
124 if (j == null)
125 throw new IllegalArgumentException("Cannot update unexisting job " + job);
126
127 j.setLastUpdated(new Date());
128 j.setFinished(job.getFinished());
129 j.setMetadata(job.getMetadata());
130 j.setStagesPassed(job.getStagesPassed());
131 j.setState(job.getState());
132 }
133 }
134
135
136
137
138 @Override
139 @Transactional(readOnly = false)
140 public void finalizeJob(UUID jobId, Job.State finalJobState) throws IllegalArgumentException {
141 finalizeJob(getJob(jobId), finalJobState);
142 }
143
144
145
146
147 @Override
148 @Transactional(readOnly = false)
149 public void finalizeJob(Job job, Job.State finalJobState) throws IllegalArgumentException {
150 JpaJob jpaJob = toJpaJob(job);
151
152 if (jpaJob == null)
153 return;
154
155 if (jpaJob.getState() == Job.State.PREPARED) {
156 if (finalJobState != null) {
157 throw new IllegalArgumentException(
158 "finalJobState is not 'null' of a job that is still in prepared state.");
159 }
160
161 em.remove(jpaJob);
162 } else {
163 if (finalJobState == null || !Job.FINAL_STATES.contains(finalJobState))
164 throw new IllegalArgumentException("finalJobState not in " + Job.FINAL_STATES);
165
166 jpaJob.setState(finalJobState);
167 jpaJob.setLastUpdated(new Date());
168 jpaJob.setFinished(jpaJob.getLastUpdated());
169 }
170 }
171
172 private JpaJob toJpaJob(Job job) {
173 JpaJob jpaJob = null;
174 if (job != null) {
175 if (job instanceof JpaJob && em.contains(job))
176 jpaJob = (JpaJob) job;
177 else
178 jpaJob = getJob(job.getJobId());
179 }
180 return jpaJob;
181 }
182
183
184
185
186 @Override
187 public UUIDListPage getJobsByStateInRange(Job.State jobState,
188 Date updatedFromDate, Date updatedToDate, int pageNumber) {
189 final boolean rangeQuery = updatedFromDate != null || updatedToDate != null;
190 final TypedQuery<byte[]> q = em.createNamedQuery(rangeQuery ?
191 "Job.SelectJobIdsByStateInRange" : "Job.SelectJobIdsByState", byte[].class);
192
193 if (rangeQuery) {
194 q.setParameter("fromDate", updatedFromDate == null ? new Date(0) : updatedFromDate);
195 q.setParameter("toDate", updatedToDate == null ? new Date(currentTimeMillis() + 1000) : updatedToDate);
196 }
197
198 applyPage(q, pageNumber, pageSize).setParameter("state", jobState);
199 return toListPage(q, bytesToUUIDConverter, new UUIDListPage());
200 }
201
202
203
204
205 public Job.State getJobState(UUID jobId) {
206 List<Job.State> result = em.createNamedQuery("Job.SelectJobStateById", Job.State.class).
207 setParameter("publicGUID", toBytes(jobId)).getResultList();
208
209 if (result.isEmpty()) {
210 if (log.isTraceEnabled())
211 log.trace("Queried state for non-existing job '{}'", jobId);
212 return null;
213 }
214
215 return result.get(0);
216 }
217
218 JpaJob getJob(UUID jobId) {
219 return jobId == null ? null : getJob(em.createNamedQuery("Job.SelectById", JpaJob.class), jobId);
220 }
221
222 <E extends Job> E getJob(TypedQuery<E> query, UUID jobId) {
223 List<E> result = query.setParameter("publicGUID", toBytes(jobId)).getResultList();
224
225 if (result.isEmpty()) {
226 if (log.isTraceEnabled())
227 log.trace("Queried non-existing job '{}'", jobId);
228 return null;
229 }
230
231 return result.get(0);
232 }
233
234
235
236
237 public Collection<SharedJob> getJobs(Collection<UUID> jobIds) {
238 final Collection<SharedJob> result = new ArrayList<SharedJob>(jobIds.size());
239 final TypedQuery<SharedJob> query = em.createNamedQuery("Job.SelectById", SharedJob.class);
240
241 for (UUID id : jobIds)
242 result.add(getJob(query, id));
243
244 return result;
245 }
246
247
248
249
250 public SharedJobDetails getJobDetails(UUID jobId) {
251 JpaJob job = getJob(jobId);
252 if (job == null)
253 return null;
254
255 List<byte[]> guids = em.createNamedQuery("JobSources.SelectAssignedSourceIds", byte[].class).
256 setParameter("primaryKey", job.getPrimaryKey()).getResultList();
257 List<SharedSource> assignedSources = new ArrayList<SharedSource>(guids.size());
258 for (byte[] guid : guids)
259 assignedSources.add(sourceFetcher.fetch(new SourceIdentifier(guid)));
260
261 return new SharedJobDetails(job, cast(job.getSubJobs(), SharedJob.class), assignedSources);
262 }
263 }