您好,登錄后才能下訂單哦!
本篇文章為大家展示了如何在nginx lua中使用kafka,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
第一步:進入opresty目錄
[root@node03 openresty]# cd /export/servers/openresty/ [root@node03 openresty]# ll total 356 drwxr-xr-x 2 root root 4096 Jul 26 11:33 bin drwxrwxr-x 44 1000 1000 4096 Jul 26 11:31 build drwxrwxr-x 43 1000 1000 4096 Nov 13 2017 bundle -rwxrwxr-x 1 1000 1000 45908 Nov 13 2017 configure -rw-rw-r-- 1 1000 1000 22924 Nov 13 2017 COPYRIGHT drwxr-xr-x 6 root root 4096 Jul 26 11:33 luajit drwxr-xr-x 6 root root 4096 Aug 1 08:14 lualib -rw-r--r-- 1 root root 5413 Jul 26 11:32 Makefile drwxr-xr-x 11 root root 4096 Jul 26 11:35 nginx drwxrwxr-x 2 1000 1000 4096 Nov 13 2017 patches drwxr-xr-x 44 root root 4096 Jul 26 11:33 pod -rw-rw-r-- 1 1000 1000 3689 Nov 13 2017 README.markdown -rw-rw-r-- 1 1000 1000 8690 Nov 13 2017 README-win32.txt -rw-r--r-- 1 root root 218352 Jul 26 11:33 resty.index drwxr-xr-x 5 root root 4096 Jul 26 11:33 site drwxr-xr-x 2 root root 4096 Aug 1 10:54 testlua drwxrwxr-x 2 1000 1000 4096 Nov 13 2017 util [root@node03 openresty]#
說明:接下來我們關注兩個目錄 lualib 和 nginx
1.lualib: 是存放opresty所需要的集成軟件包的
2.nginx: 是nginx服務目錄
接下來,我們進入lualib目錄一看究竟:
[root@node03 openresty]# cd lualib/ [root@node03 lualib]# ll total 116 -rwxr-xr-x 1 root root 101809 Jul 26 11:33 cjson.so drwxr-xr-x 3 root root 4096 Jul 26 11:33 ngx drwxr-xr-x 2 root root 4096 Jul 26 11:33 rds drwxr-xr-x 2 root root 4096 Jul 26 11:33 redis drwxr-xr-x 9 root root 4096 Aug 1 10:34 resty
這里我們看到了redis和ngx集成軟件包,說明我們可以之間使用nginx和redis而無需導入任何依賴包!!!!
下面看看resty里面有些說明呢????
[root@node03 lualib]# cd resty/ [root@node03 resty]# ll total 152 -rw-r--r-- 1 root root 6409 Jul 26 11:33 aes.lua drwxr-xr-x 2 root root 4096 Jul 26 11:33 core -rw-r--r-- 1 root root 596 Jul 26 11:33 core.lua drwxr-xr-x 2 root root 4096 Jul 26 11:33 dns drwxr-xr-x 2 root root 4096 Aug 1 10:42 kafka #這是我們自己導入的 drwxr-xr-x 2 root root 4096 Jul 26 11:33 limit -rw-r--r-- 1 root root 4616 Jul 26 11:33 lock.lua drwxr-xr-x 2 root root 4096 Jul 26 11:33 lrucache -rw-r--r-- 1 root root 4620 Jul 26 11:33 lrucache.lua -rw-r--r-- 1 root root 1211 Jul 26 11:33 md5.lua -rw-r--r-- 1 root root 14544 Jul 26 11:33 memcached.lua -rw-r--r-- 1 root root 21577 Jul 26 11:33 mysql.lua -rw-r--r-- 1 root root 616 Jul 26 11:33 random.lua -rw-r--r-- 1 root root 9227 Jul 26 11:33 redis.lua -rw-r--r-- 1 root root 1192 Jul 26 11:33 sha1.lua -rw-r--r-- 1 root root 1045 Jul 26 11:33 sha224.lua -rw-r--r-- 1 root root 1221 Jul 26 11:33 sha256.lua -rw-r--r-- 1 root root 1045 Jul 26 11:33 sha384.lua -rw-r--r-- 1 root root 1359 Jul 26 11:33 sha512.lua -rw-r--r-- 1 root root 236 Jul 26 11:33 sha.lua -rw-r--r-- 1 root root 698 Jul 26 11:33 string.lua -rw-r--r-- 1 root root 5178 Jul 26 11:33 upload.lua drwxr-xr-x 2 root root 4096 Jul 26 11:33 upstream drwxr-xr-x 2 root root 406 Jul 26 11:33 websocket
這里我們看到了熟悉的mysql.lua和redis.lua,好了其他的先不要管
注意:這里的 kafka 這個包是沒有的,說明opnresty么有集成kafka。此處我已經提前導入啦kafka集成包
我們看看kafka里面多有哪些包:
[root@node03 resty]# cd kafka [root@node03 kafka]# ll total 48 -rw-r--r-- 1 root root 1369 Aug 1 10:42 broker.lua -rw-r--r-- 1 root root 5537 Aug 1 10:42 client.lua -rw-r--r-- 1 root root 710 Aug 1 10:42 errors.lua -rw-r--r-- 1 root root 10718 Aug 1 10:42 producer.lua -rw-r--r-- 1 root root 4072 Aug 1 10:42 request.lua -rw-r--r-- 1 root root 2118 Aug 1 10:42 response.lua -rw-r--r-- 1 root root 1494 Aug 1 10:42 ringbuffer.lua -rw-r--r-- 1 root root 4845 Aug 1 10:42 sendbuffer.lua
附上 kafka 集成包:kafka_jb51.rar
第二步:創建kafka測試lua文件
1.退回到openresty
[root@node03 kafka]# cd /export/servers/openresty/
2.創建測試文件
[root@node03 openresty]# mkdir -r testlua #這里文件名自己取,文件位置自己定,但必須找得到
這里文件名自己取,文件位置自己定,但必須找得到!!!!!!!!!!!下面會用到!!!!!!!!!!
3.進入剛剛創建的文件夾并創建kafkalua.lua腳本文件
創建文件:vim kafkalua.lua或者touch kafkalua.lua
[root@node03 openresty]# cd testlua/ [root@node03 testlua]# ll total 8 -rw-r--r-- 1 root root 3288 Aug 1 10:54 kafkalua.lua
kafkalua.lua:
--測試語句可以不用 ngx.say('hello kafka file configuration successful!!!!!!') --數據采集閾值限制,如果lua采集超過閾值,則不采集 local DEFAULT_THRESHOLD = 100000 -- kafka分區數 local PARTITION_NUM = 6 -- kafka主題名稱 local TOPIC = 'B2CDATA_COLLECTION1' -- 輪詢器共享變量KEY值 local POLLING_KEY = "POLLING_KEY" -- kafka集群(定義kafka broker地址,ip需要和kafka的host.name配置一致) local function partitioner(key, num, correlation_id) return tonumber(key) end --kafka broker列表 local BROKER_LIST = {{host="192.168.52.100",port=9092},{host="192.168.52.110",port=9092},{host="192.168.52.120",port=9092}} --kafka參數, local CONNECT_PARAMS = { producer_type = "async", socket_timeout = 30000, flush_time = 10000, request_timeout = 20000, partitioner = partitioner } -- 共享內存計數器,用于kafka輪詢使用 local shared_data = ngx.shared.shared_data local pollingVal = shared_data:get(POLLING_KEY) if not pollingVal then pollingVal = 1 shared_data:set(POLLING_KEY, pollingVal) end --獲取每一條消息的計數器,對PARTITION_NUM取余數,均衡分區 local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM) shared_data:incr(POLLING_KEY, 1) -- 并發控制 local isGone = true --獲取ngx.var.connections_active進行過載保護,即如果當前活躍連接數超過閾值進行限流保護 if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then isGone = false end -- 數據采集 if isGone then local time_local = ngx.var.time_local if time_local == nil then time_local = "" end local request = ngx.var.request if request == nil then request = "" end local request_method = ngx.var.request_method if request_method == nil then request_method = "" end local content_type = ngx.var.content_type if content_type == nil then content_type = "" end ngx.req.read_body() local request_body = ngx.var.request_body if request_body == nil then request_body = "" end local http_referer = ngx.var.http_referer if http_referer == nil then http_referer = "" end local remote_addr = ngx.var.remote_addr if remote_addr == nil then remote_addr = "" end local http_user_agent = ngx.var.http_user_agent if http_user_agent == nil then http_user_agent = "" end local time_iso8601 = ngx.var.time_iso8601 if time_iso8601 == nil then time_iso8601 = "" end local server_addr = ngx.var.server_addr if server_addr == nil then server_addr = "" end local http_cookie = ngx.var.http_cookie if http_cookie == nil then http_cookie = "" end --封裝數據 local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie; --引入kafka的producer local producer = require "resty.kafka.producer" --創建producer local bp = producer:new(BROKER_LIST, CONNECT_PARAMS) --發送數據 local ok, err = bp:send(TOPIC, partitions, message) --打印錯誤日志 if not ok then ngx.log(ngx.ERR, "kafka send err:", err) return end end
第三步:修改nginx配置文件nginx.conf
1.進入ngin/conf目錄
[root@node03 openresty]# cd /export/servers/openresty/nginx/conf/ [root@node03 conf]# ll total 76 -rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf -rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default -rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params -rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default -rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf -rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win -rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types -rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default -rw-r--r-- 1 root root 3191 Aug 1 10:52 nginx.conf -rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default -rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params -rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params.default -rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params -rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params.default -rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf
2.修改nginx.conf
[root@node03 conf]# vim nginx.conf #1.說明找到第一個server #2.在server上面添加兩行代碼如下 #3.在server里面添加kafka相關的代碼如下 #------------------添加的代碼--------------------------------------- #開啟共享字典,設置內存大小為10M,供每個nginx的線程消費 lua_shared_dict shared_data 10m; #配置本地域名解析 resolver 127.0.0.1; #------------------添加的代碼--------------------------------------- server { listen 80; server_name localhost; #charset koi8-r; #access_log logs/host.access.log main; location / { root html; index index.html index.htm; } #------------------添加的代碼--------------------------------------- location /kafkalua { #這里的kafkalua就是工程名字,不加默認為空 #開啟nginx監控 stub_status on; #加載lua文件 default_type text/html; #指定kafka的lua文件位置,就是我們剛才創建的kafkalua.lua(前面已經強調要記住的!!!!) content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua; } #------------------添加的代碼--------------------------------------- }
說明:location /kafkalua{...}這里的kafkalua是工程名,可以隨意取也可以不取,但是必須要記住!!!
看到我們上面配置了兩個location,第一個為location /{...}第二個為location /kafkalua{...}那么他們有什么區別呢???先向下看,迷霧將會慢慢揭開。
第四步:啟動nginx
1.進入nginx/sbin
[root@node03 sbin]# cd /export/servers/openresty/nginx/sbin/ [root@node03 sbin]# ll total 16356 -rwxr-xr-x 1 root root 16745834 Jul 26 11:33 nginx
2.測試配置文件是否正確
[root@node03 sbin]# nginx -t nginx: the configuration file /export/servers/openresty/nginx/conf/nginx.conf syntax is ok nginx: configuration file /export/servers/openresty/nginx/conf/nginx.conf test is successful #看到已經成功啦
3.啟動nginx
[root@node03 sbin]# nginx #不顯示任何東西一般是成功啦
4.查看nginx是否啟動成功
[root@node03 sbin]# ps -ef | grep nginx root 3730 1 0 09:24 ? 00:00:00 nginx: master process nginx nobody 3731 3730 0 09:24 ? 00:00:20 nginx: worker process is shutting down nobody 5766 3730 0 12:17 ? 00:00:00 nginx: worker process root 5824 3708 0 12:24 pts/1 00:00:00 grep nginx#看到有兩個nginx進程,表示成功le
5.瀏覽器訪問nginx
在瀏覽器輸入:node03/kafkalua
說明:如何么有配置hosts則輸入openresty所在設備的地址如:192.168.52.120/kafkalua
在瀏覽器輸入:node03/或者 192.168.52.120/
再在瀏覽器輸入:node03:80/kafkalua 和 node03:80/試試 搬來nginx.conf來看看:
node03:80/kafkalua 這里的nide03是服務器的別名或者之間寫文服務器地址,80是【listen 80;】配置的監聽端口,80端口可以省略不寫,如果這寫成【listen 8088;】那么瀏覽器需輸入 node03:8088/kafkalua (這里不能省略8088),kafkalua是工程名。
server { listen 80; server_name localhost; #charset koi8-r; #access_log logs/host.access.log main; location / { root html; index index.html index.htm; } #------------------添加的代碼--------------------------------------- location /kafkalua { #這里的kafkalua就是工程名字,不加默認為空 #開啟nginx監控 stub_status on; #加載lua文件 default_type text/html; #指定kafka的lua文件位置,就是我們剛才創建的kafkalua.lua(前面已經強調要記住的!!!!) content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua; }
第五步:創建測試爬蟲程序
1.創建maven工程導入依賴
<dependencies> <dependency> <groupId>org.jsoup</groupId> <artifactId>jsoup</artifactId> <version>1.11.3</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.4</version> </dependency> </dependencies>
2.偽爬蟲程序
public class SpiderGoAirCN { private static String basePath = "http://node03/kafkalua"; public static void main(String[] args) throws Exception { for (int i = 0; i < 50000; i++) { // 請求查詢信息 spiderQueryao(); // 請求html spiderHtml(); // 請求js spiderJs(); // 請求css spiderCss(); // 請求png spiderPng(); // 請求jpg spiderJpg(); Thread.sleep(100); } } /** * * @throws Exception */ public static void spiderQueryao() throws Exception { // 1.指定目標網站 ^.*/B2C40/query/jaxb/direct/query.ao.*$ String url = basePath + "/B2C40/query/jaxb/direct/query.ao"; // 2.發起請求 HttpPost httpPost = new HttpPost(url); // 3. 設置請求參數 httpPost.setHeader("Time-Local", getLocalDateTime()); httpPost.setHeader("Requst", "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1"); httpPost.setHeader("Request Method", "POST"); httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8"); httpPost.setHeader( "Referer", "http://xml.xxxx.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=" + getGoTime() + "&at=1&ct=0&it=0"); httpPost.setHeader("Remote Address", "192.168.56.80"); httpPost.setHeader( "User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"); httpPost.setHeader("Time-Iso8601", getISO8601Timestamp()); httpPost.setHeader("Server Address", "243.45.78.132"); httpPost.setHeader( "Cookie", "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D" + getGoTime() + "%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(" + getGoTime() + ")"); // 4.設置請求參數 ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>(); parameters .add(new BasicNameValuePair( "json", "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}")); httpPost.setEntity(new UrlEncodedFormEntity(parameters)); // 5. 發起請求 CloseableHttpClient httpClient = HttpClients.createDefault(); CloseableHttpResponse response = httpClient.execute(httpPost); // 6.獲取返回值 System.out.println(response != null); } public static void spiderHtml() throws Exception { // 1.指定目標網站 ^.*html.*$ String url = basePath + "/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=CTU&d1=2018-01-17&at=1&ct=0&it=0"; // 2.發起請求 HttpPost httpPost = new HttpPost(url); // 3. 設置請求參數 httpPost.setHeader("Time-Local", getLocalDateTime()); httpPost.setHeader("Requst", "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1"); httpPost.setHeader("Request Method", "POST"); httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8"); httpPost.setHeader( "Referer", "http://xml.xxxx.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0"); httpPost.setHeader("Remote Address", "192.168.56.1"); httpPost.setHeader( "User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"); httpPost.setHeader("Time-Iso8601", getISO8601Timestamp()); httpPost.setHeader("Server Address", "192.168.56.80"); httpPost.setHeader( "Cookie", "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)"); // 4.設置請求參數 // httpPost.setEntity(new StringEntity( // "depcity=CAN&arrcity=WUH&flightdate=20180220&adultnum=1&childnum=0&infantnum=0&cabinorder=0&airline=1&flytype=0&international=0&action=0&segtype=1&cache=0&preUrl=&isMember=")); ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>(); parameters .add(new BasicNameValuePair( "json", "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}")); httpPost.setEntity(new UrlEncodedFormEntity(parameters)); // 5. 發起請求 CloseableHttpClient httpClient = HttpClients.createDefault(); CloseableHttpResponse response = httpClient.execute(httpPost); // 6.獲取返回值 System.out.println(response != null); } public static void spiderJs() throws Exception { // 1.指定目標網站 String url = basePath +"/B2C40/dist/main/modules/common/requireConfig.js"; // 2.發起請求 HttpPost httpPost = new HttpPost(url); // 3. 設置請求參數 httpPost.setHeader("Time-Local", getLocalDateTime()); httpPost.setHeader("Requst", "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1"); httpPost.setHeader("Request Method", "POST"); httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8"); httpPost.setHeader( "Referer", "http://xml.xxxx.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0"); httpPost.setHeader("Remote Address", "192.168.56.1"); httpPost.setHeader( "User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"); httpPost.setHeader("Time-Iso8601", getISO8601Timestamp()); httpPost.setHeader("Server Address", "192.168.56.80"); httpPost.setHeader( "Cookie", "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)"); // 4.設置請求參數 ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>(); parameters .add(new BasicNameValuePair( "json", "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}")); httpPost.setEntity(new UrlEncodedFormEntity(parameters)); // 5. 發起請求 CloseableHttpClient httpClient = HttpClients.createDefault(); CloseableHttpResponse response = httpClient.execute(httpPost); // 6.獲取返回值 System.out.println(response != null); } public static void spiderCss() throws Exception { // 1.指定目標網站 String url = basePath +"/B2C40/dist/main/css/flight.css"; // 2.發起請求 HttpPost httpPost = new HttpPost(url); // 3. 設置請求參數 httpPost.setHeader("Time-Local", getLocalDateTime()); httpPost.setHeader("Requst", "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1"); httpPost.setHeader("Request Method", "POST"); httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8"); httpPost.setHeader("Referer", "http://xml.xxxx.com/B2C40/modules/bookingnew/main/flightSelectDirect.html"); httpPost.setHeader("Remote Address", "192.168.56.1"); httpPost.setHeader( "User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"); httpPost.setHeader("Time-Iso8601", getISO8601Timestamp()); httpPost.setHeader("Server Address", "192.168.56.80"); httpPost.setHeader( "Cookie", "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)"); // 4.設置請求參數 ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>(); parameters .add(new BasicNameValuePair( "json", "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}")); httpPost.setEntity(new UrlEncodedFormEntity(parameters)); // 5. 發起請求 CloseableHttpClient httpClient = HttpClients.createDefault(); CloseableHttpResponse response = httpClient.execute(httpPost); // 6.獲取返回值 System.out.println(response != null); } public static void spiderPng() throws Exception { // 1.指定目標網站 String url =basePath + "/B2C40/dist/main/images/common.png"; // 2.發起請求 HttpPost httpPost = new HttpPost(url); // 3. 設置請求參數 httpPost.setHeader("Time-Local", getLocalDateTime()); httpPost.setHeader("Requst", "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1"); httpPost.setHeader("Request Method", "POST"); httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8"); httpPost.setHeader( "Referer", "http://xml.xxxx.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0"); httpPost.setHeader("Remote Address", "192.168.56.1"); httpPost.setHeader( "User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"); httpPost.setHeader("Time-Iso8601", getISO8601Timestamp()); httpPost.setHeader("Server Address", "192.168.56.80"); httpPost.setHeader( "Cookie", "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)"); // 4.設置請求參數 ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>(); parameters .add(new BasicNameValuePair( "json", "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}")); httpPost.setEntity(new UrlEncodedFormEntity(parameters)); // 5. 發起請求 CloseableHttpClient httpClient = HttpClients.createDefault(); CloseableHttpResponse response = httpClient.execute(httpPost); // 6.獲取返回值 System.out.println(response != null); } public static void spiderJpg() throws Exception { // 1.指定目標網站 String url = basePath +"/B2C40/dist/main/images/loadingimg.jpg"; // 2.發起請求 HttpPost httpPost = new HttpPost(url); // 3. 設置請求參數 httpPost.setHeader("Time-Local", getLocalDateTime()); httpPost.setHeader("Requst", "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1"); httpPost.setHeader("Request Method", "POST"); httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8"); httpPost.setHeader( "Referer", "http://xml.xxxx.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0"); httpPost.setHeader("Remote Address", "192.168.56.1"); httpPost.setHeader( "User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"); httpPost.setHeader("Time-Iso8601", getISO8601Timestamp()); httpPost.setHeader("Server Address", "192.168.56.80"); httpPost.setHeader( "Cookie", "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)"); // 4.設置請求參數 ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>(); parameters .add(new BasicNameValuePair( "json", "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}")); httpPost.setEntity(new UrlEncodedFormEntity(parameters)); // 5. 發起請求 CloseableHttpClient httpClient = HttpClients.createDefault(); CloseableHttpResponse response = httpClient.execute(httpPost); // 6.獲取返回值 System.out.println(response != null); } public static String getLocalDateTime() { DateFormat df = new SimpleDateFormat("dd/MMM/yyyy'T'HH:mm:ss +08:00", Locale.ENGLISH); String nowAsISO = df.format(new Date()); return nowAsISO; } public static String getISO8601Timestamp() { DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss+08:00"); String nowAsISO = df.format(new Date()); return nowAsISO; } public static String getGoTime() { DateFormat df = new SimpleDateFormat("yyyy-MM-dd"); String nowAsISO = df.format(new Date()); return nowAsISO; } public static String getBackTime() { Date date = new Date();// 取時間 Calendar calendar = new GregorianCalendar(); calendar.setTime(date); calendar.add(calendar.DATE, +1);// 把日期往前減少一天,若想把日期向后推一天則將負數改為正數 date = calendar.getTime(); SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd"); String dateString = formatter.format(date); return dateString; } }
第六步:啟動kafka
1.創建主題topic
[root@node01 bin]# kafka-topics.sh --zookeeper node01:2181 --partitions 3 --replication-factor 3 --create --topic B2CDATA_COLLECTION1
2.開啟kafka消費者
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic B2CDATA_COLLECTION1
第七步:開啟爬蟲程序并觀察結果
1.啟動爬蟲程序
2.觀察消費者窗口如下
第八步:啟動kafka-manager觀察
1.啟動kafka-manager
[root@node01 conf]# cd /export/servers/kafka-manager-1.3.3.23/bin/ [root@node01 bin]# ll total 36 -rwxr-xr-x 1 root root 13747 May 1 06:27 kafka-manager -rw-r--r-- 1 root root 9975 May 1 06:27 kafka-manager.bat -rwxr-xr-x 1 root root 1383 May 1 06:27 log-config -rw-r--r-- 1 root root 105 May 1 06:27 log-config.bat [root@node01 bin]# #啟動 [root@node01 bin]# ./kafka-manager
啟動后的窗口:
2.瀏覽器訪問
瀏覽器輸入:node01:9000
kafka manager使用不做講解,觀察B2CDATA_COLLECTION1主題消費情況:
有三個分區,每個分區消費的消息差多說明成功啦,
如果不一樣,則是kafkalua.lua 腳本中沒有配置分區策略,默認分區會導致 數據傾斜 我們需配置自己的分區策略!
上述內容就是如何在nginx lua中使用kafka,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。