Skip to content

Instantly share code, notes, and snippets.

@rcampbell
Created February 18, 2010 07:53
Show Gist options
  • Save rcampbell/307455 to your computer and use it in GitHub Desktop.
Save rcampbell/307455 to your computer and use it in GitHub Desktop.
Querying OpenCalais RDF models and MapReducing the results
(ns calais.rdf
(:use [clojure.http.client]
[clojure.contrib.str-utils :only [re-gsub]]
[clojure.contrib.seq-utils :only [frequencies flatten partition-all]])
(:require [clojure.contrib.str-utils2 :as str-utils])
(:import [java.io File FileInputStream]
[com.hp.hpl.jena.rdf.model Model ModelFactory]
[com.hp.hpl.jena.query QueryExecutionFactory QueryFactory]))
(defn load-model
  "Reads the RDF document `file` into a fresh Jena default model and
  returns that model. `file` is passed to FileInputStream's constructor
  (e.g. a java.io.File or a path string). The input stream is closed
  once reading finishes, whether or not parsing succeeds."
  [file]
  ;; with-open closes the stream even if parsing throws, so no file
  ;; handle is leaked per document.
  (with-open [in (FileInputStream. file)]
    ;; No base URI is supplied (nil); .read returns the model it filled.
    (.read (ModelFactory/createDefaultModel) in nil)))
(defn- load-models
  "Returns a lazy seq with one model per entry of directory `dir`
  (a java.io.File), each produced by load-model in .listFiles order."
  [dir]
  (for [entry (.listFiles dir)]
    (load-model entry)))
(defn- ask
  "Runs the SPARQL SELECT of a query spec against `model` and returns a
  fully-realized seq containing (handler solution) for every solution.
  The spec is a map with :query (a SPARQL string) and :handler (a fn of
  one solution). The query execution is closed before returning."
  [{:keys [query handler]} model]
  (with-open [executioner (QueryExecutionFactory/create
                            (QueryFactory/create query) model)]
    ;; Realize the handler calls while the execution is still open. This
    ;; way every solution is processed before close, and a handler
    ;; exception is thrown here rather than later, wherever a lazy result
    ;; would have been consumed.
    (doall (map handler (iterator-seq (.execSelect executioner))))))
