from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
from datetime import date
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html
# Define schema explicitly
schema = StructType([
StructField("OrderID", IntegerType()),
StructField("CustomerID", IntegerType()),
StructField("OrderDate", DateType()),
StructField("ShipDate", DateType()),
StructField("CustomerName", StringType()),
StructField("ProductID", IntegerType()),
StructField("Qty", IntegerType()),
StructField("Price", DoubleType())
])
# Create Spark DataFrame with schema
df = spark.createDataFrame([
(1001, 5, date(2023, 3, 2), date(2023, 3, 6), 'John Johnson', 101, 1, 2.15),
(1002, 3, date(2023, 3, 15), date(2023, 3, 16), 'Mike Dawson', 202, 2, 2.59),
(1003, 2, date(2023, 3, 30), date(2023, 4, 1), 'Jane Irish', 203, 1, 4.08),
(1004, 4, date(2023, 4, 2), date(2023, 4, 6), 'Tom Clansy', 404, 4, 1.55),
(1005, 6, date(2023, 4, 11), date(2023, 4, 15), 'Angela O''Reily', 606, 10, 3.54)
], schema=schema)
# Show DataFrame info
df.printSchema()
# Show DataFrame
display(df)
df.show()
import pyspark.sql.functions as f
# Using Vectorized Operations to create Calculated Columns
df = df.withColumn("LineTotal", df["Qty"] * df["Price"])
df = df.withColumn("LineTotal", f.ceil(df["LineTotal"]))
df = df.withColumn("Year", f.year(df["OrderDate"]))
df = df.withColumn("LeadTime", f.datediff(df["ShipDate"], df["OrderDate"]))
df = df.withColumn("Lower", f.lower(df["CustomerName"]))
df = df.withColumn("FirstUpper",f.upper(f.substring(df["CustomerName"], 1, 1)))
df = df.withColumn("FirstName", f.substring(df["CustomerName"] , 1, f.instr(df["CustomerName"], ' ') - 1) )
# Using SQL like expressions to create more complex Calculated Columns
df = df.withColumn("FirstName", f.expr("substring(CustomerName, 1, instr(CustomerName, ' ') - 1)"))
df.show()