RDD pomocou programu Spark: Stavebný kameň Apache Spark



Tento blog o RDD pomocou programu Spark vám poskytne podrobné a komplexné znalosti o RDD, ktorý je základnou jednotkou programu Spark & ​​How it it is possible.

, Samotné slovo stačí na vytvorenie iskry v mysli každého inžiniera Hadoop. TO n v pamäti nástroj na spracovanie ktorá je v klastrových výpočtoch blesková. V porovnaní s MapReduce robí zdieľanie údajov v pamäti RDD 10-100x rýchlejšie než zdieľanie v sieti a na disku a to všetko je možné vďaka RDD (odolné distribuované súbory údajov). Kľúčové body, ktorým sa dnes v tomto článku o RDD používajúcich Spark zameriavame, sú:

Potrebujete RDD?

Prečo potrebujeme RDD? -RDD pomocou Sparku





Svet sa vyvíja s a Data Science z dôvodu postupu v . Algoritmy založené na Regresia , , a ktorý beží ďalej Distribuované Iteratívny výpočet ation spôsobom, ktorý zahŕňa opätovné použitie a zdieľanie údajov medzi viacerými výpočtovými jednotkami.

Tradičné techniky vyžadovali stabilné medziproduktové a distribuované úložisko HDFS pozostávajúce z opakujúcich sa výpočtov s replikáciami údajov a ich serializáciou, vďaka čomu bol proces oveľa pomalší. Nájsť riešenie nebolo nikdy ľahké.



prejsť okolo hodnoty vs prejsť okolo java

Toto je kde RDD (Resilient Distributed Datasets) prichádza do veľkého obrazu.

RDD sú ľahko použiteľné a ich tvorba je ľahká, pretože dáta sa importujú z dátových zdrojov a vkladajú do RDD. Ďalej sa operácie používajú na ich spracovanie. Sú to a distribuovaná zbierka pamäti s povoleniami ako Iba na čítanie a hlavne sú Odolné proti chybám .



Ak nejaký dátový oddiel z RDD je stratený , dá sa regenerovať ich aplikáciou transformácia sk Operácia so strateným oddielom v systéme Windows rodokmeň , než aby ste všetky dáta spracovávali úplne od začiatku. Tento druh prístupu v scenároch v reálnom čase dokáže zázraky v situáciách straty údajov alebo výpadku systému.

Čo sú RDD?

RDD alebo ( Odolná sada distribuovaných údajov ) je zásadný dátová štruktúra v Sparku. Termín Odolný definuje schopnosť generovať údaje automaticky alebo dáta odvalenie do pôvodný stav keď dôjde k neočakávanej kalamite s pravdepodobnosťou straty údajov.

Údaje zapísané do RDD sú rozdelené a uložené do viac spustiteľných uzlov . Ak vykonávajúci uzol zlyhá za behu, potom okamžite dostane zálohu z nasledujúci spustiteľný uzol . To je dôvod, prečo sú RDD považované za pokročilý typ dátových štruktúr v porovnaní s inými tradičnými dátovými štruktúrami. RDD môžu ukladať štruktúrované, neštruktúrované a pološtruktúrované dáta.

Poďme vpred s našim RDD pomocou blogu Spark a získajme informácie o jedinečných vlastnostiach RDD, ktoré mu dávajú náskok pred ostatnými typmi dátových štruktúr.

Vlastnosti RDD

  • V pamäti (RAM) Výpočty : Koncept výpočtu v pamäti posúva spracovanie údajov do rýchlejšej a efektívnejšej fázy, keď je celkovo výkon systému je inovovaný.
  • Ľ jeho hodnotenie : Termín Lenivé hodnotenie hovorí transformácie sa aplikujú na údaje v RDD, ale výstup sa negeneruje. Namiesto toho sú použité transformácie prihlásený.
  • Vytrvalosť : Výsledné RDD sú vždy opakovane použiteľné.
  • Hrubozrnné operácie : Používateľ môže uskutočniť transformácie na všetky prvky v množinách údajov mapa, filter alebo zoskupiť podľa operácie.
  • Odolné proti chybám : Ak dôjde k strate údajov, systém môže vrátiť späť k jeho pôvodný stav pomocou prihláseného transformácie .
  • Nezmeniteľnosť : Údaje definované, načítané alebo vytvorené nemôžu byť zmenil po prihlásení do systému. V prípade, že potrebujete získať prístup a upraviť existujúci RDD, musíte vytvoriť nový RDD použitím sady Transformácia funkcie na aktuálny alebo predchádzajúci RDD.
  • Delenie na oddiely : Je to rozhodujúca jednotka paralelizmu v Spark RDD. Počet vytvorených oddielov je predvolene založený na vašom zdroji údajov. Môžete si dokonca zvoliť počet oddielov, ktoré chcete vytvoriť vlastný oddiel funkcie.

Vytvorenie RDD pomocou programu Spark

RDD je možné vytvoriť v tromi spôsobmi:

  1. Čítanie údajov z paralelné zbierky
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Út', 'St', 'Čt', 'Pá', 'So'), 2) val výsledokRDD = PCRDD.collect () resultRDD.collect ( ). foreach (println)
  1. Prihlasovanie transformácia na predchádzajúcich RDD
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'powerful', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Čítanie údajov z externé úložisko alebo cesty súborov ako HDFS alebo HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operácie vykonávané na RDD:

Na RDD sa vykonávajú hlavne dva typy operácií, a to:

  • Premeny
  • Akcie

Premeny : The operácie aplikujeme na RDD na filter, prístup a upraviť údaje v rodičovskom RDD na generovanie a postupné RDD sa volá transformácia . Nový RDD vracia ukazovateľ na predchádzajúci RDD a zaisťuje vzájomnú závislosť.

Transformácie sú Lenivé hodnotenia, inými slovami, operácie použité na RDD, ktoré pracujete, sa zapíšu do denníka, ale nie vykonaný. Systém vyvolá výsledok alebo výnimku po spustení Akcia .

Transformácie môžeme rozdeliť na dva typy, ako je uvedené nižšie:

  • Úzke transformácie
  • Široké premeny

Úzke transformácie Aplikujeme úzke transformácie na a jedna priečka rodičovského RDD na vygenerovanie nového RDD, pretože údaje potrebné na spracovanie RDD sú k dispozícii na jednom oddiele súboru rodič ASD . Príklady úzkych transformácií sú:

  • mapa ()
  • filter ()
  • flatMap ()
  • oddiel ()
  • mapPartitions ()

Široké transformácie: Širokú transformáciu aplikujeme ďalej viac oddielov vygenerovať nový RDD. Údaje potrebné na spracovanie RDD sú k dispozícii na viacerých oddieloch systému rodič ASD . Príklady rozsiahlych transformácií sú:

  • znížiť ()
  • odbor ()

Akcie : Akcie vydávajú pokyny pre aplikáciu Apache Spark výpočet a odovzdať výsledok alebo výnimku späť vodičovi RDD. Niektoré z týchto opatrení zahŕňajú:

  • zbierať ()
  • count ()
  • vziať ()
  • najprv()

Aplikujme prakticky operácie na RDD:

IPL (indická Premier League) je kriketový turnaj, ktorý je na špičkovej úrovni. Poďme teda dnes na to, aby sme sa dostali k množine údajov IPL a vykonali naše RDD pomocou programu Spark.

  • Po prvé, stiahnime si údaje o zhode CSV s IPL. Po stiahnutí to začne vyzerať ako súbor EXCEL s riadkami a stĺpcami.

V ďalšom kroku zapálime iskru a načítame súbor match.csv z jeho umiestnenia, v mojom prípade môjhocsvumiestnenie súboru je „/User/edureka_566977/test/matches.csv“

Teraz začnime s Transformácia časť prvá:

  • mapa ():

Používame Transformácia mapy použiť konkrétnu transformačnú operáciu na každý prvok RDD. Tu vytvoríme RDD podľa názvu CKfile, kde uložíme našecsvspis. Vytvoríme ďalší RDD nazývaný Štáty uložiť podrobnosti o meste .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val stavy = CKfile.map (_. split (',') (2)) state.collect (). foreach (println)

  • filter ():

Transformácia filtra, samotný názov popisuje jej použitie. Túto transformačnú operáciu používame na odfiltrovanie selektívnych údajov zo súboru poskytnutých údajov. Podávame žiadosť prevádzka filtra tu získate záznamy o zápasoch IPL v tomto roku 2017 a uložte ho do súboru RDD.

čo je to virtuálna funkcia
val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Aplikujeme flatMap je transformačná operácia na každý z prvkov RDD, aby sa vytvoril nový RDDD. Je to podobné ako s transformáciou mapy. tu podávame žiadosťFlatmapdo vypľuť zápasy mesta Hyderabad a ukladať údaje dofilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). collect ()

  • oddiel ():

