1   package com.trendmicro.grid.acl.ds.bclog;
2   
3   import com.trendmicro.grid.acl.l0.datatypes.ProcessPackageDataSet;
4   import org.slf4j.Logger;
5   import org.slf4j.LoggerFactory;
6   import org.springframework.stereotype.Repository;
7   
8   import javax.annotation.Resource;
9   import java.io.IOException;
10  import java.util.ArrayList;
11  import java.util.List;
12  import java.util.concurrent.ArrayBlockingQueue;
13  
14  /**
15   * Queues datasets and writes them inside a separate writer thread that runs every second.
16   *
17   * @author juergen_kellerer, 2010-05-28
18   * @version 1.0
19   */
20  @Repository
21  public final class BinaryChangeLogQueue extends ArrayBlockingQueue<ProcessPackageDataSet> {
22  
23  	private static final Logger log = LoggerFactory.getLogger(BinaryChangeLogQueue.class);
24  	private static final long serialVersionUID = 4904787490898023306L;
25  
26  	@Resource
27  	BinaryChangeLogWriterService writerService;
28  
29  	private transient final Runnable writer = new Runnable() {
30  		public void run() {
31  			ProcessPackageDataSet first = null;
32  
33  			while ((first = poll()) != null) {
34  				// Copy elements in batches to reduce the locking overhead of constant polling.
35  				List<ProcessPackageDataSet> elements = new ArrayList<ProcessPackageDataSet>(25);
36  				elements.add(first);
37  				drainTo(elements, 24);
38  
39  				for (ProcessPackageDataSet element : elements)
40  					try {
41  						writerService.writeNext(element);
42  					} catch (IOException e) {
43  						log.error("TMACL-00650:Failed writing results of job '" +
44  							element.getReferringJob() + "' to the binary changelog. " +
45  							"Operations that depend on this data (e.g. multisite) will be outdated.", e);
46  					}
47  
48  			}
49  
50  			synchronized (checkpoint) {
51  				checkpoint.notifyAll();
52  			}
53  		}
54  	};
55  
56  	final Object checkpoint = new Object();
57  	final transient Thread executor;
58  
59  	public BinaryChangeLogQueue() {
60  		super(250, false);
61  
62  		executor = new Thread() {
63  			public void run() {
64  				while (!isInterrupted()) {
65  					try {
66  						synchronized (this) {
67  							wait(2000);
68  						}
69  
70  						writer.run();
71  					} catch (InterruptedException e) {
72  						return;
73  					} catch (Throwable e) {
74  						log.error("TMACL-00700:Cought an unexpected exception on " +
75  							"the attempt to write binary logs.", e);
76  					}
77  				}
78  			}
79  		};
80  		executor.setName("Binary ChangeLog Writer");
81  		executor.start();
82  	}
83  
84  	/**
85  	 * Ensures that all queued elements are written.
86  	 *
87  	 * @throws java.io.IOException  if the disk io failed.
88  	 * @throws InterruptedException in cause of waiting for the queue to become empty got interrupted.
89  	 */
90  	public synchronized void checkpoint() throws InterruptedException, IOException {
91  		// Notify the worker.
92  		synchronized (executor) {
93  			executor.notify();
94  		}
95  
96  		// Wait for the signal
97  		synchronized (checkpoint) {
98  			checkpoint.wait();
99  		}
100 
101 		writerService.fsync();
102 	}
103 }