import java.util.concurrent.Semaphore;

public class UsingSemaphore {
    volatile static boolean check = true;
    volatile static boolean producerStarted = false;
    volatile static boolean consumerStarted = false;

    public static void main(String[] args) throws InterruptedException {
        Semaphore semCon = new Semaphore(0); // consumer semaphore: starts with no permits
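A semaphore created with zero permits, like semCon above, blocks acquire() until the producer calls release(). A minimal Scala sketch of that handshake (illustrative only, not the remainder of this gist; the thread bodies and object name are assumptions):

import java.util.concurrent.Semaphore

object SemaphoreHandshake extends App {
  val semCon = new Semaphore(0) // zero permits: acquire() blocks until release()
  val producer = new Thread(() => {
    println("produced an item")
    semCon.release() // signal the consumer
  })
  val consumer = new Thread(() => {
    semCon.acquire() // wait for the producer's signal
    println("consumed the item")
  })
  producer.start(); consumer.start()
  producer.join(); consumer.join()
}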
spec:
  podSelector:
    matchLabels:
      app: {{ template "wmf.chartname" . }}-taskmanager
      release: {{ .Release.Name }}
  policyTypes:
    {{- if .Values.networkpolicy.egress.enabled }}
    - Egress
    {{- end }}
    - Ingress
2 apiportalwiki_titlesuggest_
2 grwikimedia_archive_first
2 grwikimedia_content_first
2 grwikimedia_general_first
2 id_internalwikimedia_general_
2 knwiktionary_content_
2 mrwikisource_content_
2 mw_cirrus_metastore_first
2 sysop_itwiki_archive_first
2 sysop_itwiki_content_first
nomoa / extract ntriple from rdd.scala
Created September 4, 2020 12:37
extract ntriple from rdd
import org.apache.hadoop.io.compress.BZip2Codec
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

def getDirectoryWriter(outputPath: String, partitions: Int)(implicit spark: SparkSession): RDD[Row] => Unit = {
  (rdd: RDD[Row]) => {
    rdd
      .repartition(partitions)
      // one N-Triples line per statement: "<subject> <predicate> <object> ."
      .map(r => s"${r.getAs("subject")} ${r.getAs("predicate")} ${r.getAs("object")} .")
      .saveAsTextFile(outputPath, classOf[BZip2Codec])
  }
}
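A hypothetical invocation of the writer above; the output path, partition count, and source Parquet path are placeholders, and an implicit SparkSession is assumed to be in scope:

// Writes bzip2-compressed N-Triples files under the example output directory.
val writeNTriples = getDirectoryWriter("hdfs:///user/example/ntriples", partitions = 64)
writeNTriples(spark.read.parquet("/user/example/statements.parquet").rdd)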
nomoa / extract turtle from parquet.scala
Created August 31, 2020 07:40
extract turtle from parquet
import java.io.BufferedOutputStream
import java.nio.file.{Files, Paths}
import java.util.zip.GZIPOutputStream
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.eclipse.rdf4j.rio.{RDFFormat, Rio}

def extract(implicit spark: SparkSession): Unit = {
  val df = spark.read.parquet("...")
  val prefix = "/path/file-"
  val encoder = new StatementEncoder()
  df.foreachPartition(rows => {
    val partition = TaskContext.getPartitionId()
    // one gzip-compressed Turtle file per Spark partition
    val writer = new GZIPOutputStream(new BufferedOutputStream(Files.newOutputStream(Paths.get(s"$prefix-$partition.ttl.gz"))))
    val rdfWriter = Rio.createWriter(RDFFormat.TURTLE, writer)
    rdfWriter.startRDF()
    rows.foreach(row => rdfWriter.handleStatement(encoder.decode(row)))
    rdfWriter.endRDF()
    writer.close()
  })
}
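From a Spark shell this could be invoked roughly as follows, assuming an active SparkSession named spark and the StatementEncoder class on the classpath:

extract(spark)  // one .ttl.gz file is written per Spark partition under the configured prefix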
https://music-grid.surge.sh/#1064-385-2114-2073-256-89-2090-2048-41-256-2176-2049-256-176-2049-2048-&300
https://music-grid.surge.sh/#72-1200-513-2050-688-1026-516-2048-688-258-1028-2136-514-1068-514-2049-&301
https://music-grid.surge.sh/#1026-680-2064-2052-2-80-2050-2052-1-160-2049-2052-8-320-2050-2049-&301
https://music-grid.surge.sh/#1042-548-258-161-258-548-1096-2052-1090-545-258-164-264-548-1090-2081-&301
https://music-grid.surge.sh/#2210-2049-320-36-673-0-1092-161-0-1060-161-2048-20-1185-1024-4-&301
./bin/flink run -p 10 -c org.wikidata.query.rdf.updater.UpdaterJob \
  ../streaming-updater-0.3.24-SNAPSHOT-jar-with-dependencies.jar \
  --hostname www.wikidata.org \
  --checkpoint_dir "hdfs://analytics-hadoop/user/dcausse/streaming_updater/checkpoint_dir" \
  --spurious_events_dir "hdfs://analytics-hadoop/user/dcausse/streaming_updater/spurious" \
  --late_events_dir "hdfs://analytics-hadoop/user/dcausse/streaming_updater/late_events" \
  --brokers kafka-jumbo1001.eqiad.wmnet:9092 \
  --rev_create_topic eqiad.mediawiki.revision-create \
  --failed_ops_dir "hdfs://analytics-hadoop/user/dcausse/streaming_updater/failed_ops" \
  --output_topic dcausse_test_wdqs_streaming_updater \
  --output_topic_partition 1 \
  --consumer_group dcausse_test_wdqs_streaming_updater
nomoa / suggest explain
Created August 22, 2019 14:41
suggest explain
Array
(
    [value] => 8225100
    [description] => Convert to an integer score: 0.82251003228174 * 10000000
    [details] => Array
        (
            [0] => Array
                (
                    [value] => 0.82251003228174
                    [description] => Weighted sum of doc quality score and popularity
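The top-level value in the explain tree is just the weighted score scaled to an integer; a minimal sketch of that conversion (truncation toward zero is an assumption about the rounding mode):

val weightedSum = 0.82251003228174                 // weighted sum of doc quality score and popularity
val integerScore = (weightedSum * 10000000).toInt  // 8225100, the [value] reported above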
import requests
from pyspark.sql import Row
queries = sc.textFile('/user/dcausse/image_qual/commons_queries_handpicked.lst')