DBInputFormat na prenos údajov z databázy SQL do databázy NoSQL



Cieľom tohto blogu je naučiť sa, ako prenášať údaje z databáz SQL do HDFS, ako prenášať údaje z databáz SQL do databáz NoSQL.

V tomto blogu preskúmame možnosti a možnosti jednej z najdôležitejších súčastí technológie Hadoop, t. J. MapReduce.

Spoločnosti dnes prijímajú rámec Hadoop ako svoju prvú voľbu pre ukladanie dát kvôli jeho schopnostiam efektívne spracovávať veľké dáta. Vieme však tiež, že údaje sú všestranné a existujú v rôznych štruktúrach a formátoch. Na kontrolu takého veľkého množstva údajov a ich rôznych formátov by mal existovať mechanizmus, ktorý by vyhovoval všetkým rozmanitostiam, a pritom priniesol efektívny a konzistentný výsledok.





Najvýkonnejšou súčasťou v rámci Hadoop je MapReduce, ktorý dokáže poskytnúť kontrolu nad dátami a ich štruktúrou lepšie ako ostatní kolegovia. Aj keď to vyžaduje réžiu krivky učenia a zložitosť programovania, ak dokážete zvládnuť tieto zložitosti, môžete s Hadoopom určite zvládnuť akýkoľvek druh údajov.

Rámec MapReduce rozdeľuje všetky svoje úlohy spracovania na dve základné fázy: Map a Reduce.



Príprava nespracovaných údajov pre tieto fázy vyžaduje pochopenie niektorých základných tried a rozhraní. Super trieda pre toto prepracovanie je InputFormat.

The InputFormat class je jednou z hlavných tried v rozhraní Hadoop MapReduce API. Táto trieda je zodpovedná za definovanie dvoch hlavných vecí:

  • Rozdelenie údajov
  • Čítačka záznamov

Rozdelenie údajov je základný koncept v rámci Hadoop MapReduce, ktorý definuje veľkosť jednotlivých úloh máp aj jeho potenciálny server na vykonávanie. The Čítačka záznamov je zodpovedný za skutočné načítanie záznamov zo vstupného súboru a ich odoslanie (ako páry kľúč / hodnota) mapovaču.



O počte mapovačov sa rozhoduje na základe počtu rozdelení. Úlohou InputFormatu je vytvárať rozdelenia. Väčšina času rozdeleného času je ekvivalentná veľkosti bloku, ale nie vždy sa rozdelenie vytvorí na základe veľkosti bloku HDFS. Celkom to závisí od toho, ako bola prepísaná metóda getSplits () vášho InputFormatu.

Medzi MR splitom a HDFS blokom je zásadný rozdiel. Blok je fyzický blok údajov, zatiaľ čo rozdelenie je iba logický blok, ktorý číta mapovač. Rozdelenie neobsahuje vstupné údaje, obsahuje iba referenciu alebo adresu údajov. Rozdelenie má v zásade dve veci: dĺžku v bajtoch a množinu ukladacích miest, ktoré sú iba reťazcami.

Aby sme tomu lepšie porozumeli, vezmime si jeden príklad: Spracovanie údajov uložených vo vašej MySQL pomocou MR. Pretože v tomto prípade neexistuje koncept blokov, teória: „rozdelenia sa vždy vytvárajú na základe bloku HDFS“,zlyhá. Jednou z možností je vytvoriť rozdelenia na základe rozsahov riadkov vo vašej tabuľke MySQL (a to robí DBInputFormat, vstupný formát na čítanie údajov z relačných databáz). Môžeme mať k počet rozdelení pozostávajúcich z n riadkov.

Rozdelenia sa vytvárajú iba pre InputFormats založené na FileInputFormat (InputFormat na spracovanie údajov uložených v súboroch) na základe celkovej veľkosti vstupných súborov v bajtoch. S veľkosťou bloku vstupných súborov FileSystem sa však zaobchádza ako s hornou hranicou vstupných rozdelení. Ak máte súbor menší ako veľkosť bloku HDFS, získate za tento súbor iba 1 mapovač. Ak sa chcete chovať inak, môžete použiť mapu.red.min.split.size. Opäť to však závisí výlučne od getSplits () vášho InputFormatu.

čo je * v sql

V balíku org.apache.hadoop.mapreduce.lib.input máme k dispozícii toľko už existujúcich vstupných formátov.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Predvolená hodnota je TextInputFormat.

Podobne máme toľko výstupných formátov, ktoré načítajú údaje z reduktorov a ukladajú ich do HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Predvolené je TextOutputFormat.

