Kumulatívna stavová transformácia v streamovaní Apache Spark



Tento blogový príspevok pojednáva o stavových transformáciách v Spark Streamingu. Dozviete sa všetko o kumulatívnom sledovaní a zručnostiach pre kariéru Hadoop Spark.

Prispel Prithviraj Bose

V mojom predchádzajúcom blogu som diskutoval o stavových transformáciách využívajúcich koncept vytvárania okien Apache Spark Streaming. Môžete si to prečítať tu .





V tomto príspevku budem diskutovať o kumulatívnych stavových operáciách v Apache Spark Streaming. Ak ste v službe Spark Streaming nováčikom, dôrazne vám odporúčam prečítať si môj predchádzajúci blog, aby ste pochopili, ako funguje vytváranie okien.

Typy stavovej transformácie v prúdení iskier (pokračovanie ...)

> Kumulatívne sledovanie

Použili sme reduceByKeyAndWindow (…) API na sledovanie stavov kľúčov, avšak vytváranie okien predstavuje pre určité prípady použitia obmedzenia. Čo ak chceme akumulovať stavy klávesov v celom rozsahu a nie ich obmedzovať na časové okno? V takom prípade by sme museli použiť updateStateByKey (…) POŽIAR.



trieda __init__ python

Toto API bolo predstavené v Sparku 1.3.0 a bolo veľmi populárne. Toto API má však určitú réžiu výkonu, jeho výkon sa zhoršuje, pretože veľkosť stavov sa časom zvyšuje. Napísal som ukážku, ktorá ukazuje použitie tohto API. Kód nájdete tu .

Spark 1.6.0 predstavil nové API mapWithState (…) ktorý rieši výkonové réžie predstavované updateStateByKey (…) . V tomto blogu budem diskutovať o tomto konkrétnom API pomocou ukážkového programu, ktorý som napísal. Kód nájdete tu .

Predtým, ako sa ponorím do ukážky kódu, ušetríme si pár slov o kontrolnom stanovisku. Pre každú stavovú transformáciu je kontrolná bodka povinná. Kontrolná bodka je mechanizmus na obnovenie stavu kľúčov v prípade zlyhania programu ovládača. Po reštartovaní ovládača sa stav kľúčov obnoví zo súborov kontrolných bodov. Umiestnenia kontrolných bodov sú zvyčajne HDFS alebo Amazon S3 alebo akékoľvek spoľahlivé úložisko. Počas testovania kódu je možné ukladať aj do lokálneho súborového systému.



Vo vzorovom programe počúvame textový prúd soketu na hostiteľovi = localhost a porte = 9999. Tokenizuje prichádzajúci prúd do (slová, počet výskytov) a sleduje počet slov pomocou rozhrania 1.6.0 API mapWithState (…) . Ďalej sa kľúče bez aktualizácií odstraňujú pomocou StateSpec.timeout API. Kontrolujeme bodovanie v HDFS a frekvencia kontrolných bodov je každých 20 sekúnd.

Poďme si najskôr vytvoriť reláciu Spark Streaming,

Spark-streaming-session

Vytvárame a checkpointDir v HDFS a potom zavolajte metódu objektu getOrCreate (...) . The getOrCreate API kontroluje checkpointDir aby sme zistili, či existujú nejaké predchádzajúce stavy na obnovenie, ak existujú, obnoví reláciu Spark Streaming a aktualizuje stavy kľúčov z údajov uložených v súboroch pred prechodom na nové údaje. Inak vytvorí novú reláciu Spark Streaming.

The getOrCreate vezme názov adresára kontrolného bodu a funkciu (ktorú sme pomenovali createFunc ) ktorého podpis by mal byť () => StreamingContext .

Pozrime sa na kód vo vnútri createFunc .

Riadok č. 2: Vytvoríme streamovací kontext s názvom úlohy na „TestMapWithStateJob“ a dávkovým intervalom = 5 sekúnd.

Riadok č. 5: Nastavte adresár kontrolného bodu.

ako používať charat v jave -

Riadok č. 8: Nastavte špecifikáciu stavu pomocou triedy org.apache.streaming.StateSpec objekt. Najskôr nastavíme funkciu, ktorá bude sledovať stav, potom nastavíme počet oddielov pre výsledné DStreamy, ktoré sa majú generovať počas následných transformácií. Nakoniec sme nastavili časový limit (na 30 sekúnd), kde ak nepríde aktualizácia pre kľúč do 30 sekúnd, potom bude stav kľúča odstránený.

Riadok 12 #: Nastavte prúd zásuvky, vyrovnajte prichádzajúce dávkové údaje, vytvorte pár kľúč - hodnota, zavolajte mapWithState , nastavte interval kontrolného bodu na 20 s a nakoniec vytlačte výsledky.

Rámec Spark nazýva th e createFunc pre každý kľúč s predchádzajúcou hodnotou a aktuálnym stavom. Vypočítame súčet a stav aktualizujeme kumulatívnym súčtom a nakoniec vrátime súčet pre kľúč.

ako používať spyder python

Zdroje Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Máte na nás otázku? Uveďte to prosím v sekcii komentárov a my sa vám ozveme.

Súvisiace príspevky:

Začíname s programami Apache Spark a Scala

Stavové transformácie s vytváraním okien v streamovaní iskier