4,064 questions
0 votes · 0 answers · 15 views
How to Join two RDDs in pyspark with nested tuples
I need to join two RDDs as part of my programming assignment. The problem is that the first RDD is nested while the other is flat. I tried different things, but nothing seems to work. Is there any expert on pyspark ...
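A minimal PySpark sketch of the usual approach (the data shapes below are assumptions, since the question is truncated): re-key the nested RDD so both sides share a flat key, then join.

    from pyspark import SparkContext
    sc = SparkContext.getOrCreate()

    nested = sc.parallelize([(("k1", "a"), 1), (("k2", "b"), 2)])  # key is itself a tuple
    flat = sc.parallelize([("k1", "x"), ("k2", "y")])

    # Re-key on the part of the nested key shared with the flat RDD, then join.
    rekeyed = nested.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
    print(rekeyed.join(flat).collect())  # [('k1', (('a', 1), 'x')), ('k2', (('b', 2), 'y'))]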
1 vote · 1 answer · 111 views
How to properly recalculate Spark DataFrame statistics after checkpoint?
Here is a minimal example using default data in Databricks (Spark 3.4):
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
sc....
0 votes · 2 answers · 140 views
Pyspark mapPartition evaluates the function more times than expected
I'm working with PySpark to process large amounts of data. However, I noticed that the function called by mapPartitions is executed one more time than expected. For instance, in the following code ...
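A sketch of the usual explanation, under the assumption that it applies here: mapPartitions is lazy and its function re-runs once per partition for every action, so caching pins the result.

    rdd = sc.parallelize(range(10), 2)

    def f(it):
        print("partition evaluated")      # visible once per partition per run
        return (x * 2 for x in it)

    out = rdd.mapPartitions(f).cache()
    out.count()    # evaluates f
    out.collect()  # served from cache; without cache() f would run again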
0 votes · 1 answer · 30 views
Find common data among two RDDs in Spark execution
I have RDD1
col1 col2
A x123
B y123
C z123
RDD2
col1
A
C
I want to run an intersection of the two RDDs and find the common elements, i.e. for the items that are in RDD2, what is the data of ...
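A sketch of one way to do this: give RDD2 a dummy value so it becomes a pair RDD, then an inner join keeps only the common keys.

    rdd1 = sc.parallelize([("A", "x123"), ("B", "y123"), ("C", "z123")])
    rdd2 = sc.parallelize(["A", "C"])

    common = rdd1.join(rdd2.map(lambda k: (k, None))) \
                 .map(lambda kv: (kv[0], kv[1][0]))
    print(common.collect())  # [('A', 'x123'), ('C', 'z123')] (order may vary)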
0 votes · 1 answer · 4k views
RDD is not implemented error on pyspark.sql.connect.dataframe.Dataframe
I have a dataframe on Databricks on which I would like to use the RDD API. The type of the dataframe is pyspark.sql.connect.dataframe.Dataframe after reading from the catalog. I found out that this ...
0 votes · 1 answer · 67 views
unpacking nested tuples after Spark RDD join
The resources for this are scarce, and I'm not sure that there's a solution to this issue.
Suppose you have 3 simple RDDs. Or more specifically, 3 PairRDDs.
val rdd1: RDD[(Int, Int)] = sc.parallelize(...
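The question is in Scala, but the nesting pattern is the same in PySpark; a sketch with assumed data: chained joins produce (k, ((v1, v2), v3)), which one map can flatten.

    rdd1 = sc.parallelize([(1, 10)])
    rdd2 = sc.parallelize([(1, 20)])
    rdd3 = sc.parallelize([(1, 30)])

    nested = rdd1.join(rdd2).join(rdd3)  # (1, ((10, 20), 30))
    flat = nested.map(lambda kv: (kv[0], kv[1][0][0], kv[1][0][1], kv[1][1]))
    print(flat.collect())  # [(1, 10, 20, 30)]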
0 votes · 0 answers · 156 views
Getting Py4JJavaError in a Jupyter notebook when using a simple .count with pyspark
While using the following code:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
from datetime ...
-1 votes · 1 answer · 350 views
pySpark RDD whitelisted Class issues
I was using the code below in an Azure Databricks notebook before enabling a Unity Catalog cluster, but after changing to a shared-users enabled cluster, I am not able to use the logic below. How should we achieve ...
1 vote · 1 answer · 54 views
avg() over a whole dataframe causing different output
I see that dataframe.agg(avg(Col)) works fine, but when I calculate avg() over a window spanning the whole column (not using any partition), I see different results depending on which column I use with orderBy.
...
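A sketch of the likely cause (column names below are placeholders): adding orderBy to a window changes the default frame from the whole partition to "unbounded preceding to current row", turning avg() into a running average that depends on the sort column.

    from pyspark.sql import Window
    from pyspark.sql.functions import avg

    w_whole = Window.partitionBy()                        # whole-column average
    w_running = Window.partitionBy().orderBy("some_col")  # running average!

    # Pin the frame explicitly to keep a whole-column average while ordering:
    w_fixed = (Window.partitionBy().orderBy("some_col")
               .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
    df = df.withColumn("avg_all", avg("value").over(w_fixed))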
3 votes · 1 answer · 89 views
Casting RDD to a different type (from float64 to double)
I have some code like the following, which uses pyspark.
test_truth_value = RDD.
test_predictor_rdd = RDD.
valuesAndPred = test_truth_value.zip(lasso_model.predict(test_predictor_rdd)).map(lambda x: ((x[0]), (x[...
1 vote · 1 answer · 61 views
Saving and Loading RDD (pyspark) to pickle file is changing order of SparseVectors
I trained tf-idf on a pre-tokenized (unigram tokenizer) dataset that I converted from list[list(token1, token2, token3, ...)] to an RDD using pyspark's HashingTF and IDF implementations. I tried to ...
1 vote · 1 answer · 622 views
Why is my PySpark row_number column messed up when applying a schema?
I want to apply a schema to specific non-technical columns of a Spark DataFrame. Beforehand, I add an artificial ID using Window and row_number so that I can later join some other technical columns to ...
0 votes · 0 answers · 44 views
PySpark with RDD - How to calculate and compare averages?
I need to solve a problem where a company wants to offer k different users free use (a kind of coupon) of their application for two months. The goal is to identify users who are likely to churn (leave ...
0 votes · 1 answer · 258 views
Order PySpark Dataframe by applying a function/lambda
I have a PySpark DataFrame which needs ordering on a column ("Reference").
The values in the column typically look like:
["AA.1234.56", "AA.1101.88", "AA.904.33"...
-1 votes · 1 answer · 62 views
Problem with pyspark mapping - Index out of range after split
When trying to map our 6-column pyspark RDD into a 4-tuple, we get a list index out of range error for any list element besides 0, which returns the normal result.
The dataset is structured like this:
X,Y,FID,...
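A defensive sketch, with assumed field positions: lines with fewer than 6 fields (headers, blanks, malformed rows) raise IndexError, so filter them out before indexing.

    rows = sc.textFile("data.csv").map(lambda line: line.split(","))
    clean = rows.filter(lambda parts: len(parts) >= 6)   # drops short/header lines
    tuples = clean.map(lambda p: (p[0], p[1], p[2], p[5]))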
0 votes · 1 answer · 97 views
Save text files as binary format using saveAsPickleFile with pyspark
I have around 613 text files stored in Azure Data Lake Gen2 at this path, e.g. '/rawdata/no=/.txt'. I want to read all the text files and un-base64 them, as they are base64 encoded. But ...
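A sketch under the assumption that each file holds one base64 payload; the path glob is illustrative only, since the real path is truncated above.

    import base64

    raw = sc.wholeTextFiles("/rawdata/*.txt")            # (path, content) pairs
    decoded = raw.mapValues(lambda s: base64.b64decode(s))
    decoded.saveAsPickleFile("/rawdata/decoded")         # binary-safe output format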
1 vote · 1 answer · 47 views
Reading file using Spark RDD vs DF
I have a 2 MB file. When I read it using
df = spark.read.option("inferSchema", "true").csv("hdfs:///data/ml-100k/u.data", sep="\t")
df.rdd.getNumPartitions() # ...
0 votes · 1 answer · 120 views
Linear RDD Plot only shows two data points
I have attempted to run the below code:
data(house)
house_rdd = rdd_data(x=x, y=y, data=house, cutpoint=0)
summary(house_rdd)
plot(house_rdd)
When I plot it, I get this, which makes sense.
...
1 vote · 0 answers · 57 views
I can't save RDD object into text files using PySpark
I am trying to create a Spark program to read the airport data from the airport.text file, find all the airports located in the United States, and output the airport's name and city's name to an ...
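A sketch under the question's description; the field layout of airport.text is not shown, so the indices here are assumptions.

    airports = sc.textFile("airport.text").map(lambda line: line.split(","))
    us = airports.filter(lambda f: f[3].strip('"') == "United States") \
                 .map(lambda f: "{}, {}".format(f[1], f[2]))
    us.saveAsTextFile("airports_in_usa")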
1 vote · 0 answers · 110 views
Use UDF for RDD in pyspark
I want to use the user-defined function (UDF) below for RDD in Pyspark
from pyspark.sql.functions import abs as pyspark_abs, sum as pyspark_sum, \
rand as pyspark_rand, min as ...
1 vote · 0 answers · 90 views
Read files in HDFS from Spark - RPC response exceeds maximum data length
Hi, I'm new to Spark and I'm trying to access files from HDFS in spark-shell.
I wrote the following code in spark-shell:
scala> val myfile = sc.textFile("hdfs://localhost:9870/sample/demofile....
0 votes · 0 answers · 97 views
Why am I receiving a conversion error on my date and time value(s)?
I'm connecting to a SQL Server view using the JDBC connector from a Databricks notebook:
df = spark.read
.format("com.databricks.spark.sqldw")
.option("url", url)
...
3 votes · 1 answer · 3k views
spark - How is it even possible to get an OOM?
If Spark is designed to "spill to disk" when there is not enough memory, then I am wondering how it is even possible to get an OOM in Spark?
There is a spark.local.dir which is used as a ...
0 votes · 1 answer · 58 views
Dataframe value replacement
I am trying to replace 'yyyy-MM' with 'yyyy-MM'+'-01'. Below is my code, and I am not getting it right. Note, I am working on Databricks:
from pyspark.sql.functions import col, concat, lit, when
# Show ...
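A sketch of the usual fix, assuming a string column named "month" holding 'yyyy-MM' values (the real column name is not shown in the excerpt):

    from pyspark.sql.functions import col, concat, lit

    df = df.withColumn("month", concat(col("month"), lit("-01")))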
2 votes · 2 answers · 243 views
Spark Left Outer Join produces Optional.empty when it shouldn't
I have an RDD containing pairs of nodes and I need to assign unique IDs to them.
But I'm getting an NPE and I can't figure out how to solve it.
I'm basically putting all nodes into a distinct list, ...
-1 votes · 1 answer · 34 views
What is the memory layout of a non-HDFS RDD?
I am new to Spark and I am trying to get an intuition for how RDDs are represented in memory.
HDFS RDDs are easy as the partitions are handled by the filesystem itself. That is, HDFS itself divides a ...
0 votes · 1 answer · 92 views
How does RDD.aggregate() work with partitions?
I'm new to Spark and trying to understand how functions like reduce, aggregate, etc. work.
While going through RDD.aggregate(), I tried changing the zeroValue to something other than identities (0 for ...
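A sketch of why non-identity zeroValues surprise people: the zeroValue is folded in once per partition by seqOp, and once more when combOp merges the partition results.

    rdd = sc.parallelize([1, 2, 3, 4], 2)
    total = rdd.aggregate(5,
                          lambda acc, x: acc + x,  # seqOp: per element
                          lambda a, b: a + b)      # combOp: merges partitions
    # sum(1..4)=10, +5 for each of 2 partitions, +5 at the merge => 25
    print(total)  # 25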
0 votes · 1 answer · 2k views
Fetch a column value into a variable in pyspark without collect [duplicate]
My objective is to fetch a column's values into a variable, if possible as a list, from a pyspark dataframe.
Expected output = ["a", "b", "c", ... ]
I tried :
[
col....
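One collect-free sketch, with an assumed column name: stream rows to the driver instead of materializing them all at once.

    values = [row["col_name"] for row in df.select("col_name").toLocalIterator()]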
1 vote · 2 answers · 48 views
How to find common pairs irrespective of their order in Pyspark RDD?
I want to find the pairs who have contacted one another. The following is the data:
Input is
K -> M, H
M -> K, E
H -> F
B -> T, H
E -> K, H
F -> K, H, E
A -> Z
And the ...
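A sketch with data shaped like the excerpt: expand each line into pairs, normalize each pair by sorting, and keep pairs reported from both sides.

    data = [("K", ["M", "H"]), ("M", ["K", "E"]), ("E", ["K", "H"])]
    pairs = sc.parallelize(data) \
              .flatMap(lambda kv: [tuple(sorted((kv[0], v))) for v in kv[1]])
    mutual = (pairs.map(lambda p: (p, 1))
                   .reduceByKey(lambda a, b: a + b)
                   .filter(lambda kv: kv[1] >= 2)
                   .keys())
    print(mutual.collect())  # [('K', 'M')]  (K and M contacted each other)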
0 votes · 1 answer · 188 views
How can I save data from HDFS to Amazon S3
I am working on web archives and extracting some data. Initially I stored this data as txt in my HDFS, but due to its massive size I will have to store the output in Amazon S3 buckets. How can I ...
0 votes · 1 answer · 78 views
removing , and converting to int [duplicate]
I am banging my head trying to convert the following Spark RDD data using code:
[('4', ('1', '2')),
('10', ('5',)),
('3', ('2',)),
('6', ('2', '5')),
('7', ('2', '5')),
('1', None),
('8', ('2', '5')),
('9', ('...
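The desired output is truncated above, so this sketch just shows the usual None-safe int conversion for data shaped like that.

    rdd = sc.parallelize([('4', ('1', '2')), ('10', ('5',)), ('1', None)])

    def to_ints(kv):
        key, vals = kv
        return (int(key), tuple(int(v) for v in vals) if vals else ())

    print(rdd.map(to_ints).collect())  # [(4, (1, 2)), (10, (5,)), (1, ())]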
1 vote · 2 answers · 503 views
Getting Job aborted due to stage failure while converting my string data in a pyspark dataframe into a dictionary
I have the following data in a pyspark dataframe where both the columns contain string data.
data = [(123, '[{"FLD_NAME":"A","FLD_VAL":"0.1"},{"FLD_NAME&...
1 vote · 1 answer · 54 views
Query an Array column with an indices column
So I have two columns
| col_arr | col_ind |
|[1, 2, 3]| [0, 2] |
|[5, 1] | [1] |
and I'd like my result to be an extraction of the values in col_arr by col_ind resulting in col_val below:
| ...
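A sketch using a plain Python UDF with the column names from the excerpt; the element type is assumed to be integer.

    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, IntegerType

    pick = udf(lambda arr, idx: [arr[i] for i in idx], ArrayType(IntegerType()))
    df = df.withColumn("col_val", pick("col_arr", "col_ind"))
    # [1, 2, 3] with [0, 2] -> [1, 3];  [5, 1] with [1] -> [1]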
0 votes · 0 answers · 33 views
Compare rows in RDD and filter out
I have a sorted RDD, to which I have already applied some filters. It's not a key-value pair RDD.
I want to remove rows of the RDD: given two consecutive rows, I would like to remove the second if ...
0 votes · 2 answers · 165 views
Creating Pyspark RDD using random key from tuple
I'm studying Apache Spark and found something interesting. When I create a new RDD with key-value pairs, where the key is randomly chosen from a tuple, the result of reduceByKey is not correct.
from ...
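A sketch of the likely cause, as an assumption: the keying function is non-deterministic, and Spark may re-run it on task retries or later actions, so keys can differ between runs; caching (or a seeded choice) pins one set of keys.

    import random

    rdd = sc.parallelize([("a", "b", 1), ("c", "d", 2)])
    keyed = rdd.map(lambda t: (random.choice(t[:2]), t[2])).cache()
    print(keyed.reduceByKey(lambda a, b: a + b).collect())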
0 votes · 2 answers · 11k views
Wide and Narrow transformations in Spark
I have a question: wide and narrow transformations in Python Spark are found in both RDDs and the Structured APIs, right?
I mean, I guess I got the difference between wide and narrow transformations. My ...
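A small illustration of the distinction, which indeed appears in both the RDD and the Structured APIs; the data and the spark session below are assumed.

    rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
    narrow = rdd.mapValues(lambda v: v * 2)        # narrow: no shuffle
    wide = rdd.reduceByKey(lambda a, b: a + b)     # wide: shuffles by key

    df = spark.createDataFrame(rdd, ["k", "v"])
    narrow_df = df.withColumn("v2", df.v * 2)      # narrow
    wide_df = df.groupBy("k").sum("v")             # wide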
1 vote · 1 answer · 495 views
Stop Spark dataframe distributing to the cluster - it needs to stay on the driver
We have a workload that computes on Spark cluster workers (CPU intensive).
The results are pulled back to the driver, which has a large memory allocation, to collect the results via RDD .collect().
...
1 vote · 0 answers · 78 views
How can I read data with column names that was exported from HBase to HDFS?
I use the HBase export tool to put HBase data into HDFS:
hbase org.apache.hadoop.hbase.mapreduce.Export \
<tablename> <outputdir>
And I see the data on my HDFS.
When I want to read that data ...
1 vote · 1 answer · 72 views
Find the closest record in a Spark RDD for Each Record in Another RDD
I have two massive RDDs, and for each record in one, I need to find the physically closest (lat/lon) point in the other that has the same key. But... in each RDD, there are hundreds of millions of ...
2 votes · 0 answers · 45 views
No trackable performance differences between ReduceByKey and GroupByKey in Pyspark
I am currently running an experiment in PySpark where I compare the overall performance of the wide transformations reduceByKey and groupByKey. I would expect to see a difference in performance (time ...
0 votes · 1 answer · 115 views
Pyspark RDD ReducebyKey()
Hi, I am trying to understand where the problem is in my code:
rdd_avg=sc.parallelize([("a",10),("b",15),("c",20),("a",8)])
rdd_sp1=rdd_avg.map(lambda x: (x,1))
...
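A sketch of the usual fix: the map above keys by the whole (name, value) tuple; keep the name as the key and carry (sum, count) pairs instead.

    rdd_avg = sc.parallelize([("a", 10), ("b", 15), ("c", 20), ("a", 8)])
    avgs = (rdd_avg.mapValues(lambda v: (v, 1))
                   .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
                   .mapValues(lambda s: s[0] / s[1]))
    print(avgs.collect())  # [('a', 9.0), ('b', 15.0), ('c', 20.0)] (order may vary)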
1 vote · 0 answers · 291 views
Plotting a structural topic model - how to allow for discontinuity over time
I am running a structural topic model using the stm package in R. My model includes an interaction effect between faction_id and numeric_date (a measure of time). I am using the following code to ...
2 votes · 1 answer · 767 views
Histogram of grouped data in PySpark
I have data consisting of a date-time, IDs, and velocity, and I'm hoping to get histogram data (start/end points and counts) of velocity for each ID using PySpark. Sample data:
df = spark....
0 votes · 1 answer · 39 views
Multiple actions in Spark causing failures
I'm new to Spark.
I encountered some problems with the part about saving a df to a Hive table.
def insert_into_hive_table(df: DataFrame, table_name: str):
# for debugging - this action is working ...
1 vote · 0 answers · 183 views
pyspark 'JavaPackage' object is not callable
I want to remove the first row of a dataset, but this error keeps occurring: 'JavaPackage' object is not callable.
I've written another simple program for easier review. Here is my code:
# Create a ...
0 votes · 1 answer · 135 views
rdd.zipWithIndex() throwing IllegalArgumentException on very large data set
I'm running a Python notebook in Azure Databricks. I am getting an IllegalArgumentException error when trying to add a line number with rdd.zipWithIndex(). The file is 2.72 GB and 1238951 lines (I ...
0 votes · 1 answer · 30 views
Get the summarized value of a column based on a date range across a year, using Spark RDD and Spark DataFrames
I have a dataframe df1 like below:

product | start      | end        | price
p1      | 6/12/2020  | 6/7/2021   | 12
p1      | 6/8/2021   | 10/19/2021 | 14
p1      | 10/20/2021 | 5/14/2022  | 13
p1      | 5/15/2022  | 11/20/2022 | 12.5
p1      | 11/21/2022 | 1/1/2099   | 12.5
p2      | 6/12/...
0 votes · 1 answer · 283 views
List index out of range error when count Action in RDD is used
I am new to Pyspark, and I tried to find the number of occurrences of a specific string ('Private Room') in a CSV using the RDD count action, but I get a list index out of range error.
When I use a take(20) action, I am ...
1 vote · 1 answer · 130 views
Spark 2.3.1 => 2.4 increases runtime 6-fold
I'm being forced onto a newer EMR version (5.23.1, 5.27.1, or 5.32+) by our cloud team, which is forcing me up from 5.17.0 w/ Spark 2.3.1 to Spark 2.4.x. The impetus is to allow a security ...
1 vote · 0 answers · 317 views
Adding a confidence interval line to RDplot in R
I am trying to add dashed confidence interval lines (upper and lower bounds) using the RDplot command and am unable to do so. RDplot only allows adding a CI as a point estimate or a shade.
My ...