标签 Storm 下的文章

maven shade解决storm Elasticsearch log4j jar包版本冲突

新建一个maven简单工程即可,该工程的目的是将es、log4j等jar包里的class文件重新打包,全部放入到一个jar包里,再放入的过程中将所有“org.apache.logging.log4j”开头的报名改为“my.elasticsearch.log4j”,也就相当于将import log4j的地方统统改了,这样就相当于第三方编写的log4j,pom.xml如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>my.elasticsearch.test</groupId>
	<artifactId>es-shaded</artifactId>
	<version>1.0-SNAPSHOT</version>
	<properties>
		<elasticsearch.version>5.2.2</elasticsearch.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>${elasticsearch.version}</version>
		</dependency>
		<dependency>
			<groupId>org.elasticsearch.client</groupId>
			<artifactId>transport</artifactId>
			<version>${elasticsearch.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>2.7</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>log4j-over-slf4j</artifactId>
			<version>1.7.7</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>2.4.1</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<relocations>
								<relocation>
									<pattern>org.apache.logging.log4j</pattern>
									<shadedPattern>my.elasticsearch.log4j</shadedPattern>
								</relocation>
							</relocations>
							<transformers>
								<transformer
									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

运行mvn clean install
将打包好的jar包安装到本地,验证效果如下图:

jar包版本冲突解决

jar包版本冲突解决


jar包版本冲突

jar包版本冲突


最后在自己的工程里,引入新的jar包即可,不需要额外引入es和log4j的jar包了。

本地模式运行storm的demo

本例实现的是本地模式运行storm的wordcount demo!
开发过程中,可以用本地模式来运行Storm,这样就能在本地开发,在进程中测试Topology。一切就绪后,以远程模式运行 Storm,提交用于在集群中运行的Topology。
创建工程:demo-storm
目录结构如下:
demo-storm
——src/main/java
————com.youku.demo
————————bolts
————————spouts
——src/test/java
——src/main/resource
————words.txt

storm-demo工程目录

storm-demo工程目录


WordCounter.java:

package com.youku.demo.bolts;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class WordCounter extends BaseBasicBolt {
	Integer id;
	String name;
	Map<String, Integer> counters;
	/**
	 * At the end of the spout (when the cluster is shutdown
	 * We will show the word counters
	 */
	@Override
	public void cleanup() {
		System.out.println("-- Word Counter ["+name+"-"+id+"] --");
		for(Map.Entry<String, Integer> entry : counters.entrySet()){
			System.out.println(entry.getKey()+": "+entry.getValue());
		}
	}
	/**
	 * On create
	 */
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		this.counters = new HashMap<String, Integer>();
		this.name = context.getThisComponentId();
		this.id = context.getThisTaskId();
	}
	public void declareOutputFields(OutputFieldsDeclarer declarer) {}
	public void execute(Tuple input, BasicOutputCollector collector) {
		String str = input.getString(0);
		/**
		 * If the word dosn't exist in the map we will create
		 * this, if not We will add 1
		 */
		if(!counters.containsKey(str)){
			counters.put(str, 1);
		}else{
			Integer c = counters.get(str) + 1;
			counters.put(str, c);
		}
	}
}

WordNormalizer.java:

package com.youku.demo.bolts;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer extends BaseBasicBolt {
	public void cleanup() {}
	/**
	 * The bolt will receive the line from the
	 * words file and process it to Normalize this line
	 *
	 * The normalize will be put the words in lower case
	 * and split the line to get all words in this
	 */
	public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
	}
	/**
	 * The bolt will only emit the field "word"
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}

WordReader.java:

package com.youku.demo.spouts;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader extends BaseRichSpout {
	private SpoutOutputCollector collector;
	private FileReader fileReader;
	private boolean completed = false;
	public void ack(Object msgId) {
		System.out.println("OK:"+msgId);
	}
	public void close() {}
	public void fail(Object msgId) {
		System.out.println("FAIL:"+msgId);
	}
	/**
	 * The only thing that the methods will do It is emit each
	 * file line
	 */
	public void nextTuple() {
		/**
		 * The nextuple it is called forever, so if we have been readed the file
		 * we will wait and then return
		 */
		if(completed){
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				//Do nothing
			}
			return;
		}
		String str;
		//Open the reader
		BufferedReader reader = new BufferedReader(fileReader);
		try{
			//Read all lines
			while((str = reader.readLine()) != null){
				/**
				 * By each line emmit a new value with the line as a their
				 */
				this.collector.emit(new Values(str),str);
			}
		}catch(Exception e){
			throw new RuntimeException("Error reading tuple",e);
		}finally{
			completed = true;
		}
	}
	/**
	 * We will create the file and get the collector object
	 */
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		try {
			this.fileReader = new FileReader(conf.get("wordsFile").toString());
		} catch (FileNotFoundException e) {
			throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
		}
		this.collector = collector;
	}
	/**
	 * Declare the output field "word"
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("line"));
	}
}

TopologyMain.java:

package com.youku.demo;
import com.youku.demo.bolts.WordCounter;
import com.youku.demo.bolts.WordNormalizer;
import com.youku.demo.spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class TopologyMain {
	public static void main(String[] args) throws InterruptedException {
        //Topology definition
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("word-reader",new WordReader());
		builder.setBolt("word-normalizer", new WordNormalizer())
			.shuffleGrouping("word-reader");
		builder.setBolt("word-counter", new WordCounter(),1)
			.fieldsGrouping("word-normalizer", new Fields("word"));
        //Configuration
		Config conf = new Config();
		conf.put("wordsFile", args[0]);
		conf.setDebug(true);
        //Topology run
		conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
		Thread.sleep(2000);
		cluster.shutdown();
	}
}

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.youku.demo</groupId>
	<artifactId>demo-storm</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>
	<name>demo-storm</name>
	<url>http://maven.apache.org</url>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.3.2</version>
				<configuration>
					<source>1.6</source>
					<target>1.6</target>
					<compilerVersion>1.6</compilerVersion>
				</configuration>
			</plugin>
		</plugins>
	</build>
	<repositories>
		<!-- Repository where we can found the storm dependencies -->
		<repository>
			<id>clojars.org</id>
			<url>http://clojars.org/repo</url>
		</repository>
	</repositories>
	<dependencies>
		<!-- Storm Dependency -->
		<dependency>
			<groupId>storm</groupId>
			<artifactId>storm</artifactId>
			<version>0.8.0</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
	</dependencies>
</project>

words.txt:

storm
test
are
great
is
an
storm
simple
application
but
very
powerfull
really
StOrm
is
great

运行的时候需要配置参数:src/main/resources/words.txt 指定输入文件

运行命令

运行命令


日志输出:
运行日志

运行日志


会报好多zookeeper异常,还有最后的日志文件无法删除的异常,目前忽略了,O(∩_∩)O呵呵~

271  [main-SendThread(localhost:2000)] WARN  org.apache.zookeeper.ClientCnxn  - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.SocketException: Address family not supported by protocol family: connect
	at sun.nio.ch.Net.connect(Native Method)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:507)
	at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1050)
	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1077)
java.io.IOException: Unable to delete file: C:UsersThinkPadAppDataLocalTemp3fbb080f-e585-42e6-8b1b-d6ae024503acversion-2log.1
	at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1390)
	at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1044)
	at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:977)
	at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1381)
	at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1044)
	at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:977)
	at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1381)
	at backtype.storm.util$rmr.invoke(util.clj:413)
	at backtype.storm.testing$kill_local_storm_cluster.invoke(testing.clj:163)
	at backtype.storm.LocalCluster$_shutdown.invoke(LocalCluster.clj:21)
	at backtype.storm.LocalCluster.shutdown(Unknown Source)
	at com.youku.demo.TopologyMain.main(TopologyMain.java:33)

Storm实时计算平台

Twitter将Storm正式开源了,这是一个分布式的、容错的实时计算系统,它被托管在GitHub上,遵循 Eclipse Public License 1.0。Storm是由BackType开发的实时处理系统,BackType现在已在Twitter麾下。GitHub上的最新版本是Storm 0.8.0,基本是用Clojure写的。
Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息并更新数据库。这是管理队列及工作者集群的另一种方式。 Storm也可被用于“连续计算”(continuous computation),对数据流做连续查询,在计算时就将结果以流的形式输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算。 Storm的主工程师Nathan Marz表示:
Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比 Hadoop之于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。更棒的是你可以使用任意编程语言来做开发。
Storm的主要特点如下:
简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。
可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
容错性。Storm会管理工作进程和节点的故障。
水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
快速。系统的设计保证了消息能得到快速的处理,使用ØMQ作为其底层消息队列。
本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。
Storm集群由一个主节点和多个工作节点组成。主节点运行了一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。每个工作节 点都运行了一个名为“Supervisor”的守护进程,用于监听工作,开始并终止工作进程。Nimbus和Supervisor都能快速失败,而且是无 状态的,这样一来它们就变得十分健壮,两者的协调工作是由Apache ZooKeeper来完成的。
Storm的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Sprout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的 线程。Worker是运行这些线程的进程。Stream Grouping规定了Bolt接收什么东西作为输入数据。数据可以随机分配(术语为Shuffle),或者根据字段值分配(术语为Fields),或者 广播(术语为All),或者总是发给一个Task(术语为Global),也可以不关心该数据(术语为None),或者由自定义逻辑来决定(术语为 Direct)。Topology是由Stream Grouping连接起来的Spout和Bolt节点网络。在Storm Concepts页面里对这些术语有更详细的描述。
可以和Storm相提并论的系统有Esper、Streambase、HStreaming和Yahoo S4。其中和Storm最接近的就是S4。两者最大的区别在于Storm会保证消息得到处理。这些系统中有的拥有内建数据存储层,这是Storm所没有的,如果需要持久化,可以使用一个类似于Cassandra或Riak这样的外部数据库。
入门的最佳途径是阅读GitHub上的官方《Storm Tutorial》。 其中讨论了多种Storm概念和抽象,提供了范例代码以便你可以运行一个Storm Topology。开发过程中,可以用本地模式来运行Storm,这样就能在本地开发,在进程中测试Topology。一切就绪后,以远程模式运行 Storm,提交用于在集群中运行的Topology。
要运行Storm集群,你需要Apache Zookeeper、ØMQ、JZMQ、Java 6和Python 2.6.6。ZooKeeper用于管理集群中的不同组件,ØMQ是内部消息系统,JZMQ是ØMQ的Java Binding。有个名为storm-deploy的子项目,可以在AWS上一键部署Storm集群。关于详细的步骤,可以阅读Storm Wiki上的《Setting up a Storm cluster》
storm官网:
http://storm-project.net/
storm git:
https://github.com/nathanmarz/storm