深度学习
pom.xml中内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.lenovo</groupId> <artifactId>random</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>random</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <kafka.version>0.10.0.1</kafka.version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.7.3</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-ipc</artifactId> <version>1.7.3</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> <testSourceDirectory>src/test/java</testSourceDirectory> <plugins> <!-- Bind the maven-assembly-plugin to the package phase this will create a jar file without the storm dependencies suitable for deployment to a cluster. --> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
KafkaConsumerObject.java类内容如下:
package com.lenovo.random; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumerObject { public static void main(String[] args) { String topic = "gaofeng101"; // 定义要操作的主题 Properties pro = new Properties(); // 定义相应的属性保存 pro.setProperty("zookeeper.connect", "127.0.0.1:2181"); //这里根据实际情况填写你的zk连接地址 pro.setProperty("metadata.broker.list", "127.0.0.1:9092"); //根据自己的配置填写连接地址 //pro.setProperty("group.id", "group1"); pro.setProperty("group.id", "test-consumer-groups01"); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(pro)); // 需要定义一个主题的映射的存储集合 Map<String,Integer> topicMap = new HashMap<String,Integer>() ; topicMap.put(topic, 1) ; // 设置要读取数据的主题 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicMap) ; // 现在只有一个主题,所以此处只接收第一个主题的数据即可 // System.err.println(messageStreams.size()); // System.out.println(messageStreams); KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0) ; // 第一个主题 ConsumerIterator<byte[], byte[]> iter = stream.iterator(); int i=0; while(iter.hasNext()) { i++; // MsgEntity me = (MsgEntity)BeanUtils.BytesToObject(iter.next().message()); //接收消息,并将字节数组转换为对象 Member me = (Member)BeanUtils.BytesToObject(iter.next().message()); //接收消息,并将字节数组转换为对象 System.out.println("........................................................"); System.out.println("Consumer接收到的消息如下:"); me.display(); System.out.println("Consumer接收第"+i+"个消息!"); } } }
KafkaProducerObject.java类内容如下:
package com.lenovo.random; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.PrintWriter; import java.net.Socket; import java.net.UnknownHostException; import java.util.Date; import java.util.Properties; import java.util.concurrent.TimeUnit; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.net.Socket; @SuppressWarnings("deprecation") public class KafkaProducerObject { public static void main(String[] args) throws UnknownHostException, IOException { String topic = "gaofeng101"; // 定义要操作的主题 Properties pro = new Properties(); // 定义相应的属性保存 pro.setProperty("zookeeper.connect", "127.0.0.1:2181"); //这里根据实际情况填写你的zk连接地址 pro.setProperty("metadata.broker.list", "127.0.0.1:9092"); //根据自己的配置填写连接地址 pro.setProperty("serializer.class", ObjectEncoder.class.getName()); //填写刚刚自定义的Encoder类 Producer<Integer, Object> prod = new Producer<Integer, Object>(new ProducerConfig(pro)); char []buf= {0x1d,0x1c,0x1b,0x1a}; //创建Socket对象,连接服务器 Socket socket=new Socket("127.0.0.1",8888); //通过客户端的套接字对象Socket方法,获取字节输出流,将数据写向服务器 OutputStream out=socket.getOutputStream(); PrintWriter pw = new PrintWriter(out); pw.write(buf); pw.flush(); //读取服务器发回的数据,使用socket套接字对象中的字节输入流 InputStream in=socket.getInputStream(); byte[] data=new byte[1024]; for(int i=1;i<=10000;i++) { int len=in.read(data); if(len<=0) { break; } System.out.println("len2 is :"+len); //MsgEntity me = new MsgEntity(); //me.DataParse(data, len); Member mb = new Member(); mb.DataParse(data, len); prod.send(new KeyedMessage<Integer, Object>(topic, mb)); //测试发送对象数据 System.out.println("Producer接收到第"+i+"个消息!"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } in.close(); pw.close(); out.close(); socket.close(); System.out.println("发送完毕!"); } }
Member.java类内容如下:
package com.lenovo.random; import java.io.Serializable; import java.util.Date; @SuppressWarnings("serial") public class Member implements Serializable{ private int protocol; DATATIME dt = new DATATIME(); private int len; DATAVAL[] dataval; public Member() { dataval = new DATAVAL[6]; for (int i = 0; i < dataval.length; i++) { dataval[i] = new DATAVAL(); } System.out.println("tttttttttttttttttttttttttttttttttttttt"); } protected static int byte2int(byte[] res) { int targets = (res[0] & 0xff) | ((res[1] << 8) & 0xff00) // | 表示安位或 | ((res[2] << 24) >>> 8) | (res[3] << 24); return targets; } protected static short byte2short(byte[] res) { short targets = (short) ((res[0] & 0xff) | ((res[1] << 8) & 0xff00)); return targets; } public static double getDouble(byte[] b) { long l; l = b[0]; l &= 0xff; l |= ((long) b[1] << 8); l &= 0xffff; l |= ((long) b[2] << 16); l &= 0xffffff; l |= ((long) b[3] << 24); l &= 0xffffffffl; l |= ((long) b[4] << 32); l &= 0xffffffffffl; l |= ((long) b[5] << 40); l &= 0xffffffffffffl; l |= ((long) b[6] << 48); l &= 0xffffffffffffffl; l |= ((long) b[7] << 56); return Double.longBitsToDouble(l); } public boolean DataParse(byte[] data, int len) { int tmp_protocol = 0x1a1b1c1d; byte[] byte_protocol = new byte[4]; byte[] byte_time = new byte[8]; byte[] byte_time_mill = new byte[2]; byte[] byte_data_len = new byte[4]; byte[] byte_dataval = new byte[20]; byte[] byte_num_sensor = new byte[4]; byte[] byte_num_time = new byte[4]; byte[] byte_num_dest = new byte[4]; byte[] byte_val = new byte[8]; System.arraycopy(data, 0, byte_protocol, 0, 4); if (byte2int(byte_protocol) == tmp_protocol) { System.out.println("protocol validation successful!"); this.protocol = byte2int(byte_protocol); } else { System.out.println("protocol validation failed!"); return false; } System.arraycopy(data, 12, byte_data_len, 0, 4); if (byte2int(byte_data_len) + 16 == len) { System.out.println("the length of received data is correct!"); this.len = byte2int(byte_data_len); } else { System.out.println("the length of received data is incorrect!"); return false; } System.arraycopy(data, 4, byte_time, 0, 8); for (int i = 0; i < 6; i++) { System.out.print(byte_time[i] + ","); } System.arraycopy(byte_time, 6, byte_time_mill, 0, 2); System.out.println(byte2short(byte_time_mill)); dt.wYear = byte_time[0]; dt.wMonth = byte_time[1]; dt.wDay = byte_time[2]; dt.wHour = byte_time[3]; dt.wMinute = byte_time[4]; dt.wSecond = byte_time[5]; dt.wMilliseconds = byte2short(byte_time_mill); for (int i = 0; i < 6; i++) { System.arraycopy(data, 16 + i * 20, byte_num_sensor, 0, 4); System.arraycopy(data, 16 + i * 20 + 4, byte_num_time, 0, 4); System.arraycopy(data, 16 + i * 20 + 8, byte_num_dest, 0, 4); System.arraycopy(data, 16 + i * 20 + 12, byte_val, 0, 8); dataval[i].num_sensor = byte2int(byte_num_sensor); dataval[i].num_time = byte2int(byte_num_time); dataval[i].num_dest = byte2int(byte_num_dest); dataval[i].val = getDouble(byte_val); System.out.println("dataval[i].num_sensor is :" + dataval[i].num_sensor); System.out.println("dataval[i].num_time is :" + dataval[i].num_time); System.out.println("dataval[i].num_dest is :" + dataval[i].num_dest); System.out.println("dataval[i].val is :" + dataval[i].val); System.out.println("............................................"); } return true; } public void display() { System.out.println("this.len is :"+this.len); System.out.println("dt.wYear is :" + dt.wYear); System.out.println("dt.wMonth is :" + dt.wMonth); System.out.println("dt.wDay is :" + dt.wDay); System.out.println("dt.wHour is :" + dt.wHour); System.out.println("dt.wMinute is :" + dt.wMinute); System.out.println("dt.wSecond is :" + dt.wSecond); System.out.println("dt.wMilliseconds is :" + dt.wMilliseconds); for(int i=0;i<6;i++) { System.out.println("dataval[i].num_sensor is :" + dataval[i].num_sensor); System.out.println("dataval[i].num_time is :" + dataval[i].num_time); System.out.println("dataval[i].num_dest is :" + dataval[i].num_dest); System.out.println("dataval[i].val is :" + dataval[i].val); System.out.println("............................................"); } } @Override public String toString() { //return "Member [name=" + name + ", age=" + age + ", birthday=" + birthday + ", soruce=" + soruce + "]"; return "11 Member "; } }
DATAVAL.java类中内容如下:
package com.lenovo.random; import java.io.Serializable; public class DATAVAL implements Serializable{ int num_sensor;// 传感器序号 int num_time; // 时间序号 int num_dest; // 目的端序号 double val; // 数值 }
DATATIME.java中内容如下:
package com.lenovo.random; import java.io.Serializable; public class DATATIME implements Serializable{ int wYear; int wMonth; int wDay; int wHour; int wMinute; int wSecond; int wMilliseconds; }
BeanUtils.java类中内容如下:
package com.lenovo.random; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; public class BeanUtils { private BeanUtils(){} /** * 对象转字节数组 * @param obj * @return */ public static byte[] ObjectToBytes(Object obj){ byte[] bytes = null; ByteArrayOutputStream bo = null; ObjectOutputStream oo = null; try { bo = new ByteArrayOutputStream(); oo = new ObjectOutputStream(bo); oo.writeObject(obj); bytes = bo.toByteArray(); } catch (IOException e) { e.printStackTrace(); }finally { try { if(bo!=null){ bo.close(); } if(oo!=null){ oo.close(); } } catch (IOException e) { e.printStackTrace(); } } return bytes; } /** * 字节数组转对象 * @param bytes * @return */ public static Object BytesToObject(byte[] bytes){ Object obj = null; ByteArrayInputStream bi = null; ObjectInputStream oi = null; try { bi =new ByteArrayInputStream(bytes); oi =new ObjectInputStream(bi); obj = oi.readObject(); } catch (Exception e) { e.printStackTrace(); }finally { try { if(bi!=null){ bi.close(); } if(oi!=null){ oi.close(); } } catch (IOException e) { e.printStackTrace(); } } return obj; } }
来源:我是码农,转载请保留出处和链接!
本文链接:http://www.54manong.com/?id=1227
微信号:qq444848023 QQ号:444848023
加入【我是码农】QQ群:864689844(加群验证:我是码农)
全站首页 | 数据结构 | 区块链| 大数据 | 机器学习 | 物联网和云计算 | 面试笔试
var cnzz_protocol = (("https:" == document.location.protocol) ? "https://" : "http://");document.write(unescape("%3Cspan id='cnzz_stat_icon_1276413723'%3E%3C/span%3E%3Cscript src='" + cnzz_protocol + "s23.cnzz.com/z_stat.php%3Fid%3D1276413723%26show%3Dpic1' type='text/javascript'%3E%3C/script%3E"));本站资源大部分来自互联网,版权归原作者所有!
评论专区