Created
July 5, 2014 02:11
-
-
Save FelixGV/7f0978dfa90fe4430c24 to your computer and use it in GitHub Desktop.
AVRO-1124.patch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/lang/java/ipc/pom.xml b/lang/java/ipc/pom.xml | |
index 35fdeaf..50cf2d7 100644 | |
--- a/lang/java/ipc/pom.xml | |
+++ b/lang/java/ipc/pom.xml | |
@@ -117,12 +117,10 @@ | |
<dependency> | |
<groupId>org.codehaus.jackson</groupId> | |
<artifactId>jackson-core-asl</artifactId> | |
- <version>${jackson.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>org.codehaus.jackson</groupId> | |
<artifactId>jackson-mapper-asl</artifactId> | |
- <version>${jackson.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>org.mortbay.jetty</groupId> | |
diff --git a/lang/java/mapred/pom.xml b/lang/java/mapred/pom.xml | |
index fa198bc..cfe7510 100644 | |
--- a/lang/java/mapred/pom.xml | |
+++ b/lang/java/mapred/pom.xml | |
@@ -82,7 +82,7 @@ | |
<include>**/mapred/tether/*.avpr</include> | |
</includes> | |
<sourceDirectory>${parent.project.basedir}/../../../../share/schemas/</sourceDirectory> | |
- <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory>a | |
+ <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory> | |
</configuration> | |
</execution> | |
</executions> | |
diff --git a/lang/java/pom.xml b/lang/java/pom.xml | |
index fcad280..6c523b8 100644 | |
--- a/lang/java/pom.xml | |
+++ b/lang/java/pom.xml | |
@@ -54,7 +54,7 @@ | |
<protobuf.version>2.4.1</protobuf.version> | |
<thrift.version>0.7.0</thrift.version> | |
<slf4j.version>1.6.4</slf4j.version> | |
- <snappy.version>1.0.5</snappy.version> | |
+ <snappy.version>1.0.5-M3</snappy.version> | |
<velocity.version>1.7</velocity.version> | |
<maven.version>2.0.10</maven.version> | |
<ant.version>1.8.2</ant.version> | |
@@ -62,6 +62,9 @@ | |
<commons-compress.version>1.4.1</commons-compress.version> | |
<easymock.version>3.0</easymock.version> | |
<hamcrest.version>1.1</hamcrest.version> | |
+ <jetty-8.version>8.1.14.v20131031</jetty-8.version> | |
+ <jersey.version>1.15</jersey.version> | |
+ <guice.version>3.0</guice.version> | |
<commons-httpclient.version>3.1</commons-httpclient.version> | |
<!-- version properties for plugins --> | |
@@ -86,6 +89,7 @@ | |
<module>avro</module> | |
<module>compiler</module> | |
<module>maven-plugin</module> | |
+ <module>schema-repo</module> | |
<module>ipc</module> | |
<module>trevni</module> | |
<module>tools</module> | |
@@ -406,6 +410,37 @@ | |
<artifactId>jetty</artifactId> | |
<version>${jetty.version}</version> | |
</dependency> | |
+ <!-- jetty 8.x management (used by schema-repo-server) --> | |
+ <dependency> | |
+ <groupId>org.eclipse.jetty</groupId> | |
+ <artifactId>jetty-servlet</artifactId> | |
+ <version>${jetty-8.version}</version> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>org.eclipse.jetty</groupId> | |
+ <artifactId>jetty-server</artifactId> | |
+ <version>${jetty-8.version}</version> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>com.google.inject</groupId> | |
+ <artifactId>guice</artifactId> | |
+ <version>${guice.version}</version> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>com.google.inject.extensions</groupId> | |
+ <artifactId>guice-servlet</artifactId> | |
+ <version>${guice.version}</version> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>com.sun.jersey.contribs</groupId> | |
+ <artifactId>jersey-guice</artifactId> | |
+ <version>${jersey.version}</version> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>com.sun.jersey</groupId> | |
+ <artifactId>jersey-server</artifactId> | |
+ <version>${jersey.version}</version> | |
+ </dependency> | |
<dependency> | |
<groupId>io.netty</groupId> | |
<artifactId>netty</artifactId> | |
diff --git a/lang/java/schema-repo/bundle/config/config.properties b/lang/java/schema-repo/bundle/config/config.properties | |
new file mode 100644 | |
index 0000000..2abd798 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/bundle/config/config.properties | |
@@ -0,0 +1,3 @@ | |
+# NOTE(review): presumably selects the Repository implementation the server instantiates; here the in-memory one -- confirm against the server's config handling. | |
+avro.repo.class=org.apache.avro.repo.InMemoryRepository | |
+# Disabled entry naming the CacheRepository wrapper. | |
+#avro.repo.cached=org.apache.avro.repo.CacheRepository | |
+# NOTE(review): presumably the data directory for a file-backed repository; likely ignored while InMemoryRepository is selected -- confirm. | |
+avro.repo.file-repo-path=target/data/ | |
\ No newline at end of file | |
diff --git a/lang/java/schema-repo/bundle/pom.xml b/lang/java/schema-repo/bundle/pom.xml | |
new file mode 100644 | |
index 0000000..f9a3b30 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/bundle/pom.xml | |
@@ -0,0 +1,91 @@ | |
+<?xml version="1.0" encoding="UTF-8"?> | |
+<!-- | |
+ Licensed to the Apache Software Foundation (ASF) under one or more | |
+ contributor license agreements. See the NOTICE file distributed with | |
+ this work for additional information regarding copyright ownership. | |
+ The ASF licenses this file to You under the Apache License, Version 2.0 | |
+ (the "License"); you may not use this file except in compliance with | |
+ the License. You may obtain a copy of the License at | |
+ | |
+ http://www.apache.org/licenses/LICENSE-2.0 | |
+ | |
+ Unless required by applicable law or agreed to in writing, software | |
+ distributed under the License is distributed on an "AS IS" BASIS, | |
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ See the License for the specific language governing permissions and | |
+ limitations under the License. | |
+--> | |
+<project | |
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" | |
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> | |
+ <!-- Bundle module: shades avro-repo-server and avro-repo-client, together with their dependencies, into a single jar. --> | |
+ <modelVersion>4.0.0</modelVersion> | |
+ | |
+ <parent> | |
+ <artifactId>avro-repo-parent</artifactId> | |
+ <groupId>org.apache.avro.repo</groupId> | |
+ <version>1.7.7-SNAPSHOT</version> | |
+ <relativePath>../</relativePath> | |
+ </parent> | |
+ | |
+ <artifactId>avro-repo-bundle</artifactId> | |
+ | |
+ <name>Apache Avro Schema Repository Bundle</name> | |
+ <url>http://avro.apache.org</url> | |
+ <description>Avro Schema Repository standalone bundle</description> | |
+ | |
+ <build> | |
+ <plugins> | |
+ <!-- Runs the shade goal in the package phase; the shaded jar is attached as an additional artifact with classifier "withdeps". --> | |
+ <plugin> | |
+ <groupId>org.apache.maven.plugins</groupId> | |
+ <artifactId>maven-shade-plugin</artifactId> | |
+ <executions> | |
+ <execution> | |
+ <id>bundle</id> | |
+ <goals> | |
+ <goal>shade</goal> | |
+ </goals> | |
+ <phase>package</phase> | |
+ <configuration> | |
+ <shadedArtifactAttached>true</shadedArtifactAttached> | |
+ <shadedClassifierName>withdeps</shadedClassifierName> | |
+ <transformers> | |
+ <!-- Records RepositoryServer as the Main-Class manifest entry of the shaded jar. --> | |
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> | |
+ <mainClass>org.apache.avro.repo.server.RepositoryServer</mainClass> | |
+ </transformer> | |
+ <!-- Merges META-INF/services files contributed by the bundled artifacts. --> | |
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> | |
+ </transformers> | |
+ <filters> | |
+ <!-- Drop jar signature files from every bundled artifact: signatures of the original jars do not match the merged jar. --> | |
+ <filter> | |
+ <artifact>*:*</artifact> | |
+ <excludes> | |
+ <exclude>META-INF/SUN_MICR.*</exclude> | |
+ <exclude>META-INF/*.SF</exclude> | |
+ <exclude>META-INF/*.DSA</exclude> | |
+ <exclude>META-INF/*.RSA</exclude> | |
+ </excludes> | |
+ </filter> | |
+ </filters> | |
+ </configuration> | |
+ </execution> | |
+ </executions> | |
+ </plugin> | |
+ </plugins> | |
+ </build> | |
+ | |
+ <profiles> | |
+ </profiles> | |
+ | |
+ <!-- Sibling modules packaged into the shaded jar; versions follow this module's own version. --> | |
+ <dependencies> | |
+ <dependency> | |
+ <groupId>org.apache.avro.repo</groupId> | |
+ <artifactId>avro-repo-server</artifactId> | |
+ <version>${project.version}</version> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>org.apache.avro.repo</groupId> | |
+ <artifactId>avro-repo-client</artifactId> | |
+ <version>${project.version}</version> | |
+ </dependency> | |
+ </dependencies> | |
+ | |
+</project> | |
diff --git a/lang/java/schema-repo/client/pom.xml b/lang/java/schema-repo/client/pom.xml | |
new file mode 100644 | |
index 0000000..6ae3b6d | |
--- /dev/null | |
+++ b/lang/java/schema-repo/client/pom.xml | |
@@ -0,0 +1,74 @@ | |
+<?xml version="1.0" encoding="UTF-8"?> | |
+<!-- | |
+ Licensed to the Apache Software Foundation (ASF) under one or more | |
+ contributor license agreements. See the NOTICE file distributed with | |
+ this work for additional information regarding copyright ownership. | |
+ The ASF licenses this file to You under the Apache License, Version 2.0 | |
+ (the "License"); you may not use this file except in compliance with | |
+ the License. You may obtain a copy of the License at | |
+ | |
+ http://www.apache.org/licenses/LICENSE-2.0 | |
+ | |
+ Unless required by applicable law or agreed to in writing, software | |
+ distributed under the License is distributed on an "AS IS" BASIS, | |
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ See the License for the specific language governing permissions and | |
+ limitations under the License. | |
+--> | |
+<project | |
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" | |
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> | |
+ <!-- Client module: the Jersey-based HTTP client for the schema repository. --> | |
+ <modelVersion>4.0.0</modelVersion> | |
+ | |
+ <parent> | |
+ <artifactId>avro-repo-parent</artifactId> | |
+ <groupId>org.apache.avro.repo</groupId> | |
+ <version>1.7.7-SNAPSHOT</version> | |
+ <relativePath>../</relativePath> | |
+ </parent> | |
+ | |
+ <artifactId>avro-repo-client</artifactId> | |
+ | |
+ <name>Apache Avro Schema Repository Client</name> | |
+ <url>http://avro.apache.org</url> | |
+ <description>Avro Schema Repository Client</description> | |
+ | |
+ <build> | |
+ </build> | |
+ | |
+ <profiles> | |
+ </profiles> | |
+ | |
+ <dependencies> | |
+ <!-- Repository API and utilities shared with the other schema-repo modules. --> | |
+ <dependency> | |
+ <groupId>org.apache.avro.repo</groupId> | |
+ <artifactId>avro-repo-common</artifactId> | |
+ <version>${project.version}</version> | |
+ </dependency> | |
+ <!-- HTTP client library used by RESTRepositoryClient. --> | |
+ <dependency> | |
+ <groupId>com.sun.jersey</groupId> | |
+ <artifactId>jersey-client</artifactId> | |
+ <version>${jersey.version}</version> | |
+ </dependency> | |
+ <!-- Supplies the @Inject/@Named annotations; optional, so it is not passed on to consumers of this module. --> | |
+ <dependency> | |
+ <groupId>javax.inject</groupId> | |
+ <artifactId>javax.inject</artifactId> | |
+ <version>1</version> | |
+ <optional>true</optional> | |
+ </dependency> | |
+ <!-- Test-only: the "tests" classifier artifact of avro-repo-common. --> | |
+ <dependency> | |
+ <groupId>org.apache.avro.repo</groupId> | |
+ <artifactId>avro-repo-common</artifactId> | |
+ <classifier>tests</classifier> | |
+ <scope>test</scope> | |
+ <version>${project.version}</version> | |
+ </dependency> | |
+ <!-- Test-only: the server module, so tests can run against a real server instance. --> | |
+ <dependency> | |
+ <groupId>org.apache.avro.repo</groupId> | |
+ <artifactId>avro-repo-server</artifactId> | |
+ <version>${project.version}</version> | |
+ <scope>test</scope> | |
+ </dependency> | |
+ </dependencies> | |
+ | |
+</project> | |
diff --git a/lang/java/schema-repo/client/src/main/java/org/apache/avro/repo/client/RESTRepositoryClient.java b/lang/java/schema-repo/client/src/main/java/org/apache/avro/repo/client/RESTRepositoryClient.java | |
new file mode 100644 | |
index 0000000..bf732e8 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/client/src/main/java/org/apache/avro/repo/client/RESTRepositoryClient.java | |
@@ -0,0 +1,228 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo.client; | |
+ | |
+import java.io.StringReader; | |
+import java.util.ArrayList; | |
+import java.util.Collections; | |
+import java.util.Map; | |
+import java.util.Properties; | |
+ | |
+import javax.inject.Inject; | |
+import javax.inject.Named; | |
+import javax.ws.rs.core.MediaType; | |
+ | |
+import org.apache.avro.repo.Repository; | |
+import org.apache.avro.repo.RepositoryUtil; | |
+import org.apache.avro.repo.SchemaEntry; | |
+import org.apache.avro.repo.SchemaValidationException; | |
+import org.apache.avro.repo.Subject; | |
+import org.apache.avro.repo.SubjectConfig; | |
+ | |
+import com.sun.jersey.api.client.Client; | |
+import com.sun.jersey.api.client.ClientHandlerException; | |
+import com.sun.jersey.api.client.ClientResponse; | |
+import com.sun.jersey.api.client.UniformInterfaceException; | |
+import com.sun.jersey.api.client.WebResource; | |
+import com.sun.jersey.api.representation.Form; | |
+ | |
+/** | |
+ * A {@link Repository} implementation that talks to a remote RESTRepository | |
+ * over HTTP.<br/> | |
+ * <br/> | |
+ * Typically, this is used in a client wrapped in a | |
+ * {@link org.apache.avro.repo.CacheRepository} to limit network communication.<br/> | |
+ * <br/> | |
+ * Alternatively, this implementation can itself be what is used behind a | |
+ * RESTRepository in a RepositoryServer, thus creating a caching proxy.<br/> | |
+ * <br/> | |
+ * Failure handling: most operations signal a failed HTTP request by returning | |
+ * {@code null} (or an empty collection) rather than throwing; each method | |
+ * documents which failures it absorbs. | |
+ */ | |
+public class RESTRepositoryClient implements Repository { | |
+ | |
+  /** Resource for the repository's base URL; every request path is resolved against it. */ | |
+  private final WebResource webResource; | |
+ | |
+  /** | |
+   * Creates a client whose requests are resolved relative to {@code url}. | |
+   * | |
+   * @param url base URL of the remote repository, injectable under the name | |
+   *          {@code avro.repo.url} | |
+   */ | |
+  @Inject | |
+  public RESTRepositoryClient(@Named("avro.repo.url") String url) { | |
+    this.webResource = Client.create().resource(url); | |
+  } | |
+ | |
+  /** | |
+   * Registers a subject by PUTting its configuration, form-encoded, to the | |
+   * path named after the subject. Request failures are not caught here and | |
+   * propagate to the caller. | |
+   * | |
+   * @param subject the subject name, used as the request path | |
+   * @param config the subject configuration; normalized with | |
+   *          {@link RepositoryUtil#safeConfig} before being sent | |
+   * @return a subject named with the text the server responded with | |
+   */ | |
+  @Override | |
+  public Subject register(String subject, SubjectConfig config) { | |
+    Form form = new Form(); | |
+    for (Map.Entry<String, String> entry : RepositoryUtil.safeConfig(config).asMap().entrySet()) { | |
+      form.putSingle(entry.getKey(), entry.getValue()); | |
+    } | |
+ | |
+    String regSubjectName = webResource.path(subject).accept(MediaType.TEXT_PLAIN) | |
+        .type(MediaType.APPLICATION_FORM_URLENCODED).put(String.class, form); | |
+ | |
+    return new RESTSubject(regSubjectName); | |
+  } | |
+ | |
+  /** | |
+   * Looks up a subject by issuing a GET on its path. | |
+   * | |
+   * @param subject the subject name; checked with | |
+   *          {@link RepositoryUtil#validateSchemaOrSubject} before the request | |
+   * @return the subject, or {@code null} if the request fails for any reason | |
+   */ | |
+  @Override | |
+  public Subject lookup(String subject) { | |
+    RepositoryUtil.validateSchemaOrSubject(subject); | |
+    try { | |
+      // the response body is irrelevant: the GET either succeeds or throws | |
+      webResource.path(subject).get(String.class); | |
+      return new RESTSubject(subject); | |
+    } catch (Exception e) { | |
+      return null; | |
+    } | |
+  } | |
+ | |
+  /** | |
+   * Lists the subjects reported by a GET on the base URL. | |
+   * | |
+   * @return the subjects parsed from the response; empty (or partially filled) | |
+   *         if the request or the parsing fails | |
+   */ | |
+  @Override | |
+  public Iterable<Subject> subjects() { | |
+    ArrayList<Subject> subjectList = new ArrayList<Subject>(); | |
+    try { | |
+      String subjects = webResource.get(String.class); | |
+      for (String subjName : RepositoryUtil.subjectNamesFromString(subjects)) { | |
+        subjectList.add(new RESTSubject(subjName)); | |
+      } | |
+    } catch (Exception e) { | |
+      // deliberate best effort: fall through and return what was collected | |
+    } | |
+    return subjectList; | |
+  } | |
+ | |
+  /** | |
+   * A {@link Subject} whose operations are HTTP requests against paths below | |
+   * the subject's name on the enclosing client's base URL. | |
+   */ | |
+  private class RESTSubject extends Subject { | |
+ | |
+    private RESTSubject(String name) { | |
+      super(name); | |
+    } | |
+ | |
+    /** | |
+     * Fetches {@code <name>/config} and parses the body as Java properties. | |
+     * | |
+     * @return the parsed configuration, or {@code null} if the request or the | |
+     *         parsing fails | |
+     */ | |
+    @Override | |
+    public SubjectConfig getConfig() { | |
+      String path = getName() + "/config"; | |
+      try { | |
+        String propString = webResource.path(path).accept(MediaType.TEXT_PLAIN) | |
+            .get(String.class); | |
+        Properties props = new Properties(); | |
+        props.load(new StringReader(propString)); | |
+        return RepositoryUtil.configFromProperties(props); | |
+      } catch (Exception e) { | |
+        return null; | |
+      } | |
+    } | |
+ | |
+    /** | |
+     * Registers {@code schema} via a PUT to {@code <name>/register}. | |
+     * | |
+     * @return the new entry, or {@code null} if the request failed | |
+     * @throws SchemaValidationException if the server answers 403 Forbidden | |
+     */ | |
+    @Override | |
+    public SchemaEntry register(String schema) throws SchemaValidationException { | |
+      RepositoryUtil.validateSchemaOrSubject(schema); | |
+      return handleRegisterRequest(getName() + "/register", schema); | |
+    } | |
+ | |
+    /** | |
+     * Registers {@code schema} via a PUT to | |
+     * {@code <name>/register_if_latest/<id of latest>}; the id part is empty | |
+     * when {@code latest} is {@code null}. | |
+     * | |
+     * @return the new entry, or {@code null} if the request failed | |
+     * @throws SchemaValidationException if the server answers 403 Forbidden | |
+     */ | |
+    @Override | |
+    public SchemaEntry registerIfLatest(String schema, SchemaEntry latest) | |
+        throws SchemaValidationException { | |
+      RepositoryUtil.validateSchemaOrSubject(schema); | |
+      String idStr = (latest == null) ? "" : latest.getId(); | |
+      return handleRegisterRequest(getName() + "/register_if_latest/" + idStr, schema); | |
+    } | |
+ | |
+    /** | |
+     * PUTs {@code schema} as plain text to {@code path} and wraps the returned | |
+     * id in a {@link SchemaEntry}. | |
+     * | |
+     * @return the entry, or {@code null} for any failure other than a 403 | |
+     * @throws SchemaValidationException if the server answers 403 Forbidden | |
+     */ | |
+    private SchemaEntry handleRegisterRequest(String path, String schema) | |
+        throws SchemaValidationException { | |
+      try { | |
+        String schemaId = webResource.path(path).accept(MediaType.TEXT_PLAIN) | |
+            .type(MediaType.TEXT_PLAIN_TYPE).put(String.class, schema); | |
+        return new SchemaEntry(schemaId, schema); | |
+      } catch (UniformInterfaceException e) { | |
+        // Compare raw status codes: Status.fromStatusCode() yields null for a | |
+        // code that has no enum constant, so calling equals() on its result | |
+        // would throw a NullPointerException instead of returning null. | |
+        int status = e.getResponse().getStatus(); | |
+        if (status == ClientResponse.Status.FORBIDDEN.getStatusCode()) { | |
+          throw new SchemaValidationException("Invalid schema: " + schema); | |
+        } | |
+        // any other status should return null | |
+        return null; | |
+      } catch (ClientHandlerException e) { | |
+        return null; | |
+      } | |
+    } | |
+ | |
+    /** | |
+     * Resolves the id of {@code schema} via a POST to {@code <name>/schema}. | |
+     * | |
+     * @return the entry, or {@code null} if the server answers with an error | |
+     *         status; other client failures propagate | |
+     */ | |
+    @Override | |
+    public SchemaEntry lookupBySchema(String schema) { | |
+      RepositoryUtil.validateSchemaOrSubject(schema); | |
+      String path = getName() + "/schema"; | |
+      try { | |
+        String schemaId = webResource.path(path).accept(MediaType.TEXT_PLAIN) | |
+            .type(MediaType.TEXT_PLAIN_TYPE).post(String.class, schema); | |
+        return new SchemaEntry(schemaId, schema); | |
+      } catch (UniformInterfaceException e) { | |
+        return null; | |
+      } | |
+    } | |
+ | |
+    /** | |
+     * Fetches the schema stored under {@code schemaId} from | |
+     * {@code <name>/id/<schemaId>}. | |
+     * | |
+     * @return the entry, or {@code null} if the server answers with an error | |
+     *         status; other client failures propagate | |
+     */ | |
+    @Override | |
+    public SchemaEntry lookupById(String schemaId) { | |
+      RepositoryUtil.validateSchemaOrSubject(schemaId); | |
+      String path = getName() + "/id/" + schemaId; | |
+      try { | |
+        String schema = webResource.path(path).get(String.class); | |
+        return new SchemaEntry(schemaId, schema); | |
+      } catch (UniformInterfaceException e) { | |
+        return null; | |
+      } | |
+    } | |
+ | |
+    /** | |
+     * Fetches {@code <name>/latest} and builds a {@link SchemaEntry} from the | |
+     * response text. | |
+     * | |
+     * @return the entry, or {@code null} if the server answers with an error | |
+     *         status; other client failures propagate | |
+     */ | |
+    @Override | |
+    public SchemaEntry latest() { | |
+      String path = getName() + "/latest"; | |
+      try { | |
+        String entryStr = webResource.path(path).get(String.class); | |
+        return new SchemaEntry(entryStr); | |
+      } catch (UniformInterfaceException e) { | |
+        return null; | |
+      } | |
+    } | |
+ | |
+    /** | |
+     * Fetches {@code <name>/all} and parses the response with | |
+     * {@link RepositoryUtil#schemasFromString}. | |
+     * | |
+     * @return the parsed entries, or an empty list if the server answers with | |
+     *         an error status; other client failures propagate | |
+     */ | |
+    @Override | |
+    public Iterable<SchemaEntry> allEntries() { | |
+      String path = getName() + "/all"; | |
+      try { | |
+        String entriesStr = webResource.path(path).get(String.class); | |
+        return RepositoryUtil.schemasFromString(entriesStr); | |
+      } catch (UniformInterfaceException e) { | |
+        return Collections.emptyList(); | |
+      } | |
+    } | |
+ | |
+    /** | |
+     * Fetches {@code <name>/integral} and parses the response as a boolean. | |
+     * | |
+     * @return the parsed flag, or {@code false} if the server answers with an | |
+     *         error status; other client failures propagate | |
+     */ | |
+    @Override | |
+    public boolean integralKeys() { | |
+      try { | |
+        String path = getName() + "/integral"; | |
+        String integral = webResource.path(path).get(String.class); | |
+        return Boolean.parseBoolean(integral); | |
+      } catch (UniformInterfaceException e) { | |
+        return false; | |
+      } | |
+    } | |
+ | |
+  } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/client/src/test/java/org/apache/avro/repo/client/RESTRepositoryClientTest.java b/lang/java/schema-repo/client/src/test/java/org/apache/avro/repo/client/RESTRepositoryClientTest.java | |
new file mode 100644 | |
index 0000000..3c7c096 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/client/src/test/java/org/apache/avro/repo/client/RESTRepositoryClientTest.java | |
@@ -0,0 +1,55 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo.client; | |
+ | |
+import java.util.Properties; | |
+ | |
+import org.apache.avro.repo.AbstractTestRepository; | |
+import org.apache.avro.repo.InMemoryRepository; | |
+import org.apache.avro.repo.server.RepositoryServer; | |
+import org.junit.AfterClass; | |
+import org.junit.BeforeClass; | |
+ | |
+/** | |
+ * Extends {@link AbstractTestRepository} with a {@link RESTRepositoryClient} | |
+ * that points at a {@link RepositoryServer} started once per class on | |
+ * localhost:8123 and configured with an {@link InMemoryRepository}. | |
+ */ | |
+public class RESTRepositoryClientTest extends | |
+    AbstractTestRepository<RESTRepositoryClient> { | |
+ | |
+  static RepositoryServer server; | |
+  static RESTRepositoryClient client; | |
+ | |
+  /** Configures and starts the server the clients under test connect to. */ | |
+  @BeforeClass | |
+  public static void setUpBeforeClass() throws Exception { | |
+    Properties serverConfig = new Properties(); | |
+    serverConfig.setProperty("jetty.host", "localhost"); | |
+    serverConfig.setProperty("jetty.port", "8123"); | |
+    serverConfig.setProperty("repo.class", InMemoryRepository.class.getName()); | |
+ | |
+    server = new RepositoryServer(serverConfig); | |
+    server.start(); | |
+  } | |
+ | |
+  /** Stops the server started in {@link #setUpBeforeClass()}. */ | |
+  @AfterClass | |
+  public static void tearDownAfterClass() throws Exception { | |
+    server.stop(); | |
+  } | |
+ | |
+  /** Returns a new client bound to the server's schema-repo URL. */ | |
+  @Override | |
+  protected RESTRepositoryClient createRepository() { | |
+    return new RESTRepositoryClient("http://localhost:8123/schema-repo/"); | |
+  } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/pom.xml b/lang/java/schema-repo/common/pom.xml | |
new file mode 100644 | |
index 0000000..47ae97b | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/pom.xml | |
@@ -0,0 +1,68 @@ | |
+<?xml version="1.0" encoding="UTF-8"?> | |
+<!-- | |
+ Licensed to the Apache Software Foundation (ASF) under one or more | |
+ contributor license agreements. See the NOTICE file distributed with | |
+ this work for additional information regarding copyright ownership. | |
+ The ASF licenses this file to You under the Apache License, Version 2.0 | |
+ (the "License"); you may not use this file except in compliance with | |
+ the License. You may obtain a copy of the License at | |
+ | |
+ http://www.apache.org/licenses/LICENSE-2.0 | |
+ | |
+ Unless required by applicable law or agreed to in writing, software | |
+ distributed under the License is distributed on an "AS IS" BASIS, | |
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ See the License for the specific language governing permissions and | |
+ limitations under the License. | |
+--> | |
+<project | |
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" | |
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> | |
+ <!-- Common module: components shared by the other schema-repo modules. --> | |
+ <modelVersion>4.0.0</modelVersion> | |
+ | |
+ <parent> | |
+ <artifactId>avro-repo-parent</artifactId> | |
+ <groupId>org.apache.avro.repo</groupId> | |
+ <version>1.7.7-SNAPSHOT</version> | |
+ <relativePath>../</relativePath> | |
+ </parent> | |
+ | |
+ <artifactId>avro-repo-common</artifactId> | |
+ | |
+ <name>Apache Avro Schema Repository Common</name> | |
+ <url>http://avro.apache.org</url> | |
+ <description>Avro Schema Repository common components</description> | |
+ | |
+ <build> | |
+ <!-- additionally create a test-jar artifact --> | |
+ <!-- The test-jar goal publishes this module's test classes as a separate artifact (classifier "tests") that other modules can depend on in test scope. --> | |
+ <plugins> | |
+ <plugin> | |
+ <groupId>org.apache.maven.plugins</groupId> | |
+ <artifactId>maven-jar-plugin</artifactId> | |
+ <executions> | |
+ <execution> | |
+ <phase>package</phase> | |
+ <goals> | |
+ <goal>test-jar</goal> | |
+ </goals> | |
+ </execution> | |
+ </executions> | |
+ </plugin> | |
+ </plugins> | |
+ </build> | |
+ | |
+ <profiles> | |
+ </profiles> | |
+ | |
+ <dependencies> | |
+ <!-- Supplies the javax.inject annotations (@Inject, @Named). --> | |
+ <dependency> | |
+ <groupId>javax.inject</groupId> | |
+ <artifactId>javax.inject</artifactId> | |
+ <version>1</version> | |
+ <!-- users who choose to use the annotations for Guice/Spring/etc | |
+ will need to include this, others can ignore --> | |
+ <optional>true</optional> | |
+ </dependency> | |
+ </dependencies> | |
+ | |
+</project> | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/CacheRepository.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/CacheRepository.java | |
new file mode 100644 | |
index 0000000..34e203d | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/CacheRepository.java | |
@@ -0,0 +1,85 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import javax.inject.Inject; | |
+ | |
+/** | |
+ * CacheRepository is a {@link Repository} implementation that wraps another | |
+ * {@link Repository} and acts as a write-through cache of {@link Subject}s and | |
+ * schema to id mappings, shielding the inner {@link Repository} from repetitive | |
+ * lookups. | |
+ * | |
+ * CacheRepository can cache Subjects (which cannot be deleted) and returns an | |
+ * implementation of {@link Subject} that caches schema to id mappings. | |
+ * | |
+ * CacheRepository can only cache the immutable elements of a Repository, because | |
+ * it is intended for use in any context -- in a client, in a proxy, or above a raw | |
+ * implementation of a repository. | |
+ * It cannot cache the entire list of subjects since the list is mutable. | |
+ * Similarly, a cached subject cannot cache the list of schemas, the subject configuration, | |
+ * or the latest() schema because those are mutable. | |
+ * | |
+ */ | |
+public class CacheRepository implements Repository { | |
+ | |
+ private final RepositoryCache cache; | |
+ private final Repository repo; | |
+ | |
+ /** | |
+ * Create a caching repository that wraps the provided repository using the | |
+ * cache provided | |
+ * @param repo The repository to wrap | |
+ * @param cache The cache to use | |
+ */ | |
+ @Inject | |
+ public CacheRepository(Repository repo, RepositoryCache cache) { | |
+ this.repo = repo; | |
+ this.cache = cache; | |
+ } | |
+ | |
+ @Override | |
+ public Subject register(String subjectName, SubjectConfig config) { | |
+ Subject s = cache.lookup(subjectName); | |
+ if (s == null) { | |
+ return cache.add(repo.register(subjectName, config)); | |
+ } | |
+ return s; | |
+ } | |
+ | |
+ @Override | |
+ public Subject lookup(String subjectName) { | |
+ Subject s = cache.lookup(subjectName); | |
+ if (s == null) { | |
+ return cache.add(repo.lookup(subjectName)); | |
+ } | |
+ return s; | |
+ } | |
+ | |
+ @Override | |
+ public Iterable<Subject> subjects() { | |
+ // the full list of subjects cannot be cached. | |
+ // however we can populate the cache with the result | |
+ Iterable<Subject> subs = repo.subjects(); | |
+ for (Subject s : subs) { | |
+ cache.add(s); | |
+ } | |
+ return subs; | |
+ } | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/DelegatingSubject.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/DelegatingSubject.java | |
new file mode 100644 | |
index 0000000..6efcfef | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/DelegatingSubject.java | |
@@ -0,0 +1,80 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+ | |
+/** | |
+ * Base class for {@link Subject} wrappers: every operation is forwarded | |
+ * unchanged to the wrapped {@link Subject}, and the wrapper takes the wrapped | |
+ * subject's name as its own. | |
+ * | |
+ * Subclasses override only the methods whose behavior they want to alter. | |
+ */ | |
+public abstract class DelegatingSubject extends Subject { | |
+ | |
+  /** The subject that actually performs each operation. */ | |
+  private final Subject target; | |
+ | |
+  /** | |
+   * Creates a wrapper around {@code delegate}, named after it. | |
+   * | |
+   * @param delegate the subject all calls are forwarded to | |
+   */ | |
+  protected DelegatingSubject(Subject delegate) { | |
+    super(delegate.getName()); | |
+    target = delegate; | |
+  } | |
+ | |
+  /** Forwards to the wrapped subject. */ | |
+  @Override | |
+  public SubjectConfig getConfig() { | |
+    return this.target.getConfig(); | |
+  } | |
+ | |
+  /** Forwards to the wrapped subject. */ | |
+  @Override | |
+  public boolean integralKeys() { | |
+    return this.target.integralKeys(); | |
+  } | |
+ | |
+  /** Forwards to the wrapped subject. */ | |
+  @Override | |
+  public SchemaEntry register(String schema) throws SchemaValidationException { | |
+    return this.target.register(schema); | |
+  } | |
+ | |
+  /** Forwards to the wrapped subject. */ | |
+  @Override | |
+  public SchemaEntry registerIfLatest(String schema, SchemaEntry latest) | |
+      throws SchemaValidationException { | |
+    return this.target.registerIfLatest(schema, latest); | |
+  } | |
+ | |
+  /** Forwards to the wrapped subject. */ | |
+  @Override | |
+  public SchemaEntry lookupBySchema(String schema) { | |
+    return this.target.lookupBySchema(schema); | |
+  } | |
+ | |
+  /** Forwards to the wrapped subject. */ | |
+  @Override | |
+  public SchemaEntry lookupById(String id) { | |
+    return this.target.lookupById(id); | |
+  } | |
+ | |
+  /** Forwards to the wrapped subject. */ | |
+  @Override | |
+  public SchemaEntry latest() { | |
+    return this.target.latest(); | |
+  } | |
+ | |
+  /** Forwards to the wrapped subject. */ | |
+  @Override | |
+  public Iterable<SchemaEntry> allEntries() { | |
+    return this.target.allEntries(); | |
+  } | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/FileRepository.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/FileRepository.java | |
new file mode 100644 | |
index 0000000..8678366 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/FileRepository.java | |
@@ -0,0 +1,554 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import java.io.BufferedWriter; | |
+import java.io.Closeable; | |
+import java.io.File; | |
+import java.io.FileInputStream; | |
+import java.io.FileNotFoundException; | |
+import java.io.FileOutputStream; | |
+import java.io.FileWriter; | |
+import java.io.FilenameFilter; | |
+import java.io.IOException; | |
+import java.io.OutputStreamWriter; | |
+import java.io.RandomAccessFile; | |
+import java.io.Writer; | |
+import java.nio.channels.FileChannel; | |
+import java.nio.channels.FileLock; | |
+import java.util.ArrayList; | |
+import java.util.Arrays; | |
+import java.util.Collections; | |
+import java.util.HashSet; | |
+import java.util.List; | |
+import java.util.NoSuchElementException; | |
+import java.util.Properties; | |
+import java.util.Scanner; | |
+ | |
+import javax.inject.Inject; | |
+import javax.inject.Named; | |
+ | |
+/** | |
+ * A {@link Repository} that persists content to file. <br/> | |
+ * <br/> | |
+ * The {@link Repository} stores all of its data in a single base directory. | |
+ * Within this directory each {@link Subject} is represented by a nested | |
+ * directory with the same name as the {@link Subject}. Within each | |
+ * {@link Subject} directory there are three file types: <li> | |
+ * A properties file named 'subject.properties' containing the configured | |
+ * properties for the Subject. At this time, the only used property is | |
+ * "avro.repo.validator.class".</li> <li> | |
+ * A text file named 'schema_ids' containing the schema ids, in order of their | |
+ * creation, delimited by newline, encoded in UTF-8. This is used to track the | |
+ * order of schema registration for {@link Subject#latest()} and | |
+ * {@link Subject#allEntries()}</li> <li> | |
+ * One file per schema the contents of which are the schema encoded in UTF-8 and | |
+ * the name of which is the schema id followed by the postfix '.schema'.</li> | |
+ * | |
+ */ | |
+public class FileRepository implements Repository, Closeable { | |
+ | |
+ // name of the lock file created inside the repository root directory | |
+ private static final String LOCKFILE = ".repo.lock"; | |
+ // per-subject file holding the subject's configuration properties | |
+ private static final String SUBJECT_PROPERTIES = "subject.properties"; | |
+ // per-subject file listing schema ids, one per line | |
+ private static final String SCHEMA_IDS = "schema_ids"; | |
+ // suffix appended to a schema id to form the schema file name | |
+ private static final String SCHEMA_POSTFIX = ".schema"; | |
+ | |
+ private final InMemorySubjectCache subjects = new InMemorySubjectCache(); | |
+ private final ValidatorFactory validators; | |
+ private final File rootDir; | |
+ private final FileChannel lockChannel; | |
+ private final FileLock fileLock; | |
+ // volatile: close() writes this while holding the repository monitor, but | |
+ // isValid() also reads it from FileSubject methods that synchronize on the | |
+ // subject instance, so a plain field gives those readers no visibility guarantee | |
+ private volatile boolean closed = false; | |
+ | |
+ /** | |
+  * Create a FileRepository in the directory path provided. Locks the file | |
+  * ".repo.lock" inside that directory to ensure no other object or process is | |
+  * running a FileRepository from the same place. The lock is released when | |
+  * {@link #close()} is called or the JVM exits. If construction fails after the | |
+  * lock file has been opened, the lock channel is closed before the exception | |
+  * propagates so that a failed constructor does not keep the lock. | |
+  * | |
+  * Not all platforms support file locks. See {@link FileLock} | |
+  * | |
+  * @param repoPath | |
+  *          The path of the repository base directory. It is created, together | |
+  *          with any missing parents, when it does not exist yet. | |
+  * @param validators | |
+  *          The factory passed to {@code Subject.validatingSubject} for subjects | |
+  *          registered through {@code register(String, SubjectConfig)}. | |
+  * @throws RuntimeException | |
+  *           if the base directory cannot be created or is not a directory, or | |
+  *           if loading an existing subject fails | |
+  * @throws IllegalStateException | |
+  *           if the lock file cannot be opened or locked | |
+  */ | |
+ @Inject | |
+ public FileRepository(@Named("avro.repo.file-repo-path") String repoPath, ValidatorFactory validators) { | |
+   this.validators = validators; | |
+   this.rootDir = new File(repoPath); | |
+   if ((!rootDir.exists() && !rootDir.mkdirs()) || !rootDir.isDirectory()) { | |
+     throw new java.lang.RuntimeException( | |
+         "Unable to create repo directory, or not a directory: " | |
+             + rootDir.getAbsolutePath()); | |
+   } | |
+   // lock repository | |
+   File lockfile = new File(rootDir, LOCKFILE); | |
+   FileChannel channel = null; | |
+   FileLock lock; | |
+   try { | |
+     lockfile.createNewFile(); | |
+     @SuppressWarnings("resource") // raf is closed when its channel is closed | |
+     RandomAccessFile raf = new RandomAccessFile(lockfile, "rw"); | |
+     channel = raf.getChannel(); | |
+     lock = channel.tryLock(); | |
+   } catch (IOException e) { | |
+     closeQuietly(channel); | |
+     throw new IllegalStateException("Unable to lock repository directory: " | |
+         + rootDir.getAbsolutePath(), e); | |
+   } catch (RuntimeException e) { | |
+     // e.g. OverlappingFileLockException when this JVM already holds the lock | |
+     closeQuietly(channel); | |
+     throw e; | |
+   } | |
+   if (lock == null) { | |
+     // tryLock() returns null when another program holds an overlapping lock | |
+     closeQuietly(channel); | |
+     throw new IllegalStateException("Failed to lock file: " | |
+         + lockfile.getAbsolutePath()); | |
+   } | |
+   lockfile.deleteOnExit(); | |
+   this.lockChannel = channel; | |
+   this.fileLock = lock; | |
+   // eagerly load up subjects | |
+   try { | |
+     loadSubjects(rootDir, subjects); | |
+   } catch (RuntimeException e) { | |
+     // closing the channel also drops the lock acquired above | |
+     closeQuietly(channel); | |
+     throw e; | |
+   } | |
+ } | |
+ | |
+ // Closes the lock channel if it was opened, ignoring close failures because | |
+ // the caller is already propagating a more useful exception. | |
+ private static void closeQuietly(FileChannel channel) { | |
+   if (channel == null) { | |
+     return; | |
+   } | |
+   try { | |
+     channel.close(); | |
+   } catch (IOException ignored) { | |
+     // nothing to do here -- the original failure is reported instead | |
+   } | |
+ } | |
+ | |
+ /** | |
+  * Adds a {@link FileSubject} to the given cache for every directory found | |
+  * directly inside repoDir. Plain files (such as the lock file) are skipped. | |
+  * | |
+  * NOTE(review): subjects loaded here are added without the | |
+  * Subject.validatingSubject(...) wrapper that register() applies -- confirm | |
+  * whether that difference is intended. | |
+  * | |
+  * @throws IllegalStateException if the directory contents cannot be listed | |
+  */ | |
+ private void loadSubjects(File repoDir, SubjectCache subjects) { | |
+   File[] children = repoDir.listFiles(); | |
+   if (children == null) { | |
+     // listFiles() returns null on an I/O error or when repoDir is not a directory | |
+     throw new IllegalStateException("Unable to list repo directory: " | |
+         + repoDir.getAbsolutePath()); | |
+   } | |
+   for (File file : children) { | |
+     if (file.isDirectory()) { | |
+       subjects.add(new FileSubject(file)); | |
+     } | |
+   } | |
+ } | |
+ | |
+ /** Throws IllegalStateException once this repository has been closed. */ | |
+ private void isValid() { | |
+   if (!closed) { | |
+     return; | |
+   } | |
+   throw new IllegalStateException("FileRepository is closed"); | |
+ } | |
+ | |
+ /** | |
+ * Releases the repository file lock and closes the lock channel. Calling this | |
+ * on an already closed repository returns immediately. I/O failures from the | |
+ * release or from closing the channel are ignored, and the repository is | |
+ * marked closed even when the release fails. | |
+ */ | |
+ @Override | |
+ public synchronized void close() { | |
+ if (closed) { | |
+ return; | |
+ } | |
+ try { | |
+ fileLock.release(); | |
+ } catch (IOException e) { | |
+ // nothing to do here -- it was already released | |
+ // or there are underlying errors we cannot recover from | |
+ } finally { | |
+ // runs whether or not the release succeeded | |
+ closed = true; | |
+ try { | |
+ lockChannel.close(); | |
+ } catch (IOException e) { | |
+ // nothing to do here -- underlying errors but recovery | |
+ // not possible here or in client, and already closed | |
+ } | |
+ } | |
+ } | |
+ | |
+ /** | |
+  * Returns the subject already cached under subjectName, or creates its | |
+  * directory on disk, wraps the new subject for validation and caches it. | |
+  */ | |
+ @Override | |
+ public synchronized Subject register(String subjectName, SubjectConfig config) { | |
+   isValid(); | |
+   Subject existing = subjects.lookup(subjectName); | |
+   if (null != existing) { | |
+     return existing; | |
+   } | |
+   FileSubject created = createNewFileSubject(subjectName, config); | |
+   return subjects.add(Subject.validatingSubject(created, validators)); | |
+ } | |
+ | |
+ /** | |
+ * Returns whatever the in-memory subject cache holds for subjectName. | |
+ * Throws IllegalStateException if this repository has been closed. | |
+ */ | |
+ @Override | |
+ public synchronized Subject lookup(String subjectName) { | |
+ isValid(); | |
+ return subjects.lookup(subjectName); | |
+ } | |
+ | |
+ /** | |
+ * Returns the values of the in-memory subject cache. | |
+ * Throws IllegalStateException if this repository has been closed. | |
+ */ | |
+ @Override | |
+ public synchronized Iterable<Subject> subjects() { | |
+ isValid(); | |
+ return subjects.values(); | |
+ } | |
+ | |
+ // Creates the directory rootDir/<subject> with its initial files and returns | |
+ // a FileSubject backed by that directory. | |
+ private FileSubject createNewFileSubject(String subject, | |
+ SubjectConfig config) { | |
+ File subjectDir = new File(rootDir, subject); | |
+ createNewSubjectDir(subjectDir, config); | |
+ return new FileSubject(subjectDir); | |
+ } | |
+ | |
+ // Creates a fresh subject directory holding an empty schema id file and a | |
+ // properties file populated from the supplied configuration. Fails when the | |
+ // directory already exists or cannot be created. | |
+ private static void createNewSubjectDir(File subjectDir, SubjectConfig config) { | |
+   if (subjectDir.exists()) { | |
+     throw new RuntimeException( | |
+         "Cannot create a FileSubject, directory already exists: " | |
+             + subjectDir.getAbsolutePath()); | |
+   } | |
+   boolean made = subjectDir.mkdir(); | |
+   if (!made) { | |
+     throw new RuntimeException("Cannot create a FileSubject dir: " | |
+         + subjectDir.getAbsolutePath()); | |
+   } | |
+ | |
+   createNewFileInDir(subjectDir, SCHEMA_IDS); | |
+   File propertiesFile = createNewFileInDir(subjectDir, SUBJECT_PROPERTIES); | |
+ | |
+   Properties initial = new Properties(); | |
+   initial.putAll(RepositoryUtil.safeConfig(config).asMap()); | |
+   writePropertyFile(propertiesFile, initial); | |
+ } | |
+ | |
+ // Creates a new empty file named filename inside dir and returns it. Fails | |
+ // when the file already exists or the creation raises an I/O error. | |
+ private static File createNewFileInDir(File dir, String filename) { | |
+   File target = new File(dir, filename); | |
+   boolean created; | |
+   try { | |
+     created = target.createNewFile(); | |
+   } catch (IOException e) { | |
+     throw new RuntimeException("Unable to create file: " | |
+         + target.getAbsolutePath(), e); | |
+   } | |
+   if (!created) { | |
+     throw new RuntimeException(target.getAbsolutePath() + " already exists"); | |
+   } | |
+   return target; | |
+ } | |
+ | |
+ /** | |
+  * Opens file for writing, hands a UTF-8 buffered writer to op and closes the | |
+  * file again. The file is closed even when op fails, and the underlying | |
+  * exception is kept as the cause of the RuntimeException that is thrown. | |
+  * | |
+  * @param file the file to write | |
+  * @param op the callback that produces the content | |
+  * @param append true to append to existing content, false to overwrite it | |
+  * @throws RuntimeException if the file cannot be opened, written or closed | |
+  */ | |
+ private static void writeToFile(File file, WriteOp op, boolean append) { | |
+   FileOutputStream out; | |
+   try { | |
+     out = new FileOutputStream(file, append); | |
+   } catch (FileNotFoundException e) { | |
+     throw new RuntimeException("Could not open file for write: " | |
+         + file.getAbsolutePath(), e); | |
+   } | |
+   boolean finished = false; | |
+   try { | |
+     BufferedWriter bwriter = | |
+         new BufferedWriter(new OutputStreamWriter(out, "UTF-8")); | |
+     op.write(bwriter); | |
+     // closing the outermost writer flushes it and closes the stream beneath | |
+     bwriter.close(); | |
+     finished = true; | |
+   } catch (IOException e) { | |
+     throw new RuntimeException("Failed to write and close file " | |
+         + file.getAbsolutePath(), e); | |
+   } finally { | |
+     if (!finished) { | |
+       // the write failed part way: release the file handle | |
+       try { | |
+         out.close(); | |
+       } catch (IOException ignored) { | |
+         // the original failure is already propagating | |
+       } | |
+     } | |
+   } | |
+ } | |
+ | |
+ // Overwrites file with the given properties (append flag is false), using | |
+ // Properties.store with a fixed header comment. | |
+ private static void writePropertyFile(File file, final Properties prop) { | |
+ writeToFile(file, new WriteOp() { | |
+ @Override | |
+ protected void write(Writer writer) throws IOException { | |
+ prop.store(writer, "Schema Repository Subject Properties"); | |
+ } | |
+ }, false); | |
+ } | |
+ | |
+ // Appends line followed by a single '\n' to the end of file (append flag is true). | |
+ private static void appendLineToFile(File file, final String line) { | |
+ writeToFile(file, new WriteOp() { | |
+ @Override | |
+ protected void write(Writer writer) throws IOException { | |
+ writer.append(line).append('\n'); | |
+ } | |
+ }, true); | |
+ } | |
+ | |
+ // Asserts that dir exists and is a directory. | |
+ private static void dirExists(File dir) { | |
+   boolean usable = dir.exists() && dir.isDirectory(); | |
+   if (usable) { | |
+     return; | |
+   } | |
+   throw new RuntimeException( | |
+       "directory does not exist or is not a directory: " + dir.toString()); | |
+ } | |
+ | |
+ // Asserts that file can be read. | |
+ private static void fileReadable(File file) { | |
+   if (file.canRead()) { | |
+     return; | |
+   } | |
+   throw new RuntimeException("file does not exist or is not readable: " | |
+       + file.toString()); | |
+ } | |
+ | |
+ // Asserts that file can be written. | |
+ private static void fileWriteable(File file) { | |
+   if (file.canWrite()) { | |
+     return; | |
+   } | |
+   throw new RuntimeException("file does not exist or is not writeable: " | |
+       + file.toString()); | |
+ } | |
+ | |
+ // Callback handed to writeToFile: implementations emit the file content | |
+ // through the supplied writer. | |
+ private abstract static class WriteOp { | |
+ protected abstract void write(Writer writer) throws IOException; | |
+ } | |
+ | |
+ private class FileSubject extends Subject { | |
+ // directory that holds all files of this subject | |
+ private final File subjectDir; | |
+ // the SCHEMA_IDS file inside subjectDir | |
+ private final File idFile; | |
+ // the SUBJECT_PROPERTIES file inside subjectDir | |
+ private final File propertyFile; | |
+ // configuration read from propertyFile by the constructor | |
+ private final SubjectConfig config; | |
+ | |
+ // highest schema id seen so far; -1 while no schema is registered | |
+ private int largestId = -1; | |
+ // most recently registered entry; stays null while no schema is registered | |
+ private SchemaEntry latest; | |
+ | |
+ /** | |
+  * Loads an existing subject from its directory. Verifies that the directory, | |
+  * the id file and the properties file are accessible, reads the subject | |
+  * configuration, and cross-checks the id file against the schema files: | |
+  * every listed id must be unique and have a readable schema file, and every | |
+  * schema file must be listed. The last id in the id file becomes the latest | |
+  * entry. | |
+  * | |
+  * @param dir the subject directory; its name is used as the subject name | |
+  * @throws RuntimeException if any of the checks above fails | |
+  */ | |
+ private FileSubject(File dir) { | |
+   super(dir.getName()); | |
+   this.subjectDir = dir; | |
+   this.idFile = new File(dir, SCHEMA_IDS); | |
+   this.propertyFile = new File(dir, SUBJECT_PROPERTIES); | |
+   dirExists(subjectDir); | |
+   fileReadable(idFile); | |
+   fileWriteable(idFile); | |
+   fileReadable(propertyFile); | |
+   fileWriteable(propertyFile); | |
+ | |
+   // read from config file | |
+   Properties props = new Properties(); | |
+   try { | |
+     FileInputStream propsIn = new FileInputStream(propertyFile); | |
+     try { | |
+       props.load(propsIn); | |
+     } finally { | |
+       // Properties.load leaves the stream open, so close it here | |
+       propsIn.close(); | |
+     } | |
+     config = RepositoryUtil.configFromProperties(props); | |
+     Integer lastId = null; | |
+     HashSet<String> schemaFileNames = getSchemaFiles(); | |
+     HashSet<Integer> foundIds = new HashSet<Integer>(); | |
+     for (Integer id : getSchemaIds()) { | |
+       if (id > largestId) { | |
+         largestId = id; | |
+       } | |
+       lastId = id; | |
+       if (!foundIds.add(id)) { | |
+         throw new RuntimeException("Corrupt id file, id '" + id + | |
+             "' duplicated in " + idFile.getAbsolutePath()); | |
+       } | |
+       fileReadable(getSchemaFile(id)); | |
+       // whatever remains in the set afterwards is not listed in the id file | |
+       schemaFileNames.remove(getSchemaFileName(id)); | |
+     } | |
+     if (schemaFileNames.size() > 0) { | |
+       throw new RuntimeException("Schema files found in subject directory " | |
+           + subjectDir.getAbsolutePath() | |
+           + " that are not referenced in the " + SCHEMA_IDS + " file: " | |
+           + schemaFileNames.toString()); | |
+     } | |
+     if (lastId != null) { | |
+       latest = new SchemaEntry(lastId.toString(), | |
+           readSchemaForId(lastId.toString())); | |
+     } | |
+   } catch (IOException e) { | |
+     throw new RuntimeException("error initializing subject: " | |
+         + subjectDir.getAbsolutePath(), e); | |
+   } | |
+ } | |
+ | |
+ /** Returns the configuration that the constructor read from the properties file. */ | |
+ @Override | |
+ public SubjectConfig getConfig() { | |
+ return config; | |
+ } | |
+ | |
+ /** | |
+  * Returns the entry already stored for this schema, or writes a new schema | |
+  * file, appends its id to the id file and records it as the latest entry. | |
+  */ | |
+ @Override | |
+ public synchronized SchemaEntry register(String schema) | |
+     throws SchemaValidationException { | |
+   isValid(); | |
+   RepositoryUtil.validateSchemaOrSubject(schema); | |
+   SchemaEntry existing = lookupBySchema(schema); | |
+   if (existing != null) { | |
+     return existing; | |
+   } | |
+   SchemaEntry created = createNewSchemaFile(schema); | |
+   appendLineToFile(idFile, created.getId()); | |
+   latest = created; | |
+   return created; | |
+ } | |
+ | |
+ /** | |
+  * Writes schema to a new file named after the next free id (largestId + 1) | |
+  * and returns the matching entry, which also becomes the latest entry. | |
+  * | |
+  * The file is written as UTF-8 explicitly because schema files are read back | |
+  * as UTF-8; a writer using the platform default charset would corrupt | |
+  * non-ASCII schemas on platforms whose default is not UTF-8. | |
+  * | |
+  * @throws RuntimeException if the schema file already exists, cannot be | |
+  *           created, or cannot be written | |
+  */ | |
+ private synchronized SchemaEntry createNewSchemaFile(String schema) { | |
+   try { | |
+     int newId = largestId + 1; | |
+     File f = getSchemaFile(String.valueOf(newId)); | |
+     if (f.exists() || !f.createNewFile()) { | |
+       throw new RuntimeException( | |
+           "Unable to register schema, schema file either exists already " | |
+               + " or couldn't create new file"); | |
+     } | |
+     Writer output = new BufferedWriter( | |
+         new OutputStreamWriter(new FileOutputStream(f), "UTF-8")); | |
+     try { | |
+       output.write(schema); | |
+       output.flush(); | |
+     } finally { | |
+       output.close(); | |
+     } | |
+ | |
+     latest = new SchemaEntry(String.valueOf(newId), schema); | |
+     // only advance the counter once the file has been written | |
+     largestId++; | |
+     return latest; | |
+   } catch (IOException e) { | |
+     throw new RuntimeException( | |
+         "Unable to register schema, couldn't create schema file ", e); | |
+   } | |
+ } | |
+ | |
+ /** | |
+  * Registers schema only when the supplied entry matches the current latest | |
+  * entry (both null, same instance, or equal); otherwise returns null. | |
+  */ | |
+ @Override | |
+ public synchronized SchemaEntry registerIfLatest(String schema, | |
+     SchemaEntry latest) throws SchemaValidationException { | |
+   isValid(); | |
+   boolean sameReference = (latest == this.latest); // also covers both null | |
+   boolean matches = sameReference | |
+       || (latest != null && latest.equals(this.latest)); | |
+   if (!matches) { | |
+     return null; | |
+   } | |
+   return register(schema); | |
+ } | |
+ | |
+ /** | |
+  * Walks the ids in the id file, reads each schema file, and returns an entry | |
+  * for the first one whose content equals schema, or null when none does. | |
+  * Schema files that cannot be read are treated as non-matching. | |
+  */ | |
+ @Override | |
+ public synchronized SchemaEntry lookupBySchema(String schema) { | |
+   isValid(); | |
+   RepositoryUtil.validateSchemaOrSubject(schema); | |
+   for (Integer candidate : getSchemaIds()) { | |
+     String candidateId = candidate.toString(); | |
+     if (schema.equals(readSchemaForIdOrNull(candidateId))) { | |
+       return new SchemaEntry(candidateId, schema); | |
+     } | |
+   } | |
+   return null; | |
+ } | |
+ | |
+ /** | |
+  * Returns the entry whose schema file is named after id, or null when that | |
+  * file cannot be read. | |
+  */ | |
+ @Override | |
+ public synchronized SchemaEntry lookupById(String id) { | |
+   isValid(); | |
+   String stored = readSchemaForIdOrNull(id); | |
+   return (stored == null) ? null : new SchemaEntry(id, stored); | |
+ } | |
+ | |
+ /** | |
+ * Returns the most recently registered entry held in memory, or null when no | |
+ * schema has been registered. Throws IllegalStateException if the repository is closed. | |
+ */ | |
+ @Override | |
+ public synchronized SchemaEntry latest() { | |
+ isValid(); | |
+ return latest; | |
+ } | |
+ | |
+ /** | |
+  * Reads every schema listed in the id file and returns the entries ordered | |
+  * from newest to oldest. | |
+  */ | |
+ @Override | |
+ public synchronized Iterable<SchemaEntry> allEntries() { | |
+   isValid(); | |
+   List<SchemaEntry> newestFirst = new ArrayList<SchemaEntry>(); | |
+   for (Integer storedId : getSchemaIds()) { | |
+     String key = storedId.toString(); | |
+     newestFirst.add(new SchemaEntry(key, readSchemaForId(key))); | |
+   } | |
+   // the id file is oldest-first, so flip the collected list | |
+   Collections.reverse(newestFirst); | |
+   return newestFirst; | |
+ } | |
+ | |
+ /** Always true: ids handed out by this subject are consecutive integers. */ | |
+ @Override | |
+ public boolean integralKeys() { | |
+ return true; | |
+ } | |
+ | |
+ // Like readSchemaForId, but maps any exception to a null result. | |
+ // NOTE(review): the broad catch also hides unexpected failures -- confirm | |
+ // that treating them as "not found" is intended. | |
+ private String readSchemaForIdOrNull(String id) { | |
+ try { | |
+ return readSchemaForId(id); | |
+ } catch (Exception e) { | |
+ return null; | |
+ } | |
+ } | |
+ | |
+ // Reads the content of the schema file that belongs to id. | |
+ private String readSchemaForId(String id) { | |
+ File schemaFile = getSchemaFile(id); | |
+ return readSchemaFile(schemaFile); | |
+ } | |
+ | |
+ // Reads schemaFile as a string, converting a missing file into a | |
+ // RuntimeException that names the file. | |
+ private String readSchemaFile(File schemaFile) { | |
+ try { | |
+ return readAllAsString(schemaFile); | |
+ } catch (FileNotFoundException e) { | |
+ throw new RuntimeException( | |
+ "Could not read schema contents at: " | |
+ + schemaFile.getAbsolutePath(), e); | |
+ } | |
+ } | |
+ | |
+ // platform line separator, used both as the scanner delimiter and to join lines | |
+ private final String endOfLine = System.getProperty("line.separator"); | |
+ | |
+ // Reads file as UTF-8, line by line, and joins the lines with the platform | |
+ // line separator. No separator is appended after the last line, so the | |
+ // returned string is not guaranteed to be byte-identical to the file content. | |
+ // NOTE(review): hasNext() honours the delimiter set below while nextLine() | |
+ // splits on Scanner's own line-separator pattern -- confirm that mixing the | |
+ // two is intended. | |
+ private String readAllAsString(File file) throws FileNotFoundException { | |
+ // a scanner that will read a whole file | |
+ Scanner s = new Scanner(file, "UTF-8").useDelimiter(endOfLine); | |
+ StringBuilder strBuilder = new StringBuilder(); | |
+ try { | |
+ while (s.hasNext()) { | |
+ strBuilder.append(s.nextLine()); | |
+ if (s.hasNext()) { | |
+ // more content follows, so put a separator back between the lines | |
+ strBuilder.append(endOfLine); | |
+ } | |
+ } | |
+ return strBuilder.toString(); | |
+ } catch (NoSuchElementException e) { | |
+ throw new RuntimeException( | |
+ "file is empty: " + file.getAbsolutePath(), e); | |
+ } finally { | |
+ s.close(); | |
+ } | |
+ } | |
+ | |
+ // Names of all files in the subject directory that end with the schema postfix. | |
+ private HashSet<String> getSchemaFiles() { | |
+   FilenameFilter schemaFilesOnly = new FilenameFilter() { | |
+     @Override | |
+     public boolean accept(File dir, String name) { | |
+       return null != name && name.endsWith(SCHEMA_POSTFIX); | |
+     } | |
+   }; | |
+   String[] names = subjectDir.list(schemaFilesOnly); | |
+   return new HashSet<String>(Arrays.asList(names)); | |
+ } | |
+ | |
+ // schema ids from the schema id file, in order from oldest to newest | |
+ // Reads one integer per non-empty line and ignores blank lines. The rest of | |
+ // a line is skipped only when there is one, so an id file whose last line | |
+ // has no trailing newline is read without a NoSuchElementException. | |
+ private List<Integer> getSchemaIds(){ | |
+   Scanner s = getIdFileScanner(); | |
+   List<Integer> ids = new ArrayList<Integer>(); | |
+   try { | |
+     while (s.hasNextLine()) { | |
+       if (s.hasNext()) { | |
+         // only read non-empty lines | |
+         ids.add(s.nextInt()); | |
+       } | |
+       if (s.hasNextLine()) { | |
+         // nextInt() may have consumed the final token of the file | |
+         s.nextLine(); | |
+       } | |
+     } | |
+     return ids; | |
+   } finally { | |
+     s.close(); | |
+   } | |
+ } | |
+ | |
+ // Opens a UTF-8 scanner over the id file; the caller is responsible for closing it. | |
+ private Scanner getIdFileScanner() { | |
+ try { | |
+ return new Scanner(idFile, "UTF-8"); | |
+ } catch (FileNotFoundException e) { | |
+ throw new RuntimeException("Unable to read schema id file: " | |
+ + idFile.getAbsolutePath(), e); | |
+ } | |
+ } | |
+ | |
+ // The schema file for id inside the subject directory; id is used verbatim | |
+ // as the file name prefix. | |
+ private File getSchemaFile(String id) { | |
+ return new File(subjectDir, getSchemaFileName(id)); | |
+ } | |
+ | |
+ // Integer convenience overload of getSchemaFile(String). | |
+ private File getSchemaFile(int id) { | |
+ return getSchemaFile(String.valueOf(id)); | |
+ } | |
+ | |
+ // File name of a schema: the id followed by the schema postfix. | |
+ private String getSchemaFileName(String id) { | |
+ return id + SCHEMA_POSTFIX; | |
+ } | |
+ | |
+ // Integer convenience overload of getSchemaFileName(String). | |
+ private String getSchemaFileName(int id) { | |
+ return getSchemaFileName(String.valueOf(id)); | |
+ } | |
+ | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemoryCache.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemoryCache.java | |
new file mode 100644 | |
index 0000000..6317834 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemoryCache.java | |
@@ -0,0 +1,30 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+/** | |
+ * <p> | |
+ * A {@link InMemoryCache} is an implementation of {@link RepositoryCache} | |
+ * that uses {@link InMemorySubjectCache} and {@link InMemorySchemaEntryCache} | |
+ * </p> | |
+ */ | |
+public class InMemoryCache extends RepositoryCache { | |
+ /** | |
+ * Creates the cache with a new {@link InMemorySubjectCache} and a new | |
+ * {@link InMemorySchemaEntryCache.Factory}. | |
+ */ | |
+ public InMemoryCache() { | |
+ super(new InMemorySubjectCache(), new InMemorySchemaEntryCache.Factory()); | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemoryRepository.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemoryRepository.java | |
new file mode 100644 | |
index 0000000..e528b59 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemoryRepository.java | |
@@ -0,0 +1,122 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import javax.inject.Inject; | |
+ | |
+ | |
+/** | |
+ * A non-persistent {@link Repository} that keeps all subjects and schemas in | |
+ * memory. This is useful primarily for testing. | |
+ */ | |
+public class InMemoryRepository implements Repository { | |
+  private final InMemorySubjectCache subjects = new InMemorySubjectCache(); | |
+  private final ValidatorFactory validators; | |
+ | |
+  /** | |
+   * @param validators factory used to wrap every registered subject for validation | |
+   */ | |
+  @Inject | |
+  public InMemoryRepository(ValidatorFactory validators) { | |
+    this.validators = validators; | |
+  } | |
+ | |
+  /** | |
+   * Builds a validating in-memory subject and adds it to the subject cache, | |
+   * returning whatever the cache reports back. | |
+   */ | |
+  @Override | |
+  public Subject register(String subjectName, SubjectConfig config) { | |
+    MemSubject fresh = new MemSubject(subjectName, config); | |
+    Subject validating = Subject.validatingSubject(fresh, validators); | |
+    return subjects.add(validating); | |
+  } | |
+ | |
+  /** Returns the cached subject with the given name. */ | |
+  @Override | |
+  public Subject lookup(String subject) { | |
+    return subjects.lookup(subject); | |
+  } | |
+ | |
+  /** | |
+   * List all subjects | |
+   */ | |
+  @Override | |
+  public Iterable<Subject> subjects() { | |
+    return subjects.values(); | |
+  } | |
+ | |
+  /** A {@link Subject} whose schemas live in an {@link InMemorySchemaEntryCache}. */ | |
+  private static class MemSubject extends Subject { | |
+    private final InMemorySchemaEntryCache schemas = new InMemorySchemaEntryCache(); | |
+    private SchemaEntry latest = null; | |
+    private int nextId = 0; | |
+    private SubjectConfig config; | |
+ | |
+    protected MemSubject(String name, SubjectConfig config) { | |
+      super(name); | |
+      this.config = RepositoryUtil.safeConfig(config); | |
+    } | |
+ | |
+    @Override | |
+    public SubjectConfig getConfig() { | |
+      return config; | |
+    } | |
+ | |
+    /** | |
+     * Offers a new entry carrying the next id to the cache. The id counter and | |
+     * the latest entry only advance when the cache hands back that same entry. | |
+     */ | |
+    @Override | |
+    public synchronized SchemaEntry register(String schema) | |
+        throws SchemaValidationException { | |
+      SchemaEntry candidate = new SchemaEntry(String.valueOf(nextId), schema); | |
+      SchemaEntry stored = schemas.add(candidate); | |
+      if (stored != candidate) { | |
+        // the cache already knew this schema | |
+        return stored; | |
+      } | |
+      nextId++; | |
+      this.latest = candidate; | |
+      return stored; | |
+    } | |
+ | |
+    /** | |
+     * Registers schema only when the supplied entry matches the current latest | |
+     * entry (both null, same instance, or equal); otherwise returns null. | |
+     */ | |
+    @Override | |
+    public synchronized SchemaEntry registerIfLatest(String schema, | |
+        SchemaEntry latest) throws SchemaValidationException { | |
+      boolean matches = (latest == this.latest) | |
+          || (latest != null && latest.equals(this.latest)); | |
+      if (!matches) { | |
+        return null; | |
+      } | |
+      return register(schema); | |
+    } | |
+ | |
+    @Override | |
+    public SchemaEntry lookupBySchema(String schema) { | |
+      return schemas.lookupBySchema(schema); | |
+    } | |
+ | |
+    @Override | |
+    public SchemaEntry lookupById(String id) { | |
+      return schemas.lookupById(id); | |
+    } | |
+ | |
+    @Override | |
+    public synchronized SchemaEntry latest() { | |
+      return latest; | |
+    } | |
+ | |
+    @Override | |
+    public synchronized Iterable<SchemaEntry> allEntries() { | |
+      return schemas.values(); | |
+    } | |
+ | |
+    @Override | |
+    public boolean integralKeys() { | |
+      return true; | |
+    } | |
+  } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemorySchemaEntryCache.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemorySchemaEntryCache.java | |
new file mode 100644 | |
index 0000000..0d2bb29 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemorySchemaEntryCache.java | |
@@ -0,0 +1,73 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import java.util.ArrayList; | |
+import java.util.LinkedList; | |
+import java.util.concurrent.ConcurrentHashMap; | |
+ | |
+/** | |
+ * An unbounded in memory {@link SchemaEntryCache} that never evicts any values. | |
+ * Entries are indexed by schema and by id, and are additionally kept in a list | |
+ * ordered from the most recently added to the oldest. | |
+ */ | |
+public class InMemorySchemaEntryCache implements SchemaEntryCache { | |
+ | |
+  private final ConcurrentHashMap<String, SchemaEntry> schemaToEntry = | |
+      new ConcurrentHashMap<String, SchemaEntry>(); | |
+  private final ConcurrentHashMap<String, SchemaEntry> idToSchema = | |
+      new ConcurrentHashMap<String, SchemaEntry>(); | |
+  // newest entry first: add() pushes onto the head of the list | |
+  private final LinkedList<SchemaEntry> schemasInOrder = | |
+      new LinkedList<SchemaEntry>(); | |
+ | |
+  /** Returns the entry cached for the given schema, or null if there is none. */ | |
+  @Override | |
+  public SchemaEntry lookupBySchema(String schema) { | |
+    return schemaToEntry.get(schema); | |
+  } | |
+ | |
+  /** Returns the entry cached for the given id, or null if there is none. */ | |
+  @Override | |
+  public SchemaEntry lookupById(String id) { | |
+    return idToSchema.get(id); | |
+  } | |
+ | |
+  /** | |
+   * Adds entry unless an entry with the same schema is already cached. | |
+   * | |
+   * @param entry the entry to cache; null is returned unchanged | |
+   * @return the entry now cached for that schema: the argument when it was | |
+   *         new, otherwise the previously cached entry | |
+   */ | |
+  @Override | |
+  public synchronized SchemaEntry add(SchemaEntry entry) { | |
+    if (null == entry) { | |
+      return entry; | |
+    } | |
+    SchemaEntry prior = schemaToEntry.putIfAbsent(entry.getSchema(), entry); | |
+    if (null != prior) { | |
+      // already cached: recording it again would make values() return the | |
+      // same entry more than once | |
+      return prior; | |
+    } | |
+    idToSchema.put(entry.getId(), entry); | |
+    schemasInOrder.push(entry); | |
+    return entry; | |
+  } | |
+ | |
+  /** return all of the values in this cache **/ | |
+  public synchronized Iterable<SchemaEntry> values() { | |
+    // hand out a copy so callers never iterate over the live list | |
+    return new ArrayList<SchemaEntry>(schemasInOrder); | |
+  } | |
+ | |
+  /** Factory that creates a new, empty {@link InMemorySchemaEntryCache} per call. */ | |
+  public static class Factory implements SchemaEntryCache.Factory { | |
+    @Override | |
+    public SchemaEntryCache createSchemaEntryCache() { | |
+      return new InMemorySchemaEntryCache(); | |
+    } | |
+  } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemorySubjectCache.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemorySubjectCache.java | |
new file mode 100644 | |
index 0000000..ce86208 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/InMemorySubjectCache.java | |
@@ -0,0 +1,52 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import java.util.concurrent.ConcurrentHashMap; | |
+ | |
+/** | |
+ * An unbounded in memory {@link SubjectCache} that never evicts any values. | |
+ */ | |
+public class InMemorySubjectCache implements SubjectCache { | |
+  private final ConcurrentHashMap<String, Subject> subjects = | |
+      new ConcurrentHashMap<String, Subject>(); | |
+ | |
+  /** Returns the subject cached under name, or null if there is none. */ | |
+  @Override | |
+  public Subject lookup(String name) { | |
+    return subjects.get(name); | |
+  } | |
+ | |
+  /** | |
+   * Caches subject under its name unless that name is already taken. | |
+   * | |
+   * @return the subject now cached under that name, or null for a null argument | |
+   */ | |
+  @Override | |
+  public Subject add(Subject subject) { | |
+    if (null == subject) { | |
+      return null; | |
+    } | |
+    Subject existing = subjects.putIfAbsent(subject.getName(), subject); | |
+    if (existing == null) { | |
+      return subject; | |
+    } | |
+    return existing; | |
+  } | |
+ | |
+  /** | |
+   * @return All of the {@link Subject} values in this | |
+   *         {@link InMemorySubjectCache} | |
+   */ | |
+  public Iterable<Subject> values() { | |
+    return subjects.values(); | |
+  } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/ReadOnlyRepository.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/ReadOnlyRepository.java | |
new file mode 100644 | |
index 0000000..4329c60 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/ReadOnlyRepository.java | |
@@ -0,0 +1,66 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import java.util.ArrayList; | |
+ | |
+import javax.inject.Inject; | |
+ | |
+/** | |
+ * ReadOnlyRepository is a {@link Repository} implementation that wraps another | |
+ * {@link Repository} and rejects all operations that can modify the state of | |
+ * the {@link Repository}.<br/> | |
+ * <br/> | |
+ * | |
+ * {@link #register(String, SubjectConfig)}, {@link Subject#register(String)} | |
+ * and {@link Subject#registerIfLatest(String, SchemaEntry)} throw | |
+ * {@link IllegalStateException} if called. | |
+ */ | |
+public class ReadOnlyRepository implements Repository { | |
+ // the wrapped repository that all reads are served from | |
+ private final Repository repo; | |
+ | |
+ /** | |
+ * Create a repository that disallows mutations to the underlying repository. | |
+ * @param repo The repository to wrap | |
+ */ | |
+ @Inject | |
+ public ReadOnlyRepository(Repository repo) { | |
+ this.repo = repo; | |
+ } | |
+ | |
+ /** | |
+ * Always rejects the call. | |
+ * @throws IllegalStateException on every invocation | |
+ */ | |
+ @Override | |
+ public Subject register(String subjectName, SubjectConfig config) { | |
+ throw new IllegalStateException( | |
+ "Cannot register a Subject in a ReadOnlyRepository"); | |
+ } | |
+ | |
+ /** | |
+ * Looks the subject up in the wrapped repository and passes the result | |
+ * through {@code Subject.readOnly}. | |
+ * NOTE(review): the wrapped lookup may return null for an unknown subject -- | |
+ * confirm that Subject.readOnly accepts null. | |
+ */ | |
+ @Override | |
+ public Subject lookup(String subjectName) { | |
+ return Subject.readOnly(repo.lookup(subjectName)); | |
+ } | |
+ | |
+ /** | |
+ * Returns a new list holding a {@code Subject.readOnly} wrapper for each | |
+ * subject of the wrapped repository. | |
+ */ | |
+ @Override | |
+ public Iterable<Subject> subjects() { | |
+ ArrayList<Subject> subjects = new ArrayList<Subject>(); | |
+ for(Subject sub : repo.subjects()) { | |
+ subjects.add(Subject.readOnly(sub)); | |
+ } | |
+ return subjects; | |
+ } | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Repository.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Repository.java | |
new file mode 100644 | |
index 0000000..91dca81 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Repository.java | |
@@ -0,0 +1,62 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+/** | |
+ * A {@link Repository} is a collection of {@link Subject}s. A {@link Subject} | |
+ * can be looked up by name on a {@link Repository}, or registered.<br/> | |
+ * <br/> | |
+ * Registration of a {@link Subject} in a {@link Repository} is done via | |
+ * {@link #register(String, SubjectConfig)}, which requires the subject name and its | |
+ * configuration. The configuration is a map of configuration keys | |
+ * to configuration values, both of which are Strings.<br/> | |
+ * <br/> | |
+ * | |
+ */ | |
+public interface Repository { | |
+ | |
+ /** | |
+ * Attempt to create a Subject with the given name and configuration. | |
+ * | |
+ * @param subjectName | |
+ * The name of the subject. Must not be null. | |
+ * @param config | |
+ * The subject configuration. May be null. | |
+ * @return The newly created Subject, or an equivalent one if already created. | |
+ * Does not return null. | |
+ * @throws NullPointerException | |
+ * if subjectName is null | |
+ */ | |
+ Subject register(String subjectName, SubjectConfig config); | |
+ | |
+ /** | |
+ * Returns the subject if it exists, null otherwise. | |
+ * | |
+ * @param subjectName | |
+ * the subject name | |
+ * @return The subject if it exists, null otherwise. | |
+ */ | |
+ Subject lookup(String subjectName); | |
+ | |
+ /** | |
+ * List all subjects. Does not return null. | |
+ */ | |
+ Iterable<Subject> subjects(); | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/RepositoryCache.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/RepositoryCache.java | |
new file mode 100644 | |
index 0000000..001248c | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/RepositoryCache.java | |
@@ -0,0 +1,52 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import javax.inject.Inject; | |
+ | |
+/** | |
+ * <p> | |
+ * A {@link RepositoryCache} composes a {@link SubjectCache} with a | |
+ * {@link SchemaEntryCache.Factory}, using | |
+ * {@link Subject#cacheWith(Subject, SchemaEntryCache)} to wrap {@link Subject} | |
+ * instances prior to insertion into the {@link SubjectCache}. Lookups are delegated to the wrapped {@link SubjectCache} unchanged; if the factory returns null, cacheWith returns the subject unwrapped.</p> | |
+ */ | |
+public class RepositoryCache implements SubjectCache { | |
+ | |
+ private final SubjectCache subjects; | |
+ private final SchemaEntryCache.Factory entryCacheFactory; | |
+ | |
+ @Inject | |
+ public RepositoryCache(SubjectCache subjects, | |
+ SchemaEntryCache.Factory entryCacheFactory) { | |
+ this.subjects = subjects; | |
+ this.entryCacheFactory = entryCacheFactory; | |
+ } | |
+ | |
+ @Override | |
+ public Subject add(Subject entry) { | |
+ return subjects.add(Subject.cacheWith(entry, | |
+ entryCacheFactory.createSchemaEntryCache())); | |
+ } | |
+ | |
+ @Override | |
+ public Subject lookup(String name) { | |
+ return subjects.lookup(name); | |
+ } | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/RepositoryUtil.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/RepositoryUtil.java | |
new file mode 100644 | |
index 0000000..23f19e3 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/RepositoryUtil.java | |
@@ -0,0 +1,147 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import java.util.ArrayList; | |
+import java.util.HashMap; | |
+import java.util.List; | |
+import java.util.Properties; | |
+ | |
+/** | |
+ * {@link RepositoryUtil} contains static helper methods for the | |
+ * org.apache.avro.repo package. | |
+ * <p> | |
+ * {@link #subjectsToString(Iterable)} and | |
+ * {@link #subjectNamesFromString(String)} can be used to encode | |
+ * subjects to string format. <br/> | |
+ * {@link #schemasToString(Iterable)} and {@link #schemasFromString(String)} can | |
+ * be used to encode schemas to string format. | |
+ * </p> | |
+ * <p> | |
+ * These formats simply delimit items by the newline character and can be used | |
+ * for human readable output. The Avro RESTRepository uses this format to encode | |
+ * subjects and schemas over HTTP. Subject names are forbidden from containing | |
+ * whitespace. Schemas have their whitespace removed prior to use in the | |
+ * Repository. NOTE(review): neither whitespace rule is enforced by the methods of this class. | |
+ * </p> | |
+ */ | |
+public final class RepositoryUtil { | |
+ private RepositoryUtil() { | |
+ } | |
+ | |
+ /** | |
+ * Encode {@link Subject}s into a {@link String} for use by | |
+ * {@link #subjectNamesFromString(String)}. Each subject name is followed by a newline. | |
+ * | |
+ * @param subjects | |
+ * the Subject objects to encode | |
+ * @return The names of the {@link Subject} objects encoded as a String | |
+ */ | |
+ public static String subjectsToString(Iterable<Subject> subjects) { | |
+ StringBuilder sb = new StringBuilder(); | |
+ for (Subject s : subjects) { | |
+ sb.append(s.getName()).append("\n"); | |
+ } | |
+ return sb.toString(); | |
+ } | |
+ | |
+ /** | |
+ * Decode a string created by {@link #subjectsToString(Iterable)} | |
+ * | |
+ * @param str | |
+ * The String to decode; null or empty yields an empty result | |
+ * @return an {@link java.lang.Iterable} of subject names | |
+ */ | |
+ public static Iterable<String> subjectNamesFromString(String str) { | |
+ List<String> subjects = new ArrayList<String>(); | |
+ if (str != null && !str.isEmpty()) { | |
+ String[] strs = str.split("\n"); | |
+ for (String s : strs) { | |
+ subjects.add(s); | |
+ } | |
+ } | |
+ return subjects; | |
+ } | |
+ | |
+ /** | |
+ * Encode {@link SchemaEntry} objects into a {@link String} for use by | |
+ * {@link #schemasFromString(String)}. Each entry's toString() form is followed by a newline. | |
+ * | |
+ * @param allEntries | |
+ * the SchemaEntry objects to encode | |
+ * @return The {@link SchemaEntry} objects encoded as a String | |
+ */ | |
+ public static String schemasToString(Iterable<SchemaEntry> allEntries) { | |
+ StringBuilder sb = new StringBuilder(); | |
+ for (SchemaEntry s : allEntries) { | |
+ sb.append(s.toString()).append("\n"); | |
+ } | |
+ return sb.toString(); | |
+ } | |
+ | |
+ /** | |
+ * Decode a string created by {@link #schemasToString(Iterable)} | |
+ * | |
+ * @param str | |
+ * The String to decode; null or empty yields an empty result | |
+ * @return An {@link java.lang.Iterable} of {@link SchemaEntry} | |
+ */ | |
+ public static Iterable<SchemaEntry> schemasFromString(String str) { | |
+ List<SchemaEntry> schemas = new ArrayList<SchemaEntry>(); | |
+ if (str != null && !str.isEmpty()) { | |
+ String[] strs = str.split("\n"); | |
+ for (String s : strs) { | |
+ schemas.add(new SchemaEntry(s)); | |
+ } | |
+ } | |
+ return schemas; | |
+ } | |
+ | |
+ /** | |
+ * Throws IllegalArgumentException if the string provided is null, or empty. | |
+ */ | |
+ public static void validateSchemaOrSubject(String val) { | |
+ if (null == val || val.isEmpty()) { | |
+ throw new IllegalArgumentException( | |
+ "Provided string is null or empty: '" + val + "'"); | |
+ } | |
+ } | |
+ | |
+ /** | |
+ * Returns a {@link SubjectConfig} built from the properties provided. | |
+ * Includes any default values that exist in the properties. | |
+ */ | |
+ public static SubjectConfig configFromProperties(Properties props) { | |
+ HashMap<String, String> propData = new HashMap<String, String>(); | |
+ for (String key :props.stringPropertyNames()) { | |
+ propData.put(key, props.getProperty(key)); | |
+ } | |
+ return new SubjectConfig.Builder().set(propData).build(); | |
+ } | |
+ | |
+ /** Returns the given config, or {@link SubjectConfig#emptyConfig()} when it is null. Temporary until we have decided how to deal with null configs. **/ | |
+ public static SubjectConfig safeConfig(SubjectConfig config) { | |
+ if (null == config) { | |
+ return SubjectConfig.emptyConfig(); | |
+ } else { | |
+ return config; | |
+ } | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaEntry.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaEntry.java | |
new file mode 100644 | |
index 0000000..ee226ff | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaEntry.java | |
@@ -0,0 +1,95 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+/** | |
+ * {@link SchemaEntry} is composed of a schema and its corresponding id. | |
+ */ | |
+public final class SchemaEntry { | |
+ private final String id; | |
+ private final String schema; | |
+ | |
+ /** | |
+ * Primary constructor taking a literal id and schema. Neither value is validated here. | |
+ */ | |
+ public SchemaEntry(String id, String schema) { | |
+ this.id = id; | |
+ this.schema = schema; | |
+ } | |
+ | |
+ /** | |
+ * Constructor taking the string form produced by {@link #toString()}: the id, a tab, then the schema (split at the first tab). | |
+ * @throws IllegalArgumentException if the string has no tab, or no characters precede the first tab | |
+ */ | |
+ public SchemaEntry(String stringForm) { | |
+ int tab = stringForm.indexOf('\t'); | |
+ if (tab < 1) { | |
+ throw new IllegalArgumentException("Invalid Schema Entry Serialization: " | |
+ + stringForm); | |
+ } | |
+ this.id = stringForm.substring(0, tab); | |
+ this.schema = stringForm.substring(tab + 1); | |
+ } | |
+ | |
+ /** @return the id */ | |
+ public String getId() { | |
+ return id; | |
+ } | |
+ | |
+ /** @return the schema */ | |
+ public String getSchema() { | |
+ return schema; | |
+ } | |
+ | |
+ @Override | |
+ public int hashCode() { | |
+ final int prime = 31; | |
+ int result = ((id == null) ? 0 : id.hashCode()); | |
+ result = prime * result + ((schema == null) ? 0 : schema.hashCode()); | |
+ return result; | |
+ } | |
+ | |
+ @Override | |
+ public boolean equals(Object obj) { | |
+ if (this == obj) | |
+ return true; | |
+ if (obj == null) | |
+ return false; | |
+ if (getClass() != obj.getClass()) | |
+ return false; | |
+ SchemaEntry other = (SchemaEntry) obj; | |
+ if (id == null) { | |
+ if (other.id != null) | |
+ return false; | |
+ } else if (!id.equals(other.id)) | |
+ return false; | |
+ if (schema == null) { | |
+ if (other.schema != null) | |
+ return false; | |
+ } else if (!schema.equals(other.schema)) | |
+ return false; | |
+ return true; | |
+ } | |
+ | |
+ @Override | |
+ public String toString() { | |
+ return id + "\t" + schema; | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaEntryCache.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaEntryCache.java | |
new file mode 100644 | |
index 0000000..2b3f6e8 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaEntryCache.java | |
@@ -0,0 +1,83 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+/** | |
+ * <p> | |
+ * A {@link SchemaEntryCache} is a bi-directional cache from a schema string to | |
+ * a schema id string. In a given {@link Subject} the mapping between a schema | |
+ * and its id is immutable and can be cached. Other aspects of {@link Subject} | |
+ * are not safe to cache. | |
+ * </p> | |
+ * <p> | |
+ * In a {@link Subject} schemas are Strings and can not be null or empty. | |
+ * See {@link RepositoryUtil#validateSchemaOrSubject(String)} | |
+ * </p> | |
+ * {@link #add(SchemaEntry)}, {@link #lookupById(String)}, and | |
+ * {@link #lookupBySchema(String)} must be thread-safe with respect to | |
+ * each-other. | |
+ */ | |
+public interface SchemaEntryCache { | |
+ | |
+ /** | |
+ * Look up a schema entry by its full string form. Thread-safe. | |
+ * | |
+ * @throws NullPointerException | |
+ * If the provided schema is null | |
+ */ | |
+ SchemaEntry lookupBySchema(String schema); | |
+ | |
+ /** | |
+ * Look up a schema entry by its id. Thread-safe. | |
+ * | |
+ * @throws NullPointerException | |
+ * If the provided id is null | |
+ */ | |
+ SchemaEntry lookupById(String id); | |
+ | |
+ /** | |
+ * Add the schema entry to this cache. | |
+ * | |
+ * @param entry | |
+ * the schema entry to add. If the provided entry is null, returns null. | |
+ * | |
+ * @return the {@link SchemaEntry} that is in the cache after the call | |
+ * completes. If the entry already exists the pre-existing value is | |
+ * returned, otherwise the value returned is the entry provided. If | |
+ * the provided entry is null, returns null. Thread-safe. | |
+ */ | |
+ SchemaEntry add(SchemaEntry entry); | |
+ | |
+ /** | |
+ * Creates instances of {@link SchemaEntryCache}<br/> | |
+ * <br/> | |
+ * | |
+ * @see Subject#cacheWith(Subject, SchemaEntryCache) | |
+ */ | |
+ interface Factory { | |
+ /** | |
+ * Create a {@link SchemaEntryCache} instance for use with | |
+ * {@link Subject#cacheWith(Subject, SchemaEntryCache)}. | |
+ * | |
+ * May return null to disable use of a SchemaEntryCache | |
+ */ | |
+ SchemaEntryCache createSchemaEntryCache(); | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaValidationException.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaValidationException.java | |
new file mode 100644 | |
index 0000000..c65355d | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SchemaValidationException.java | |
@@ -0,0 +1,27 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+/** Checked exception declared by {@link Subject#register(String)} and {@link Subject#registerIfLatest(String, SchemaEntry)} for a schema change that is not valid under the subject's validation rules. */ public class SchemaValidationException extends Exception { | |
+ private static final long serialVersionUID = -3915576082651907606L; | |
+ | |
+ public SchemaValidationException(String msg) { | |
+ super(msg); | |
+ } | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Subject.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Subject.java | |
new file mode 100644 | |
index 0000000..795a887 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Subject.java | |
@@ -0,0 +1,346 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import java.util.ArrayList; | |
+import java.util.List; | |
+ | |
+/** | |
+ * A {@link Subject} is a collection of mutually compatible Schemas. <br/> | |
+ * <br/> | |
+ * Validation of schemas is pluggable and each subject may have its own | |
+ * validation rules defined with its own {@link Validator} when registered with | |
+ * a {@link Repository}. To create a {@link Subject} that validates its schemas, | |
+ * use {@link #validatingSubject(Subject, ValidatorFactory)}. <br/> | |
+ * <br/> | |
+ * Caching of schemas is pluggable via | |
+ * {@link #cacheWith(Subject, SchemaEntryCache)}. A {@link Subject} can only | |
+ * cache the schema to id mappings, as other properties of a Subject are not | |
+ * safe to cache. <br/> | |
+ * <br/> | |
+ * A {@link Subject} has a few basic methods for interacting with Schemas: | |
+ * <li> | |
+ * {@link #register(String)} attempts to register a schema with the Subject, | |
+ * according to the validation rules of the Subject. The operation is idempotent | |
+ * -- the return value is the {@link SchemaEntry} corresponding to the schema | |
+ * String whether the schema existed before the operation or not.</li> | |
+ * <li> | |
+ * {@link #registerIfLatest(String, SchemaEntry)} attempts to register a schema | |
+ * with the Subject, according to the validation rules of the Subject. The | |
+ * operation succeeds only if the provided {@code latest} value is the current | |
+ * latest schema in the system, and returns null otherwise.</li> | |
+ * <li> | |
+ * {@link #lookupById(String)} looks up a schema by its id, and returns null if | |
+ * such a schema does not exist. Since the mapping from id to schema is | |
+ * immutable, this result is cacheable.</li> | |
+ * <li> | |
+ * {@link #lookupBySchema(String)} looks up an id for a schema, and returns null | |
+ * if no such schema exists. Since the mapping from id to schema is immutable, | |
+ * this result is cacheable.</li> | |
+ * <li> | |
+ * {@link #allEntries()} returns all the schema entries for the subject, ordered | |
+ * from most recent to oldest. The result is not cacheable, since additional | |
+ * entries may be added.</li> | |
+ * | |
+ */ | |
+public abstract class Subject { | |
+ private final String name; | |
+ | |
+ /** | |
+ * A {@link Subject} has a name. The name must not be null or empty; otherwise | |
+ * {@link RepositoryUtil#validateSchemaOrSubject(String)} throws an | |
+ * {@link IllegalArgumentException}. NOTE(review): whitespace in the name is not checked here. | |
+ **/ | |
+ protected Subject(String name) { | |
+ RepositoryUtil.validateSchemaOrSubject(name); | |
+ this.name = name; | |
+ } | |
+ | |
+ /** | |
+ * Return the name of the Subject as supplied to the constructor. The name is | |
+ * never empty or null. | |
+ */ | |
+ public String getName() { | |
+ return name; | |
+ } | |
+ | |
+ public abstract SubjectConfig getConfig(); | |
+ | |
+ public abstract boolean integralKeys(); | |
+ | |
+ /** | |
+ * If the provided schema has already been registered in this subject, return | |
+ * its entry. | |
+ * | |
+ * If the provided schema has not been registered in this subject, register it | |
+ * and return its entry. | |
+ * | |
+ * Idempotent -- If two users simultaneously register the same schema, they | |
+ * will both get the same {@link SchemaEntry} result and succeed. | |
+ * | |
+ * @param schema | |
+ * The schema to register | |
+ * @return The {@link SchemaEntry} of the schema | |
+ * @throws SchemaValidationException | |
+ * If the schema change is not valid according the validation rules | |
+ * of the subject | |
+ */ | |
+ public abstract SchemaEntry register(String schema) | |
+ throws SchemaValidationException; | |
+ | |
+ /** | |
+ * Register the provided schema only if the current latest schema matches the | |
+ * provided latest entry. | |
+ * | |
+ * | |
+ * @param schema | |
+ * The schema to register | |
+ * @param latest | |
+ * the entry that must match the current actual latest value in order | |
+ * to register this schema. | |
+ * @return The {@link SchemaEntry} of the schema, or null if latest does not match. | |
+ * @throws SchemaValidationException | |
+ * If the schema change is not valid according the validation rules | |
+ * of the subject | |
+ */ | |
+ public abstract SchemaEntry registerIfLatest(String schema, SchemaEntry latest) | |
+ throws SchemaValidationException; | |
+ | |
+ /** | |
+ * Lookup the {@link SchemaEntry} for the given schema. Since the mapping of | |
+ * schema to id is immutable, this result can be cached. | |
+ * | |
+ * @param schema | |
+ * The schema to look up | |
+ * @return The SchemaEntry of the schema or null if the schema is not | |
+ * registered | |
+ */ | |
+ public abstract SchemaEntry lookupBySchema(String schema); | |
+ | |
+ /** | |
+ * Lookup the {@link SchemaEntry} for the given subject by id. Since the | |
+ * mapping of schema to id is immutable the result can be cached. | |
+ * | |
+ * @param id | |
+ * the id of the schema to look up | |
+ * @return The SchemaEntry of the schema or null if no such schema is | |
+ * registered for the provided id | |
+ */ | |
+ public abstract SchemaEntry lookupById(String id); | |
+ | |
+ /** | |
+ * Lookup the most recently registered schema for the given subject. This | |
+ * result is not cacheable, since the latest schema may change. | |
+ * | |
+ * @return The {@link SchemaEntry} or null if no schema is registered with | |
+ * this subject | |
+ */ | |
+ public abstract SchemaEntry latest(); | |
+ | |
+ /** | |
+ * List the schema entries registered with the given subject, ordered from | |
+ * most recent to oldest. This result is not cacheable, since the | |
+ * {@link SchemaEntry} in the subject may grow over time. | |
+ * | |
+ * @return the {@link SchemaEntry} objects in this subject, ordered from most | |
+ * recent to oldest. | |
+ */ | |
+ public abstract Iterable<SchemaEntry> allEntries(); | |
+ | |
+ @Override | |
+ public String toString() { | |
+ return name; | |
+ } | |
+ | |
+ /** | |
+ * Create a {@link Subject} that rejects modifications, throwing | |
+ * {@link IllegalStateException} if a modification is attempted. Returns null if the provided subject is null. | |
+ **/ | |
+ public static final Subject readOnly(Subject subject) { | |
+ if (null == subject) { | |
+ return subject; | |
+ } else { | |
+ return new ReadOnlySubject(subject); | |
+ } | |
+ } | |
+ | |
+ private static class ReadOnlySubject extends DelegatingSubject { | |
+ private ReadOnlySubject(Subject subject) { | |
+ super(subject); | |
+ } | |
+ | |
+ @Override | |
+ public SchemaEntry register(String schema) throws SchemaValidationException { | |
+ throw new IllegalStateException("Cannot register, subject is read-only"); | |
+ } | |
+ | |
+ @Override | |
+ public SchemaEntry registerIfLatest(String schema, SchemaEntry latest) { | |
+ throw new IllegalStateException("Cannot register, subject is read-only"); | |
+ } | |
+ | |
+ } | |
+ | |
+ /** | |
+ * Create a {@link Subject} that validates schemas as configured. Returns the provided subject unwrapped if it is null or if the factory yields no validators for its config. | |
+ */ | |
+ public static Subject validatingSubject(Subject subject, ValidatorFactory factory) { | |
+ if (null == subject) { | |
+ return subject; | |
+ } | |
+ List<Validator> validators = factory.getValidators(subject.getConfig().getValidators()); | |
+ if (!validators.isEmpty()) { | |
+ return new ValidatingSubject(subject, new CompositeValidator(validators)); | |
+ } else { | |
+ return subject; | |
+ } | |
+ } | |
+ | |
+ private static final class CompositeValidator implements Validator { | |
+ private final ArrayList<Validator> validators; | |
+ private CompositeValidator(List<Validator> validators) { | |
+ this.validators = new ArrayList<Validator>(validators); | |
+ } | |
+ | |
+ @Override | |
+ public void validate(String schemaToValidate, | |
+ Iterable<SchemaEntry> schemasInOrder) throws SchemaValidationException { | |
+ for(Validator v : validators) { | |
+ v.validate(schemaToValidate, schemasInOrder); | |
+ } | |
+ } | |
+ } | |
+ | |
+ private static class ValidatingSubject extends DelegatingSubject { | |
+ protected final Validator validator; | |
+ | |
+ private ValidatingSubject(Subject delegate, Validator validator) { | |
+ super(delegate); | |
+ this.validator = validator; | |
+ } | |
+ | |
+ @Override | |
+ public SchemaEntry register(String schema) throws SchemaValidationException { | |
+ while (true) { | |
+ Iterable<SchemaEntry> schemaEntries = allEntries(); | |
+ SchemaEntry actualLatest = null; | |
+ for (SchemaEntry entry : schemaEntries) { | |
+ actualLatest = entry; | |
+ break; | |
+ } | |
+ validator.validate(schema, schemaEntries); | |
+ SchemaEntry registered = super.registerIfLatest(schema, actualLatest); | |
+ // if registered is not null, it was successful | |
+ if (null != registered) { | |
+ return registered; | |
+ } | |
+ } | |
+ } | |
+ | |
+ @Override | |
+ public SchemaEntry registerIfLatest(String schema, SchemaEntry latest) | |
+ throws SchemaValidationException { | |
+ Iterable<SchemaEntry> schemaEntries = allEntries(); | |
+ SchemaEntry actualLatest = null; | |
+ for (SchemaEntry entry : schemaEntries) { | |
+ actualLatest = entry; | |
+ break; | |
+ } | |
+ if (actualLatest == latest | |
+ || ((actualLatest != null) && actualLatest.equals(latest))) { | |
+ // they are equal, either both are null or they equal | |
+ validator.validate(schema, schemaEntries); | |
+ return super.registerIfLatest(schema, latest); | |
+ } else { | |
+ return null; | |
+ } | |
+ } | |
+ } | |
+ | |
+ /** | |
+ * Create a {@link Subject} that caches id to schema mappings using the | |
+ * {@link SchemaEntryCache} provided. | |
+ * | |
+ * @param subject | |
+ * The {@link Subject} to wrap | |
+ * @param cache | |
+ * The {@link SchemaEntryCache} to cache with | |
+ * @return returns a {@link Subject} instance that caches {@link SchemaEntry} | |
+ * values with the cache provided, if and only if both parameters are | |
+ * not null. <br/> | |
+ * If the provided subject is null, returns null. If the provided | |
+ * cache is null, returns the provided subject without wrapping it. | |
+ */ | |
+ public static Subject cacheWith(Subject subject, SchemaEntryCache cache) { | |
+ return (null == subject || null == cache) ? | |
+ subject : new CachingSubject(subject, cache); | |
+ } | |
+ | |
+ private static class CachingSubject extends DelegatingSubject { | |
+ private final SchemaEntryCache cache; | |
+ | |
+ private CachingSubject(Subject delegate, SchemaEntryCache cache) { | |
+ super(delegate); | |
+ this.cache = cache; | |
+ } | |
+ | |
+ @Override | |
+ public SchemaEntry register(String schema) throws SchemaValidationException { | |
+ SchemaEntry entry = cache.lookupBySchema(schema); | |
+ if (entry == null) { | |
+ return cache.add(super.register(schema)); | |
+ } | |
+ return entry; | |
+ } | |
+ | |
+ @Override | |
+ public SchemaEntry registerIfLatest(String schema, SchemaEntry latest) | |
+ throws SchemaValidationException { | |
+ return cache.add(super.registerIfLatest(schema, latest)); | |
+ } | |
+ | |
+ @Override | |
+ public SchemaEntry lookupBySchema(String schema) { | |
+ SchemaEntry entry = cache.lookupBySchema(schema); | |
+ if (entry == null) { | |
+ return cache.add(super.lookupBySchema(schema)); | |
+ } | |
+ return entry; | |
+ } | |
+ | |
+ @Override | |
+ public SchemaEntry lookupById(String id) { | |
+ SchemaEntry entry = cache.lookupById(id); | |
+ if (entry == null) { | |
+ return cache.add(super.lookupById(id)); | |
+ } | |
+ return entry; | |
+ } | |
+ | |
+ @Override | |
+ public Iterable<SchemaEntry> allEntries() { | |
+ Iterable<SchemaEntry> all = super.allEntries(); | |
+ for (SchemaEntry entry : all) { | |
+ cache.add(entry); | |
+ } | |
+ return all; | |
+ } | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SubjectCache.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SubjectCache.java | |
new file mode 100644 | |
index 0000000..360f703 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SubjectCache.java | |
@@ -0,0 +1,53 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+/** | |
+ * <p> | |
+ * A {@link SubjectCache} is a cache from a string subject name to a | |
+ * {@link Subject} | |
+ * </p> | |
+ * <p> | |
+ * In a {@link Repository} subjects can be cached because they can only be | |
+ * created. However, a {@link Subject} can have its meta-data altered, so this | |
+ * cannot be cached. | |
+ * </p> | |
+ * {@link #add(Subject)} and {@link #lookup(String)} must be thread-safe | |
+ * with respect to each other. | |
+ */ | |
+public interface SubjectCache { | |
+ /** | |
+ * Look up a {@link Subject} by its name. Thread-safe. | |
+ * | |
+ * @throws NullPointerException | |
+ * if the provided name is null | |
+ */ | |
+ Subject lookup(String name); | |
+ | |
+ /** | |
+ * Add or update the {@link Subject} entry in this cache. | |
+ * | |
+ * @return the {@link Subject} that is in the cache after the call completes. | |
+ * If the entry already exists this is the pre-existing value, | |
+ * otherwise it is the value provided. If the value provided is null, | |
+ * returns null. Thread-safe. | |
+ */ | |
+ Subject add(Subject entry); | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SubjectConfig.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SubjectConfig.java | |
new file mode 100644 | |
index 0000000..c1993bc | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/SubjectConfig.java | |
@@ -0,0 +1,169 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import java.util.ArrayList; | |
+import java.util.Collection; | |
+import java.util.Collections; | |
+import java.util.HashMap; | |
+import java.util.HashSet; | |
+import java.util.List; | |
+import java.util.Map; | |
+import java.util.Set; | |
+ | |
+/** | |
+ * A {@link SubjectConfig} is effectively a {@code Map<String, String>}, with reserved | |
+ * keys and default values for certain keys. | |
+ * <br/> | |
+ * Keys starting with "repo." are reserved. | |
+ * | |
+ */ | |
+public class SubjectConfig { | |
+ private static final SubjectConfig EMPTY = new Builder().build(); | |
+ | |
+ private final Map<String, String> conf; | |
+ private final Set<String> validators; | |
+ | |
+ private SubjectConfig(Map<String, String> conf, Set<String> validators) { | |
+ this.conf = conf; | |
+ this.validators = validators; | |
+ } | |
+ | |
+ public String get(String key) { | |
+ return conf.get(key); | |
+ } | |
+ | |
+ public Set<String> getValidators() { | |
+ return validators; | |
+ } | |
+ | |
+ public Map<String, String> asMap() { | |
+ return conf; | |
+ } | |
+ | |
+ public static SubjectConfig emptyConfig() { | |
+ return EMPTY; | |
+ } | |
+ | |
+ @Override | |
+ public int hashCode() { | |
+ return conf.hashCode() * 31 + validators.hashCode(); | |
+ } | |
+ | |
+ @Override | |
+ public boolean equals(Object obj) { | |
+ if (this == obj) | |
+ return true; | |
+ if (obj == null) | |
+ return false; | |
+ if (getClass() != obj.getClass()) | |
+ return false; | |
+ SubjectConfig other = (SubjectConfig) obj; | |
+ if (!validators.equals(other.validators)) | |
+ return false; | |
+ if (!conf.equals(other.conf)) | |
+ return false; | |
+ return true; | |
+ } | |
+ | |
+ public static class Builder { | |
+ private static final String RESERVED_PREFIX = "repo."; | |
+ private static final String VALIDATORS_KEY = "repo.validators"; | |
+ | |
+ private final HashMap<String, String> conf = new HashMap<String, String>(); | |
+ private final HashSet<String> validators = new HashSet<String>(); | |
+ | |
+ public Builder set(Map<String, String> config) { | |
+ for(Map.Entry<String, String> entry : config.entrySet()) { | |
+ set(entry.getKey(), entry.getValue()); | |
+ } | |
+ return this; | |
+ } | |
+ | |
+ public Builder set(String key, String value) { | |
+ if(key.startsWith(RESERVED_PREFIX)) { | |
+ if(VALIDATORS_KEY.equals(key)) { | |
+ setValidators(commaSplit(value)); | |
+ } else { | |
+ throw new RuntimeException("SubjectConfig keys starting with '" + | |
+ RESERVED_PREFIX + "' are reserved, failed to set: " + key + | |
+ " to value: " + value); | |
+ } | |
+ } else { | |
+ conf.put(key, value); | |
+ } | |
+ return this; | |
+ } | |
+ | |
+ public Builder setValidators(Collection<String> validatorNames) { | |
+ this.validators.clear(); | |
+ this.conf.remove(VALIDATORS_KEY); | |
+ if(!validatorNames.isEmpty()) { | |
+ this.validators.addAll(validatorNames); | |
+ this.conf.put(VALIDATORS_KEY, commaJoin(validators)); | |
+ } | |
+ return this; | |
+ } | |
+ | |
+ public Builder addValidator(String validatorName) { | |
+ this.validators.add(validatorName); | |
+ this.conf.put(VALIDATORS_KEY, commaJoin(validators)); | |
+ return this; | |
+ } | |
+ | |
+ public SubjectConfig build() { | |
+ return new SubjectConfig( | |
+ Collections.unmodifiableMap(new HashMap<String, String>(conf)), | |
+ Collections.unmodifiableSet(new HashSet<String>(validators))); | |
+ } | |
+ | |
+ } | |
+ | |
+ /** | |
+ * Helper method for splitting a comma-delimited string with | |
+ * {@link String#split(String)}. | |
+ * Trims each value and omits empty values. | |
+ * @param toSplit The string to split. If null, an empty | |
+ * list is returned | |
+ * @return A list containing the non-empty, trimmed values resulting | |
+ * from the split. Does not return null. | |
+ */ | |
+ private static List<String> commaSplit(String toSplit) { | |
+ if (toSplit == null) { | |
+ return Collections.emptyList(); | |
+ } | |
+ ArrayList<String> list = new ArrayList<String>(); | |
+ for(String s : toSplit.split(",")) { | |
+ s = s.trim(); | |
+ if (!s.isEmpty()) { | |
+ list.add(s); | |
+ } | |
+ } | |
+ return list; | |
+ } | |
+ | |
+ private static String commaJoin(Collection<String> strings) { | |
+ StringBuilder sb = new StringBuilder(); | |
+ for(String s : strings) { | |
+ sb.append(s).append(','); | |
+ } | |
+ return sb.toString(); | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Validator.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Validator.java | |
new file mode 100644 | |
index 0000000..94bf796 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/Validator.java | |
@@ -0,0 +1,59 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+/** | |
+ * A Validator may be bound to each {@link Subject} in a {@link Repository} | |
+ * to ensure that schemas added to the {@link Subject} are mutually compatible. | |
+ * <br/> | |
+ * <br/> | |
+ * There are many useful notions of compatibility, for example: | |
+ * <li>Forwards Compatible: A user of any old schema | |
+ * is able to read data written with the new schema</li> | |
+ * <li>Backwards Compatible: A user of the most recent schema | |
+ * is able to read data written with any older schema</li> | |
+ * <li>Full Compatibility: A user of any schema | |
+ * is able to read data written in any other schema</li> | |
+ * <li>N+1 Compatibility: Forward compatibility constrained to | |
+ * only the version one prior to the current version. | |
+ * e.g. a reader with the second most recent schema | |
+ * is able to read data written with the most recent schema</li> | |
+ * <li>N-1 Compatibility: Backward compatibility constrained to | |
+ * only the version one prior to the current version. | |
+ * e.g. a reader with the most recent schema | |
+ * is able to read data written with the second most recent schema</li> | |
+ */ | |
+public interface Validator { | |
+ | |
+ /** | |
+ * Validate that a schema is compatible with the schemas provided. | |
+ * | |
+ * @param schemaToValidate | |
+ * The schema to validate. | |
+ * @param schemasInOrder | |
+ * The schemas to validate against, | |
+ * presented in order from latest to oldest. | |
+ * @throws SchemaValidationException | |
+ * if {@code schemaToValidate} is not compatible with the schemas | |
+ * in {@code schemasInOrder} | |
+ */ | |
+ void validate(String schemaToValidate, Iterable<SchemaEntry> schemasInOrder) | |
+ throws SchemaValidationException; | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/ValidatorFactory.java b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/ValidatorFactory.java | |
new file mode 100644 | |
index 0000000..3d1ac41 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/main/java/org/apache/avro/repo/ValidatorFactory.java | |
@@ -0,0 +1,89 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import java.util.ArrayList; | |
+import java.util.HashMap; | |
+import java.util.List; | |
+import java.util.Set; | |
+ | |
+/** | |
+ * A factory for mapping Validator names to instantiated instances. Validator | |
+ * names starting with "repo." are reserved. | |
+ */ | |
+public class ValidatorFactory { | |
+ public static final String REJECT_VALIDATOR = "repo.reject"; | |
+ | |
+ private final HashMap<String, Validator> validators; | |
+ | |
+ private ValidatorFactory(HashMap<String, Validator> validators) { | |
+ this.validators = validators; | |
+ } | |
+ | |
+ /** | |
+ * @param validatorNames | |
+ * The set of {@link Validator} names to resolve. Must not be null. | |
+ * @return A list of {@link Validator}s. Not null. | |
+ */ | |
+ public final List<Validator> getValidators(Set<String> validatorNames) { | |
+ ArrayList<Validator> result = new ArrayList<Validator>(); | |
+ for (String name : validatorNames) { | |
+ Validator v = validators.get(name); | |
+ if (v != null) { | |
+ result.add(v); | |
+ } | |
+ } | |
+ return result; | |
+ } | |
+ | |
+ public static class Builder { | |
+ private final HashMap<String, Validator> validators; | |
+ { | |
+ validators = new HashMap<String, Validator>(); | |
+ validators.put(REJECT_VALIDATOR, new Reject()); | |
+ } | |
+ | |
+ /** | |
+ * Configure this builder to return a {@link ValidatorFactory} that maps the | |
+ * {@link Validator} provided to the name given. <br/> | |
+ * The name must not be null and must not start with "repo.". | |
+ */ | |
+ public Builder setValidator(String name, Validator validator) { | |
+ if (name.startsWith("repo.")) { | |
+ throw new RuntimeException("Validator names starting with 'repo.'" | |
+ + " are reserved. Attempted to set validator with name: " + name); | |
+ } | |
+ validators.put(name, validator); | |
+ return this; | |
+ } | |
+ | |
+ public ValidatorFactory build() { | |
+ return new ValidatorFactory(new HashMap<String, Validator>(validators)); | |
+ } | |
+ } | |
+ | |
+ private static class Reject implements Validator { // built-in validator registered under REJECT_VALIDATOR | |
+ @Override | |
+ public void validate(String schemaToValidate, | |
+ Iterable<SchemaEntry> schemasInOrder) throws SchemaValidationException { | |
+ throw new SchemaValidationException( // unconditional: every schema is rejected | |
+ REJECT_VALIDATOR + " validator always rejects validation"); | |
+ } | |
+ } | |
+} | |
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/AbstractTestRepository.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/AbstractTestRepository.java | |
new file mode 100644 | |
index 0000000..b21509a | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/AbstractTestRepository.java | |
@@ -0,0 +1,255 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import java.util.Iterator; | |
+ | |
+import org.junit.After; | |
+import org.junit.Assert; | |
+import org.junit.Before; | |
+import org.junit.Test; | |
+ | |
+/** | |
+ * An abstract JUnit test for thoroughly testing a Repository implementation | |
+ */ | |
+public abstract class AbstractTestRepository<R extends Repository> { | |
+ private static final String SUB = "sub"; | |
+ private static final String CONF = "conf"; | |
+ private static final String NOCONF = "noconf"; | |
+ private static final String VALIDATING = "validating"; | |
+ private static final String FOO = "foo"; | |
+ private static final String BAR = "bar"; | |
+ private static final String BAZ = "baz"; | |
+ | |
+ private R repo; | |
+ | |
+ protected abstract R createRepository(); | |
+ | |
+ @Before | |
+ public void setUpRepository() { | |
+ repo = createRepository(); | |
+ } | |
+ | |
+ @After | |
+ public void tearDownRepository() { | |
+ repo = null; | |
+ } | |
+ | |
+ protected final R getRepo() { | |
+ return repo; | |
+ } | |
+ | |
+ @Test | |
+ public void testRepository() throws SchemaValidationException { | |
+ // lookup a subject that does not exist, when none do | |
+ Subject none = repo.lookup(SUB); | |
+ Assert.assertNull("non-existent subject lookup should return null", none); | |
+ // ensure that when there are no subjects, an empty iterable is produced | |
+ Iterable<Subject> subjects = repo.subjects(); | |
+ Assert.assertNotNull("subjects must not be null", subjects); | |
+ Assert.assertFalse("subjects must be empty", subjects.iterator().hasNext()); | |
+ | |
+ // register a subject | |
+ Subject sub = repo.register(SUB, null); | |
+ Assert.assertNotNull("failed to register subject: " + SUB, sub); | |
+ // a duplicate register is idempotent; the result is the same | |
+ Subject sub2 = repo.register(SUB, null); | |
+ Assert.assertNotNull("failed to re-register subject: " + SUB, sub2); | |
+ Assert.assertEquals( | |
+ "registering a subject twice did not produce the same result", | |
+ sub.getName(), sub2.getName()); | |
+ | |
+ // lookup subject that was just registered | |
+ Subject sub3 = repo.lookup(SUB); | |
+ Assert.assertNotNull("subject lookup failed", sub3); | |
+ Assert.assertEquals("subject lookup failed", sub.getName(), sub3.getName()); | |
+ | |
+ // lookup a subject that does not exist, this time when some do | |
+ Subject none2 = repo.lookup("nothing"); | |
+ Assert.assertNull("non-existent subject lookup should return null", none2); | |
+ | |
+ // go through all subjects | |
+ boolean hasSub = false; | |
+ for (Subject s : repo.subjects()) { | |
+ if (sub.getName().equals(s.getName())) { | |
+ hasSub = true; | |
+ break; | |
+ } | |
+ } | |
+ Assert.assertTrue("subjects() did not contain registered subject: " + sub, | |
+ hasSub); | |
+ | |
+ // ensure that latest is null when a subject is first created | |
+ SchemaEntry noLatest = sub.latest(); | |
+ Assert.assertNull("latest must be null if it does not exist", noLatest); | |
+ | |
+ // ensure that registerIfLatest does not register if provided a latest | |
+ // value when latest is null | |
+ SchemaEntry didNotRegister = | |
+ sub.registerIfLatest(FOO, new SchemaEntry("not", "there")); | |
+ Assert.assertNull( | |
+ "registerIfLatest must return null if there are no schemas in the " + | |
+ "subject and the passed in latest is not null. Found: " + | |
+ didNotRegister, didNotRegister); | |
+ | |
+ // ensure registerIfLatest works when latest is null | |
+ SchemaEntry foo = sub.registerIfLatest(FOO, null); | |
+ validateSchemaEntry(FOO, foo); | |
+ | |
+ // ensure that register is idempotent | |
+ SchemaEntry foo2 = sub.register(FOO); | |
+ Assert.assertEquals("duplicate schema registration must be idempotent", | |
+ foo, foo2); | |
+ validateSchemaEntry(FOO, foo2); | |
+ | |
+ // ensure registerIfLatest works when latest is not null | |
+ SchemaEntry bar = sub.registerIfLatest(BAR, foo2); | |
+ validateSchemaEntry(BAR, bar); | |
+ | |
+ // ensure registerIfLatest does not register if latest does not match | |
+ SchemaEntry none3 = sub.registerIfLatest("none", foo); | |
+ Assert.assertNull( | |
+ "registerIfLatest must return null if latest does not match", none3); | |
+ // ensure registerIfLatest does not register when provided null does not match | |
+ SchemaEntry none4 = sub.registerIfLatest("none", null); | |
+ Assert.assertNull( | |
+ "registerIfLatest must return null if there is a latest schema in" | |
+ + " the subject and the passed in value is null", none4); | |
+ | |
+ // ensure register can add new schemas | |
+ SchemaEntry baz = sub.register(BAZ); | |
+ validateSchemaEntry(BAZ, baz); | |
+ | |
+ // test lookup | |
+ Subject subject = repo.lookup(SUB); | |
+ Assert.assertNotNull("lookup of previously registered subject failed", | |
+ subject); | |
+ | |
+ // test latest | |
+ Assert.assertEquals("latest schema must match last registered", baz, | |
+ subject.latest()); | |
+ boolean foundfoo = false, foundbar = false; | |
+ for (SchemaEntry s : sub3.allEntries()) { | |
+ if (s.equals(foo)) { | |
+ foundfoo = true; | |
+ } | |
+ if (s.equals(bar)) { | |
+ foundbar = true; | |
+ } | |
+ } | |
+ Assert.assertTrue("AllEntries did not contain schema: " + FOO, foundfoo); | |
+ Assert.assertTrue("AllEntries did not contain schema: " + BAR, foundbar); | |
+ | |
+ //ensure order of allEntries is correct | |
+ Iterator<SchemaEntry> allEntries = sub3.allEntries().iterator(); | |
+ // latest must match first one: | |
+ Assert.assertEquals("Latest must be first returned from allEntries()", | |
+ sub3.latest(), allEntries.next()); | |
+ Assert.assertEquals("second-latest must be BAR", bar, allEntries.next()); | |
+ Assert.assertEquals("third must be FOO", foo, allEntries.next()); | |
+ | |
+ | |
+ // test lookupBySchema | |
+ SchemaEntry resultfoo = subject.lookupBySchema(foo.getSchema()); | |
+ Assert.assertEquals("lookup by Schema did not return same result", foo, | |
+ resultfoo); | |
+ SchemaEntry notThere = subject.lookupBySchema("notThere"); | |
+ Assert.assertNull("non existent schema should return null", notThere); | |
+ // test lookupById | |
+ SchemaEntry resultfooid = subject.lookupById(foo.getId()); | |
+ Assert.assertEquals("lookup by ID did not return same result", foo, | |
+ resultfooid); | |
+ SchemaEntry notTherById = subject.lookupById("notThere"); | |
+ Assert.assertNull("non existent schema should return null", notTherById); | |
+ | |
+ // test integralKeys() | |
+ if(subject.integralKeys()) { | |
+ Integer.parseInt(resultfoo.getId()); | |
+ } | |
+ | |
+ } | |
+ | |
+ @Test | |
+ public void testSubjectConfigs() { | |
+ String testKey = "test.key"; | |
+ String testVal = "test.val"; | |
+ String testVal2 = "test.val2"; | |
+ SubjectConfig conf = new SubjectConfig.Builder().set(testKey, testVal).build(); | |
+ SubjectConfig conf2 = new SubjectConfig.Builder().set(testKey, testVal2).build(); | |
+ // lookup a subject that does not exist, when none do | |
+ Subject none = repo.lookup(CONF); | |
+ Assert.assertNull("non-existent subject lookup should return null", none); | |
+ | |
+ // register a subject | |
+ Subject sub = repo.register(CONF, conf); | |
+ Assert.assertNotNull("failed to register subject: " + CONF, sub); | |
+ // a duplicate register is idempotent; the name and config must match the first registration | |
+ Subject sub2 = repo.register(CONF, conf2); | |
+ Assert.assertNotNull("failed to re-register subject: " + CONF, sub2); | |
+ validateSubject(sub, sub2); | |
+ | |
+ // lookup subject that was just registered | |
+ Subject sub3 = repo.lookup(CONF); | |
+ validateSubject(sub, sub3); | |
+ | |
+ // lookup something that is not there | |
+ Subject sub4 = repo.lookup(NOCONF); | |
+ Assert.assertNull("subject should not exist", sub4); | |
+ | |
+ } | |
+ | |
+ @Test | |
+ public void testValidation() { | |
+ SubjectConfig conf = new SubjectConfig.Builder().addValidator(ValidatorFactory.REJECT_VALIDATOR).build(); | |
+ // lookup a subject that does not exist, when none do | |
+ Subject none = repo.lookup(VALIDATING); | |
+ Assert.assertNull("non-existent subject lookup should return null", none); | |
+ | |
+ // register a subject that should reject all schemas | |
+ Subject sub = repo.register(VALIDATING, conf); | |
+ Assert.assertNotNull("failed to register subject: " + VALIDATING, sub); | |
+ | |
+ boolean threw = false; | |
+ try { | |
+ sub.register("stuff"); | |
+ } catch (SchemaValidationException e) { | |
+ threw = true; | |
+ } | |
+ Assert.assertTrue("must throw a SchemaValidationException", threw); | |
+ } | |
+ | |
+ private void validateSubject(Subject sub, Subject sub3) { | |
+ Assert.assertEquals( | |
+ "subject names do not match", | |
+ sub.getName(), sub3.getName()); | |
+ Assert.assertEquals( | |
+ "subject configurations do not match", | |
+ sub.getConfig(), sub3.getConfig()); | |
+ } | |
+ | |
+ private void validateSchemaEntry(String expectedSchema, SchemaEntry foo) { | |
+ Assert.assertNotNull("Failed to create SchemaEntry with schema: " | |
+ + expectedSchema, foo); | |
+ Assert.assertEquals("SchemaEntry does not have expected schema value", | |
+ expectedSchema, foo.getSchema()); | |
+ Assert.assertNotNull("SchemaEntry does not have a valid id", foo.getId()); | |
+ Assert.assertFalse("SchemaEntry has an empty id", foo.getId().isEmpty()); | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestCacheRepository.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestCacheRepository.java | |
new file mode 100644 | |
index 0000000..a27269f | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestCacheRepository.java | |
@@ -0,0 +1,28 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+public class TestCacheRepository extends | |
+ AbstractTestRepository<CacheRepository> { | |
+ | |
+ @Override | |
+ protected CacheRepository createRepository() { | |
+ return new CacheRepository(new InMemoryRepository(new ValidatorFactory.Builder().build()), new InMemoryCache()); | |
+ } | |
+} | |
\ No newline at end of file | |
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestFileRepository.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestFileRepository.java | |
new file mode 100644 | |
index 0000000..0a1c338 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestFileRepository.java | |
@@ -0,0 +1,186 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import static org.junit.Assert.assertTrue; | |
+ | |
+import java.io.File; | |
+import java.io.IOException; | |
+ | |
+import junit.framework.Assert; | |
+ | |
+import org.junit.After; | |
+import org.junit.BeforeClass; | |
+import org.junit.Test; | |
+ | |
+public class TestFileRepository extends AbstractTestRepository<FileRepository> { | |
+ private static final String TEST_PATH = "target/test/TestFileRepository-paths/"; | |
+ private static final String REPO_PATH = "target/test/TestFileRepository/"; | |
+ | |
+ @BeforeClass | |
+ public static void setup() { | |
+ rmDir(new File(TEST_PATH)); | |
+ rmDir(new File(REPO_PATH)); | |
+ } | |
+ | |
+ @After | |
+ public void cleanUp() throws IOException { | |
+ getRepo().close(); | |
+ } | |
+ | |
+ @Override | |
+ protected FileRepository createRepository() { | |
+ return newRepo(REPO_PATH); | |
+ } | |
+ | |
+ private FileRepository newRepo(String path) { | |
+ return new FileRepository(path, new ValidatorFactory.Builder().build()); | |
+ } | |
+ | |
+ @Test | |
+ public void testPathHandling() throws SchemaValidationException { | |
+ String paths[] = new String[] { | |
+ "data", "data/", "/tmp/file_repo", | |
+ "/tmp/file_repo/", "/tmp/file_repo/" }; | |
+ | |
+ for (String path : paths) { | |
+ FileRepository r = newRepo(TEST_PATH + path); | |
+ try { | |
+ File expected = new File(TEST_PATH, path); | |
+ assertTrue("Expected directory not created: " + | |
+ expected.getAbsolutePath() + " for path: " + path, expected.exists()); | |
+ } finally { | |
+ r.close(); | |
+ // should be ok to call close twice | |
+ r.close(); | |
+ } | |
+ } | |
+ // verify idempotent | |
+ newRepo(TEST_PATH + "/tmp/repo").close(); | |
+ newRepo(TEST_PATH + "/tmp/repo").close(); | |
+ } | |
+ | |
+ @Test | |
+ public void testReadWritten() throws SchemaValidationException { | |
+ String path = TEST_PATH + "/readWrite"; | |
+ FileRepository r = newRepo(path); | |
+ try { | |
+ r.register("sub1", null).register("sc1"); | |
+ r.register("sub2", null).register("sc2"); | |
+ r.register("sub2", null).register("sc3"); | |
+ } finally { | |
+ r.close(); | |
+ } | |
+ r = newRepo(path); | |
+ try { | |
+ Subject s1 = r.lookup("sub1"); | |
+ Assert.assertNotNull(s1); | |
+ Subject s2 = r.lookup("sub2"); | |
+ Assert.assertNotNull(s2); | |
+ | |
+ SchemaEntry e1 = s1.lookupBySchema("sc1"); | |
+ Assert.assertNotNull(e1); | |
+ Assert.assertEquals("sc1", e1.getSchema()); | |
+ | |
+ SchemaEntry e2 = s2.lookupBySchema("sc2"); | |
+ Assert.assertNotNull(e2); | |
+ Assert.assertEquals("sc2", e2.getSchema()); | |
+ | |
+ SchemaEntry e3 = s2.lookupBySchema("sc3"); | |
+ Assert.assertNotNull(e3); | |
+ Assert.assertEquals("sc3", e3.getSchema()); | |
+ } finally { | |
+ r.close(); | |
+ } | |
+ } | |
+ | |
+ @Test | |
+ public void testReadWrittenMultiLineSchema() throws SchemaValidationException { | |
+ String path = TEST_PATH + "/readWriteMultiLine"; | |
+ | |
+ String endOfLine = System.getProperty("line.separator"); | |
+ | |
+ String multiLineSchema1 = "first line" + endOfLine + "second line"; | |
+ String multiLineSchema2 = "first line" + endOfLine + "second line" + endOfLine; | |
+ | |
+ FileRepository r = newRepo(path); | |
+ try { | |
+ r.register("sub1", null).register(multiLineSchema1); | |
+ r.register("sub1", null).register(multiLineSchema2); | |
+ } finally { | |
+ r.close(); | |
+ } | |
+ r = newRepo(path); | |
+ try { | |
+ Subject s1 = r.lookup("sub1"); | |
+ Assert.assertNotNull(s1); | |
+ | |
+ SchemaEntry schemaEntry1ById = s1.lookupById("0"); | |
+ Assert.assertNotNull(schemaEntry1ById); | |
+ Assert.assertEquals(multiLineSchema1, schemaEntry1ById.getSchema()); | |
+ | |
+ SchemaEntry schemaEntryBy1Schema = s1.lookupBySchema(multiLineSchema1); | |
+ Assert.assertNotNull(schemaEntryBy1Schema); | |
+ Assert.assertEquals(multiLineSchema1, schemaEntryBy1Schema.getSchema()); | |
+ // NOTE(review): entry "1" presumably holds multiLineSchema2, yet multiLineSchema1 is asserted below - confirm | |
+ SchemaEntry schemaEntry2ById = s1.lookupById("1"); | |
+ Assert.assertNotNull(schemaEntry2ById); | |
+ Assert.assertEquals(multiLineSchema1, schemaEntry2ById.getSchema()); | |
+ | |
+ SchemaEntry schemaEntry2BySchema = s1.lookupBySchema(multiLineSchema1); | |
+ Assert.assertNotNull(schemaEntry2BySchema); | |
+ Assert.assertEquals(multiLineSchema1, schemaEntry2BySchema.getSchema()); | |
+ | |
+ } finally { | |
+ r.close(); | |
+ } | |
+ } | |
+ | |
+ | |
+ @Test(expected = RuntimeException.class) | |
+ public void testInvalidDir() throws IOException { | |
+ String badPath = TEST_PATH + "/bad"; | |
+ new File(TEST_PATH).mkdirs(); | |
+ new File(badPath).createNewFile(); | |
+ FileRepository r = newRepo(badPath); | |
+ r.close(); | |
+ } | |
+ | |
+ @Test(expected = IllegalStateException.class) | |
+ public void testCantUseClosedRepo() { | |
+ FileRepository r = newRepo(TEST_PATH + "/tmp/repo"); | |
+ r.close(); | |
+ r.lookup("nothing"); | |
+ } | |
+ | |
+ private static void rmDir(File dir) { | |
+ if (!dir.exists() || !dir.isDirectory()) { | |
+ return; | |
+ } | |
+ for (String filename : dir.list()) { | |
+ File entry = new File(dir, filename); | |
+ if (entry.isDirectory()) { | |
+ rmDir(entry); | |
+ } else { | |
+ entry.delete(); | |
+ } | |
+ } | |
+ dir.delete(); | |
+ } | |
+} | |
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestInMemoryRepository.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestInMemoryRepository.java | |
new file mode 100644 | |
index 0000000..dcad958 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestInMemoryRepository.java | |
@@ -0,0 +1,28 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+public class TestInMemoryRepository extends | |
+ AbstractTestRepository<InMemoryRepository> { | |
+ @Override | |
+ protected InMemoryRepository createRepository() { | |
+ return new InMemoryRepository(new ValidatorFactory.Builder().build()); | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestReadOnlyRepository.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestReadOnlyRepository.java | |
new file mode 100644 | |
index 0000000..e32cc98 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestReadOnlyRepository.java | |
@@ -0,0 +1,114 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import org.junit.After; | |
+import org.junit.Assert; | |
+import org.junit.Before; | |
+import org.junit.Test; | |
+ | |
+public class TestReadOnlyRepository { | |
+ private static final String SUB = "sub"; | |
+ private static final String FOO = "foo"; | |
+ | |
+ private InMemoryRepository repo; | |
+ private ReadOnlyRepository readOnlyRepo; | |
+ | |
+ @Before | |
+ public void setUpRepository() { | |
+ repo = new InMemoryRepository(new ValidatorFactory.Builder().build()); | |
+ readOnlyRepo = new ReadOnlyRepository(repo); | |
+ } | |
+ | |
+ @After | |
+ public void tearDownRepository() { | |
+ repo = null; | |
+ readOnlyRepo = null; | |
+ } | |
+ | |
+ @Test | |
+ public void testReadOnlyRepository() throws SchemaValidationException { | |
+ | |
+ // lookup a subject that does not exist, when none do | |
+ Subject none = readOnlyRepo.lookup(SUB); | |
+ Assert.assertNull("non-existent subject lookup should return null", none); | |
+ // ensure that when there are no subjects, an empty iterable is produced | |
+ Iterable<Subject> subjects = readOnlyRepo.subjects(); | |
+ Assert.assertNotNull("subjects must not be null", subjects); | |
+ Assert.assertFalse("subjects must be empty", subjects.iterator().hasNext()); | |
+ | |
+ // register a subject | |
+ Subject sub = repo.register(SUB, null); | |
+ Assert.assertNotNull("failed to register subject: " + SUB, sub); | |
+ // a duplicate register is idempotent; the result is the same | |
+ Subject sub2 = repo.register(SUB, null); | |
+ Assert.assertNotNull("failed to re-register subject: " + SUB, sub2); | |
+ Assert.assertEquals( | |
+ "registering a subject twice did not produce the same result", | |
+ sub.getName(), sub2.getName()); | |
+ | |
+ // lookup subject that was just registered | |
+ Subject readOnlySubject = readOnlyRepo.lookup(SUB); | |
+ Assert.assertNotNull("subject lookup failed", readOnlySubject); | |
+ Assert.assertEquals("subject lookup failed", sub.getName(), readOnlySubject.getName()); | |
+ | |
+ // lookup a subject that does not exist, this time when some do | |
+ Subject none2 = readOnlyRepo.lookup("nothing"); | |
+ Assert.assertNull("non-existent subject lookup should return null", none2); | |
+ | |
+ // go through all subjects | |
+ boolean hasSub = false; | |
+ for (Subject s : readOnlyRepo.subjects()) { | |
+ if (sub.getName().equals(s.getName())) { | |
+ hasSub = true; | |
+ break; | |
+ } | |
+ } | |
+ Assert.assertTrue("subjects() did not contain registered subject: " + sub, | |
+ hasSub); | |
+ | |
+ SchemaEntry foo = sub.register(FOO); | |
+ boolean foundfoo = false; | |
+ for (SchemaEntry s : readOnlySubject.allEntries()) { | |
+ if (s.equals(foo)) { | |
+ foundfoo = true; | |
+ } | |
+ } | |
+ Assert.assertTrue("AllEntries did not contain schema: " + FOO, foundfoo); | |
+ | |
+ } | |
+ | |
+ @Test(expected=IllegalStateException.class) | |
+ public void testCannotCreateSubject() { | |
+ readOnlyRepo.register(null, null); | |
+ } | |
+ | |
+ @Test(expected=IllegalStateException.class) | |
+ public void testCannotRegisterSchema() throws SchemaValidationException { | |
+ repo.register(FOO, null); | |
+ readOnlyRepo.lookup(FOO).register(null); | |
+ } | |
+ | |
+ @Test(expected=IllegalStateException.class) | |
+ public void testCannotRegisterSchemaIfLatest() throws SchemaValidationException { | |
+ repo.register(FOO, null); | |
+ readOnlyRepo.lookup(FOO).registerIfLatest(null, null); | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestRepositoryUtil.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestRepositoryUtil.java | |
new file mode 100644 | |
index 0000000..03301af | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestRepositoryUtil.java | |
@@ -0,0 +1,97 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import java.util.ArrayList; | |
+import java.util.Iterator; | |
+ | |
+import org.junit.Assert; | |
+import org.junit.Test; | |
+ | |
+public class TestRepositoryUtil { | |
+ | |
+ @Test(expected = IllegalArgumentException.class) | |
+ public void validateNullString() { | |
+ RepositoryUtil.validateSchemaOrSubject(null); | |
+ } | |
+ | |
+ @Test(expected = IllegalArgumentException.class) | |
+ public void validateEmptyString() { | |
+ RepositoryUtil.validateSchemaOrSubject(""); | |
+ } | |
+ | |
+ @Test | |
+ public void validateString() { | |
+ RepositoryUtil.validateSchemaOrSubject("org.apache.avro.Something"); | |
+ } | |
+ | |
+ @Test | |
+ public void testSchemasToFromString() { | |
+ SchemaEntry e1 = new SchemaEntry("id1", "s1"); | |
+ SchemaEntry e2 = new SchemaEntry("id2", "s2"); | |
+ ArrayList<SchemaEntry> empty = new ArrayList<SchemaEntry>(); | |
+ ArrayList<SchemaEntry> vals = new ArrayList<SchemaEntry>(); | |
+ vals.add(e1); | |
+ vals.add(e2); | |
+ | |
+ Iterable<SchemaEntry> emptyResult = RepositoryUtil | |
+ .schemasFromString(RepositoryUtil.schemasToString(empty)); | |
+ Iterable<SchemaEntry> emptyResult2 = RepositoryUtil.schemasFromString(null); | |
+ Iterable<SchemaEntry> emptyResult3 = RepositoryUtil.schemasFromString(""); | |
+ Assert.assertEquals(empty, emptyResult); | |
+ Assert.assertEquals(emptyResult, emptyResult2); | |
+ Assert.assertEquals(emptyResult, emptyResult3); | |
+ | |
+ Iterable<SchemaEntry> result = RepositoryUtil | |
+ .schemasFromString(RepositoryUtil.schemasToString(vals)); | |
+ Assert.assertEquals(vals, result); | |
+ } | |
+ | |
+ @Test | |
+ public void testSubjectsToFromString() { | |
+ Repository r = new InMemoryRepository(new ValidatorFactory.Builder().build()); | |
+ Subject s1 = r.register("s1", null); | |
+ Subject s2 = r.register("s2", null); | |
+ ArrayList<Subject> empty = new ArrayList<Subject>(); | |
+ ArrayList<Subject> vals = new ArrayList<Subject>(); | |
+ vals.add(s1); | |
+ vals.add(s2); | |
+ | |
+ Iterable<String> emptyResult = RepositoryUtil | |
+ .subjectNamesFromString(RepositoryUtil.subjectsToString(empty)); | |
+ Iterable<String> emptyResult2 = RepositoryUtil.subjectNamesFromString(null); | |
+ Iterable<String> emptyResult3 = RepositoryUtil.subjectNamesFromString(""); | |
+ validate(emptyResult, empty); | |
+ Assert.assertEquals(emptyResult, emptyResult2); | |
+ Assert.assertEquals(emptyResult, emptyResult3); | |
+ | |
+ Iterable<String> result = RepositoryUtil | |
+ .subjectNamesFromString(RepositoryUtil.subjectsToString(vals)); | |
+ validate(result, vals); | |
+ } | |
+ | |
+ private void validate(Iterable<String> names, Iterable<Subject> subjects) { // names must match subjects one-to-one, in order | |
+ Iterator<String> nameIter = names.iterator(); | |
+ for (Subject s : subjects) { | |
+ Assert.assertEquals(s.getName(), nameIter.next()); | |
+ } | |
+ Assert.assertFalse("more names than subjects", nameIter.hasNext()); // otherwise the empty case passes vacuously | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestSchemaEntry.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestSchemaEntry.java | |
new file mode 100644 | |
index 0000000..3279248 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestSchemaEntry.java | |
@@ -0,0 +1,84 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import org.junit.Assert; | |
+import org.junit.Test; | |
+ | |
+ | |
+public class TestSchemaEntry { | |
+ @Test | |
+ public void testToString() { | |
+ SchemaEntry entry = new SchemaEntry("id", "schema"); | |
+ SchemaEntry toAndFrom = new SchemaEntry(entry.toString()); | |
+ Assert.assertEquals(entry, toAndFrom); | |
+ } | |
+ | |
+ @Test(expected=IllegalArgumentException.class) | |
+ public void testInvalid() { | |
+ new SchemaEntry("invalidString"); | |
+ } | |
+ | |
+ @Test | |
+ public void testGetters() { | |
+ String id = "id"; | |
+ String sc = "sc"; | |
+ SchemaEntry entry = new SchemaEntry(id, sc); | |
+ Assert.assertEquals(id, entry.getId()); | |
+ Assert.assertEquals(sc, entry.getSchema()); | |
+ } | |
+ | |
+ @Test | |
+ public void testEqualsAndHashCode() { | |
+ SchemaEntry entry = new SchemaEntry("id", "schema"); | |
+ SchemaEntry entry2 = new SchemaEntry("id", "schema"); | |
+ SchemaEntry nullId = new SchemaEntry(null, "schema"); | |
+ SchemaEntry nullSchema = new SchemaEntry("id", null); | |
+ SchemaEntry idDiffers = new SchemaEntry("iddd", "schema"); | |
+ SchemaEntry sDiffers = new SchemaEntry("id", "schemaaaa"); | |
+ SchemaEntry allNull = new SchemaEntry(null, null); | |
+ SchemaEntry allNull2 = new SchemaEntry(null, null); | |
+ | |
+ Assert.assertEquals(entry, entry); | |
+ Assert.assertEquals(entry, entry2); | |
+ Assert.assertEquals(entry2, entry); | |
+ Assert.assertEquals(entry.hashCode(), entry2.hashCode()); | |
+ | |
+ Assert.assertFalse(entry.equals(null)); | |
+ Assert.assertFalse(entry.equals(new Object())); | |
+ | |
+ Assert.assertFalse(entry.equals(nullId)); | |
+ Assert.assertFalse(nullId.equals(entry)); | |
+ | |
+ Assert.assertFalse(entry.equals(nullSchema)); | |
+ Assert.assertFalse(nullSchema.equals(entry)); | |
+ | |
+ Assert.assertFalse(entry.equals(idDiffers)); | |
+ Assert.assertFalse(idDiffers.equals(entry)); | |
+ | |
+ Assert.assertFalse(entry.equals(sDiffers)); | |
+ Assert.assertFalse(sDiffers.equals(entry)); | |
+ | |
+ Assert.assertEquals(allNull, allNull2); | |
+ Assert.assertEquals(allNull.hashCode(), allNull2.hashCode()); | |
+ } | |
+ | |
+ | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestSubjectConfig.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestSubjectConfig.java | |
new file mode 100644 | |
index 0000000..c34c626 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestSubjectConfig.java | |
@@ -0,0 +1,85 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import java.util.Set; | |
+ | |
+import org.junit.Assert; | |
+import org.junit.Test; | |
+ | |
+ | |
+public class TestSubjectConfig { | |
+ | |
+ @Test | |
+ public void testBuilder() { | |
+ SubjectConfig conf = SubjectConfig.emptyConfig(); | |
+ Assert.assertTrue(conf.getValidators().isEmpty()); | |
+ | |
+ SubjectConfig custom = new SubjectConfig.Builder() | |
+ .set("k", "v") | |
+ .set("repo.validators", "valid1, valid2 ,,") | |
+ .addValidator("oneMore") | |
+ .build(); | |
+ | |
+ Assert.assertEquals("v", custom.get("k")); | |
+ Set<String> validators = custom.getValidators(); | |
+ Assert.assertEquals(3, validators.size()); | |
+ Assert.assertTrue(validators.contains("valid1")); | |
+ Assert.assertTrue(validators.contains("valid2")); | |
+ Assert.assertTrue(validators.contains("oneMore")); | |
+ } | |
+ | |
+ @Test | |
+ public void testBuilderHashAndEquals() { | |
+ SubjectConfig empty = SubjectConfig.emptyConfig(); | |
+ Assert.assertEquals(empty, empty); | |
+ SubjectConfig conf = new SubjectConfig.Builder().build(); | |
+ Assert.assertEquals(empty, conf); | |
+ SubjectConfig conf2 = new SubjectConfig.Builder() | |
+ .set("repo.validators", null) | |
+ .build(); | |
+ Assert.assertEquals(conf, conf2); | |
+ Assert.assertEquals(conf2, empty); | |
+ Assert.assertFalse(conf.equals(null)); | |
+ Assert.assertFalse(conf.equals(new Object())); | |
+ Assert.assertEquals(conf.hashCode(), empty.hashCode()); | |
+ Assert.assertEquals(conf.hashCode(), conf2.hashCode()); | |
+ | |
+ String k = "key"; | |
+ String v = "val"; | |
+ SubjectConfig custom = new SubjectConfig.Builder() | |
+ .set(k, v).build(); | |
+ SubjectConfig custom2 = new SubjectConfig.Builder() | |
+ .set(custom.asMap()).build(); | |
+ SubjectConfig custom3 = new SubjectConfig.Builder() | |
+ .set(custom.asMap()).addValidator("foo").build(); | |
+ Assert.assertEquals(custom, custom2); | |
+ Assert.assertFalse(custom.equals(custom3)); | |
+ Assert.assertFalse(custom.equals(conf)); | |
+ Assert.assertEquals(custom.hashCode(), custom2.hashCode()); | |
+ | |
+ } | |
+ | |
+ @Test(expected=RuntimeException.class) | |
+ public void testInvalidConfigName() { | |
+ new SubjectConfig.Builder() | |
+ .set("repo.notValid", ""); | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestValidatingSubject.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestValidatingSubject.java | |
new file mode 100644 | |
index 0000000..b27ace7 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestValidatingSubject.java | |
@@ -0,0 +1,92 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import org.junit.After; | |
+import org.junit.Assert; | |
+import org.junit.Before; | |
+import org.junit.Test; | |
+ | |
+public class TestValidatingSubject { | |
+ private static final String ACCEPT = "accept"; | |
+ private static final String REJECT = "reject"; | |
+ private static final String FOO = "foo"; | |
+ private static final String BAR = "bar"; | |
+ private static final String BAZ = "baz"; | |
+ private static final String ACCEPT_VALIDATOR = "accept.validator"; | |
+ | |
+ private InMemoryRepository repo; | |
+ | |
+ @Before | |
+ public void setUpRepository() { | |
+ repo = new InMemoryRepository(new ValidatorFactory.Builder(). | |
+ setValidator(ACCEPT_VALIDATOR, new Validator(){ | |
+ @Override | |
+ public void validate(String schemaToValidate, | |
+ Iterable<SchemaEntry> schemasInOrder) | |
+ throws SchemaValidationException { | |
+ // nothing | |
+ } | |
+ }).build()); | |
+ } | |
+ | |
+ @After | |
+ public void tearDownRepository() { | |
+ repo = null; | |
+ } | |
+ | |
+ @Test | |
+ public void testSuccessfulValidation() throws SchemaValidationException { | |
+ | |
+ Subject accept = repo.register(ACCEPT, new SubjectConfig.Builder() | |
+ .addValidator(ACCEPT_VALIDATOR).build()); | |
+ | |
+ SchemaEntry foo = accept.registerIfLatest(FOO, null); | |
+ Assert.assertNotNull("failed to register schema", foo); | |
+ SchemaEntry bar = accept.registerIfLatest(BAR, foo); | |
+ Assert.assertNotNull("failed to register schema", bar); | |
+ SchemaEntry none = accept.registerIfLatest("nothing", null); | |
+ Assert.assertNull(none); | |
+ SchemaEntry baz = accept.register(BAZ); | |
+ Assert.assertNotNull("failed to register schema", baz); | |
+ | |
+ } | |
+ | |
+ @Test | |
+ public void testValidatorConstruction() { | |
+ Assert.assertNull("Must pass null through Subject.validatingSubject()", | |
+ Subject.validatingSubject(null, | |
+ new ValidatorFactory.Builder().build())); | |
+ } | |
+ | |
+ @Test(expected=SchemaValidationException.class) | |
+ public void testCannotRegister() throws SchemaValidationException { | |
+ Subject reject = repo.register(REJECT, new SubjectConfig.Builder() | |
+ .addValidator(ValidatorFactory.REJECT_VALIDATOR).build()); | |
+ reject.register(FOO); | |
+ } | |
+ | |
+ @Test(expected=SchemaValidationException.class) | |
+ public void testCannotRegisterIfLatest() throws SchemaValidationException { | |
+ Subject reject = repo.register(REJECT, new SubjectConfig.Builder() | |
+ .addValidator(ValidatorFactory.REJECT_VALIDATOR).build()); | |
+ reject.registerIfLatest(FOO, null); | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestValidatorFactory.java b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestValidatorFactory.java | |
new file mode 100644 | |
index 0000000..4af8496 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/common/src/test/java/org/apache/avro/repo/TestValidatorFactory.java | |
@@ -0,0 +1,53 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo; | |
+ | |
+import java.util.HashSet; | |
+ | |
+import org.junit.Assert; | |
+import org.junit.Test; | |
+ | |
+ | |
+public class TestValidatorFactory { | |
+ | |
+ @Test | |
+ public void test() throws SchemaValidationException { | |
+ Validator foo = new Validator() { | |
+ @Override | |
+ public void validate(String schemaToValidate, | |
+ Iterable<SchemaEntry> schemasInOrder) | |
+ throws SchemaValidationException { | |
+ } | |
+ }; | |
+ | |
+ ValidatorFactory fact = new ValidatorFactory.Builder().setValidator("foo", foo).build(); | |
+ HashSet<String> fooset = new HashSet<String>(); | |
+ fooset.add("foo"); | |
+ fooset.add(null); // should ignore | |
+ Assert.assertSame(foo, fact.getValidators(fooset).get(0)); | |
+ fact.getValidators(fooset).get(0).validate(null, null); | |
+ } | |
+ | |
+ @Test(expected=RuntimeException.class) | |
+ public void testInvalidName() { | |
+ new ValidatorFactory.Builder() | |
+ .setValidator("repo.willBreak", null); | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/pom.xml b/lang/java/schema-repo/pom.xml | |
new file mode 100644 | |
index 0000000..e72822a | |
--- /dev/null | |
+++ b/lang/java/schema-repo/pom.xml | |
@@ -0,0 +1,45 @@ | |
+<?xml version="1.0" encoding="UTF-8"?> | |
+<!-- | |
+ Licensed to the Apache Software Foundation (ASF) under one or more | |
+ contributor license agreements. See the NOTICE file distributed with | |
+ this work for additional information regarding copyright ownership. | |
+ The ASF licenses this file to You under the Apache License, Version 2.0 | |
+ (the "License"); you may not use this file except in compliance with | |
+ the License. You may obtain a copy of the License at | |
+ | |
+ http://www.apache.org/licenses/LICENSE-2.0 | |
+ | |
+ Unless required by applicable law or agreed to in writing, software | |
+ distributed under the License is distributed on an "AS IS" BASIS, | |
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ See the License for the specific language governing permissions and | |
+ limitations under the License. | |
+--> | |
+<project | |
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" | |
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> | |
+ <modelVersion>4.0.0</modelVersion> | |
+ | |
+ <parent> | |
+ <artifactId>avro-parent</artifactId> | |
+ <groupId>org.apache.avro</groupId> | |
+ <version>1.7.7-SNAPSHOT</version> | |
+ <relativePath>../</relativePath> | |
+ </parent> | |
+ | |
+ <groupId>org.apache.avro.repo</groupId> | |
+ <artifactId>avro-repo-parent</artifactId> | |
+ | |
+ <name>Apache Avro Schema Repository</name> | |
+ <url>http://avro.apache.org</url> | |
+ <description>Avro Schema Repository parent project</description> | |
+ <packaging>pom</packaging> | |
+ | |
+ <modules> | |
+ <module>common</module> | |
+ <module>server</module> | |
+ <module>client</module> | |
+ <module>bundle</module> | |
+ </modules> | |
+ | |
+</project> | |
diff --git a/lang/java/schema-repo/server/pom.xml b/lang/java/schema-repo/server/pom.xml | |
new file mode 100644 | |
index 0000000..3aedc79 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/server/pom.xml | |
@@ -0,0 +1,74 @@ | |
+<?xml version="1.0" encoding="UTF-8"?> | |
+<!-- | |
+ Licensed to the Apache Software Foundation (ASF) under one or more | |
+ contributor license agreements. See the NOTICE file distributed with | |
+ this work for additional information regarding copyright ownership. | |
+ The ASF licenses this file to You under the Apache License, Version 2.0 | |
+ (the "License"); you may not use this file except in compliance with | |
+ the License. You may obtain a copy of the License at | |
+ | |
+ http://www.apache.org/licenses/LICENSE-2.0 | |
+ | |
+ Unless required by applicable law or agreed to in writing, software | |
+ distributed under the License is distributed on an "AS IS" BASIS, | |
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ See the License for the specific language governing permissions and | |
+ limitations under the License. | |
+--> | |
+<project | |
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" | |
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> | |
+ <modelVersion>4.0.0</modelVersion> | |
+ | |
+ <parent> | |
+ <artifactId>avro-repo-parent</artifactId> | |
+ <groupId>org.apache.avro.repo</groupId> | |
+ <version>1.7.7-SNAPSHOT</version> | |
+ <relativePath>../</relativePath> | |
+ </parent> | |
+ | |
+ <artifactId>avro-repo-server</artifactId> | |
+ | |
+ <name>Apache Avro Schema Repository Server</name> | |
+ <url>http://avro.apache.org</url> | |
+ <description>Avro Schema Repository REST server components</description> | |
+ | |
+ <build> | |
+ </build> | |
+ | |
+ <profiles> | |
+ </profiles> | |
+ | |
+ <dependencies> | |
+ <dependency> | |
+ <groupId>org.apache.avro.repo</groupId> | |
+ <artifactId>avro-repo-common</artifactId> | |
+ <version>${project.version}</version> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>org.eclipse.jetty</groupId> | |
+ <artifactId>jetty-server</artifactId> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>org.eclipse.jetty</groupId> | |
+ <artifactId>jetty-servlet</artifactId> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>com.google.inject</groupId> | |
+ <artifactId>guice</artifactId> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>com.google.inject.extensions</groupId> | |
+ <artifactId>guice-servlet</artifactId> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>com.sun.jersey</groupId> | |
+ <artifactId>jersey-server</artifactId> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>com.sun.jersey.contribs</groupId> | |
+ <artifactId>jersey-guice</artifactId> | |
+ </dependency> | |
+ </dependencies> | |
+ | |
+</project> | |
diff --git a/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/ConfigModule.java b/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/ConfigModule.java | |
new file mode 100644 | |
index 0000000..d8a3e7c | |
--- /dev/null | |
+++ b/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/ConfigModule.java | |
@@ -0,0 +1,100 @@ | |
+package org.apache.avro.repo.server; | |
+ | |
+import java.io.PrintStream; | |
+import java.util.Properties; | |
+ | |
+import javax.inject.Named; | |
+import javax.inject.Singleton; | |
+ | |
+import org.apache.avro.repo.CacheRepository; | |
+import org.apache.avro.repo.InMemoryCache; | |
+import org.apache.avro.repo.Repository; | |
+import org.apache.avro.repo.RepositoryCache; | |
+import org.apache.avro.repo.Validator; | |
+import org.apache.avro.repo.ValidatorFactory; | |
+ | |
+import com.google.inject.Binder; | |
+import com.google.inject.Injector; | |
+import com.google.inject.Key; | |
+import com.google.inject.Module; | |
+import com.google.inject.Provides; | |
+import com.google.inject.TypeLiteral; | |
+import com.google.inject.name.Names; | |
+ | |
+/** | |
+ * A {@link Module} for configuration based on a set of {@link Properties} | |
+ * <br/> | |
+ * Binds every property value in the properties provided to the property name | |
+ * in Guice, making them available with the {@link Named} annotation. Guice | |
+ * will automatically convert these to constant values, such as Integers, | |
+ * Strings, or Class constants. | |
+ * <br/> | |
+ * Keys starting with "validator." bind {@link Validator} classes | |
+ * in a {@link ValidatorFactory}, where the name is the remainder of the key | |
+ * following "validator.". For example, a property | |
+ * "validator.backwards_compatible=com.foo.BackwardsCompatible" | |
+ * will set a validator named "backwards_compatible" to an instance of the | |
+ * class com.foo.BackwardsCompatible. | |
+ */ | |
+class ConfigModule implements Module { | |
+ static final String JETTY_HOST = "jetty.host"; | |
+ static final String JETTY_PORT = "jetty.port"; | |
+ static final String JETTY_PATH = "jetty.path"; | |
+ static final String JETTY_HEADER_SIZE = "jetty.header.size"; | |
+ static final String JETTY_BUFFER_SIZE = "jetty.buffer.size"; | |
+ private static final String REPO_CLASS = "repo.class"; | |
+ private static final String REPO_CACHE = "repo.cache"; | |
+ | |
+ private static final Properties DEFAULTS = new Properties(); | |
+ static { | |
+ DEFAULTS.setProperty(JETTY_HOST, ""); | |
+ DEFAULTS.setProperty(JETTY_PORT, "2876"); // 'AVRO' on a t-9 keypad | |
+ DEFAULTS.setProperty(JETTY_PATH, "/schema-repo"); | |
+ DEFAULTS.setProperty(JETTY_HEADER_SIZE, "16384"); | |
+ DEFAULTS.setProperty(JETTY_BUFFER_SIZE, "16384"); | |
+ DEFAULTS.setProperty(REPO_CACHE, InMemoryCache.class.getName()); | |
+ } | |
+ | |
+ public static void printDefaults(PrintStream writer) { | |
+ writer.println(DEFAULTS); | |
+ } | |
+ | |
+ private final Properties props; | |
+ | |
+ public ConfigModule(Properties props) { | |
+ Properties copy = new Properties(DEFAULTS); | |
+ copy.putAll(props); | |
+ this.props = copy; | |
+ } | |
+ | |
+ @Override | |
+ public void configure(Binder binder) { | |
+ Names.bindProperties(binder, props); | |
+ } | |
+ | |
+ @Provides | |
+ @Singleton | |
+ Repository provideRepository(Injector injector, | |
+ @Named(REPO_CLASS) Class<Repository> repoClass, | |
+ @Named(REPO_CACHE) Class<RepositoryCache> cacheClass) { | |
+ Repository repo = injector.getInstance(repoClass); | |
+ RepositoryCache cache = injector.getInstance(cacheClass); | |
+ return new CacheRepository(repo, cache); | |
+ } | |
+ | |
+ @Provides | |
+ @Singleton | |
+ ValidatorFactory provideValidatorFactory(Injector injector) { | |
+ ValidatorFactory.Builder builder = new ValidatorFactory.Builder(); | |
+ for(String prop : props.stringPropertyNames()) { | |
+ if (prop.startsWith("validator.")) { | |
+ String validatorName = prop.substring("validator.".length()); | |
+ Class<Validator> validatorClass = injector.getInstance( | |
+ Key.<Class<Validator>>get( | |
+ new TypeLiteral<Class<Validator>>(){}, Names.named(prop))); | |
+ builder.setValidator(validatorName, injector.getInstance(validatorClass)); | |
+ } | |
+ } | |
+ return builder.build(); | |
+ } | |
+} | |
diff --git a/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/RESTRepository.java b/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/RESTRepository.java | |
new file mode 100644 | |
index 0000000..88fa43a | |
--- /dev/null | |
+++ b/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/RESTRepository.java | |
@@ -0,0 +1,300 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo.server; | |
+ | |
+import java.io.IOException; | |
+import java.io.StringWriter; | |
+import java.util.List; | |
+import java.util.Map; | |
+import java.util.Properties; | |
+ | |
+import javax.inject.Inject; | |
+import javax.inject.Singleton; | |
+import javax.ws.rs.Consumes; | |
+import javax.ws.rs.GET; | |
+import javax.ws.rs.POST; | |
+import javax.ws.rs.PUT; | |
+import javax.ws.rs.Path; | |
+import javax.ws.rs.PathParam; | |
+import javax.ws.rs.Produces; | |
+import javax.ws.rs.core.MediaType; | |
+import javax.ws.rs.core.MultivaluedMap; | |
+import javax.ws.rs.core.Response; | |
+import javax.ws.rs.core.Response.Status; | |
+ | |
+import org.apache.avro.repo.Repository; | |
+import org.apache.avro.repo.RepositoryUtil; | |
+import org.apache.avro.repo.SchemaEntry; | |
+import org.apache.avro.repo.SchemaValidationException; | |
+import org.apache.avro.repo.Subject; | |
+import org.apache.avro.repo.SubjectConfig; | |
+ | |
+import com.sun.jersey.api.NotFoundException; | |
+ | |
+/** | |
+ * {@link RESTRepository} Is a JSR-311 REST Interface to a {@link Repository}. | |
+ * | |
+ * Combine with {@link RepositoryServer} to run an embedded REST server. | |
+ */ | |
+@Singleton | |
+@Produces(MediaType.TEXT_PLAIN) | |
+@Path("/") | |
+public class RESTRepository { | |
+ | |
+ private final Repository repo; | |
+ | |
+ /** | |
+ * Create a {@link RESTRepository} that wraps a given {@link Repository} | |
+ * Typically the wrapped repository is a | |
+ * {@link org.apache.avro.repo.CacheRepository} that wraps a non-caching | |
+ * underlying repository. | |
+ * | |
+ * @param repo | |
+ * The {@link Repository} to wrap. | |
+ */ | |
+ @Inject | |
+ public RESTRepository(Repository repo) { | |
+ this.repo = repo; | |
+ } | |
+ | |
+ /** | |
+ * @return All subjects in the repository, serialized with | |
+ * {@link RepositoryUtil#subjectsToString(Iterable)} | |
+ */ | |
+ @GET | |
+ public String allSubjects() { | |
+ return RepositoryUtil.subjectsToString(repo.subjects()); | |
+ } | |
+ | |
+ /** | |
+ * Returns all schemas in the given subject, serialized wity | |
+ * {@link RepositoryUtil#schemasToString(Iterable)} | |
+ * | |
+ * @param subject | |
+ * The name of the subject | |
+ * @return all schemas in the subject. Return a 404 Not Found if there is no | |
+ * such subject | |
+ */ | |
+ @GET | |
+ @Path("{subject}/all") | |
+ public String subjectList(@PathParam("subject") String subject) { | |
+ Subject s = repo.lookup(subject); | |
+ if (null == s) { | |
+ throw new NotFoundException(); | |
+ } | |
+ String result = RepositoryUtil.schemasToString(s.allEntries()); | |
+ return result; | |
+ } | |
+ | |
+ @GET | |
+ @Path("{subject}/config") | |
+ public String subjectConfig(@PathParam("subject") String subject) { | |
+ Subject s = repo.lookup(subject); | |
+ if (null == s) { | |
+ throw new NotFoundException(); | |
+ } | |
+ Properties props = new Properties(); | |
+ props.putAll(s.getConfig().asMap()); | |
+ StringWriter writer = new StringWriter(); | |
+ try { | |
+ props.store(writer, null); | |
+ } catch (IOException e) { | |
+ // stringWriter can't throw ... but just in case | |
+ throw new RuntimeException(e); | |
+ } | |
+ return writer.toString(); | |
+ } | |
+ | |
+ /** | |
+ * Create a subejct if it does not already exist. | |
+ * | |
+ * @param subject | |
+ * the name of the subject | |
+ * @param configParams | |
+ * the configuration values for the Subject, as form parameters | |
+ * @return the subject name in a 200 response if successful. HTTP 404 if the | |
+ * subject does not exist, or HTTP 409 if there was a conflict | |
+ * creating the subject | |
+ */ | |
+ @PUT | |
+ @Path("{subject}") | |
+ @Consumes(MediaType.APPLICATION_FORM_URLENCODED) | |
+ public Response createSubject(@PathParam("subject") String subject, | |
+ MultivaluedMap<String, String> configParams) { | |
+ if (null == subject) { | |
+ return Response.status(400).build(); | |
+ } | |
+ SubjectConfig.Builder builder = new SubjectConfig.Builder(); | |
+ for(Map.Entry<String, List<String>> entry : configParams.entrySet()) { | |
+ List<String> val = entry.getValue(); | |
+ if(val.size() > 0) { | |
+ builder.set(entry.getKey(), val.get(0)); | |
+ } | |
+ } | |
+ Subject created = repo.register(subject, builder.build()); | |
+ return Response.ok(created.getName()).build(); | |
+ } | |
+ | |
+ /** | |
+ * Get the latest schema for a subject | |
+ * | |
+ * @param subject | |
+ * the name of the subject | |
+ * @return A 200 response with {@link SchemaEntry#toString()} as the body, or | |
+ * a 404 response if either the subject or latest schema is not found. | |
+ */ | |
+ @GET | |
+ @Path("{subject}/latest") | |
+ public String latest(@PathParam("subject") String subject) { | |
+ return exists(getSubject(subject).latest()).toString(); | |
+ } | |
+ | |
+ /** | |
+ * Look up a schema by subject + id pair. | |
+ * | |
+ * @param subject | |
+ * the name of the subject | |
+ * @param id | |
+ * the id of the schema | |
+ * @return A 200 response with the schema as the body, or a 404 response if | |
+ * the subject or schema is not found | |
+ */ | |
+ @GET | |
+ @Path("{subject}/id/{id}") | |
+ public String schemaFromId(@PathParam("subject") String subject, | |
+ @PathParam("id") String id) { | |
+ return exists(getSubject(subject).lookupById(id)).getSchema(); | |
+ } | |
+ | |
+ /** | |
+ * Look up an id by a subject + schema pair. | |
+ * | |
+ * @param subject | |
+ * the name of the subject | |
+ * @param schema | |
+ * the schema to search for | |
+ * @return A 200 response with the id in the body, or a 404 response if the | |
+ * subject or schema is not found | |
+ */ | |
+ @POST | |
+ @Path("{subject}/schema") | |
+ @Consumes(MediaType.TEXT_PLAIN) | |
+ public String idFromSchema(@PathParam("subject") String subject, String schema) { | |
+ return exists(getSubject(subject).lookupBySchema(schema)).getId(); | |
+ } | |
+ | |
+ /** | |
+ * Register a schema with a subject | |
+ * | |
+ * @param subject | |
+ * The subject name to register the schema in | |
+ * @param schema | |
+ * The schema to register | |
+ * @return A 200 response with the corresponding id if successful, a 403 | |
+ * forbidden response if the schema fails validation, or a 404 not | |
+ * found response if the subject does not exist | |
+ */ | |
+ @PUT | |
+ @Path("{subject}/register") | |
+ @Consumes(MediaType.TEXT_PLAIN) | |
+ public Response addSchema(@PathParam("subject") String subject, String schema) { | |
+ try { | |
+ return Response.ok(getSubject(subject).register(schema).getId()).build(); | |
+ } catch (SchemaValidationException e) { | |
+ return Response.status(Status.FORBIDDEN).build(); | |
+ } | |
+ } | |
+ | |
+ /** | |
+ * Register a schema with a subject, only if the latest schema equals the | |
+ * expected value. This is for resolving race conditions between multiple | |
+ * registrations and schema invalidation events in underlying repositories. | |
+ * | |
+ * @param subject | |
+ * the name of the subject | |
+ * @param latestId | |
+ * the latest schema id, possibly null | |
+ * @param schema | |
+ * the schema to attempt to register | |
+ * @return a 200 response with the id of the newly registered schema, or a 404 | |
+ * response if the subject or id does not exist or a 409 conflict if | |
+ * the id does not match the latest id or a 403 forbidden response if | |
+ * the schema failed validation | |
+ */ | |
+ @PUT | |
+ @Path("{subject}/register_if_latest/{latestId: .*}") | |
+ @Consumes(MediaType.TEXT_PLAIN) | |
+ public Response addSchema(@PathParam("subject") String subject, | |
+ @PathParam("latestId") String latestId, String schema) { | |
+ Subject s = getSubject(subject); | |
+ SchemaEntry latest; | |
+ if ("".equals(latestId)) { | |
+ latest = null; | |
+ } else { | |
+ latest = exists(s.lookupById(latestId)); | |
+ } | |
+ SchemaEntry created = null; | |
+ try { | |
+ created = s.registerIfLatest(schema, latest); | |
+ if (null == created) { | |
+ return Response.status(Status.CONFLICT).build(); | |
+ } | |
+ return Response.ok(created.getId()).build(); | |
+ } catch (SchemaValidationException e) { | |
+ return Response.status(Status.FORBIDDEN).build(); | |
+ } | |
+ } | |
+ | |
+ /** | |
+ * Get a subject | |
+ * | |
+ * @param subject | |
+ * the name of the subject | |
+ * @return a 200 response if the subject exists, or a 404 response if the | |
+ * subject does not. | |
+ */ | |
+ @GET | |
+ @Path("{subject}") | |
+ public Response checkSubject(@PathParam("subject") String subject) { | |
+ getSubject(subject); | |
+ return Response.ok().build(); | |
+ } | |
+ | |
+ @GET | |
+ @Path("{subject}/integral") | |
+ public String getSubjectIntegralKeys(@PathParam("subject") String subject) { | |
+ return Boolean.toString(getSubject(subject).integralKeys()); | |
+ } | |
+ | |
+ private Subject getSubject(String subjectName) { | |
+ Subject subject = repo.lookup(subjectName); | |
+ if (null == subject) { | |
+ throw new NotFoundException(); | |
+ } | |
+ return subject; | |
+ } | |
+ | |
+ private SchemaEntry exists(SchemaEntry entry) { | |
+ if (null == entry) { | |
+ throw new NotFoundException(); | |
+ } | |
+ return entry; | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/RepositoryServer.java b/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/RepositoryServer.java | |
new file mode 100644 | |
index 0000000..6620cfa | |
--- /dev/null | |
+++ b/lang/java/schema-repo/server/src/main/java/org/apache/avro/repo/server/RepositoryServer.java | |
@@ -0,0 +1,162 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo.server; | |
+ | |
+import java.io.BufferedInputStream; | |
+import java.io.File; | |
+import java.io.FileInputStream; | |
+import java.util.Properties; | |
+ | |
+import javax.inject.Named; | |
+import javax.inject.Singleton; | |
+import javax.servlet.http.HttpServlet; | |
+ | |
+import org.eclipse.jetty.server.Connector; | |
+import org.eclipse.jetty.server.Server; | |
+import org.eclipse.jetty.server.nio.SelectChannelConnector; | |
+import org.eclipse.jetty.servlet.FilterHolder; | |
+import org.eclipse.jetty.servlet.ServletContextHandler; | |
+ | |
+import com.google.inject.Guice; | |
+import com.google.inject.Injector; | |
+import com.google.inject.Provides; | |
+import com.google.inject.servlet.GuiceFilter; | |
+import com.sun.jersey.guice.JerseyServletModule; | |
+import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; | |
+ | |
+import static java.lang.Thread.sleep; | |
+ | |
+/** | |
+ * A {@link RepositoryServer} is a stand-alone server for running a | |
+ * {@link RESTRepository}. {@link #main(String...)} takes a single argument | |
+ * containing a property file for configuration. <br/> | |
+ * <br/> | |
+ * | |
+ */ | |
+public class RepositoryServer { | |
+ private final Server server; | |
+ | |
+ /** | |
+ * Constructs an instance of this class, overlaying the default properties | |
+ * with any identically-named properties in the supplied {@link Properties} | |
+ * instance. | |
+ * | |
+ * @param props | |
+ * Property values for overriding the defaults. | |
+ * <p> | |
+ * <b><i>Any overriding properties must be supplied as type </i> | |
+ * <code>String</code><i> or they will not work and the default | |
+ * values will be used.</i></b> | |
+ * | |
+ */ | |
+ public RepositoryServer(Properties props) { | |
+ Injector injector = Guice.createInjector( | |
+ new ConfigModule(props), | |
+ new ServerModule()); | |
+ this.server = injector.getInstance(Server.class); | |
+ } | |
+ | |
+ public static void main(String... args) throws Exception { | |
+ if (args.length != 1) { | |
+ printHelp(); | |
+ System.exit(1); | |
+ } | |
+ File config = new File(args[0]); | |
+ if (!config.canRead()) { | |
+ System.err.println("Cannot read file: " + config); | |
+ printHelp(); | |
+ System.exit(1); | |
+ } | |
+ Properties props = new Properties(); | |
+ props.load(new BufferedInputStream(new FileInputStream(config))); | |
+sleep(3000); //TODO DELETE!!! FOR DEBUG ONLY | |
+ RepositoryServer server = new RepositoryServer(props); | |
+ try { | |
+ server.start(); | |
+ server.join(); | |
+ } finally { | |
+ server.stop(); | |
+ } | |
+ } | |
+ | |
+ public void start() throws Exception { | |
+ server.start(); | |
+ } | |
+ | |
+ public void join() throws InterruptedException { | |
+ server.join(); | |
+ } | |
+ | |
+ public void stop() throws Exception { | |
+ server.stop(); | |
+ } | |
+ | |
+ private static void printHelp() { | |
+ System.err.println("One argument expected containing a configuration " | |
+ + "properties file. Default properties are:"); | |
+ ConfigModule.printDefaults(System.err); | |
+ } | |
+ | |
+ private static class ServerModule extends JerseyServletModule { | |
+ | |
+ @Override | |
+ protected void configureServlets() { | |
+ bind(Connector.class).to(SelectChannelConnector.class); | |
+ serve("/*").with(GuiceContainer.class); | |
+ bind(RESTRepository.class); | |
+ } | |
+ | |
+ @Provides | |
+ @Singleton | |
+ public Server provideServer( | |
+ @Named(ConfigModule.JETTY_HOST) String host, | |
+ @Named(ConfigModule.JETTY_PORT) Integer port, | |
+ @Named(ConfigModule.JETTY_PATH) String path, | |
+ @Named(ConfigModule.JETTY_HEADER_SIZE) Integer headerSize, | |
+ @Named(ConfigModule.JETTY_BUFFER_SIZE) Integer bufferSize, | |
+ Connector connector, | |
+ GuiceFilter guiceFilter, | |
+ ServletContextHandler handler) { | |
+ | |
+ Server server = new Server(); | |
+ if (null != host && !host.isEmpty()) { | |
+ connector.setHost(host); | |
+ } | |
+ connector.setPort(port); | |
+ connector.setRequestHeaderSize(headerSize); | |
+ connector.setRequestBufferSize(bufferSize); | |
+ server.setConnectors(new Connector[] { connector }); | |
+ | |
+ // the guice filter intercepts all inbound requests and uses its bindings | |
+ // for servlets | |
+ FilterHolder holder = new FilterHolder(guiceFilter); | |
+ handler.addFilter(holder, "/*", null); | |
+ handler.addServlet(NoneServlet.class, "/"); | |
+ handler.setContextPath(path); | |
+ server.setHandler(handler); | |
+ server.dumpStdErr(); | |
+ return server; | |
+ } | |
+ | |
+ private static final class NoneServlet extends HttpServlet { | |
+ private static final long serialVersionUID = 4560115319373180139L; | |
+ } | |
+ } | |
+ | |
+} | |
diff --git a/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestConfigModule.java b/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestConfigModule.java | |
new file mode 100644 | |
index 0000000..dc63b9c | |
--- /dev/null | |
+++ b/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestConfigModule.java | |
@@ -0,0 +1,52 @@ | |
+package org.apache.avro.repo.server; | |
+ | |
+import java.util.Properties; | |
+ | |
+import org.apache.avro.repo.InMemoryRepository; | |
+import org.apache.avro.repo.Repository; | |
+import org.apache.avro.repo.SchemaEntry; | |
+import org.apache.avro.repo.SchemaValidationException; | |
+import org.apache.avro.repo.Subject; | |
+import org.apache.avro.repo.SubjectConfig; | |
+import org.apache.avro.repo.Validator; | |
+import org.junit.Assert; | |
+import org.junit.Test; | |
+ | |
+import com.google.inject.Guice; | |
+import com.google.inject.Injector; | |
+ | |
+ /** | |
+ * Checks that {@link ConfigModule} wires a repository and its validators | |
+ * from plain properties. | |
+ */ | |
+ public class TestConfigModule { | |
+ | |
+ /** | |
+ * A validator configured under {@code validator.rejectAll} must be applied | |
+ * to a subject that names it, so registering any schema there has to fail. | |
+ */ | |
+ @Test | |
+ public void testConfig() { | |
+ Properties settings = new Properties(); | |
+ settings.setProperty("repo.class", InMemoryRepository.class.getName()); | |
+ settings.setProperty("validator.rejectAll", Reject.class.getName()); | |
+ Injector injector = Guice.createInjector(new ConfigModule(settings)); | |
+ Repository repository = injector.getInstance(Repository.class); | |
+ SubjectConfig rejectingConfig = new SubjectConfig.Builder() | |
+ .addValidator("rejectAll").build(); | |
+ Subject rejects = repository.register("rejects", rejectingConfig); | |
+ try { | |
+ rejects.register("stuff"); | |
+ Assert.fail(); | |
+ } catch (SchemaValidationException expected) { | |
+ // the validator rejected the schema, which is what this test wants | |
+ } | |
+ } | |
+ | |
+ /** Smoke test: printing the defaults must not throw. */ | |
+ @Test | |
+ public void testPrintDefaults() { | |
+ ConfigModule.printDefaults(System.out); | |
+ } | |
+ | |
+ /** Validator that refuses every schema. */ | |
+ public static class Reject implements Validator { | |
+ @Override | |
+ public void validate(String schemaToValidate, | |
+ Iterable<SchemaEntry> schemasInOrder) throws SchemaValidationException { | |
+ throw new SchemaValidationException("no"); | |
+ } | |
+ } | |
+ | |
+ } | |
diff --git a/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestRESTRepository.java b/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestRESTRepository.java | |
new file mode 100644 | |
index 0000000..a0b40ec | |
--- /dev/null | |
+++ b/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestRESTRepository.java | |
@@ -0,0 +1,57 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo.server; | |
+ | |
+import org.apache.avro.repo.InMemoryRepository; | |
+import org.apache.avro.repo.ValidatorFactory; | |
+import org.junit.After; | |
+import org.junit.Assert; | |
+import org.junit.Before; | |
+import org.junit.Test; | |
+ | |
+import com.sun.jersey.api.NotFoundException; | |
+ | |
+ /** | |
+ * Exercises {@link RESTRepository} directly, without an HTTP server, on top | |
+ * of an {@link InMemoryRepository}. | |
+ */ | |
+ public class TestRESTRepository { | |
+ RESTRepository repo; | |
+ | |
+ /** Creates a fresh, empty repository for every test. */ | |
+ @Before | |
+ public void setUp() { | |
+ ValidatorFactory noValidators = new ValidatorFactory.Builder().build(); | |
+ InMemoryRepository backing = new InMemoryRepository(noValidators); | |
+ repo = new RESTRepository(backing); | |
+ } | |
+ | |
+ /** Drops the reference to the repository used by the finished test. */ | |
+ @After | |
+ public void tearDown() { | |
+ repo = null; | |
+ } | |
+ | |
+ /** Listing the schemas of an unknown subject must raise a not-found error. */ | |
+ @Test(expected=NotFoundException.class) | |
+ public void testNonExistentSubjectList() throws Exception { | |
+ repo.subjectList("nothing"); | |
+ } | |
+ | |
+ /** Reading the config of an unknown subject must raise a not-found error. */ | |
+ @Test(expected=NotFoundException.class) | |
+ public void testNonExistentSubjectGetConfig() throws Exception { | |
+ repo.subjectConfig("nothing"); | |
+ } | |
+ | |
+ /** Creating a subject without a name must be answered with HTTP 400. */ | |
+ @Test | |
+ public void testCreateNullSubject() { | |
+ int status = repo.createSubject(null, null).getStatus(); | |
+ Assert.assertEquals(400, status); | |
+ } | |
+ } | |
\ No newline at end of file | |
diff --git a/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestRepo.java b/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestRepo.java | |
new file mode 100644 | |
index 0000000..2c29e07 | |
--- /dev/null | |
+++ b/lang/java/schema-repo/server/src/test/java/org/apache/avro/repo/server/TestRepo.java | |
@@ -0,0 +1,41 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
+ * implied. See the License for the specific language governing | |
+ * permissions and limitations under the License. | |
+ */ | |
+ | |
+package org.apache.avro.repo.server; | |
+ | |
+import java.util.Properties; | |
+ | |
+import org.apache.avro.repo.InMemoryRepository; | |
+import org.junit.Test; | |
+ | |
+ /** | |
+ * Smoke test that a {@link RepositoryServer} backed by an | |
+ * {@link InMemoryRepository} can be constructed, started and stopped. | |
+ */ | |
+ public class TestRepo { | |
+ | |
+ @Test | |
+ public void testRepoInit() throws Exception { | |
+ Properties props = new Properties(); | |
+ props.setProperty("repo.class", InMemoryRepository.class.getName()); | |
+ props.setProperty("jetty.port", "6782"); | |
+ | |
+ RepositoryServer server = new RepositoryServer(props); | |
+ | |
+ // Always stop the server, even when start() fails part-way, so a failing | |
+ // run does not leave the port bound for the tests that follow. | |
+ try { | |
+ server.start(); | |
+ } finally { | |
+ server.stop(); | |
+ } | |
+ } | |
+ | |
+ } | |
\ No newline at end of file | |
diff --git a/lang/java/tools/pom.xml b/lang/java/tools/pom.xml | |
index 6e0fe83..23c516b 100644 | |
--- a/lang/java/tools/pom.xml | |
+++ b/lang/java/tools/pom.xml | |
@@ -141,7 +141,6 @@ | |
<dependency> | |
<groupId>org.codehaus.jackson</groupId> | |
<artifactId>jackson-core-asl</artifactId> | |
- <version>${jackson.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.hadoop</groupId> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment