Подтвердить что ты не робот

Загрузить CSV файл с помощью Spark

Я новичок в Spark, и я пытаюсь читать CSV-данные из файла с помощью Spark. Вот что я делаю:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

Я бы ожидал, что этот вызов даст мне список двух первых столбцов моего файла, но я получаю эту ошибку:

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range

хотя мой CSV файл содержит более одного столбца.

4b9b3361

Ответ 1

Вы уверены, что все строки имеют не менее 2 столбцов? Вы можете попробовать что-то вроде, просто чтобы проверить?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

В качестве альтернативы вы можете распечатать виновника (если есть):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()

Ответ 2

Spark 2.0.0 +

Вы можете напрямую использовать встроенный источник данных csv:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

или

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

без каких-либо внешних зависимостей.

Spark & ​​lt; 2.0.0

Вместо ручного разбора, который в общем случае далек от тривиального, я бы рекомендовал spark-csv:

Убедитесь, что Spark CSV включен в путь (--packages, --jars, --driver-class-path)

И загрузите данные следующим образом:

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

Он может обрабатывать загрузку, вывод схемы, отбрасывать неверные строки и не требует передачи данных с Python в JVM.

Примечание

Если вы знаете схему, лучше избегать вывода схемы и передать ее на DataFrameReader. Предполагая, что у вас есть три столбца - целое, двойное и строковое:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

Ответ 3

И еще одна опция, состоящая в чтении CSV файла с помощью Pandas, а затем импортировании Pandas DataFrame в Spark.

Например:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)

Ответ 4

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|");

print(df.collect())

Ответ 5

Простое разделение запятой также разделит запятые внутри полей (например, a,b,"1,2,3",c), поэтому это не рекомендуется. Ответ 0323 хорош, если вы хотите использовать API DataFrames, но если вы хотите придерживаться базового Spark, вы можете проанализировать csvs в базовом Python с помощью модуля csv:

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

РЕДАКТИРОВАТЬ: Как упоминалось в комментариях @muon, заголовок будет обрабатываться как любая другая строка, поэтому вам придется извлечь его вручную. Например, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (убедитесь, что header не header = rdd.first(); rdd = rdd.filter(lambda x: x != header) до оценки фильтра). Но в этот момент вам, вероятно, лучше использовать встроенный парсер csv.

Ответ 6

Теперь есть еще один вариант для любого общего файла csv: https://github.com/seahboonsiew/pyspark-csv следующим образом:

Предположим, что мы имеем следующий контекст

sc = SparkContext
sqlCtx = SQLContext or HiveContext

Сначала распределите pyspark-csv.py исполнителям, используя SparkContext

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

Чтение данных csv через SparkContext и преобразование его в DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)

Ответ 7

Это соответствует тому, что JP Mercier изначально предложил об использовании Pandas, но с серьезной модификацией: если вы читаете данные в Pandas в кусках, он должен быть более податливым. Смысл, что вы можете анализировать гораздо больший файл, чем Pandas может фактически обрабатывать как единый кусок и передавать его Spark в меньших размерах. (Это также отвечает на комментарий о том, почему нужно использовать Spark, если они могут загружать все в Pandas в любом случае.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()

Ответ 8

Если вы хотите загрузить csv в качестве фрейма данных, вы можете сделать следующее:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

Это сработало для меня.

Ответ 9

Если ваши данные csv не содержат строк в любом из полей, вы можете загрузить свои данные с помощью textFile() и проанализировать его

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)

Ответ 10

Если в наборе данных есть одна или несколько строк с меньшим или большим числом столбцов, чем 2, то эта ошибка может возникнуть.

Я также новичок в Pyspark и пытаюсь прочитать файл CSV. Следующий код работал для меня:

В этом коде я использую набор данных из kaggle, ссылка: https://www.kaggle.com/carrie1/ecommerce-data

1. Без упоминания схемы:

from pyspark.sql import SparkSession  
scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()

Теперь проверьте столбцы: sdfData.columns

Выход будет:

['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']

Проверьте тип данных для каждого столбца:

sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

Это даст фрейм данных со всеми столбцами с типом данных как StringType

2. Со схемой: если вы знаете схему или хотите изменить тип данных любого столбца в приведенной выше таблице, воспользуйтесь этим (допустим, у меня есть следующие столбцы, и я хочу, чтобы они имели определенный тип данных для каждого из них)

from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
    schema = StructType([\
        StructField("InvoiceNo", IntegerType()),\
        StructField("StockCode", StringType()), \
        StructField("Description", StringType()),\
        StructField("Quantity", IntegerType()),\
        StructField("InvoiceDate", StringType()),\
        StructField("CustomerID", DoubleType()),\
        StructField("Country", StringType())\
    ])

scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL example: Reading CSV file with schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)

Теперь проверьте схему для типа данных каждого столбца:

sdfData.schema

StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

Отредактировано: Мы также можем использовать следующую строку кода без явного упоминания схемы:

sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema

Выход:

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

Вывод будет выглядеть так:

sdfData.show()

+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|      2.55|  17850|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|      2.75|  17850|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|      7.65|  17850|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|      4.25|  17850|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/2010 8:28|      1.85|  17850|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/2010 8:28|      1.85|  17850|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/2010 8:34|      1.69|  13047|
|   536367|    22745|POPPY PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22748|POPPY PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/2010 8:34|      3.75|  13047|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/2010 8:34|      1.65|  13047|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/2010 8:34|      4.25|  13047|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/2010 8:34|      4.95|  13047|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/2010 8:34|      9.95|  13047|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/2010 8:34|      7.95|  13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows

Ответ 11

Как правило, вы не пытаетесь разобрать CSV вручную. Здесь решение без зависимостей, которое будет обрабатывать любые escape-строки, например, строки с кавычками:

import csv # Python standard CSV library
def csv_to_rdd(csv_filename):
    return sc.textFile(csv_filename) \
    .map(lambda line: tuple(list(csv.reader([line]))[0]))

Ответ 12

При использовании spark.read.csv я обнаружил, что использование параметров escape='"' и multiLine=True обеспечивает наиболее согласованное решение для стандарта CSV, и, по моему опыту, лучше всего работает с файлами CSV, экспортированными из Google Sheets.

То есть,

#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
     inferSchema=False, header=True)

Ответ 13

import pandas as pd

data1 = pd.read_csv("test1.csv")
data2 = pd.read_csv("train1.csv")