1 package com.trendmicro.grid.acl.ds.bclog;
2
3 import com.trendmicro.grid.acl.l0.datatypes.ProcessPackageDataSet;
4 import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory;
6 import org.springframework.stereotype.Repository;
7
8 import javax.annotation.Resource;
9 import java.io.IOException;
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.concurrent.ArrayBlockingQueue;
13
14
15
16
17
18
19
20 @Repository
21 public final class BinaryChangeLogQueue extends ArrayBlockingQueue<ProcessPackageDataSet> {
22
23 private static final Logger log = LoggerFactory.getLogger(BinaryChangeLogQueue.class);
24 private static final long serialVersionUID = 4904787490898023306L;
25
26 @Resource
27 BinaryChangeLogWriterService writerService;
28
29 private transient final Runnable writer = new Runnable() {
30 public void run() {
31 ProcessPackageDataSet first = null;
32
33 while ((first = poll()) != null) {
34
35 List<ProcessPackageDataSet> elements = new ArrayList<ProcessPackageDataSet>(25);
36 elements.add(first);
37 drainTo(elements, 24);
38
39 for (ProcessPackageDataSet element : elements)
40 try {
41 writerService.writeNext(element);
42 } catch (IOException e) {
43 log.error("TMACL-00650:Failed writing results of job '" +
44 element.getReferringJob() + "' to the binary changelog. " +
45 "Operations that depend on this data (e.g. multisite) will be outdated.", e);
46 }
47
48 }
49
50 synchronized (checkpoint) {
51 checkpoint.notifyAll();
52 }
53 }
54 };
55
56 final Object checkpoint = new Object();
57 final transient Thread executor;
58
59 public BinaryChangeLogQueue() {
60 super(250, false);
61
62 executor = new Thread() {
63 public void run() {
64 while (!isInterrupted()) {
65 try {
66 synchronized (this) {
67 wait(2000);
68 }
69
70 writer.run();
71 } catch (InterruptedException e) {
72 return;
73 } catch (Throwable e) {
74 log.error("TMACL-00700:Cought an unexpected exception on " +
75 "the attempt to write binary logs.", e);
76 }
77 }
78 }
79 };
80 executor.setName("Binary ChangeLog Writer");
81 executor.start();
82 }
83
84
85
86
87
88
89
90 public synchronized void checkpoint() throws InterruptedException, IOException {
91
92 synchronized (executor) {
93 executor.notify();
94 }
95
96
97 synchronized (checkpoint) {
98 checkpoint.wait();
99 }
100
101 writerService.fsync();
102 }
103 }