# Import data types
from pyspark.sql.types import StructType, StructField, TimestampType, IntegerType, StringType, FloatType, BooleanType, LongType

# Define the custom schema for the incoming telecoms call records.
# All fields are nullable (True) since the source JSON may omit values.
call_schema = StructType(
    [
        StructField("RecordType", StringType(), True),
        StructField("SystemIdentity", StringType(), True),
        StructField("FileNum", StringType(), True),
        StructField("SwitchNum", StringType(), True),
        StructField("CallingNum", StringType(), True),
        StructField("CallingIMSI", StringType(), True),
        StructField("CalledNum", StringType(), True),
        StructField("CalledIMSI", StringType(), True),
        # NOTE(review): the next two field names were reconstructed from a
        # machine-translated source ("日期" -> Date, "時代" -> Time) — confirm
        # against the actual JSON payload before relying on them.
        StructField("Date", StringType(), True),
        StructField("Time", StringType(), True),
        StructField("TimeType", LongType(), True),
        StructField("CallPeriod", LongType(), True),
        StructField("CallingCellID", StringType(), True),
        StructField("CalledCellID", StringType(), True),
        StructField("ServiceType", StringType(), True),
        StructField("Transfer", LongType(), True),
        StructField("IncomingTrunk", StringType(), True),
        StructField("OutgoingTrunk", StringType(), True),
        StructField("MSRN", StringType(), True),
        StructField("CalledNum2", StringType(), True),
        StructField("FCIFlag", StringType(), True),
        StructField("callrecTime", TimestampType(), True),
        StructField("EventProcessedUtcTime", TimestampType(), True),
        StructField("PartitionId", LongType(), True),
        StructField("EventEnqueuedUtcTime", TimestampType(), True),
    ]
)
# Define the Delta Live Table
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Landing path for the streamed JSON files in ADLS.
json_path = "/mnt/adlspoc23/stream/"

@dlt.table(
    comment="The raw telecoms callstream dataset, ingested from ADLS.",
    schema=call_schema
)
def callstream_raw():
    """Raw call-stream records read from ADLS as JSON.

    Applying ``call_schema`` to the reader (instead of letting Spark infer
    the schema from the files) keeps the query's output schema identical to
    the table's declared schema.
    """
    # BUG FIX: the original code read the JSON without a schema
    # (spark.read.format("json").load(json_path)), so Spark's inferred
    # schema conflicted with the schema declared on the table, raising:
    #   AnalysisException: Table 'callstream_raw' has a user-specified
    #   schema that is incompatible with the schema inferred from its query.
    # Also, "json" must be a string literal, not a bare name.
    return spark.read.format("json").schema(call_schema).load(json_path)
# Error observed before the fix above:
# org.apache.spark.sql.AnalysisException: Table 'callstream_raw' has a user-specified schema that is incompatible with the schema inferred from its query.