

How do I insert a PySpark dataframe into a database with a snowflake schema?


滄海一幻覺(jué) 2023-01-04 10:23:43
Using PySpark, I am computing a dataframe. How can I append this dataframe to my database if the database uses a snowflake schema? And how do I specify the way my dataframe should be split up, so that the CSV-like data ends up in multiple joined tables? My question is not specific to PySpark; the same question could be asked about pandas.

2 Answers

有只小跳蛙

1824 contributions · 8+ upvotes

To append a dataframe extracted from a CSV to a database built on a snowflake schema:

  1. Extract the data from the snowflake schema.
  2. Extract the new data from the external source.
  3. Combine the two datasets.
  4. Transform the combination into a set of dimension and fact tables matching the snowflake schema.
  5. Load the transformed dataframes into the database, overwriting the existing data.

For example, for a dataframe with the following schema, extracted from the external source:

StructType([StructField('customer_name', StringType()),
            StructField('campaign_name', StringType())])

from typing import Tuple

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import monotonically_increasing_id


# extract_from_external_source, extract_from_snowflake and load_snowflake
# are left abstract here; they wrap the actual reads and writes.
def entrypoint(spark: SparkSession) -> None:
  extracted_customer_campaigns = extract_from_external_source(spark)

  existing_customers_dim, existing_campaigns_dim, existing_facts = (
    extract_from_snowflake(spark))

  combined_customer_campaigns = combine(existing_campaigns_dim,
                                        existing_customers_dim,
                                        existing_facts,
                                        extracted_customer_campaigns)

  new_campaigns_dim, new_customers_dim, new_facts = transform_to_snowflake(
    combined_customer_campaigns)

  load_snowflake(new_campaigns_dim, new_customers_dim, new_facts)


def combine(campaigns_dimension: DataFrame,
            customers_dimension: DataFrame,
            facts: DataFrame,
            extracted_customer_campaigns: DataFrame) -> DataFrame:
  # Denormalise the existing snowflake tables back into one flat view.
  existing_customer_campaigns = facts.join(
    customers_dimension,
    on=['customer_id']).join(
    campaigns_dimension, on=['campaign_id']).select('customer_name',
                                                    'campaign_name')

  # Union with the freshly extracted rows and drop duplicates.
  combined_customer_campaigns = extracted_customer_campaigns.union(
    existing_customer_campaigns).distinct()

  return combined_customer_campaigns


def transform_to_snowflake(customer_campaigns: DataFrame) -> Tuple[
    DataFrame, DataFrame, DataFrame]:
  # Rebuild each dimension with fresh surrogate keys.
  customers_dim = customer_campaigns.select(
    'customer_name').distinct().withColumn(
    'customer_id', monotonically_increasing_id())

  campaigns_dim = customer_campaigns.select(
    'campaign_name').distinct().withColumn(
    'campaign_id', monotonically_increasing_id())

  # Resolve every fact row to the surrogate keys of its dimensions.
  facts = (
    customer_campaigns.join(customers_dim,
                            on=['customer_name']).join(
      campaigns_dim, on=[
        'campaign_name']).select('customer_id', 'campaign_id'))

  return campaigns_dim, customers_dim, facts


This is a straightforward, functional approach. It could perhaps be optimised by writing incrementally, rather than regenerating the snowflake keys for every ETL batch, as sketched below.
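For instance, a minimal sketch of such an incremental approach for the customers dimension; the helper name and the left-anti-join strategy are my own illustration, not part of the pipeline above:

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window


def append_new_customers(existing_customers_dim: DataFrame,
                         combined_customer_campaigns: DataFrame) -> DataFrame:
  # Highest surrogate key already issued (0 for an empty dimension).
  max_id = existing_customers_dim.agg(
    F.max('customer_id')).first()[0] or 0

  # Keep only names not yet present in the dimension.
  new_names = combined_customer_campaigns.select(
    'customer_name').distinct().join(
    existing_customers_dim, on=['customer_name'], how='left_anti')

  # Assign sequential ids, offset past the existing maximum.
  new_rows = new_names.withColumn(
    'customer_id',
    F.row_number().over(Window.orderBy('customer_name')) + max_id)

  return existing_customers_dim.unionByName(new_rows)

Existing keys are then left untouched, so the fact table does not have to be rebuilt against a regenerated dimension.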


Additionally, if a separate external CSV containing records to be deleted were provided, it could be extracted in the same way and then subtracted from the combined dataframe before the transformation, to remove those existing records.
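For example, assuming a hypothetical deletions_df extracted from that separate CSV, with the same (customer_name, campaign_name) schema as the combined dataframe:

# Drop every combination that appears in the deletions CSV.
combined_customer_campaigns = combined_customer_campaigns.subtract(
  deletions_df)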


Finally, the question only concerned appending to the tables. If merge/upsert behaviour is needed, extra steps would have to be added manually, since core Spark does not support it natively.
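As an illustration only (not part of the answer above), one manual pattern with core Spark is to read the current table, union in the incoming rows, deduplicate on the business key, and rewrite; the table names, key columns, and new_rows dataframe below are placeholders. Libraries such as Delta Lake add a true MERGE on top of Spark if full upsert semantics are required.

# Hypothetical manual upsert sketch; 'campaigns_table' and the key columns
# are placeholders, and new_rows is the dataframe of incoming records.
# Spark cannot overwrite a table it is still reading from, so write to a
# staging table (or checkpoint the result) first.
existing = spark.table('campaigns_table')
upserted = existing.unionByName(new_rows).dropDuplicates(
  ['customer_name', 'campaign_name'])
upserted.write.mode('overwrite').saveAsTable('campaigns_table_staged')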


躍然一笑

1826 contributions · 6+ upvotes

You could do something like what I describe in the code below. I am assuming your CSV has a structure similar to the one defined for df4. But I think you may not have customer_id, product_id, or the ids for their groups. If that is the case, you can calculate them with the row_number window function (which yields sequential numbers), or with the monotonically_increasing_id function, as shown when creating df5.


This solution is mainly based on PySpark and SQL, so if you are familiar with traditional data warehousing it will be easier to follow.


from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()

# Create input data. Only a few rows, to show how it should work.
# The schema is defined on the single dataframe as:
# customer_id --> business key coming from the transactional system
# customer_name --> just an attribute to show how it should behave
# customer_group_id --> an id that would match the group_id on the snowflake schema, as the idea is to group customers into groups (just as a sample)
# product_id --> another future dimension on the model having a snowflake schema
# product_group_id --> group id for products, to group them into categories
df1 = spark.sql("""select 1 customer_id, 'test1' customer_name, 1 customer_group_id, 'group 1' customer_group_name,
        1 product_id, 'product 1' product_name, 1 product_group_id, 'product group 1' product_group_name,
        987.5 sales
        """)

df2 = spark.sql("""select 2 customer_id, 'test2' customer_name, 1 customer_group_id, 'group 1' customer_group_name,
        7 product_id, 'product 7' product_name, 1 product_group_id, 'product group 1' product_group_name,
        12345.5 sales
        """)

df3 = spark.sql("""select 2 customer_id, 'test2' customer_name, 1 customer_group_id, 'group 1' customer_group_name,
        1 product_id, 'product 1' product_name, 1 product_group_id, 'product group 1' product_group_name,
        2387.3 sales
        """)

df4 = df1.union(df2).union(df3)

# Add an id to the df to be able to calculate the rest of the surrogate keys for the dimensions
df5 = df4.withColumn("id", monotonically_increasing_id())

# Register the dataframe to be able to query it using SQL
df5.createOrReplaceTempView("df")

# Now create the different dfs matching the structure of the DW schema
customer_group_df = spark.sql("""select customer_group_id, customer_group_name
            from df group by customer_group_id, customer_group_name""")

# I use row_number because the monotonically_increasing_id function
# returns non-sequential integers; if you are fine with that, it will be much faster.
# Another option would be to use uuids (or another unique-identifier provider) as keys,
# but that will depend on your requirements.
customer_df = spark.sql("""select row_number() over (order by customer_id, customer_name, customer_group_id) as surkey_customer, customer_id customer_bk,
            customer_name, customer_group_id
            from df group by customer_id, customer_name, customer_group_id""")

product_group_df = spark.sql("""select product_group_id, product_group_name
            from df group by product_group_id, product_group_name""")

product_df = spark.sql("""select row_number() over (order by product_id) as surkey_product, product_id product_bk,
            product_name, product_group_id
            from df group by product_id, product_name, product_group_id""")

customer_df.show()
product_df.show()
df5.show()

# You can save those dfs directly into your model in the RDBMS. As the target DB is not specified
# in the question, the save call is omitted here; it would be the save method of the dataframe,
# pointing at Hive or at a JDBC URL where your DW model lives.
# More info at https://stackoverflow.com/questions/30664008/how-to-save-dataframe-directly-to-hive or, if
# the target is an RDBMS, at https://stackoverflow.com/questions/46552161/write-dataframe-to-mysql-table-using-pyspark

# Now the tricky part is to calculate the surrogate keys for the fact table. The way to do it is
# to join those dfs back to the original dataframe. That can have performance issues, so make sure
# that your data is properly distributed (find the best approach to redistribute your dataframes
# across the nodes so that you reduce shuffling on the joins).

customer_df.createOrReplaceTempView("customer_df")
product_df.createOrReplaceTempView("product_df")

fact_df = spark.sql("""
    select nvl(c.surkey_customer, -1) sk_customer, nvl(p.surkey_product, -1) sk_product, sales
    from
        df d left outer join customer_df c on d.customer_id = c.customer_bk
            left outer join product_df p on d.product_id = p.product_bk
""")
fact_df.show()

# You can now write fact_df to your target fact table.
# Be aware that to populate missing surrogate keys I use nvl to assign the unknown member of the
# dimension. If you rely on that, the unknown member also has to be present in the dimension
# tables (customer and product, not the group tables).
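To make the save step alluded to in the comments concrete, here is a minimal sketch of a JDBC write; the URL, driver, credentials, and table names are placeholders, not details from the answer:

# Hypothetical connection details; adjust to your target data warehouse.
jdbc_url = "jdbc:mysql://localhost:3306/dw"
props = {"user": "etl_user", "password": "secret",
         "driver": "com.mysql.cj.jdbc.Driver"}

# Append each dimension and the fact dataframe to its (placeholder) target table.
customer_df.write.jdbc(url=jdbc_url, table="dim_customer", mode="append", properties=props)
product_df.write.jdbc(url=jdbc_url, table="dim_product", mode="append", properties=props)
fact_df.write.jdbc(url=jdbc_url, table="fact_sales", mode="append", properties=props)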

As you can see, this solution uses a simple snowflake schema. But the model can get more complex if you have Slowly Changing Dimensions Type 2 or other kinds of dimensional modelling.
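As an aside, a hypothetical SCD Type 2 variant of the customer dimension would add validity-tracking columns, along these lines (illustrative only, not part of the answer's model):

from pyspark.sql import functions as F

# SCD Type 2 tracks when each dimension row is valid; expiring superseded
# rows and inserting their replacements would then be part of each ETL batch.
scd2_customer_df = (customer_df
                    .withColumn("valid_from", F.current_date())
                    .withColumn("valid_to", F.lit(None).cast("date"))
                    .withColumn("is_current", F.lit(True)))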


The output of the main code above is:


+---------------+-----------+-------------+-----------------+
|surkey_customer|customer_bk|customer_name|customer_group_id|
+---------------+-----------+-------------+-----------------+
|              1|          1|        test1|                1|
|              2|          2|        test2|                1|
+---------------+-----------+-------------+-----------------+

+--------------+----------+------------+----------------+
|surkey_product|product_bk|product_name|product_group_id|
+--------------+----------+------------+----------------+
|             1|         1|   product 1|               1|
|             2|         7|   product 7|               1|
+--------------+----------+------------+----------------+

+-----------+-------------+-----------------+-------------------+----------+------------+----------------+------------------+-------+-----------+
|customer_id|customer_name|customer_group_id|customer_group_name|product_id|product_name|product_group_id|product_group_name|  sales|         id|
+-----------+-------------+-----------------+-------------------+----------+------------+----------------+------------------+-------+-----------+
|          1|        test1|                1|            group 1|         1|   product 1|               1|   product group 1|  987.5|          0|
|          2|        test2|                1|            group 1|         7|   product 7|               1|   product group 1|12345.5| 8589934592|
|          2|        test2|                1|            group 1|         1|   product 1|               1|   product group 1| 2387.3|17179869184|
+-----------+-------------+-----------------+-------------------+----------+------------+----------------+------------------+-------+-----------+

+-----------+----------+-------+
|sk_customer|sk_product|  sales|
+-----------+----------+-------+
|          1|         1|  987.5|
|          2|         2|12345.5|
|          2|         1| 2387.3|
+-----------+----------+-------+

Hope this helps.

