2 Answers

This answerer has contributed 1828 experience points and received 3+ upvotes
parquet-tools cannot change the physical type from INT96 to INT64. What you observe in the json output is the string representation of a timestamp stored as the INT96 timestamp type. You need Spark to rewrite this parquet file with the timestamp stored as an INT64 TimestampType; the json output will then produce the timestamp in the format you want.
You need to launch Spark with a specific configuration -
spark-shell --conf spark.sql.parquet.outputTimestampType=TIMESTAMP_MICROS
2020-03-16 11:37:50 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.0.20:4040
Spark context available as 'sc' (master = local[*], app id = local-1584383875924).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val sourceDf = spark.read.parquet("original-file.snappy.parquet")
2020-03-16 11:38:31 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
sourceDf: org.apache.spark.sql.DataFrame = [application: struct<name: string, upgrades: struct<value: double> ... 3 more fields>, timestamp: timestamp ... 16 more fields]

scala> sourceDf.repartition(1).write.parquet("Downloads/output")
parquet-tools will then show the correct timestamp type -
parquet-tools schema Downloads/output/part-00000-edba239b-e696-4b4e-8fd3-c7cca9eea6bf-c000.snappy.parquet
message spark_schema {
  ...
  optional binary _id (UTF8);
  optional int64 timestamp (TIMESTAMP_MICROS);
  ...
}
and the json dump gives -
parquet-tools cat --json Downloads/output/part-00000-edba239b-e696-4b4e-8fd3-c7cca9eea6bf-c000.snappy.parquet
{..."_id":"101836", "timestamp":1583973827000000}
The recorded timestamp is in microseconds (matching the TIMESTAMP_MICROS annotation above). Hope this helps!

This answerer has contributed 1824 experience points and received 6+ upvotes
Doug, this code from arrow/cpp/src/parquet/types.h shows how Int96 timestamps are stored internally:
constexpr int64_t kJulianToUnixEpochDays = INT64_C(2440588);
constexpr int64_t kSecondsPerDay = INT64_C(60 * 60 * 24);
constexpr int64_t kMillisecondsPerDay = kSecondsPerDay * INT64_C(1000);
constexpr int64_t kMicrosecondsPerDay = kMillisecondsPerDay * INT64_C(1000);
constexpr int64_t kNanosecondsPerDay = kMicrosecondsPerDay * INT64_C(1000);

MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; };
STRUCT_END(Int96, 12);

static inline void Int96SetNanoSeconds(parquet::Int96& i96, int64_t nanoseconds) {
  std::memcpy(&i96.value, &nanoseconds, sizeof(nanoseconds));
}

static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) {
  // We do the computations in the unsigned domain to avoid unsigned behaviour
  // on overflow.
  uint64_t days_since_epoch =
      i96.value[2] - static_cast<uint64_t>(kJulianToUnixEpochDays);
  uint64_t nanoseconds = 0;
  memcpy(&nanoseconds, &i96.value, sizeof(uint64_t));
  return static_cast<int64_t>(days_since_epoch * kNanosecondsPerDay + nanoseconds);
}