diff --git a/VerteilteSysteme-Examples/.classpath b/VerteilteSysteme-Examples/.classpath index 500db78..5e6ecb8 100644 --- a/VerteilteSysteme-Examples/.classpath +++ b/VerteilteSysteme-Examples/.classpath @@ -1,20 +1,21 @@ - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + diff --git a/VerteilteSysteme-Examples/.gitignore b/VerteilteSysteme-Examples/.gitignore index ae3c172..15cead2 100644 --- a/VerteilteSysteme-Examples/.gitignore +++ b/VerteilteSysteme-Examples/.gitignore @@ -1 +1,2 @@ /bin/ +/activemq-data/ diff --git a/VerteilteSysteme-Examples/lib/activemq-all-5.15.2.jar b/VerteilteSysteme-Examples/lib/activemq-all-5.15.2.jar new file mode 100644 index 0000000..6f8bf00 Binary files /dev/null and b/VerteilteSysteme-Examples/lib/activemq-all-5.15.2.jar differ diff --git a/VerteilteSysteme-Examples/src/verteiltesysteme/mom/ActiveMQHelloWorld.java b/VerteilteSysteme-Examples/src/verteiltesysteme/mom/ActiveMQHelloWorld.java new file mode 100644 index 0000000..411880d --- /dev/null +++ b/VerteilteSysteme-Examples/src/verteiltesysteme/mom/ActiveMQHelloWorld.java @@ -0,0 +1,142 @@ +/* Quelle: http://activemq.apache.org/hello-world.html */ +package verteiltesysteme.mom; + +import org.apache.activemq.ActiveMQConnectionFactory; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * Hello world! + */ +public class ActiveMQHelloWorld { + + public static void main(String[] args) throws Exception { + thread(new HelloWorldProducer(), false); + thread(new HelloWorldProducer(), false); + thread(new HelloWorldConsumer(), false); + Thread.sleep(10000); + thread(new HelloWorldConsumer(), false); + thread(new HelloWorldProducer(), false); + thread(new HelloWorldConsumer(), false); + thread(new HelloWorldProducer(), false); + Thread.sleep(10000); + thread(new HelloWorldConsumer(), false); + thread(new HelloWorldProducer(), false); + thread(new HelloWorldConsumer(), false); + thread(new HelloWorldConsumer(), false); + thread(new HelloWorldProducer(), false); + thread(new HelloWorldProducer(), false); + Thread.sleep(10000); + thread(new HelloWorldProducer(), false); + thread(new HelloWorldConsumer(), false); + thread(new HelloWorldConsumer(), false); + thread(new HelloWorldProducer(), false); + thread(new HelloWorldConsumer(), false); + thread(new HelloWorldProducer(), false); + thread(new HelloWorldConsumer(), false); + thread(new HelloWorldProducer(), false); + thread(new HelloWorldConsumer(), false); + thread(new HelloWorldConsumer(), false); + thread(new HelloWorldProducer(), false); + } + + public static void thread(Runnable runnable, boolean daemon) { + Thread brokerThread = new Thread(runnable); + brokerThread.setDaemon(daemon); + brokerThread.start(); + } + + public static class HelloWorldProducer implements Runnable { + public void run() { + try { + // Create a ConnectionFactory + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + + // Create a Connection + Connection connection = connectionFactory.createConnection(); + connection.start(); + + // Create a Session + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the destination (Topic or Queue) + Destination destination = session.createQueue("TEST.FOO"); + + // Create a MessageProducer from the Session to the Topic or Queue + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + // Create a messages + String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); + TextMessage message = session.createTextMessage(text); + + // Tell the producer to send the message + System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName()); + producer.send(message); + + // Clean up + session.close(); + connection.close(); + } catch (Exception e) { + System.out.println("Caught: " + e); + e.printStackTrace(); + } + } + } + + public static class HelloWorldConsumer implements Runnable, ExceptionListener { + public void run() { + try { + + // Create a ConnectionFactory + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + + // Create a Connection + Connection connection = connectionFactory.createConnection(); + connection.start(); + + connection.setExceptionListener(this); + + // Create a Session + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the destination (Topic or Queue) + Destination destination = session.createQueue("TEST.FOO"); + + // Create a MessageConsumer from the Session to the Topic or Queue + MessageConsumer consumer = session.createConsumer(destination); + + // Wait for a message + Message message = consumer.receive(1000); + + if (message instanceof TextMessage) { + TextMessage textMessage = (TextMessage) message; + String text = textMessage.getText(); + System.out.println("Received: " + text); + } else { + System.out.println("Received: " + message); + } + + consumer.close(); + session.close(); + connection.close(); + } catch (Exception e) { + System.out.println("Caught: " + e); + e.printStackTrace(); + } + } + + public synchronized void onException(JMSException ex) { + System.out.println("JMS Exception occured. Shutting down client."); + } + } +} \ No newline at end of file diff --git a/VerteilteSysteme-Examples/src/verteiltesysteme/mom/Consumer.java b/VerteilteSysteme-Examples/src/verteiltesysteme/mom/Consumer.java new file mode 100644 index 0000000..eb2eb9f --- /dev/null +++ b/VerteilteSysteme-Examples/src/verteiltesysteme/mom/Consumer.java @@ -0,0 +1,70 @@ +/* + * Beispiel für MOM / AMQP + * + * Message Consumer, bezieht Messages von Queue aus ActiveMQ + * + * zuvor ActiveMQ starten, Zugriff auf Web-Oberfläche http://localhost:8161/admin Login: admin ;) + * + * Beispiele, siehe z.B. http://activemq.apache.org/hello-world.html + * oder http://activemq.apache.org/getting-started.html + */ + +package verteiltesysteme.mom; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; + +public class Consumer { + + public static void main(String[] args) { + try { + // ConnectionFactory erzeugen + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + + // Connection erzeugen + Connection connection = connectionFactory.createConnection(); + connection.start(); + + // Session erzeugen + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Destination erzeugen (Queue oder Topic) + Destination destination = session.createQueue("VerteilteSysteme.TimestampJobs"); + + // MessageConsumer für die Session zur Destination (hier eine Queue) erzeugen + MessageConsumer consumer = session.createConsumer(destination); + + // max. 10 Sekunden auf Nachricht warten + Message message = consumer.receive(10000); + + // Einzene Naxchricht als TextMessage aus der Queue nehmen, sofern verfügbar + if (message instanceof TextMessage) { + TextMessage textMessage = (TextMessage) message; + String text = textMessage.getText(); + System.out.println("Empfangene Nachricht: " + text); + } else { + if (message == null) + { + System.out.println("Queue leer - keine Nachricht vorhanden (" + message + ")"); + } + else + { + System.out.println("Empfangene Nachricht: " + message); + } + } + + consumer.close(); + session.close(); + connection.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/VerteilteSysteme-Examples/src/verteiltesysteme/mom/Producer.java b/VerteilteSysteme-Examples/src/verteiltesysteme/mom/Producer.java new file mode 100644 index 0000000..4cb1e76 --- /dev/null +++ b/VerteilteSysteme-Examples/src/verteiltesysteme/mom/Producer.java @@ -0,0 +1,88 @@ +/* + * Beispiel für MOM / AMQP + * + * Message Producer, schickt Messages an Queue in ActiveMQ + * + * zuvor ActiveMQ starten, Zugriff auf Web-Oberfläche http://localhost:8161/admin Login: admin ;) + * + * Beispiele, siehe z.B. http://activemq.apache.org/hello-world.html + * oder http://activemq.apache.org/getting-started.html + */ + +package verteiltesysteme.mom; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; + +public class Producer { + + static int jobNumber = 0; + + public static void main(String[] args) { + try { + BufferedReader inFromUser = new BufferedReader(new InputStreamReader(System.in)); + String jobMessage; + + // ConnectionFactory erzeugen + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + + // Connection erzeugen + Connection connection = connectionFactory.createConnection(); + connection.start(); + + // Session erzeugen + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Destination erzeugen (Queue oder Topic) + Destination destination = session.createQueue("VerteilteSysteme.TimestampJobs"); + + // MessageProducer für die Session zur Destination (hier eine Queue) erzeugen + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + System.out.println("Geben Sie eine Nachricht ein (leere Zeile zum Abbrechen): "); + jobMessage = inFromUser.readLine(); + + // Erzeugt eine neue Nachricht, im Beispiel könnte dies ein Job (Arbeitsauftrag + // an die Consumer) sein inkl. Zeitstempel etc. + while (!jobMessage.isEmpty()) { + jobNumber++; + DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss"); + String text = "Neuer Arbeitsauftrag (Job) (von Thread: " + Thread.currentThread().getName() + ", " + + "Producer: " + producer.hashCode() + "), " + + "Job-Zeit: " + df.format(new Date(System.currentTimeMillis())) + ", " + + " Job-Nr: " + jobNumber + ", " + + "Job-Nachricht: " + jobMessage; + TextMessage message = session.createTextMessage(text); + + // Tell the producer to send the message + System.out.println("Nachricht gesendet! HashCode: " + message.hashCode() + " Inhalt: \"" + + message.getText() + "\""); + producer.send(message); + + System.out.println("\nGeben Sie eine Nachricht ein (leere Zeile zum Abbrechen): "); + jobMessage = inFromUser.readLine(); + } + + // Clean up + session.close(); + connection.close(); + } catch (Exception e) { + e.printStackTrace(); + } + + } + +}