@hyoban
Last active March 1, 2022 06:31
Data Science Final Project
# Merge the three simulated-data CSV files into a single all.csv
import pandas as pd

files = [
    '2601L20011-20191030230000-20191030235959.csv',
    '2601L20011-20191031230000-20191031235959.csv',
    '2601L20022-20191030230000-20191030235959.csv'
]
folder_name = '/Users/hyoban/Downloads/期末考察任务_1/数据科学大作业仿真数据/2组仿真数据/'

li = []
for file_name in files:
    # Skip each file's header row and read the data without column names
    df = pd.read_csv(folder_name + file_name, header=None, skiprows=1)
    li.append(df)

# Concatenate all rows; the fresh integer index becomes the row-key column in the output
frame = pd.concat(li, axis=0, ignore_index=True)
frame.to_csv(folder_name + 'all.csv', header=None)
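The ImportTsv command and the Spark batch job further down both read /all.csv from HDFS, so the merged file has to be uploaded there first. A minimal sketch, assuming the Hadoop client on the local machine is configured for the target cluster:

# Run from the folder that contains the merged file
hdfs dfs -put all.csv /all.csv
hdfs dfs -ls /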
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class Main {

    public static Configuration conf;
    public static Connection connection;

    static {
        // Cluster connection settings
        conf = HBaseConfiguration.create();
        conf.setInt("timeout", 12000);
        conf.set("hbase.rootdir", "hdfs://172.16.121.132:9000/hbase");
        conf.set("hbase.master", "172.16.121.132:16000");
        conf.set("hbase.zookeeper.quorum", "172.16.121.132,172.16.121.133,172.16.121.134");
        try {
            // Fail fast if the cluster is not reachable
            HBaseAdmin.available(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static boolean isTableExist(String tableName) throws IOException {
        // try-with-resources so the Admin handle is closed after the check
        try (Admin admin = connection.getAdmin()) {
            return admin.isTableAvailable(TableName.valueOf(tableName));
        }
    }

    public static ResultScanner getScanner(String tableName, FilterList filterList) {
        try {
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            scan.setFilter(filterList);
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     * Query the "final" table for one monitor and one indicator within a half-hour window.
     * All four filters must match (MUST_PASS_ALL).
     */
    public static void query() {
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        SingleColumnValueFilter monitorFilter = new SingleColumnValueFilter(
                Bytes.toBytes("info"),
                Bytes.toBytes("monitor_id"),
                CompareOperator.EQUAL,
                Bytes.toBytes("2601L20011")
        );
        // The comparison values below keep the leading space that appears in the raw CSV fields
        SingleColumnValueFilter indicatorFilter = new SingleColumnValueFilter(
                Bytes.toBytes("info"),
                Bytes.toBytes("indicator"),
                CompareOperator.EQUAL,
                Bytes.toBytes(" 10001001100130001221")
        );
        SingleColumnValueFilter timeFilter1 = new SingleColumnValueFilter(
                Bytes.toBytes("info"),
                Bytes.toBytes("monitor_time"),
                CompareOperator.GREATER_OR_EQUAL,
                Bytes.toBytes(" 2019-10-30 23:00:00")
        );
        SingleColumnValueFilter timeFilter2 = new SingleColumnValueFilter(
                Bytes.toBytes("info"),
                Bytes.toBytes("monitor_time"),
                CompareOperator.LESS_OR_EQUAL,
                Bytes.toBytes(" 2019-10-30 23:30:00")
        );
        filterList.addFilter(monitorFilter);
        filterList.addFilter(indicatorFilter);
        filterList.addFilter(timeFilter1);
        filterList.addFilter(timeFilter2);

        ResultScanner resultScanner = getScanner("final", filterList);
        if (resultScanner != null) {
            resultScanner.forEach(result ->
                    System.out.println(
                            Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("monitor_id"))) + " " +
                            Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("indicator"))) + " " +
                            Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("monitor_time")))
                    )
            );
            resultScanner.close();
        }
    }
    public static void main(String[] args) {
        try {
            System.out.println(isTableExist("final"));
            // query();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=',' -Dimporttsv.columns='HBASE_ROW_KEY,info:monitor_id,info:indicator,info:monitor_time,info:value' final /all.csv
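ImportTsv can usually create the target table itself, but creating the final table with its info column family up front keeps the schema explicit and makes it easy to spot-check rows once the import has finished. A minimal sketch in the hbase shell (bin/hbase shell), using the table and column-family names from the code above:

create 'final', 'info'
# After ImportTsv has finished:
count 'final'
scan 'final', {LIMIT => 5}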
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

public class Main {
    public static void main(String[] args) throws IOException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.121.132:9092,172.16.121.133:9092,172.16.121.134:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        BufferedReader bf = new BufferedReader(new FileReader("src/main/resources/all.csv"));
        String line;
        while ((line = bf.readLine()) != null) {
            String[] strings = line.split(",");
            // Replay only the rows for one monitor and one indicator,
            // one record per second, to simulate a live feed
            if (strings[1].equals("2601L20011") && strings[2].equals(" 10001001100130001221")) {
                Thread.sleep(1000);
                producer.send(new ProducerRecord<>("final", line));
                System.out.println(line);
            }
        }
        bf.close();
        producer.close();
        System.out.println("All records have been sent");
    }
}
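Both the producer above and the Spark Streaming job below use a Kafka topic named final, which has to exist (or be auto-created) before any records flow. A minimal sketch, assuming Kafka 2.2+ where kafka-topics.sh accepts --bootstrap-server (older releases use --zookeeper instead); the partition and replication counts are placeholder values:

bin/kafka-topics.sh --create --topic final \
  --bootstrap-server 172.16.121.132:9092 \
  --partitions 1 --replication-factor 1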
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local")
      .appName("App name")
      .getOrCreate()

    // Read the merged CSV from HDFS with an explicit schema (the file has no header row)
    val df = spark.read
      .schema("id STRING, monitor_id STRING, indicator STRING, monitor_time STRING, value STRING")
      .csv("hdfs://hadoop102:9000/all.csv")

    // Count records per monitor per day, using a prefix of monitor_time as the day
    df.withColumn("day", df.col("monitor_time").substr(1, 10))
      .groupBy("monitor_id", "day")
      .count()
      .show()
  }
}
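A sketch of how this batch job could be submitted once it is packaged as a JAR; the JAR path is a placeholder, and no --master flag is needed because the code hard-codes .master("local"):

bin/spark-submit --class SimpleApp path/to/simple-app.jar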
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("final");
        JavaSparkContext context = new JavaSparkContext(conf);
        // 5-second micro-batches
        JavaStreamingContext streamingContext = new JavaStreamingContext(context, Seconds.apply(5));

        Map<String, Object> kafkaPara = new HashMap<>();
        kafkaPara.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.121.132:9092,172.16.121.133:9092,172.16.121.134:9092");
        kafkaPara.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        kafkaPara.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaPara.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ArrayList<String> topics = new ArrayList<>();
        topics.add("final");

        // Subscribe to the "final" topic that the producer writes to
        JavaInputDStream<ConsumerRecord<Object, Object>> lines = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaPara));

        // Split each CSV record into its fields and print them, batch by batch
        JavaDStream<String[]> data = lines.map(line -> line.value().toString().split(","));
        data.foreachRDD((javaRDD, time) -> {
            javaRDD.foreach(strings -> {
                for (String s : strings) {
                    System.out.println(s);
                }
            });
        });

        streamingContext.start();
        streamingContext.awaitTermination();
        streamingContext.close();
    }
}
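The streaming job additionally needs the spark-streaming-kafka-0-10 integration on its classpath. One way, sketched below, is to let spark-submit resolve it with --packages; the Scala/Spark versions in the coordinate and the JAR path are placeholders that must match the actual build:

bin/spark-submit --class Main \
  --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1 \
  path/to/streaming-app.jar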