Každé dáta, ktoré zapíšeme do RDD, sú rozdelené do určitého počtu oddielov. Túto transformáciu používame na nájdenie počet oddielov údaje sú skutočne rozdelené na.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitionss.size

  • mapPartitions ():

MapPatitions považujeme za alternatívu k Map () apre každý() spolu. Používame tu mapPartitions, aby sme našli počet riadkov máme v našom súbore RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator). zbierka

  • removeBy ():

PoužívameZmenšiťBy() zapnuté Páry kľúč - hodnota . Túto transformáciu sme použili na našucsvsúbor nájsť prehrávač s najvyšší muž zápasov .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • odbor ():

Názov to všetko vysvetľuje, transformáciu odborov používame na spojiť dve RDD dohromady . Tu vytvárame dva RDD, a to fil a fil2. fil RDD obsahuje záznamy o zhodách IPL z roku 2017 a fil2 RDD obsahuje záznam o zhode IPL z roku 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Začnime s Akcia časť, kde zobrazujeme skutočný výstup:

  • zbierať ():

Collect je akcia, ktorú zvykneme používať zobraziť obsah v RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • count ():

Grófje akcia, pomocou ktorej počítame počet záznamov prítomný v RDD.Tupomocou tejto operácie spočítame celkový počet záznamov v našom súbore match.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • take ():

Take je operácia akcie podobná zhromažďovaniu, ale jediný rozdiel je v tom, že môže vytlačiť ľubovoľné selektívny počet riadkov podľa požiadavky používateľa. Tu aplikujeme nasledujúci kód na tlač súboru prvých desať popredných správ.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. take (10). foreach (println)

  • najprv():

First () je operácia akcie podobná príkazu collect () a take ()toslúži na tlač najvyššej správy s výstupom. Tu použijeme prvú operáciu () na nájdenie maximálny počet odohraných zápasov v konkrétnom meste a dostaneme Mumbai ako výstup.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val stavy = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .col.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Aby bol náš proces nášho učenia sa RDD pomocou Sparku ešte zaujímavejší, vymyslel som zaujímavý prípad použitia.

RDD pomocou Spark: Prípad použitia Pokémona

  • Po prvé, Stiahneme si súbor Pokemon.csv a načítame ho do súboru spark-shell, ako sme to urobili do súboru Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

C ++ Fibonacciho rekurzívne

Pokémoni sú v skutočnosti k dispozícii vo veľkej rozmanitosti. Nájdeme niekoľko odrôd.

  • Odstraňuje sa schéma zo súboru Pokemon.csv

Možno nebudeme potrebovať Schéma súboru Pokemon.csv. Preto ju odstránime.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Nájdenie počtu priečky náš pokemon.csv je distribuovaný do.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Vodný pokémon

Nájdenie počet vodných pokémonov

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Ohnivý Pokémon

Nájdenie počet ohnivých pokémonov

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Môžeme tiež zistiť populácia iného typu pokémona pomocou funkcie count
WaterRDD.count () FireRDD.count ()

  • Keďže sa mi hra o obranná stratégia nájdeme pokémona s maximálna obrana.
val defenceList = NoHeader.map {x => x.split (',')}. mapa {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Vieme maximum hodnota obrannej sily ale nevieme, o ktorého pokémona ide. Poďme teda zistiť, čo to je pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. mapa {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Objednávanie [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Teraz vyriešime pokémona pomocou najmenej obrany
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Teraz sa pozrime na Pokémona s menej obranná stratégia.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPokemoner2 = .map {x => x.split (',')}. mapa {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (objednávanie [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (tlač)

S týmto teda prichádzame na koniec tohto článku RDD pomocou článku Spark. Dúfam, že sme trochu osvetlili vaše znalosti o RDD, ich vlastnostiach a rôznych druhoch operácií, ktoré s nimi možno vykonávať.

Tento článok je založený na je navrhnutý tak, aby vás pripravil na certifikačnú skúšku Cloudera Hadoop a Spark Developer (CCA175). Získate dôkladné vedomosti o Apache Spark a ekosystéme Spark, ktoré zahŕňajú Spark RDD, Spark SQL, Spark MLlib a Spark Streaming. Získate komplexné vedomosti o programovacom jazyku Scala, HDFS, Sqoop, Flume, Spark GraphX ​​a systéme správ, ako je Kafka.