1   import com.trendmicro.grid.acl.client.ServiceClient;
2   import com.trendmicro.grid.acl.client.util.CommandlineClientFactory;
3   import com.trendmicro.grid.acl.client.util.CommandlineParser;
4   import com.trendmicro.grid.acl.client.util.ProgressIndicator;
5   import com.trendmicro.grid.acl.l0.ProcessingService;
6   import com.trendmicro.grid.acl.l0.SourceService;
7   import com.trendmicro.grid.acl.l0.datatypes.FileIdentifier;
8   import com.trendmicro.grid.acl.l0.datatypes.Job;
9   import com.trendmicro.grid.acl.l0.datatypes.SourceIdentifier;
10  import com.trendmicro.grid.acl.l0.datatypes.SourceInformation;
11  import com.trendmicro.grid.acl.metadata.Metadata;
12  
13  import java.io.*;
14  import java.net.HttpURLConnection;
15  import java.net.URI;
16  import java.net.URL;
17  import java.security.DigestInputStream;
18  import java.util.*;
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  
22  import static java.lang.System.out;
23  import static java.security.MessageDigest.getInstance;
24  
25  /**
26   * Is a simple commandline client that can send directories recursively against the GRID.
27   *
28   * @author juergen_kellerer, 2010-06-21
29   * @version 1.0
30   */
31  public class DirectorySender {
32  
33  	static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
34  
35  	static final String[] HELP = {
36  			"GRID Directory Sender - Version 1.0",
37  			"",
38  			"Sends directories into the GRID using the Source and Processing",
39  			"services offered by the GRID Access Layer (GACL).",
40  			"",
41  	};
42  
43  	static final CommandlineParser CLI = new CommandlineParser().
44  			defineParameter("Specifies the source directory containing " +
45  					"the files to send.", true, File.class, null, "-s", "--dir").
46  			defineSwitchParameter("Enables recursive directory scanning", "-R", "-r").
47  			defineSwitchParameter("Force sending. (Disables de-duplication).", "-f", "--force").
48  
49  			defineParameter("Defines an URI prefix to use instead of using 'file:/' uris.",
50  					false, String.class, null, "--uri-prefix").
51  
52  			defineParameter("Specifies the extension to use for looking after metadata properties files. " +
53  					"Metadata properties files contain key & value pairs that are used as source metadata " +
54  					"when sending files.\n" +
55  					"The default search algorithm appends the specified extension to the filename of the " +
56  					"source file and searches for a corresponding properties file in the same path.\n" +
57  					"In case of the extension ends with '.xml', the detault XML based metadata format is " +
58  					"assumed instead of java properties file styled metadata.",
59  					false, String.class, ".properties", "-m", "--metadata-ext").
60  
61  			defineSwitchParameter("Wait on jobs to get finished.\n" +
62  					"When enabled waits unitl all jobs are labelled " +
63  					"finished before returning.", "--wait").
64  
65  			defineSwitchParameter("Be verbose.", "-v", "--verbose");
66  
67  	static final CommandlineClientFactory CLIENT_FACTORY = new CommandlineClientFactory(CLI);
68  
69  
70  	public static void main(String[] args) throws Exception {
71  		boolean canContinue = CLI.parse(args, out, HELP);
72  		if (!canContinue) return;
73  
74  		final File sourceDirectory = CLI.getParameter("-s", File.class);
75  		if (!sourceDirectory.isDirectory()) {
76  			throw new IllegalArgumentException("The specified source directory " + sourceDirectory + " doesn't seem to be a directory.");
77  		}
78  
79  		final ServiceClient client = CLIENT_FACTORY.newServiceClient(CLI);
80  		client.getContext().getSessionHandler().registerAsSystemDefault();
81  
82  		Map<UUID, File> pendingJobs = CLI.isParameterTrue("--wait") ? new HashMap<UUID, File>() : null;
83  		final long time = System.currentTimeMillis();
84  		try {
85  			sendDirectory(sourceDirectory, client, pendingJobs);
86  			waitOnPendingJobs(client, pendingJobs);
87  		} finally {
88  			out.println();
89  			out.printf("Time: %.3f sec %n", ((double) (System.currentTimeMillis() - time)) / 1000D);
90  
91  			EXECUTOR.shutdown();
92  		}
93  	}
94  
95  	static void sendDirectory(final File sourceDirectory, ServiceClient client, Map<UUID, File> pendingJobs) {
96  		final boolean force = CLI.isParameterTrue("-f");
97  		final boolean recursive = CLI.isParameterTrue("-R");
98  		final String metadataExtension = CLI.getParameter("-m", String.class);
99  		final String uriPrefix = CLI.getParameter("--uri-prefix", String.class);
100 
101 		final File[] children = sourceDirectory.listFiles(new FilenameFilter() {
102 			public boolean accept(File dir, String name) {
103 				return !name.endsWith(metadataExtension);
104 			}
105 		});
106 
107 		if (children == null) return;
108 
109 		for (File child : children) {
110 			if (child.isDirectory()) {
111 				if (recursive)
112 					sendDirectory(child, client, pendingJobs);
113 				else
114 					out.println("Ignoring directory: " + child + " (recursion is disabled)");
115 			} else {
116 				Metadata metadata = loadMetadata(new File(child + metadataExtension));
117 				try {
118 					UUID jobId = sendFile(child, force, uriPrefix, metadata, client);
119 					if (jobId != null && pendingJobs != null)
120 						pendingJobs.put(jobId, child);
121 				} catch (Exception e) {
122 					out.println();
123 					System.err.println("Failed sending " + child);
124 					e.printStackTrace();
125 				}
126 			}
127 		}
128 	}
129 
130 	static UUID sendFile(File source, boolean force, String uriPrefix,
131 	                     Metadata metadata, ServiceClient client) throws Exception {
132 		final ProgressIndicator indicator = new ProgressIndicator("Sending " + source);
133 		try {
134 			EXECUTOR.execute(indicator);
135 
136 			URI sourceURI = source.toURI();
137 			if (uriPrefix != null) sourceURI = URI.create(uriPrefix + sourceURI.getPath());
138 
139 			final SourceService sourceService = client.getPort(SourceService.class);
140 			final ProcessingService service = client.getPort(ProcessingService.class);
141 
142 			if (!force) {
143 				SourceInformation info = sourceService.getSourceInformationForURL(sourceURI);
144 				Date lm = info == null ? null : info.getLastModified();
145 				if (lm != null && lm.getTime() / 1000 == source.lastModified() / 1000) {
146 					indicator.append(" - OK (not modified).");
147 					return null;
148 				} else if (info != null)
149 					indicator.append(" - modified");
150 			}
151 
152 			final UUID uuid = service.prepareJob();
153 			try {
154 				final SourceIdentifier sourceId = service.assignProcessSource(
155 						uuid, sourceURI, new Date(source.lastModified()), "", metadata);
156 				try {
157 					service.assignExistingContentToProcessSource(sourceId, generateFileId(source, indicator));
158 					indicator.append(" - OK (updated).");
159 				} catch (Exception ignored) {
160 					URL putURL = service.assignContentToProcessSource(sourceId);
161 					putData(source, putURL, indicator);
162 					indicator.append(" - OK.");
163 				}
164 
165 				if (CLI.isParameterTrue("-v")) {
166 					indicator.println("Starting job '" + uuid + "' with source '" + sourceURI + "' at '" + new Date() + '\'');
167 				}
168 			} finally {
169 				service.startJob(uuid);
170 			}
171 
172 			return uuid;
173 		} finally {
174 			indicator.close();
175 		}
176 	}
177 
178 	static void putData(File file, URL putURL, ProgressIndicator indicator) throws Exception {
179 		indicator.setMessage("transferring");
180 
181 		HttpURLConnection huc = (HttpURLConnection) putURL.openConnection();
182 		huc.setRequestMethod("PUT");
183 		huc.setDoOutput(true);
184 		huc.setChunkedStreamingMode(8 * 1024);
185 
186 		copy(file, huc.getOutputStream(), indicator);
187 
188 		if (huc.getResponseCode() != 200) throw new IOException("Server replied with " + huc.getResponseCode() + ' ' + huc.getResponseMessage());
189 	}
190 
191 	static FileIdentifier generateFileId(File file, ProgressIndicator indicator) throws Exception {
192 		indicator.setMessage("hashing");
193 
194 		final DigestInputStream sha1Stream = new DigestInputStream(new BufferedInputStream(new FileInputStream(file)), getInstance("SHA1"));
195 		final DigestInputStream md5Stream = new DigestInputStream(sha1Stream, getInstance("MD5"));
196 
197 		copy(md5Stream, null, file.length(), indicator);
198 
199 		return new FileIdentifier(sha1Stream.getMessageDigest().digest(), md5Stream.getMessageDigest().digest());
200 	}
201 
202 	static void copy(File file, OutputStream out, ProgressIndicator indicator) throws IOException {
203 		copy(new BufferedInputStream(new FileInputStream(file)), out, file.length(), indicator);
204 	}
205 
206 	static void copy(InputStream in, OutputStream out, long totalLength, ProgressIndicator indicator) throws IOException {
207 		try {
208 			int r;
209 			long bytesCount = 0;
210 			byte[] buffer = new byte[1024 * 8];
211 			while ((r = in.read(buffer)) != -1) {
212 				if (out != null)
213 					out.write(buffer, 0, r);
214 				bytesCount += r;
215 				indicator.setProgress(bytesCount, totalLength);
216 			}
217 		} finally {
218 			in.close();
219 			if (out != null) out.close();
220 		}
221 	}
222 
223 	static Metadata loadMetadata(File metadataFile) {
224 		Metadata metadata = new Metadata();
225 		if (metadataFile.isFile()) {
226 			try {
227 				if (metadataFile.getName().endsWith(".xml")) {
228 					metadata = Metadata.getXmlSerializer().load(metadataFile);
229 				} else {
230 					Properties p = new Properties();
231 					InputStream in = new BufferedInputStream(new FileInputStream(metadataFile));
232 					try {
233 						p.load(in);
234 					} finally {
235 						in.close();
236 					}
237 
238 					for (Map.Entry<Object, Object> entry : p.entrySet())
239 						metadata.add((String) entry.getKey()).value((String) entry.getValue());
240 				}
241 			} catch (Exception ignored) {
242 				System.err.println("Failed loading metadata from '" + metadataFile + '\'');
243 			}
244 		}
245 
246 		return metadata;
247 	}
248 
249 	static void waitOnPendingJobs(ServiceClient client, Map<UUID, File> pendingJobs) throws Exception {
250 
251 		if (pendingJobs == null) return;
252 
253 		int totalAmount = pendingJobs.size(), closed = 0;
254 		final int chunkSize = Integer.getInteger("max.chunksize", 10);
255 		final ProgressIndicator indicator = new ProgressIndicator(">");
256 		EXECUTOR.execute(indicator);
257 		final ProcessingService processingService = client.getPort(ProcessingService.class);
258 
259 		try {
260 			while (!pendingJobs.isEmpty()) {
261 				indicator.setMessage("Waiting on " + (totalAmount - closed) + " Jobs to finish.");
262 
263 				final List<UUID> jobIdChunk = new ArrayList<UUID>();
264 				final Map<UUID, Long> removed = new HashMap<UUID, Long>();
265 				for (Map.Entry<UUID, File> entry : pendingJobs.entrySet()) {
266 					if (jobIdChunk.size() < chunkSize)
267 						jobIdChunk.add(entry.getKey());
268 					else {
269 						queryCompleteStateOnPendingJobs(processingService, jobIdChunk, removed);
270 						jobIdChunk.clear();
271 					}
272 					indicator.setProgress(closed + removed.size(), totalAmount);
273 				}
274 
275 				queryCompleteStateOnPendingJobs(processingService, jobIdChunk, removed);
276 
277 				closed += removed.size();
278 				for (Map.Entry<UUID, Long> entry : removed.entrySet()) {
279 					File file = pendingJobs.remove(entry.getKey());
280 					if (entry.getValue() == -1)
281 						indicator.println("ERROR: Job " + entry.getKey() + " doesn't appear to be valid.");
282 					else
283 						indicator.println(String.format("Finished: %s: %s in %.3f sec %n", entry.getKey(), file, entry.getValue() / 1000D));
284 				}
285 
286 				indicator.setProgress(closed, totalAmount);
287 
288 				if (!pendingJobs.isEmpty()) {
289 					Thread.sleep(10 * 1000);
290 					indicator.println((totalAmount - closed) + " Jobs still pending, continue waiting (abort with CTRL-C).");
291 				}
292 			}
293 		} finally {
294 			for (Map.Entry<UUID, File> entry : pendingJobs.entrySet())
295 				indicator.println("Unfinished: " + entry.getKey() + ": " + entry.getValue());
296 
297 			indicator.close();
298 		}
299 	}
300 
301 	static void queryCompleteStateOnPendingJobs(ProcessingService processingService,
302 	                                            List<UUID> jobIdChunk, Map<UUID, Long> removed) throws Exception {
303 		if (jobIdChunk.isEmpty()) return;
304 
305 		final Iterator<Job> jobs = processingService.getJobs(jobIdChunk).iterator();
306 		for (UUID uuid : jobIdChunk) {
307 			final Job job = jobs.hasNext() ? jobs.next() : null;
308 
309 			if (job == null) {
310 				removed.put(uuid, -1L);
311 			} else {
312 				switch (job.getState()) {
313 					case FINISHED:
314 						Date fd = job.getFinished() == null ? new Date() : job.getFinished();
315 						removed.put(uuid, fd.getTime() - job.getCreated().getTime());
316 						break;
317 					case ABORTED:
318 					case FAILED:
319 						removed.put(uuid, -1L);
320 						break;
321 				}
322 			}
323 		}
324 	}
325 }