Commit c2da8f51 authored by bseeger's avatar bseeger
Browse files

Converts connectors to use acrepo-services-activemq

parent e086ee8d
......@@ -23,19 +23,13 @@ This application can be configured by creating the following configuration
file `$KARAF_HOME/etc/edu.amherst.acdc.connector.broadcast.cfg`. The following
values are available for configuration:
The JMS broker to use
The Camel URI for the incoming message stream
jms.brokerUrl=tcp://localhost:61616
jms.username=
jms.password=
The queue/topic, on the above broker, that should be listened to for incoming messages
jms.input=activemq:topic:fedora
input.stream=broker:topic:fedora
Comma separate list of recipient queues to broadcast the incoming messages to
message.recipients=activemq:queue:fcrepo-serialization,activemq:queue:fcrepo-indexing-triplestore
message.recipients=broker:queue:fcrepo-serialization,broker:queue:fcrepo-indexing-triplestore
By editing this file, any currently running routes in this service will be immediately redeployed
with the new values.
......
......@@ -16,12 +16,6 @@
<properties>
<osgi.export.packages>edu.amherst.acdc.connector.broadcast;version=${project.version}</osgi.export.packages>
<osgi.import.packages>
org.apache.camel.*,
org.apache.activemq.camel.component,
org.osgi.service.blueprint,
*
</osgi.import.packages>
</properties>
<dependencies>
......@@ -33,10 +27,6 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-blueprint</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-camel</artifactId>
</dependency>
<dependency>
<groupId>org.fcrepo.camel</groupId>
<artifactId>fcrepo-camel</artifactId>
......
# Information about the broker to use
jms.brokerUrl=tcp://localhost:61616
jms.username=
jms.password=
# Which queue/topic to listen to on the above broker
jms.input=activemq:topic:fedora
input.stream=broker:topic:fedora
# Comma separated list of recipient queues to broadcast the incoming messages to
# for example: "activemq:queue:fcrepo-serialization,activemq:queue:fcrepo-indexing-triplestore"
# for example: "broker:queue:fcrepo-serialization,broker:queue:fcrepo-indexing-triplestore"
message.recipients=
......@@ -9,30 +9,21 @@
<cm:property-placeholder persistent-id="edu.amherst.acdc.connector.broadcast" update-strategy="reload">
<cm:default-properties>
<cm:property name="jms.brokerUrl" value="tcp://localhost:61616" />
<cm:property name="jms.username" value="" />
<cm:property name="jms.password" value="" />
<cm:property name="jms.input" value="activemq:topic:fedora" />
<cm:property name="input.stream" value="broker:topic:fedora" />
<cm:property name="message.recipients" value="" />
</cm:default-properties>
</cm:property-placeholder>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="${jms.brokerUrl}" />
<property name="userName" value="${jms.username}" />
<property name="password" value="${jms.password}" />
</bean>
<reference id="broker" interface="org.apache.camel.Component" filter="(osgi.jndi.service.name=acrepobroker)"/>
<camelContext id="AcrepoConnectorBroadcast" xmlns="http://camel.apache.org/schema/blueprint">
<route id="MessageBroadcaster">
<description>Broadcast messages from one queue/topic to other specified queues/topics.</description>
<from uri="{{jms.input}}" />
<from uri="{{input.stream}}" />
<log message="Distributing message: ${headers[org.fcrepo.jms.timestamp]}: ${headers[org.fcrepo.jms.identifier]}:${headers[org.fcrepo.jms.eventType]}"/>
<recipientList parallelProcessing="true" ignoreInvalidEndpoints="true">
<simple>{{message.recipients}}</simple>
</recipientList>
</route>
</camelContext>
</blueprint>
......@@ -41,13 +41,9 @@ In the event of failure, the maximum number of times a redelivery will be attemp
error.maxRedeliveries=10
The connection URI used to connect to a local or remote ActiveMQ broker
jms.brokerUrl=tcp://localhost:61616
The camel URI for the incoming message stream.
input.stream=activemq:topic:fedora
input.stream=broker:topic:fedora
The RDF property used to identify external IDs
......
......@@ -37,10 +37,6 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-blueprint</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-camel</artifactId>
</dependency>
<dependency>
<groupId>org.fcrepo.camel</groupId>
<artifactId>fcrepo-camel</artifactId>
......
......@@ -4,11 +4,8 @@ error.maxRedeliveries=10
# Fedora Repository location
fcrepo.baseUrl=localhost:8080/fcrepo/rest
# The connection URI used to connect to a local or remote ActiveMQ broker
jms.brokerUrl=tcp://localhost:61616
# The camel URI for the incoming message stream.
input.stream=activemq:topic:fedora
input.stream=broker:topic:fedora
# The rest endpoint components
rest.host=localhost
......
......@@ -11,8 +11,7 @@
<cm:property-placeholder persistent-id="edu.amherst.acdc.connector.idiomatic" update-strategy="reload">
<cm:default-properties>
<cm:property name="error.maxRedeliveries" value="10"/>
<cm:property name="jms.brokerUrl" value="tcp://localhost:61616"/>
<cm:property name="input.stream" value="activemq:topic:fedora"/>
<cm:property name="input.stream" value="broker:topic:fedora"/>
<cm:property name="rest.host" value="localhost"/>
<cm:property name="rest.prefix" value="/idiomatic"/>
<cm:property name="rest.port" value="9101"/>
......@@ -27,15 +26,13 @@
<reference id="minterService" interface="java.util.function.Supplier" filter="(osgi.jndi.service.name=minter)" />
<reference id="broker" interface="org.apache.camel.Component" filter="(osgi.jndi.service.name=acrepobroker)"/>
<!-- component-wide configuration of jdbc -->
<bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
<property name="dataSource" ref="dataSource"/>
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="${jms.brokerUrl}"/>
</bean>
<camelContext xmlns="http://camel.apache.org/schema/blueprint">
<package>edu.amherst.acdc.connector.idiomatic</package>
......
......@@ -27,11 +27,13 @@ import javax.sql.DataSource;
import edu.amherst.acdc.services.mint.MinterService;
import java.util.function.Supplier;
import org.apache.camel.Component;
import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.seda.SedaComponent;
import org.apache.camel.test.blueprint.CamelBlueprintTestSupport;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.ObjectHelper;
......@@ -86,6 +88,8 @@ public class RouteTest extends CamelBlueprintTestSupport {
asService(new EmbeddedDataSource(), "osgi.jndi.service.name", "idiomaticds"));
services.put(Supplier.class.getName(),
asService(new MinterService(MINT_LENGTH), "osgi.jndi.service.name", "minter"));
services.put(Component.class.getName(),
asService(new SedaComponent(), "osgi.jndi.service.name", "acrepobroker"));
}
@Test
......
......@@ -69,6 +69,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>edu.amherst.acdc</groupId>
<artifactId>acrepo-services-activemq</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>edu.amherst.acdc</groupId>
<artifactId>acrepo-karaf</artifactId>
......
......@@ -58,14 +58,15 @@ public class AcrepoBroadcastIT extends AbstractOSGiIT {
@Configuration
public Option[] config() {
final ConfigurationManager cm = new ConfigurationManager();
final String fcrepoPort = cm.getProperty("fcrepo.dynamic.test.port");
final String jmsPort = cm.getProperty("fcrepo.dynamic.jms.port");
final String fcrepoPort = cm.getProperty("fcrepo.dynamic.test.port");
final String rmiRegistryPort = cm.getProperty("karaf.rmiRegistry.port");
final String rmiServerPort = cm.getProperty("karaf.rmiServer.port");
final String sshPort = cm.getProperty("karaf.ssh.port");
final String fcrepoBaseUrl = "localhost:" + fcrepoPort + "/fcrepo/rest";
final String brokerUrl = "tcp://localhost:" + jmsPort;
final String inputStream = "broker:topic:fedora";
final String messageRecipients = "mock:queue1,mock:queue2,mock:queue3";
final String brokerUrl = "tcp://localhost:" + jmsPort;
return new Option[] {
karafDistributionConfiguration()
......@@ -83,7 +84,7 @@ public class AcrepoBroadcastIT extends AbstractOSGiIT {
.type("xml").classifier("features").versionAsInProject(), "activemq-camel"),
features(maven().groupId("edu.amherst.acdc").artifactId("acrepo-karaf")
.type("xml").classifier("features").versionAsInProject(),
"acrepo-connector-broadcast"),
"acrepo-services-activemq", "acrepo-connector-broadcast"),
mavenBundle().groupId("org.apache.httpcomponents").artifactId("httpclient-osgi").versionAsInProject(),
mavenBundle().groupId("org.apache.httpcomponents").artifactId("httpcore-osgi").versionAsInProject(),
......@@ -93,7 +94,8 @@ public class AcrepoBroadcastIT extends AbstractOSGiIT {
editConfigurationFilePut("etc/org.apache.karaf.management.cfg", "rmiRegistryPort", rmiRegistryPort),
editConfigurationFilePut("etc/org.apache.karaf.management.cfg", "rmiServerPort", rmiServerPort),
editConfigurationFilePut("etc/org.apache.karaf.shell.cfg", "sshPort", sshPort),
editConfigurationFilePut("etc/edu.amherst.acdc.connector.broadcast.cfg", "jms.brokerUrl", brokerUrl),
editConfigurationFilePut("etc/edu.amherst.acdc.services.activemq.cfg", "jms.brokerUrl", brokerUrl),
editConfigurationFilePut("etc/edu.amherst.acdc.connector.broadcast.cfg", "input.stream", inputStream),
editConfigurationFilePut("etc/edu.amherst.acdc.connector.broadcast.cfg", "message.recipients",
messageRecipients)
};
......@@ -117,6 +119,7 @@ public class AcrepoBroadcastIT extends AbstractOSGiIT {
assertTrue(featuresService.isInstalled(featuresService.getFeature("camel-core")));
assertTrue(featuresService.isInstalled(featuresService.getFeature("camel-blueprint")));
assertTrue(featuresService.isInstalled(featuresService.getFeature("activemq-camel")));
assertTrue(featuresService.isInstalled(featuresService.getFeature("acrepo-services-activemq")));
assertTrue(featuresService.isInstalled(featuresService.getFeature("acrepo-connector-broadcast")));
}
......
......@@ -64,6 +64,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>edu.amherst.acdc</groupId>
<artifactId>acrepo-services-activemq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......
......@@ -15,7 +15,6 @@
<feature version="${camel.version}">camel-jetty9</feature>
<feature version="${camel.version}">camel-jackson</feature>
<feature version="${camel.version}">camel-sql</feature>
<feature version="${activemq.version}">activemq-camel</feature>
<feature version="${fcrepo-camel.version}">fcrepo-camel</feature>
<bundle dependency="true">mvn:org.codehaus.woodstox/woodstox-core-asl/${woodstox.version}</bundle>
......@@ -319,7 +318,6 @@
<feature version="${camel.version}">camel</feature>
<feature version="${camel.version}">camel-blueprint</feature>
<feature version="${activemq.version}">activemq-camel</feature>
<bundle>mvn:edu.amherst.acdc/acrepo-connector-broadcast/${project.version}</bundle>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment