Data science final project (数据科学大作业)
import pandas as pd

# The three simulated data files to merge (one per monitor/day).
files = [
    '2601L20011-20191030230000-20191030235959.csv',
    '2601L20011-20191031230000-20191031235959.csv',
    '2601L20022-20191030230000-20191030235959.csv'
]
folder_name = '/Users/hyoban/Downloads/期末考察任务_1/数据科学大作业仿真数据/2组仿真数据/'

# Read each file without naming columns, skipping its original header line.
li = []
for file_name in files:
    df = pd.read_csv(folder_name + file_name, header=None, skiprows=1)
    li.append(df)

# Concatenate everything and write it out headerless; the auto-generated
# integer index becomes the first column of all.csv.
frame = pd.concat(li, axis=0, ignore_index=True)
frame.to_csv(folder_name + 'all.csv', header=False)
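The index that pandas writes as the unnamed first column of all.csv is what later serves as HBASE_ROW_KEY in the ImportTsv step. Before that step can run, the merged file also has to be copied into HDFS, e.g. with hdfs dfs -put all.csv / (the target path is assumed to match the import command further down).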
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class Main {
    public static Configuration conf;
    public static Connection connection;

    static {
        // Cluster settings: HDFS root, HBase master and ZooKeeper quorum.
        conf = HBaseConfiguration.create();
        conf.setInt("timeout", 12000);
        conf.set("hbase.rootdir", "hdfs://172.16.121.132:9000/hbase");
        conf.set("hbase.master", "172.16.121.132:16000");
        conf.set("hbase.zookeeper.quorum", "172.16.121.132,172.16.121.133,172.16.121.134");
        try {
            // Fail fast if the cluster is unreachable.
            HBaseAdmin.available(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static boolean isTableExist(String tableName) throws IOException {
        // try-with-resources releases the Admin handle after the check.
        try (Admin admin = connection.getAdmin()) {
            return admin.isTableAvailable(TableName.valueOf(tableName));
        }
    }

    public static ResultScanner getScanner(String tableName, FilterList filterList) {
        try {
            // The Table handle stays open for the scanner's lifetime; acceptable for this demo.
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            scan.setFilter(filterList);
            return table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void query() {
        // All four conditions must hold for a row to be returned.
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        SingleColumnValueFilter monitorFilter = new SingleColumnValueFilter(
                Bytes.toBytes("info"),
                Bytes.toBytes("monitor_id"),
                CompareOperator.EQUAL,
                Bytes.toBytes("2601L20011")
        );
        // The leading space is intentional: fields after a comma keep it in the raw CSV.
        SingleColumnValueFilter indicatorFilter = new SingleColumnValueFilter(
                Bytes.toBytes("info"),
                Bytes.toBytes("indicator"),
                CompareOperator.EQUAL,
                Bytes.toBytes(" 10001001100130001221")
        );
        // Time window 23:00:00-23:30:00, compared lexicographically as strings.
        SingleColumnValueFilter timeFilter1 = new SingleColumnValueFilter(
                Bytes.toBytes("info"),
                Bytes.toBytes("monitor_time"),
                CompareOperator.GREATER_OR_EQUAL,
                Bytes.toBytes(" 2019-10-30 23:00:00")
        );
        SingleColumnValueFilter timeFilter2 = new SingleColumnValueFilter(
                Bytes.toBytes("info"),
                Bytes.toBytes("monitor_time"),
                CompareOperator.LESS_OR_EQUAL,
                Bytes.toBytes(" 2019-10-30 23:30:00")
        );
        filterList.addFilter(monitorFilter);
        filterList.addFilter(indicatorFilter);
        filterList.addFilter(timeFilter1);
        filterList.addFilter(timeFilter2);

        ResultScanner resultScanner = getScanner("final", filterList);
        if (resultScanner != null) {
            resultScanner.forEach(result ->
                    System.out.println(
                            Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("monitor_id"))) + " " +
                            Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("indicator"))) +
                            Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("monitor_time")))
                    )
            );
            resultScanner.close();
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println(isTableExist("final"));
            // query();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
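One caveat with SingleColumnValueFilter: by default it also emits rows that lack the tested column entirely. If the final table may contain such rows, each filter can be tightened before it is added to the list, as in this one-line sketch:

monitorFilter.setFilterIfMissing(true); // skip rows that have no info:monitor_id cell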
bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=',' -Dimporttsv.columns='HBASE_ROW_KEY,info:monitor_id,info:indicator,info:monitor_time,info:value' final /all.csv
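The import expects the final table with an info column family to already exist. A minimal sketch for creating it through the same Java client, reusing the connection from the class above (the names are the ones the import command expects):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

// Create table "final" with the single column family "info" if it is missing.
try (Admin admin = connection.getAdmin()) {
    TableName name = TableName.valueOf("final");
    if (!admin.tableExists(name)) {
        admin.createTable(TableDescriptorBuilder.newBuilder(name)
                .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
                .build());
    }
}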
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

public class Main {
    public static void main(String[] args) throws IOException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.121.132:9092,172.16.121.133:9092,172.16.121.134:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Replay the merged CSV line by line, sending only the rows for
        // monitor 2601L20011 and the target indicator (the leading space is
        // intentional: fields after a comma keep it in the raw data).
        BufferedReader bf = new BufferedReader(new FileReader("src/main/resources/all.csv"));
        String line;
        while ((line = bf.readLine()) != null) {
            String[] strings = line.split(",");
            if (strings[1].equals("2601L20011") && strings[2].equals(" 10001001100130001221")) {
                Thread.sleep(1000); // throttle to one record per second to simulate a live feed
                producer.send(new ProducerRecord<>("final", line));
                System.out.println(line);
            }
        }
        bf.close();
        producer.close();
        System.out.println("All records sent.");
    }
}
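The producer assumes a final topic already exists on the brokers. If topic auto-creation is disabled, it can be created with the Kafka AdminClient against the same broker list — a sketch (the class name, single partition and replication factor 1 are illustrative choices):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CreateTopic {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.121.132:9092,172.16.121.133:9092,172.16.121.134:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // One partition, replication factor 1 -- enough for this exercise.
            admin.createTopics(Collections.singletonList(new NewTopic("final", 1, (short) 1)))
                 .all().get(); // block until the broker confirms
        }
    }
}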
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local")
      .appName("App name")
      .getOrCreate()

    // Read the merged CSV from HDFS with an explicit schema (the file has no header row).
    val df = spark.read
      .schema("id STRING, monitor_id STRING, indicator STRING, monitor_time STRING, value STRING")
      .csv("hdfs://hadoop102:9000/all.csv")

    // monitor_time carries a leading space in the raw CSV, so skip it
    // (substr is 1-based) and take the 10-character date, e.g. "2019-10-30";
    // then count records per monitor per day.
    df.withColumn("day", df.col("monitor_time").substr(2, 10))
      .groupBy("monitor_id", "day")
      .count()
      .show()

    spark.stop()
  }
}
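The HDFS URI here uses the hostname hadoop102 where the other files address 172.16.121.132 directly; presumably both name the same NameNode. Once packaged, the job can be launched with the standard submitter, e.g. bin/spark-submit --class SimpleApp followed by the application jar.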
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // Local streaming context with a 5-second micro-batch interval.
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("final");
        JavaSparkContext context = new JavaSparkContext(conf);
        JavaStreamingContext streamingContext = new JavaStreamingContext(context, Seconds.apply(5));

        // Kafka consumer configuration: the same brokers the producer writes to.
        Map<String, Object> kafkaPara = new HashMap<>();
        kafkaPara.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.121.132:9092,172.16.121.133:9092,172.16.121.134:9092");
        kafkaPara.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        kafkaPara.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaPara.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        ArrayList<String> topics = new ArrayList<>();
        topics.add("final");

        // Subscribe to the "final" topic; keys and values are both strings.
        JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaPara));

        // Split each CSV record into its fields and print them per micro-batch.
        JavaDStream<String[]> data = lines.map(line -> line.value().split(","));
        data.foreachRDD((javaRDD, time) -> {
            javaRDD.foreach(strings -> {
                for (String s : strings) {
                    System.out.println(s);
                }
            });
        });

        streamingContext.start();
        streamingContext.awaitTermination();
        streamingContext.close();
    }
}
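Running this consumer requires the spark-streaming-kafka-0-10 integration on the classpath (artifact spark-streaming-kafka-0-10_2.12 for a Scala 2.12 build of Spark; the Scala suffix must match the Spark build). The job prints every field of each replayed CSV record once per 5-second batch.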