中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

使用pyspark怎么對Mysql數據庫進行讀寫操作

發布時間:2020-12-30 13:59:15 來源:億速云 閱讀:743 作者:Leah 欄目:開發技術

使用pyspark怎么對Mysql數據庫進行讀寫操作?針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

pyspark是Spark對Python的api接口,可以在Python環境中通過調用pyspark模塊來操作spark,完成大數據框架下的數據分析與挖掘。其中,數據的讀寫是基礎操作,pyspark的子模塊pyspark.sql 可以完成大部分類型的數據讀寫。文本介紹在pyspark中讀寫Mysql數據庫。

1 軟件版本

在Python中使用Spark,需要安裝配置Spark,這里跳過配置的過程,給出運行環境和相關程序版本信息。

  • win10 64bit

  • java 13.0.1

  • spark 3.0

  • python 3.8

  • pyspark 3.0

  • pycharm 2019.3.4

2 環境配置

pyspark連接Mysql是通過java實現的,所以需要下載連接Mysql的jar包。

下載地址

使用pyspark怎么對Mysql數據庫進行讀寫操作

選擇下載Connector/J,然后選擇操作系統為Platform Independent,下載壓縮包到本地。

使用pyspark怎么對Mysql數據庫進行讀寫操作

然后解壓文件,將其中的jar包mysql-connector-java-8.0.19.jar放入spark的安裝目錄下,例如D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\jars

使用pyspark怎么對Mysql數據庫進行讀寫操作

環境配置完成!

3 讀取Mysql

腳本如下:

from pyspark.sql import SQLContext, SparkSession

if __name__ == '__main__':
  # spark 初始化
  spark = SparkSession. \
    Builder(). \
    appName('sql'). \
    master('local'). \
    getOrCreate()
  # mysql 配置(需要修改)
  prop = {'user': 'xxx', 
      'password': 'xxx', 
      'driver': 'com.mysql.cj.jdbc.Driver'}
  # database 地址(需要修改)
  url = 'jdbc:mysql://host:port/database'
  # 讀取表
  data = spark.read.jdbc(url=url, table='tb_newCity', properties=prop)
  # 打印data數據類型
  print(type(data))
  # 展示數據
  data.show()
  # 關閉spark會話
  spark.stop()
  • 注意點:

  • prop參數需要根據實際情況修改,文中用戶名和密碼用xxx代替了,driver參數也可以不需要;

  • url參數需要根據實際情況修改,格式為jdbc:mysql://主機:端口/數據庫

  • 通過調用方法read.jdbc進行讀取,返回的數據類型為spark DataFrame;

運行腳本,輸出如下:

使用pyspark怎么對Mysql數據庫進行讀寫操作

4 寫入Mysql

腳本如下:

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == '__main__':
  # spark 初始化
  sc = SparkContext(master='local', appName='sql')
  spark = SQLContext(sc)
  # mysql 配置(需要修改)
  prop = {'user': 'xxx',
      'password': 'xxx',
      'driver': 'com.mysql.cj.jdbc.Driver'}
  # database 地址(需要修改)
  url = 'jdbc:mysql://host:port/database'

  # 創建spark DataFrame
  # 方式1:list轉spark DataFrame
  l = [(1, 12), (2, 22)]
  # 創建并指定列名
  list_df = spark.createDataFrame(l, schema=['id', 'value']) 
  
  # 方式2:rdd轉spark DataFrame
  rdd = sc.parallelize(l) # rdd
  col_names = Row('id', 'value') # 列名
  tmp = rdd.map(lambda x: col_names(*x)) # 設置列名
  rdd_df = spark.createDataFrame(tmp) 
  
  # 方式3:pandas dataFrame 轉spark DataFrame
  df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
  pd_df = spark.createDataFrame(df)

  # 寫入數據庫
  pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
  # 關閉spark會話
  sc.stop()

注意點:

propurl參數同樣需要根據實際情況修改;

寫入數據庫要求的對象類型是spark DataFrame,提供了三種常見數據類型轉spark DataFrame的方法;

通過調用write.jdbc方法進行寫入,其中的model參數控制寫入數據的行為。


model參數解釋
error默認值,原表存在則報錯
ignore原表存在,不報錯且不寫入數據
append新數據在原表行末追加
overwrite覆蓋原表

5 常見報錯

Access denied for user …

使用pyspark怎么對Mysql數據庫進行讀寫操作

原因:mysql配置參數出錯
解決辦法:檢查user,password拼寫,檢查賬號密碼是否正確,用其他工具測試mysql是否能正常連接,做對比檢查。

No suitable driver

使用pyspark怎么對Mysql數據庫進行讀寫操作

原因:沒有配置運行環境
解決辦法:下載jar包進行配置,具體過程參考本文的2 環境配置

關于使用pyspark怎么對Mysql數據庫進行讀寫操作問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

焉耆| 彰化市| 五台县| 北流市| 永善县| 玉树县| 互助| 夹江县| 丹巴县| 蓬溪县| 义马市| 祁门县| 白城市| 微山县| 肃南| 海城市| 白河县| 仙居县| 嵩明县| 峨边| 门头沟区| 永新县| 祁阳县| 安龙县| 泾川县| 南华县| 淳化县| 丽江市| 安乡县| 临澧县| 瓦房店市| 平陆县| 盐池县| 夹江县| 东海县| 公安县| 南昌市| 伊通| 鄢陵县| 东城区| 邛崃市|