hadoop - DBInputWritable throwing Exception
import java.io.*;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBInputWritable implements Writable, DBWritable
{
    String symbol;
    String date;
    double open;
    double high;
    double low;
    double close;
    int volume;
    double adjClose;

    //private final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

    public void readFields(DataInput in) throws IOException
    {
        symbol = in.readLine();
        date = in.readLine();
        open = in.readDouble();
        high = in.readDouble();
        low = in.readDouble();
        close = in.readDouble();
        volume = in.readInt();
        adjClose = in.readDouble();
    }

    public void readFields(ResultSet rs) throws SQLException
    {
        symbol = rs.getString(2);
        date = rs.getString(3);
        open = rs.getDouble(4);
        high = rs.getDouble(5);
        low = rs.getDouble(6);
        close = rs.getDouble(7);
        volume = rs.getInt(8);
        adjClose = rs.getDouble(9);
    }

    public void write(DataOutput out) throws IOException { }

    public void write(PreparedStatement ps) throws SQLException { }

    public String getSymbol() { return symbol; }
    public String getDate() { return date; }
    public double getOpen() { return open; }
    public double getHigh() { return high; }
    public double getLow() { return low; }
    public double getClose() { return close; }
    public int getVolume() { return volume; }
    public double getAdjClose() { return adjClose; }
}

public class DBOutputWritable implements Writable, DBWritable
{
    String symbol;
    String date;
    double open;
    double high;
    double low;
    double close;
    int volume;
    double adjClose;

    public DBOutputWritable(String symbol, String date, String open, String high,
                            String low, String close, String volume, String adjClose)
    {
        this.symbol = symbol;
        this.date = date;
        this.open = Double.parseDouble(open);
        this.high = Double.parseDouble(high);
        this.low = Double.parseDouble(low);
        this.close = Double.parseDouble(close);
        this.volume = Integer.parseInt(volume);
        this.adjClose = Double.parseDouble(adjClose);
    }

    public void readFields(DataInput in) throws IOException { }

    public void readFields(ResultSet rs) throws SQLException { }

    public void write(DataOutput out) throws IOException
    {
        out.writeChars(symbol);
        out.writeChars(date);
        out.writeDouble(open);
        out.writeDouble(high);
        out.writeDouble(low);
        out.writeDouble(close);
        out.writeInt(volume);
        out.writeDouble(adjClose);
    }

    public void write(PreparedStatement ps) throws SQLException
    {
        ps.setString(1, symbol);
        ps.setString(2, date);
        ps.setDouble(3, open);
        ps.setDouble(4, high);
        ps.setDouble(5, low);
        ps.setDouble(6, close);
        ps.setInt(7, volume);
        ps.setDouble(8, adjClose);
    }
}

public class Map extends Mapper<LongWritable, DBInputWritable, Text, Text>
{
    public void map(LongWritable key, DBInputWritable value, Context ctx)
    {
        try
        {
            Text set = new Text(value.getDate());
            String line = value.getSymbol() + "," + value.getDate() + "," + value.getOpen() + ","
                        + value.getHigh() + "," + value.getLow() + "," + value.getClose() + ","
                        + value.getVolume() + "," + value.getAdjClose();
            ctx.write(set, new Text(line));
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

public class Reduce extends Reducer<Text, Text, DBOutputWritable, NullWritable>
{
    public void reduce(Text key, Text value, Context ctx)
    {
        try
        {
            String[] line = value.toString().split(",");
            String sym = line[0];
            String dt = line[1];
            String opn = line[2];
            String hgh = line[3];
            String lw = line[4];
            String cls = line[5];
            String vlm = line[6];
            String adcls = line[7];
            ctx.write(new DBOutputWritable(sym, dt, opn, hgh, lw, cls, vlm, adcls),
                      NullWritable.get());
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

public class Main
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf,
            "com.mysql.jdbc.Driver",                    // driver class
            "jdbc:mysql://192.168.198.128:3306/testdb", // db url
            "sqoopuser",                                // username
            "passphrase");                              // password

        Job job = new Job(conf);
        job.setJarByClass(Main.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(DBOutputWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setInputFormatClass(DBInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);

        DBInputFormat.setInput(
            job,
            DBInputWritable.class,
            "aapldata",   // input table name
            null,
            null,
            new String[] {"stock", "symbol", "date", "open", "high", "low",
                          "close", "volume", "adjclose"} // table columns
        );

        DBOutputFormat.setOutput(
            job,
            "aapldatanew", // output table name
            new String[] {"symbol", "date", "open", "high", "low",
                          "close", "volume", "adjclose"} // table columns
        );

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
I think the code is picture-perfect, but I still encounter the error below:
14/11/26 22:09:47 INFO mapred.JobClient: map 100% reduce 0%
14/11/26 22:09:55 INFO mapred.JobClient: map 100% reduce 33%
14/11/26 22:09:58 INFO mapred.JobClient: Task Id : attempt_201411262208_0001_r_000000_2, Status : FAILED
java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
    at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.write(DBOutputFormat.java:66)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:586)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
I need some valuable insights.
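One clue sits in the trace itself: the frame org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156) is the base class's identity reduce, which simply forwards each (Text, Text) pair to the output, where DBOutputFormat then fails to cast the Text key to DBWritable. The identity method runs whenever the custom reduce does not actually override the new-API signature, which takes an Iterable of values rather than a single value. Below is a minimal sketch of a reduce that does override it; this is one reading of the trace, not a confirmed fix from the post, and the @Override annotation is an added safeguard that makes a signature mismatch a compile error:

public class Reduce extends Reducer<Text, Text, DBOutputWritable, NullWritable>
{
    @Override  // will not compile unless the signature matches the framework's
    protected void reduce(Text key, Iterable<Text> values, Context ctx)
            throws IOException, InterruptedException
    {
        // Each value holds one comma-separated record emitted by the mapper
        for (Text value : values)
        {
            String[] line = value.toString().split(",");
            ctx.write(new DBOutputWritable(line[0], line[1], line[2], line[3],
                                           line[4], line[5], line[6], line[7]),
                      NullWritable.get());
        }
    }
}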
In the Map class, the input is Text instead of DBInputWritable:

public class Map extends Mapper
{
    public void map(LongWritable key, Text value, Context ctx)
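Spelled out, the suggestion seems to be a mapper typed on Text input. The following is a hypothetical reconstruction of that suggestion, since the comment shows only the first lines: the raw Mapper is given explicit generics here so the method genuinely overrides, the field order (date at index 1) is an assumption, and the class only receives Text values if the job's input format actually produces Text (e.g. TextInputFormat), not DBInputFormat:

public class Map extends Mapper<LongWritable, Text, Text, Text>
{
    @Override
    protected void map(LongWritable key, Text value, Context ctx)
            throws IOException, InterruptedException
    {
        // value is assumed to be one comma-separated record; key it by its date field
        String[] fields = value.toString().split(",");
        ctx.write(new Text(fields[1]), value);
    }
}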