Po prečítaní tohto blogu by ste sa dozvedeli:

  • Ako napísať program na zmenšenie mapy
  • O rôznych typoch InputFormatov dostupných v Mapreduce
  • Aká je potreba InputFormats
  • Ako písať vlastné InputFormáty
  • Ako prenášať údaje z databáz SQL do HDFS
  • Ako prenášať údaje z databáz SQL (tu MySQL) do databáz NoSQL (tu Hbase)
  • Ako prenášať údaje z jednej databázy SQL do druhej tabuľky v databázach SQL (Možno to nemusí byť až také dôležité, ak to urobíme v tej istej databáze SQL. Nie je však nič zlé na tom, že viete o tých istých. Nikdy neviete ako sa môže začať používať)

Predpoklad:

c ++ volanie odkazom
  • Predinštalovaný Hadoop
  • Predinštalovaný SQL
  • Predinštalovaný Hbase
  • Základné porozumenie Java
  • MapReduce knowledge
  • Základné vedomosti o rámci Hadoop

Poďme pochopiť problémové vyhlásenie, ktoré tu budeme riešiť:

V našej relačnej databáze Edureka máme tabuľku zamestnancov v MySQL DB. Podľa obchodných požiadaviek teraz musíme presunúť všetky údaje dostupné v relačnej databáze do súborového systému Hadoop, t. J. HDFS, NoSQL DB známej ako Hbase.

Máme veľa možností, ako túto úlohu vykonať:

  • Sqoop
  • Žlab
  • MapReduce

Teraz už nechcete inštalovať a konfigurovať žiadny ďalší nástroj pre túto operáciu. Zostáva vám iba jedna možnosť, a to spracovateľský rámec Hadoop MapReduce. Rámec MapReduce vám dá počas prenosu úplnú kontrolu nad dátami. So stĺpcami môžete manipulovať a umiestniť ich priamo na ktorékoľvek z dvoch cieľových miest.

Poznámka:

  • Musíme načítať a vložiť konektor MySQL do triedy Hadoop, aby sme načítali tabuľky z tabuľky MySQL. Za týmto účelom si stiahnite konektor com.mysql.jdbc_5.1.5.jar a uložte ho do adresára Hadoop_home / share / Hadoop / MaPreduce / lib.
cp súbory na stiahnutie / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Tiež umiestnite všetky nádoby Hbase pod triedu Hadoop, aby váš program MR získal prístup k Hbase. Vykonáte to nasledujúcim príkazom :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Verzie softvéru, ktoré som použil pri vykonávaní tejto úlohy, sú:

  • Hadooop - 2.3.0
  • HBase 0.98.9-Hadoop2
  • Zatmenie Mesiaca

Aby som predišiel problémom s kompatibilitou, predpisujem svojim čitateľom spustenie príkazu v podobnom prostredí.

Vlastný DBInputWritable:

balík com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implements Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) vyvolá SQLException // Objekt Resultset predstavuje dáta vrátené z príkazu SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) hodí IOException { } public void write (PreparedStatement ps) hodí SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, odd)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

Vlastný DBOutputWritable:

balík com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implements Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) throws SQLException {} public void write (DataOutput out) throws IOException {} public void write (PreparedStatement ps) hodí SQLException {ps.setString (1, meno) ps.setInt (2, id) ps.setString (3, odd)}}

Vstupná tabuľka:

vytvoriť databázu edureka
vytvoriť tabuľku emp (empid int nie null, názov varchar (30), odd varchar (20), primárny kľúč (empid))
vložte do emp hodnoty (1, 'abhay', 'vývoj'), (2, 'brundesh', 'test')
vyberte * z emp

Prípad 1: Prenos z MySQL na HDFS

balík com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) throws Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // class driver' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // user name' root ') // password Job job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.class) File.utput, new Path (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // vstupný názov tabuľky null, null, nový reťazec [] {'empid', 'name', 'odd'} / / stĺpce tabuľky) Cesta p = nová Cesta (args [0]) FileSystem fs = FileSystem.get (nový URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Tento kúsok kódu nám umožňuje pripraviť alebo nakonfigurovať vstupný formát na prístup k nášmu zdrojovému SQL DB. Parameter obsahuje triedu ovládača, adresa URL má adresu databázy SQL, jeho používateľské meno a heslo.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // trieda ovládača 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // meno používateľa 'root') // heslo

Tento kúsok kódu nám umožňuje odovzdať podrobnosti tabuliek v databáze a nastaviť ich v objekte úlohy. Medzi tieto parametre patrí samozrejme inštancia úlohy, vlastná zapisovateľná trieda, ktorá musí implementovať rozhranie DBWritable, názov zdrojovej tabuľky, podmienka, ak nejaké iné majú hodnotu null, akékoľvek parametre triedenia, iná iná hodnota, zoznam stĺpcov tabuľky.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // názov vstupnej tabuľky null, null, nový reťazec [] {'empid', 'name', 'odd'} // stĺpce tabuľky)

Mapovač

