Commit 7be0dc98 authored by bseeger's avatar bseeger Committed by acoburn
Browse files

Adds JMS message broadcast OSGi service

parent bb70effc
......@@ -35,6 +35,7 @@ Connectors
These modules listen to repository events and react accordingly.
* `acrepo-connector-broadcast`: ActiveMQ Message Broadcast Service: This rebroadcasts messages from one queue/topic to every queue/topic in a specified list
* `acrepo-idiomatic`: Id Mapping Service: This maps a public ID to a (internal and typically much longer) Fedora URI
* `acrepo-idiomatic-pgsql`: Id Mapping Service Database: This exposes a Postgres datastore for use with the Id Mapping service
......@@ -65,6 +66,7 @@ Each of these projects can be deployed in an OSGi container. For example using
command from its shell:
feature:repo-add mvn:edu.amherst.acdc/acrepo-karaf/LATEST/xml/features
feature:install acrepo-connector-broadcast
feature:install acrepo-exts-fits
feature:install acrepo-exts-image
feature:install acrepo-exts-jsonld
......
Amherst College Message Broadcaster
===================================
The Message Broadcast service will take messages off one ActiveMQ queue or topic and broadcast the
messages across any number of other queues specified in the config file.
`edu.amherst.acdc.connector.broadcast.cfg` is the configuration file for this service.
Deploying in OSGi
-----------------
This project can be deployed in an OSGi container. For example using
[Apache Karaf](http://karaf.apache.org) version 4.x and above, you can run the following
command from its shell:
feature:repo-add mvn:edu.amherst.acdc/acrepo-karaf/LATEST/xml/features
feature:install acrepo-connector-broadcast
Configuration
-------------
This application can be configured by creating the following configuration
file `$KARAF_HOME/etc/edu.amherst.acdc.connector.broadcast.cfg`. The following
values are available for configuration:
The JMS broker to use
jms.brokerUrl=tcp://localhost:61616
jms.username=
jms.password=
The queue/topic, on the above broker, that should be listened to for incoming messages
jms.input=activemq:topic:fedora
Comma separate list of recipient queues to broadcast the incoming messages to
message.recipients=activemq:queue:fcrepo-serialization,activemq:queue:fcrepo-indexing-triplestore
By editing this file, any currently running routes in this service will be immediately redeployed
with the new values.
More information
----------------
For more information, please visit [Apache Camel](http://camel.apache.org) documentation
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>edu.amherst.acdc</groupId>
<artifactId>repository-services</artifactId>
<version>1.0.1-SNAPSHOT</version>
</parent>
<artifactId>acrepo-connector-broadcast</artifactId>
<packaging>bundle</packaging>
<name>Message Broadcaster</name>
<description>Camel-based service for broadcasting messages from one queue/topic to many</description>
<properties>
<osgi.export.packages>edu.amherst.acdc.connector.broadcast;version=${project.version}</osgi.export.packages>
<osgi.import.packages>
org.apache.camel.*,
org.apache.activemq.camel.component,
org.osgi.service.blueprint,
*
</osgi.import.packages>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-blueprint</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-camel</artifactId>
</dependency>
<dependency>
<groupId>org.fcrepo.camel</groupId>
<artifactId>fcrepo-camel</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-blueprint</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<!-- add configuration file to artifact set for OSGi deployment -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>attach-artifact</goal>
</goals>
<configuration>
<artifacts>
<artifact>
<file>src/main/cfg/edu.amherst.acdc.connector.broadcast.cfg</file>
<type>cfg</type>
<classifier>configuration</classifier>
</artifact>
</artifacts>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
# Information about the broker to use
jms.brokerUrl=tcp://localhost:61616
jms.username=
jms.password=
# Which queue/topic to listen to on the above broker
jms.input=activemq:topic:fedora
# Comma separated list of recipient queues to broadcast the incoming messages to
# for example: "activemq:queue:fcrepo-serialization,activemq:queue:fcrepo-indexing-triplestore"
message.recipients=
<?xml version="1.0" encoding="UTF-8"?>
<blueprint
xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0"
xsi:schemaLocation="
http://www.osgi.org/xmlns/blueprint/v1.0.0
http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">
<cm:property-placeholder persistent-id="edu.amherst.acdc.connector.broadcast" update-strategy="reload">
<cm:default-properties>
<cm:property name="jms.brokerUrl" value="tcp://localhost:61616" />
<cm:property name="jms.username" value="" />
<cm:property name="jms.password" value="" />
<cm:property name="jms.input" value="activemq:topic:fedora" />
<cm:property name="message.recipients" value="" />
</cm:default-properties>
</cm:property-placeholder>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="${jms.brokerUrl}" />
<property name="userName" value="${jms.username}" />
<property name="password" value="${jms.password}" />
</bean>
<camelContext id="AcrepoConnectorBroadcast" xmlns="http://camel.apache.org/schema/blueprint">
<route id="MessageBroadcaster">
<description>Broadcast messages from one queue/topic to other specified queues/topics.</description>
<from uri="{{jms.input}}" />
<log message="Distributing message: ${headers[org.fcrepo.jms.timestamp]}: ${headers[org.fcrepo.jms.identifier]}:${headers[org.fcrepo.jms.eventType]}"/>
<recipientList parallelProcessing="true" ignoreInvalidEndpoints="true">
<simple>{{message.recipients}}</simple>
</recipientList>
</route>
</camelContext>
</blueprint>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%p %d{HH:mm:ss.SSS} \(%c{0}\) %m%n</pattern>
</encoder>
</appender>
<logger name="edu.amherst.acdc.connector.broadcast.xml" additivity="false" level="INFO">
<appender-ref ref="STDOUT"/>
</logger>
<logger name="org.apache.camel" additivity="false" level="INFO">
<appender-ref ref="STDOUT"/>
</logger>
<logger name="org.fcrepo.camel" additivity="false" level="INFO">
<appender-ref ref="STDOUT"/>
</logger>
<root additivity="false" level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
......@@ -74,6 +74,7 @@ public abstract class AbstractOSGiIT {
req.setHeader("Content-Type", contentType);
req.setEntity(new InputStreamEntity(stream));
}
final HttpResponse response = httpclient.execute(req);
assertEquals(SC_CREATED, response.getStatusLine().getStatusCode());
return EntityUtils.toString(response.getEntity(), "UTF-8");
......
/*
* Copyright 2016 Amherst College
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.amherst.acdc.itests;
import static org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.ops4j.pax.exam.CoreOptions.maven;
import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
import static org.ops4j.pax.exam.CoreOptions.systemProperty;
import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.configureConsole;
import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.editConfigurationFilePut;
import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.features;
import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.karafDistributionConfiguration;
import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.keepRuntimeFolder;
import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.logLevel;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import org.apache.camel.CamelContext;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Configuration;
import org.ops4j.pax.exam.ConfigurationManager;
import org.ops4j.pax.exam.Option;
import org.ops4j.pax.exam.junit.PaxExam;
import org.ops4j.pax.exam.karaf.options.LogLevelOption.LogLevel;
import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
import org.ops4j.pax.exam.spi.reactors.PerClass;
import org.slf4j.Logger;
/**
* @author Bethany Seeger
* @since June 24, 2016
*/
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerClass.class)
public class AcrepoBroadcastIT extends AbstractOSGiIT {
private static Logger LOGGER = getLogger(AcrepoBroadcastIT.class);
@Configuration
public Option[] config() {
final ConfigurationManager cm = new ConfigurationManager();
final String fcrepoPort = cm.getProperty("fcrepo.dynamic.test.port");
final String jmsPort = cm.getProperty("fcrepo.dynamic.jms.port");
final String rmiRegistryPort = cm.getProperty("karaf.rmiRegistry.port");
final String rmiServerPort = cm.getProperty("karaf.rmiServer.port");
final String sshPort = cm.getProperty("karaf.ssh.port");
final String fcrepoBaseUrl = "localhost:" + fcrepoPort + "/fcrepo/rest";
final String brokerUrl = "tcp://localhost:" + jmsPort;
final String messageRecipients = "mock:queue1,mock:queue2,mock:queue3";
return new Option[] {
karafDistributionConfiguration()
.frameworkUrl(maven().groupId("org.apache.karaf").artifactId("apache-karaf")
.versionAsInProject().type("zip"))
.unpackDirectory(new File("target", "exam"))
.useDeployFolder(false),
logLevel(LogLevel.WARN),
keepRuntimeFolder(),
configureConsole().ignoreLocalConsole(),
features(maven().groupId("org.apache.karaf.features").artifactId("standard")
.versionAsInProject().classifier("features").type("xml"), "scr"),
features(maven().groupId("org.apache.activemq").artifactId("activemq-karaf")
.type("xml").classifier("features").versionAsInProject(), "activemq-camel"),
features(maven().groupId("edu.amherst.acdc").artifactId("acrepo-karaf")
.type("xml").classifier("features").versionAsInProject(),
"acrepo-connector-broadcast"),
mavenBundle().groupId("org.apache.httpcomponents").artifactId("httpclient-osgi").versionAsInProject(),
mavenBundle().groupId("org.apache.httpcomponents").artifactId("httpcore-osgi").versionAsInProject(),
systemProperty("fcrepo.port").value(fcrepoPort),
editConfigurationFilePut("etc/org.apache.karaf.management.cfg", "rmiRegistryPort", rmiRegistryPort),
editConfigurationFilePut("etc/org.apache.karaf.management.cfg", "rmiServerPort", rmiServerPort),
editConfigurationFilePut("etc/org.apache.karaf.shell.cfg", "sshPort", sshPort),
editConfigurationFilePut("etc/edu.amherst.acdc.connector.broadcast.cfg", "jms.brokerUrl", brokerUrl),
editConfigurationFilePut("etc/edu.amherst.acdc.connector.broadcast.cfg", "message.recipients",
messageRecipients)
};
}
@Test
public void testInstallation() throws Exception {
assertTrue(featuresService.isInstalled(featuresService.getFeature("camel-core")));
assertTrue(featuresService.isInstalled(featuresService.getFeature("camel-blueprint")));
assertTrue(featuresService.isInstalled(featuresService.getFeature("activemq-camel")));
assertTrue(featuresService.isInstalled(featuresService.getFeature("acrepo-connector-broadcast")));
}
@Test
public void testBroadcastingConnector() throws Exception {
final CamelContext ctx = getOsgiService(CamelContext.class, "(camel.context.name=AcrepoConnectorBroadcast)",
10000);
assertNotNull(ctx);
final String baseUrl = "http://localhost:" + System.getProperty("fcrepo.port") + "/fcrepo/rest";
final String url1 = post(baseUrl).replace(baseUrl, "");
final String url2 = post(baseUrl).replace(baseUrl, "");
final String url3 = post(baseUrl + url1).replace(baseUrl, "");
final String url4 = post(baseUrl + url2).replace(baseUrl, "");
final MockEndpoint queue1 = (MockEndpoint) ctx.getEndpoint("mock:queue1");
final MockEndpoint queue2 = (MockEndpoint) ctx.getEndpoint("mock:queue2");
final MockEndpoint queue3 = (MockEndpoint) ctx.getEndpoint("mock:queue3");
queue1.expectedMessageCount(6);
queue2.expectedMessageCount(6);
queue3.expectedMessageCount(6);
assertIsSatisfied(queue1);
assertIsSatisfied(queue2);
assertIsSatisfied(queue3);
}
}
......@@ -83,7 +83,8 @@ public class AcrepoServicesIT extends AbstractOSGiIT {
"acrepo-libs-jena", "acrepo-libs-sesame", "acrepo-libs-jsonld",
"acrepo-libs-jackson", "acrepo-libs-marmotta",
"acrepo-services-jsonld", "acrepo-services-ldcache",
"acrepo-services-mint", "acrepo-services-pcdm", "acrepo-services-validation"),
"acrepo-services-mint", "acrepo-services-pcdm", "acrepo-services-validation",
"acrepo-connector-broadcast"),
editConfigurationFilePut("etc/edu.amherst.acdc.exts.fits.cfg", "rest.port", fitsPort),
editConfigurationFilePut("etc/edu.amherst.acdc.exts.image.cfg", "rest.port", imagePort),
......@@ -101,6 +102,7 @@ public class AcrepoServicesIT extends AbstractOSGiIT {
public void testInstallation() throws Exception {
assertTrue(featuresService.isInstalled(featuresService.getFeature("camel-core")));
assertTrue(featuresService.isInstalled(featuresService.getFeature("fcrepo-camel")));
assertTrue(featuresService.isInstalled(featuresService.getFeature("acrepo-connector-broadcast")));
assertTrue(featuresService.isInstalled(featuresService.getFeature("acrepo-idiomatic")));
assertTrue(featuresService.isInstalled(featuresService.getFeature("acrepo-idiomatic-pgsql")));
assertTrue(featuresService.isInstalled(featuresService.getFeature("acrepo-exts-fits")));
......
......@@ -289,4 +289,16 @@
<bundle>mvn:com.fasterxml.jackson.core/jackson-databind/${jackson2.version}</bundle>
</feature>
<feature name="acrepo-connector-broadcast" version="${project.version}">
<details>Installs the message broadcasting service</details>
<feature version="${camel.version}">camel</feature>
<feature version="${camel.version}">camel-blueprint</feature>
<feature version="${activemq.version}">activemq-camel</feature>
<bundle>mvn:edu.amherst.acdc/acrepo-connector-broadcast/${project.version}</bundle>
<configfile finalname="/etc/edu.amherst.acdc.connector.broadcast.cfg">mvn:edu.amherst.acdc/acrepo-connector-broadcast/${project.version}/cfg/configuration</configfile>
</feature>
</features>
......@@ -106,6 +106,7 @@
</scm>
<modules>
<module>acrepo-connector-broadcast</module>
<module>acrepo-idiomatic-pgsql</module>
<module>acrepo-idiomatic</module>
<module>acrepo-exts-fits</module>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment