1   package com.trendmicro.grid.acl.ds.msmq;
2   
3   import com.trendmicro.grid.acl.ds.ProcessingInitiator;
4   import com.trendmicro.grid.acl.l0.datatypes.ProcessPackageDataSet;
5   import net.sf.tinyjee.config.Connection;
6   import org.slf4j.Logger;
7   import org.slf4j.LoggerFactory;
8   import org.springframework.stereotype.Service;
9   
10  import javax.annotation.PostConstruct;
11  import javax.xml.bind.JAXBException;
12  import javax.xml.bind.Marshaller;
13  import java.io.IOException;
14  import java.io.StringWriter;
15  import java.io.Writer;
16  import java.net.URI;
17  import java.util.ArrayList;
18  import java.util.Collection;
19  import java.util.List;
20  
21  import static net.sf.tinyjee.util.Assert.assertNotNull;
22  
23  /**
24   * Implements {@link ProcessingInitiator} as a MSMQ bound service.
25   *
26   * @author juergen_kellerer, werner_watzka, 2010-06-02
27   * @version 1.0
28   */
29  @Service
30  public class MSMQProcessingInitiator implements ProcessingInitiator {
31  
32  	private static final Logger log = LoggerFactory.getLogger(MSMQProcessingInitiator.class);
33  
34  	private static final String SENDER_IMPLEMENTATION = System.getProperty("gacl.msmq.sender",
35  			"com.trendmicro.grid.acl.ds.msmq.PlainRoundRobinSender");
36  
37  	List<URI> serviceEndpoints;
38  	String targetQueueName;
39  	Sender sender;
40  
41  	@PostConstruct
42  	void initializeInitiator() throws Exception {
43  		Collection<Connection> connections = MSMQPropertySection.getMSMQConnections();
44  
45  		// Abort if config is incorrect.
46  		int connectionCount = connections.size();
47  		if (connectionCount != 1) {
48  			if (connectionCount == 0)
49  				log.warn("TMACL-00790:No MSQM connection is configured. MSMQ sending will be disabled.");
50  			else
51  				log.warn("TMACL-00760:More than one MSMQ connection is configured. MSMQ sending will be disabled.");
52  			return;
53  		}
54  
55  		// Define fields
56  		serviceEndpoints = new ArrayList<URI>();
57  		for (Connection c : connections)
58  			serviceEndpoints.add(c.getURI());
59  
60  		if (!serviceEndpoints.isEmpty()) {
61  			targetQueueName = MSMQPropertySection.getTargetQueue(MSMQPropertySection.QUEUE_PROCESS_REQUEST);
62  			sender = (Sender) Class.forName(SENDER_IMPLEMENTATION).newInstance();
63  			sender.reset(serviceEndpoints, targetQueueName);
64  		}
65  	}
66  
67  	/**
68  	 * {@inheritDoc}
69  	 * <p/>
70  	 * Note: This method is called by the web-service implementation
71  	 * after a job was assembled and produced this initial package.
72  	 * <p/>
73  	 * The implementation of this method sends the given payload against
74  	 * the MSMQ queue "al_out".
75  	 */
76  	public void initiate(ProcessPackageDataSet dataSet) {
77  		assertNotNull("ProcessPackageDataSet", dataSet);
78  
79  		if (serviceEndpoints.isEmpty()) {
80  			log.error("TMACL-00800:Not initiating the processing for job " +
81  					"'{}' as the MSMQ sending is disabled.", dataSet.getReferringJob());
82  			return;
83  		}
84  
85  		String messageBody = toXmlString(dataSet);
86  		try {
87  			sender.send(dataSet.getProcessPriority(), messageBody);
88  		} catch (IOException e) {
89  			// TODO: Note: In theory we'd need to store the messages in between in this case and
90  			// TODO: send later when the MSMQ server is available again.
91  			log.error("TMACL-00810:Failed initiating the processing for job '" +
92  					dataSet.getReferringJob() + "' as sending to MSMQ failed.", e);
93  			throw new RuntimeException(e);
94  		}
95  	}
96  
97  	String toXmlString(ProcessPackageDataSet dataSet) {
98  		try {
99  			Writer w = new StringWriter();
100 			Marshaller m = ProcessPackageDataSet.getXmlSerializer().getJaxbContext().createMarshaller();
101 			m.marshal(dataSet, w);
102 			return w.toString();
103 		} catch (JAXBException e) {
104 			throw new RuntimeException(e);
105 		}
106 	}
107 }