@pmehra7
Created May 30, 2018 20:01
Inserting Meta-Properties with DSE GraphFrames

DSE GraphFrames Meta-Property Insert Wiki

Insert Vertices with Meta-Properties into DSE Graph

DataStax Docs on Meta-Properties: https://docs.datastax.com/en/dse/5.1/dse-dev/datastax_enterprise/graph/using/metaProp.html

1. Create the schema. Start the Gremlin console:

```
$ dse gremlin-console
```

Create the graph and alias `g` to it, then create the property keys and the `offer` vertex label. Note that `company` is a multi-property (`multiple()`) with two meta-properties, `credit_rating` and `domestic`:

```groovy
system.graph('Test_Graph').ifNotExists().create()
:remote config alias g Test_Graph.g

schema.propertyKey("offer").Text().ifNotExists().create()
schema.propertyKey("category").Text().ifNotExists().create()
schema.propertyKey("quantity").Int().ifNotExists().create()
schema.propertyKey("credit_rating").Text().ifNotExists().create()
schema.propertyKey("domestic").Text().ifNotExists().create()
schema.propertyKey("company").Text().multiple().properties("credit_rating","domestic").ifNotExists().create()
schema.propertyKey("offervalue").Double().ifNotExists().create()
schema.propertyKey("brand").Text().ifNotExists().create()

schema.vertexLabel("offer").
      partitionKey("offer").
      properties(
        "category",
        "quantity",
        "company",
        "offervalue",
        "brand"
).ifNotExists().create()
```
2. Load the sample data to a location that Spark can read. This example uses DSEFS, with the file stored at `dsefs:///data/sample.csv`.

sample.csv:

```
offer,category,quantity,value,credit_rating,domestic,offervalue,brand
Coupon,Electronics,3,Best Buy,B,No,100.00,Samsung
```
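3. Create a schema for the DataFrame. You can let Spark infer the schema or specify one explicitly. An explicit schema is convenient because it maps the column names to the vertex property names during ingest: with `header` set to `true`, the header line is skipped and columns are named by position from the schema, so the CSV's `value` column is loaded as `company`.

```scala
import org.apache.spark.sql.types._

// Field names match the property and meta-property keys created in step 1
def offersSchema(): StructType = {
  StructType(Array(
    StructField("offer", StringType, true),
    StructField("category", StringType, true),
    StructField("quantity", IntegerType, true),
    StructField("company", StringType, true),        // the CSV's "value" column
    StructField("credit_rating", StringType, true),  // meta-property of company
    StructField("domestic", StringType, true),       // meta-property of company
    StructField("offervalue", DoubleType, true),
    StructField("brand", StringType, true)))
}
```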
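The renaming happens by position: when an explicit schema is supplied, Spark's CSV reader (with default options) skips the header line and takes the column names from the schema in order. The plain-Scala sketch below does not use Spark at all; it only pairs the header of `sample.csv` with the field names of `offersSchema()` to show which column changes name. `HeaderToSchema` is a hypothetical name used for illustration.

```scala
// Illustration only (no Spark): pair each CSV header column with the schema
// field at the same position, mirroring how names are assigned at ingest.
object HeaderToSchema {
  // Header row of sample.csv
  val csvHeader: Seq[String] =
    "offer,category,quantity,value,credit_rating,domestic,offervalue,brand".split(",").toSeq

  // Field names declared in offersSchema(), in the same order
  val schemaFields: Seq[String] =
    Seq("offer", "category", "quantity", "company", "credit_rating", "domestic", "offervalue", "brand")

  // Columns whose name changes at ingest, as (header name, schema name)
  def renamed: Seq[(String, String)] =
    csvHeader.zip(schemaFields).filter { case (h, s) => h != s }

  def main(args: Array[String]): Unit =
    renamed.foreach { case (h, s) => println(s"$h -> $s") }
}
```

Running `main` prints `value -> company`; every other column keeps its header name.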
4. Read the data into a DataFrame:

```scala
// header=true skips the first line; column names and types come from offersSchema()
val offer = spark.read.option("header", "true").schema(offersSchema()).csv("dsefs:///data/sample.csv")
```
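To check the schema without DSEFS or a cluster, the same two lines of `sample.csv` can be parsed from memory. This is a minimal sketch, assuming a local Spark 2.2 or later (for `DataFrameReader.csv(Dataset[String])`); `LocalOfferRead`, `sampleLines`, and `readOffers` are hypothetical names, and the schema is repeated so the block stands alone.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.types._

// Sketch: read the sample rows from an in-memory Dataset[String]
// instead of dsefs:///data/sample.csv.
object LocalOfferRead {
  // Same fields as offersSchema() in step 3
  val offersSchema: StructType = StructType(Array(
    StructField("offer", StringType, true),
    StructField("category", StringType, true),
    StructField("quantity", IntegerType, true),
    StructField("company", StringType, true),
    StructField("credit_rating", StringType, true),
    StructField("domestic", StringType, true),
    StructField("offervalue", DoubleType, true),
    StructField("brand", StringType, true)))

  // The contents of sample.csv from step 2
  val sampleLines: Seq[String] = Seq(
    "offer,category,quantity,value,credit_rating,domestic,offervalue,brand",
    "Coupon,Electronics,3,Best Buy,B,No,100.00,Samsung")

  def readOffers(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val lines: Dataset[String] = sampleLines.toDS()
    spark.read
      .option("header", "true") // drop the header line; names come from the schema
      .schema(offersSchema)
      .csv(lines)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("offer-csv-preview").getOrCreate()
    try {
      val offer = readOffers(spark)
      offer.printSchema()
      offer.show(false)
    } finally spark.stop()
  }
}
```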
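5. Examine the vertex schema to see how the vertex DataFrame that will be persisted in the graph must be shaped. Here `g` is the graph frame returned by `spark.dseGraph("Test_Graph")`, as in the full code below.

Notice that the `company` property is structured differently from the other properties: because it is a multi-property with meta-properties, it is an array of structs, and each element holds the property value (`value`) together with its meta-property values (`credit_rating`, `domestic`). To write it, we create a struct column with the value of the property (`value`) and the values of the meta-properties. This is shown in step 6.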

```
scala> g.vertices.printSchema
root
 |-- id: string (nullable = false)
 |-- ~label: string (nullable = false)
 |-- offer: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- company: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- value: string (nullable = true)
 |    |    |-- credit_rating: string (nullable = true)
 |    |    |-- domestic: string (nullable = true)
 |-- offervalue: double (nullable = true)
 |-- brand: string (nullable = true)
```
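One way to read the `array<struct>` type of `company` is as a list of property values, each carrying its own meta-properties. The plain-Scala model below only illustrates that shape: the case classes are hypothetical, not DSE or Spark types, and they are populated with the single row from `sample.csv`.

```scala
// Illustrative model only: hypothetical case classes, not DSE or Spark types.
// Field names mirror the graph schema so they line up with g.vertices.printSchema.

// One element of the `company` array: the property value plus its meta-properties
final case class CompanyProperty(value: String, credit_rating: String, domestic: String)

// One `offer` vertex; `company` is a Seq because the key was created with multiple()
final case class OfferVertex(
    offer: String,
    category: String,
    quantity: Int,
    company: Seq[CompanyProperty],
    offervalue: Double,
    brand: String)

object OfferModel {
  // The row of sample.csv expressed in this shape
  val sample: OfferVertex = OfferVertex(
    offer = "Coupon",
    category = "Electronics",
    quantity = 3,
    company = Seq(CompanyProperty("Best Buy", "B", "No")),
    offervalue = 100.0,
    brand = "Samsung")

  def main(args: Array[String]): Unit =
    for (c <- sample.company)
      println(s"${c.value}: credit_rating=${c.credit_rating}, domestic=${c.domestic}")
}
```

Running `main` prints `Best Buy: credit_rating=B, domestic=No`. A second company for the same offer would be another `CompanyProperty` in the list, with its own `credit_rating` and `domestic`.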
6. Create the vertex DataFrame and write the vertices:

```scala
import org.apache.spark.sql.functions.{col, lit, struct}

val offers = offer.select(
  col("offer"),
  col("category"),
  col("quantity"),
  // Property value plus its meta-properties, using the field names from g.vertices.printSchema
  struct(col("company") as "value", col("credit_rating"), col("domestic")) as "company",
  col("offervalue"),
  col("brand")
).withColumn("~label", lit("offer")) // vertex label for every row

println("\nWriting Offer Vertices")
g.updateVertices(offers)
```
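Before writing to a live graph, the vertex DataFrame can be inspected locally. The sketch below assumes only a local Spark session: it uses no DSE classes and writes nothing to a graph. It repeats the step-6 projection on a hand-built row; `OfferVertexPreview`, `sampleOffer`, and `toVertexFrame` are hypothetical names. Its `printSchema` output should show `company` as a struct with `value`, `credit_rating`, and `domestic` fields, which is the element type that `g.vertices.printSchema` reports for the `company` array.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, struct}

// Sketch: preview the vertex DataFrame without a DSE cluster (no graph writes).
object OfferVertexPreview {
  // Hand-made stand-in for the step-4 `offer` DataFrame (same column names)
  def sampleOffer(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("Coupon", "Electronics", 3, "Best Buy", "B", "No", 100.0, "Samsung"))
      .toDF("offer", "category", "quantity", "company", "credit_rating", "domestic", "offervalue", "brand")
  }

  // Same projection as step 6: pack company and its meta-properties into one struct
  def toVertexFrame(offer: DataFrame): DataFrame =
    offer.select(
      col("offer"),
      col("category"),
      col("quantity"),
      struct(col("company") as "value", col("credit_rating"), col("domestic")) as "company",
      col("offervalue"),
      col("brand")
    ).withColumn("~label", lit("offer"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("vertex-preview").getOrCreate()
    try {
      val offers = toVertexFrame(sampleOffer(spark))
      offers.printSchema()
      offers.show(false)
    } finally spark.stop()
  }
}
```

Comparing this schema with `g.vertices.printSchema` is a cheap way to confirm the struct field names match the meta-property names before any data reaches the graph.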

Full code (run in the DSE Spark shell, `dse spark`, where `spark` is predefined):

```scala
import com.datastax.bdp.graph.spark.graphframe._
import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.types._

// Graph created in step 1
val graphName = "Test_Graph"
val g = spark.dseGraph(graphName)

// Explicit schema: column names match the vertex property and meta-property names
def offersSchema(): StructType = {
  StructType(Array(
    StructField("offer", StringType, true),
    StructField("category", StringType, true),
    StructField("quantity", IntegerType, true),
    StructField("company", StringType, true),
    StructField("credit_rating", StringType, true),
    StructField("domestic", StringType, true),
    StructField("offervalue", DoubleType, true),
    StructField("brand", StringType, true)))
}

// Read the sample data from DSEFS; the header line is skipped and columns are named by position
val offer = spark.read.option("header", "true").schema(offersSchema()).csv("dsefs:///data/sample.csv")

// Build the vertex DataFrame: company becomes a struct of the value and its meta-properties
val offers = offer.select(
  col("offer"),
  col("category"),
  col("quantity"),
  struct(col("company") as "value", col("credit_rating"), col("domestic")) as "company",
  col("offervalue"),
  col("brand")
).withColumn("~label", lit("offer"))

println("\nWriting Offer Vertices")
g.updateVertices(offers)
```