
Integrating Kafka with Nginx and Lua

程序员文章站 2022-05-26 23:46:08

Step 1: Enter the OpenResty directory

[root@node03 openresty]# cd /export/servers/openresty/
[root@node03 openresty]# ll
total 356
drwxr-xr-x  2 root root   4096 jul 26 11:33 bin
drwxrwxr-x 44 1000 1000   4096 jul 26 11:31 build
drwxrwxr-x 43 1000 1000   4096 nov 13  2017 bundle
-rwxrwxr-x  1 1000 1000  45908 nov 13  2017 configure
-rw-rw-r--  1 1000 1000  22924 nov 13  2017 copyright
drwxr-xr-x  6 root root   4096 jul 26 11:33 luajit
drwxr-xr-x  6 root root   4096 aug  1 08:14 lualib
-rw-r--r--  1 root root   5413 jul 26 11:32 makefile
drwxr-xr-x 11 root root   4096 jul 26 11:35 nginx
drwxrwxr-x  2 1000 1000   4096 nov 13  2017 patches
drwxr-xr-x 44 root root   4096 jul 26 11:33 pod
-rw-rw-r--  1 1000 1000   3689 nov 13  2017 readme.markdown
-rw-rw-r--  1 1000 1000   8690 nov 13  2017 readme-win32.txt
-rw-r--r--  1 root root 218352 jul 26 11:33 resty.index
drwxr-xr-x  5 root root   4096 jul 26 11:33 site
drwxr-xr-x  2 root root   4096 aug  1 10:54 testlua
drwxrwxr-x  2 1000 1000   4096 nov 13  2017 util
[root@node03 openresty]# 

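Note: from here on we care about two directories, "lualib" and "nginx":

1. lualib: holds the integration packages that OpenResty ships with

2. nginx: the nginx service directory

Next, let's enter the lualib directory and take a look: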

[root@node03 openresty]# cd lualib/
[root@node03 lualib]# ll
total 116
-rwxr-xr-x 1 root root 101809 jul 26 11:33 cjson.so
drwxr-xr-x 3 root root   4096 jul 26 11:33 ngx
drwxr-xr-x 2 root root   4096 jul 26 11:33 rds
drwxr-xr-x 2 root root   4096 jul 26 11:33 redis
drwxr-xr-x 9 root root   4096 aug  1 10:34 resty

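Here we can see the redis and ngx integration packages, which means nginx and redis can be used directly without importing any extra dependency.

Now let's see what is inside resty: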

[root@node03 lualib]# cd resty/
[root@node03 resty]# ll
total 152
-rw-r--r-- 1 root root  6409 jul 26 11:33 aes.lua
drwxr-xr-x 2 root root  4096 jul 26 11:33 core
-rw-r--r-- 1 root root   596 jul 26 11:33 core.lua
drwxr-xr-x 2 root root  4096 jul 26 11:33 dns
drwxr-xr-x 2 root root  4096 aug  1 10:42 kafka   # we imported this one ourselves
drwxr-xr-x 2 root root  4096 jul 26 11:33 limit
-rw-r--r-- 1 root root  4616 jul 26 11:33 lock.lua
drwxr-xr-x 2 root root  4096 jul 26 11:33 lrucache
-rw-r--r-- 1 root root  4620 jul 26 11:33 lrucache.lua
-rw-r--r-- 1 root root  1211 jul 26 11:33 md5.lua
-rw-r--r-- 1 root root 14544 jul 26 11:33 memcached.lua
-rw-r--r-- 1 root root 21577 jul 26 11:33 mysql.lua
-rw-r--r-- 1 root root   616 jul 26 11:33 random.lua
-rw-r--r-- 1 root root  9227 jul 26 11:33 redis.lua
-rw-r--r-- 1 root root  1192 jul 26 11:33 sha1.lua
-rw-r--r-- 1 root root  1045 jul 26 11:33 sha224.lua
-rw-r--r-- 1 root root  1221 jul 26 11:33 sha256.lua
-rw-r--r-- 1 root root  1045 jul 26 11:33 sha384.lua
-rw-r--r-- 1 root root  1359 jul 26 11:33 sha512.lua
-rw-r--r-- 1 root root   236 jul 26 11:33 sha.lua
-rw-r--r-- 1 root root   698 jul 26 11:33 string.lua
-rw-r--r-- 1 root root  5178 jul 26 11:33 upload.lua
drwxr-xr-x 2 root root  4096 jul 26 11:33 upstream
drwxr-xr-x 2 root root  406 jul 26 11:33 websocket

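Here we find the familiar mysql.lua and redis.lua. Ignore the rest for now.

Note: a stock install has no kafka package here, which means OpenResty does not bundle Kafka support. I imported the Kafka integration package in advance.

Let's see which files the kafka package contains: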

[root@node03 resty]# cd kafka
[root@node03 kafka]# ll
total 48
-rw-r--r-- 1 root root  1369 aug  1 10:42 broker.lua
-rw-r--r-- 1 root root  5537 aug  1 10:42 client.lua
-rw-r--r-- 1 root root   710 aug  1 10:42 errors.lua
-rw-r--r-- 1 root root 10718 aug  1 10:42 producer.lua
-rw-r--r-- 1 root root  4072 aug  1 10:42 request.lua
-rw-r--r-- 1 root root  2118 aug  1 10:42 response.lua
-rw-r--r-- 1 root root  1494 aug  1 10:42 ringbuffer.lua
-rw-r--r-- 1 root root  4845 aug  1 10:42 sendbuffer.lua

The Kafka integration package is available here:

Link: https://pan.baidu.com/s/1pflhz3e_txb3zwirwxfqyg
Extraction code: 0umg

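Step 2: Create the Kafka test Lua file

1. Go back to the openresty directory

[root@node03 kafka]# cd /export/servers/openresty/

2. Create a directory for the test file

[root@node03 openresty]# mkdir -p testlua

The directory name and location are your choice, but remember the path: nginx.conf refers to it in step 3.

3. Enter the new directory and create the kafkalua.lua script

Create the file with vim kafkalua.lua or touch kafkalua.lua: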

[root@node03 openresty]# cd testlua/
[root@node03 testlua]# ll
total 8
-rw-r--r-- 1 root root 3288 aug  1 10:54 kafkalua.lua

kafkalua.lua:

-- Test output; optional
ngx.say('hello kafka file configuration successful!!!!!!')

-- Collection threshold: skip collection when active connections exceed this value
local default_threshold = 100000
-- Number of Kafka partitions; must match the topic (step 6 creates it with 3 partitions)
local partition_num = 3
-- Kafka topic name
local topic = 'b2cdata_collection1'
-- Key of the round-robin counter in the shared dictionary
local polling_key = "polling_key"
-- Custom partitioner: the message key already holds the target partition number
local function partitioner(key, num, correlation_id)
    return tonumber(key)
end
-- Kafka broker list (the IPs must match the host.name configured on each broker)
local broker_list = {{host="192.168.52.100",port=9092},{host="192.168.52.110",port=9092},{host="192.168.52.120",port=9092}}
-- Kafka producer parameters
local connect_params = { producer_type = "async", socket_timeout = 30000, flush_time = 10000, request_timeout = 20000, partitioner = partitioner }
-- Counter in shared memory, used for round-robin over the partitions
local shared_data = ngx.shared.shared_data
local pollingval = shared_data:get(polling_key)
if not pollingval then
    pollingval = 1
    shared_data:set(polling_key, pollingval)
end
-- Take the per-message counter modulo partition_num to spread messages evenly
local partitions = '' .. (tonumber(pollingval) % partition_num)
shared_data:incr(polling_key, 1)

-- Concurrency control
local isgone = true
-- Overload protection: stop collecting when ngx.var.connections_active exceeds the threshold
if tonumber(ngx.var.connections_active) > tonumber(default_threshold) then
    isgone = false
end
-- Data collection
if isgone then
    -- Variables that are not set fall back to an empty string
    local time_local = ngx.var.time_local or ""
    local request = ngx.var.request or ""
    local request_method = ngx.var.request_method or ""
    local content_type = ngx.var.content_type or ""
    ngx.req.read_body()
    local request_body = ngx.var.request_body or ""
    local http_referer = ngx.var.http_referer or ""
    local remote_addr = ngx.var.remote_addr or ""
    local http_user_agent = ngx.var.http_user_agent or ""
    local time_iso8601 = ngx.var.time_iso8601 or ""
    local server_addr = ngx.var.server_addr or ""
    local http_cookie = ngx.var.http_cookie or ""

    -- Assemble the message; fields are separated by "#cs#"
    local message = time_local .."#cs#".. request .."#cs#".. request_method .."#cs#".. content_type .."#cs#".. request_body .."#cs#".. http_referer .."#cs#".. remote_addr .."#cs#".. http_user_agent .."#cs#".. time_iso8601 .."#cs#".. server_addr .."#cs#".. http_cookie;
    -- Load the Kafka producer module
    local producer = require "resty.kafka.producer"
    -- Create the producer
    local bp = producer:new(broker_list, connect_params)
    -- Send the message; the partition number is passed as the key
    local ok, err = bp:send(topic, partitions, message)
    -- Log send failures (the log level constant is ngx.ERR)
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
end
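
The round-robin step in the script (a shared counter taken modulo the partition count) can be looked at in isolation. The following is only a minimal Java sketch of that arithmetic, not part of the OpenResty setup; the class and method names are made up for illustration, and the 3 partitions are an assumption taken from the topic created in step 6.

```java
import java.util.Arrays;

// Hypothetical demo class; it only mirrors the modulo logic of kafkalua.lua.
class RoundRobinPartitionDemo {

    // Same arithmetic as `tonumber(pollingval) % partition_num` in the script.
    static int pickPartition(long counter, int partitionCount) {
        return (int) (counter % partitionCount);
    }

    // Counts how many of `messages` consecutive counter values land on each partition.
    static int[] distribute(long firstCounter, int messages, int partitionCount) {
        int[] perPartition = new int[partitionCount];
        for (int i = 0; i < messages; i++) {
            perPartition[pickPartition(firstCounter + i, partitionCount)]++;
        }
        return perPartition;
    }

    public static void main(String[] args) {
        // The script starts its counter at 1; 9 messages over 3 partitions.
        System.out.println(Arrays.toString(distribute(1, 9, 3))); // prints [3, 3, 3]
    }
}
```

If the partition count used in the modulo is larger than the number of partitions the topic really has, some results point at partitions that do not exist, so the two values should be kept in sync.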

Step 3: Edit the nginx configuration file nginx.conf

1. Enter the nginx/conf directory

[root@node03 openresty]# cd /export/servers/openresty/nginx/conf/
[root@node03 conf]# ll
total 76
-rw-r--r-- 1 root root 1077 jul 26 11:33 fastcgi.conf
-rw-r--r-- 1 root root 1077 jul 26 11:33 fastcgi.conf.default
-rw-r--r-- 1 root root 1007 jul 26 11:33 fastcgi_params
-rw-r--r-- 1 root root 1007 jul 26 11:33 fastcgi_params.default
-rw-r--r-- 1 root root 2837 jul 26 11:33 koi-utf
-rw-r--r-- 1 root root 2223 jul 26 11:33 koi-win
-rw-r--r-- 1 root root 5170 jul 26 11:33 mime.types
-rw-r--r-- 1 root root 5170 jul 26 11:33 mime.types.default
-rw-r--r-- 1 root root 3191 aug  1 10:52 nginx.conf
-rw-r--r-- 1 root root 2656 jul 26 11:33 nginx.conf.default
-rw-r--r-- 1 root root  636 jul 26 11:33 scgi_params
-rw-r--r-- 1 root root  636 jul 26 11:33 scgi_params.default
-rw-r--r-- 1 root root  664 jul 26 11:33 uwsgi_params
-rw-r--r-- 1 root root  664 jul 26 11:33 uwsgi_params.default
-rw-r--r-- 1 root root 3610 jul 26 11:33 win-utf

2. Edit nginx.conf

[root@node03 conf]# vim nginx.conf

        # 1. Find the first server block
        # 2. Add the two directives below just above it
        # 3. Add the Kafka-related location inside it


#------------------ added code ---------------------------------------
 # Shared dictionary with 10 MB of memory, available to every nginx worker process
 lua_shared_dict shared_data 10m;
 # Local DNS resolver
 resolver 127.0.0.1;
#------------------ added code ---------------------------------------

 server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;
        location / {
            root   html;
            index  index.html index.htm;
        }

        #------------------ added code ---------------------------------------
        location /kafkalua {  # kafkalua is the project name; if omitted it is empty
            # Enable nginx status monitoring
            stub_status on;
            # Load the Lua file; its output is served as text/html
            default_type text/html;
            # Path of the Kafka Lua script, i.e. the kafkalua.lua created in step 2
            content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
        }
        #------------------ added code ---------------------------------------
}

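Note: in location /kafkalua {...}, kafkalua is the project name. You can pick any name or leave it out, but remember what you chose.

We configured two locations above: location / {...} and location /kafkalua {...}. What is the difference between them? Read on and the answer will become clear.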

Step 4: Start nginx

1. Enter nginx/sbin

[root@node03 sbin]# cd /export/servers/openresty/nginx/sbin/
[root@node03 sbin]# ll
total 16356
-rwxr-xr-x 1 root root 16745834 jul 26 11:33 nginx

2. Test that the configuration file is valid

[root@node03 sbin]# nginx -t
nginx: the configuration file /export/servers/openresty/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /export/servers/openresty/nginx/conf/nginx.conf test is successful
# the configuration test passed

3. Start nginx

[root@node03 sbin]# nginx
# no output usually means nginx started successfully

4. Check that nginx is running

[root@node03 sbin]# ps -ef | grep nginx
root       3730      1  0 09:24 ?        00:00:00 nginx: master process nginx
nobody     3731   3730  0 09:24 ?        00:00:20 nginx: worker process is shutting down
nobody     5766   3730  0 12:17 ?        00:00:00 nginx: worker process
root       5824   3708  0 12:24 pts/1    00:00:00 grep nginx
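# a master process and a worker process are listed, so nginx is running

5. Open nginx in a browser

Enter in the browser: node03/kafkalua

Note: if hosts is not configured, use the address of the machine running OpenResty instead, e.g. 192.168.52.120/kafkalua

Enter in the browser: node03/ or 192.168.52.120/

Then try node03:80/kafkalua and node03:80/ in the browser.

Let's look at nginx.conf again:

Here node03 is the server's alias (you can also write the server address directly), and 80 is the port configured by [listen 80;]. Port 80 can be left out of the URL. If the configuration said [listen 8088;] instead, the URL would have to include the port, e.g. node03:8088/kafkalua. kafkalua is the project name.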

 server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;
        location / {
            root   html;
            index  index.html index.htm;
        }

        #------------------ added code ---------------------------------------
        location /kafkalua {  # kafkalua is the project name; if omitted it is empty
            # Enable nginx status monitoring
            stub_status on;
            # Load the Lua file; its output is served as text/html
            default_type text/html;
            # Path of the Kafka Lua script, i.e. the kafkalua.lua created in step 2
            content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
        }
}

Step 5: Create a test crawler program

1. Create a Maven project and add the dependencies

    <dependencies>
        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.11.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.4</version>
        </dependency>
    </dependencies>

2. Mock crawler program

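import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;

import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;

public class spidergoaircn {
    private static String basepath = "http://node03/kafkalua";
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 50000; i++) {
            // Request the query endpoint
            spiderqueryao();
            // Request an HTML page
            spiderhtml();
            // Request a JS file
            spiderjs();
            // Request a CSS file
            spidercss();
            // Request a PNG image
            spiderpng();
            // Request a JPG image
            spiderjpg();
            Thread.sleep(100);
        }
    }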

    /**
     * Sends the flight query request.
     *
     * @throws Exception if the request fails
     */
    public static void spiderqueryao() throws Exception {
        // 1. Target URL      ^.*/b2c40/query/jaxb/direct/query.ao.*$
        String url = basepath + "/b2c40/query/jaxb/direct/query.ao";
        // 2. Create the request
        HttpPost httppost = new HttpPost(url);
        // 3. Set the request headers
        httppost.setHeader("time-local", getlocaldatetime());
        httppost.setHeader("requst",
                    "post /b2c40/query/jaxb/direct/query.ao http/1.1");
        httppost.setHeader("request method", "post");
        httppost.setHeader("content-type",
                "application/x-www-form-urlencoded; charset=utf-8");
        httppost.setHeader(
                "referer",
                "http://b2c.csair.com/b2c40/modules/bookingnew/main/flightselectdirect.html?t=s&c1=can&c2=wuh&d1="
                        + getgotime() + "&at=1&ct=0&it=0");
        httppost.setHeader("remote address", "192.168.56.80");
        httppost.setHeader(
                "user-agent",
                "mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/63.0.3239.132 safari/537.36");
        httppost.setHeader("time-iso8601", getiso8601timestamp());
        httppost.setHeader("server address", "243.45.78.132");
        httppost.setHeader(
                "cookie",
                "jsessionid=782121159357b98ca6112554cf44321e; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifystatus=n; usertype4logcookie=m; userid4logcookie=13818791413; useridcookie=13818791413; usercodecookie=13818791413; temp_zh=cou%3d0%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d2018-01-13%3b%e5%b9%bf%e5%b7%9e-%e5%8c%97%e4%ba%ac%3b1%2c0%2c0%3b%26cou%3d1%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d"
                        + getgotime()
                        + "%3b%e5%b9%bf%e5%b7%9e-%e6%88%90%e9%83%bd%3b1%2c0%2c0%3b%26; jsessionid=782121159357b98ca6112554cf44321e; wt-fpc=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_cn; wt.al_flight=wt.al_hctype(s)%3awt.al_adultnum(1)%3awt.al_childnum(0)%3awt.al_infantnum(0)%3awt.al_orgcity1(can)%3awt.al_dstcity1(ctu)%3awt.al_orgdate1("
                        + getgotime() + ")");
        // 4. Set the form parameters
        ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters
                .add(new BasicNameValuePair(
                        "json",
                        "{\"depcity\":\"can\", \"arrcity\":\"wuh\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preurl\":\"\", \"ismember\":\"\"}"));
        httppost.setEntity(new UrlEncodedFormEntity(parameters));
        // 5. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpclient = HttpClients.createDefault();
                CloseableHttpResponse response = httpclient.execute(httppost)) {
            // 6. Print whether a response came back
            System.out.println(response != null);
        }
    }

    public static void spiderhtml() throws Exception {
        // 1. Target URL         ^.*html.*$
        String url = basepath + "/b2c40/modules/bookingnew/main/flightselectdirect.html?t=s&c1=can&c2=ctu&d1=2018-01-17&at=1&ct=0&it=0";
        // 2. Create the request
        HttpPost httppost = new HttpPost(url);
        // 3. Set the request headers
        httppost.setHeader("time-local", getlocaldatetime());
        httppost.setHeader("requst",
                "post /b2c40/query/jaxb/direct/query.ao http/1.1");
        httppost.setHeader("request method", "post");
        httppost.setHeader("content-type",
                "application/x-www-form-urlencoded; charset=utf-8");
        httppost.setHeader(
                "referer",
                "http://b2c.csair.com/b2c40/modules/bookingnew/main/flightselectdirect.html?t=s&c1=can&c2=wuh&d1=2018-02-20&at=1&ct=0&it=0");
        httppost.setHeader("remote address", "192.168.56.1");
        httppost.setHeader(
                "user-agent",
                "mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/63.0.3239.132 safari/537.36");
        httppost.setHeader("time-iso8601", getiso8601timestamp());
        httppost.setHeader("server address", "192.168.56.80");
        httppost.setHeader(
                "cookie",
                "jsessionid=782121159357b98ca6112554cf44321e; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifystatus=n; usertype4logcookie=m; userid4logcookie=13818791413; useridcookie=13818791413; usercodecookie=13818791413; temp_zh=cou%3d0%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d2018-01-13%3b%e5%b9%bf%e5%b7%9e-%e5%8c%97%e4%ba%ac%3b1%2c0%2c0%3b%26cou%3d1%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d2018-01-17%3b%e5%b9%bf%e5%b7%9e-%e6%88%90%e9%83%bd%3b1%2c0%2c0%3b%26; jsessionid=782121159357b98ca6112554cf44321e; wt-fpc=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_cn; wt.al_flight=wt.al_hctype(s)%3awt.al_adultnum(1)%3awt.al_childnum(0)%3awt.al_infantnum(0)%3awt.al_orgcity1(can)%3awt.al_dstcity1(ctu)%3awt.al_orgdate1(2018-01-17)");
        // 4. Set the form parameters
        // httppost.setEntity(new StringEntity(
        // "depcity=can&arrcity=wuh&flightdate=20180220&adultnum=1&childnum=0&infantnum=0&cabinorder=0&airline=1&flytype=0&international=0&action=0&segtype=1&cache=0&preurl=&ismember="));
        ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters
                .add(new BasicNameValuePair(
                        "json",
                        "{\"depcity\":\"can\", \"arrcity\":\"wuh\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preurl\":\"\", \"ismember\":\"\"}"));
        httppost.setEntity(new UrlEncodedFormEntity(parameters));
        // 5. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpclient = HttpClients.createDefault();
                CloseableHttpResponse response = httpclient.execute(httppost)) {
            // 6. Print whether a response came back
            System.out.println(response != null);
        }
    }

    public static void spiderjs() throws Exception {

        // 1. Target URL
        String url = basepath +"/b2c40/dist/main/modules/common/requireconfig.js";
        // 2. Create the request
        HttpPost httppost = new HttpPost(url);
        // 3. Set the request headers
        httppost.setHeader("time-local", getlocaldatetime());
        httppost.setHeader("requst",
                "post /b2c40/query/jaxb/direct/query.ao http/1.1");
        httppost.setHeader("request method", "post");
        httppost.setHeader("content-type",
                "application/x-www-form-urlencoded; charset=utf-8");
        httppost.setHeader(
                "referer",
                "http://b2c.csair.com/b2c40/modules/bookingnew/main/flightselectdirect.html?t=s&c1=can&c2=wuh&d1=2018-02-20&at=1&ct=0&it=0");
        httppost.setHeader("remote address", "192.168.56.1");
        httppost.setHeader(
                "user-agent",
                "mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/63.0.3239.132 safari/537.36");
        httppost.setHeader("time-iso8601", getiso8601timestamp());
        httppost.setHeader("server address", "192.168.56.80");
        httppost.setHeader(
                "cookie",
                "jsessionid=782121159357b98ca6112554cf44321e; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifystatus=n; usertype4logcookie=m; userid4logcookie=13818791413; useridcookie=13818791413; usercodecookie=13818791413; temp_zh=cou%3d0%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d2018-01-13%3b%e5%b9%bf%e5%b7%9e-%e5%8c%97%e4%ba%ac%3b1%2c0%2c0%3b%26cou%3d1%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d2018-01-17%3b%e5%b9%bf%e5%b7%9e-%e6%88%90%e9%83%bd%3b1%2c0%2c0%3b%26; jsessionid=782121159357b98ca6112554cf44321e; wt-fpc=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_cn; wt.al_flight=wt.al_hctype(s)%3awt.al_adultnum(1)%3awt.al_childnum(0)%3awt.al_infantnum(0)%3awt.al_orgcity1(can)%3awt.al_dstcity1(ctu)%3awt.al_orgdate1(2018-01-17)");
        // 4. Set the form parameters
        ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters
                .add(new BasicNameValuePair(
                        "json",
                        "{\"depcity\":\"can\", \"arrcity\":\"wuh\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preurl\":\"\", \"ismember\":\"\"}"));
        httppost.setEntity(new UrlEncodedFormEntity(parameters));
        // 5. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpclient = HttpClients.createDefault();
                CloseableHttpResponse response = httpclient.execute(httppost)) {
            // 6. Print whether a response came back
            System.out.println(response != null);
        }
    }

    public static void spidercss() throws Exception {

        // 1. Target URL
        String url = basepath +"/b2c40/dist/main/css/flight.css";
        // 2. Create the request
        HttpPost httppost = new HttpPost(url);
        // 3. Set the request headers
        httppost.setHeader("time-local", getlocaldatetime());
        httppost.setHeader("requst",
                "post /b2c40/query/jaxb/direct/query.ao http/1.1");
        httppost.setHeader("request method", "post");
        httppost.setHeader("content-type",
                "application/x-www-form-urlencoded; charset=utf-8");
        httppost.setHeader("referer",
                "http://b2c.csair.com/b2c40/modules/bookingnew/main/flightselectdirect.html");
        httppost.setHeader("remote address", "192.168.56.1");
        httppost.setHeader(
                "user-agent",
                "mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/63.0.3239.132 safari/537.36");
        httppost.setHeader("time-iso8601", getiso8601timestamp());
        httppost.setHeader("server address", "192.168.56.80");
        httppost.setHeader(
                "cookie",
                "jsessionid=782121159357b98ca6112554cf44321e; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifystatus=n; usertype4logcookie=m; userid4logcookie=13818791413; useridcookie=13818791413; usercodecookie=13818791413; temp_zh=cou%3d0%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d2018-01-13%3b%e5%b9%bf%e5%b7%9e-%e5%8c%97%e4%ba%ac%3b1%2c0%2c0%3b%26cou%3d1%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d2018-01-17%3b%e5%b9%bf%e5%b7%9e-%e6%88%90%e9%83%bd%3b1%2c0%2c0%3b%26; jsessionid=782121159357b98ca6112554cf44321e; wt-fpc=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_cn; wt.al_flight=wt.al_hctype(s)%3awt.al_adultnum(1)%3awt.al_childnum(0)%3awt.al_infantnum(0)%3awt.al_orgcity1(can)%3awt.al_dstcity1(ctu)%3awt.al_orgdate1(2018-01-17)");
        // 4. Set the form parameters
        ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters
                .add(new BasicNameValuePair(
                        "json",
                        "{\"depcity\":\"can\", \"arrcity\":\"wuh\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preurl\":\"\", \"ismember\":\"\"}"));
        httppost.setEntity(new UrlEncodedFormEntity(parameters));
        // 5. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpclient = HttpClients.createDefault();
                CloseableHttpResponse response = httpclient.execute(httppost)) {
            // 6. Print whether a response came back
            System.out.println(response != null);
        }
    }

    public static void spiderpng() throws Exception {

        // 1. Target URL
        String url =basepath + "/b2c40/dist/main/images/common.png";
        // 2. Create the request
        HttpPost httppost = new HttpPost(url);
        // 3. Set the request headers
        httppost.setHeader("time-local", getlocaldatetime());
        httppost.setHeader("requst",
                "post /b2c40/query/jaxb/direct/query.ao http/1.1");
        httppost.setHeader("request method", "post");
        httppost.setHeader("content-type",
                "application/x-www-form-urlencoded; charset=utf-8");
        httppost.setHeader(
                "referer",
                "http://b2c.csair.com/b2c40/modules/bookingnew/main/flightselectdirect.html?t=s&c1=can&c2=wuh&d1=2018-02-20&at=1&ct=0&it=0");
        httppost.setHeader("remote address", "192.168.56.1");
        httppost.setHeader(
                "user-agent",
                "mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/63.0.3239.132 safari/537.36");
        httppost.setHeader("time-iso8601", getiso8601timestamp());
        httppost.setHeader("server address", "192.168.56.80");
        httppost.setHeader(
                "cookie",
                "jsessionid=782121159357b98ca6112554cf44321e; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifystatus=n; usertype4logcookie=m; userid4logcookie=13818791413; useridcookie=13818791413; usercodecookie=13818791413; temp_zh=cou%3d0%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d2018-01-13%3b%e5%b9%bf%e5%b7%9e-%e5%8c%97%e4%ba%ac%3b1%2c0%2c0%3b%26cou%3d1%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d2018-01-17%3b%e5%b9%bf%e5%b7%9e-%e6%88%90%e9%83%bd%3b1%2c0%2c0%3b%26; jsessionid=782121159357b98ca6112554cf44321e; wt-fpc=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_cn; wt.al_flight=wt.al_hctype(s)%3awt.al_adultnum(1)%3awt.al_childnum(0)%3awt.al_infantnum(0)%3awt.al_orgcity1(can)%3awt.al_dstcity1(ctu)%3awt.al_orgdate1(2018-01-17)");
        // 4. Set the form parameters
        ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters
                .add(new BasicNameValuePair(
                        "json",
                        "{\"depcity\":\"can\", \"arrcity\":\"wuh\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preurl\":\"\", \"ismember\":\"\"}"));
        httppost.setEntity(new UrlEncodedFormEntity(parameters));
        // 5. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpclient = HttpClients.createDefault();
                CloseableHttpResponse response = httpclient.execute(httppost)) {
            // 6. Print whether a response came back
            System.out.println(response != null);
        }
    }

    public static void spiderjpg() throws Exception {

        // 1. Target URL
        String url = basepath +"/b2c40/dist/main/images/loadingimg.jpg";
        // 2. Create the request
        HttpPost httppost = new HttpPost(url);
        // 3. Set the request headers
        httppost.setHeader("time-local", getlocaldatetime());
        httppost.setHeader("requst",
                "post /b2c40/query/jaxb/direct/query.ao http/1.1");
        httppost.setHeader("request method", "post");
        httppost.setHeader("content-type",
                "application/x-www-form-urlencoded; charset=utf-8");
        httppost.setHeader(
                "referer",
                "http://b2c.csair.com/b2c40/modules/bookingnew/main/flightselectdirect.html?t=s&c1=can&c2=wuh&d1=2018-02-20&at=1&ct=0&it=0");
        httppost.setHeader("remote address", "192.168.56.1");
        httppost.setHeader(
                "user-agent",
                "mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/63.0.3239.132 safari/537.36");
        httppost.setHeader("time-iso8601", getiso8601timestamp());
        httppost.setHeader("server address", "192.168.56.80");
        httppost.setHeader(
                "cookie",
                "jsessionid=782121159357b98ca6112554cf44321e; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifystatus=n; usertype4logcookie=m; userid4logcookie=13818791413; useridcookie=13818791413; usercodecookie=13818791413; temp_zh=cou%3d0%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d2018-01-13%3b%e5%b9%bf%e5%b7%9e-%e5%8c%97%e4%ba%ac%3b1%2c0%2c0%3b%26cou%3d1%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d2018-01-17%3b%e5%b9%bf%e5%b7%9e-%e6%88%90%e9%83%bd%3b1%2c0%2c0%3b%26; jsessionid=782121159357b98ca6112554cf44321e; wt-fpc=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_cn; wt.al_flight=wt.al_hctype(s)%3awt.al_adultnum(1)%3awt.al_childnum(0)%3awt.al_infantnum(0)%3awt.al_orgcity1(can)%3awt.al_dstcity1(ctu)%3awt.al_orgdate1(2018-01-17)");
        // 4. Set the form parameters
        ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters
                .add(new BasicNameValuePair(
                        "json",
                        "{\"depcity\":\"can\", \"arrcity\":\"wuh\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preurl\":\"\", \"ismember\":\"\"}"));
        httppost.setEntity(new UrlEncodedFormEntity(parameters));
        // 5. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpclient = HttpClients.createDefault();
                CloseableHttpResponse response = httpclient.execute(httppost)) {
            // 6. Print whether a response came back
            System.out.println(response != null);
        }
    }

    public static String getlocaldatetime() {
        // Pattern letters are case-sensitive: MMM = month name, HH = 24-hour clock, mm = minutes
        DateFormat df = new SimpleDateFormat("dd/MMM/yyyy'T'HH:mm:ss +08:00",
                Locale.ENGLISH);
        String nowasiso = df.format(new Date());
        return nowasiso;

    }

    public static String getiso8601timestamp() {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss+08:00");
        String nowasiso = df.format(new Date());
        return nowasiso;
    }

    public static String getgotime() {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
        String nowasiso = df.format(new Date());
        return nowasiso;
    }

    public static String getbacktime() {
        Date date = new Date();// current time
        Calendar calendar = new GregorianCalendar();
        calendar.setTime(date);
        calendar.add(Calendar.DATE, +1);// move one day forward; use a negative value to move backward
        date = calendar.getTime();
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
        String datestring = formatter.format(date);
        return datestring;
    }
}
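
The timestamp helpers depend on SimpleDateFormat pattern letters, which are case-sensitive: MM is the month while mm is the minute. The sketch below is a small standalone check of that difference, not part of the crawler; the class name is hypothetical, and the fixed instant (epoch 0) and the GMT+08:00 zone are chosen only to make the output reproducible.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical demo class for comparing date patterns.
class DatePatternDemo {

    // Formats `when` with the given pattern in a fixed GMT+08:00 zone.
    static String format(String pattern, Date when) {
        SimpleDateFormat df = new SimpleDateFormat(pattern, Locale.ENGLISH);
        df.setTimeZone(TimeZone.getTimeZone("GMT+08:00"));
        return df.format(when);
    }

    public static void main(String[] args) {
        Date epoch = new Date(0L); // 1970-01-01 08:00:00 in GMT+08:00
        System.out.println(format("yyyy-MM-dd", epoch));             // prints 1970-01-01
        System.out.println(format("yyyy-mm-dd", epoch));             // prints 1970-00-01 (minutes, not month)
        System.out.println(format("dd/MMM/yyyy'T'HH:mm:ss", epoch)); // prints 01/Jan/1970T08:00:00
    }
}
```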

Step 6: Start Kafka

1. Create the topic

[root@node01 bin]# kafka-topics.sh --zookeeper node01:2181 --partitions 3 \
--replication-factor 3 --create --topic b2cdata_collection1

2. Start a Kafka console consumer

[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic b2cdata_collection1

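Step 7: Run the crawler and observe the result

1. Start the crawler program

2. Watch the consumer window, where the collected messages appear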

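
Each message the consumer prints is the line assembled in kafkalua.lua: eleven fields joined by "#cs#". A downstream program could split it back into named fields roughly as follows. This is a hedged sketch only; the class name is made up, the field names are borrowed from the Lua variable names, the sample values are invented, and it assumes no field itself contains "#cs#".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical parser for the "#cs#"-delimited message built in kafkalua.lua.
class CollectedMessageParser {

    static final String DELIMITER = "#cs#";

    // Field order follows the concatenation order in the Lua script.
    static final String[] FIELD_NAMES = {
        "time_local", "request", "request_method", "content_type", "request_body",
        "http_referer", "remote_addr", "http_user_agent", "time_iso8601",
        "server_addr", "http_cookie"
    };

    static Map<String, String> parse(String message) {
        // Limit -1 keeps trailing empty fields, e.g. an empty cookie.
        String[] parts = message.split(Pattern.quote(DELIMITER), -1);
        if (parts.length != FIELD_NAMES.length) {
            throw new IllegalArgumentException(
                    "expected " + FIELD_NAMES.length + " fields but got " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(FIELD_NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Invented sample values, for illustration only.
        String sample = String.join(DELIMITER,
                "01/Aug/2019:12:24:00 +0800", "POST /kafkalua HTTP/1.1", "POST",
                "application/x-www-form-urlencoded", "json=%7B%7D", "",
                "192.168.52.1", "demo-agent", "2019-08-01T12:24:00+08:00",
                "192.168.52.120", "");
        System.out.println(parse(sample).get("request_method")); // prints POST
    }
}
```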

Step 8: Start kafka-manager and observe

1. Start kafka-manager

[root@node01 conf]# cd /export/servers/kafka-manager-1.3.3.23/bin/
[root@node01 bin]# ll
total 36
-rwxr-xr-x 1 root root 13747 may  1 06:27 kafka-manager
-rw-r--r-- 1 root root  9975 may  1 06:27 kafka-manager.bat
-rwxr-xr-x 1 root root  1383 may  1 06:27 log-config
-rw-r--r-- 1 root root   105 may  1 06:27 log-config.bat
[root@node01 bin]# 

# Start it
[root@node01 bin]# ./kafka-manager 


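2. Open it in a browser

Enter in the browser: node01:9000

How to use kafka manager is not covered here. Check the message counts of the b2cdata_collection1 topic:

There are three partitions. If each partition has received roughly the same number of messages, the setup works.

If the counts differ noticeably, kafkalua.lua has no partitioning strategy configured. The default partitioning leads to data skew, so we need to configure our own partitioning strategy.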

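
To put a number on "roughly the same", the per-partition message counts read from kafka manager can be compared against their average. The following is just one possible check, sketched in Java; the class name, the skew measure (busiest partition divided by the average), and the counts are all assumptions made for illustration.

```java
// Hypothetical helper; not part of kafka-manager or the setup above.
class PartitionSkewCheck {

    // Ratio of the busiest partition's message count to the average count.
    // 1.0 means perfectly even; larger values mean more skew.
    static double skew(long[] messagesPerPartition) {
        long total = 0;
        long max = 0;
        for (long count : messagesPerPartition) {
            total += count;
            max = Math.max(max, count);
        }
        if (total == 0) {
            return 1.0; // no messages yet, nothing to compare
        }
        double average = (double) total / messagesPerPartition.length;
        return max / average;
    }

    public static void main(String[] args) {
        // Made-up counts for a 3-partition topic.
        System.out.println(skew(new long[] {1000, 1010, 990})); // close to 1.0: balanced
        System.out.println(skew(new long[] {3000, 0, 0}));      // 3.0: everything on one partition
    }
}
```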

Done!