Welcome to my personal place for love, peace and happiness❣️

Чтение avro в spark из s3

import pyspark
import os

S3_ACCESS_KEY = os.environ.get(“S3_ACCESS_KEY”)
S3_BUCKET = os.environ.get(“S3_BUCKET”)
S3_SECRET_KEY = os.environ.get(“S3_SECRET_KEY”)
S3_ENDPOINT = os.environ.get(“S3_ENDPOINT”)

This cell may take some time to run the first time, as it must download the necessary spark jars

conf = pyspark.SparkConf()

IF YOU ARE USING THE SPARK CONTAINERS, UNCOMMENT THE LINE BELOW TO OFFLOAD EXECUTION OF SPARK TASKS TO SPARK CONTAINERS

#conf.setMaster(“spark://spark:7077”)

conf.set(“spark.jars.packages”, ‘org.apache.hadoop:hadoop-aws:3.3.1,io.delta:delta-core_2.12:2.1.0,org.apache.spark:spark-avro_2.12:3.3.2’)

conf.set(‘spark.hadoop.fs.s3a.aws.credentials.provider’, ‘org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider’)

conf.set(‘spark.hadoop.fs.s3a.endpoint’, S3_ENDPOINT)
conf.set(‘spark.hadoop.fs.s3a.access.key’, S3_ACCESS_KEY)
conf.set(‘spark.hadoop.fs.s3a.secret.key’, S3_SECRET_KEY)
conf.set(‘spark.hadoop.fs.s3a.path.style.access’, “true”)
conf.set(“spark.sql.extensions”, “io.delta.sql.DeltaSparkSessionExtension”)
conf.set(“spark.sql.catalog.spark_catalog”, “org.apache.spark.sql.delta.catalog.DeltaCatalog”)

sc = pyspark.SparkContext(conf=conf)

sc.setLogLevel(“INFO”)

spark = pyspark.sql.SparkSession(sc)

df = spark.read.format(“avro”).load(f“s3a://{S3_BUCKET}/person2.avro”)

Follow this blog
Send
Share
2023   spark