hadoop - How to effectively reduce the length of input to the mapper
My data has 20 fields in its schema. Only the first 3 fields matter as far as my MapReduce program is concerned. How can I decrease the size of the input to the mapper so that only the first 3 fields are received?
The schema has columns 1,2,3,4,5,6,7,8...20. I want only columns 1, 2 and 3 to reach the mapper as the offset and value.
Note: I can't use Pig, because the rest of the logic is implemented in MapReduce.
You need a custom RecordReader for this, something like:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class TrimmedRecordReader implements RecordReader<LongWritable, Text> {

    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;

    public TrimmedRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);
        lineKey = lineReader.createKey();
        lineValue = lineReader.createValue();
    }

    public boolean next(LongWritable key, Text value) throws IOException {
        if (!lineReader.next(lineKey, lineValue)) {
            return false;
        }
        String[] fields = lineValue.toString().split(",");
        if (fields.length < 3) {
            throw new IOException("Invalid record received");
        }
        // Pass the original byte offset through as the key and keep only
        // the first three comma-separated fields as the value.
        key.set(lineKey.get());
        value.set(fields[0] + "," + fields[1] + "," + fields[2]);
        return true;
    }

    public LongWritable createKey() {
        return lineReader.createKey();
    }

    public Text createValue() {
        return lineReader.createValue();
    }

    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    public void close() throws IOException {
        lineReader.close();
    }

    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }
}
It should be pretty self-explanatory: it is just a wrapper around LineRecordReader. Unfortunately, to invoke it you need to extend the InputFormat as well. The following is enough:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class TrimmedTextInputFormat extends FileInputFormat<LongWritable, Text> {

    public RecordReader<LongWritable, Text> getRecordReader(InputSplit input,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(input.toString());
        return new TrimmedRecordReader(job, (FileSplit) input);
    }
}
Just don't forget to set it in the driver.
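A minimal driver sketch, assuming the old mapred API used above. The class name TrimDriver, the job name, and the use of IdentityMapper are placeholders for illustration; swap in your own mapper and output types.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;

public class TrimDriver {

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(TrimDriver.class);
        conf.setJobName("trimmed-input-example");

        // The important part: wire in the custom input format so the
        // mapper only ever sees the first three fields of each record.
        conf.setInputFormat(TrimmedTextInputFormat.class);

        // Placeholder mapper; replace with your own map logic.
        conf.setMapperClass(IdentityMapper.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}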