(defn literals
  "Returns a result handler for ask. Given one query solution, the handler
  builds a map from each field name (as a keyword) to the str of the
  literal bound to the SPARQL variable of that name."
  [& fields]
  (fn [solution]
    (zipmap (map keyword fields)
            (map #(str (.getLiteral solution %)) fields))))
;; SPARQL PREFIX declarations prepended to every query: `rdf:` for the W3C
;; RDF syntax namespace and `c:` for OpenCalais predicates.
(def prefixes
  (apply str ["PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> "
              "PREFIX c: <http://s.opencalais.com/1/pred/> "]))
;; Query specs run against every document, keyed by tag category. Each
;; value is {:query <SPARQL string> :handler <fn of one solution>}.
;; Technologies, industry terms, people and products all use the same
;; query shape (entity name plus its RelevanceInfo relevance), so their
;; specs are generated by relevance-entry instead of being copied out four
;; times.
(def queries
  (let [relevance-entry
        ;; Spec for entities of RDF type `type-uri`. The entity is bound
        ;; to the SPARQL variable ?<subject>, and the query selects its
        ;; name and relevance.
        (fn [subject type-uri]
          {:query (str prefixes
                       "SELECT ?name ?relevance "
                       "WHERE {"
                       " ?" subject " rdf:type <" type-uri "> . "
                       " ?" subject " c:name ?name . "
                       " ?relevance_info rdf:type <http://s.opencalais.com/1/type/sys/RelevanceInfo> . "
                       " ?relevance_info c:subject ?" subject " . "
                       " ?relevance_info c:relevance ?relevance "
                       " }")
           :handler (literals "name" "relevance")})]
    {:companies {:query (str prefixes
                             "SELECT ?company ?docId ?ticker ?name ?score "
                             "WHERE {"
                             " ?company rdf:type <http://s.opencalais.com/1/type/er/Company> . "
                             " ?company c:docId ?docId . "
                             ;; required pattern: companies without a ticker do not match
                             " ?company c:ticker ?ticker . "
                             " ?company c:name ?name . "
                             " ?company c:score ?score . "
                             " FILTER (?ticker != \"IDCHS\") "
                             " }")
                 :handler (literals "name" "ticker" "score")}
     :categories {:query (str prefixes
                              "SELECT ?name ?score "
                              "WHERE {"
                              " ?category rdf:type <http://s.opencalais.com/1/type/cat/DocCat> . "
                              " ?category c:categoryName ?name . "
                              " ?category c:score ?score "
                              " }")
                  :handler (literals "name" "score")}
     :technologies (relevance-entry "technology"
                                    "http://s.opencalais.com/1/type/em/e/Technology")
     :industry-terms (relevance-entry "industry_term"
                                      "http://s.opencalais.com/1/type/em/e/IndustryTerm")
     :people (relevance-entry "person"
                              "http://s.opencalais.com/1/type/em/e/Person")
     :products (relevance-entry "product"
                                "http://s.opencalais.com/1/type/em/e/Product")}))
(defn ask-all
  "Runs every query spec in `queries` against the RDF document `file`.
  Returns a map from query key to that query's fully-realized results.
  The document is parsed once and the model is shared by all queries,
  then closed. If anything throws, the stack trace is printed and {} is
  returned."
  [file]
  (try
    (with-open [model (load-model file)]
      ;; doall keeps all query/handler work inside this try. A lazy
      ;; result would let errors escape once realized by a caller, and
      ;; would touch the model after it has been closed.
      (into {} (for [[k spec] queries]
                 [k (doall (ask spec model))])))
    (catch Exception e (.printStackTrace e) {})))
; --- tag reduction ---
;; Per-category tag names accumulated across documents by add-tag,
; e.g. {:companies ("IBM" "Apple" "IBM")}
(def tags (ref {}))
;; Per-category tag counts accumulated by add-freq,
; e.g. {:companies {"IBM" 2 "Apple" 5}}
(def freq (ref {}))
;; Tag name (periods removed) -> set of document ids, filled by add-xref,
; e.g. {"IBM" #{"file2" "file6" "file42"}}
(def xref (ref {}))
(defn project-name
  "Given a map of category -> seq of result maps, returns a map of
  category -> seq of the :name of each result, in the original order."
  [m]
  (into {} (for [[category results] m]
             [category (map :name results)])))
(defn tag-frequencies
  "Given a map of category -> seq of tag names, returns a map of
  category -> {tag-name number-of-occurrences}."
  [m]
  (into {} (for [[category names] m]
             [category (frequencies names)])))
(defn add-tag
  "Appends the tag names in `model` (a map of category -> seq of names)
  to the per-category lists held in the `tags` ref, keeping existing
  names first."
  [model]
  ;; Accumulate into vectors rather than with concat. Repeated concat over
  ;; many documents builds a deeply nested lazy seq that can throw
  ;; StackOverflowError when it is finally realized.
  (dosync (alter tags
                 (partial merge-with (fn [acc more] (into (vec acc) more)))
                 model)))
(defn add-freq
  "Adds the per-category tag counts of `model` (a map of category -> seq
  of tag names) into the running totals held in the `freq` ref."
  [model]
  ;; Outer merge by category, inner merge by tag name summing counts.
  (dosync (alter freq
                 (partial merge-with (partial merge-with +))
                 (tag-frequencies model))))
(defn add-xref
  "Records document `id` against every tag name in `model` (a map of
  category -> seq of tag names) in the `xref` ref. Periods are removed
  from each name before it is used as a key. Each key maps to the set of
  ids it appeared in."
  [id model]
  (doseq [tag (map #(re-gsub #"\." "" %) (flatten (vals model)))]
    ;; The read-and-add happens inside alter's update fn, on the ref's
    ;; in-transaction value, instead of a separate deref.
    (dosync (alter xref update-in [tag] #(conj (or % #{}) id)))))
(defn reduce-tags
  "For each file in `files`: runs all queries (ask-all), keeps only the
  tag names (project-name), and merges them into the tags, freq and xref
  refs. The document id is the file name minus its last four characters
  (NOTE(review): assumes a dot plus 3-letter extension — confirm)."
  [files]
  (let [parse-name (comp project-name ask-all)]
    (doseq [file files]
      (let [id (str-utils/butlast (.getName file) 4)
            model (parse-name file)]
        (add-tag model)
        (add-freq model)
        (add-xref id model)))))
(defn parallelize-reduction
  "Splits the files of directory `dir` into at most one batch per
  available processor and runs reduce-tags on the batches in parallel
  via pmap. Blocks until every batch has been processed."
  [dir]
  (let [files (.listFiles dir)
        n-procs (.availableProcessors (Runtime/getRuntime))
        ;; Round up so there are at most n-procs batches, and never use a
        ;; batch size of 0. (partition-all 0 files) is an endless seq of
        ;; empty batches, so dorun would never return when the directory
        ;; holds fewer files than there are processors.
        batch-size (max 1 (int (Math/ceil (/ (double (count files)) n-procs))))]
    (dorun (pmap reduce-tags (partition-all batch-size files)))))
; --- tag filtering and sorting ---
;; Predicates used by filter-with when no rules are supplied. Each takes a
;; map entry (presumably tag name -> count from freq; verify at call
;; sites), and an entry passes only if its value is greater than 3 and its
;; key is not "Web rights".
(def default-rules
  [(fn [entry] (> (val entry) 3))
   (fn [entry] (not= (key entry) "Web rights"))])
(defn filter-with
  "Returns a lazy seq of the entries of map `m` for which every predicate
  in `rules` returns truthy; the one-argument form uses default-rules."
  ([m] (filter-with default-rules m))
  ([rules m]
   (let [passes-all? (fn [entry]
                       (every? (fn [rule] (rule entry)) rules))]
     (filter passes-all? m))))
(defn sorted-by-val
  "Returns the entries of map `m` as a sorted set ordered by value,
  largest first. Entries with equal values are ordered by key, descending."
  [m]
  ;; Compare [value key] pairs with the arguments swapped to get
  ;; descending order. Keys of a map are unique, so the comparator never
  ;; returns 0 for distinct entries and no entry is dropped from the set.
  (apply sorted-set-by
         (fn [[k1 v1] [k2 v2]] (compare [v2 k2] [v1 k1]))
         m))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment