Integrating Kafka with nginx Lua (OpenResty)
Step 1: Go to the OpenResty directory
[root@node03 openresty]# cd /export/servers/openresty/
[root@node03 openresty]# ll
total 356
drwxr-xr-x 2 root root 4096 Jul 26 11:33 bin
drwxrwxr-x 44 1000 1000 4096 Jul 26 11:31 build
drwxrwxr-x 43 1000 1000 4096 Nov 13 2017 bundle
-rwxrwxr-x 1 1000 1000 45908 Nov 13 2017 configure
-rw-rw-r-- 1 1000 1000 22924 Nov 13 2017 copyright
drwxr-xr-x 6 root root 4096 Jul 26 11:33 luajit
drwxr-xr-x 6 root root 4096 Aug 1 08:14 lualib
-rw-r--r-- 1 root root 5413 Jul 26 11:32 makefile
drwxr-xr-x 11 root root 4096 Jul 26 11:35 nginx
drwxrwxr-x 2 1000 1000 4096 Nov 13 2017 patches
drwxr-xr-x 44 root root 4096 Jul 26 11:33 pod
-rw-rw-r-- 1 1000 1000 3689 Nov 13 2017 readme.markdown
-rw-rw-r-- 1 1000 1000 8690 Nov 13 2017 readme-win32.txt
-rw-r--r-- 1 root root 218352 Jul 26 11:33 resty.index
drwxr-xr-x 5 root root 4096 Jul 26 11:33 site
drwxr-xr-x 2 root root 4096 Aug 1 10:54 testlua
drwxrwxr-x 2 1000 1000 4096 Nov 13 2017 util
[root@node03 openresty]#
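Note: from here on we focus on two directories:
1. lualib: holds the integration packages (Lua libraries) used by OpenResty
2. nginx: the nginx service directory
Next, let's look inside the lualib directory: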
[root@node03 openresty]# cd lualib/
[root@node03 lualib]# ll
total 116
-rwxr-xr-x 1 root root 101809 Jul 26 11:33 cjson.so
drwxr-xr-x 3 root root 4096 Jul 26 11:33 ngx
drwxr-xr-x 2 root root 4096 Jul 26 11:33 rds
drwxr-xr-x 2 root root 4096 Jul 26 11:33 redis
drwxr-xr-x 9 root root 4096 Aug 1 10:34 resty
Here we can see the redis and ngx packages, which means we can use nginx and Redis directly without importing any extra dependencies!
Now let's see what is inside resty:
[root@node03 lualib]# cd resty/
[root@node03 resty]# ll
total 152
-rw-r--r-- 1 root root 6409 Jul 26 11:33 aes.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 core
-rw-r--r-- 1 root root 596 Jul 26 11:33 core.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 dns
drwxr-xr-x 2 root root 4096 Aug 1 10:42 kafka    # imported by us
drwxr-xr-x 2 root root 4096 Jul 26 11:33 limit
-rw-r--r-- 1 root root 4616 Jul 26 11:33 lock.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 lrucache
-rw-r--r-- 1 root root 4620 Jul 26 11:33 lrucache.lua
-rw-r--r-- 1 root root 1211 Jul 26 11:33 md5.lua
-rw-r--r-- 1 root root 14544 Jul 26 11:33 memcached.lua
-rw-r--r-- 1 root root 21577 Jul 26 11:33 mysql.lua
-rw-r--r-- 1 root root 616 Jul 26 11:33 random.lua
-rw-r--r-- 1 root root 9227 Jul 26 11:33 redis.lua
-rw-r--r-- 1 root root 1192 Jul 26 11:33 sha1.lua
-rw-r--r-- 1 root root 1045 Jul 26 11:33 sha224.lua
-rw-r--r-- 1 root root 1221 Jul 26 11:33 sha256.lua
-rw-r--r-- 1 root root 1045 Jul 26 11:33 sha384.lua
-rw-r--r-- 1 root root 1359 Jul 26 11:33 sha512.lua
-rw-r--r-- 1 root root 236 Jul 26 11:33 sha.lua
-rw-r--r-- 1 root root 698 Jul 26 11:33 string.lua
-rw-r--r-- 1 root root 5178 Jul 26 11:33 upload.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 upstream
drwxr-xr-x 2 root root 406 Jul 26 11:33 websocket
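Here we see the familiar mysql.lua and redis.lua; ignore the rest for now.
Note: the kafka package is not there by default, which means OpenResty does not bundle Kafka support. I imported the Kafka integration package here in advance.
Let's see which files the kafka package contains: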
[root@node03 resty]# cd kafka
[root@node03 kafka]# ll
total 48
-rw-r--r-- 1 root root 1369 Aug 1 10:42 broker.lua
-rw-r--r-- 1 root root 5537 Aug 1 10:42 client.lua
-rw-r--r-- 1 root root 710 Aug 1 10:42 errors.lua
-rw-r--r-- 1 root root 10718 Aug 1 10:42 producer.lua
-rw-r--r-- 1 root root 4072 Aug 1 10:42 request.lua
-rw-r--r-- 1 root root 2118 Aug 1 10:42 response.lua
-rw-r--r-- 1 root root 1494 Aug 1 10:42 ringbuffer.lua
-rw-r--r-- 1 root root 4845 Aug 1 10:42 sendbuffer.lua
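Download the Kafka integration package here (its files match the open-source lua-resty-kafka library):
Link: https://pan.baidu.com/s/1pflhz3e_txb3zwirwxfqyg
Extraction code: 0umg
Step 2: Create the Kafka test Lua file
1. Go back to the openresty directory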
[root@node03 kafka]# cd /export/servers/openresty/
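2. Create the test directory
[root@node03 openresty]# mkdir -p testlua    # choose your own directory name and location, but make sure you can find it
The directory name and location are up to you, but you must be able to find them again: they are used below!
3. Enter the new directory and create the kafkalua.lua script
Create the file with vim kafkalua.lua or touch kafkalua.lua
[root@node03 openresty]# cd testlua/
[root@node03 testlua]# ll
total 8
-rw-r--r-- 1 root root 3288 Aug 1 10:54 kafkalua.lua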
kafkalua.lua:
-- Test statement (optional)
ngx.say('hello kafka file configuration successful!!!!!!')

-- Collection threshold: if the number of active connections exceeds it, the request is not collected
local default_threshold = 100000
-- Number of Kafka partitions; must equal the topic's partition count (created with --partitions 3 in step 6)
local partition_num = 3
-- Kafka topic name
local topic = 'b2cdata_collection1'
-- Key of the round-robin counter in the shared dictionary
local polling_key = "polling_key"

-- Custom partitioner: the message key already is the partition id
local function partitioner(key, num, correlation_id)
    return tonumber(key)
end

-- Kafka broker list (the IPs must match the brokers' host.name setting)
local broker_list = {
    { host = "192.168.52.100", port = 9092 },
    { host = "192.168.52.110", port = 9092 },
    { host = "192.168.52.120", port = 9092 },
}

-- Kafka producer parameters
local connect_params = {
    producer_type = "async",
    socket_timeout = 30000,
    flush_time = 10000,
    request_timeout = 20000,
    partitioner = partitioner,
}

-- Shared-memory counter used for round-robin partition selection
local shared_data = ngx.shared.shared_data
local pollingval = shared_data:get(polling_key)
if not pollingval then
    pollingval = 1
    shared_data:set(polling_key, pollingval)
end

-- Counter modulo partition_num spreads consecutive messages evenly over the partitions
local partitions = '' .. (tonumber(pollingval) % partition_num)
shared_data:incr(polling_key, 1)

-- Concurrency control
local isgone = true
-- Overload protection: skip collection when ngx.var.connections_active exceeds the threshold
if tonumber(ngx.var.connections_active) > tonumber(default_threshold) then
    isgone = false
end

-- Data collection
if isgone then
    -- ngx.var.* returns nil for unset variables; fall back to ""
    local time_local      = ngx.var.time_local or ""
    local request         = ngx.var.request or ""
    local request_method  = ngx.var.request_method or ""
    local content_type    = ngx.var.content_type or ""
    -- The body must be read before $request_body is available
    ngx.req.read_body()
    local request_body    = ngx.var.request_body or ""
    local http_referer    = ngx.var.http_referer or ""
    local remote_addr     = ngx.var.remote_addr or ""
    local http_user_agent = ngx.var.http_user_agent or ""
    local time_iso8601    = ngx.var.time_iso8601 or ""
    local server_addr     = ngx.var.server_addr or ""
    local http_cookie     = ngx.var.http_cookie or ""

    -- Join all fields with the "#cs#" separator
    local message = time_local .. "#cs#" .. request .. "#cs#" .. request_method .. "#cs#" ..
        content_type .. "#cs#" .. request_body .. "#cs#" .. http_referer .. "#cs#" ..
        remote_addr .. "#cs#" .. http_user_agent .. "#cs#" .. time_iso8601 .. "#cs#" ..
        server_addr .. "#cs#" .. http_cookie

    -- Load the Kafka producer
    local producer = require "resty.kafka.producer"
    -- Create the producer
    local bp = producer:new(broker_list, connect_params)
    -- Send the message; "partitions" is passed as the key and mapped to a partition by partitioner()
    local ok, err = bp:send(topic, partitions, message)
    -- Log errors
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
end
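The round-robin partition choice in the script above can be sketched in Java as follows. This is an illustrative model, not part of the original project: the class name RoundRobinPartitioner is hypothetical, and an AtomicLong stands in for the ngx.shared.DICT counter. In the Lua script, get() and incr() are two separate calls, so two workers can occasionally read the same value; the atomic getAndIncrement() here avoids that, and in Lua such duplicates only slightly disturb the balance. The sketch also shows why partition_num must match the topic: keys only range from 0 to partition_num - 1, so with partition_num = 6 and a 3-partition topic, keys 3, 4 and 5 would name partitions that do not exist.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical model of the round-robin partition choice in kafkalua.lua.
 * AtomicLong stands in for the shared-dictionary counter "polling_key".
 */
public class RoundRobinPartitioner {
    // The Lua script seeds the counter with 1 when it is missing
    private final AtomicLong counter = new AtomicLong(1);
    private final int partitionNum;

    public RoundRobinPartitioner(int partitionNum) {
        this.partitionNum = partitionNum;
    }

    /** Equivalent of '' .. (pollingval % partition_num) followed by incr(). */
    public String nextKey() {
        long value = counter.getAndIncrement();
        return Long.toString(value % partitionNum);
    }

    /** Equivalent of the custom partitioner: the key is parsed straight into a partition id. */
    public static int partitionFor(String key) {
        return Integer.parseInt(key);
    }

    public static void main(String[] args) {
        RoundRobinPartitioner rr = new RoundRobinPartitioner(3);
        int[] counts = new int[3];
        for (int i = 0; i < 9000; i++) {
            counts[partitionFor(rr.nextKey())]++;
        }
        // Counter values 1..9000 hit each residue mod 3 exactly 3000 times
        System.out.println(counts[0] + " " + counts[1] + " " + counts[2]); // prints "3000 3000 3000"
    }
}
```

Because consecutive messages cycle through the keys, the spread is even regardless of the message content, which is exactly what step 8 checks in kafka-manager.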
Step 3: Edit the nginx configuration file nginx.conf
1. Go to the nginx/conf directory
[root@node03 openresty]# cd /export/servers/openresty/nginx/conf/
[root@node03 conf]# ll
total 76
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default
-rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf
-rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default
-rw-r--r-- 1 root root 3191 Aug 1 10:52 nginx.conf
-rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default
-rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params
-rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params.default
-rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params
-rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params.default
-rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf
2. Edit nginx.conf
[root@node03 conf]# vim nginx.conf
# 1. Find the first server block
# 2. Add the two lines below above the server block
# 3. Add the Kafka-related location inside the server block

#------------------ added ------------------
# Enable a shared dictionary with 10 MB of memory, shared by all nginx worker processes
lua_shared_dict shared_data 10m;
# Configure local DNS resolution
resolver 127.0.0.1;
#------------------ added ------------------

server {
    listen       80;
    server_name  localhost;

    #charset koi8-r;
    #access_log  logs/host.access.log  main;

    location / {
        root   html;
        index  index.html index.htm;
    }

    #------------------ added ------------------
    location /kafkalua {    # kafkalua is the project name; if omitted, the path is empty
        # Enable nginx status monitoring
        stub_status on;
        default_type text/html;
        # Load the Kafka Lua file, i.e. the kafkalua.lua created earlier (the location you were told to remember!)
        content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }
    #------------------ added ------------------
}
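Note: the kafkalua in location /kafkalua {...} is the project name. You can name it anything, or leave it out, but you must remember it!
Notice that we configured two locations above: location / {...} and location /kafkalua {...}. What is the difference between them? Read on and it will become clear.
Step 4: Start nginx
1. Go to nginx/sbin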
[root@node03 sbin]# cd /export/servers/openresty/nginx/sbin/
[root@node03 sbin]# ll
total 16356
-rwxr-xr-x 1 root root 16745834 Jul 26 11:33 nginx
2. Test whether the configuration file is correct
[root@node03 sbin]# nginx -t
nginx: the configuration file /export/servers/openresty/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /export/servers/openresty/nginx/conf/nginx.conf test is successful
# the configuration test succeeded
3. Start nginx
[root@node03 sbin]# nginx    # no output usually means it started successfully
4. Check whether nginx started
[root@node03 sbin]# ps -ef | grep nginx
root 3730 1 0 09:24 ? 00:00:00 nginx: master process nginx
nobody 3731 3730 0 09:24 ? 00:00:20 nginx: worker process is shutting down
nobody 5766 3730 0 12:17 ? 00:00:00 nginx: worker process
root 5824 3708 0 12:24 pts/1 00:00:00 grep nginx
# the nginx master and worker processes are running, so nginx started successfully
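5. Access nginx from a browser
Enter in the browser: node03/kafkalua
Note: if you have not configured hosts, use the address of the OpenResty machine instead, e.g. 192.168.52.120/kafkalua
Enter in the browser: node03/ or 192.168.52.120/
Then try node03:80/kafkalua and node03:80/ as well.
Let's bring up nginx.conf again:
Here node03 is the server's hostname alias (you can also write the server's IP address directly), and 80 is the listening port configured by 【listen 80;】. Port 80 can be omitted; if the server were configured with 【listen 8088;】, the browser would need node03:8088/kafkalua (8088 cannot be omitted in that case). kafkalua is the project name. This answers the earlier question: node03/ is handled by location / and returns the default index.html from the html directory, while node03/kafkalua is handled by location /kafkalua, which runs kafkalua.lua and prints "hello kafka file configuration successful!!!!!!".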
server {
    listen       80;
    server_name  localhost;

    #charset koi8-r;
    #access_log  logs/host.access.log  main;

    location / {
        root   html;
        index  index.html index.htm;
    }

    #------------------ added ------------------
    location /kafkalua {    # kafkalua is the project name; if omitted, the path is empty
        # Enable nginx status monitoring
        stub_status on;
        default_type text/html;
        # Load the Kafka Lua file, i.e. the kafkalua.lua created earlier (the location you were told to remember!)
        content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }
}
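To make the difference between the two locations concrete, here is a simplified Java model of how a request URL is mapped to a location. It is a sketch under stated assumptions: only plain prefix locations are modelled (no regex, "=" or "^~" locations), the class name LocationMatchDemo is hypothetical, and a missing port is treated as the http default 80. For prefix locations nginx picks the longest matching prefix, so every crawler request under basePath (http://node03/kafkalua/...) lands in location /kafkalua and runs kafkalua.lua.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of nginx prefix-location selection. */
public class LocationMatchDemo {
    // The two prefix locations configured in nginx.conf
    static final List<String> LOCATIONS = Arrays.asList("/", "/kafkalua");

    /** Port the browser connects to: 80 when an http URL omits it. */
    public static int effectivePort(URI uri) {
        return uri.getPort() == -1 ? 80 : uri.getPort();
    }

    /** Longest configured prefix that the request path starts with. */
    public static String matchLocation(String path) {
        String best = null;
        for (String loc : LOCATIONS) {
            if (path.startsWith(loc) && (best == null || loc.length() > best.length())) {
                best = loc;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        String[] urls = {
            "http://node03/kafkalua",
            "http://node03:80/",
            "http://node03/kafkalua/b2c40/dist/main/css/flight.css"
        };
        for (String u : urls) {
            URI uri = URI.create(u);
            // An empty path (e.g. "http://node03") is requested as "/"
            String path = uri.getPath().isEmpty() ? "/" : uri.getPath();
            System.out.println(u + " -> port " + effectivePort(uri) + ", location " + matchLocation(path));
        }
    }
}
```

Because the match is a plain string prefix, a path such as /kafkaluaXYZ would also be handled by location /kafkalua.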
Step 5: Create a test crawler program
1. Create a Maven project and add the dependencies
<dependencies>
    <dependency>
        <groupId>org.jsoup</groupId>
        <artifactId>jsoup</artifactId>
        <version>1.11.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.4</version>
    </dependency>
</dependencies>
2. Pseudo-crawler program
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Locale;

import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;

public class SpiderGoAirCn {
    private static String basePath = "http://node03/kafkalua";

    // Values shared by all simulated requests
    private static final String USER_AGENT = "mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/63.0.3239.132 safari/537.36";
    private static final String DEFAULT_REFERER = "http://b2c.csair.com/b2c40/modules/bookingnew/main/flightselectdirect.html?t=s&c1=can&c2=wuh&d1=2018-02-20&at=1&ct=0&it=0";
    private static final String JSON_BODY = "{\"depcity\":\"can\", \"arrcity\":\"wuh\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preurl\":\"\", \"ismember\":\"\"}";

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 50000; i++) {
            spiderQueryAo(); // query request
            spiderHtml();    // html request
            spiderJs();      // js request
            spiderCss();     // css request
            spiderPng();     // png request
            spiderJpg();     // jpg request
            Thread.sleep(100);
        }
    }

    public static void spiderQueryAo() throws Exception {
        // Target: ^.*/b2c40/query/jaxb/direct/query.ao.*$
        sendPost("/b2c40/query/jaxb/direct/query.ao",
                "http://b2c.csair.com/b2c40/modules/bookingnew/main/flightselectdirect.html?t=s&c1=can&c2=wuh&d1="
                        + getGoTime() + "&at=1&ct=0&it=0",
                "192.168.56.80", "243.45.78.132", getGoTime());
    }

    public static void spiderHtml() throws Exception {
        // Target: ^.*html.*$
        sendPost("/b2c40/modules/bookingnew/main/flightselectdirect.html?t=s&c1=can&c2=ctu&d1=2018-01-17&at=1&ct=0&it=0",
                DEFAULT_REFERER, "192.168.56.1", "192.168.56.80", "2018-01-17");
    }

    public static void spiderJs() throws Exception {
        sendPost("/b2c40/dist/main/modules/common/requireconfig.js",
                DEFAULT_REFERER, "192.168.56.1", "192.168.56.80", "2018-01-17");
    }

    public static void spiderCss() throws Exception {
        sendPost("/b2c40/dist/main/css/flight.css",
                "http://b2c.csair.com/b2c40/modules/bookingnew/main/flightselectdirect.html",
                "192.168.56.1", "192.168.56.80", "2018-01-17");
    }

    public static void spiderPng() throws Exception {
        sendPost("/b2c40/dist/main/images/common.png",
                DEFAULT_REFERER, "192.168.56.1", "192.168.56.80", "2018-01-17");
    }

    public static void spiderJpg() throws Exception {
        sendPost("/b2c40/dist/main/images/loadingimg.jpg",
                DEFAULT_REFERER, "192.168.56.1", "192.168.56.80", "2018-01-17");
    }

    /** Sends one POST to basePath + path with the simulated headers and form body. */
    private static void sendPost(String path, String referer, String remoteAddress,
                                 String serverAddress, String cookieDate) throws Exception {
        // 1. Target URL
        HttpPost httpPost = new HttpPost(basePath + path);
        // 2. Simulated request headers
        httpPost.setHeader("time-local", getLocalDateTime());
        httpPost.setHeader("requst", "post /b2c40/query/jaxb/direct/query.ao http/1.1");
        httpPost.setHeader("request method", "post");
        httpPost.setHeader("content-type", "application/x-www-form-urlencoded; charset=utf-8");
        httpPost.setHeader("referer", referer);
        httpPost.setHeader("remote address", remoteAddress);
        httpPost.setHeader("user-agent", USER_AGENT);
        httpPost.setHeader("time-iso8601", getIso8601Timestamp());
        httpPost.setHeader("server address", serverAddress);
        httpPost.setHeader("cookie", buildCookie(cookieDate));
        // 3. Form body with a single "json" parameter
        List<BasicNameValuePair> parameters = new ArrayList<>();
        parameters.add(new BasicNameValuePair("json", JSON_BODY));
        httpPost.setEntity(new UrlEncodedFormEntity(parameters));
        // 4. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(httpPost)) {
            // 5. Print the response status
            System.out.println(response.getStatusLine());
        }
    }

    /** Simulated cookie; date fills the travel-date fields of temp_zh and wt.al_flight. */
    private static String buildCookie(String date) {
        return "jsessionid=782121159357b98ca6112554cf44321e; sid=b5cc11e02e154ac5b0f3609332f86803; "
                + "aid=8ae8768760927e280160bb348bef3e12; identifystatus=n; usertype4logcookie=m; "
                + "userid4logcookie=13818791413; useridcookie=13818791413; usercodecookie=13818791413; "
                + "temp_zh=cou%3d0%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d2018-01-13%3b%e5%b9%bf%e5%b7%9e-%e5%8c%97%e4%ba%ac%3b1%2c0%2c0%3b%26"
                + "cou%3d1%3bsegt%3d%e5%8d%95%e7%a8%8b%3btime%3d" + date
                + "%3b%e5%b9%bf%e5%b7%9e-%e6%88%90%e9%83%bd%3b1%2c0%2c0%3b%26; "
                + "jsessionid=782121159357b98ca6112554cf44321e; "
                + "wt-fpc=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; "
                + "language=zh_cn; "
                + "wt.al_flight=wt.al_hctype(s)%3awt.al_adultnum(1)%3awt.al_childnum(0)%3awt.al_infantnum(0)%3awt.al_orgcity1(can)%3awt.al_dstcity1(ctu)%3awt.al_orgdate1(" + date + ")";
    }

    public static String getLocalDateTime() {
        DateFormat df = new SimpleDateFormat("dd/MMM/yyyy'T'HH:mm:ss +08:00", Locale.ENGLISH);
        return df.format(new Date());
    }

    public static String getIso8601Timestamp() {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss+08:00");
        return df.format(new Date());
    }

    public static String getGoTime() {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
        return df.format(new Date());
    }

    public static String getBackTime() {
        Calendar calendar = new GregorianCalendar();
        calendar.setTime(new Date());
        calendar.add(Calendar.DATE, 1); // one day later; use -1 for one day earlier
        return new SimpleDateFormat("yyyy-MM-dd").format(calendar.getTime());
    }
}
Step 6: Start Kafka
1. Create the topic
[root@node01 bin]# kafka-topics.sh --zookeeper node01:2181 --partitions 3 --replication-factor 3 --create --topic b2cdata_collection1
2. Start a Kafka console consumer
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic b2cdata_collection1
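Step 7: Start the crawler and observe the results
1. Start the crawler program
2. Watch the consumer window: each collected request should appear as one #cs#-delimited line.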
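Each line printed by the consumer is one message built by kafkalua.lua: 11 nginx variables joined with "#cs#". A downstream program could split it back into named fields roughly as below. This is a hypothetical helper (CsMessageParser is not part of the original project); the field order is taken from the script, and the sample values in main are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical parser for the "#cs#"-joined messages produced by kafkalua.lua. */
public class CsMessageParser {
    // Same order as the concatenation in kafkalua.lua
    static final String[] FIELDS = {
        "time_local", "request", "request_method", "content_type", "request_body",
        "http_referer", "remote_addr", "http_user_agent", "time_iso8601",
        "server_addr", "http_cookie"
    };

    public static Map<String, String> parse(String message) {
        // limit -1 keeps trailing empty strings (the script sends "" for missing values)
        String[] parts = message.split(Pattern.quote("#cs#"), -1);
        if (parts.length != FIELDS.length) {
            throw new IllegalArgumentException("expected " + FIELDS.length + " fields, got " + parts.length);
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            record.put(FIELDS[i], parts[i]);
        }
        return record;
    }

    public static void main(String[] args) {
        // Made-up sample message: empty referer and empty cookie
        String msg = String.join("#cs#", "01/Aug/2019:10:54:00 +0800", "POST /kafkalua/x HTTP/1.1",
                "POST", "application/x-www-form-urlencoded", "json=%7B%7D", "", "192.168.52.1",
                "test-agent", "2019-08-01T10:54:00+08:00", "192.168.52.120", "");
        System.out.println(parse(msg));
    }
}
```

The -1 limit matters because the script sends "" for absent values such as a missing cookie. If a request body or cookie ever contains the literal text #cs#, the field count no longer matches and this parser rejects the message.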
Step 8: Start kafka-manager and observe
1. Start kafka-manager
[root@node01 conf]# cd /export/servers/kafka-manager-1.3.3.23/bin/
[root@node01 bin]# ll
total 36
-rwxr-xr-x 1 root root 13747 May 1 06:27 kafka-manager
-rw-r--r-- 1 root root 9975 May 1 06:27 kafka-manager.bat
-rwxr-xr-x 1 root root 1383 May 1 06:27 log-config
-rw-r--r-- 1 root root 105 May 1 06:27 log-config.bat
[root@node01 bin]#
# start
[root@node01 bin]# ./kafka-manager
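The window after startup:
2. Access it from a browser
Enter in the browser: node01:9000
Using kafka-manager is not covered here; just check how the messages of the b2cdata_collection1 topic are spread over its partitions:
The topic has three partitions; if each partition holds roughly the same number of messages, everything works.
If the counts differ noticeably, the kafkalua.lua script has no partitioning strategy configured, and the default partitioning causes data skew; in that case, configure your own partitioning strategy!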
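"Roughly the same" can be checked with the ratio of the largest to the smallest partition count. A minimal sketch, assuming you copy the per-partition message counts from kafka-manager by hand; the class PartitionSkewCheck and the 10% tolerance are illustrative choices, not Kafka or kafka-manager settings.

```java
/** Hypothetical helper: how evenly are messages spread over the partitions? */
public class PartitionSkewCheck {
    /** Largest count divided by smallest count; 1.0 means perfectly even. */
    public static double skewRatio(long[] counts) {
        if (counts.length == 0) {
            throw new IllegalArgumentException("no partitions given");
        }
        long max = Long.MIN_VALUE;
        long min = Long.MAX_VALUE;
        for (long c : counts) {
            max = Math.max(max, c);
            min = Math.min(min, c);
        }
        // An empty partition next to non-empty ones counts as fully skewed
        if (min == 0) {
            return max == 0 ? 1.0 : Double.POSITIVE_INFINITY;
        }
        return (double) max / min;
    }

    /** True when the spread stays within the given relative tolerance (0.10 = 10%). */
    public static boolean isBalanced(long[] counts, double tolerance) {
        return skewRatio(counts) <= 1.0 + tolerance;
    }

    public static void main(String[] args) {
        long[] counts = {1000, 1001, 999}; // hypothetical counts read from kafka-manager
        System.out.println("ratio=" + skewRatio(counts) + " balanced=" + isBalanced(counts, 0.10));
    }
}
```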
Done!