Browse Source

initial MOM example using ActiveMQ

master
Sebastian Rieger 7 years ago
parent
commit
6193c07c82
  1. 41
      VerteilteSysteme-Examples/.classpath
  2. 1
      VerteilteSysteme-Examples/.gitignore
  3. BIN
      VerteilteSysteme-Examples/lib/activemq-all-5.15.2.jar
  4. 142
      VerteilteSysteme-Examples/src/verteiltesysteme/mom/ActiveMQHelloWorld.java
  5. 70
      VerteilteSysteme-Examples/src/verteiltesysteme/mom/Consumer.java
  6. 88
      VerteilteSysteme-Examples/src/verteiltesysteme/mom/Producer.java

41
VerteilteSysteme-Examples/.classpath

@ -1,20 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"/>
<classpathentry kind="src" path="src"/>
<classpathentry kind="lib" path="lib/grizzly-http-all-2.4.2.jar"/>
<classpathentry kind="lib" path="lib/hk2-api-2.5.0-b42.jar"/>
<classpathentry kind="lib" path="lib/hk2-locator-2.5.0-b42.jar"/>
<classpathentry kind="lib" path="lib/hk2-utils-2.5.0-b42.jar"/>
<classpathentry kind="lib" path="lib/javax.annotation-api-1.2.jar"/>
<classpathentry kind="lib" path="lib/javax.inject-2.5.0-b42.jar"/>
<classpathentry kind="lib" path="lib/javax.json.bind-api-1.0.jar"/>
<classpathentry kind="lib" path="lib/javax.ws.rs-api-2.1.jar"/>
<classpathentry kind="lib" path="lib/jersey-client.jar"/>
<classpathentry kind="lib" path="lib/jersey-common.jar"/>
<classpathentry kind="lib" path="lib/jersey-container-grizzly2-http-2.26.jar"/>
<classpathentry kind="lib" path="lib/jersey-hk2.jar"/>
<classpathentry kind="lib" path="lib/jersey-server.jar"/>
<classpathentry kind="lib" path="lib/validation-api-1.1.0.Final.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"/>
<classpathentry kind="src" path="src"/>
<classpathentry kind="lib" path="lib/grizzly-http-all-2.4.2.jar"/>
<classpathentry kind="lib" path="lib/hk2-api-2.5.0-b42.jar"/>
<classpathentry kind="lib" path="lib/hk2-locator-2.5.0-b42.jar"/>
<classpathentry kind="lib" path="lib/hk2-utils-2.5.0-b42.jar"/>
<classpathentry kind="lib" path="lib/javax.annotation-api-1.2.jar"/>
<classpathentry kind="lib" path="lib/javax.inject-2.5.0-b42.jar"/>
<classpathentry kind="lib" path="lib/javax.json.bind-api-1.0.jar"/>
<classpathentry kind="lib" path="lib/javax.ws.rs-api-2.1.jar"/>
<classpathentry kind="lib" path="lib/jersey-client.jar"/>
<classpathentry kind="lib" path="lib/jersey-common.jar"/>
<classpathentry kind="lib" path="lib/jersey-container-grizzly2-http-2.26.jar"/>
<classpathentry kind="lib" path="lib/jersey-hk2.jar"/>
<classpathentry kind="lib" path="lib/jersey-server.jar"/>
<classpathentry kind="lib" path="lib/validation-api-1.1.0.Final.jar"/>
<classpathentry kind="lib" path="lib/activemq-all-5.15.2.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>

1
VerteilteSysteme-Examples/.gitignore

@ -1 +1,2 @@
/bin/ /bin/
/activemq-data/

BIN
VerteilteSysteme-Examples/lib/activemq-all-5.15.2.jar

142
VerteilteSysteme-Examples/src/verteiltesysteme/mom/ActiveMQHelloWorld.java

