Casting Data Types in PySpark
How often have you read data into your Spark DataFrame and gotten a schema like this?
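Something like this, for example (the column names here are the ones used throughout the rest of this article):

df = spark.read.load('/mnt/datalake/raw/food_data')
df.printSchema()

# root
#  |-- Date: string (nullable = true)
#  |-- Item: string (nullable = true)
#  |-- Amount: string (nullable = true)
#  |-- IsDiscounted: string (nullable = true)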
Unfortunately, every column in the data shown above is a string because Spark wasn't able to infer the schema. But it seems pretty obvious that Date, Amount, and IsDiscounted should be data types other than string.
Let’s look at some ways to do this. My personal opinion is that the last method in this article (Method 4) is the best one, but any of them should work for what you typically need.
Note: Here is a similar article about renaming columns in PySpark: Renaming Columns in PySpark | by Kyle Gibson | Medium
Method 1: Using col().cast()
You can do it this way:
from pyspark.sql.functions import col
df = spark.read.load('/mnt/datalake/raw/food_data')

# Select every column, casting each one to the type we want
df_updated_schema = df\
    .select(
        col('Date').cast('date'),
        col('Item').cast('string'),
        col('Amount').cast('double'),
        col('IsDiscounted').cast('boolean')
    )
…or you can do it this way:
from pyspark.sql.functions import col
from pyspark.sql.types import DateType, StringType, DoubleType, BooleanType
df = spark.read.load('/mnt/datalake/raw/food_data')

# Same idea, but passing the imported type objects instead of type-name strings
df_updated_schema = df\
    .select(
        col('Date').cast(DateType()),
        col('Item').cast(StringType()),
        col('Amount').cast(DoubleType()),
        col('IsDiscounted').cast(BooleanType())
    )
I prefer the first option of these two since I don't have to import the types, but either should work.
However, this is a good time to point out that casting data types isn’t perfect. Notice what happens to the Date column after running that command:
We got the data type we wanted, but all the values are null. This happens because casting a string straight to a date expects an ISO-style format such as yyyy-MM-dd, while our values were stored as M/d/yyyy.
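You can see the failure in isolation with a tiny standalone sketch (the DataFrame and date literal here are made up purely for illustration):

from pyspark.sql.functions import col

demo = spark.createDataFrame([('3/15/2021',)], ['Date'])
demo.select(col('Date').cast('date')).show()
# The value comes back as null, because '3/15/2021' isn't in the format the cast expects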
There is a PySpark function that can help us fix this problem — to_date:
from pyspark.sql.functions import to_date, col
df = spark.read.load('/mnt/datalake/raw/food_data')
df_updated_schema = df\
    .select(
        to_date(col('Date'), 'M/d/yyyy').cast('date').alias('Date'),
        col('Item').cast('string'),
        col('Amount').cast('double'),
        col('IsDiscounted').cast('boolean')
    )
As you can see, we used the to_date function. By passing the format of the dates (‘M/d/yyyy’) as an argument to the function, we were able to correctly cast our column as date and still retain the data. Then we aliased it as Date so that it stays as the original column name.
We will have to use to_date in every method in this article, but I wanted to point this out early as a warning: don't assume that casting data types will automatically give you exactly the result you want. Always verify.
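A quick way to do that verification is to check the schema, look at a few rows, and count nulls per column; nulls that weren't in the source data usually mean a cast silently failed. A minimal sketch:

from pyspark.sql.functions import col, count, when

df_updated_schema.printSchema()
df_updated_schema.show(5)

# Count nulls per column to catch values a cast silently turned into null
df_updated_schema.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in df_updated_schema.columns]
).show()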
Method 2: Using .withColumn()
You can do it this way:
from pyspark.sql.functions import to_date, col
df = spark.read.load('/mnt/datalake/raw/food_data')
df_updated_schema = df\
    .withColumn('Date', to_date(col('Date'), 'M/d/yyyy').cast('date'))\
    .withColumn('Item', col('Item').cast('string'))\
    .withColumn('Amount', col('Amount').cast('double'))\
    .withColumn('IsDiscounted', col('IsDiscounted').cast('boolean'))
…or you can do it this way:
from pyspark.sql.functions import to_date, col
from pyspark.sql.types import DateType, StringType, DoubleType, BooleanType
df = spark.read.load('/mnt/datalake/raw/food_data')
df_updated_schema = df\
    .withColumn('Date', to_date(col('Date'), 'M/d/yyyy').cast(DateType()))\
    .withColumn('Item', col('Item').cast(StringType()))\
    .withColumn('Amount', col('Amount').cast(DoubleType()))\
    .withColumn('IsDiscounted', col('IsDiscounted').cast(BooleanType()))
This method works fine as well.
However, each .withColumn call adds another projection to the query plan, so chaining a lot of them (particularly when all you're doing is casting data types) can bloat the plan and hurt performance.
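As a side note, if you're on Spark 3.3 or later, DataFrame.withColumns accepts a dictionary and applies everything in a single projection, which avoids stacking one projection per call. A sketch, assuming Spark 3.3+:

from pyspark.sql.functions import to_date, col

df = spark.read.load('/mnt/datalake/raw/food_data')

# Spark 3.3+: all of the casts go into one projection
df_updated_schema = df.withColumns({
    'Date': to_date(col('Date'), 'M/d/yyyy'),
    'Item': col('Item').cast('string'),
    'Amount': col('Amount').cast('double'),
    'IsDiscounted': col('IsDiscounted').cast('boolean')
})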
Method 3: Using a Python dictionary and .withColumn()
You can do it this way:
from pyspark.sql.functions import to_date, col
data_types = {
    'Date': 'date',
    'Item': 'string',
    'Amount': 'double',
    'IsDiscounted': 'boolean'
}
df = spark.read.load('/mnt/datalake/raw/food_data')\
    .withColumn('Date', to_date(col('Date'), 'M/d/yyyy'))

# Loop over the dictionary, casting each column to its mapped data type
for column_name, data_type in data_types.items():
    df = df\
        .withColumn(column_name, col(column_name).cast(data_type))
…or you can do it this way:
from pyspark.sql.functions import to_date, col
from pyspark.sql.types import DateType, StringType, DoubleType, BooleanType
data_types = {
    'Date': DateType(),
    'Item': StringType(),
    'Amount': DoubleType(),
    'IsDiscounted': BooleanType()
}
df = spark.read.load('/mnt/datalake/raw/food_data')\
    .withColumn('Date', to_date(col('Date'), 'M/d/yyyy'))

for column_name, data_type in data_types.items():
    df = df\
        .withColumn(column_name, col(column_name).cast(data_type))
This method works, but it's also fairly inefficient: every pass through the loop stacks yet another .withColumn projection onto the plan.
Method 4: Using col().cast() with a Python dictionary and Python list comprehension
In this method, we are going to take advantage of the .dtypes DataFrame attribute.
This is what the output of .dtypes looks like on our initial DataFrame:
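Something along these lines, since every column is still a string at this point:

df = spark.read.load('/mnt/datalake/raw/food_data')
df.dtypes
# [('Date', 'string'), ('Item', 'string'), ('Amount', 'string'), ('IsDiscounted', 'string')]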
As you can see, it’s a list of tuples containing the column name and data type.
For this method, we will create a dictionary to map the data types we want for specific columns. If a column isn’t in our dictionary, then we want it to keep its original data type:
You can do it this way:
from pyspark.sql.functions import to_date, col
data_type_map = {
    'Date': 'date',
    'Amount': 'double',
    'IsDiscounted': 'boolean'
}
df = spark.read.load('/mnt/datalake/raw/food_data')
df_updated_schema = df\
    .withColumn('Date', to_date(col('Date'), 'M/d/yyyy').cast('date'))\
    .select([
        col(column_schema[0]).cast(data_type_map.get(column_schema[0], column_schema[1]))
        for column_schema in df.dtypes
    ])
…or you can do it this way:
from pyspark.sql.functions import to_date, col
from pyspark.sql.types import DateType, DoubleType, BooleanType
data_type_map = {
    'Date': DateType(),
    'Amount': DoubleType(),
    'IsDiscounted': BooleanType()
}
df = spark.read.load('/mnt/datalake/raw/food_data')
df_updated_schema = df\
    .withColumn('Date', to_date(col('Date'), 'M/d/yyyy').cast(DateType()))\
    .select([
        col(column_schema[0]).cast(data_type_map.get(column_schema[0], column_schema[1]))
        for column_schema in df.dtypes
    ])
Just like the others, this method works, but I happen to believe it's the best one of the bunch.
Using a list comprehension, we select every column in the DataFrame: the columns in our dictionary are cast to the data type we mapped for them, while all the other columns are still selected with their original data types.
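If the one-line comprehension feels dense, the same idea reads a little more clearly with the select list built up first; this is purely a readability tweak, not a different technique:

from pyspark.sql.functions import to_date, col

data_type_map = {
    'Date': 'date',
    'Amount': 'double',
    'IsDiscounted': 'boolean'
}

df = spark.read.load('/mnt/datalake/raw/food_data')

# Cast mapped columns to their new types; keep every other column's original type
select_columns = [
    col(column_name).cast(data_type_map.get(column_name, data_type))
    for column_name, data_type in df.dtypes
]

df_updated_schema = df\
    .withColumn('Date', to_date(col('Date'), 'M/d/yyyy'))\
    .select(select_columns)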
That’s it! Now you know how to cast data types in PySpark.
Ideally, you would be able to infer the schema, or you'd have a schema definition somewhere that you can use to set the data types for your DataFrames. In the absence of those options, though, these methods can help you cast data types for your Spark DataFrames.
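For completeness, when you do have a schema definition, you can hand it to the reader up front so the columns never come in as strings at all. Here's a sketch that assumes the underlying files are CSV (the header and dateFormat options are specific to text-based sources like CSV and JSON):

from pyspark.sql.types import StructType, StructField, DateType, StringType, DoubleType, BooleanType

# Explicit schema so nothing has to be cast after the read
food_schema = StructType([
    StructField('Date', DateType(), True),
    StructField('Item', StringType(), True),
    StructField('Amount', DoubleType(), True),
    StructField('IsDiscounted', BooleanType(), True)
])

df = spark.read\
    .format('csv')\
    .option('header', 'true')\
    .option('dateFormat', 'M/d/yyyy')\
    .schema(food_schema)\
    .load('/mnt/datalake/raw/food_data')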
Thanks for reading!