I tried to write a simple job with 1 mapper and no reducer that writes data to HBase. In the mapper I open a connection to HBase, write a few rows of data to a table, and then close the connection. In the job driver I am using jobConf.setNumMapTasks(1); and jobConf.setNumReduceTasks(0); to specify that 1 mapper and no reducer should be executed. I am also setting the reducer class to IdentityReducer in the JobConf. The strange behavior I am observing is that the job writes the data to the HBase table, but after that I see in the logs that it continuously tries to open a connection to HBase and then closes it. This goes on for 20-30 minutes, after which the job is declared to have completed with 100% success. At the end, when I check the _SUCCESS file created by the dummy data that I put in OutputCollector.collect(...), I see hundreds of rows of dummy data when there should be only 1. Following is the code for the job driver:
public int run(String[] arg0) throws Exception {
    Configuration config = HBaseConfiguration.create(getConf());
    ensureRequiredParametersExist(config);
    ensureOptionalParametersExist(config);

    JobConf jobConf = new JobConf(config, getClass());
    jobConf.setJobName(config.get(ETLJobConstants.ETL_JOB_NAME));

    // Set map-specific configuration
    jobConf.setNumMapTasks(1);
    jobConf.setMaxMapAttempts(1);
    jobConf.setInputFormat(TextInputFormat.class);
    jobConf.setMapperClass(SingletonMapper.class);
    jobConf.setMapOutputKeyClass(LongWritable.class);
    jobConf.setMapOutputValueClass(Text.class);

    // Set reducer-specific configuration
    jobConf.setReducerClass(IdentityReducer.class);
    jobConf.setOutputKeyClass(LongWritable.class);
    jobConf.setOutputValueClass(Text.class);
    jobConf.setOutputFormat(TextOutputFormat.class);
    jobConf.setNumReduceTasks(0);

    // Set job-specific configuration details such as the input file name
    FileInputFormat.setInputPaths(jobConf, jobConf.get(ETLJobConstants.ETL_JOB_FILE_INPUT_PATH));
    System.out.println("setting output path : " + jobConf.get(ETLJobConstants.ETL_JOB_FILE_OUTPUT_PATH));
    FileOutputFormat.setOutputPath(jobConf, new Path(jobConf.get(ETLJobConstants.ETL_JOB_FILE_OUTPUT_PATH)));

    JobClient.runJob(jobConf);
    return 0;
}
The driver class extends Configured and implements Tool (I used the sample from the Definitive Guide). Following is the code in the mapper class.
In the mapper's map method I open a connection to HBase, do a preliminary check to make sure the table exists, write the rows, and then close the table.
public void map(LongWritable arg0, Text arg1,
        OutputCollector<LongWritable, Text> arg2, Reporter arg3) throws IOException {
    HTable aTable = null;
    HBaseAdmin admin = null;
    try {
        arg3.setStatus("started");

        /* Set up the HBase config */
        admin = new HBaseAdmin(conf);

        /* Open a connection to the table */
        String tableName = conf.get(ETLJobConstants.ETL_JOB_TABLE_NAME);
        HTableDescriptor htd = new HTableDescriptor(toBytes(tableName));
        String colFamilyName = conf.get(ETLJobConstants.ETL_JOB_TABLE_COLUMN_FAMILY_NAME);
        // Renamed from the String variable above so the two declarations do not clash
        byte[] tableNameBytes = htd.getName();

        /* Call function to ensure the table 'tableName' exists */

        /* Loop and put the file data into the table */
        aTable = new HTable(conf, tableNameBytes);
        DataRow row = /* logic to generate data */
        while (row != null) {
            byte[] rowKey = toBytes(row.getRowKey());
            Put put = new Put(rowKey);
            for (DataNode node : row.getRowData()) {
                put.add(toBytes(colFamilyName), toBytes(node.getNodeName()), toBytes(node.getNodeValue()));
            }
            aTable.put(put);
            arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxo added data row hbase");
            row = fileParser.getNextRow();
        }
        aTable.flushCommits();
        arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxo finished adding data hbase");
    } finally {
        // Always release the HBase resources, even if a put fails
        if (aTable != null) {
            aTable.close();
        }
        if (admin != null) {
            admin.close();
        }
    }
    arg2.collect(new LongWritable(10), new Text("something"));
    arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxoadded dummy data collector");
}
As you can see, at the end I am writing some dummy data to the collector (10, 'something'), and I see hundreds of rows of this data in the _SUCCESS file after the job has terminated. I can't identify why the mapper code is restarted multiple times over and over instead of running just once. Any help would be greatly appreciated.
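By using jobConf.setNumMapTasks(1) you are only telling Hadoop that you wish to use 1 mapper, if possible. The value is a hint: the actual number of map tasks is determined by the number of input splits. This is unlike setNumReduceTasks, which sets exactly the number you specified.
That's why more mappers are run and why you observe these numbers.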
For more details, please read this post.
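To make the "hint" behaviour concrete, here is a minimal sketch of the split arithmetic that, as far as I recall, the old-API FileInputFormat applies to a single splittable file. It is a simplified model and not the actual Hadoop code: the class name SplitEstimate, the method names, and the example sizes are made up for illustration, and the 1.1 slack factor is an assumption based on my reading of the Hadoop source.

```java
public class SplitEstimate {

    // Assumed slack factor: the last split may be up to 10% larger than splitSize.
    private static final double SPLIT_SLOP = 1.1;

    // The requested number of map tasks only influences the "goal" size;
    // the block size and the minimum split size can override it.
    static long splitSize(long totalSize, int requestedMaps, long minSize, long blockSize) {
        long goalSize = totalSize / (requestedMaps == 0 ? 1 : requestedMaps);
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    // Estimated number of splits (and therefore map tasks) for one splittable file.
    static int estimateSplits(long fileSize, int requestedMaps, long minSize, long blockSize) {
        if (fileSize == 0) {
            return 1; // an empty file is modelled here as one empty split
        }
        long size = splitSize(fileSize, requestedMaps, minSize, blockSize);
        int splits = 0;
        long remaining = fileSize;
        while ((double) remaining / size > SPLIT_SLOP) {
            splits++;
            remaining -= size;
        }
        if (remaining != 0) {
            splits++;
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // 1 GB file, 128 MB blocks, one map task requested, default minimum split size
        System.out.println(estimateSplits(1024 * mb, 1, 1L, 128 * mb));        // prints 8
        // Same file, but with the minimum split size raised to the file size
        System.out.println(estimateSplits(1024 * mb, 1, 1024 * mb, 128 * mb)); // prints 1
    }
}
```

Under these assumptions, asking for one map task on a file larger than one block still yields one split per block, while raising the minimum split size to at least the file size brings the estimate back to a single split.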
