PySpark transform cols in row to bytearray BinaryType()

 
from pyspark.sql import Row
import random
import struct
from datetime import datetime
 
#
# TEST Data generation
# 
df_a = []
for i in range(0,10):
    df_a.append(
        (
            i,
            "ABC %s"%(round(i*random.random(),2)),
            round(i*random.random(),2),
            datetime.now()
        )
    )
 
df = spark.createDataFrame(df_a,['id','values','valuef','valued'])
 
 
df.show()
df.printSchema()
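
# For reference, printSchema() should report roughly the inferred types below
# (Python int -> long/bigint, str -> string, float -> double, datetime -> timestamp):
#
#  root
#   |-- id: long (nullable = true)
#   |-- values: string (nullable = true)
#   |-- valuef: double (nullable = true)
#   |-- valued: timestamp (nullable = true)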
 
#
# The converter
# 
 
# Important: broadcast the schema so it is shipped to the executors only once
dfSchema = sc.broadcast(df.schema)
 
# The function
def convertFieldsToByteArray(row):
 
    # get from broadcast
    schema = dfSchema.value
 
    # conversion funcs
    def bigint2bc(data):
        # '>Q' packs a big-endian unsigned 64-bit int; use '>q' if values can be negative
        return bytearray(struct.pack('>Q',data))
 
    def string2bc(data):
        # encode the whole string; struct.pack('s', ...) would keep only one byte
        return bytearray(str(data).encode('utf-8'))
 
    def double2bc(data):
        return bytearray(struct.pack('d',data))
 
    def timestamp2bc(data):
        # keeps only the date part (proleptic ordinal), packed as big-endian int32
        return bytearray(struct.pack('>i',int(data.toordinal())))
 
    def errorNotfound(n):
        raise Exception(
            'Cannot handle "{}" datatype, no handler'.format(n))
 
    # map each Spark SQL type name to its conversion func
    convertors = {
        'bigint': bigint2bc,
        'string': string2bc,
        'double': double2bc,
        'timestamp': timestamp2bc
    }
 
    # return object/dict
    o = {}
 
    # convert each field, failing fast if there is no handler for its type
    for field in schema:
        typeName = field.dataType.simpleString()
        o[field.name] = convertors[typeName](row[field.name]) \
            if typeName in convertors else errorNotfound(typeName)
 
    # return row
    return Row(**o)
 
# map the converter over the RDD and build a new DataFrame
df3 = df.rdd.map(convertFieldsToByteArray).toDF()
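
# Quick sanity check (a sketch, not part of the original recipe): pull one row
# back to the driver and unpack the 'valuef' bytes with the same struct format
# ('d') that double2bc used to pack them.
first = df3.first()
print(struct.unpack('d', bytes(first['valuef']))[0])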
 
 
 
#
# Test the data
# 
 
df3.show()
df3.printSchema()
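
# Since every field is now a bytearray, toDF() should infer BinaryType for all
# columns, roughly:
#
#  root
#   |-- id: binary (nullable = true)
#   |-- valued: binary (nullable = true)
#   |-- valuef: binary (nullable = true)
#   |-- values: binary (nullable = true)
#
# Note: on Spark 2.x, Row(**kwargs) sorts field names alphabetically, so the
# column order may differ from the source DataFrame (as reflected above).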