Подтвердить что ты не робот

Как обрабатывать многострочные записи ввода в Spark

У меня есть каждая запись, распределенная по нескольким строкам во входном файле (очень огромный файл).

Пример:

Id:   2
ASIN: 0738700123
  title: Test tile for this product
  group: Book
  salesrank: 168501
  similar: 5  0738700811  1567184912  1567182813  0738700514  0738700915
  categories: 2
   |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Wicca[12484]
   |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Witchcraft[12486]
  reviews: total: 12  downloaded: 12  avg rating: 4.5
    2001-12-16  cutomer: A11NCO6YTE4BTJ  rating: 5  votes:   5  helpful:   4
    2002-1-7  cutomer:  A9CQ3PLRNIR83  rating: 4  votes:   5  helpful:   5

Как идентифицировать и обрабатывать каждую многострочную запись в иске?

4b9b3361

Ответ 1

Я сделал это, внедряя пользовательский формат ввода и записывающее устройство.

public class ParagraphInputFormat extends TextInputFormat {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
        return new ParagraphRecordReader();
    }
}

public class ParagraphRecordReader extends RecordReader<LongWritable, Text> {
    private long end;
    private boolean stillInChunk = true;

    private LongWritable key = new LongWritable();
    private Text value = new Text();

    private FSDataInputStream fsin;
    private DataOutputBuffer buffer = new DataOutputBuffer();

    private byte[] endTag = "\n\r\n".getBytes();

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) inputSplit;
        Configuration conf = taskAttemptContext.getConfiguration();
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);

        fsin = fs.open(path);
        long start = split.getStart();
        end = split.getStart() + split.getLength();
        fsin.seek(start);

        if (start != 0) {
            readUntilMatch(endTag, false);
        }
    }

    public boolean nextKeyValue() throws IOException {
        if (!stillInChunk) return false;

        boolean status = readUntilMatch(endTag, true);

        value = new Text();
        value.set(buffer.getData(), 0, buffer.getLength());
        key = new LongWritable(fsin.getPos());
        buffer.reset();

        if (!status) {
            stillInChunk = false;
        }

        return true;
    }

    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    public void close() throws IOException {
        fsin.close();
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
        int i = 0;
        while (true) {
            int b = fsin.read();
            if (b == -1) return false;
            if (withinBlock) buffer.write(b);
            if (b == match[i]) {
                i++;
                if (i >= match.length) {
                    return fsin.getPos() < end;
                }
            } else i = 0;
        }
    }

}

endTag идентифицирует конец каждой записи.

Ответ 2

Если многострочные данные имеют определенный разделитель записей, вы можете использовать поддержку hadoop для многострочных записей, предоставляя разделитель через объект hadoop.Configuration:

Что-то вроде этого должно делать:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val conf = new Configuration
conf.set("textinputformat.record.delimiter", "id:")
val dataset = sc.newAPIHadoopFile("/path/to/data", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
val data = dataset.map(x=>x._2.toString)

Это предоставит вам RDD[String], где каждый элемент соответствует записи. После этого вам необходимо проанализировать каждую запись, следуя вашим требованиям приложения.