1 import com.trendmicro.grid.acl.client.ServiceClient;
2 import com.trendmicro.grid.acl.client.util.CommandlineClientFactory;
3 import com.trendmicro.grid.acl.client.util.CommandlineParser;
4 import com.trendmicro.grid.acl.client.util.ProgressIndicator;
5 import com.trendmicro.grid.acl.l0.ProcessingService;
6 import com.trendmicro.grid.acl.l0.SourceService;
7 import com.trendmicro.grid.acl.l0.datatypes.FileIdentifier;
8 import com.trendmicro.grid.acl.l0.datatypes.Job;
9 import com.trendmicro.grid.acl.l0.datatypes.SourceIdentifier;
10 import com.trendmicro.grid.acl.l0.datatypes.SourceInformation;
11 import com.trendmicro.grid.acl.metadata.Metadata;
12
13 import java.io.*;
14 import java.net.HttpURLConnection;
15 import java.net.URI;
16 import java.net.URL;
17 import java.security.DigestInputStream;
18 import java.util.*;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21
22 import static java.lang.System.out;
23 import static java.security.MessageDigest.getInstance;
24
25
26
27
28
29
30
31 public class DirectorySender {
32
33 static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
34
35 static final String[] HELP = {
36 "GRID Directory Sender - Version 1.0",
37 "",
38 "Sends directories into the GRID using the Source and Processing",
39 "services offered by the GRID Access Layer (GACL).",
40 "",
41 };
42
43 static final CommandlineParser CLI = new CommandlineParser().
44 defineParameter("Specifies the source directory containing " +
45 "the files to send.", true, File.class, null, "-s", "--dir").
46 defineSwitchParameter("Enables recursive directory scanning", "-R", "-r").
47 defineSwitchParameter("Force sending. (Disables de-duplication).", "-f", "--force").
48
49 defineParameter("Defines an URI prefix to use instead of using 'file:/' uris.",
50 false, String.class, null, "--uri-prefix").
51
52 defineParameter("Specifies the extension to use for looking after metadata properties files. " +
53 "Metadata properties files contain key & value pairs that are used as source metadata " +
54 "when sending files.\n" +
55 "The default search algorithm appends the specified extension to the filename of the " +
56 "source file and searches for a corresponding properties file in the same path.\n" +
57 "In case of the extension ends with '.xml', the detault XML based metadata format is " +
58 "assumed instead of java properties file styled metadata.",
59 false, String.class, ".properties", "-m", "--metadata-ext").
60
61 defineSwitchParameter("Wait on jobs to get finished.\n" +
62 "When enabled waits unitl all jobs are labelled " +
63 "finished before returning.", "--wait").
64
65 defineSwitchParameter("Be verbose.", "-v", "--verbose");
66
67 static final CommandlineClientFactory CLIENT_FACTORY = new CommandlineClientFactory(CLI);
68
69
70 public static void main(String[] args) throws Exception {
71 boolean canContinue = CLI.parse(args, out, HELP);
72 if (!canContinue) return;
73
74 final File sourceDirectory = CLI.getParameter("-s", File.class);
75 if (!sourceDirectory.isDirectory()) {
76 throw new IllegalArgumentException("The specified source directory " + sourceDirectory + " doesn't seem to be a directory.");
77 }
78
79 final ServiceClient client = CLIENT_FACTORY.newServiceClient(CLI);
80 client.getContext().getSessionHandler().registerAsSystemDefault();
81
82 Map<UUID, File> pendingJobs = CLI.isParameterTrue("--wait") ? new HashMap<UUID, File>() : null;
83 final long time = System.currentTimeMillis();
84 try {
85 sendDirectory(sourceDirectory, client, pendingJobs);
86 waitOnPendingJobs(client, pendingJobs);
87 } finally {
88 out.println();
89 out.printf("Time: %.3f sec %n", ((double) (System.currentTimeMillis() - time)) / 1000D);
90
91 EXECUTOR.shutdown();
92 }
93 }
94
95 static void sendDirectory(final File sourceDirectory, ServiceClient client, Map<UUID, File> pendingJobs) {
96 final boolean force = CLI.isParameterTrue("-f");
97 final boolean recursive = CLI.isParameterTrue("-R");
98 final String metadataExtension = CLI.getParameter("-m", String.class);
99 final String uriPrefix = CLI.getParameter("--uri-prefix", String.class);
100
101 final File[] children = sourceDirectory.listFiles(new FilenameFilter() {
102 public boolean accept(File dir, String name) {
103 return !name.endsWith(metadataExtension);
104 }
105 });
106
107 if (children == null) return;
108
109 for (File child : children) {
110 if (child.isDirectory()) {
111 if (recursive)
112 sendDirectory(child, client, pendingJobs);
113 else
114 out.println("Ignoring directory: " + child + " (recursion is disabled)");
115 } else {
116 Metadata metadata = loadMetadata(new File(child + metadataExtension));
117 try {
118 UUID jobId = sendFile(child, force, uriPrefix, metadata, client);
119 if (jobId != null && pendingJobs != null)
120 pendingJobs.put(jobId, child);
121 } catch (Exception e) {
122 out.println();
123 System.err.println("Failed sending " + child);
124 e.printStackTrace();
125 }
126 }
127 }
128 }
129
130 static UUID sendFile(File source, boolean force, String uriPrefix,
131 Metadata metadata, ServiceClient client) throws Exception {
132 final ProgressIndicator indicator = new ProgressIndicator("Sending " + source);
133 try {
134 EXECUTOR.execute(indicator);
135
136 URI sourceURI = source.toURI();
137 if (uriPrefix != null) sourceURI = URI.create(uriPrefix + sourceURI.getPath());
138
139 final SourceService sourceService = client.getPort(SourceService.class);
140 final ProcessingService service = client.getPort(ProcessingService.class);
141
142 if (!force) {
143 SourceInformation info = sourceService.getSourceInformationForURL(sourceURI);
144 Date lm = info == null ? null : info.getLastModified();
145 if (lm != null && lm.getTime() / 1000 == source.lastModified() / 1000) {
146 indicator.append(" - OK (not modified).");
147 return null;
148 } else if (info != null)
149 indicator.append(" - modified");
150 }
151
152 final UUID uuid = service.prepareJob();
153 try {
154 final SourceIdentifier sourceId = service.assignProcessSource(
155 uuid, sourceURI, new Date(source.lastModified()), "", metadata);
156 try {
157 service.assignExistingContentToProcessSource(sourceId, generateFileId(source, indicator));
158 indicator.append(" - OK (updated).");
159 } catch (Exception ignored) {
160 URL putURL = service.assignContentToProcessSource(sourceId);
161 putData(source, putURL, indicator);
162 indicator.append(" - OK.");
163 }
164
165 if (CLI.isParameterTrue("-v")) {
166 indicator.println("Starting job '" + uuid + "' with source '" + sourceURI + "' at '" + new Date() + '\'');
167 }
168 } finally {
169 service.startJob(uuid);
170 }
171
172 return uuid;
173 } finally {
174 indicator.close();
175 }
176 }
177
178 static void putData(File file, URL putURL, ProgressIndicator indicator) throws Exception {
179 indicator.setMessage("transferring");
180
181 HttpURLConnection huc = (HttpURLConnection) putURL.openConnection();
182 huc.setRequestMethod("PUT");
183 huc.setDoOutput(true);
184 huc.setChunkedStreamingMode(8 * 1024);
185
186 copy(file, huc.getOutputStream(), indicator);
187
188 if (huc.getResponseCode() != 200) throw new IOException("Server replied with " + huc.getResponseCode() + ' ' + huc.getResponseMessage());
189 }
190
191 static FileIdentifier generateFileId(File file, ProgressIndicator indicator) throws Exception {
192 indicator.setMessage("hashing");
193
194 final DigestInputStream sha1Stream = new DigestInputStream(new BufferedInputStream(new FileInputStream(file)), getInstance("SHA1"));
195 final DigestInputStream md5Stream = new DigestInputStream(sha1Stream, getInstance("MD5"));
196
197 copy(md5Stream, null, file.length(), indicator);
198
199 return new FileIdentifier(sha1Stream.getMessageDigest().digest(), md5Stream.getMessageDigest().digest());
200 }
201
202 static void copy(File file, OutputStream out, ProgressIndicator indicator) throws IOException {
203 copy(new BufferedInputStream(new FileInputStream(file)), out, file.length(), indicator);
204 }
205
206 static void copy(InputStream in, OutputStream out, long totalLength, ProgressIndicator indicator) throws IOException {
207 try {
208 int r;
209 long bytesCount = 0;
210 byte[] buffer = new byte[1024 * 8];
211 while ((r = in.read(buffer)) != -1) {
212 if (out != null)
213 out.write(buffer, 0, r);
214 bytesCount += r;
215 indicator.setProgress(bytesCount, totalLength);
216 }
217 } finally {
218 in.close();
219 if (out != null) out.close();
220 }
221 }
222
223 static Metadata loadMetadata(File metadataFile) {
224 Metadata metadata = new Metadata();
225 if (metadataFile.isFile()) {
226 try {
227 if (metadataFile.getName().endsWith(".xml")) {
228 metadata = Metadata.getXmlSerializer().load(metadataFile);
229 } else {
230 Properties p = new Properties();
231 InputStream in = new BufferedInputStream(new FileInputStream(metadataFile));
232 try {
233 p.load(in);
234 } finally {
235 in.close();
236 }
237
238 for (Map.Entry<Object, Object> entry : p.entrySet())
239 metadata.add((String) entry.getKey()).value((String) entry.getValue());
240 }
241 } catch (Exception ignored) {
242 System.err.println("Failed loading metadata from '" + metadataFile + '\'');
243 }
244 }
245
246 return metadata;
247 }
248
249 static void waitOnPendingJobs(ServiceClient client, Map<UUID, File> pendingJobs) throws Exception {
250
251 if (pendingJobs == null) return;
252
253 int totalAmount = pendingJobs.size(), closed = 0;
254 final int chunkSize = Integer.getInteger("max.chunksize", 10);
255 final ProgressIndicator indicator = new ProgressIndicator(">");
256 EXECUTOR.execute(indicator);
257 final ProcessingService processingService = client.getPort(ProcessingService.class);
258
259 try {
260 while (!pendingJobs.isEmpty()) {
261 indicator.setMessage("Waiting on " + (totalAmount - closed) + " Jobs to finish.");
262
263 final List<UUID> jobIdChunk = new ArrayList<UUID>();
264 final Map<UUID, Long> removed = new HashMap<UUID, Long>();
265 for (Map.Entry<UUID, File> entry : pendingJobs.entrySet()) {
266 if (jobIdChunk.size() < chunkSize)
267 jobIdChunk.add(entry.getKey());
268 else {
269 queryCompleteStateOnPendingJobs(processingService, jobIdChunk, removed);
270 jobIdChunk.clear();
271 }
272 indicator.setProgress(closed + removed.size(), totalAmount);
273 }
274
275 queryCompleteStateOnPendingJobs(processingService, jobIdChunk, removed);
276
277 closed += removed.size();
278 for (Map.Entry<UUID, Long> entry : removed.entrySet()) {
279 File file = pendingJobs.remove(entry.getKey());
280 if (entry.getValue() == -1)
281 indicator.println("ERROR: Job " + entry.getKey() + " doesn't appear to be valid.");
282 else
283 indicator.println(String.format("Finished: %s: %s in %.3f sec %n", entry.getKey(), file, entry.getValue() / 1000D));
284 }
285
286 indicator.setProgress(closed, totalAmount);
287
288 if (!pendingJobs.isEmpty()) {
289 Thread.sleep(10 * 1000);
290 indicator.println((totalAmount - closed) + " Jobs still pending, continue waiting (abort with CTRL-C).");
291 }
292 }
293 } finally {
294 for (Map.Entry<UUID, File> entry : pendingJobs.entrySet())
295 indicator.println("Unfinished: " + entry.getKey() + ": " + entry.getValue());
296
297 indicator.close();
298 }
299 }
300
301 static void queryCompleteStateOnPendingJobs(ProcessingService processingService,
302 List<UUID> jobIdChunk, Map<UUID, Long> removed) throws Exception {
303 if (jobIdChunk.isEmpty()) return;
304
305 final Iterator<Job> jobs = processingService.getJobs(jobIdChunk).iterator();
306 for (UUID uuid : jobIdChunk) {
307 final Job job = jobs.hasNext() ? jobs.next() : null;
308
309 if (job == null) {
310 removed.put(uuid, -1L);
311 } else {
312 switch (job.getState()) {
313 case FINISHED:
314 Date fd = job.getFinished() == null ? new Date() : job.getFinished();
315 removed.put(uuid, fd.getTime() - job.getCreated().getTime());
316 break;
317 case ABORTED:
318 case FAILED:
319 removed.put(uuid, -1L);
320 break;
321 }
322 }
323 }
324 }
325 }