Last active
December 6, 2024 21:58
-
-
Save farhin23/68521f689e6fc89106118c5f031dc353 to your computer and use it in GitHub Desktop.
FederatedCatalog RecurringExecutionPlan Test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.eclipse.edc.end2end; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; | |
import jakarta.json.Json; | |
import org.eclipse.edc.catalog.directory.InMemoryNodeDirectory; | |
import org.eclipse.edc.catalog.spi.CatalogConstants; | |
import org.eclipse.edc.catalog.transform.JsonObjectToCatalogTransformer; | |
import org.eclipse.edc.catalog.transform.JsonObjectToDataServiceTransformer; | |
import org.eclipse.edc.catalog.transform.JsonObjectToDatasetTransformer; | |
import org.eclipse.edc.catalog.transform.JsonObjectToDistributionTransformer; | |
import org.eclipse.edc.connector.controlplane.catalog.spi.Catalog; | |
import org.eclipse.edc.connector.controlplane.transform.odrl.from.JsonObjectFromPolicyTransformer; | |
import org.eclipse.edc.connector.core.agent.NoOpParticipantIdMapper; | |
import org.eclipse.edc.crawler.spi.TargetNode; | |
import org.eclipse.edc.crawler.spi.TargetNodeDirectory; | |
import org.eclipse.edc.jsonld.TitaniumJsonLd; | |
import org.eclipse.edc.jsonld.util.JacksonJsonLd; | |
import org.eclipse.edc.junit.annotations.EndToEndTest; | |
import org.eclipse.edc.junit.extensions.EmbeddedRuntime; | |
import org.eclipse.edc.junit.extensions.RuntimeExtension; | |
import org.eclipse.edc.junit.extensions.RuntimePerClassExtension; | |
import org.eclipse.edc.junit.extensions.RuntimePerMethodExtension; | |
import org.eclipse.edc.protocol.dsp.catalog.transform.from.JsonObjectFromCatalogTransformer; | |
import org.eclipse.edc.protocol.dsp.catalog.transform.from.JsonObjectFromDataServiceTransformer; | |
import org.eclipse.edc.protocol.dsp.catalog.transform.from.JsonObjectFromDatasetTransformer; | |
import org.eclipse.edc.protocol.dsp.catalog.transform.from.JsonObjectFromDistributionTransformer; | |
import org.eclipse.edc.spi.monitor.Monitor; | |
import org.eclipse.edc.spi.result.Result; | |
import org.eclipse.edc.transform.TypeTransformerRegistryImpl; | |
import org.eclipse.edc.transform.spi.TypeTransformerRegistry; | |
import org.eclipse.edc.transform.transformer.edc.to.JsonValueToGenericTypeTransformer; | |
import org.junit.jupiter.api.BeforeEach; | |
import org.junit.jupiter.api.Nested; | |
import org.junit.jupiter.api.Test; | |
import org.junit.jupiter.api.TestInfo; | |
import org.junit.jupiter.api.extension.RegisterExtension; | |
import java.time.Duration; | |
import java.util.Base64; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.UUID; | |
import static java.time.Duration.ofSeconds; | |
import static java.util.Optional.ofNullable; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.awaitility.Awaitility.await; | |
import static org.eclipse.edc.connector.controlplane.transform.odrl.OdrlTransformersFactory.jsonObjectToOdrlTransformers; | |
import static org.eclipse.edc.end2end.TestFunctions.createContractDef; | |
import static org.eclipse.edc.end2end.TestFunctions.createPolicy; | |
import static org.eclipse.edc.util.io.Ports.getFreePort; | |
import static org.mockito.Mockito.mock; | |
@EndToEndTest | |
class FcRecurringExecutionPlanTest { | |
public static final Duration TIMEOUT = ofSeconds(60); | |
private static final Endpoint CONNECTOR_MANAGEMENT = new Endpoint("/management", "8081"); | |
private static final Endpoint CONNECTOR_PROTOCOL = new Endpoint("/api/v1/dsp", "8082"); | |
private static final Endpoint CONNECTOR_DEFAULT = new Endpoint("/api/v1/", "8080"); | |
private static final Endpoint CONNECTOR_CONTROL = new Endpoint("/api/v1/control", "8083"); | |
private static final Endpoint CATALOG_MANAGEMENT = new Endpoint("/management", "8091"); | |
private static final Endpoint CATALOG_PROTOCOL = new Endpoint("/api/v1/dsp", "8092"); | |
private static final Endpoint CATALOG_DEFAULT = new Endpoint("/api/v1/", "8090"); | |
private static final Endpoint CATALOG_CATALOG = new Endpoint("/catalog", "8093"); | |
private final TypeTransformerRegistry typeTransformerRegistry = new TypeTransformerRegistryImpl(); | |
private final ObjectMapper mapper = JacksonJsonLd.createObjectMapper(); | |
@BeforeEach | |
void setUp() { | |
//needed for ZonedDateTime | |
mapper.registerModule(new JavaTimeModule()); | |
var factory = Json.createBuilderFactory(Map.of()); | |
var participantIdMapper = new NoOpParticipantIdMapper(); | |
typeTransformerRegistry.register(new JsonObjectFromCatalogTransformer(factory, mapper, participantIdMapper)); | |
typeTransformerRegistry.register(new JsonObjectFromDatasetTransformer(factory, mapper)); | |
typeTransformerRegistry.register(new JsonObjectFromDataServiceTransformer(factory)); | |
typeTransformerRegistry.register(new JsonObjectFromPolicyTransformer(factory, participantIdMapper)); | |
typeTransformerRegistry.register(new JsonObjectFromDistributionTransformer(factory)); | |
typeTransformerRegistry.register(new JsonObjectToCatalogTransformer()); | |
typeTransformerRegistry.register(new JsonObjectToDatasetTransformer()); | |
typeTransformerRegistry.register(new JsonObjectToDataServiceTransformer()); | |
jsonObjectToOdrlTransformers(participantIdMapper).forEach(typeTransformerRegistry::register); | |
typeTransformerRegistry.register(new JsonObjectToDistributionTransformer()); | |
typeTransformerRegistry.register(new JsonValueToGenericTypeTransformer(mapper)); | |
} | |
@Nested | |
class CatalogTwo { | |
@RegisterExtension | |
static RuntimeExtension connector = getConnectorRuntime(CONNECTOR_DEFAULT, CONNECTOR_PROTOCOL, CONNECTOR_CONTROL, CONNECTOR_MANAGEMENT); | |
@RegisterExtension | |
static RuntimeExtension catalog = getCatalogRuntime("catalog-two", CATALOG_DEFAULT, CATALOG_PROTOCOL, CATALOG_CATALOG, CATALOG_MANAGEMENT); | |
private final CatalogApiClient apiClient = new CatalogApiClient(CATALOG_CATALOG, CONNECTOR_MANAGEMENT, mapper, new TitaniumJsonLd(mock(Monitor.class)), typeTransformerRegistry); | |
@BeforeEach | |
void setUp() { | |
var directory = new InMemoryNodeDirectory(); | |
directory.insert(new TargetNode("connector", "did:web:" + UUID.randomUUID(), "http://localhost:%s%s".formatted(CONNECTOR_PROTOCOL.port(), CONNECTOR_PROTOCOL.path()), List.of(CatalogConstants.DATASPACE_PROTOCOL))); | |
catalog.registerServiceMock(TargetNodeDirectory.class, directory); | |
} | |
@Test | |
void crawl_whenOfferAvailable_shouldContainOffer(TestInfo testInfo) { | |
var id = String.format("%s-%s", testInfo.getDisplayName(), UUID.randomUUID()); | |
var assetIdBase64 = setContract(id, apiClient); | |
// act-assert | |
await().pollDelay(ofSeconds(1)) | |
.pollInterval(ofSeconds(1)) | |
.atMost(TIMEOUT) | |
.untilAsserted(() -> { | |
// With empty query | |
var emptyQuery = TestFunctions.createEmptyQuery(); | |
var catalogs = apiClient.getCatalogs(emptyQuery); | |
assertCatalogContainsOffer(assetIdBase64, catalogs); | |
// With query containing a filter expression for existing asset. | |
var queryWithExistingAssetId = TestFunctions.createQuerySpecWithFilterExpressionForAssetId(id); | |
catalogs = apiClient.getCatalogs(queryWithExistingAssetId); | |
assertCatalogContainsOffer(assetIdBase64, catalogs); | |
}); | |
} | |
} | |
@Nested | |
class CatalogOne { | |
@RegisterExtension | |
static RuntimeExtension connector = getConnectorRuntime(CONNECTOR_DEFAULT, CONNECTOR_PROTOCOL, CONNECTOR_CONTROL, CONNECTOR_MANAGEMENT); | |
@RegisterExtension | |
static RuntimeExtension catalog = getCatalogRuntime("catalog-one", CATALOG_DEFAULT, CATALOG_PROTOCOL, CATALOG_CATALOG, CATALOG_MANAGEMENT); | |
private final CatalogApiClient apiClient = new CatalogApiClient(CATALOG_CATALOG, CONNECTOR_MANAGEMENT, mapper, new TitaniumJsonLd(mock(Monitor.class)), typeTransformerRegistry); | |
@BeforeEach | |
void setUp() { | |
var directory = new InMemoryNodeDirectory(); | |
directory.insert(new TargetNode("connector", "did:web:" + UUID.randomUUID(), "http://localhost:%s%s".formatted(CONNECTOR_PROTOCOL.port(), CONNECTOR_PROTOCOL.path()), List.of(CatalogConstants.DATASPACE_PROTOCOL))); | |
catalog.registerServiceMock(TargetNodeDirectory.class, directory); | |
} | |
@Test | |
void crawl_whenOfferAvailable_shouldContainOffer(TestInfo testInfo) { | |
// setup | |
var id = String.format("%s-%s", testInfo.getDisplayName(), UUID.randomUUID()); | |
var assetIdBase64 = setContract(id, apiClient); | |
// act-assert | |
await().pollDelay(ofSeconds(1)) | |
.pollInterval(ofSeconds(1)) | |
.atMost(TIMEOUT) | |
.untilAsserted(() -> { | |
// With empty query | |
var emptyQuery = TestFunctions.createEmptyQuery(); | |
var catalogs = apiClient.getCatalogs(emptyQuery); | |
assertCatalogContainsOffer(assetIdBase64, catalogs); | |
// With query containing a filter expression for existing asset. | |
var queryWithExistingAssetId = TestFunctions.createQuerySpecWithFilterExpressionForAssetId(id); | |
catalogs = apiClient.getCatalogs(queryWithExistingAssetId); | |
assertCatalogContainsOffer(assetIdBase64, catalogs); | |
}); | |
} | |
} | |
private String setContract (String id, CatalogApiClient apiClient) { | |
var asset = TestFunctions.createAssetJson(id); | |
var r = apiClient.postAsset(asset); | |
assertThat(r.succeeded()).withFailMessage(getError(r)).isTrue(); | |
var assetId = r.getContent(); | |
var policyId = "policy-" + id; | |
var policy = createPolicy(policyId, id); | |
var pr = apiClient.postPolicy(policy); | |
assertThat(r.succeeded()).withFailMessage(getError(pr)).isTrue(); | |
policyId = pr.getContent(); | |
var request = createContractDef("def-" + id, policyId, policyId, assetId); | |
var dr = apiClient.postContractDefinition(request); | |
assertThat(dr.succeeded()).withFailMessage(getError(dr)).isTrue(); | |
return Base64.getEncoder().encodeToString(assetId.getBytes()); | |
} | |
private String getError(Result<String> r) { | |
return ofNullable(r.getFailureDetail()).orElse("No error"); | |
} | |
private void assertCatalogContainsOffer(String assetIdBase64, List<Catalog> catalogs) { | |
assertThat(catalogs).hasSizeGreaterThanOrEqualTo(1); | |
assertThat(catalogs).anySatisfy(catalog -> assertThat(catalog.getDatasets()) | |
.anySatisfy(dataset -> { | |
assertThat(dataset.getOffers()).hasSizeGreaterThanOrEqualTo(1); | |
assertThat(dataset.getOffers().keySet()).anyMatch(key -> key.contains(assetIdBase64)); | |
})); | |
} | |
private static Map<String, String> configOf(String... keyValuePairs) { | |
if (keyValuePairs.length % 2 != 0) { | |
throw new IllegalArgumentException("Must have an even number of key value pairs, was " + keyValuePairs.length); | |
} | |
var map = new HashMap<String, String>(); | |
for (int i = 0; i < keyValuePairs.length - 1; i += 2) { | |
map.put(keyValuePairs[i], keyValuePairs[i + 1]); | |
} | |
return map; | |
} | |
private static RuntimeExtension getConnectorRuntime(Endpoint defaultEp, Endpoint protocol, Endpoint control, Endpoint management) { | |
return new RuntimePerClassExtension(new EmbeddedRuntime("connector", | |
configOf("edc.connector.name", "connector1", | |
"edc.web.rest.cors.enabled", "true", | |
"web.http.port", defaultEp.port(), | |
"web.http.path", defaultEp.path(), | |
"web.http.protocol.port", protocol.port(), | |
"web.http.protocol.path", protocol.path(), | |
"web.http.control.port", control.port(), | |
"web.http.control.path", control.path(), | |
"web.http.management.port", management.port(), | |
"edc.participant.id", "test-connector", | |
"web.http.management.path", management.path(), | |
"edc.web.rest.cors.headers", "origin,content-type,accept,authorization,x-api-key", | |
"edc.dsp.callback.address", "http://localhost:%s%s".formatted(protocol.port(), protocol.path())), | |
":system-tests:end2end-test:connector-runtime")); | |
} | |
private static RuntimeExtension getCatalogRuntime (String catalogName, Endpoint defaultEp, Endpoint protocol, Endpoint catalog, Endpoint management) { | |
return new RuntimePerMethodExtension(new EmbeddedRuntime(catalogName, | |
configOf("edc.catalog.cache.execution.delay.seconds", "0", | |
"edc.catalog.cache.execution.period.seconds", "2", | |
"edc.catalog.cache.partition.num.crawlers", "5", | |
"edc.web.rest.cors.enabled", "true", | |
"edc.participant.id", "test-catalog", | |
"web.http.port", defaultEp.port(), | |
"web.http.path", defaultEp.path(), | |
"web.http.protocol.port", protocol.port(), | |
"web.http.protocol.path", protocol.path(), | |
"web.http.management.port", management.port(), | |
"web.http.management.path", management.path(), | |
"web.http.version.port", getFreePort() + "", | |
"web.http.version.path", "/.well-known/version", | |
"web.http.catalog.port", catalog.port(), | |
"web.http.catalog.path", catalog.path(), | |
"edc.web.rest.cors.headers", "origin,content-type,accept,authorization,x-api-key"), | |
":launchers:catalog-mocked")); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment