setup storm kafka testing infrastructure

Because of version confusion among Storm, Kafka, and storm-kafka, many people still cannot make Kafka and Storm work together. There is a starter project, but it uses embedded ZooKeeper and Kafka server instances. We need to get our hands dirty and build the infrastructure step by step; after that, writing an analysis-engine bolt will be easy.

Following are the 10 steps I went through to make data flow from the producer to a printer bolt in Storm. Hopefully these detailed steps will help other developers. And thanks to the author of storm-kafka.

Under the directory:
yhuangMac:kafka_2.10-0.8.1 yhuang$ pwd
/Users/yhuang/kafka_2.10-0.8.1
yhuangMac:kafka_2.10-0.8.1 yhuang$

1. on terminal 1, start zookeeper server
./bin/zookeeper-server-start.sh config/zookeeper.properties

2. on terminal 2, start kafka server
./bin/kafka-server-start.sh config/server.properties

3. on terminal 3, create topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic storm-sentence

4. on terminal 3, run producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic storm-sentence

5. on terminal 4, run consumer
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic storm-sentence --from-beginning

Under directory:
yhuangMac:storm-kafka-0.8-plus-test yhuang$ pwd
/Users/yhuang/workspace-java-kepler/storm-kafka-0.8/storm-kafka-0.8-plus-test

6. on Eclipse, import the storm-kafka-0.8-plus-test Maven project; its pom.xml has:

<dependency>
    <groupId>net.wurstmeister.storm</groupId>
    <artifactId>storm-kafka-0.8-plus</artifactId>
    <version>0.3.0</version>
</dependency>
7. on Eclipse, run TestTopologyStaticHosts.java as a Java application, with a breakpoint at the println in the execute method:

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    System.out.println(tuple.toString());
}

8. on terminal 3, input: fdsafadsfadsffffffffffffffff

9. on Eclipse, the breakpoint at println() is hit, and the Eclipse console shows:
44572 [Thread-18-words] INFO storm.kafka.PartitionManager – Committed offset 7 for Partition{host=localhost:9092, partition=0} for topology: 8983d811-d8b1-4cd2-9906-dfe349c9e3b3
source: words:3, stream: default, id: {3911753592003778923=-6203653086213542255}, [fdsafadsfadsfffffffffffffffff]
46573 [Thread-18-words] INFO storm.kafka.PartitionManager – Committing offset for Partition{host=localhost:9092, partition=0}

10. On terminal 4, we can also see kafka consumer print out:
fdsafadsfadsffffffffffffffff
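The data path these ten steps wire up (console producer → Kafka topic → KafkaSpout → printer bolt) can be modeled in a few lines of plain Python. This is only a toy stand-in using an in-process queue, not the Storm or Kafka API:

```python
import queue
import threading

# Toy stand-in for the Kafka topic: the "producer" puts sentences on a
# queue, and a "printer bolt" thread consumes and prints them.
topic = queue.Queue()
printed = []

def printer_bolt():
    while True:
        sentence = topic.get()
        if sentence is None:       # sentinel: shut the bolt down
            break
        printed.append(sentence)
        print(sentence)            # same role as the println in step 7

bolt = threading.Thread(target=printer_bolt)
bolt.start()
topic.put("fdsafadsfadsffffffffffffffff")  # what step 8 types into the producer
topic.put(None)
bolt.join()
```

The real topology replaces the queue with a Kafka partition and the thread with a bolt task, but the handoff shape is the same.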

Posted in Uncategorized | Leave a comment

kafka-console-producer.sh (kafka_2.10-0.8.1) reported SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder"

On my Mac, after downloading scala-2.10 and kafka_2.10-0.8.1, everything was fine in the kafka_2.10-0.8.1 directory when I started ZooKeeper and the Kafka server and created a test topic. Then I needed to start a producer for the test topic, but there was an error:

yhuangMac:kafka_2.10-0.8.1 yhuang$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

The reason is that the Kafka release zip only includes the slf4j-api jar in its libs directory; it is missing a binding jar, slf4j-nop.jar. So go to http://www.slf4j.org, download slf4j-1.7.7.zip, unzip it, and copy slf4j-api-1.7.7.jar and slf4j-nop-1.7.7.jar into Kafka's libs directory. Restart the Kafka producer, and no error is reported.

 


Spark-1.0.0 setup to test a Python program

At work, my company uses Storm and Esper, with Java code, for big-data analysis. A nice guy reminded me of Spark (even though I had known about it for a while). The Spark docs are not straightforward, so let me write down the procedure I followed on my Mac.

1. Download and tar -zxvf  scala-2.10.3

2. In .bash_profile, set bin path to scala-2.10.3/bin

3. source .bash_profile

4. Download the prebuilt spark-1.0.0-hadoop2 and tar -zxvf it

5. in spark home, mkdir yao

6. in the yao directory, write a Python file (SimpleApp.py), or copy it from the Spark website:

from pyspark import SparkContext

logFile = "./README.md"
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)

7. In the Spark home, run SimpleApp.py:

./bin/spark-submit --master local[4] ./yao/SimpleApp.py
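The filter/count logic in SimpleApp.py is ordinary Python; as a quick sanity check, the same counts can be computed without Spark (the sample lines here are made up):

```python
# Same counting logic as SimpleApp.py, minus Spark, on a made-up sample.
lines = ["apple pie", "banana bread", "cherry", "bbq"]
num_as = sum(1 for s in lines if 'a' in s)
num_bs = sum(1 for s in lines if 'b' in s)
print("Lines with a: %i, lines with b: %i" % (num_as, num_bs))
```

Spark's textFile/filter/count does the same per-line test, just distributed across partitions and cached.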

My next step is to get PyCharm working with Spark and write a program to analyze TCP/UDP packets.


A C++ multi-threaded application crashed: how to find the problem with GDB and fix it.

It is not hard to understand mutexes, condition variables, semaphores, etc. on Linux with the pthread library, but you might not remember them forever. A failure in a multi-threaded application gives you a deep impression of these concepts and of how to apply these facilities.

We have an application:

1. It has 48 threads, each of which establishes an SSL session to a remote host through an SDK via OpenSSL, so every thread has a SessionManager object to deal with its session. The session is a pointer returned on creation; it can then be suspended, resumed, stopped, and deleted.

2. There are also other threads that share the session and perform the same operations on the session pointer variable.

The code for SessionManager::resume():

void SessionManager::resume() {
    if (session != NULL) {
        if (!isEndSession()) {
            log->notice("session is not NULL and session status is not End");
            lea_session_resume(session);
        } else {
            log->notice("session is not NULL, but session status is End");
        }
    } else {
        log->notice("session is NULL, not call lea_session_resume");
    }
}

After the program had been running 24x7 for one year at a customer site, at some point it crashed. To find the root cause, we turned to the core dump to learn where in the source code the crash happened.

The following steps were used:

1. create a directory and copy the core file and the executable into it: core.20554, multilea

2. start gdb by: gdb multilea core.20554

3. in gdb, get the stack trace with: (gdb) bt

4. from the stack trace, we can see that the last invocation of our code is in frame 4, so select that frame with: (gdb) f 4

5. now we can print the current object's member variables with: (gdb) p *this

6. Inspecting the member variable values, to our surprise: sessionEnd = true

But from the resume() code we can see that lea_session_resume() is only reached when sessionEnd is false. So the value of sessionEnd changed between the if() check and the lea_session_resume(session) call: there must be a race condition that set sessionEnd to true, which means we have to check all the source code for every place that sets sessionEnd. Since the session had already ended, the call into the SDK's lea_session_resume() crashed the application.

So we changed the code to add a mutex to avoid the race condition:

void SessionManager::endSession() {
    int status = pthread_mutex_lock(&sessionEndLock);
    if (status != 0) {
        log->notice("cannot get the sessionEndLock to call opsec_end_session()");
    } else {
        if (dummySession) {
            opsec_end_session(dummy);
            dummySession = false;
            dummy = NULL;
        } else {
            if (session) {
                opsec_end_session(session);
                session = NULL;
            }
        }
        pthread_mutex_unlock(&sessionEndLock);
    }
    log->notice("opsec_end_session() has been executed");
}

void SessionManager::resume() {
    if (session != NULL) {
        int status = pthread_mutex_lock(&sessionEndLock);
        if (status != 0) {
            log->notice("cannot get the sessionEndLock to detect if session is End");
        } else {
            if (!isEndSession()) {
                log->notice("session is not NULL and session status is not End");
                lea_session_resume(session);
            } else {
                log->notice("session is not NULL, but session status is End");
            }
            pthread_mutex_unlock(&sessionEndLock);
        }
    } else {
        log->notice("session is NULL, not call lea_session_resume");
    }
}
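The same check-then-act pattern and its fix can be sketched in a few lines of Python (hypothetical names, with threading.Lock playing the role of pthread_mutex_t): the writer of the "ended" flag and the check that guards the resume call take the same lock, so the flag can no longer flip between the check and the action.

```python
import threading

# Sketch of the fix: one lock guards both the writer of the "ended"
# flag and the check-then-act sequence in resume().
class Session:
    def __init__(self):
        self.ended = False
        self.lock = threading.Lock()   # plays the role of sessionEndLock
        self.resumes = 0

    def end(self):
        with self.lock:                # writer takes the lock
            self.ended = True

    def resume(self):
        with self.lock:                # check and act atomically
            if not self.ended:
                self.resumes += 1      # stands in for lea_session_resume()

s = Session()
workers = [threading.Thread(target=s.resume) for _ in range(8)]
workers.append(threading.Thread(target=s.end))
for t in workers:
    t.start()
for t in workers:
    t.join()
print(s.ended, s.resumes)
```

Without the lock, a thread could observe ended == False, lose the CPU, and then "resume" a session that another thread has just ended, which is exactly the crash seen in the core dump.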


Piano: self-learning

My purpose in buying a piano is to play it as a big toy in my leisure time, when I am not playing sports, e.g., soccer, basketball, or table tennis. Of course, playing music is like developing a computer application (a program): both follow the same routine of Initialization (loading), Process, Output (ending). And playing music makes you attend to detail. Here are some videos played by myself, just for fun.

1. Scarborough Fair

2. My favorite song at church: Joy To The World

3. A song that I heard many years ago


C++ class code style

Where is the best place to put the private members in a class definition in C++?

This is a controversial topic, and it depends on programmer preference.

Working on projects in both Java and C++, I have read many C++ books that suggest putting the private members at the bottom of the class definition. The authors say this helps encapsulation and hides the members, and that for every object (instance) we should first think about its behavior (its methods, or operations). I do not agree with this point: it damages code readability. When we write an article on paper, we always start from the top; we never start writing sentences at the bottom. So when you declare a class Person, you first imagine what components a Person has, and then what operations a Person can do. Given this, I do not know why the C++ community should learn this code style from Java. A sample Thread class in C++ for Linux:

class Thread
{
protected:
        string name;
        pthread_t            hThread;
        pthread_mutex_t hMutex;
        pthread_mutex_t hSuspendMutex;
        pthread_cond_t   hSuspendCondition;
        bool itsRunningFlag;
        bool itsSuspendedFlag;

public:
        Thread(const char* nm);
        virtual ~Thread();

        const char* getName() const;

        void start();

        virtual void run();
        static void sleep(long ms);
        void suspend();
        void resume();
        void stop(bool cancel=true);

        bool wait(long ms=5000);
        void release();
        
        void running();
        bool isRunning();
        bool isSuspended();
};


Build gcc-4.6.1

By default, on a Red Hat or CentOS 6.2 development machine, the gcc version is 4.1, which does not include C++0x features such as thread and shared_ptr, so we would otherwise have to use the Boost library. However, sometimes you find code that is based on C++0x, and to use Boost with it you have to change the namespaces to make it work. So I decided to build gcc-4.6.1 myself.

1. gmp:
tar -jxvf gmp-4.3.2.tar.bz2
./configure --prefix=/usr/local/gmp-4.3.2
make && make install

2. mpfr:

./mpfr-2.4.2/configure --prefix=/usr/local/mpfr-2.4.2 --with-gmp=/usr/local/gmp-4.3.2

3. mpc:

tar -zxvf mpc-0.8.1.tar.gz

../mpc-0.8.1/configure --prefix=/usr/local/mpc-0.8.1 --with-gmp=/usr/local/gmp-4.3.2 --with-mpfr=/usr/local/mpfr-2.4.2

4. gcc-4.6.1

mkdir /usr/local/gcc-4.6.1

../gcc-4.6.1/configure --prefix=/usr/local/gcc-4.6.1 --enable-threads=posix --disable-checking --disable-multilib --enable-languages=c,c++ --with-gmp=/usr/local/gmp-4.3.2 --with-mpfr=/usr/local/mpfr-2.4.2 --with-mpc=/usr/local/mpc-0.8.1
5. .bashrc

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/mpc-0.8.1/lib:/usr/local/gmp-4.3.2/lib:/usr/local/mpfr-2.4.2/lib


6.  under: /usr/bin

sudo ln -s /usr/local/gcc-4.6.1/bin/gcc gcc461

sudo ln -s /usr/local/gcc-4.6.1/bin/g++ g++461

7. test:

g++461 -std=c++0x -static-libstdc++ -lpthread -g test.cpp
ldd a.out