-
先把計算和調(diào)度管理解耦。
HDFS的主節(jié)點可以支持兩個以上。
查看全部 -
分別是:分布式存儲系統(tǒng),分布式計算框架,集群管理和調(diào)度(給程序分配資源)。
Yarm的數(shù)據(jù)來源和去向都是HDFS.
在Yarm上運(yùn)行很多的計算框架,例如mapreduce.
HDFS架構(gòu)分析:
分布式存儲:由HDFS決定數(shù)據(jù)存儲在哪個從節(jié)點上。
支持主從架構(gòu):
Map Reduce架構(gòu):
map體現(xiàn)在代碼中就是一個類。
reduce就是一個聚合統(tǒng)計程序。
Yarm架構(gòu):
總結(jié):數(shù)據(jù)存儲和資源調(diào)度都是分布式的主從結(jié)構(gòu)。
查看全部 -
hadoop里面的分布式計算。
通過程序從數(shù)據(jù)庫拉取數(shù)據(jù)的過程非常慢。
mysql存儲在磁盤,磁盤io,即把磁盤數(shù)據(jù)讀到內(nèi)存里面,再通過網(wǎng)絡(luò),傳到計算程序里面,這兩個是造成慢的主要原因。主要原因是網(wǎng)絡(luò)io。
即發(fā)生了移動數(shù)據(jù)。
所以考慮把計算程序傳輸?shù)綌?shù)據(jù)所在的節(jié)點。
即執(zhí)行本地計算,就只需做一個磁盤io。
分布式計算:
在本地執(zhí)行本地計算,多臺機(jī)器執(zhí)行,每臺計算局部計算。
全局匯總,此時數(shù)據(jù)集合的傳輸量比較少,網(wǎng)絡(luò)io消耗少。
查看全部 -
分布式存儲,單機(jī)的存儲能力有限,運(yùn)用到多臺機(jī)器的存儲能力。
如何設(shè)備一個分布式存儲系統(tǒng)。
弊端:如何同時有很多請求同時過來,文件系統(tǒng)的請求會阻塞。
主從架構(gòu),你想要操作的數(shù)據(jù)到底在哪個從節(jié)點上,然后客戶端直接操作從節(jié)點。
主要流程:
查看全部 -
大數(shù)據(jù)在linux上運(yùn)行和操作,安裝部署、排查、基本的命令。
linux里面的一門shell腳本,如何開發(fā) 調(diào)試? 執(zhí)行腳本就行了。
javaSE內(nèi)容,大多數(shù)都是java開發(fā),不需要javaweb內(nèi)容,使用IDEA工具。
數(shù)據(jù)存儲在mysql數(shù)據(jù)庫中。
查看全部 -
核心是數(shù)據(jù)清洗和計算的邏輯。前端用bi實現(xiàn)
查看全部 -
11
查看全部 -
Tomcat
/webapps/web項目/WEB_INF/classes/config數(shù)據(jù)庫配置文件
main.db.driver= com.mysql.cj.jdbc.Driver
main.db.url = jdbc:mysql://localhost:3306/data?serverTimezone=UTC
main.db.user= root
main.db.password = admin
查看全部 -
Sqoop
mapreduce ←→ mysqlsqoop配置:
1. sqoop-env-template改名sqoop-env.sh
2. SQOOP_HOME
3. mysql驅(qū)動jar包,添加到Sqoop的lib目錄下
4. 本地安裝mysql和開放mysql遠(yuǎn)程訪問權(quán)限(去連接集群和windows sq服務(wù))
USE?mysql; CREATE?USER?'root'@'%'?IDENTIFIED?BY?'密碼'; GRANT?ALL?ON?*.*?TO?'root'@'%'; ALTER?USER?'root'@'%'?IDENTIFIED?WITH?mysql_native_password?BY?'密碼'; FLUSH?PRIVILEGES;
5. Hadoop 3.2 版本需要 common-lang.jar 到lib目錄
6. mysql創(chuàng)建數(shù)據(jù)庫
CREATE?DATABASE?data?DEFAULT?CHARACTER?SET=?utf8?DEFAULT?COLLATE?=?utf8_general_ci; USE?data; CREATE?TABLE?top10{ ????dt?data, ????uid?varchar(255), ????length?bigint(20) };
7. sqoop將hdfs目錄的數(shù)據(jù)導(dǎo)出到mysql表中
sqoop?export?\ --connect?jdbc:mysql://windows的ip:3306/data?serverTimezone=UTC?\ --username?root?\ --password?admin?\ --table?top10?\ --export-dir?/res/videoinfojobtop10/20190821?\????#hdfs的路徑 --input-fields-terminated-by?"\t"
查看全部 -
!數(shù)據(jù)指標(biāo)統(tǒng)計-直播時長Top
map階段獲取id和時長,reduce后cleanup函數(shù)對數(shù)據(jù)map集合進(jìn)行排序
public?class?VideoInfoTop10Map?extends?Mapper<LongWritable,?Text,?Text,?LongWritable>{ ????@Override ????protected?void?map(){ ????????//Todo ????} } public?class?VideoInfoTop10Reduce?extends?Reducer<Text,?LongWritable,?Text,?LongWritable>{ ????HashMap<String,?Long>?map?=?new?HashMap<>(); ????@Override ????protected?void?reduce(){ ????????//TODO ????????map.put(k2.toSrting(),lengthsum); ????} ???? ????//reduce結(jié)束后執(zhí)行 ????@Override ????protected?void?cleanup(Context?context){ ????????//配置類中獲取dt日期參數(shù) ????????Configuration?conf?=?context.getConfiguration(); ????????String?dt?=?conf.get("dt"); ????????//排序 ????????Map<String,Long>?sortedMap?=?MapUtils.sortValue(map); ????????Set<Map.Entry<String,Long>>?entries?=?sortedMap.entrySet(); ????????Iterator<Map.Entry<String,?Long>>?it?=?entries.iterator(); ????????int?count=1; ????????while(count<=10?&&?it.hasNext()){ ????????????Map.Entry<String,?Long>?entry?=?it.next(); ????????????String?key?=?entry.getKey(); ????????????Long?value?=?entry.getValue(); ????????????//封裝k3,v3 ????????????Text?k3?=?new?Text(); ????????????k3.set(key); ????????????LongWritable?v3?=?new?LongWritable(); ????????????v3.set(value); ????????????context.write(k3,v3); ????????????count++; ????????} ????} } public?class?VideoInfoTop10Job{ ????public?static?void?main(String[]?args){ ????????//從輸入路徑獲取日期 ????????String[]?fields?=?args[0].split("/"); ????????String?tmpdt=?fields[fields.length?-1]; ????????String?dt?=?DataUtils.transDataFormat(tmpdt); ????????conf.set("dt",dt); ????????//因為context中存放conf信息↑ ????????//Todo ????} }
查看全部 -
!數(shù)據(jù)指標(biāo)統(tǒng)計
//對金幣數(shù)量,總觀看pv,粉絲數(shù)量,視頻開播時長 等指標(biāo)統(tǒng)計
//自定義數(shù)據(jù)類型 一個記錄管理四個字段
//主播id為key,map節(jié)點<k2,v2>為<Text,自定義Writable> //自定義數(shù)據(jù)類型 public?class?VideoInfoWritable?implements?Writable{ ????private?long?gold; ????private?long?watchnumpv; ????private?long?follower; ???? ????public?void?set(long?gold,?long?watchnumpv,?long?follower){ ????????this.gold=?gold; ????????this.watchnumpv=?watchnumpv; ????????this.follower=?follower; ????} ???? ????public?long?getGold(){ ????????return?gold; ????} ???? ????@Override ????public?void?readFields(DataInput?dataInput){ ????????this.gold=?dataInput.readLong(); ????????this.watchnumpv=?dataInput.readLong(); ????????this.follower=?dataInput.readLong(); ????} ????//讀寫數(shù)據(jù)順序保持一致! ????@Override ????public?void?write(DataOutput?dataOutput){ ????????dataOutput.writeLog(gold); ????????dataOutput.writeLog(watchnumpv); ????????dataOutput.writeLog(follower); ????} ???? ????//generate添加 ????//作為v3需要改下字段結(jié)構(gòu) ????@Override ????public?String?toString(){ ????????return?gold+"\t"+watchnumpv+"\t"+follower; ????} } public?class?VideoInfoMap?extend?Mapper<LongWritable,Text,Text,VideoInfoWritable>{ ????@Override ????protected?void?map(LongWritable?k1,?Text?v1,?Context?context){ ????????String?line?=?v1.toString(); ????????//用之前清洗后的數(shù)據(jù) ????????String[]?fields?=?line.split("\t"); ????????String?id?=?fields[0]; ????????long?gold?=?Long.parseLong(fields[1]); ????????long?watchnumpv=?Long.parseLong(fields[2]); ????????long?follower?=?Long.parseLong(fields[3]); ???????? ????????//組裝k2,v2 ????????Text?k2?=?new?Text(); ????????k2.set(id); ????????VideoInfoWritable?v2?=??new?VideoInfoWritable(); ????????v2.set(gold,?watchnumpv,?follower); ????????Context.write(k2,?v2); ????} } public?class?VideoInfoReduce?extends?Reducer<Text,?VideoInfoWritable,?Text,?VideoWritable>{ ????@Override ????protected?void?reduce(Text?k2,?Iterable<VideoInfoWritable>?v2s,?Context?context){ ????????//從v2s把相同key的value取出,?求和 ????????long?goldsum=0; ????????long?watchnumpvsum=0; ????????long?followersum=0; ????????for(?VideoInfoWritable?v2:?v2s){ ????????????goldsum+=?v2.getGold(); ????????????watchnumpvsum?+=?v2.getWatchnumpv(); ????????????followersum?+=?v2.getFollower(); ????????} ????????//組裝?k3,?v3??進(jìn)行聚合 ????????//Text?k3?=?k2; ????????VideoInfoWritable?v3?=?new?VideoInfoWritable(); ????????v3.set(goldsum,?watchnumpvsum,?followersum); ????????context.write(k3,?v3); ????} } public?class?VideoInfoJob{ ????//執(zhí)行任務(wù)job ????//組裝map?reduce ????public?static?void?main(String[]?args){ ????????try{ ????????????if(args.length!=2){ ???????????? ????????????} ????????????Configuration?conf?=?new?Configuration; ????????????Job?job=?job.getInstance(conf); ????????????job.setJarByClass(VideoInfoJob.class); ????????????//文件輸入輸出 ????????????FileInputFormat ????????????FileOutputFormat ????????????//map ????????????job.setMapperClass ????????????//k2類型 ????????????job.setMapOutputKeyClass ????????????//v2類型 ????????????job.setMapOutpiyValueClass ????????????//reduce ????????????job.setReducerClass ????????????//k3 ????????????job.setReducerClass ????????????// ????????} ????} }
查看全部 -
數(shù)據(jù)清洗
//json格式數(shù)據(jù)提取 //需要fastjson對數(shù)據(jù)解析 //不需要聚合不需要reduce //k1,v1段固定<LongWritable,?Text> //k2,v2類型<Text,?Text>k2主播id,?v2核心字段,用\t分割 public?class?DataCleanMap?extend?Mapper<LongWritable,Text,Text,Text>{ ????@Override ????protected?void?map(LongWritable?k1,?Text?v1,?Context?context){ ????????String?line?=v1.toString(); ????????JSONObject?jsonObj=?JSON.parseObject(line); ????????String?id=?jsonObj.getString("uid"); ????????int?gold=?jsonObj.getString("gold"); ????????int?watchnumpv=?jsonObj.getString("watchnumpv"); ???????? ????????if(gold>=0?&&?watchnumpv?>=?0){ ????????????Text?k2?=?new?Text(); ????????????k2.set(id); ????????????Text?v2?=?new?Text(); ????????????v2.set(gold+?"\t"?+?watchnumpv); ????????????context.write(k2,?v2); ????????} ????} } public?class?DataCleanJob{ }
查看全部 -
文件->分片-> MAP開始-> 分區(qū) -> 排序 -> 分組 -> MAP結(jié)束 -> shuffle -> (reduce端)全局排序 -> 分組 -> reduce
分區(qū):不同數(shù)據(jù)類型分類
分組:相同k2分一組
map階段:
RecordReader類把每個InputSplit解析成<k1,v1>
一個InputSplit對應(yīng)一個map task
框架對<k2,v2>分區(qū),不同分區(qū)由不同reduce task處理,默認(rèn)一個分區(qū)
map節(jié)點可以執(zhí)行reduce歸約,為可選項 (Combine)
shuffle:
多個map任務(wù)輸出按照不同分區(qū)網(wǎng)絡(luò)拷貝不同reduce節(jié)點
reduce階段:
全局合并 排序 分組
reduce方法 輸入<k2,{v2...}> 輸出<k3,v3>
查看全部 -
split邏輯運(yùn)算塊: 一個split對應(yīng)一個mapper任務(wù)
K1, V1: K1是相對文本偏移量,V1代表該行文本
Shuffle:一個線程 將map產(chǎn)生結(jié)果拉取到reduce端做匯總
查看全部 -
分布式計算:? 將計算程序發(fā)送到本地 避免大數(shù)據(jù)傳輸
局部聚合 -> 數(shù)據(jù)傳輸(網(wǎng)絡(luò)I/O) ->整體聚合
查看全部
舉報