balík com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable mapa verejnej triedy rozširuje Mapper {
chránená mapa neplatných oblastí (kľúč LongWritable, hodnota DBInputWritable, kontext ctx) {try {názov reťazca = value.getName () IntWritable id = nový IntWritable (value.getId ()) reťazec dept = value.getDept ()
ctx.write (nový text (meno + „+ id +“ + odd.), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Redukcia: Použitá redukcia identity

Príkaz na spustenie:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Výstup: Tabuľka MySQL prevedená na HDFS

hadoop dfs -ls / dbtohdfs / *

Prípad 2: Prenos z jednej tabuľky v MySQL do inej v MySQL

vytvorenie výstupnej tabuľky v MySQL

vytvoriť tabuľku zamestnanec1 (meno varchar (20), id int, odd varchar (20))

balík com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable verejná trieda Mainonetable_to_other_table {public static void main (String [] args) throws Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // meno používateľa' root ') // heslo Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // názov vstupnej tabuľky null, null, nový reťazec [] {'empid ',' name ',' odd '} // stĺpce tabuľky) DBOutputFormat.setOutput (úloha,' zamestnanec1 ', // názov výstupnej tabuľky nový String [] {' name ',' id ',' odd '} // tabuľka stĺpce) System.exit (job.waitForCompletion (true)? 0: 1)}}

Tento kúsok kódu nám umožňuje nakonfigurovať názov výstupnej tabuľky v SQL DB. Parametre sú inštancia úlohy, názov výstupnej tabuľky a názvy výstupných stĺpcov.

DBOutputFormat.setOutput (job, 'employee1', // názov výstupnej tabuľky nový reťazec [] {'name', 'id', 'odd'} // stĺpce tabuľky)

Mapovač: To isté ako prípad 1

Redukcia:

balík com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable public class Reduce extends Reducer {protected void reduce (Text key, Iterable values, Context ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (riadok [0] .toString (), Integer.parseInt (riadok [1] .toString ()), riadok [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Príkaz na spustenie:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Výstup: Prenesené údaje z tabuľky EMP v MySQL na iného zamestnanca tabuľky 1 v MySQL

Prípad 3: Prenos z tabuľky v MySQL do tabuľky NoSQL (Hbase)

Vytváranie tabuľky Hbase na prispôsobenie výstupu z tabuľky SQL:

vytvoriť 'zamestnanec', 'official_info'

Trieda vodiča:

balík Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text verejná trieda MainDbToHbase {public static void main (String [] args) vyvolá výnimku {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // meno používateľa 'root') // heslo Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('employee', Reduce.class, job) job.setInputForm. trieda) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // názov vstupnej tabuľky null, null, nový reťazec [] {'empid', 'name', 'odd'} // stĺpce tabuľky) System.exit (job.waitForCompletion (true)? 0: 1)}}

Tento kúsok kódu vám umožňuje nakonfigurovať triedu výstupného kľúča, ktorá je v prípade hbase ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Tu odovzdávame názov tabuľky hbase a redukciu, ktorá pôsobí na stôl.

TableMapReduceUtil.initTableReducerJob ('employee', Reduce.class, job)

Mapovač:

balík Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io . LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map extends Mapper {private IntWritable one = new IntWritable (1) protected void map (LongWritable id, DBInputWritable value, Context context) {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), nový text (line + ') '+ odd.))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

V tejto časti kódu berieme hodnoty z kariet triedy DBinputwritable a potom ich odovzdávame
ImmutableBytesWritable, takže sa dostanú k reduktoru v bytewriatble podobe, ktorej Hbase rozumie.

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), nový text (line + '' + odd. ))

Redukcia:

balík Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text verejná trieda Zmenšiť rozširuje TableReducer {public void redukovať (ImmutableBytesWritable kľúč, Iterovateľné hodnoty, kontextový kontext) vyvolá IOException, InterruptedException {String [] príčina = hodnoty Loop for (Text val: values) {Cause = val.toString (). split ('')} // Put to HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('name'), Bytes.toBytes (príčina [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (príčina [1) ])) context.write (kľúč, vložiť)}}

Táto časť kódu nám umožňuje rozhodnúť sa o presnom riadku a stĺpci, do ktorého by sme ukladali hodnoty z reduktora. Tu ukladáme každý empid do samostatného riadku, pretože sme vytvorili empid ako kľúč riadku, ktorý by bol jedinečný. V každom riadku ukladáme oficiálne informácie zamestnancov do stĺpca rodina „official_info“ do stĺpcov „meno“ a „oddelenie“.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (príčina [0])) put.add (bajty). toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (príčina [1])) context.write (klávesa, vložiť)

Prenesené údaje v Hbase:

skenovať zamestnanca

Ako vidíme, úspešne sme dokončili úlohu migrácie našich obchodných údajov z relačnej databázy SQL do databázy NoSQL.

V nasledujúcom blogu sa dozvieme, ako písať a spúšťať kódy pre ďalšie vstupné a výstupné formáty.

java na moc operatora

Neustále zverejňujte svoje komentáre, otázky alebo spätnú väzbu. Rád by som od vás počul.

Máte na nás otázku? Uveďte to prosím v sekcii komentárov a my sa vám ozveme.

Súvisiace príspevky: