Differences

This shows you the differences between two versions of the page.

Link to this comparison view

kb:spark:pyspark_transform_cols_in_row_to_bytearray_binarytype [2019/05/26 14:17] (current)
yehuda created
Line 1: Line 1:
 +====== PySpark transform cols in row to bytearray BinaryType() ======
  
 +<code python>
 +
 +
 +from pyspark.sql import Row;
 +import random
 +from datetime import datetime
 + 
#
# TEST Data generation: build 10 rows of (bigint, string, double, timestamp)
# to exercise every type handler in the converter below.

df_a = []
for i in range(0, 10):
    df_a.append(
        (
            i,
            "ABC %s" % (round(i * random.random(), 2)),
            round(i * random.random(), 2),
            datetime.now()
        )
    )

# NOTE(review): relies on an active SparkSession bound to `spark`
# (e.g. a pyspark shell / notebook) — confirm in the target environment.
df = spark.createDataFrame(df_a, ['id', 'values', 'valuef', 'valued'])

df.show()
df.printSchema()
 + 
#
# The convertor

# Important: broadcast the schema so each executor gets one read-only copy
# instead of serializing it into every task closure.
dfSchema = sc.broadcast(df.schema)
 + 
# The function
def convertFieldsToByteArray(row):
    """Return a new Row with every field of *row* packed into a bytearray.

    Supported Spark SQL types (by ``dataType.simpleString()``): ``bigint``,
    ``string``, ``double``, ``timestamp``. Any other type raises Exception.
    Intended to be used via ``df.rdd.map(...)`` so it runs on executors.
    """
    # local import so the module is available inside executor tasks
    import struct

    # get the schema from the broadcast variable
    schema = dfSchema.value

    # conversion funcs
    def bigint2bc(data):
        # big-endian unsigned 64-bit integer
        return bytearray(struct.pack('>Q', data))

    def string2bc(data):
        # FIX: struct.pack('s', ...) packs only the FIRST byte of the
        # string; encode the whole string instead.
        return bytearray(str(data).encode('utf-8'))

    def double2bc(data):
        # native-order IEEE-754 double
        return bytearray(struct.pack('d', data))

    def timestamp2bc(data):
        # FIX: original referenced undefined name `d` (NameError);
        # pack the proleptic-Gregorian ordinal day as big-endian int32.
        return bytearray(struct.pack('>i', int(data.toordinal())))

    def errorNotfound(n):
        raise Exception(
            'Cannot handle "{}" datatype, no handler'.format(n))

    # mapping of Spark type name -> conversion func
    convertors = {
        'bigint': bigint2bc,
        'string': string2bc,
        'double': double2bc,
        'timestamp': timestamp2bc
    }

    # return object/dict
    o = {}

    # convert fields
    # FIX: dict.has_key() was removed in Python 3 — use the `in` operator.
    for field in schema:
        type_name = field.dataType.simpleString()
        if type_name in convertors:
            o[field.name] = convertors[type_name](row[field.name])
        else:
            errorNotfound(type_name)

    # return row
    return Row(**o)
 + 
# map the converter over the dataframe's RDD and rebuild a DataFrame;
# every column in df3 is now BinaryType
df3 = df.rdd.map(convertFieldsToByteArray).toDF()


#
# Test the data

df3.show()
df3.printSchema()
 +
 +
 +</code>
kb/spark/pyspark_transform_cols_in_row_to_bytearray_binarytype.txt ยท Last modified: 2019/05/26 14:17 by yehuda
Back to top
Driven by DokuWiki Recent changes RSS feed Valid CSS Valid XHTML 1.0