Skip to content

Instantly share code, notes, and snippets.

@damondouglas
Created March 5, 2024 23:19
Show Gist options
  • Save damondouglas/662215598cbf997a070a1bade750c79f to your computer and use it in GitHub Desktop.
Save damondouglas/662215598cbf997a070a1bade750c79f to your computer and use it in GitHub Desktop.
Example converting between Java class, JSON, and Beam Row.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import java.nio.charset.StandardCharsets;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class SomeThing {
private static final SchemaProvider SCHEMA_PROVIDER = new AutoValueSchema();
private static final TypeDescriptor<SomeThing> TYPE_DESCRIPTOR = TypeDescriptor.of(SomeThing.class);
private static final SerializableFunction<SomeThing, Row> TO_ROW_FN = checkStateNotNull(SCHEMA_PROVIDER.toRowFunction(TYPE_DESCRIPTOR));
private static final SerializableFunction<Row, SomeThing> FROM_ROW_FN = checkStateNotNull(SCHEMA_PROVIDER.fromRowFunction(TYPE_DESCRIPTOR));
static final Schema SCHEMA = checkStateNotNull(SCHEMA_PROVIDER.schemaFor(TYPE_DESCRIPTOR));
private static final PayloadSerializer JSON_PAYLOAD_SERIALIZER = new JsonPayloadSerializerProvider().getSerializer(SCHEMA, ImmutableMap.of());
String toJson() {
byte[] bytes = checkStateNotNull(JSON_PAYLOAD_SERIALIZER.serialize(toRow()));
return new String(bytes, StandardCharsets.UTF_8);
}
static SomeThing fromJson(String json) {
Row row = checkStateNotNull(JSON_PAYLOAD_SERIALIZER.deserialize(json.getBytes(StandardCharsets.UTF_8)));
return fromRow(row);
}
Row toRow() {
return TO_ROW_FN.apply(this);
}
static SomeThing fromRow(Row row) {
return FROM_ROW_FN.apply(row);
}
abstract Boolean getABoolean();
abstract String getAString();
abstract Integer getAnInteger();
abstract Long getALong();
abstract Float getAFloat();
abstract Double getADouble();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setABoolean(Boolean aBoolean);
abstract Builder setAString(String value);
abstract Builder setAnInteger(Integer value);
abstract Builder setALong(Long value);
abstract Builder setAFloat(Float value);
abstract Builder setADouble(Double value);
abstract SomeThing build();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment