Skip to content

Instantly share code, notes, and snippets.

@FelixGV
Created July 5, 2014 02:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FelixGV/7f0978dfa90fe4430c24 to your computer and use it in GitHub Desktop.
Save FelixGV/7f0978dfa90fe4430c24 to your computer and use it in GitHub Desktop.
AVRO-1124.patch
diff --git a/lang/java/ipc/pom.xml b/lang/java/ipc/pom.xml
index 35fdeaf..50cf2d7 100644
--- a/lang/java/ipc/pom.xml
+++ b/lang/java/ipc/pom.xml
@@ -117,12 +117,10 @@
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
- <version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
- <version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
diff --git a/lang/java/mapred/pom.xml b/lang/java/mapred/pom.xml
index fa198bc..cfe7510 100644
--- a/lang/java/mapred/pom.xml
+++ b/lang/java/mapred/pom.xml
@@ -82,7 +82,7 @@
<include>**/mapred/tether/*.avpr</include>
</includes>
<sourceDirectory>${parent.project.basedir}/../../../../share/schemas/</sourceDirectory>
- <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory>a
+ <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory>
</configuration>
</execution>
</executions>
diff --git a/lang/java/pom.xml b/lang/java/pom.xml
index fcad280..6c523b8 100644
--- a/lang/java/pom.xml
+++ b/lang/java/pom.xml
@@ -54,7 +54,7 @@
<protobuf.version>2.4.1</protobuf.version>
<thrift.version>0.7.0</thrift.version>
<slf4j.version>1.6.4</slf4j.version>
- <snappy.version>1.0.5</snappy.version>
+ <snappy.version>1.0.5-M3</snappy.version>
<velocity.version>1.7</velocity.version>
<maven.version>2.0.10</maven.version>
<ant.version>1.8.2</ant.version>
@@ -62,6 +62,9 @@
<commons-compress.version>1.4.1</commons-compress.version>
<easymock.version>3.0</easymock.version>
<hamcrest.version>1.1</hamcrest.version>
+ <jetty-8.version>8.1.14.v20131031</jetty-8.version>
+ <jersey.version>1.15</jersey.version>
+ <guice.version>3.0</guice.version>
<commons-httpclient.version>3.1</commons-httpclient.version>
<!-- version properties for plugins -->
@@ -86,6 +89,7 @@
<module>avro</module>
<module>compiler</module>
<module>maven-plugin</module>
+ <module>schema-repo</module>
<module>ipc</module>
<module>trevni</module>
<module>tools</module>
@@ -406,6 +410,37 @@
<artifactId>jetty</artifactId>
<version>${jetty.version}</version>
</dependency>
+ <!-- jetty 8.x, guice and jersey server-side dependencies (used by schema-repo-server) -->
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>${jetty-8.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>${jetty-8.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ <version>${guice.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-servlet</artifactId>
+ <version>${guice.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-guice</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
diff --git a/lang/java/schema-repo/bundle/config/config.properties b/lang/java/schema-repo/bundle/config/config.properties
new file mode 100644
index 0000000..2abd798
--- /dev/null
+++ b/lang/java/schema-repo/bundle/config/config.properties
@@ -0,0 +1,3 @@
+avro.repo.class=org.apache.avro.repo.InMemoryRepository
+#avro.repo.cached=org.apache.avro.repo.CacheRepository
+avro.repo.file-repo-path=target/data/
\ No newline at end of file
diff --git a/lang/java/schema-repo/bundle/pom.xml b/lang/java/schema-repo/bundle/pom.xml
new file mode 100644
index 0000000..f9a3b30
--- /dev/null
+++ b/lang/java/schema-repo/bundle/pom.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>avro-repo-parent</artifactId>
+ <groupId>org.apache.avro.repo</groupId>
+ <version>1.7.7-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+
+ <artifactId>avro-repo-bundle</artifactId>
+
+ <name>Apache Avro Schema Repository Bundle</name>
+ <url>http://avro.apache.org</url>
+ <description>Avro Schema Repository standalone bundle</description>
+
+ <build> <!-- shades the module's dependencies into one runnable jar -->
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>bundle</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <shadedArtifactAttached>true</shadedArtifactAttached> <!-- attach the shaded jar next to the regular jar instead of replacing it -->
+ <shadedClassifierName>withdeps</shadedClassifierName> <!-- classifier under which the shaded jar is attached -->
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.avro.repo.server.RepositoryServer</mainClass> <!-- manifest entry point of the bundle -->
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <!-- merges META-INF/services files of the bundled jars -->
+ </transformers>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact> <!-- the excludes below apply to every bundled artifact -->
+ <excludes> <!-- jar signature files; presumably dropped because they no longer match the repackaged contents -->
+ <exclude>META-INF/SUN_MICR.*</exclude>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ </profiles>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.avro.repo</groupId>
+ <artifactId>avro-repo-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro.repo</groupId>
+ <artifactId>avro-repo-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/lang/java/schema-repo/client/pom.xml b/lang/java/schema-repo/client/pom.xml
new file mode 100644
index 0000000..6ae3b6d
--- /dev/null
+++ b/lang/java/schema-repo/client/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>avro-repo-parent</artifactId>
+ <groupId>org.apache.avro.repo</groupId>
+ <version>1.7.7-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+
+ <artifactId>avro-repo-client</artifactId>
+
+ <name>Apache Avro Schema Repository Client</name>
+ <url>http://avro.apache.org</url>
+ <description>Avro Schema Repository Client</description>
+
+ <build>
+ </build>
+
+ <profiles>
+ </profiles>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.avro.repo</groupId>
+ <artifactId>avro-repo-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId> <!-- HTTP client used by RESTRepositoryClient -->
+ <version>${jersey.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.inject</groupId>
+ <artifactId>javax.inject</artifactId>
+ <version>1</version>
+ <optional>true</optional> <!-- supplies the Inject/Named annotations on RESTRepositoryClient; not passed on to consumers -->
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro.repo</groupId>
+ <artifactId>avro-repo-common</artifactId>
+ <classifier>tests</classifier> <!-- test-jar of avro-repo-common; presumably the source of AbstractTestRepository (confirm) -->
+ <scope>test</scope>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro.repo</groupId>
+ <artifactId>avro-repo-server</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope> <!-- RESTRepositoryClientTest starts a real RepositoryServer -->
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/lang/java/schema-repo/client/src/main/java/org/apache/avro/repo/client/RESTRepositoryClient.java b/lang/java/schema-repo/client/src/main/java/org/apache/avro/repo/client/RESTRepositoryClient.java
new file mode 100644
index 0000000..bf732e8
--- /dev/null
+++ b/lang/java/schema-repo/client/src/main/java/org/apache/avro/repo/client/RESTRepositoryClient.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo.client;
+
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.avro.repo.Repository;
+import org.apache.avro.repo.RepositoryUtil;
+import org.apache.avro.repo.SchemaEntry;
+import org.apache.avro.repo.SchemaValidationException;
+import org.apache.avro.repo.Subject;
+import org.apache.avro.repo.SubjectConfig;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.representation.Form;
+
+/**
+ * An implementation of {@link Repository} that connects to a remote
+ * RESTRepository over HTTP through a Jersey {@link WebResource}.<br/>
+ * <br/>
+ * Typically, this is used in a client wrapped in a {@link org.apache.avro.repo.CacheRepository}
+ * to limit network communication. Alternatively, this implementation can itself be what is
+ * used behind a RESTRepository in a RepositoryServer, thus creating a caching proxy.<br/>
+ * <br/>
+ * Most failed requests surface as null or empty results rather than exceptions; see each method.
+ */
+public class RESTRepositoryClient implements Repository {
+
+ private WebResource webResource; // root resource for the repository URL; request paths are resolved against it
+
+ @Inject
+ public RESTRepositoryClient(@Named("avro.repo.url") String url) {
+ this.webResource = Client.create().resource(url); // each instance creates its own Jersey Client
+ }
+
+ @Override
+ public Subject register(String subject, SubjectConfig config) { // PUTs the config as a form to the subject's path
+ String path = subject; // NOTE(review): unlike lookup(), the name is not passed through validateSchemaOrSubject
+ Form form = new Form();
+ for(Map.Entry<String, String> entry : RepositoryUtil.safeConfig(config).asMap().entrySet()) {
+ form.putSingle(entry.getKey(), entry.getValue());
+ }
+
+ String regSubjectName = webResource.path(path).accept(MediaType.TEXT_PLAIN)
+ .type(MediaType.APPLICATION_FORM_URLENCODED).put(String.class, form); // no catch: request failures propagate to the caller
+
+ RESTSubject sub = new RESTSubject(regSubjectName); // the response body is used as the subject name
+ return sub;
+ }
+
+ @Override
+ public Subject lookup(String subject) {
+ RepositoryUtil.validateSchemaOrSubject(subject);
+ try {// a successful GET means the subject exists; the response body is ignored
+ webResource.path(subject).get(String.class);
+ return new RESTSubject(subject);
+ } catch (Exception e) { // any failure, not only an unknown subject, is reported as null
+ return null;
+ }
+ }
+
+ @Override
+ public Iterable<Subject> subjects() {
+ ArrayList<Subject> subjectList = new ArrayList<Subject>();
+ try {
+ String subjects = webResource.get(String.class); // GET on the repository root
+ for (String subjName : RepositoryUtil.subjectNamesFromString(subjects)) {
+ subjectList.add(new RESTSubject(subjName));
+ }
+ } catch (Exception e) {
+ // best effort: on any failure, return whatever was collected so far (normally an empty list)
+ }
+ return subjectList;
+ }
+
+ private class RESTSubject extends Subject { // inner class: issues requests through the enclosing client's webResource
+
+ private RESTSubject(String name) {
+ super(name);
+ }
+
+ @Override
+ public SubjectConfig getConfig() {
+ String path = getName() + "/config" ;
+ try {
+ String propString = webResource.path(path).accept(MediaType.TEXT_PLAIN)
+ .get(String.class);
+ Properties props = new Properties();
+ props.load(new StringReader(propString)); // the response body is read as java.util.Properties text
+ return RepositoryUtil.configFromProperties(props);
+ } catch (Exception e) { // request or parse failure yields null instead of an exception
+ return null;
+ }
+ }
+
+ @Override
+ public SchemaEntry register(String schema) throws SchemaValidationException {
+ RepositoryUtil.validateSchemaOrSubject(schema);
+
+ String path = getName() + "/register";
+ return handleRegisterRequest(path, schema);
+ }
+
+ @Override
+ public SchemaEntry registerIfLatest(String schema, SchemaEntry latest)
+ throws SchemaValidationException {
+ RepositoryUtil.validateSchemaOrSubject(schema);
+ String idStr = (latest == null) ? "" : latest.getId(); // a null latest leaves the id segment of the path empty
+
+ String path = getName() + "/register_if_latest/" + idStr;
+ return handleRegisterRequest(path, schema);
+ }
+
+ private SchemaEntry handleRegisterRequest(String path, String schema)
+ throws SchemaValidationException { // shared PUT logic for register and registerIfLatest
+ String schemaId;
+ try {
+ schemaId = webResource.path(path).accept(MediaType.TEXT_PLAIN)
+ .type(MediaType.TEXT_PLAIN_TYPE).put(String.class, schema);
+ return new SchemaEntry(schemaId, schema); // the response body is used as the schema id
+ } catch (UniformInterfaceException e) {
+ ClientResponse cr = e.getResponse(); // NOTE(review): fromStatusCode below may return null for a code outside the Status enum; confirm equals() cannot NPE
+ if (ClientResponse.Status.fromStatusCode(cr.getStatus()).equals(
+ ClientResponse.Status.FORBIDDEN)) { // 403 is the only status translated into a validation failure
+ throw new SchemaValidationException("Invalid schema: " + schema);
+ } else {
+ // every other unexpected HTTP status is reported as null
+ return null;
+ }
+ } catch (ClientHandlerException e) { // client-side request/response handling failure: also reported as null
+ return null;
+ }
+ }
+
+ @Override
+ public SchemaEntry lookupBySchema(String schema) {
+ RepositoryUtil.validateSchemaOrSubject(schema);
+ String path = getName() + "/schema";
+ try {
+ String schemaId = webResource.path(path).accept(MediaType.TEXT_PLAIN)
+ .type(MediaType.TEXT_PLAIN_TYPE).post(String.class, schema); // POST the schema text; the response body is used as its id
+ return new SchemaEntry(schemaId, schema);
+ } catch (UniformInterfaceException e) { // unexpected HTTP status maps to null; ClientHandlerException is not caught here
+ return null;
+ }
+ }
+
+ @Override
+ public SchemaEntry lookupById(String schemaId) {
+ RepositoryUtil.validateSchemaOrSubject(schemaId);
+ String path = getName() + "/id/" + schemaId;
+ try {
+ String schema = webResource.path(path).get(String.class); // the response body is used as the schema text
+ return new SchemaEntry(schemaId, schema);
+ } catch (UniformInterfaceException e) { // unexpected HTTP status maps to null; ClientHandlerException is not caught here
+ return null;
+ }
+ }
+
+ @Override
+ public SchemaEntry latest() {
+ String path = getName() + "/latest";
+ String entryStr;
+ try {
+ entryStr = webResource.path(path).get(String.class);
+ return new SchemaEntry(entryStr); // single-argument form; presumably parses a serialized entry (confirm in SchemaEntry)
+ } catch (UniformInterfaceException e) { // unexpected HTTP status maps to null
+ return null;
+ }
+ }
+
+ @Override
+ public Iterable<SchemaEntry> allEntries() {
+ String path = getName() + "/all";
+ try {
+ String entriesStr = webResource.path(path).get(String.class);
+ return schemaEntriesFromStr(entriesStr);
+ } catch (UniformInterfaceException e) { // unexpected HTTP status yields an empty collection
+ return Collections.emptyList();
+ }
+ }
+
+ private Iterable<SchemaEntry> schemaEntriesFromStr(String entriesStr) { // thin wrapper over RepositoryUtil.schemasFromString
+ return RepositoryUtil.schemasFromString(entriesStr);
+ }
+
+ @Override
+ public boolean integralKeys() {
+ try {
+ String path = getName() + "/integral";
+ String integral = webResource.path(path).get(String.class);
+ return Boolean.parseBoolean(integral); // any body other than true (ignoring case) parses as false
+ } catch (UniformInterfaceException e){ // unexpected HTTP status is reported as false
+ return false;
+ }
+ }
+
+ }
+
+}
diff --git a/lang/java/schema-repo/client/src/test/java/org/apache/avro/repo/client/RESTRepositoryClientTest.java b/lang/java/schema-repo/client/src/test/java/org/apache/avro/repo/client/RESTRepositoryClientTest.java
new file mode 100644
index 0000000..3c7c096
--- /dev/null
+++ b/lang/java/schema-repo/client/src/test/java/org/apache/avro/repo/client/RESTRepositoryClientTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo.client;
+
+import java.util.Properties;
+
+import org.apache.avro.repo.AbstractTestRepository;
+import org.apache.avro.repo.InMemoryRepository;
+import org.apache.avro.repo.server.RepositoryServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class RESTRepositoryClientTest extends // defines no test methods itself; they come from AbstractTestRepository
+ AbstractTestRepository<RESTRepositoryClient> {
+
+ static RepositoryServer server; // one server shared by every test in this class
+ static RESTRepositoryClient client; // NOTE(review): never assigned or read in this class
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Properties props = new Properties();
+ props.put("repo.class", InMemoryRepository.class.getName()); // back the server with an in-memory repository
+ props.put("jetty.host", "localhost");
+ props.put("jetty.port", "8123"); // fixed port; must match the URL in createRepository()
+ server = new RepositoryServer(props);
+ server.start();
+ }
+
+ @Override
+ protected RESTRepositoryClient createRepository() {
+ return new RESTRepositoryClient("http://localhost:8123/schema-repo/"); // new client per call, always against the shared server
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ server.stop(); // NOTE(review): throws NullPointerException if setUpBeforeClass failed before assigning server
+ }
+
+}
diff --git a/lang/java/schema-repo/common/pom.xml b/lang/java/schema-repo/common/pom.xml
new file mode 100644
index 0000000..47ae97b
--- /dev/null
+++ b/lang/java/schema-repo/common/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>avro-repo-parent</artifactId>
+ <groupId>org.apache.avro.repo</groupId>
+ <version>1.7.7-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+
+ <artifactId>avro-repo-common</artifactId>
+
+ <name>Apache Avro Schema Repository Common</name>
+ <url>http://avro.apache.org</url>
+ <description>Avro Schema Repository common components</description>
+
+ <build>
+ <!-- additionally create a test-jar artifact (classifier "tests") so that other modules can depend on this module's test classes -->
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ </profiles>
+
+ <dependencies>
+ <dependency>
+ <groupId>javax.inject</groupId>
+ <artifactId>javax.inject</artifactId>
+ <version>1</version>
+ <!-- users who choose to use the annotations for Guice/Spring/etc
+ will need to include this, others can ignore -->
+ <optional>true</optional>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/CacheRepository.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/CacheRepository.java
new file mode 100644
index 0000000..34e203d
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/CacheRepository.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import javax.inject.Inject;
+
+/**
+ * CacheRepository is a {@link Repository} implementation that wraps another
+ * {@link Repository} and acts as a write-through cache of {@link Subject}s and
+ * schema to id mappings, shielding the inner {@link Repository} from repetitive
+ * lookups.
+ *
+ * CacheRepository can cache Subjects (which cannot be deleted) and returns an
+ * implementation of {@link Subject} that caches schema to id mappings.
+ *
+ * CacheRepository can only cache the immutable elements of a Repository, because
+ * it is intended for use in any context -- in a client, in a proxy, or above a raw
+ * implementation of a repository.
+ * It cannot cache the entire list of subjects since the list is mutable.
+ * Similarly, a cached subject cannot cache the list of schemas, the subject configuration,
+ * or the latest() schema because those are mutable.
+ *
+ */
+public class CacheRepository implements Repository {
+
+ private final RepositoryCache cache; // holds the Subjects already seen
+ private final Repository repo; // the wrapped repository that is consulted on a cache miss
+
+ /**
+ * Create a caching repository that wraps the provided repository using the
+ * cache provided
+ * @param repo The repository to wrap
+ * @param cache The cache to use
+ */
+ @Inject
+ public CacheRepository(Repository repo, RepositoryCache cache) {
+ this.repo = repo;
+ this.cache = cache;
+ }
+
+ @Override
+ public Subject register(String subjectName, SubjectConfig config) {
+ Subject s = cache.lookup(subjectName);
+ if (s == null) {
+ return cache.add(repo.register(subjectName, config)); // miss: register with the wrapped repository, then cache the result
+ }
+ return s; // hit: the wrapped repository is not contacted and config is ignored
+ }
+
+ @Override
+ public Subject lookup(String subjectName) {
+ Subject s = cache.lookup(subjectName);
+ if (s == null) {
+ return cache.add(repo.lookup(subjectName)); // NOTE(review): repo.lookup may return null for an unknown subject; confirm cache.add accepts null
+ }
+ return s;
+ }
+
+ @Override
+ public Iterable<Subject> subjects() {
+ // the full list of subjects cannot be cached.
+ // however we can populate the cache with the result
+ Iterable<Subject> subs = repo.subjects(); // always asks the wrapped repository
+ for (Subject s : subs) {
+ cache.add(s);
+ }
+ return subs; // returns the wrapped repository's own Subjects, not the values returned by cache.add
+ }
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/DelegatingSubject.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/DelegatingSubject.java
new file mode 100644
index 0000000..6efcfef
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/DelegatingSubject.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+
+/**
+ * A {@link DelegatingSubject} is a Subject that delegates work to an underlying
+ * {@link Subject}.
+ *
+ * Specific implementations may override various methods.
+ *
+ */
+public abstract class DelegatingSubject extends Subject { // declares no abstract methods of its own
+ private final Subject delegate; // the wrapped Subject; every override below forwards to it
+
+ /**
+ * Creates a subject that reports the delegate's name and forwards every operation to it.
+ **/
+ protected DelegatingSubject(Subject delegate) {
+ super(delegate.getName()); // a null delegate fails here with a NullPointerException
+ this.delegate = delegate;
+ }
+
+ @Override
+ public SchemaEntry register(String schema) throws SchemaValidationException {
+ return delegate.register(schema);
+ }
+
+ @Override
+ public SchemaEntry registerIfLatest(String schema, SchemaEntry latest)
+ throws SchemaValidationException {
+ return delegate.registerIfLatest(schema, latest);
+ }
+
+ @Override
+ public SchemaEntry lookupBySchema(String schema) {
+ return delegate.lookupBySchema(schema);
+ }
+
+ @Override
+ public SchemaEntry lookupById(String id) {
+ return delegate.lookupById(id);
+ }
+
+ @Override
+ public SchemaEntry latest() {
+ return delegate.latest();
+ }
+
+ @Override
+ public Iterable<SchemaEntry> allEntries() {
+ return delegate.allEntries();
+ }
+
+ @Override
+ public SubjectConfig getConfig() {
+ return delegate.getConfig();
+ }
+
+ @Override
+ public boolean integralKeys() {
+ return delegate.integralKeys();
+ }
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/FileRepository.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/FileRepository.java
new file mode 100644
index 0000000..8678366
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/FileRepository.java
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
+import java.io.Writer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.Scanner;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+/**
+ * A {@link Repository} that persists content to file. <br/>
+ * <br/>
+ * The {@link Repository} stores all of its data in a single base directory.
+ * Within this directory each {@link Subject} is represented by a nested
+ * directory with the same name as the {@link Subject}. Within each
+ * {@link Subject} directory there are three file types: <li>
+ * A properties file named 'subject.properties' containing the configured
+ * properties for the Subject. At this time, the only used property is
+ * "avro.repo.validator.class".</li> <li>
+ * A text file named 'schema_ids' containing the schema ids, in order of their
+ * creation, delimited by newline, encoded in UTF-8. This is used to track the
+ * order of schema registration for {@link Subject#latest()} and
+ * {@link Subject#allEntries()}</li> <li>
+ * One file per schema the contents of which are the schema encoded in UTF-8 and
+ * the name of which is the schema id followed by the postfix '.schema'.</li>
+ *
+ */
+public class FileRepository implements Repository, Closeable {
+
+ private static final String LOCKFILE = ".repo.lock";
+ private static final String SUBJECT_PROPERTIES = "subject.properties";
+ private static final String SCHEMA_IDS = "schema_ids";
+ private static final String SCHEMA_POSTFIX = ".schema";
+
+ private final InMemorySubjectCache subjects = new InMemorySubjectCache();
+ private final ValidatorFactory validators;
+ private final File rootDir;
+ private final FileChannel lockChannel;
+ private final FileLock fileLock;
+ private boolean closed = false;
+
+  /**
+   * Create a FileRepository rooted at the directory path provided, creating
+   * the directory if it does not exist. Takes an exclusive lock on the file
+   * ".repo.lock" inside that directory so that no other object or process
+   * runs a FileRepository from the same place. The lock is released when
+   * {@link #close()} is called; the lock file is also marked for deletion on
+   * JVM exit.
+   *
+   * Not all platforms support file locks. See {@link FileLock}
+   *
+   * @param repoPath
+   *          The path of the repository's base directory.
+   * @param validators
+   *          The factory kept for validating Subjects registered later.
+   * @throws RuntimeException
+   *           if the base directory cannot be created or is not a directory,
+   *           or if an existing subject directory cannot be loaded
+   * @throws IllegalStateException
+   *           if the repository lock cannot be acquired
+   */
+  @Inject
+  public FileRepository(@Named("avro.repo.file-repo-path") String repoPath, ValidatorFactory validators) {
+    this.validators = validators;
+    this.rootDir = new File(repoPath);
+    if ((!rootDir.exists() && !rootDir.mkdirs()) || !rootDir.isDirectory()) {
+      throw new java.lang.RuntimeException(
+          "Unable to create repo directory, or not a directory: "
+              + rootDir.getAbsolutePath());
+    }
+    // lock repository; work on locals so the channel can be closed on every
+    // failure path before the final fields are assigned
+    File lockfile = new File(rootDir, LOCKFILE);
+    FileChannel channel = null;
+    FileLock lock;
+    try {
+      lockfile.createNewFile();
+      @SuppressWarnings("resource") // raf is closed when its channel is closed
+      RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
+      channel = raf.getChannel();
+      lock = channel.tryLock();
+    } catch (IOException e) {
+      closeQuietly(channel);
+      throw new IllegalStateException("Unable to lock repository directory: "
+          + rootDir.getAbsolutePath(), e);
+    } catch (RuntimeException e) {
+      // e.g. OverlappingFileLockException when this JVM already holds the lock
+      closeQuietly(channel);
+      throw e;
+    }
+    if (lock == null) {
+      // another process holds the lock; do not leak our file handle
+      closeQuietly(channel);
+      throw new IllegalStateException("Failed to lock file: "
+          + lockfile.getAbsolutePath());
+    }
+    lockfile.deleteOnExit();
+    this.lockChannel = channel;
+    this.fileLock = lock;
+    // eagerly load up subjects
+    try {
+      loadSubjects(rootDir, subjects);
+    } catch (RuntimeException e) {
+      // construction failed: closing the channel also releases the lock, so a
+      // later attempt in the same JVM is not blocked by a half-built instance
+      closeQuietly(channel);
+      throw e;
+    }
+  }
+
+  // Closes the resource if it is non-null, ignoring any IOException.
+  private static void closeQuietly(Closeable resource) {
+    if (resource != null) {
+      try {
+        resource.close();
+      } catch (IOException e) {
+        // nothing useful can be done; the caller is already failing
+      }
+    }
+  }
+
+  /**
+   * Adds a FileSubject to the cache for every directory directly inside the
+   * repository directory.
+   *
+   * NOTE(review): subjects loaded here are added unwrapped, whereas
+   * register() wraps new subjects with Subject.validatingSubject -- confirm
+   * whether reloaded subjects are meant to skip validation.
+   *
+   * @throws RuntimeException
+   *           if the directory cannot be listed or a subject fails to load
+   */
+  private void loadSubjects(File repoDir, SubjectCache subjects) {
+    File[] children = repoDir.listFiles();
+    if (children == null) {
+      // listFiles() returns null on I/O error or if repoDir is not a directory
+      throw new RuntimeException("Unable to list repo directory: "
+          + repoDir.getAbsolutePath());
+    }
+    for (File file : children) {
+      if (file.isDirectory()) {
+        subjects.add(new FileSubject(file));
+      }
+    }
+  }
+
+ private void isValid() {
+ if (closed) {
+ throw new IllegalStateException("FileRepository is closed");
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) {
+ return;
+ }
+ try {
+ fileLock.release();
+ } catch (IOException e) {
+ // nothing to do here -- it was already released
+ // or there are underlying errors we cannot recover from
+ } finally {
+ closed = true;
+ try {
+ lockChannel.close();
+ } catch (IOException e) {
+ // nothing to do here -- underlying errors but recovery
+ // not possible here or in client, and already closed
+ }
+ }
+ }
+
+  /**
+   * Returns the subject already cached under the given name, or creates its
+   * directory on disk, wraps it with validation and caches it.
+   */
+  @Override
+  public synchronized Subject register(String subjectName, SubjectConfig config) {
+    isValid();
+    Subject existing = subjects.lookup(subjectName);
+    if (existing != null) {
+      return existing;
+    }
+    FileSubject created = createNewFileSubject(subjectName, config);
+    return subjects.add(Subject.validatingSubject(created, validators));
+  }
+
+  /** Returns whatever the subject cache holds for the given name. */
+  @Override
+  public synchronized Subject lookup(String subjectName) {
+    isValid();
+    Subject found = subjects.lookup(subjectName);
+    return found;
+  }
+
+  /** Returns the subject cache's current values. */
+  @Override
+  public synchronized Iterable<Subject> subjects() {
+    isValid();
+    Iterable<Subject> all = subjects.values();
+    return all;
+  }
+
+  // Creates the on-disk layout for a new subject, then opens it.
+  private FileSubject createNewFileSubject(String subject,
+      SubjectConfig config) {
+    File dirForSubject = new File(rootDir, subject);
+    createNewSubjectDir(dirForSubject, config);
+    return new FileSubject(dirForSubject);
+  }
+
+  /**
+   * Creates a new, empty subject directory containing an empty schema id file
+   * and a properties file holding the subject's configuration.
+   *
+   * @throws RuntimeException
+   *           if the directory already exists or any file cannot be created
+   */
+  private static void createNewSubjectDir(File subjectDir, SubjectConfig config) {
+    if (subjectDir.exists()) {
+      throw new RuntimeException(
+          "Cannot create a FileSubject, directory already exists: "
+              + subjectDir.getAbsolutePath());
+    }
+    boolean made = subjectDir.mkdir();
+    if (!made) {
+      throw new RuntimeException("Cannot create a FileSubject dir: "
+          + subjectDir.getAbsolutePath());
+    }
+
+    // the id file starts out empty; ids are appended as schemas are registered
+    createNewFileInDir(subjectDir, SCHEMA_IDS);
+    File propertiesFile = createNewFileInDir(subjectDir, SUBJECT_PROPERTIES);
+
+    Properties toStore = new Properties();
+    toStore.putAll(RepositoryUtil.safeConfig(config).asMap());
+    writePropertyFile(propertiesFile, toStore);
+  }
+
+  /**
+   * Creates a new empty file with the given name inside the given directory.
+   *
+   * @return the file that was created
+   * @throws RuntimeException
+   *           if the file already exists or cannot be created
+   */
+  private static File createNewFileInDir(File dir, String filename) {
+    File target = new File(dir, filename);
+    boolean createdNow;
+    try {
+      createdNow = target.createNewFile();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create file: "
+          + target.getAbsolutePath(), e);
+    }
+    if (!createdNow) {
+      throw new RuntimeException(target.getAbsolutePath() + " already exists");
+    }
+    return target;
+  }
+
+  /**
+   * Opens the file, hands a buffered UTF-8 writer to the given operation, then
+   * flushes and closes it. The file is closed on every path, including when
+   * the operation throws.
+   *
+   * @param file
+   *          the file to write
+   * @param op
+   *          the callback that produces the content
+   * @param append
+   *          true to append to existing content, false to replace it
+   * @throws RuntimeException
+   *           wrapping the underlying cause if the file cannot be opened,
+   *           written or closed
+   */
+  private static void writeToFile(File file, WriteOp op, boolean append) {
+    FileOutputStream out;
+    try {
+      out = new FileOutputStream(file, append);
+    } catch (FileNotFoundException e) {
+      throw new RuntimeException("Could not open file for write: "
+          + file.getAbsolutePath(), e);
+    }
+    Writer writer = null;
+    boolean written = false;
+    try {
+      writer = new BufferedWriter(new OutputStreamWriter(out, "UTF-8"));
+      op.write(writer);
+      writer.flush();
+      written = true;
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to write and close file "
+          + file.getAbsolutePath(), e);
+    } finally {
+      // closing the outermost writer closes the whole chain; fall back to the
+      // raw stream if the writer was never constructed
+      Closeable toClose = (writer != null) ? writer : out;
+      try {
+        toClose.close();
+      } catch (IOException e) {
+        if (written) {
+          throw new RuntimeException("Failed to write and close file "
+              + file.getAbsolutePath(), e);
+        }
+        // otherwise a write failure is already propagating; do not mask it
+      }
+    }
+  }
+
+ /**
+ * Replaces the content of the given file with the given properties, written
+ * through {@link Properties#store(Writer, String)} with a fixed header
+ * comment. Passes append=false to writeToFile.
+ */
+ private static void writePropertyFile(File file, final Properties prop) {
+ writeToFile(file, new WriteOp() {
+ @Override
+ protected void write(Writer writer) throws IOException {
+ prop.store(writer, "Schema Repository Subject Properties");
+ }
+ }, false);
+ }
+
+ /**
+ * Appends the given text followed by a single '\n' to the given file.
+ * Passes append=true to writeToFile, so existing content is kept.
+ */
+ private static void appendLineToFile(File file, final String line) {
+ writeToFile(file, new WriteOp() {
+ @Override
+ protected void write(Writer writer) throws IOException {
+ writer.append(line).append('\n');
+ }
+ }, true);
+ }
+
+  // Fails unless the given path exists and is a directory.
+  private static void dirExists(File dir) {
+    if (dir.exists() && dir.isDirectory()) {
+      return;
+    }
+    throw new RuntimeException(
+        "directory does not exist or is not a directory: " + dir.toString());
+  }
+
+  // Fails unless File.canRead() reports true for the given file.
+  private static void fileReadable(File file) {
+    if (file.canRead()) {
+      return;
+    }
+    throw new RuntimeException("file does not exist or is not readable: "
+        + file.toString());
+  }
+
+  // Fails unless File.canWrite() reports true for the given file.
+  private static void fileWriteable(File file) {
+    if (file.canWrite()) {
+      return;
+    }
+    throw new RuntimeException("file does not exist or is not writeable: "
+        + file.toString());
+  }
+
+ /**
+ * Callback passed to writeToFile: implementations emit their content to the
+ * supplied writer. writeToFile flushes and closes that writer itself.
+ */
+ private abstract static class WriteOp {
+ protected abstract void write(Writer writer) throws IOException;
+ }
+
+ private class FileSubject extends Subject {
+ private final File subjectDir;
+ private final File idFile;
+ private final File propertyFile;
+ private final SubjectConfig config;
+
+ private int largestId = -1;
+ private SchemaEntry latest;
+
+    /**
+     * Opens an existing subject directory. Loads the subject configuration
+     * from the properties file, then cross-checks the id file against the
+     * schema files on disk: every listed id must be unique and have a readable
+     * schema file, and every schema file must be listed. The last id in the id
+     * file becomes the latest entry.
+     *
+     * @param dir
+     *          the subject directory; its name is used as the subject name
+     * @throws RuntimeException
+     *           if the directory or its files are missing, unreadable or
+     *           inconsistent
+     */
+    private FileSubject(File dir) {
+      super(dir.getName());
+      this.subjectDir = dir;
+      this.idFile = new File(dir, SCHEMA_IDS);
+      this.propertyFile = new File(dir, SUBJECT_PROPERTIES);
+      dirExists(subjectDir);
+      fileReadable(idFile);
+      fileWriteable(idFile);
+      fileReadable(propertyFile);
+      fileWriteable(propertyFile);
+
+      // read from config file
+      Properties props = new Properties();
+      try {
+        FileInputStream propStream = new FileInputStream(propertyFile);
+        try {
+          // writePropertyFile stores through a UTF-8 writer, so decode as
+          // UTF-8 here; load(InputStream) would assume ISO-8859-1
+          props.load(new java.io.InputStreamReader(propStream, "UTF-8"));
+        } finally {
+          // closing the stream releases the file handle the original leaked
+          propStream.close();
+        }
+        config = RepositoryUtil.configFromProperties(props);
+        Integer lastId = null;
+        HashSet<String> schemaFileNames = getSchemaFiles();
+        HashSet<Integer> foundIds = new HashSet<Integer>();
+        for (Integer id : getSchemaIds()) {
+          if (id > largestId) {
+            largestId = id;
+          }
+          lastId = id;
+          if (!foundIds.add(id)) {
+            throw new RuntimeException("Corrupt id file, id '" + id +
+                "' duplicated in " + idFile.getAbsolutePath());
+          }
+          fileReadable(getSchemaFile(id));
+          // tick off each referenced file; leftovers are unreferenced
+          schemaFileNames.remove(getSchemaFileName(id));
+        }
+        if (schemaFileNames.size() > 0) {
+          throw new RuntimeException("Schema files found in subject directory "
+              + subjectDir.getAbsolutePath()
+              + " that are not referenced in the " + SCHEMA_IDS + " file: "
+              + schemaFileNames.toString());
+        }
+        if (lastId != null) {
+          latest = new SchemaEntry(lastId.toString(),
+              readSchemaForId(lastId.toString()));
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("error initializing subject: "
+            + subjectDir.getAbsolutePath(), e);
+      }
+    }
+
+    /** Returns the configuration loaded when this subject was opened. */
+    @Override
+    public SubjectConfig getConfig() {
+      return config;
+    }
+
+    /**
+     * Returns the entry already stored for this schema, or stores the schema
+     * in a new file, appends its id to the id file and makes it the latest
+     * entry.
+     */
+    @Override
+    public synchronized SchemaEntry register(String schema)
+        throws SchemaValidationException {
+      isValid();
+      RepositoryUtil.validateSchemaOrSubject(schema);
+      SchemaEntry existing = lookupBySchema(schema);
+      if (existing != null) {
+        return existing;
+      }
+      SchemaEntry created = createNewSchemaFile(schema);
+      appendLineToFile(idFile, created.getId());
+      latest = created;
+      return created;
+    }
+
+    /**
+     * Writes the schema, encoded as UTF-8, to a new file named after the next
+     * id (largestId + 1) and returns the resulting entry. On success the entry
+     * becomes {@code latest} and {@code largestId} is advanced. If the write
+     * fails, the partially created file is removed so that it is not later
+     * reported as an unreferenced schema file.
+     *
+     * @throws RuntimeException
+     *           if the file already exists, cannot be created or cannot be
+     *           written
+     */
+    private synchronized SchemaEntry createNewSchemaFile(String schema) {
+      int newId = largestId + 1;
+      File f = getSchemaFile(String.valueOf(newId));
+      try {
+        if (f.exists() || !f.createNewFile()) {
+          throw new RuntimeException(
+              "Unable to register schema, schema file either exists already "
+                  + " or couldn't create new file");
+        }
+        boolean written = false;
+        try {
+          // explicit UTF-8: FileWriter would use the platform default charset,
+          // but schema files are read back as UTF-8
+          Writer output = new BufferedWriter(
+              new OutputStreamWriter(new FileOutputStream(f), "UTF-8"));
+          try {
+            output.write(schema);
+            output.flush();
+          } finally {
+            output.close();
+          }
+          written = true;
+        } finally {
+          if (!written) {
+            // best effort: do not leave an orphan schema file behind
+            f.delete();
+          }
+        }
+
+        latest = new SchemaEntry(String.valueOf(newId), schema);
+        largestId++;
+        return latest;
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Unable to register schema, couldn't create schema file ", e);
+      }
+    }
+
+ @Override
+ public synchronized SchemaEntry registerIfLatest(String schema,
+ SchemaEntry latest) throws SchemaValidationException {
+ isValid();
+ if (latest == this.latest // both null
+ || (latest != null && latest.equals(this.latest))) {
+ return register(schema);
+ } else {
+ return null;
+ }
+ }
+
+    /**
+     * Scans the stored schemas in id-file order and returns an entry for the
+     * first one whose text equals the given schema, or null if none matches.
+     */
+    @Override
+    public synchronized SchemaEntry lookupBySchema(String schema) {
+      isValid();
+      RepositoryUtil.validateSchemaOrSubject(schema);
+      for (Integer candidateId : getSchemaIds()) {
+        String candidate = candidateId.toString();
+        String stored = readSchemaForIdOrNull(candidate);
+        if (!schema.equals(stored)) {
+          continue;
+        }
+        return new SchemaEntry(candidate, schema);
+      }
+      return null;
+    }
+
+    /**
+     * Returns the entry stored under the given id, or null if there is none.
+     *
+     * Ids issued by this subject are always the canonical decimal form of an
+     * int, so anything else is rejected before it is used to build a file
+     * name. This stops ids such as "../other/0" from reading files outside
+     * this subject's directory.
+     *
+     * @param id
+     *          the schema id to look up
+     * @return the matching entry, or null if the id is unknown or malformed
+     */
+    @Override
+    public synchronized SchemaEntry lookupById(String id) {
+      isValid();
+      try {
+        if (!String.valueOf(Integer.parseInt(id)).equals(id)) {
+          // parseable but not canonical (e.g. "007"); no such file is ever written
+          return null;
+        }
+      } catch (NumberFormatException e) {
+        // null, empty or non-numeric id: cannot name a schema file
+        return null;
+      }
+      String schema = readSchemaForIdOrNull(id);
+      if (schema != null) {
+        return new SchemaEntry(id, schema);
+      }
+      return null;
+    }
+
+    /** Returns the most recently registered entry, or null if there is none. */
+    @Override
+    public synchronized SchemaEntry latest() {
+      isValid();
+      return latest;
+    }
+
+    /**
+     * Returns every stored entry, newest first: entries are read in id-file
+     * order and the resulting list is then reversed.
+     */
+    @Override
+    public synchronized Iterable<SchemaEntry> allEntries() {
+      isValid();
+      List<Integer> ids = getSchemaIds();
+      List<SchemaEntry> result = new ArrayList<SchemaEntry>();
+      for (int i = 0; i < ids.size(); i++) {
+        String key = ids.get(i).toString();
+        result.add(new SchemaEntry(key, readSchemaForId(key)));
+      }
+      Collections.reverse(result);
+      return result;
+    }
+
+    /** Always true for this subject. */
+    @Override
+    public boolean integralKeys() {
+      return true;
+    }
+
+    // Like readSchemaForId, but any exception is turned into a null result.
+    private String readSchemaForIdOrNull(String id) {
+      String content;
+      try {
+        content = readSchemaForId(id);
+      } catch (Exception e) {
+        content = null;
+      }
+      return content;
+    }
+
+    // Reads the content of the schema file belonging to the given id.
+    private String readSchemaForId(String id) {
+      return readSchemaFile(getSchemaFile(id));
+    }
+
+    // Reads a schema file, wrapping a missing file in a RuntimeException.
+    private String readSchemaFile(File schemaFile) {
+      String content;
+      try {
+        content = readAllAsString(schemaFile);
+      } catch (FileNotFoundException e) {
+        throw new RuntimeException(
+            "Could not read schema contents at: "
+                + schemaFile.getAbsolutePath(), e);
+      }
+      return content;
+    }
+
+ // Platform line separator; used both as the scanner delimiter and as the
+ // separator re-inserted between lines by readAllAsString.
+ private final String endOfLine = System.getProperty("line.separator");
+
+ /**
+ * Reads the given file as UTF-8 and returns its lines joined with the
+ * platform line separator. No separator is appended after the last line.
+ *
+ * NOTE(review): the result appears not to be a byte-exact copy of the file:
+ * line terminators are rewritten to the platform separator and a trailing
+ * terminator looks to be dropped. lookupBySchema compares with equals(), so
+ * confirm that schemas containing line breaks still match after a round
+ * trip.
+ *
+ * @throws FileNotFoundException
+ * if the file cannot be opened by the Scanner
+ */
+ private String readAllAsString(File file) throws FileNotFoundException {
+ // token boundaries are line separators; nextLine() below does the reading
+ Scanner s = new Scanner(file, "UTF-8").useDelimiter(endOfLine);
+ StringBuilder strBuilder = new StringBuilder();
+ try {
+ while (s.hasNext()) {
+ strBuilder.append(s.nextLine());
+ // re-insert a separator only when more content follows
+ if (s.hasNext()) {
+ strBuilder.append(endOfLine);
+ }
+ }
+ return strBuilder.toString();
+ } catch (NoSuchElementException e) {
+ // NOTE(review): nextLine() is only called after hasNext() returned true,
+ // so this branch looks unreachable -- confirm before relying on it
+ throw new RuntimeException(
+ "file is empty: " + file.getAbsolutePath(), e);
+ } finally {
+ s.close();
+ }
+ }
+
+    /**
+     * Returns the names of all files in the subject directory that end with
+     * the schema postfix.
+     *
+     * @throws RuntimeException
+     *           if the subject directory cannot be listed
+     */
+    private HashSet<String> getSchemaFiles() {
+      String[] files = subjectDir.list(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          return null != name && name.endsWith(SCHEMA_POSTFIX);
+        }
+      });
+      if (files == null) {
+        // File.list returns null on I/O error or if the path is not a directory
+        throw new RuntimeException("Unable to list subject directory: "
+            + subjectDir.getAbsolutePath());
+      }
+      return new HashSet<String>(Arrays.asList(files));
+    }
+
+    /**
+     * Reads the schema ids from the schema id file, in order from oldest to
+     * newest. Blank lines are skipped. An id file whose last line has no
+     * trailing newline is accepted.
+     *
+     * @return the ids in file order
+     * @throws java.util.InputMismatchException
+     *           if a line does not start with an integer
+     */
+    private List<Integer> getSchemaIds() {
+      Scanner s = getIdFileScanner();
+      List<Integer> ids = new ArrayList<Integer>();
+      try {
+        while (s.hasNextLine()) {
+          if (s.hasNext()) {
+            // a token remains somewhere ahead; nextInt skips leading blank lines
+            ids.add(s.nextInt());
+          }
+          // discard the rest of the current line; guard the call because after
+          // the final id there may be no line terminator left to consume
+          if (s.hasNextLine()) {
+            s.nextLine();
+          }
+        }
+        return ids;
+      } finally {
+        s.close();
+      }
+    }
+
+    // Opens a UTF-8 Scanner over the id file; the caller must close it.
+    private Scanner getIdFileScanner() {
+      Scanner scanner;
+      try {
+        scanner = new Scanner(idFile, "UTF-8");
+      } catch (FileNotFoundException e) {
+        throw new RuntimeException("Unable to read schema id file: "
+            + idFile.getAbsolutePath(), e);
+      }
+      return scanner;
+    }
+
+    // Location of the schema file for the given id inside the subject dir.
+    private File getSchemaFile(String id) {
+      String fileName = getSchemaFileName(id);
+      return new File(subjectDir, fileName);
+    }
+
+    // int overload of getSchemaFile(String).
+    private File getSchemaFile(int id) {
+      return new File(subjectDir, getSchemaFileName(id));
+    }
+
+    // A schema file is named after its id followed by the schema postfix.
+    private String getSchemaFileName(String id) {
+      return id + SCHEMA_POSTFIX;
+    }
+
+    // int overload of getSchemaFileName(String).
+    private String getSchemaFileName(int id) {
+      return String.valueOf(id) + SCHEMA_POSTFIX;
+    }
+
+ }
+
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemoryCache.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemoryCache.java
new file mode 100644
index 0000000..6317834
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemoryCache.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+/**
+ * An {@link InMemoryCache} is a {@link RepositoryCache} that is constructed
+ * with a new {@link InMemorySubjectCache} and a new
+ * {@link InMemorySchemaEntryCache.Factory}.
+ */
+public class InMemoryCache extends RepositoryCache {
+ /** Creates the cache with fresh, empty in-memory backing caches. */
+ public InMemoryCache() {
+ super(new InMemorySubjectCache(), new InMemorySchemaEntryCache.Factory());
+ }
+
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemoryRepository.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemoryRepository.java
new file mode 100644
index 0000000..e528b59
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemoryRepository.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import javax.inject.Inject;
+
+
+/**
+ * A {@link Repository} that stores its data in memory and is not persistent.
+ * This is useful primarily for testing.
+ */
+public class InMemoryRepository implements Repository {
+ private final InMemorySubjectCache subjects = new InMemorySubjectCache();
+ private final ValidatorFactory validators;
+
+ @Inject
+ public InMemoryRepository(ValidatorFactory validators) {
+ this.validators = validators;
+ }
+
+ @Override
+ public Subject register(String subjectName, SubjectConfig config) {
+ return subjects.add(Subject.validatingSubject(new MemSubject(subjectName, config), validators));
+ }
+
+ @Override
+ public Subject lookup(String subject) {
+ return subjects.lookup(subject);
+ }
+
+ /**
+ * List all subjects
+ */
+ @Override
+ public Iterable<Subject> subjects() {
+ return subjects.values();
+ }
+
+ private static class MemSubject extends Subject {
+ private final InMemorySchemaEntryCache schemas = new InMemorySchemaEntryCache();
+ private SchemaEntry latest = null;
+ private int nextId = 0;
+ private SubjectConfig config;
+
+ protected MemSubject(String name, SubjectConfig config) {
+ super(name);
+ this.config = RepositoryUtil.safeConfig(config);
+ }
+
+ @Override
+ public SubjectConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public synchronized SchemaEntry register(String schema)
+ throws SchemaValidationException {
+ String id = String.valueOf(nextId);
+ SchemaEntry toRegister = new SchemaEntry(id, schema);
+ SchemaEntry valueInCache = schemas.add(toRegister);
+ if (toRegister == valueInCache) {
+ // schema is new
+ nextId++;
+ this.latest = toRegister;
+ }
+ return valueInCache;
+ }
+
+ @Override
+ public synchronized SchemaEntry registerIfLatest(String schema,
+ SchemaEntry latest) throws SchemaValidationException {
+ if (latest == this.latest
+ || (latest != null && latest.equals(this.latest))) {
+ return register(schema);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public SchemaEntry lookupBySchema(String schema) {
+ return schemas.lookupBySchema(schema);
+ }
+
+ @Override
+ public SchemaEntry lookupById(String id) {
+ return schemas.lookupById(id);
+ }
+
+ @Override
+ public synchronized SchemaEntry latest() {
+ return latest;
+ }
+
+ @Override
+ public synchronized Iterable<SchemaEntry> allEntries() {
+ return schemas.values();
+ }
+
+ @Override
+ public boolean integralKeys() {
+ return true;
+ }
+ }
+
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemorySchemaEntryCache.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemorySchemaEntryCache.java
new file mode 100644
index 0000000..0d2bb29
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemorySchemaEntryCache.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * An unbounded in memory {@link SchemaEntryCache} that never evicts any
+ * values. Entries are indexed by schema text and by id, and are also kept in
+ * insertion order for {@link #values()}.
+ */
+public class InMemorySchemaEntryCache implements SchemaEntryCache {
+
+  private final ConcurrentHashMap<String, SchemaEntry> schemaToEntry =
+      new ConcurrentHashMap<String, SchemaEntry>();
+  private final ConcurrentHashMap<String, SchemaEntry> idToSchema =
+      new ConcurrentHashMap<String, SchemaEntry>();
+  // most recently added entry first (push() inserts at the head)
+  private final LinkedList<SchemaEntry> schemasInOrder =
+      new LinkedList<SchemaEntry>();
+
+  @Override
+  public SchemaEntry lookupBySchema(String schema) {
+    return schemaToEntry.get(schema);
+  }
+
+  @Override
+  public SchemaEntry lookupById(String id) {
+    return idToSchema.get(id);
+  }
+
+  /**
+   * Adds the entry unless an entry with the same schema text is already
+   * cached.
+   *
+   * @param entry
+   *          the entry to cache; null is returned unchanged
+   * @return the entry now cached for this schema: the given one if it was
+   *         new, otherwise the previously cached one
+   */
+  @Override
+  public synchronized SchemaEntry add(SchemaEntry entry) {
+    if (null == entry) {
+      return entry;
+    }
+    SchemaEntry prior = schemaToEntry.putIfAbsent(entry.getSchema(), entry);
+    if (null != prior) {
+      // already cached: do not record it a second time, otherwise values()
+      // returns duplicates and the list grows on every repeated add
+      return prior;
+    }
+    idToSchema.put(entry.getId(), entry);
+    schemasInOrder.push(entry);
+    return entry;
+  }
+
+  /** return a snapshot of all of the values in this cache, newest first **/
+  public synchronized Iterable<SchemaEntry> values() {
+    return new ArrayList<SchemaEntry>(schemasInOrder);
+  }
+
+  /** Factory that hands out a new, empty cache on every call. */
+  public static class Factory implements SchemaEntryCache.Factory {
+    @Override
+    public SchemaEntryCache createSchemaEntryCache() {
+      return new InMemorySchemaEntryCache();
+    }
+  }
+
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemorySubjectCache.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemorySubjectCache.java
new file mode 100644
index 0000000..ce86208
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemorySubjectCache.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * An unbounded in memory {@link SubjectCache} that never evicts any values.
+ * Subjects are keyed by name in a {@link ConcurrentHashMap}.
+ */
+public class InMemorySubjectCache implements SubjectCache {
+  private final ConcurrentHashMap<String, Subject> subjects =
+      new ConcurrentHashMap<String, Subject>();
+
+  @Override
+  public Subject lookup(String name) {
+    Subject cached = subjects.get(name);
+    return cached;
+  }
+
+  /**
+   * Caches the subject under its name unless that name is already taken.
+   *
+   * @return the subject now cached under that name, or null if given null
+   */
+  @Override
+  public Subject add(Subject subject) {
+    if (subject == null) {
+      return null;
+    }
+    Subject alreadyThere = subjects.putIfAbsent(subject.getName(), subject);
+    if (alreadyThere == null) {
+      return subject;
+    }
+    return alreadyThere;
+  }
+
+  /**
+   * @return All of the {@link Subject} values in this
+   *         {@link InMemorySubjectCache}
+   */
+  public Iterable<Subject> values() {
+    return subjects.values();
+  }
+
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/ReadOnlyRepository.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/ReadOnlyRepository.java
new file mode 100644
index 0000000..4329c60
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/ReadOnlyRepository.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import java.util.ArrayList;
+
+import javax.inject.Inject;
+
+/**
+ * ReadOnlyRepository is a {@link Repository} implementation that wraps another
+ * {@link Repository} and rejects all operations that can modify the state of
+ * the {@link Repository}.<br/>
+ * <br/>
+ *
+ * {@link #register(String, Map)}, {@link Subject#register(String)}
+ * and {@link Subject#registerIfLatest(String, SchemaEntry)} throw
+ * {@link IllegalStateException} if called.
+ */
+public class ReadOnlyRepository implements Repository {
+ private final Repository repo;
+
+ /**
+ * Create a repository that disallows mutations to the underlying repository.
+ * @param repo The repository to wrap
+ */
+ @Inject
+ public ReadOnlyRepository(Repository repo) {
+ this.repo = repo;
+ }
+
+ @Override
+ public Subject register(String subjectName, SubjectConfig config) {
+ throw new IllegalStateException(
+ "Cannot register a Subject in a ReadOnlyRepository");
+ }
+
+ @Override
+ public Subject lookup(String subjectName) {
+ return Subject.readOnly(repo.lookup(subjectName));
+ }
+
+ @Override
+ public Iterable<Subject> subjects() {
+ ArrayList<Subject> subjects = new ArrayList<Subject>();
+ for(Subject sub : repo.subjects()) {
+ subjects.add(Subject.readOnly(sub));
+ }
+ return subjects;
+ }
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Repository.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Repository.java
new file mode 100644
index 0000000..91dca81
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Repository.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+/**
+ * A {@link Repository} is a collection of {@link Subject}s. A {@link Subject}
+ * can be looked up by name on a {@link Repository}, or registered.<br/>
+ * <br/>
+ * Registration of a {@link Subject} in a {@link Repository} is done via
+ * {@link #register(String, SubjectConfig)}, which requires the subject name
+ * and its configuration, supplied as a {@link SubjectConfig}.<br/>
+ * <br/>
+ *
+ */
+public interface Repository {
+
+ /**
+ * Attempt to create a Subject with the given name and configuration.
+ *
+ * @param subjectName
+ * The name of the subject. Must not be null.
+ * @param config
+ * The subject configuration. May be null.
+ * @return The newly created Subject, or an equivalent one if already created.
+ * Does not return null.
+ * @throws NullPointerException
+ * if subjectName is null
+ */
+ Subject register(String subjectName, SubjectConfig config);
+
+ /**
+ * Returns the subject if it exists, null otherwise.
+ *
+ * @param subjectName
+ * the subject name
+ * @return The subject if it exists, null otherwise.
+ */
+ Subject lookup(String subjectName);
+
+ /**
+ * List all subjects. Does not return null.
+ *
+ * @return every subject in this repository
+ */
+ Iterable<Subject> subjects();
+
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/RepositoryCache.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/RepositoryCache.java
new file mode 100644
index 0000000..001248c
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/RepositoryCache.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import javax.inject.Inject;
+
+/**
+ * <p>
+ * A {@link RepositoryCache} composes a {@link SubjectCache} with a
+ * {@link SchemaEntryCache.Factory}, using
+ * {@link Subject#cacheWith(Subject, SchemaEntryCache)} to wrap {@link Subject}
+ * instances prior to insertion into the {@link SubjectCache}.
+ */
+public class RepositoryCache implements SubjectCache {
+
+ private final SubjectCache subjects;
+ private final SchemaEntryCache.Factory entryCacheFactory;
+
+ @Inject
+ public RepositoryCache(SubjectCache subjects,
+ SchemaEntryCache.Factory entryCacheFactory) {
+ this.subjects = subjects;
+ this.entryCacheFactory = entryCacheFactory;
+ }
+
+ @Override
+ public Subject add(Subject entry) {
+ return subjects.add(Subject.cacheWith(entry,
+ entryCacheFactory.createSchemaEntryCache()));
+ }
+
+ @Override
+ public Subject lookup(String name) {
+ return subjects.lookup(name);
+ }
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/RepositoryUtil.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/RepositoryUtil.java
new file mode 100644
index 0000000..23f19e3
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/RepositoryUtil.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Static helper methods shared by the classes of the org.apache.avro.repo
+ * package. This class cannot be instantiated.
+ * <p>
+ * {@link #subjectsToString(Iterable)} and
+ * {@link #subjectNamesFromString(String)} encode subjects as, and decode
+ * subject names from, a String. <br/>
+ * {@link #schemasToString(Iterable)} and {@link #schemasFromString(String)} do
+ * the same for {@link SchemaEntry} objects.
+ * </p>
+ * <p>
+ * Both formats put one item per line, terminating each item with a newline
+ * character, so the output is human readable. The Avro RESTRepository uses
+ * this format to encode subjects and schemas over HTTP. Subject names are
+ * forbidden from containing whitespace. Schemas have their whitespace removed
+ * prior to use in the Repository.
+ * </p>
+ */
+public final class RepositoryUtil {
+  private RepositoryUtil() {
+  }
+
+  /**
+   * Encode {@link Subject}s into a {@link String} that can be decoded by
+   * {@link #subjectNamesFromString(String)}.
+   *
+   * @param subjects
+   *          the Subject objects to encode
+   * @return the name of each {@link Subject}, each followed by a newline
+   */
+  public static String subjectsToString(Iterable<Subject> subjects) {
+    StringBuilder encoded = new StringBuilder();
+    for (Subject subject : subjects) {
+      encoded.append(subject.getName()).append("\n");
+    }
+    return encoded.toString();
+  }
+
+  /**
+   * Decode a string created by {@link #subjectsToString(Iterable)}.
+   *
+   * @param str
+   *          The String to decode. May be null or empty.
+   * @return the subject names found in the string, one per line; empty if none
+   */
+  public static Iterable<String> subjectNamesFromString(String str) {
+    List<String> names = new ArrayList<String>();
+    if (str == null || str.isEmpty()) {
+      return names;
+    }
+    for (String name : str.split("\n")) {
+      names.add(name);
+    }
+    return names;
+  }
+
+  /**
+   * Encode {@link SchemaEntry} objects into a {@link String} that can be
+   * decoded by {@link #schemasFromString(String)}.
+   *
+   * @param allEntries
+   *          the SchemaEntry objects to encode
+   * @return each entry's {@link SchemaEntry#toString()}, followed by a newline
+   */
+  public static String schemasToString(Iterable<SchemaEntry> allEntries) {
+    StringBuilder encoded = new StringBuilder();
+    for (SchemaEntry entry : allEntries) {
+      encoded.append(entry.toString()).append("\n");
+    }
+    return encoded.toString();
+  }
+
+  /**
+   * Decode a string created by {@link #schemasToString(Iterable)}.
+   *
+   * @param str
+   *          The String to decode. May be null or empty.
+   * @return the {@link SchemaEntry} parsed from each line; empty if none
+   */
+  public static Iterable<SchemaEntry> schemasFromString(String str) {
+    List<SchemaEntry> entries = new ArrayList<SchemaEntry>();
+    if (str == null || str.isEmpty()) {
+      return entries;
+    }
+    for (String line : str.split("\n")) {
+      entries.add(new SchemaEntry(line));
+    }
+    return entries;
+  }
+
+  /**
+   * Throws IllegalArgumentException if the string provided is null or empty.
+   */
+  public static void validateSchemaOrSubject(String val) {
+    if (val == null || val.isEmpty()) {
+      String message = "Provided string is null or empty: '" + val + "'";
+      throw new IllegalArgumentException(message);
+    }
+  }
+
+  /**
+   * Builds a {@link SubjectConfig} from the properties provided, including
+   * any default values that exist in the properties.
+   */
+  public static SubjectConfig configFromProperties(Properties props) {
+    HashMap<String, String> settings = new HashMap<String, String>();
+    for (String name : props.stringPropertyNames()) {
+      settings.put(name, props.getProperty(name));
+    }
+    return new SubjectConfig.Builder().set(settings).build();
+  }
+
+  /** Returns the config provided, or {@link SubjectConfig#emptyConfig()} if it is null. */
+  public static SubjectConfig safeConfig(SubjectConfig config) {
+    if (config != null) {
+      return config;
+    } else {
+      return SubjectConfig.emptyConfig();
+    }
+  }
+
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaEntry.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaEntry.java
new file mode 100644
index 0000000..ee226ff
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaEntry.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+/**
+ * An immutable pairing of a schema with the id it was registered under.
+ */
+public final class SchemaEntry {
+  private final String id;
+  private final String schema;
+
+  /**
+   * Creates an entry directly from an id and a schema; neither is validated.
+   */
+  public SchemaEntry(String id, String schema) {
+    this.id = id;
+    this.schema = schema;
+  }
+
+  /**
+   * Parses the form produced by {@link #toString()}: the id, a tab, then the
+   * schema. Throws IllegalArgumentException without a tab after a non-empty id.
+   */
+  public SchemaEntry(String stringForm) {
+    int separator = stringForm.indexOf('\t');
+    if (separator <= 0) {
+      throw new IllegalArgumentException(
+          "Invalid Schema Entry Serialization: " + stringForm);
+    }
+    this.id = stringForm.substring(0, separator);
+    this.schema = stringForm.substring(separator + 1);
+  }
+
+  /** @return the id */
+  public String getId() {
+    return id;
+  }
+
+  /** @return the schema */
+  public String getSchema() {
+    return schema;
+  }
+
+  @Override
+  public int hashCode() {
+    int idHash = (id == null) ? 0 : id.hashCode();
+    int schemaHash = (schema == null) ? 0 : schema.hashCode();
+    // same value as the conventional 31-multiplier combination
+    return 31 * idHash + schemaHash;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    SchemaEntry that = (SchemaEntry) obj;
+    return sameOrBothNull(id, that.id) && sameOrBothNull(schema, that.schema);
+  }
+
+  /**
+   * Null-safe String equality: true when both are null or both are equal.
+   */
+  private static boolean sameOrBothNull(String left, String right) {
+    if (left == null) {
+      return right == null;
+    }
+    return left.equals(right);
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append(id).append("\t").append(schema).toString();
+  }
+
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaEntryCache.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaEntryCache.java
new file mode 100644
index 0000000..2b3f6e8
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaEntryCache.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+/**
+ * <p>
+ * A {@link SchemaEntryCache} is a bi-directional cache from a schema string to
+ * a schema id string. In a given {@link Subject} the mapping between a schema
+ * and its id is immutable and can be cached. Other aspects of {@link Subject}
+ * are not safe to cache.
+ * </p>
+ * <p>
+ * In a {@link Subject} schemas are Strings and can not be null or empty.
+ * See {@link RepositoryUtil#validateSchemaOrSubject(String)}
+ * </p>
+ * {@link #add(SchemaEntry)}, {@link #lookupById(String)}, and
+ * {@link #lookupBySchema(String)} must be thread-safe with respect to
+ * each other.
+ */
+public interface SchemaEntryCache {
+
+ /**
+ * Look up a schema entry by its full string form. Thread-safe.
+ * @return the cached entry; null is treated as a miss by the caching Subject
+ * @throws NullPointerException
+ * If the provided schema is null
+ */
+ SchemaEntry lookupBySchema(String schema);
+
+ /**
+ * Look up a schema entry by its id. Thread-safe.
+ * @return the cached entry; null is treated as a miss by the caching Subject
+ * @throws NullPointerException
+ * If the provided id is null
+ */
+ SchemaEntry lookupById(String id);
+
+ /**
+ * Add the schema entry to this cache.
+ *
+ * @param entry
+ * the schema entry to add. If the provided entry is null, returns null.
+ *
+ * @return the {@link SchemaEntry} that is in the cache after the call
+ * completes. If the entry already exists the pre-existing value is
+ * returned, otherwise the value returned is the entry provided. If
+ * the provided entry is null, returns null. Thread-safe.
+ */
+ SchemaEntry add(SchemaEntry entry);
+
+ /**
+ * Creates instances of {@link SchemaEntryCache}<br/>
+ * <br/>
+ *
+ * @see Subject#cacheWith(Subject, SchemaEntryCache)
+ */
+ interface Factory {
+ /**
+ * Create a {@link SchemaEntryCache} instance for use with
+ * {@link Subject#cacheWith(Subject, SchemaEntryCache)}.
+ *
+ * May return null to disable use of a SchemaEntryCache
+ */
+ SchemaEntryCache createSchemaEntryCache();
+ }
+
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaValidationException.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaValidationException.java
new file mode 100644
index 0000000..c65355d
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaValidationException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+public class SchemaValidationException extends Exception {
+ private static final long serialVersionUID = -3915576082651907606L;
+ /** @param msg detail message, passed unchanged to {@link Exception#Exception(String)} */
+ public SchemaValidationException(String msg) {
+ super(msg);
+ }
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Subject.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Subject.java
new file mode 100644
index 0000000..795a887
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Subject.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link Subject} is a collection of mutually compatible Schemas. <br/>
+ * <br/>
+ * Validation of schemas is pluggable and each subject may have its own
+ * validation rules defined with its own {@link Validator} when registered with
+ * a {@link Repository}. To create a {@link Subject} that validates its schemas,
+ * use {@link #validatingSubject(Subject, ValidatorFactory)}. <br/>
+ * <br/>
+ * Caching of schemas is pluggable via
+ * {@link #cacheWith(Subject, SchemaEntryCache)}. A {@link Subject} can only
+ * cache the schema to id mappings, as other properties of a Subject are not
+ * safe to cache. <br/>
+ * <br/>
+ * A {@link Subject} has a few basic methods for interacting with Schemas:
+ * <ul><li>
+ * {@link #register(String)} attempts to register a schema with the Subject,
+ * according to the validation rules of the Subject. The operation is idempotent
+ * -- the return value is the {@link SchemaEntry} corresponding to the schema
+ * String whether the schema existed before the operation or not.</li>
+ * <li>
+ * {@link #registerIfLatest(String, SchemaEntry)} attempts to register a schema
+ * with the Subject, according to the validation rules of the Subject. The
+ * operation succeeds only if the provided {@code latest} value is the current
+ * latest schema in the system, and returns null otherwise.</li>
+ * <li>
+ * {@link #lookupById(String)} looks up a schema by its id, and returns null if
+ * such a schema does not exist. Since the mapping from id to schema is
+ * immutable, this result is cacheable.</li>
+ * <li>
+ * {@link #lookupBySchema(String)} looks up an id for a schema, and returns null
+ * if no such schema exists. Since the mapping from id to schema is immutable,
+ * this result is cacheable.</li>
+ * <li>
+ * {@link #allEntries()} returns all the schema entries for the subject, ordered
+ * from most recent to oldest. The result is not cacheable, since additional
+ * entries may be added.</li>
+ * </ul>
+ */
+public abstract class Subject {
+ private final String name;
+
+ /**
+ * A {@link Subject} has a name. The name must not be null or empty, otherwise
+ * an {@link IllegalArgumentException} is thrown. NOTE(review): names must also
+ * be free of whitespace, but that is not checked here -- confirm where it is.
+ **/
+ protected Subject(String name) {
+ RepositoryUtil.validateSchemaOrSubject(name);
+ this.name = name;
+ }
+
+ /**
+ * Return the Name of the Subject. A Subject name can not contain whitespace,
+ * and must not be empty or null.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /** The configuration of this Subject, including the names of its validators. */
+ public abstract SubjectConfig getConfig();
+
+ /** NOTE(review): presumably whether schema ids are integral (numeric) -- confirm. */
+ public abstract boolean integralKeys();
+
+ /**
+ * Return the {@link SchemaEntry} of the provided schema, registering the
+ * schema in this subject first if it has not been registered already.
+ *
+ * Idempotent -- If two users simultaneously register the same schema, they
+ * will both get the same {@link SchemaEntry} result and succeed.
+ *
+ * @param schema
+ * The schema to register
+ * @return The {@link SchemaEntry} of the schema
+ * @throws SchemaValidationException
+ * If the schema change is not valid according the validation rules
+ * of the subject
+ */
+ public abstract SchemaEntry register(String schema)
+ throws SchemaValidationException;
+
+ /**
+ * Register the provided schema only if the current latest schema matches the
+ * provided latest entry.
+ *
+ * @param schema
+ * The schema to register
+ * @param latest
+ * the entry that must match the current actual latest value in order
+ * to register this schema.
+ * @return The {@link SchemaEntry}, or null if latest does not match.
+ * @throws SchemaValidationException
+ * If the schema change is not valid according the validation rules
+ * of the subject
+ */
+ public abstract SchemaEntry registerIfLatest(String schema, SchemaEntry latest)
+ throws SchemaValidationException;
+
+ /**
+ * Lookup the {@link SchemaEntry} for the given schema. Since the mapping of
+ * schema to id is immutable, this result can be cached.
+ *
+ * @param schema
+ * The schema to look up
+ * @return The SchemaEntry of the schema or null if the schema is not
+ * registered
+ */
+ public abstract SchemaEntry lookupBySchema(String schema);
+
+ /**
+ * Lookup the {@link SchemaEntry} registered in this subject under the given
+ * id. Since the mapping of schema to id is immutable the result can be cached.
+ *
+ * @param id
+ * the id of the schema to look up
+ * @return The SchemaEntry of the schema or null if no such schema is
+ * registered for the provided id
+ */
+ public abstract SchemaEntry lookupById(String id);
+
+ /**
+ * Lookup the most recently registered schema for the given subject. This
+ * result is not cacheable, since the latest schema may change.
+ *
+ * @return The {@link SchemaEntry} or null if no schema is registered with
+ * this subject
+ */
+ public abstract SchemaEntry latest();
+
+ /**
+ * List the {@link SchemaEntry} objects registered with this subject, ordered
+ * from most recent to oldest. This result is not cacheable, since the
+ * {@link SchemaEntry} in the subject may grow over time.
+ *
+ * @return the {@link SchemaEntry} objects in this subject, ordered from most
+ * recent to oldest.
+ */
+ public abstract Iterable<SchemaEntry> allEntries();
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ /**
+ * Create a {@link Subject} that rejects modifications, throwing
+ * {@link IllegalStateException} if a modification is attempted
+ **/
+ public static final Subject readOnly(Subject subject) {
+ if (null == subject) {
+ return subject;
+ } else {
+ return new ReadOnlySubject(subject);
+ }
+ }
+
+ /** Overrides both register methods so that they always throw. */
+ private static class ReadOnlySubject extends DelegatingSubject {
+ private ReadOnlySubject(Subject subject) {
+ super(subject);
+ }
+
+ @Override
+ public SchemaEntry register(String schema) throws SchemaValidationException {
+ throw new IllegalStateException("Cannot register, subject is read-only");
+ }
+
+ @Override
+ public SchemaEntry registerIfLatest(String schema, SchemaEntry latest) {
+ throw new IllegalStateException("Cannot register, subject is read-only");
+ }
+
+ }
+
+ /**
+ * Create a {@link Subject} that validates schemas as configured. Returns the
+ * subject unchanged if it is null or the factory yields no validators for it.
+ */
+ public static Subject validatingSubject(Subject subject, ValidatorFactory factory) {
+ if (null == subject) {
+ return subject;
+ }
+ List<Validator> validators = factory.getValidators(subject.getConfig().getValidators());
+ if (!validators.isEmpty()) {
+ return new ValidatingSubject(subject, new CompositeValidator(validators));
+ } else {
+ return subject;
+ }
+ }
+
+ private static final class CompositeValidator implements Validator {
+ private final ArrayList<Validator> validators;
+ private CompositeValidator(List<Validator> validators) {
+ this.validators = new ArrayList<Validator>(validators); // defensive copy
+ }
+
+ @Override
+ public void validate(String schemaToValidate,
+ Iterable<SchemaEntry> schemasInOrder) throws SchemaValidationException {
+ for(Validator v : validators) { // the first validator to throw aborts the rest
+ v.validate(schemaToValidate, schemasInOrder);
+ }
+ }
+ }
+
+ private static class ValidatingSubject extends DelegatingSubject {
+ protected final Validator validator;
+
+ private ValidatingSubject(Subject delegate, Validator validator) {
+ super(delegate);
+ this.validator = validator;
+ }
+
+ @Override
+ public SchemaEntry register(String schema) throws SchemaValidationException {
+ while (true) { // retry until registerIfLatest accepts our view of the latest entry
+ Iterable<SchemaEntry> schemaEntries = allEntries();
+ SchemaEntry actualLatest = null;
+ for (SchemaEntry entry : schemaEntries) {
+ actualLatest = entry;
+ break; // entries are ordered most recent first, so the first is the latest
+ }
+ validator.validate(schema, schemaEntries);
+ SchemaEntry registered = super.registerIfLatest(schema, actualLatest);
+ // non-null means success; null means latest did not match, so loop and retry
+ if (null != registered) {
+ return registered;
+ }
+ }
+ }
+
+ @Override
+ public SchemaEntry registerIfLatest(String schema, SchemaEntry latest)
+ throws SchemaValidationException {
+ Iterable<SchemaEntry> schemaEntries = allEntries();
+ SchemaEntry actualLatest = null;
+ for (SchemaEntry entry : schemaEntries) {
+ actualLatest = entry;
+ break; // entries are ordered most recent first, so the first is the latest
+ }
+ if (actualLatest == latest
+ || ((actualLatest != null) && actualLatest.equals(latest))) {
+ // latest matches: either both are null or they are equal
+ validator.validate(schema, schemaEntries);
+ return super.registerIfLatest(schema, latest);
+ } else {
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Create a {@link Subject} that caches id to schema mappings using the
+ * {@link SchemaEntryCache} provided.
+ *
+ * @param subject
+ * The {@link Subject} to wrap
+ * @param cache
+ * The {@link SchemaEntryCache} to cache with
+ * @return returns a {@link Subject} instance that caches {@link SchemaEntry}
+ * values with the cache provided, if and only if both parameters are
+ * not null. <br/>
+ * If the provided subject is null, returns null. If the provided
+ * cache is null, returns the provided subject without wrapping it.
+ */
+ public static Subject cacheWith(Subject subject, SchemaEntryCache cache) {
+ return (null == subject || null == cache) ?
+ subject : new CachingSubject(subject, cache);
+ }
+
+ private static class CachingSubject extends DelegatingSubject {
+ private final SchemaEntryCache cache;
+
+ private CachingSubject(Subject delegate, SchemaEntryCache cache) {
+ super(delegate);
+ this.cache = cache;
+ }
+
+ @Override
+ public SchemaEntry register(String schema) throws SchemaValidationException {
+ SchemaEntry entry = cache.lookupBySchema(schema);
+ if (entry == null) {
+ return cache.add(super.register(schema)); // cache miss: register, then remember
+ }
+ return entry;
+ }
+
+ @Override
+ public SchemaEntry registerIfLatest(String schema, SchemaEntry latest)
+ throws SchemaValidationException {
+ return cache.add(super.registerIfLatest(schema, latest)); // add(null) returns null
+ }
+
+ @Override
+ public SchemaEntry lookupBySchema(String schema) {
+ SchemaEntry entry = cache.lookupBySchema(schema);
+ if (entry == null) {
+ return cache.add(super.lookupBySchema(schema));
+ }
+ return entry;
+ }
+
+ @Override
+ public SchemaEntry lookupById(String id) {
+ SchemaEntry entry = cache.lookupById(id);
+ if (entry == null) {
+ return cache.add(super.lookupById(id));
+ }
+ return entry;
+ }
+
+ @Override
+ public Iterable<SchemaEntry> allEntries() {
+ Iterable<SchemaEntry> all = super.allEntries();
+ for (SchemaEntry entry : all) {
+ cache.add(entry); // remember every entry seen while listing
+ }
+ return all;
+ }
+ }
+
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SubjectCache.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SubjectCache.java
new file mode 100644
index 0000000..360f703
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SubjectCache.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+/**
+ * <p>
+ * A {@link SubjectCache} is a cache from a string subject name to a
+ * {@link Subject}.
+ * </p>
+ * <p>
+ * In a {@link Repository} subjects can be cached because they can only be
+ * created. However, a {@link Subject} can have its meta-data altered, so this
+ * cannot be cached.
+ * </p>
+ * {@link #add(Subject)} and {@link #lookup(String)} must be thread-safe
+ * with respect to each other.
+ */
+public interface SubjectCache {
+ /**
+ * Look up a {@link Subject} by its name. Thread-safe.
+ *
+ * @throws NullPointerException
+ * if the provided name is null
+ */
+ Subject lookup(String name);
+
+ /**
+ * Add the {@link Subject} to this cache unless an entry for it already exists.
+ *
+ * @return the {@link Subject} that is in the cache after the call completes.
+ * If the entry already exists this is the pre-existing value,
+ * otherwise it is the value provided. If the value provided is null,
+ * returns null. Thread-safe.
+ */
+ Subject add(Subject entry);
+
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SubjectConfig.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SubjectConfig.java
new file mode 100644
index 0000000..c1993bc
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SubjectConfig.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link SubjectConfig} is effectively a {@code Map<String, String>} with
+ * reserved keys. Instances are immutable and are created with a {@link Builder}.
+ * <br/>
+ * Keys starting with "repo." are reserved; of those only "repo.validators", a
+ * comma-separated list of validator names, can be set.
+ */
+public class SubjectConfig {
+  private static final SubjectConfig EMPTY = new Builder().build();
+
+  private final Map<String, String> conf;
+  private final Set<String> validators;
+
+  private SubjectConfig(Map<String, String> conf, Set<String> validators) {
+    this.conf = conf;
+    this.validators = validators;
+  }
+
+  public String get(String key) {
+    return conf.get(key);
+  }
+
+  /** @return the unmodifiable set of validator names */
+  public Set<String> getValidators() {
+    return validators;
+  }
+
+  /** @return an unmodifiable map of every entry, "repo.validators" included */
+  public Map<String, String> asMap() {
+    return conf;
+  }
+
+  public static SubjectConfig emptyConfig() {
+    return EMPTY;
+  }
+
+  @Override
+  public int hashCode() {
+    return conf.hashCode() * 31 + validators.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null || getClass() != obj.getClass())
+      return false;
+    SubjectConfig other = (SubjectConfig) obj;
+    return validators.equals(other.validators) && conf.equals(other.conf);
+  }
+
+  /** Mutable builder for {@link SubjectConfig}; setters return this for chaining. */
+  public static class Builder {
+    private static final String RESERVED_PREFIX = "repo.";
+    private static final String VALIDATORS_KEY = "repo.validators";
+
+    private final HashMap<String, String> conf = new HashMap<String, String>();
+    private final HashSet<String> validators = new HashSet<String>();
+
+    /** Applies {@link #set(String, String)} to every entry of the map. */
+    public Builder set(Map<String, String> config) {
+      for (Map.Entry<String, String> entry : config.entrySet()) {
+        set(entry.getKey(), entry.getValue());
+      }
+      return this;
+    }
+
+    /** Sets one entry; "repo.validators" is parsed as a comma-separated list.
+     *  Any other key starting with "repo." throws IllegalArgumentException. */
+    public Builder set(String key, String value) {
+      if (VALIDATORS_KEY.equals(key)) {
+        setValidators(commaSplit(value));
+      } else if (key.startsWith(RESERVED_PREFIX)) {
+        throw new IllegalArgumentException("SubjectConfig keys starting with '"
+            + RESERVED_PREFIX + "' are reserved, failed to set: " + key
+            + " to value: " + value);
+      } else {
+        conf.put(key, value);
+      }
+      return this;
+    }
+
+    /** Replaces the (non-null) validator names; an empty collection clears them. */
+    public Builder setValidators(Collection<String> validatorNames) {
+      this.validators.clear();
+      this.conf.remove(VALIDATORS_KEY);
+      if (!validatorNames.isEmpty()) {
+        this.validators.addAll(validatorNames);
+        this.conf.put(VALIDATORS_KEY, commaJoin(validators));
+      }
+      return this;
+    }
+
+    /** Adds one non-null validator name and refreshes "repo.validators". */
+    public Builder addValidator(String validatorName) {
+      this.validators.add(validatorName);
+      this.conf.put(VALIDATORS_KEY, commaJoin(validators));
+      return this;
+    }
+
+    /** @return an immutable snapshot of the builder's current state */
+    public SubjectConfig build() {
+      return new SubjectConfig(
+          Collections.unmodifiableMap(new HashMap<String, String>(conf)),
+          Collections.unmodifiableSet(new HashSet<String>(validators)));
+    }
+  }
+
+  /**
+   * Splits a comma-separated string, trimming each value and omitting empty ones.
+   * @param toSplit the string to split; may be null
+   * @return the non-empty trimmed values; an empty list if toSplit is null
+   */
+  private static List<String> commaSplit(String toSplit) {
+    if (toSplit == null) {
+      return Collections.emptyList();
+    }
+    ArrayList<String> list = new ArrayList<String>();
+    for (String s : toSplit.split(",")) {
+      s = s.trim();
+      if (!s.isEmpty()) {
+        list.add(s);
+      }
+    }
+    return list;
+  }
+
+  /** Joins the strings in sorted order, each followed by a comma. */
+  private static String commaJoin(Collection<String> strings) {
+    // Sort first: the iteration order of the validator set is unspecified, and
+    // the joined string takes part in equals() and hashCode() via the conf map.
+    List<String> sorted = new ArrayList<String>(strings);
+    Collections.sort(sorted);
+    StringBuilder sb = new StringBuilder();
+    for (String s : sorted) {
+      sb.append(s).append(',');
+    }
+    return sb.toString();
+  }
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Validator.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Validator.java
new file mode 100644
index 0000000..94bf796
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Validator.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+/**
+ * A Validator may be bound to each {@link Subject} in a {@link Repository}
+ * to ensure that schemas added to the {@link Subject} are mutually compatible.
+ * <br/>
+ * <br/>
+ * There are many useful notions of compatibility, for example:
+ * <li>Forwards Compatible: A user of any old schema
+ * is able to read data written with the new schema</li>
+ * <li>Backwards Compatible: A user of the most recent schema
+ * is able to read data written with any older schema</li>
+ * <li>Full Compatibility: A user of any schema
+ * is able to read data written in any other schema</li>
+ * <li>N+1 Compatibility: Forward compatibility constrained to
+ * only the version one prior to the current version.
+ * e.g. a reader with the second most recent schema
+ * is able to read data written with the most recent schema</li>
+ * <li>N-1 Compatibility: Backward compatibility constrained to
+ * only the version one prior to the current version.
+ * e.g. a reader with the most recent schema
+ * is able to read data written with the second most recent schema</li>
+ */
+public interface Validator {
+
+ /**
+ * Validate that a schema is compatible with the schemas provided.
+ * Returns normally when the schema is accepted; rejection is reported by throwing.
+ * @param schemaToValidate
+ * The schema to validate.
+ * @param schemasInOrder
+ * The schemas to validate against,
+ * presented in order from latest to oldest.
+ * @throws SchemaValidationException
+ * if {@code schemaToValidate} is not compatible with the schemas
+ * in {@code schemasInOrder}
+ */
+ void validate(String schemaToValidate, Iterable<SchemaEntry> schemasInOrder)
+ throws SchemaValidationException;
+
+}
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/ValidatorFactory.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/ValidatorFactory.java
new file mode 100644
index 0000000..3d1ac41
--- /dev/null
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/ValidatorFactory.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A factory for mapping Validator names to instantiated instances. Validator names
+ * starting with "repo." are reserved for built-ins such as {@link #REJECT_VALIDATOR}.
+ */
+public class ValidatorFactory {
+ public static final String REJECT_VALIDATOR = "repo.reject";
+
+ private final HashMap<String, Validator> validators;
+
+ private ValidatorFactory(HashMap<String, Validator> validators) {
+ this.validators = validators;
+ }
+
+ /**
+ * @param validatorNames
+ * The set of {@link Validator} names to resolve. Must not be null.
+ * @return The {@link Validator}s mapped to those names; names with no mapping are skipped. Not null.
+ */
+ public final List<Validator> getValidators(Set<String> validatorNames) {
+ ArrayList<Validator> result = new ArrayList<Validator>();
+ for (String name : validatorNames) {
+ Validator v = validators.get(name);
+ if (v != null) {
+ result.add(v);
+ }
+ }
+ return result;
+ }
+
+ /** Builds {@link ValidatorFactory} instances; {@link ValidatorFactory#REJECT_VALIDATOR} is always pre-registered. */
+ public static class Builder {
+ private final HashMap<String, Validator> validators = new HashMap<String, Validator>();
+ {
+ validators.put(REJECT_VALIDATOR, new Reject());
+ }
+
+ /**
+ * Configure this builder to return a {@link ValidatorFactory} that maps the
+ * {@link Validator} provided to the name given. <br/>
+ * The name must not be null and must not start with "repo.".
+ */
+ public Builder setValidator(String name, Validator validator) {
+ if (name.startsWith("repo.")) {
+ throw new RuntimeException("Validator names starting with 'repo.'"
+ + " are reserved. Attempted to set validator with name: " + name);
+ }
+ validators.put(name, validator);
+ return this;
+ }
+
+ public ValidatorFactory build() {
+ return new ValidatorFactory(new HashMap<String, Validator>(validators)); // copy: builder stays reusable
+ }
+ }
+
+ /** Built-in validator registered under {@link ValidatorFactory#REJECT_VALIDATOR}; rejects every schema. */
+ private static class Reject implements Validator {
+ @Override
+ public void validate(String schemaToValidate,
+ Iterable<SchemaEntry> schemasInOrder) throws SchemaValidationException {
+ throw new SchemaValidationException(REJECT_VALIDATOR + " validator always rejects validation");
+ }
+ }
+}
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/AbstractTestRepository.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/AbstractTestRepository.java
new file mode 100644
index 0000000..b21509a
--- /dev/null
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/AbstractTestRepository.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import java.util.Iterator;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * An abstract JUnit test for thoroughly testing a Repository implementation
+ */
+public abstract class AbstractTestRepository<R extends Repository> {
+ private static final String SUB = "sub";
+ private static final String CONF = "conf";
+ private static final String NOCONF = "noconf";
+ private static final String VALIDATING = "validating";
+ private static final String FOO = "foo";
+ private static final String BAR = "bar";
+ private static final String BAZ = "baz";
+
+ private R repo;
+
+ protected abstract R createRepository();
+
+ @Before
+ public void setUpRepository() {
+ repo = createRepository();
+ }
+
+ @After
+ public void tearDownRepository() {
+ repo = null;
+ }
+
+ protected final R getRepo() {
+ return repo;
+ }
+
+ /** Exercises subject and schema registration, lookup, and entry ordering end to end. */
+ @Test
+ public void testRepository() throws SchemaValidationException {
+ // lookup a subject that does not exist, when none do
+ Subject none = repo.lookup(SUB);
+ Assert.assertNull("non-existent subject lookup should return null", none);
+ // ensure that when there are no subjects, an empty iterable is produced
+ Iterable<Subject> subjects = repo.subjects();
+ Assert.assertNotNull("subjects must not be null", subjects);
+ Assert.assertFalse("subjects must be empty", subjects.iterator().hasNext());
+
+ // register a subject
+ Subject sub = repo.register(SUB, null);
+ Assert.assertNotNull("failed to register subject: " + SUB, sub);
+ // a duplicate register is idempotent; the result is the same
+ Subject sub2 = repo.register(SUB, null);
+ Assert.assertNotNull("failed to re-register subject: " + SUB, sub2);
+ Assert.assertEquals(
+ "registering a subject twice did not produce the same result",
+ sub.getName(), sub2.getName());
+
+ // lookup subject that was just registered
+ Subject sub3 = repo.lookup(SUB);
+ Assert.assertNotNull("subject lookup failed", sub3);
+ Assert.assertEquals("subject lookup failed", sub.getName(), sub3.getName());
+
+ // lookup a subject that does not exist, this time when some do
+ Subject none2 = repo.lookup("nothing");
+ Assert.assertNull("non-existent subject lookup should return null", none2);
+
+ // go through all subjects
+ boolean hasSub = false;
+ for (Subject s : repo.subjects()) {
+ if (sub.getName().equals(s.getName())) {
+ hasSub = true;
+ break;
+ }
+ }
+ Assert.assertTrue("subjects() did not contain registered subject: " + sub,
+ hasSub);
+
+ // ensure that latest is null when a subject is first created
+ SchemaEntry noLatest = sub.latest();
+ Assert.assertNull("latest must be null if it does not exist", noLatest);
+
+ // ensure that registerIfLatest does not register if provided a latest
+ // value when latest is null
+ SchemaEntry didNotRegister =
+ sub.registerIfLatest(FOO, new SchemaEntry("not", "there"));
+ Assert.assertNull(
+ "registerIfLatest must return null if there are no schemas in the " +
+ "subject and the passed in latest is not null. Found: " +
+ didNotRegister, didNotRegister);
+
+ // ensure registerIfLatest works when latest is null
+ SchemaEntry foo = sub.registerIfLatest(FOO, null);
+ validateSchemaEntry(FOO, foo);
+
+ // ensure that register is idempotent
+ SchemaEntry foo2 = sub.register(FOO);
+ Assert.assertEquals("duplicate schema registration must be idempotent",
+ foo, foo2);
+ validateSchemaEntry(FOO, foo2);
+
+ // ensure registerIfLatest works when latest is not null
+ SchemaEntry bar = sub.registerIfLatest(BAR, foo2);
+ validateSchemaEntry(BAR, bar);
+
+ // ensure registerIfLatest does not register if latest does not match
+ SchemaEntry none3 = sub.registerIfLatest("none", foo);
+ Assert.assertNull(
+ "registerIfLatest must return null if latest does not match", none3);
+ // ensure registerIfLatest does not register when provided null does not match
+ SchemaEntry none4 = sub.registerIfLatest("none", null);
+ Assert.assertNull(
+ "registerIfLatest must return null if there is a latest schema in"
+ + " the subject and the passed in value is null", none4);
+
+ // ensure register can add new schemas
+ SchemaEntry baz = sub.register(BAZ);
+ validateSchemaEntry(BAZ, baz);
+
+ // test lookup
+ Subject subject = repo.lookup(SUB);
+ Assert.assertNotNull("lookup of previously registered subject failed",
+ subject);
+
+ // test latest
+ Assert.assertEquals("latest schema must match last registered", baz,
+ subject.latest());
+ boolean foundfoo = false, foundbar = false;
+ for (SchemaEntry s : sub3.allEntries()) {
+ if (s.equals(foo)) {
+ foundfoo = true;
+ }
+ if (s.equals(bar)) {
+ foundbar = true;
+ }
+ }
+ Assert.assertTrue("AllEntries did not contain schema: " + FOO, foundfoo);
+ Assert.assertTrue("AllEntries did not contain schema: " + BAR, foundbar);
+
+ //ensure order of allEntries is correct
+ Iterator<SchemaEntry> allEntries = sub3.allEntries().iterator();
+ // latest must match first one:
+ Assert.assertEquals("Latest must be first returned from allEntries()",
+ sub3.latest(), allEntries.next());
+ Assert.assertEquals("second-latest must be BAR", bar, allEntries.next());
+ Assert.assertEquals("third must be FOO", foo, allEntries.next());
+
+ // test lookupBySchema
+ SchemaEntry resultfoo = subject.lookupBySchema(foo.getSchema());
+ Assert.assertEquals("lookup by Schema did not return same result", foo,
+ resultfoo);
+ SchemaEntry notThere = subject.lookupBySchema("notThere");
+ Assert.assertNull("non existent schema should return null", notThere);
+ // test lookupById
+ SchemaEntry resultfooid = subject.lookupById(foo.getId());
+ Assert.assertEquals("lookup by ID did not return same result", foo,
+ resultfooid);
+ SchemaEntry notTherById = subject.lookupById("notThere");
+ Assert.assertNull("non existent schema should return null", notTherById);
+
+ // test integralKeys()
+ if(subject.integralKeys()) {
+ Integer.parseInt(resultfoo.getId());
+ }
+
+ }
+
+ /** Checks that a subject keeps the configuration it was first registered with. */
+ @Test
+ public void testSubjectConfigs() {
+ String testKey = "test.key";
+ String testVal = "test.val";
+ String testVal2 = "test.val2";
+ SubjectConfig conf = new SubjectConfig.Builder().set(testKey, testVal).build();
+ SubjectConfig conf2 = new SubjectConfig.Builder().set(testKey, testVal2).build();
+ // lookup a subject that does not exist, when none do
+ Subject none = repo.lookup(CONF);
+ Assert.assertNull("non-existent subject lookup should return null", none);
+
+ // register a subject
+ Subject sub = repo.register(CONF, conf);
+ Assert.assertNotNull("failed to register subject: " + CONF, sub);
+ // re-registering with a different config is a no-op; name and config stay the same
+ Subject sub2 = repo.register(CONF, conf2);
+ Assert.assertNotNull("failed to re-register subject: " + CONF, sub2);
+ validateSubject(sub, sub2);
+
+ // lookup subject that was just registered
+ Subject sub3 = repo.lookup(CONF);
+ validateSubject(sub, sub3);
+
+ // lookup something that is not there
+ Subject sub4 = repo.lookup(NOCONF);
+ Assert.assertNull("subject should not exist", sub4);
+ }
+
+ @Test
+ public void testValidation() {
+ // the subject does not exist before it is registered
+ Assert.assertNull("non-existent subject lookup should return null",
+ repo.lookup(VALIDATING));
+
+ // bind the built-in reject validator to the new subject
+ SubjectConfig rejectAll = new SubjectConfig.Builder()
+ .addValidator(ValidatorFactory.REJECT_VALIDATOR).build();
+ Subject rejecting = repo.register(VALIDATING, rejectAll);
+ Assert.assertNotNull("failed to register subject: " + VALIDATING, rejecting);
+
+ try {
+ rejecting.register("stuff");
+ } catch (SchemaValidationException expected) {
+ return; // rejected as required
+ }
+ Assert.fail("must throw a SchemaValidationException");
+ }
+
+ /** Asserts that two subject handles agree on both name and configuration. */
+ private void validateSubject(Subject sub, Subject sub3) {
+ // names first, so a name mismatch is reported before configs are compared
+ Assert.assertEquals("subject names do not match",
+ sub.getName(), sub3.getName());
+ Assert.assertEquals("subject configurations do not match",
+ sub.getConfig(), sub3.getConfig());
+ }
+
+ /** Asserts that {@code foo} exists, carries {@code expectedSchema} and has a non-empty id. */
+ private void validateSchemaEntry(String expectedSchema, SchemaEntry foo) {
+ String missing = "Failed to create SchemaEntry with schema: " + expectedSchema;
+ Assert.assertNotNull(missing, foo);
+ Assert.assertEquals("SchemaEntry does not have expected schema value", expectedSchema, foo.getSchema());
+ Assert.assertNotNull("SchemaEntry does not have a valid id", foo.getId());
+ Assert.assertTrue("SchemaEntry has an empty id", foo.getId().length() > 0);
+ }
+
+}
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestCacheRepository.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestCacheRepository.java
new file mode 100644
index 0000000..a27269f
--- /dev/null
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestCacheRepository.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+/** Runs the shared repository test suite against a {@link CacheRepository}. */
+public class TestCacheRepository extends AbstractTestRepository<CacheRepository> {
+ @Override
+ protected CacheRepository createRepository() {
+ InMemoryRepository backing = new InMemoryRepository(new ValidatorFactory.Builder().build());
+ return new CacheRepository(backing, new InMemoryCache());
+ }
+}
\ No newline at end of file
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestFileRepository.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestFileRepository.java
new file mode 100644
index 0000000..0a1c338
--- /dev/null
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestFileRepository.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFileRepository extends AbstractTestRepository<FileRepository> {
+ private static final String TEST_PATH = "target/test/TestFileRepository-paths/";
+ private static final String REPO_PATH = "target/test/TestFileRepository/";
+
+ @BeforeClass
+ public static void setup() {
+ rmDir(new File(TEST_PATH));
+ rmDir(new File(REPO_PATH));
+ }
+
+ @After
+ public void cleanUp() throws IOException {
+ getRepo().close();
+ }
+
+ @Override
+ protected FileRepository createRepository() {
+ return newRepo(REPO_PATH);
+ }
+
+ private FileRepository newRepo(String path) {
+ return new FileRepository(path, new ValidatorFactory.Builder().build());
+ }
+
+ @Test
+ public void testPathHandling() throws SchemaValidationException {
+ String paths[] = new String[] {
+ "data", "data/", "/tmp/file_repo",
+ "/tmp/file_repo/", "/tmp/file_repo/" };
+
+ for (String path : paths) {
+ FileRepository r = newRepo(TEST_PATH + path);
+ try {
+ File expected = new File(TEST_PATH, path);
+ assertTrue("Expected directory not created: " +
+ expected.getAbsolutePath() + " for path: " + path, expected.exists());
+ } finally {
+ r.close();
+ // should be ok to call close twice
+ r.close();
+ }
+ }
+ // verify idempotent
+ newRepo(TEST_PATH + "/tmp/repo").close();
+ newRepo(TEST_PATH + "/tmp/repo").close();
+ }
+
+ @Test
+ public void testReadWritten() throws SchemaValidationException {
+ String path = TEST_PATH + "/readWrite";
+ FileRepository r = newRepo(path);
+ try {
+ r.register("sub1", null).register("sc1");
+ r.register("sub2", null).register("sc2");
+ r.register("sub2", null).register("sc3");
+ } finally {
+ r.close();
+ }
+ r = newRepo(path);
+ try {
+ Subject s1 = r.lookup("sub1");
+ Assert.assertNotNull(s1);
+ Subject s2 = r.lookup("sub2");
+ Assert.assertNotNull(s2);
+
+ SchemaEntry e1 = s1.lookupBySchema("sc1");
+ Assert.assertNotNull(e1);
+ Assert.assertEquals("sc1", e1.getSchema());
+
+ SchemaEntry e2 = s2.lookupBySchema("sc2");
+ Assert.assertNotNull(e2);
+ Assert.assertEquals("sc2", e2.getSchema());
+
+ SchemaEntry e3 = s2.lookupBySchema("sc3");
+ Assert.assertNotNull(e3);
+ Assert.assertEquals("sc3", e3.getSchema());
+ } finally {
+ r.close();
+ }
+ }
+
+ @Test
+ public void testReadWrittenMultiLineSchema() throws SchemaValidationException {
+ String path = TEST_PATH + "/readWriteMultiLine";
+ // Registers two multi-line schemas, then re-opens a repository on the same path and looks them up.
+ String endOfLine = System.getProperty("line.separator");
+
+ String multiLineSchema1 = "first line" + endOfLine + "second line";
+ String multiLineSchema2 = "first line" + endOfLine + "second line" + endOfLine;
+
+ FileRepository r = newRepo(path);
+ try {
+ r.register("sub1", null).register(multiLineSchema1);
+ r.register("sub1", null).register(multiLineSchema2);
+ } finally {
+ r.close();
+ }
+ r = newRepo(path);
+ try {
+ Subject s1 = r.lookup("sub1");
+ Assert.assertNotNull(s1);
+
+ SchemaEntry schemaEntry1ById = s1.lookupById("0");
+ Assert.assertNotNull(schemaEntry1ById);
+ Assert.assertEquals(multiLineSchema1, schemaEntry1ById.getSchema());
+
+ SchemaEntry schemaEntryBy1Schema = s1.lookupBySchema(multiLineSchema1);
+ Assert.assertNotNull(schemaEntryBy1Schema);
+ Assert.assertEquals(multiLineSchema1, schemaEntryBy1Schema.getSchema());
+
+ SchemaEntry schemaEntry2ById = s1.lookupById("1");
+ Assert.assertNotNull(schemaEntry2ById);
+ Assert.assertEquals(multiLineSchema1, schemaEntry2ById.getSchema()); // NOTE(review): entry "1" presumably came from multiLineSchema2 - confirm multiLineSchema1 is the intended expectation
+
+ SchemaEntry schemaEntry2BySchema = s1.lookupBySchema(multiLineSchema1); // NOTE(review): looks like a copy of the entry-1 lookup; multiLineSchema2 may have been intended - TODO confirm
+ Assert.assertNotNull(schemaEntry2BySchema);
+ Assert.assertEquals(multiLineSchema1, schemaEntry2BySchema.getSchema());
+
+ } finally {
+ r.close();
+ }
+ }
+
+
+ @Test(expected = RuntimeException.class)
+ public void testInvalidDir() throws IOException {
+ // occupy the repository path with a regular file instead of a directory
+ File occupied = new File(TEST_PATH + "/bad");
+ new File(TEST_PATH).mkdirs();
+ occupied.createNewFile();
+ newRepo(TEST_PATH + "/bad").close();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testCantUseClosedRepo() {
+ FileRepository r = newRepo(TEST_PATH + "/tmp/repo");
+ r.close();
+ r.lookup("nothing");
+ }
+
+ /** Recursively deletes {@code dir}; a no-op when it is not a listable directory. */
+ private static void rmDir(File dir) {
+ File[] entries = dir.listFiles(); // null if dir is missing, not a directory, or on I/O error
+ if (entries != null) {
+ for (File entry : entries) {
+ if (entry.isDirectory()) {
+ rmDir(entry);
+ } else {
+ entry.delete();
+ }
+ }
+ dir.delete();
+ }
+ }
+}
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestInMemoryRepository.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestInMemoryRepository.java
new file mode 100644
index 0000000..dcad958
--- /dev/null
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestInMemoryRepository.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+/** Runs the shared repository test suite against an {@link InMemoryRepository}. */
+public class TestInMemoryRepository extends AbstractTestRepository<InMemoryRepository> {
+ @Override
+ protected InMemoryRepository createRepository() {
+ ValidatorFactory defaults = new ValidatorFactory.Builder().build();
+ return new InMemoryRepository(defaults);
+ }
+}
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestReadOnlyRepository.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestReadOnlyRepository.java
new file mode 100644
index 0000000..e32cc98
--- /dev/null
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestReadOnlyRepository.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestReadOnlyRepository {
+ private static final String SUB = "sub";
+ private static final String FOO = "foo";
+
+ private InMemoryRepository repo;
+ private ReadOnlyRepository readOnlyRepo;
+
+ @Before
+ public void setUpRepository() {
+ repo = new InMemoryRepository(new ValidatorFactory.Builder().build());
+ readOnlyRepo = new ReadOnlyRepository(repo);
+ }
+
+ @After
+ public void tearDownRepository() {
+ repo = null;
+ readOnlyRepo = null;
+ }
+
+ /** Checks that reads through the read-only wrapper see what is written to the wrapped repo. */
+ @Test
+ public void testReadOnlyRepository() throws SchemaValidationException {
+ // lookup a subject that does not exist, when none do
+ Subject none = readOnlyRepo.lookup(SUB);
+ Assert.assertNull("non-existent subject lookup should return null", none);
+ // ensure that when there are no subjects, an empty iterable is produced
+ Iterable<Subject> subjects = readOnlyRepo.subjects();
+ Assert.assertNotNull("subjects must not be null", subjects);
+ Assert.assertFalse("subjects must be empty", subjects.iterator().hasNext());
+
+ // register a subject
+ Subject sub = repo.register(SUB, null);
+ Assert.assertNotNull("failed to register subject: " + SUB, sub);
+ // a duplicate register is idempotent; the result is the same
+ Subject sub2 = repo.register(SUB, null);
+ Assert.assertNotNull("failed to re-register subject: " + SUB, sub2);
+ Assert.assertEquals(
+ "registering a subject twice did not produce the same result",
+ sub.getName(), sub2.getName());
+
+ // lookup subject that was just registered
+ Subject readOnlySubject = readOnlyRepo.lookup(SUB);
+ Assert.assertNotNull("subject lookup failed", readOnlySubject);
+ Assert.assertEquals("subject lookup failed", sub.getName(), readOnlySubject.getName());
+
+ // lookup a subject that does not exist, this time when some do
+ Subject none2 = readOnlyRepo.lookup("nothing");
+ Assert.assertNull("non-existent subject lookup should return null", none2);
+
+ // go through all subjects
+ boolean hasSub = false;
+ for (Subject s : readOnlyRepo.subjects()) {
+ if (sub.getName().equals(s.getName())) {
+ hasSub = true;
+ break;
+ }
+ }
+ Assert.assertTrue("subjects() did not contain registered subject: " + sub,
+ hasSub);
+
+ SchemaEntry foo = sub.register(FOO);
+ boolean foundfoo = false;
+ for (SchemaEntry s : readOnlySubject.allEntries()) {
+ if (s.equals(foo)) {
+ foundfoo = true;
+ }
+ }
+ Assert.assertTrue("AllEntries did not contain schema: " + FOO, foundfoo);
+
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testCannotCreateSubject() {
+ readOnlyRepo.register(null, null);
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testCannotRegisterSchema() throws SchemaValidationException {
+ repo.register(FOO, null);
+ readOnlyRepo.lookup(FOO).register(null);
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testCannotRegisterSchemaIfLatest() throws SchemaValidationException {
+ repo.register(FOO, null);
+ readOnlyRepo.lookup(FOO).registerIfLatest(null, null);
+ }
+
+}
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestRepositoryUtil.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestRepositoryUtil.java
new file mode 100644
index 0000000..03301af
--- /dev/null
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestRepositoryUtil.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRepositoryUtil {
+
+ @Test(expected = IllegalArgumentException.class)
+ public void validateNullString() {
+ RepositoryUtil.validateSchemaOrSubject(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void validateEmptyString() {
+ RepositoryUtil.validateSchemaOrSubject("");
+ }
+
+ @Test
+ public void validateString() {
+ RepositoryUtil.validateSchemaOrSubject("org.apache.avro.Something");
+ }
+
+ @Test
+ public void testSchemasToFromString() {
+ SchemaEntry e1 = new SchemaEntry("id1", "s1");
+ SchemaEntry e2 = new SchemaEntry("id2", "s2");
+ ArrayList<SchemaEntry> empty = new ArrayList<SchemaEntry>();
+ ArrayList<SchemaEntry> vals = new ArrayList<SchemaEntry>();
+ vals.add(e1);
+ vals.add(e2);
+
+ Iterable<SchemaEntry> emptyResult = RepositoryUtil
+ .schemasFromString(RepositoryUtil.schemasToString(empty));
+ Iterable<SchemaEntry> emptyResult2 = RepositoryUtil.schemasFromString(null);
+ Iterable<SchemaEntry> emptyResult3 = RepositoryUtil.schemasFromString("");
+ Assert.assertEquals(empty, emptyResult);
+ Assert.assertEquals(emptyResult, emptyResult2);
+ Assert.assertEquals(emptyResult, emptyResult3);
+
+ Iterable<SchemaEntry> result = RepositoryUtil
+ .schemasFromString(RepositoryUtil.schemasToString(vals));
+ Assert.assertEquals(vals, result);
+ }
+
+ @Test
+ public void testSubjectsToFromString() {
+ Repository r = new InMemoryRepository(new ValidatorFactory.Builder().build());
+ Subject s1 = r.register("s1", null);
+ Subject s2 = r.register("s2", null);
+ ArrayList<Subject> empty = new ArrayList<Subject>();
+ ArrayList<Subject> vals = new ArrayList<Subject>();
+ vals.add(s1);
+ vals.add(s2);
+
+ Iterable<String> emptyResult = RepositoryUtil
+ .subjectNamesFromString(RepositoryUtil.subjectsToString(empty));
+ Iterable<String> emptyResult2 = RepositoryUtil.subjectNamesFromString(null);
+ Iterable<String> emptyResult3 = RepositoryUtil.subjectNamesFromString("");
+ validate(emptyResult, empty);
+ Assert.assertEquals(emptyResult, emptyResult2);
+ Assert.assertEquals(emptyResult, emptyResult3);
+
+ Iterable<String> result = RepositoryUtil
+ .subjectNamesFromString(RepositoryUtil.subjectsToString(vals));
+ validate(result, vals);
+ }
+
+ private void validate(Iterable<String> names, Iterable<Subject> subjects) {
+ Iterator<String> remaining = names.iterator();
+ Iterator<Subject> expected = subjects.iterator();
+ while (expected.hasNext()) { // pairwise; names beyond the last subject are not inspected
+ Assert.assertEquals(expected.next().getName(), remaining.next());
+ }
+ }
+
+}
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestSchemaEntry.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestSchemaEntry.java
new file mode 100644
index 0000000..3279248
--- /dev/null
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestSchemaEntry.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSchemaEntry {
+ @Test
+ public void testToString() {
+ SchemaEntry entry = new SchemaEntry("id", "schema");
+ SchemaEntry toAndFrom = new SchemaEntry(entry.toString());
+ Assert.assertEquals(entry, toAndFrom);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalid() {
+ new SchemaEntry("invalidString");
+ }
+
+ @Test
+ public void testGetters() {
+ String id = "id";
+ String sc = "sc";
+ SchemaEntry entry = new SchemaEntry(id, sc);
+ Assert.assertEquals(id, entry.getId());
+ Assert.assertEquals(sc, entry.getSchema());
+ }
+
+ @Test
+ public void testEqualsAndHashCode() {
+ SchemaEntry entry = new SchemaEntry("id", "schema");
+ SchemaEntry entry2 = new SchemaEntry("id", "schema");
+ SchemaEntry nullId = new SchemaEntry(null, "schema");
+ SchemaEntry nullSchema = new SchemaEntry("id", null);
+ SchemaEntry idDiffers = new SchemaEntry("iddd", "schema");
+ SchemaEntry sDiffers = new SchemaEntry("id", "schemaaaa");
+ SchemaEntry allNull = new SchemaEntry(null, null);
+ SchemaEntry allNull2 = new SchemaEntry(null, null);
+
+ Assert.assertEquals(entry, entry);
+ Assert.assertEquals(entry, entry2);
+ Assert.assertEquals(entry2, entry);
+ Assert.assertEquals(entry.hashCode(), entry2.hashCode());
+
+ Assert.assertFalse(entry.equals(null));
+ Assert.assertFalse(entry.equals(new Object()));
+
+ Assert.assertFalse(entry.equals(nullId));
+ Assert.assertFalse(nullId.equals(entry));
+
+ Assert.assertFalse(entry.equals(nullSchema));
+ Assert.assertFalse(nullSchema.equals(entry));
+
+ Assert.assertFalse(entry.equals(idDiffers));
+ Assert.assertFalse(idDiffers.equals(entry));
+
+ Assert.assertFalse(entry.equals(sDiffers));
+ Assert.assertFalse(sDiffers.equals(entry));
+
+ Assert.assertEquals(allNull, allNull2);
+ Assert.assertEquals(allNull.hashCode(), allNull2.hashCode());
+ }
+
+
+
+}
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestSubjectConfig.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestSubjectConfig.java
new file mode 100644
index 0000000..c34c626
--- /dev/null
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestSubjectConfig.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSubjectConfig {
+
+  @Test
+  public void testBuilder() {
+    // an empty config carries no validators
+    Assert.assertTrue(SubjectConfig.emptyConfig().getValidators().isEmpty());
+
+    SubjectConfig custom = new SubjectConfig.Builder()
+        .set("k", "v")
+        .set("repo.validators", "valid1, valid2 ,,") // padding/empties dropped
+        .addValidator("oneMore")
+        .build();
+
+    Assert.assertEquals("v", custom.get("k"));
+    Set<String> names = custom.getValidators();
+    Assert.assertEquals(3, names.size());
+    for (String expected : new String[] {"valid1", "valid2", "oneMore"}) {
+      Assert.assertTrue(names.contains(expected));
+    }
+  }
+
+ @Test
+ public void testBuilderHashAndEquals() {
+ SubjectConfig empty = SubjectConfig.emptyConfig();
+ Assert.assertEquals(empty, empty);
+ SubjectConfig conf = new SubjectConfig.Builder().build();
+ Assert.assertEquals(empty, conf);
+ SubjectConfig conf2 = new SubjectConfig.Builder()
+ .set("repo.validators", null)
+ .build();
+ Assert.assertEquals(conf, conf2);
+ Assert.assertEquals(conf2, empty);
+ Assert.assertFalse(conf.equals(null));
+ Assert.assertFalse(conf.equals(new Object()));
+ Assert.assertEquals(conf.hashCode(), empty.hashCode());
+ Assert.assertEquals(conf.hashCode(), conf2.hashCode());
+
+ String k = "key";
+ String v = "val";
+ SubjectConfig custom = new SubjectConfig.Builder()
+ .set(k, v).build();
+ SubjectConfig custom2 = new SubjectConfig.Builder()
+ .set(custom.asMap()).build();
+ SubjectConfig custom3 = new SubjectConfig.Builder()
+ .set(custom.asMap()).addValidator("foo").build();
+ Assert.assertEquals(custom, custom2);
+ Assert.assertFalse(custom.equals(custom3));
+ Assert.assertFalse(custom.equals(conf));
+ Assert.assertEquals(custom.hashCode(), custom2.hashCode());
+
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testInvalidConfigName() {
+ new SubjectConfig.Builder()
+ .set("repo.notValid", "");
+ }
+
+}
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestValidatingSubject.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestValidatingSubject.java
new file mode 100644
index 0000000..b27ace7
--- /dev/null
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestValidatingSubject.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestValidatingSubject {
+ private static final String ACCEPT = "accept";
+ private static final String REJECT = "reject";
+ private static final String FOO = "foo";
+ private static final String BAR = "bar";
+ private static final String BAZ = "baz";
+ private static final String ACCEPT_VALIDATOR = "accept.validator";
+
+ private InMemoryRepository repo;
+
+ @Before
+ public void setUpRepository() {
+ repo = new InMemoryRepository(new ValidatorFactory.Builder().
+ setValidator(ACCEPT_VALIDATOR, new Validator(){
+ @Override
+ public void validate(String schemaToValidate,
+ Iterable<SchemaEntry> schemasInOrder)
+ throws SchemaValidationException {
+ // nothing
+ }
+ }).build());
+ }
+
+ @After
+ public void tearDownRepository() {
+ repo = null;
+ }
+
+  @Test
+  public void testSuccessfulValidation() throws SchemaValidationException {
+    final String failure = "failed to register schema";
+    SubjectConfig config =
+        new SubjectConfig.Builder().addValidator(ACCEPT_VALIDATOR).build();
+    Subject subject = repo.register(ACCEPT, config);
+
+    // each conditional registration names the entry it expects to be latest
+    SchemaEntry first = subject.registerIfLatest(FOO, null);
+    Assert.assertNotNull(failure, first);
+    SchemaEntry second = subject.registerIfLatest(BAR, first);
+    Assert.assertNotNull(failure, second);
+    // stale expectation: null is no longer the latest, so nothing is added
+    Assert.assertNull(subject.registerIfLatest("nothing", null));
+    Assert.assertNotNull(failure, subject.register(BAZ));
+  }
+
+  @Test
+  public void testValidatorConstruction() { // a null Subject must stay null
+    Assert.assertNull("Must pass null through Subject.validatingSubject()",
+        Subject.validatingSubject(null,
+            new ValidatorFactory.Builder().build()));
+  }
+
+ @Test(expected=SchemaValidationException.class)
+ public void testCannotRegister() throws SchemaValidationException {
+ Subject reject = repo.register(REJECT, new SubjectConfig.Builder()
+ .addValidator(ValidatorFactory.REJECT_VALIDATOR).build());
+ reject.register(FOO);
+ }
+
+ @Test(expected=SchemaValidationException.class)
+ public void testCannotRegisterIfLatest() throws SchemaValidationException {
+ Subject reject = repo.register(REJECT, new SubjectConfig.Builder()
+ .addValidator(ValidatorFactory.REJECT_VALIDATOR).build());
+ reject.registerIfLatest(FOO, null);
+ }
+
+}
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestValidatorFactory.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestValidatorFactory.java
new file mode 100644
index 0000000..4af8496
--- /dev/null
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestValidatorFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo;
+
+import java.util.HashSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestValidatorFactory {
+
+ @Test
+ public void test() throws SchemaValidationException {
+ Validator foo = new Validator() {
+ @Override
+ public void validate(String schemaToValidate,
+ Iterable<SchemaEntry> schemasInOrder)
+ throws SchemaValidationException {
+ }
+ };
+
+ ValidatorFactory fact = new ValidatorFactory.Builder().setValidator("foo", foo).build();
+ HashSet<String> fooset = new HashSet<String>();
+ fooset.add("foo");
+ fooset.add(null); // should ignore
+ Assert.assertSame(foo, fact.getValidators(fooset).get(0));
+ fact.getValidators(fooset).get(0).validate(null, null);
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testInvalidName() {
+ new ValidatorFactory.Builder()
+ .setValidator("repo.willBreak", null);
+ }
+
+}
diff --git a/lang/java/schema-repo/pom.xml b/lang/java/schema-repo/pom.xml
new file mode 100644
index 0000000..e72822a
--- /dev/null
+++ b/lang/java/schema-repo/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>avro-parent</artifactId>
+ <groupId>org.apache.avro</groupId>
+ <version>1.7.7-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+
+ <groupId>org.apache.avro.repo</groupId>
+ <artifactId>avro-repo-parent</artifactId>
+
+ <name>Apache Avro Schema Repository</name>
+ <url>http://avro.apache.org</url>
+ <description>Avro Schema Repository parent project</description>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>common</module>
+ <module>server</module>
+ <module>client</module>
+ <module>bundle</module>
+ </modules>
+
+</project>
diff --git a/lang/java/schema-repo/server/pom.xml b/lang/java/schema-repo/server/pom.xml
new file mode 100644
index 0000000..3aedc79
--- /dev/null
+++ b/lang/java/schema-repo/server/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>avro-repo-parent</artifactId>
+ <groupId>org.apache.avro.repo</groupId>
+ <version>1.7.7-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+
+ <artifactId>avro-repo-server</artifactId>
+
+ <name>Apache Avro Schema Repository Server</name>
+ <url>http://avro.apache.org</url>
+ <description>Avro Schema Repository REST server components</description>
+
+ <build>
+ </build>
+
+ <profiles>
+ </profiles>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.avro.repo</groupId>
+ <artifactId>avro-repo-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-servlet</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-guice</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/ConfigModule.java b/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/ConfigModule.java
new file mode 100644
index 0000000..d8a3e7c
--- /dev/null
+++ b/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/ConfigModule.java
@@ -0,0 +1,100 @@
+package org.apache.avro.repo.server;
+
+import java.io.PrintStream;
+import java.util.Properties;
+
+import javax.inject.Named;
+import javax.inject.Singleton;
+
+import org.apache.avro.repo.CacheRepository;
+import org.apache.avro.repo.InMemoryCache;
+import org.apache.avro.repo.Repository;
+import org.apache.avro.repo.RepositoryCache;
+import org.apache.avro.repo.Validator;
+import org.apache.avro.repo.ValidatorFactory;
+
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
+import com.google.inject.name.Names;
+
+/**
+ * A {@link Module} for configuration based on a set of {@link Properties}
+ * <br/>
+ * Binds every property value in the properties provided to the property name
+ * in Guice, making them available with the {@link Named} annotation. Guice
+ * will automatically convert these to constant values, such as Integers,
+ * Strings, or Class constants.
+ * <br/>
+ * Keys starting with "validator." bind {@link Validator} classes
+ * in a {@link ValidatorFactory}, where the name is the remainder of the key
+ * following "validator.". For example, a property
+ * "validator.backwards_compatible=com.foo.BackwardsCompatible"
+ * will set a validator named "backwards_compatible" to an instance of the
+ * class com.foo.BackwardsCompatible.
+ */
+class ConfigModule implements Module {
+ static final String JETTY_HOST = "jetty.host";
+ static final String JETTY_PORT = "jetty.port";
+ static final String JETTY_PATH = "jetty.path";
+ static final String JETTY_HEADER_SIZE = "jetty.header.size";
+ static final String JETTY_BUFFER_SIZE = "jetty.buffer.size";
+ private static final String REPO_CLASS = "repo.class";
+ private static final String REPO_CACHE = "repo.cache";
+
+ private static final Properties DEFAULTS = new Properties();
+ static {
+ DEFAULTS.setProperty(JETTY_HOST, "");
+ DEFAULTS.setProperty(JETTY_PORT, "2876"); // 'AVRO' on a t-9 keypad
+ DEFAULTS.setProperty(JETTY_PATH, "/schema-repo");
+ DEFAULTS.setProperty(JETTY_HEADER_SIZE, "16384");
+ DEFAULTS.setProperty(JETTY_BUFFER_SIZE, "16384");
+ DEFAULTS.setProperty(REPO_CACHE, InMemoryCache.class.getName());
+ }
+
+ public static void printDefaults(PrintStream writer) {
+ writer.println(DEFAULTS);
+ }
+
+ private final Properties props;
+
+  public ConfigModule(Properties props) {
+    // DEFAULTS backs the copy, so keys missing from props fall back to it
+    this.props = new Properties(DEFAULTS);
+    this.props.putAll(props);
+  }
+
+ @Override
+ public void configure(Binder binder) {
+ Names.bindProperties(binder, props);
+ }
+
+ @Provides
+ @Singleton
+ Repository provideRepository(Injector injector,
+ @Named(REPO_CLASS) Class<Repository> repoClass,
+ @Named(REPO_CACHE) Class<RepositoryCache> cacheClass) {
+ Repository repo = injector.getInstance(repoClass);
+ RepositoryCache cache = injector.getInstance(cacheClass);
+ return new CacheRepository(repo, cache);
+ }
+
+  @Provides
+  @Singleton
+  ValidatorFactory provideValidatorFactory(Injector injector) {
+    final String prefix = "validator."; // marks "validator.<name>=<class>" keys
+    ValidatorFactory.Builder factory = new ValidatorFactory.Builder();
+    for (String key : props.stringPropertyNames()) {
+      if (key.startsWith(prefix)) {
+        String name = key.substring(prefix.length());
+        Class<Validator> type = injector.getInstance(Key.get(
+            new TypeLiteral<Class<Validator>>(){}, Names.named(key)));
+        factory.setValidator(name, injector.getInstance(type));
+      }
+    }
+    return factory.build();
+  }
+}
diff --git a/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/RESTRepository.java b/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/RESTRepository.java
new file mode 100644
index 0000000..88fa43a
--- /dev/null
+++ b/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/RESTRepository.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo.server;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.avro.repo.Repository;
+import org.apache.avro.repo.RepositoryUtil;
+import org.apache.avro.repo.SchemaEntry;
+import org.apache.avro.repo.SchemaValidationException;
+import org.apache.avro.repo.Subject;
+import org.apache.avro.repo.SubjectConfig;
+
+import com.sun.jersey.api.NotFoundException;
+
+/**
+ * {@link RESTRepository} is a JSR-311 REST interface to a {@link Repository}.
+ *
+ * Combine with {@link RepositoryServer} to run an embedded REST server.
+ */
+@Singleton
+@Produces(MediaType.TEXT_PLAIN)
+@Path("/")
+public class RESTRepository {
+
+ private final Repository repo;
+
+ /**
+ * Create a {@link RESTRepository} that wraps a given {@link Repository}
+ * Typically the wrapped repository is a
+ * {@link org.apache.avro.repo.CacheRepository} that wraps a non-caching
+ * underlying repository.
+ *
+ * @param repo
+ * The {@link Repository} to wrap.
+ */
+ @Inject
+ public RESTRepository(Repository repo) {
+ this.repo = repo;
+ }
+
+ /**
+ * @return All subjects in the repository, serialized with
+ * {@link RepositoryUtil#subjectsToString(Iterable)}
+ */
+ @GET
+ public String allSubjects() {
+ return RepositoryUtil.subjectsToString(repo.subjects());
+ }
+
+  /**
+   * Lists every schema registered under a subject, serialized with
+   * {@link RepositoryUtil#schemasToString(Iterable)}.
+   *
+   * @param subject
+   *          The name of the subject
+   * @return all schemas in the subject, i.e. the serialized form of
+   *         {@link Subject#allEntries()}
+   * @throws NotFoundException
+   *           (HTTP 404 Not Found) if there is no such subject
+   */
+  @GET
+  @Path("{subject}/all")
+  public String subjectList(@PathParam("subject") String subject) {
+    // getSubject() performs the lookup and throws NotFoundException when
+    // the subject is unknown, so no separate null check is needed here.
+    Subject found = getSubject(subject);
+    return RepositoryUtil.schemasToString(found.allEntries());
+  }
+
+  /**
+   * Returns a subject's configuration in the text format written by
+   * {@link Properties#store(java.io.Writer, String)}; 404 if no such subject.
+   */
+  @GET
+  @Path("{subject}/config")
+  public String subjectConfig(@PathParam("subject") String subject) {
+    Subject found = getSubject(subject);
+    Properties config = new Properties();
+    config.putAll(found.getConfig().asMap());
+    StringWriter out = new StringWriter();
+    try {
+      config.store(out, null);
+    } catch (IOException e) { // a StringWriter can't throw ... but just in case
+      throw new RuntimeException(e);
+    }
+    return out.toString();
+  }
+
+  /**
+   * Create a subject if it does not already exist.
+   *
+   * @param subject
+   *          the name of the subject
+   * @param configParams
+   *          the configuration values for the Subject, as form parameters
+   * @return the subject name in a 200 response if successful, or an HTTP 400
+   *         response if the subject name is null. NOTE(review): no 404 or
+   *         409 response is produced by this method itself.
+   */
+  @PUT
+  @Path("{subject}")
+  @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
+  public Response createSubject(@PathParam("subject") String subject,
+      MultivaluedMap<String, String> configParams) {
+    if (subject == null) {
+      return Response.status(Status.BAD_REQUEST).build();
+    }
+    SubjectConfig.Builder config = new SubjectConfig.Builder();
+    for (Map.Entry<String, List<String>> param : configParams.entrySet()) {
+      List<String> values = param.getValue();
+      if (!values.isEmpty()) {
+        config.set(param.getKey(), values.get(0)); // first value wins
+      }
+    }
+    Subject created = repo.register(subject, config.build());
+    return Response.ok(created.getName()).build();
+  }
+
+ /**
+ * Get the latest schema for a subject
+ *
+ * @param subject
+ * the name of the subject
+ * @return A 200 response with {@link SchemaEntry#toString()} as the body, or
+ * a 404 response if either the subject or latest schema is not found.
+ */
+ @GET
+ @Path("{subject}/latest")
+ public String latest(@PathParam("subject") String subject) {
+ return exists(getSubject(subject).latest()).toString();
+ }
+
+ /**
+ * Look up a schema by subject + id pair.
+ *
+ * @param subject
+ * the name of the subject
+ * @param id
+ * the id of the schema
+ * @return A 200 response with the schema as the body, or a 404 response if
+ * the subject or schema is not found
+ */
+ @GET
+ @Path("{subject}/id/{id}")
+ public String schemaFromId(@PathParam("subject") String subject,
+ @PathParam("id") String id) {
+ return exists(getSubject(subject).lookupById(id)).getSchema();
+ }
+
+ /**
+ * Look up an id by a subject + schema pair.
+ *
+ * @param subject
+ * the name of the subject
+ * @param schema
+ * the schema to search for
+ * @return A 200 response with the id in the body, or a 404 response if the
+ * subject or schema is not found
+ */
+ @POST
+ @Path("{subject}/schema")
+ @Consumes(MediaType.TEXT_PLAIN)
+ public String idFromSchema(@PathParam("subject") String subject, String schema) {
+ return exists(getSubject(subject).lookupBySchema(schema)).getId();
+ }
+
+ /**
+ * Register a schema with a subject
+ *
+ * @param subject
+ * The subject name to register the schema in
+ * @param schema
+ * The schema to register
+ * @return A 200 response with the corresponding id if successful, a 403
+ * forbidden response if the schema fails validation, or a 404 not
+ * found response if the subject does not exist
+ */
+ @PUT
+ @Path("{subject}/register")
+ @Consumes(MediaType.TEXT_PLAIN)
+ public Response addSchema(@PathParam("subject") String subject, String schema) {
+ try {
+ return Response.ok(getSubject(subject).register(schema).getId()).build();
+ } catch (SchemaValidationException e) {
+ return Response.status(Status.FORBIDDEN).build();
+ }
+ }
+
+ /**
+ * Register a schema with a subject, only if the latest schema equals the
+ * expected value. This is for resolving race conditions between multiple
+ * registrations and schema invalidation events in underlying repositories.
+ *
+ * @param subject
+ * the name of the subject
+ * @param latestId
+ * the latest schema id, possibly null
+ * @param schema
+ * the schema to attempt to register
+ * @return a 200 response with the id of the newly registered schema, or a 404
+ * response if the subject or id does not exist or a 409 conflict if
+ * the id does not match the latest id or a 403 forbidden response if
+ * the schema failed validation
+ */
+  @PUT
+  @Path("{subject}/register_if_latest/{latestId: .*}")
+  @Consumes(MediaType.TEXT_PLAIN)
+  public Response addSchema(@PathParam("subject") String subject,
+      @PathParam("latestId") String latestId, String schema) {
+    Subject target = getSubject(subject);
+    // an empty latestId is passed on as a null "expected latest" entry
+    SchemaEntry expectedLatest = "".equals(latestId)
+        ? null
+        : exists(target.lookupById(latestId));
+    SchemaEntry registered;
+    try {
+      registered = target.registerIfLatest(schema, expectedLatest);
+    } catch (SchemaValidationException e) {
+      // the schema failed validation -> 403 Forbidden
+      return Response.status(Status.FORBIDDEN).build();
+    }
+    if (registered == null) {
+      // a null result is reported to the client as 409 Conflict
+      return Response.status(Status.CONFLICT).build();
+    }
+    return Response.ok(registered.getId()).build();
+  }
+
+ /**
+ * Get a subject
+ *
+ * @param subject
+ * the name of the subject
+ * @return a 200 response if the subject exists, or a 404 response if the
+ * subject does not.
+ */
+ @GET
+ @Path("{subject}")
+ public Response checkSubject(@PathParam("subject") String subject) {
+ getSubject(subject);
+ return Response.ok().build();
+ }
+
+ @GET
+ @Path("{subject}/integral")
+ public String getSubjectIntegralKeys(@PathParam("subject") String subject) {
+ return Boolean.toString(getSubject(subject).integralKeys());
+ }
+
+ private Subject getSubject(String subjectName) {
+ Subject subject = repo.lookup(subjectName);
+ if (null == subject) {
+ throw new NotFoundException();
+ }
+ return subject;
+ }
+
+ private SchemaEntry exists(SchemaEntry entry) {
+ if (null == entry) {
+ throw new NotFoundException();
+ }
+ return entry;
+ }
+
+}
diff --git a/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/RepositoryServer.java b/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/RepositoryServer.java
new file mode 100644
index 0000000..6620cfa
--- /dev/null
+++ b/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/RepositoryServer.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo.server;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+
+import javax.inject.Named;
+import javax.inject.Singleton;
+import javax.servlet.http.HttpServlet;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Provides;
+import com.google.inject.servlet.GuiceFilter;
+import com.sun.jersey.guice.JerseyServletModule;
+import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
+
+import static java.lang.Thread.sleep;
+
+/**
+ * A {@link RepositoryServer} is a stand-alone Jetty server for running a
+ * {@link RESTRepository}. {@link #main(String...)} takes a single argument
+ * naming a properties file used for configuration.
+ */
+public class RepositoryServer {
+  private final Server server;
+
+  /**
+   * Constructs an instance of this class, overlaying the default properties
+   * with any identically-named properties in the supplied {@link Properties}
+   * instance.
+   *
+   * @param props
+   *          Property values for overriding the defaults.
+   *          <p>
+   *          <b><i>Any overriding properties must be supplied as type </i>
+   *          <code>String</code><i> or they will not work and the default
+   *          values will be used.</i></b>
+   */
+  public RepositoryServer(Properties props) {
+    Injector injector = Guice.createInjector(
+        new ConfigModule(props),
+        new ServerModule());
+    this.server = injector.getInstance(Server.class);
+  }
+
+  /**
+   * Command-line entry point. Loads the properties file named by the single
+   * argument, then starts the server and joins it. The server is stopped on
+   * the way out even if starting or joining fails.
+   *
+   * @param args
+   *          exactly one element: the path of a readable properties file;
+   *          anything else prints usage help and exits with status 1
+   * @throws Exception
+   *           if the file cannot be loaded or the server fails to start,
+   *           join or stop
+   */
+  public static void main(String... args) throws Exception {
+    if (args.length != 1) {
+      printHelp();
+      System.exit(1);
+    }
+    File config = new File(args[0]);
+    if (!config.canRead()) {
+      System.err.println("Cannot read file: " + config);
+      printHelp();
+      System.exit(1);
+    }
+    Properties props = new Properties();
+    // Properties.load leaves the stream open, so close it explicitly.
+    BufferedInputStream in =
+        new BufferedInputStream(new FileInputStream(config));
+    try {
+      props.load(in);
+    } finally {
+      in.close();
+    }
+    RepositoryServer server = new RepositoryServer(props);
+    try {
+      server.start();
+      server.join();
+    } finally {
+      server.stop();
+    }
+  }
+
+  /** Starts the underlying Jetty {@link Server}. */
+  public void start() throws Exception {
+    server.start();
+  }
+
+  /** Delegates to {@link Server#join()} on the underlying server. */
+  public void join() throws InterruptedException {
+    server.join();
+  }
+
+  /** Stops the underlying Jetty {@link Server}. */
+  public void stop() throws Exception {
+    server.stop();
+  }
+
+  /** Prints the usage message and the default properties to stderr. */
+  private static void printHelp() {
+    System.err.println("One argument expected containing a configuration "
+        + "properties file. Default properties are:");
+    ConfigModule.printDefaults(System.err);
+  }
+
+  /**
+   * Guice module that installs the Jersey servlet bindings and provides the
+   * configured {@link Server}.
+   */
+  private static class ServerModule extends JerseyServletModule {
+
+    /**
+     * Binds the connector implementation and serves every path through the
+     * Jersey {@link GuiceContainer}.
+     */
+    @Override
+    protected void configureServlets() {
+      bind(Connector.class).to(SelectChannelConnector.class);
+      serve("/*").with(GuiceContainer.class);
+      bind(RESTRepository.class);
+    }
+
+    /**
+     * Builds the singleton {@link Server} from the injected configuration
+     * values and collaborators.
+     *
+     * @param host
+     *          host to bind the connector to; the connector's own default is
+     *          kept when this is null or empty
+     * @param port
+     *          port for the connector
+     * @param path
+     *          context path of the servlet handler
+     * @param headerSize
+     *          request header size for the connector
+     * @param bufferSize
+     *          request buffer size for the connector
+     * @param connector
+     *          connector to configure and attach to the server
+     * @param guiceFilter
+     *          filter installed on every path of the handler
+     * @param handler
+     *          servlet context handler that becomes the server's handler
+     * @return the assembled server, which this method does not start
+     */
+    @Provides
+    @Singleton
+    public Server provideServer(
+        @Named(ConfigModule.JETTY_HOST) String host,
+        @Named(ConfigModule.JETTY_PORT) Integer port,
+        @Named(ConfigModule.JETTY_PATH) String path,
+        @Named(ConfigModule.JETTY_HEADER_SIZE) Integer headerSize,
+        @Named(ConfigModule.JETTY_BUFFER_SIZE) Integer bufferSize,
+        Connector connector,
+        GuiceFilter guiceFilter,
+        ServletContextHandler handler) {
+
+      Server server = new Server();
+      if (null != host && !host.isEmpty()) {
+        connector.setHost(host);
+      }
+      connector.setPort(port);
+      connector.setRequestHeaderSize(headerSize);
+      connector.setRequestBufferSize(bufferSize);
+      server.setConnectors(new Connector[] { connector });
+
+      // the guice filter intercepts all inbound requests and uses its bindings
+      // for servlets
+      FilterHolder holder = new FilterHolder(guiceFilter);
+      handler.addFilter(holder, "/*", null);
+      handler.addServlet(NoneServlet.class, "/");
+      handler.setContextPath(path);
+      server.setHandler(handler);
+      // NOTE(review): this dumps the server state to stderr on every
+      // construction; confirm it is wanted outside of debugging.
+      server.dumpStdErr();
+      return server;
+    }
+
+    /** Servlet with no behaviour of its own, mapped at "/" above. */
+    private static final class NoneServlet extends HttpServlet {
+      private static final long serialVersionUID = 4560115319373180139L;
+    }
+  }
+
+}
diff --git a/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestConfigModule.java b/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestConfigModule.java
new file mode 100644
index 0000000..dc63b9c
--- /dev/null
+++ b/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestConfigModule.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo.server;
+
+import java.util.Properties;
+
+import org.apache.avro.repo.InMemoryRepository;
+import org.apache.avro.repo.Repository;
+import org.apache.avro.repo.SchemaEntry;
+import org.apache.avro.repo.SchemaValidationException;
+import org.apache.avro.repo.Subject;
+import org.apache.avro.repo.SubjectConfig;
+import org.apache.avro.repo.Validator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Tests for {@link ConfigModule}.
+ */
+public class TestConfigModule {
+
+  /**
+   * A validator configured through a <code>validator.*</code> property must
+   * be applied by subjects of the repository the module provides.
+   */
+  @Test
+  public void testConfig() {
+    Properties props = new Properties();
+    props.setProperty("repo.class", InMemoryRepository.class.getName());
+    props.setProperty("validator.rejectAll", Reject.class.getName());
+    ConfigModule module = new ConfigModule(props);
+    Injector injector = Guice.createInjector(module);
+    Repository repo = injector.getInstance(Repository.class);
+    Subject rejects = repo.register("rejects", new SubjectConfig.Builder()
+        .addValidator("rejectAll").build());
+    try {
+      rejects.register("stuff");
+      Assert.fail("expected SchemaValidationException");
+    } catch (SchemaValidationException se) {
+      // expected: the Reject validator throws for every schema
+    }
+  }
+
+  /** Smoke test: printing the defaults to stdout must not throw. */
+  @Test
+  public void testPrintDefaults() {
+    ConfigModule.printDefaults(System.out);
+  }
+
+  /** Validator that rejects every schema it is asked to validate. */
+  public static class Reject implements Validator {
+    @Override
+    public void validate(String schemaToValidate,
+        Iterable<SchemaEntry> schemasInOrder) throws SchemaValidationException {
+      throw new SchemaValidationException("no");
+    }
+  }
+
+}
diff --git a/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestRESTRepository.java b/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestRESTRepository.java
new file mode 100644
index 0000000..a0b40ec
--- /dev/null
+++ b/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestRESTRepository.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo.server;
+
+import org.apache.avro.repo.InMemoryRepository;
+import org.apache.avro.repo.ValidatorFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.sun.jersey.api.NotFoundException;
+
+/**
+ * Unit tests that call {@link RESTRepository} methods directly.
+ */
+public class TestRESTRepository {
+  RESTRepository repo;
+
+  /** Wraps a new {@link InMemoryRepository} for each test. */
+  @Before
+  public void setUp() {
+    InMemoryRepository backing =
+        new InMemoryRepository(new ValidatorFactory.Builder().build());
+    repo = new RESTRepository(backing);
+  }
+
+  /** Drops the reference to the repository used by the finished test. */
+  @After
+  public void tearDown() {
+    repo = null;
+  }
+
+  /** Listing a subject that was never registered must throw. */
+  @Test(expected = NotFoundException.class)
+  public void testNonExistentSubjectList() throws Exception {
+    repo.subjectList("nothing");
+  }
+
+  /** Reading the config of a subject that was never registered must throw. */
+  @Test(expected = NotFoundException.class)
+  public void testNonExistentSubjectGetConfig() throws Exception {
+    repo.subjectConfig("nothing");
+  }
+
+  /** Creating a subject from null arguments is answered with status 400. */
+  @Test
+  public void testCreateNullSubject() {
+    int status = repo.createSubject(null, null).getStatus();
+    Assert.assertEquals(400, status);
+  }
+}
\ No newline at end of file
diff --git a/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestRepo.java b/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestRepo.java
new file mode 100644
index 0000000..2c29e07
--- /dev/null
+++ b/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestRepo.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.repo.server;
+
+import java.util.Properties;
+
+import org.apache.avro.repo.InMemoryRepository;
+import org.junit.Test;
+
+/**
+ * Smoke test for {@link RepositoryServer} start-up and shutdown.
+ */
+public class TestRepo {
+
+  /**
+   * Starts a server configured with an {@link InMemoryRepository} on port
+   * 6782 and stops it again.
+   */
+  @Test
+  public void testRepoInit() throws Exception {
+    Properties props = new Properties();
+    props.setProperty("repo.class", InMemoryRepository.class.getName());
+    props.setProperty("jetty.port", "6782");
+
+    RepositoryServer server = new RepositoryServer(props);
+    try {
+      server.start();
+    } finally {
+      // Stop even when start() fails so the test cannot leave the port bound.
+      server.stop();
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/lang/java/tools/pom.xml b/lang/java/tools/pom.xml
index 6e0fe83..23c516b 100644
--- a/lang/java/tools/pom.xml
+++ b/lang/java/tools/pom.xml
@@ -141,7 +141,6 @@
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
- <version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment