您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關Serverless如何解決數據采集分析痛點,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
簡介: 眾所周知,游戲行業在當今的互聯網行業中算是一棵常青樹。在疫情之前的 2019 年,中國游戲市場營收規模約 2884.8 億元,同比增長 17.1%。2020 年因為疫情,游戲行業更是突飛猛進。玩游戲本就是中國網民最普遍的娛樂方式之一,疫情期間更甚。據不完全統計,截至 2019 年,中國移動游戲用戶規模約 6.6 億人,占中國總網民規模 8.47 億的 77.92%,可見游戲作為一種低門檻、低成本的娛樂手段,已成為大部分人生活中習以為常的一部分。
眾所周知,游戲行業在當今的互聯網行業中算是一棵常青樹。在疫情之前的 2019 年,中國游戲市場營收規模約 2884.8 億元,同比增長 17.1%。2020 年因為疫情,游戲行業更是突飛猛進。玩游戲本就是中國網民最普遍的娛樂方式之一,疫情期間更甚。據不完全統計,截至 2019 年,中國移動游戲用戶規模約 6.6 億人,占中國總網民規模 8.47 億的 77.92%,可見游戲作為一種低門檻、低成本的娛樂手段,已成為大部分人生活中習以為常的一部分。
對于玩家而言,市面上的游戲數量多如牛毛,那么玩家如何能發現和認知到一款游戲,并且持續的玩下去恐怕是所有游戲廠商需要思考的問題。加之 2018 年游戲版號停發事件,游戲廠商更加珍惜每一個已獲得版號的游戲產品,所以這也使得“深度打磨產品質量”和“提高運營精細程度”這兩個游戲產業發展方向成為廣大游戲廠商的發展思路,無論是新游戲還是老游戲都在努力落實這兩點:
新游戲:面向玩家需要提供更充足的推廣資源和更完整的游戲內容。
老游戲:通過用戶行為分析,投入更多的精力和成本,制作更優質的版本內容。
這里我們重點來看新游戲。一家游戲企業辛辛苦苦研發三年,等著新游戲發售時一飛沖天。那么問題來了,新游戲如何被廣大玩家看到?
首先來看看游戲行業公司的分類:
游戲研發商:研發游戲的公司,生產和制作游戲內容。比如王者榮耀的所有英雄設計、游戲戰斗場景、戰斗邏輯等,全部由游戲研發公司提供。
游戲發行商:游戲發行商的主要工作分三大塊:市場工作、運營工作、客服工作。游戲發行商把控游戲命脈,市場工作核心是導入玩家,運營工作核心是將用戶價值最大化、賺取更多利益。
游戲平臺/渠道商:游戲平臺和渠道商的核心目的就是曝光游戲,讓盡量多的人能發現你的游戲。
這三種類型的業務,有專注于其中某一領域的獨立公司,也有能承接全部業務的公司,但無論那一種,這三者之間的關系是不會變的:
所以不難理解,想讓更多的玩家看到你的游戲,游戲發行和運營是關鍵。通俗來講,如果你的游戲出現在目前所有大家熟知的平臺廣告中,那么最起碼游戲的新用戶注冊數量是很可觀的。因此這就引入了一個關鍵詞:買量。
根據數據顯示,2019 年月均買量手游數達 6000+ 款,而 2018 年僅為 4200 款。另一方面,隨著抖音、微博等超級 APP 在游戲買量市場的資源傾斜,也助推手游買量的效果和效率有所提升,游戲廠商更愿意使用買量的方式來吸引用戶。
但需要注意的是,在游戲買量的精準化程度不斷提高的同時,買量的成本也在節節攀升,唯有合理配置買量、渠道與整合營銷之間的關系,才能將宣發資源發揮到最大的效果。
通俗來講,買量其實就是在各大主流平臺投放廣告,廣大用戶看到游戲廣告后,有可能會點擊廣告,然后進入游戲廠商的宣傳頁面,同時會采集用戶的一些信息,然后游戲廠商對采集到的用戶信息進行大數據分析,進行進一步的定向推廣。
游戲廠商花錢買量,換來的用戶信息以及新用戶注冊信息是為持續的游戲運營服務的,那么這個場景的核心訴求就是采集用戶信息的完整性。
比如說,某游戲廠商一天花 5000w 投放廣告,在某平臺某時段產生了每秒 1w 次的廣告點擊率,那么在這個時段內每一個點擊廣告的用戶信息要完整的被采集到,然后入庫進行后續分析。這就對數據采集系統提出了很高的要求。
這其中,最核心的一點就是系統暴露接口的環節要能夠平穩承載買量期間不定時的流量脈沖。在買量期間,游戲廠商通常會在多個平臺投放廣告,每個平臺投放廣告的時間是不一樣的,所以就會出現全天不定時的流量脈沖現象。如果這個環節出現問題,那么相當于買量的錢就打水漂了。
上圖是一個相對傳統的數據采集系統架構,最關鍵的就是暴露 HTTP 接口回傳數據這部分,這部分如果出問題,那么采集數據的鏈路就斷了。但這部分往往會面臨兩個挑戰:
當流量脈沖來的時候,這部分是否可以快速擴容以應對流量沖擊。
游戲運營具備潮汐特性,并非天天都在進行,這就需要考慮如何優化資源利用率。
通常情況下,在游戲有運營活動之前,會提前通知運維同學,對這個環節的服務增加節點,但要增加多少其實是無法預估的,只能大概拍一個數字。這是在傳統架構下經常會出現的場景,這就會導致兩個問題:
流量太大,節點加少了,導致一部分流量的數據沒有采集到。
流量沒有預期那么大,節點加多了,導致資源浪費。
我們可以通過函數計算 FC 來取代傳統架構中暴露 HTTP 回傳數據這部分,從而完美解決傳統架構中存在問題。
傳統架構中的兩個問題均可以通過函數計算百毫秒彈性的特性來解決。我們并不需要去估算營銷活動會帶來多大的流量,也不需要去擔心和考慮對數據采集系統的性能,運維同學更不需要提前預備 ECS。
因為函數計算的極致彈性特性,當沒有買量、沒有營銷活動的時候,函數計算的運行實例是零。有買量活動時,在流量脈沖的情況下,函數計算會快速拉起實例來承載流量壓力;當流量減少時,函數計算會及時釋放沒有請求的實例進行縮容。所以 Serverless 架構帶來的優勢有以下三點:
無需運維介入,研發同學就可以很快的搭建出來。
無論流量大小,均可以平穩的承接。
函數計算拉起的實例數量可以緊貼流量大小的曲線,做到資源利用率最優化,再加上按量計費的模式,可以最大程度優化成本。
從上面的架構圖可以看到,整個采集數據階段,分了兩個函數來實現,第一個函數的作用是單純的暴露 HTTP 接口接收數據,第二個函數用于處理數據,然后將數據發送至消息隊列 Kafka 和數據庫 RDS。
我們打開函數計算控制臺,創建一個函數:
函數類型:HTTP(即觸發器為 HTTP)
函數名稱:receiveData
運行環境:Python3
函數實例類型:彈性實例
函數執行內存:512MB
函數運行超時時間:60 秒
函數單實例并發度:1
觸發器類型:HTTP 觸發器
觸發器名稱:defaultTrigger
認證方式:anonymous(即無需認證)
請求方式:GET,POST
創建好函數之后,我們通過在線編輯器編寫代碼:
# -*- coding: utf-8 -*- import logging import json import urllib.parse HELLO_WORLD = b'Hello world!\n' def handler(environ, start_response): logger = logging.getLogger() context = environ['fc.context'] request_uri = environ['fc.request_uri'] for k, v in environ.items(): if k.startswith('HTTP_'): # process custom request headers pass try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except (ValueError): request_body_size = 0 # 接收回傳的數據 request_body = environ['wsgi.input'].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK")) request_body_obj = json.loads(request_body_str) logger.info(request_body_obj["action"]) logger.info(request_body_obj["articleAuthorId"]) status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) return [HELLO_WORLD]
此時的代碼非常簡單,就是接收用戶傳來的參數,我們可以調用接口進行驗證:
可以在函數的日志查詢中看到此次調用的日志:
同時,我們也可以查看函數的鏈路追蹤來分析每一個步驟的調用耗時,比如函數接到請求→冷啟動(無活躍實例時)→準備代碼→執行初始化方法→執行入口函數邏輯這個過程:
從調用鏈路圖中可以看到,剛才的那次請求包含了冷啟動的時間,因為當時沒有活躍實例,整個過程耗時 418 毫秒,真正執行入口函數代碼的時間為 8 毫秒。
當再次調用接口時,可以看到就直接執行了入口函數的邏輯,因為此時已經有實例在運行,整個耗時只有 2.3 毫秒
2. 處理數據的函數
第一個函數是通過在函數計算控制臺在界面上創建的,選擇了運行環境是 Python3,我們可以在官方文檔中查看預置的 Python3 運行環境內置了哪些模塊,因為第二個函數要操作 Kafka 和 RDS,所以需要我們確認對應的模塊。
從文檔中可以看到,內置的模塊中包含 RDS 的 SDK 模塊,但是沒有 Kafka 的 SDK 模塊,此時就需要我們手動安裝 Kafka SDK 模塊,并且創建函數也會使用另一種方式。
Funcraft 是一個用于支持 Serverless 應用部署的命令行工具,能幫助我們便捷地管理函數計算、API 網關、日志服務等資源。它通過一個資源配置文件(template.yml),協助我們進行開發、構建、部署操作。
所以第二個函數我們需要使用 Fun 來進行操作,整個操作分為四個步驟:
安裝 Fun 工具。
編寫 template.yml 模板文件,用來描述函數。
安裝我們需要的第三方依賴。
上傳部署函數。
Fun 提供了三種安裝方式:
通過 npm 包管理安裝 —— 適合所有平臺(Windows/Mac/Linux)且已經預裝了 npm 的開發者。
通過下載二進制安裝 —— 適合所有平臺(Windows/Mac/Linux)。
通過 Homebrew 包管理器安裝 —— 適合 Mac 平臺,更符合 MacOS 開發者習慣。
文本示例環境為 Mac,所以使用 npm 方式安裝,非常的簡單,一行命令搞定:
sudo npm install @alicloud/fun -g
安裝完成之后。在控制終端輸入 fun 命令可以查看版本信息:
$ fun --version 3.6.20
在第一次使用 fun 之前需要先執行 fun config 命令進行配置,按照提示,依次配置 Account ID、Access Key Id、Secret Access Key、 Default Region Name 即可。其中 Account ID、Access Key Id 你可以從函數計算控制臺首頁的右上方獲得:
fun config
? Aliyun Account ID *01
? Aliyun Access Key ID *qef6j
? Aliyun Access Key Secret *UFJG
? Default region name cn-hangzhou
? The timeout in seconds for each SDK client invoking 60
? The maximum number of retries for each SDK client 3
新建一個目錄,在該目錄下創建一個名為 template.yml 的 YAML 文件,該文件主要描述要創建的函數的各項配置,說白了就是將函數計算控制臺上配置的那些配置信息以 YAML 格式寫在文件里:
ROSTemplateFormatVersion: '2015-09-01' Transform: 'Aliyun::Serverless-2018-04-03' Resources: FCBigDataDemo: Type: 'Aliyun::Serverless::Service' Properties: Description: 'local invoke demo' VpcConfig: VpcId: 'vpc-xxxxxxxxxxx' VSwitchIds: [ 'vsw-xxxxxxxxxx' ] SecurityGroupId: 'sg-xxxxxxxxx' LogConfig: Project: fcdemo Logstore: fc_demo_store dataToKafka: Type: 'Aliyun::Serverless::Function' Properties: Initializer: index.my_initializer Handler: index.handler CodeUri: './' Description: '' Runtime: python3
我們來解析以上文件的核心內容:
FCBigDataDemo:自定義的服務名稱。通過下面的 Type 屬性標明是服務,即 Aliyun::Serverless::Service。
Properties:Properties 下的屬性都是該服務的各配置項。
VpcConfig:服務的 VPC 配置,包含:VpcId:VPC ID。VSwitchIds:交換機 ID,這里是數組,可以配置多個交換機。SecurityGroupId:安全組 ID。
LogConfig:服務綁定的日志服務(SLS)配置,包含:Project:日志服務項目。Logstore:LogStore 名稱。
dataToKafka:該服務下自定義的函數名稱。通過下面的 Type 屬性標明是函數,即 Aliyun::Serverless::Function。
Properties:Properties下的屬性都是該函數的各配置項。
Initializer:配置初始化函數。
Handler:配置入口函數。
Runtime:函數運行環境。
服務和函數的模板創建好之后,我們來安裝需要使用的第三方依賴。在這個示例的場景中,第二個函數需要使用 Kafka SDK,所以可以通過 fun 工具結合 Python 包管理工具 pip 進行安裝:
fun install --runtime python3 --package-type pip kafka-python
執行命令后有提示信息
此時我們會發現在目錄下會生成一個.fun文件夾 ,我們安裝的依賴包就在該目錄下:
現在編寫好了模板文件以及安裝好了我們需要的 Kafka SDK 后,還需要添加我們的代碼文件 index.py,代碼內容如下:
# -*- coding: utf-8 -*- import logging import json import urllib.parse from kafka import KafkaProducer producer = None def my_initializer(context): logger = logging.getLogger() logger.info("init kafka producer") global producer producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092') def handler(event, context): logger = logging.getLogger() # 接收回傳的數據 event_str = json.loads(event) event_obj = json.loads(event_str) logger.info(event_obj["action"]) logger.info(event_obj["articleAuthorId"]) # 向Kafka發送消息 global producer producer.send('ikf-demo', json.dumps(event_str).encode('utf-8')) producer.close() return 'hello world'
代碼很簡單,這里做以簡單的解析:
my_initializer:函數實例被拉起時會先執行該函數,然后再執行 handler 函數 ,當函數實例在運行時,之后的請求都不會執行 my_initializer 函數 。一般用于各種連接的初始化工作,這里將初始化 Kafka Producer 的方法放在了這里,避免反復初始化 Produer。
handler:該函數只有兩個邏輯,接收回傳的數據和將數據發送至 Kafka 的指定 Topic。
下面通過 fun deploy 命令部署函數,該命令會做兩件事:根據 template.yml 中的配置創建服務和函數。將 index.py 和 .fun 上傳至函數中。
登錄函數計算控制臺,可以看到通過 fun 命令部署的服務和函數
進入函數,也可以清晰的看到第三方依賴包的目錄結構
目前兩個函數都創建好了,下面的工作就是由第一個函數接收到數據后拉起第二個函數發送消息給 Kafka。我們只需要對第一個函數做些許改動即可:
# -*- coding: utf-8 -*- import logging import json import urllib.parse import fc2 HELLO_WORLD = b'Hello world!\n' client = None def my_initializer(context): logger = logging.getLogger() logger.info("init fc client") global client client = fc2.Client( endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com", accessKeyID="your_ak", accessKeySecret="your_sk" ) def handler(environ, start_response): logger = logging.getLogger() context = environ['fc.context'] request_uri = environ['fc.request_uri'] for k, v in environ.items(): if k.startswith('HTTP_'): # process custom request headers pass try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except (ValueError): request_body_size = 0 # 接收回傳的數據 request_body = environ['wsgi.input'].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK")) request_body_obj = json.loads(request_body_str) logger.info(request_body_obj["action"]) logger.info(request_body_obj["articleAuthorId"]) global client client.invoke_function( 'FCBigDataDemo', 'dataToKafka', payload=json.dumps(request_body_str), headers = {'x-fc-invocation-type': 'Async'} ) status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) return [HELLO_WORLD]
如上面代碼所示,對第一個函數的代碼做了三個地方的改動:
導入函數計算的庫:import fc2
添加初始化方法,用于創建函數計算 Client:
def my_initializer(context): logger = logging.getLogger() logger.info("init fc client") global client client = fc2.Client( endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com", accessKeyID="your_ak", accessKeySecret="your_sk" )
這里需要注意的時,當我們在代碼里增加了初始化方法后,需要在函數配置中指定初始化方法的入口
通過函數計算 Client 調用第二個函數
global client client.invoke_function( 'FCBigDataDemo', 'dataToKafka', payload=json.dumps(request_body_str), headers = {'x-fc-invocation-type': 'Async'} )
invoke_function 函數有四個參數:
第一個參數:調用函數所在的服務名稱。
第二個參數:調用函數的函數名稱。
第三個參數:向調用函數傳的數據。
第四個參數:調用第二個函數 Request Header 信息。這里主要通過 x-fc-invocation-type 這個 Key 來設置是同步調用還是異步調用。這里設置 Async 為異步調用。
如此設置,我們便可以驗證通過第一個函數提供的 HTTP 接口發起請求→采集數據→調用第二個函數→將數據作為消息傳給 Kafka 這個流程了。
到這里有些同學可能會有疑問,為什么需要兩個函數,而不在第一個函數里直接向 Kafka 發送數據呢?
當我們使用異步調用函數時,在函數內部會默認先將請求的數據放入消息隊列進行第一道削峰填谷,然后每一個隊列在對應函數實例,通過函數實例的彈性拉起多個實例進行第二道削峰填谷。所以這也就是為什么這個架構能穩定承載大并發請求的核心原因之一。
在游戲運營這個場景中,數據量是比較大的,所以對 Kafka 的性能要求也是比較高的,相比開源自建,使用云上的 Kafka 省去很多的運維操作,比如:
我們不再需要再維護 Kafka 集群的各個節點。
不需要關心主從節點數據同步問題。
可以快速、動態擴展 Kafka 集群規格,動態增加 Topic,動態增加分區數。
完善的指標監控功能,消息查詢功能。
總的來說,就是一切 SLA 都有云上兜底,我們只需要關注在消息發送和消息消費即可。
所以我們可以打開 Kafka 開通界面,根據實際場景的需求一鍵開通 Kafka 實例,開通 Kafka 后登錄控制臺,在基本信息中可以看到 Kafka 的接入點:
默認接入點:走 VPC 內網場景的接入點。
SSL 接入點:走公網場景的接入點。
將默認接入點配置到函數計算的第二個函數中即可。
.... producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092') ....
然后點擊左側控制臺 Topic 管理,創建 Topic
將創建好的 Topic 配置到函數計算的第二個函數中即可。
... # 第一個參數為Topic名稱 producer.send('ikf-demo', json.dumps(event_str).encode('utf-8')) ...
上文已經列舉過云上 Kafka 的優勢,比如動態增加 Topic 的分區數,我們可以在 Topic 列表中,對 Topic 的分區數進行動態調整。
單 Topic 最大支持到 360 個分區,這是開源自建無法做到的。
接下來點擊控制臺左側 Consumer Group 管理,創建 Consumer Group。
至此,云上的 Kafka 就算配置完畢了,即 Producer 可以往剛剛創建的 Topic 中發消息了,Consumer 可以設置剛剛創建的 GID 以及訂閱 Topic 進行消息接受和消費。
在這個場景中,Kafka 后面往往會跟著 Flink,所以這里簡要給大家介紹一下在 Flink 中如何創建 Kafka Consumer 并消費數據。代碼片段如下:
final ParameterTool parameterTool = ParameterTool.fromArgs(args); String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo"); String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092"); Properties kafkaProps = new Properties(); kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo"); FlinkKafkaConsumer<UserBehaviorEvent> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps); kafka.setStartFromLatest(); kafka.setCommitOffsetsOnCheckpoints(false); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<UserBehaviorEvent> dataStreamByEventTime = env.addSource(kafka);
以上就是構建 Flink Kafka Consumer 和添加 Kafka Source 的代碼片段,還是非常簡單的。
至此,整個數據采集的架構就搭建完畢了,下面我們通過壓測來檢驗一下整個架構的性能。這里使用阿里云 PTS 來進行壓測。
打開 PTS 控制臺,點擊左側菜單創建壓測/創建 PTS 場景
在場景配置中,將第一個函數計算函數暴露的 HTTP 接口作為串聯鏈路。
接口配置完后,我們來配置施壓
壓力模式:并發模式:指定有多少并發用戶同時發請求。RPS模式:指定每秒有多少請求數。
遞增模式:在壓測過程中可以通過手動調節壓力,也可以自動按百分比遞增壓力。
最大并發:同時有多少個虛擬用戶發起請求。
遞增百分比:如果是自動遞增的話,按這里的百分比遞增。
單量級持續時長:在未完全達到壓力全量的時候,每一級梯度的壓力保持的時長。
壓測總時長:一共需要壓測的時長。
這里因為資源成本原因,并發用戶數設置為 2500 來進行驗證。
從上圖壓測中的情況來看,TPS 達到了 2w 的封頂,549w+ 的請求,99.99% 的請求是成功的,那 369 個異常也可以點擊查看,都是壓測工具請求超時導致的。
至此,整個基于 Serverless 搭建的大數據采集傳輸的架構就搭建好了,并且進行了壓測驗證,整體的性能也是不錯的,并且整個架構搭建起來也非常簡單和容易理解。這個架構不光適用于游戲運營行業,其實任何大數據采集傳輸的場景都是適用的,目前也已經有很多客戶正在基于 Serverless 的架構跑在生產環境,或者正走在改造 Serverless 架構的路上。
以上就是Serverless如何解決數據采集分析痛點,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。