@ -0,0 +1,142 @@
/* Quelle: http://activemq.apache.org/hello-world.html */
package verteiltesysteme.mom;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* Hello world!
*/
public class ActiveMQHelloWorld {
public static void main(String[] args) throws Exception {
thread(new HelloWorldProducer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
Thread.sleep(10000);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
Thread.sleep(10000);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldProducer(), false);
Thread.sleep(10000);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
}
public static void thread(Runnable runnable, boolean daemon) {
Thread brokerThread = new Thread(runnable);
brokerThread.setDaemon(daemon);
brokerThread.start();
}
public static class HelloWorldProducer implements Runnable {
public void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create a messages
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
TextMessage message = session.createTextMessage(text);
// Tell the producer to send the message
System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
producer.send(message);
// Clean up
session.close();
connection.close();
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
}
public static class HelloWorldConsumer implements Runnable, ExceptionListener {
public void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(this);
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message
Message message = consumer.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received: " + text);
} else {
System.out.println("Received: " + message);
}
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public synchronized void onException(JMSException ex) {
System.out.println("JMS Exception occured. Shutting down client.");
}
}
}

70
VerteilteSysteme-Examples/src/verteiltesysteme/mom/Consumer.java

@ -0,0 +1,70 @@
/*
* Beispiel für MOM / AMQP
*
* Message Consumer, bezieht Messages von Queue aus ActiveMQ
*
* zuvor ActiveMQ starten, Zugriff auf Web-Oberfläche http://localhost:8161/admin Login: admin ;)
*
* Beispiele, siehe z.B. http://activemq.apache.org/hello-world.html
* oder http://activemq.apache.org/getting-started.html
*/
package verteiltesysteme.mom;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
public static void main(String[] args) {
try {
// ConnectionFactory erzeugen
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// Connection erzeugen
Connection connection = connectionFactory.createConnection();
connection.start();
// Session erzeugen
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Destination erzeugen (Queue oder Topic)
Destination destination = session.createQueue("VerteilteSysteme.TimestampJobs");
// MessageConsumer für die Session zur Destination (hier eine Queue) erzeugen
MessageConsumer consumer = session.createConsumer(destination);
// max. 10 Sekunden auf Nachricht warten
Message message = consumer.receive(10000);
// Einzene Naxchricht als TextMessage aus der Queue nehmen, sofern verfügbar
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Empfangene Nachricht: " + text);
} else {
if (message == null)
{
System.out.println("Queue leer - keine Nachricht vorhanden (" + message + ")");
}
else
{
System.out.println("Empfangene Nachricht: " + message);
}
}
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

88
VerteilteSysteme-Examples/src/verteiltesysteme/mom/Producer.java

@ -0,0 +1,88 @@
/*
* Beispiel für MOM / AMQP
*
* Message Producer, schickt Messages an Queue in ActiveMQ
*
* zuvor ActiveMQ starten, Zugriff auf Web-Oberfläche http://localhost:8161/admin Login: admin ;)
*
* Beispiele, siehe z.B. http://activemq.apache.org/hello-world.html
* oder http://activemq.apache.org/getting-started.html
*/
package verteiltesysteme.mom;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Producer {
static int jobNumber = 0;
public static void main(String[] args) {
try {
BufferedReader inFromUser = new BufferedReader(new InputStreamReader(System.in));
String jobMessage;
// ConnectionFactory erzeugen
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// Connection erzeugen
Connection connection = connectionFactory.createConnection();
connection.start();
// Session erzeugen
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Destination erzeugen (Queue oder Topic)
Destination destination = session.createQueue("VerteilteSysteme.TimestampJobs");
// MessageProducer für die Session zur Destination (hier eine Queue) erzeugen
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
System.out.println("Geben Sie eine Nachricht ein (leere Zeile zum Abbrechen): ");
jobMessage = inFromUser.readLine();
// Erzeugt eine neue Nachricht, im Beispiel könnte dies ein Job (Arbeitsauftrag
// an die Consumer) sein inkl. Zeitstempel etc.
while (!jobMessage.isEmpty()) {
jobNumber++;
DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
String text = "Neuer Arbeitsauftrag (Job) (von Thread: " + Thread.currentThread().getName() + ", " +
"Producer: " + producer.hashCode() + "), " +
"Job-Zeit: " + df.format(new Date(System.currentTimeMillis())) + ", " +
" Job-Nr: " + jobNumber + ", " +
"Job-Nachricht: " + jobMessage;
TextMessage message = session.createTextMessage(text);
// Tell the producer to send the message
System.out.println("Nachricht gesendet! HashCode: " + message.hashCode() + " Inhalt: \""
+ message.getText() + "\"");
producer.send(message);
System.out.println("\nGeben Sie eine Nachricht ein (leere Zeile zum Abbrechen): ");
jobMessage = inFromUser.readLine();
}
// Clean up
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Loading…
Cancel
Save