Good morning. I am learning Spark and Kafka, and I have a streaming application using the Spark API for Java. My problem is knowing what the last offset read was. For example, my application reads constantly from Kafka; suddenly it crashes while messages keep arriving in the Kafka topic. I want my Spark application, when it comes back up, to continue reading from the offset that was read before the crash. With KafkaUtils, if I set:
auto.offset.reset=earliest
it reads the entire topic for me; if I change it to latest, it only reads the records that arrive after the application starts. So neither does what I want, since I would lose data from Kafka. I also added a checkpoint; I understand that this helps when a DStream has not finished being processed: if the application crashes, the interrupted job can be resumed.
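For context, this is what I tried with the checkpoint. My understanding is that recovery only works if the context is built through JavaStreamingContext.getOrCreate, so Spark can rebuild the DStream from the checkpoint directory after a crash. A minimal sketch of what I mean, where createContext() is a hypothetical factory method that would contain the setup shown further below:

// Sketch: reload the streaming context from the checkpoint directory if one
// exists; otherwise build a fresh context with the hypothetical createContext().
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(
        "C:\\tmp\\poc\\checkPoint",   // same directory passed to jssc.checkpoint(...)
        () -> createContext());       // hypothetical factory, only called when no checkpoint is found
jssc.start();
jssc.awaitTermination();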
Configuration code:
Spark config:
return new SparkConf().setMaster(sparkDriverUtils.getSparkMaster())
.setAppName(sparkDriverUtils.getSparkAppName() == null ? "name-not-set"
: sparkDriverUtils.getSparkAppName())
.set("spark.default.parallelism", "2")
.set("spark.yarn.maxAppAttempts", "1")
.set("spark.yarn.am.attemptFailuresValidityInterval", "2h")
.set(DriverConstants.SPARK_MAX_CORES, sparkDriverUtils.getSparkMaxCores())
.set(DriverConstants.SPARK_EXECUTOR_MEMORY, sparkDriverUtils.getSparkExecutorMemory());
Kafka parameters snippet:
kafkaParams.put(DriverConstants.KAFKA_BOOTSTRAP_SERVER, getBootstrapServers());// 1 or more brokers
kafkaParams.put(DriverConstants.KEY_DESERIALIZER, StringDeserializer.class);
kafkaParams.put(DriverConstants.VALUE_DESERIALIZER, StringDeserializer.class);
kafkaParams.put(DriverConstants.KAFKA_GROUP_ID, getGroupId());
kafkaParams.put(DriverConstants.KAFKA_AUTO_OFFSET_RESET, getAutoOffsetReset()); // earliest or latest
kafkaParams.put(DriverConstants.KAFKA_ENABLE_AUTO_COMMIT, getEnableAutoCommit()); //false
JavaStreamingContext jssc = sparkConfigurationBuilder
.buildJSC(sparkConfigurationBuilder.buildSparkConfiguration());
jssc.checkpoint("C:\\tmp\\poc\\checkPoint");
Map<String, Object> kafkaParams = sparkDriverUtils.getKafkaProperties();
Collection<String> topics = Arrays.asList(sparkDriverUtils.getTopics().trim().split(","));// 1 or more topics
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
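What I am considering, following the Spark Streaming + Kafka integration guide, is committing the offsets back to Kafka myself after each batch (since enable.auto.commit is false), so that on restart the same group.id resumes from the last committed offset instead of relying on earliest/latest. A sketch of the idea (HasOffsetRanges, OffsetRange and CanCommitOffsets come from org.apache.spark.streaming.kafka010):

stream.foreachRDD(rdd -> {
    // The offset ranges covered by this micro-batch.
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // ... process the records of this batch here ...

    // After the output has completed, commit the offsets back to Kafka;
    // on restart, the same group.id continues from the last commit.
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});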
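Alternatively, I saw that Subscribe has an overload that accepts explicit starting offsets, in case I store them myself (for example in a database) after each batch. A sketch, where loadOffsetsFromMyStore() is a hypothetical helper returning the last offsets I persisted (TopicPartition is from org.apache.kafka.common):

// Sketch: resume from offsets persisted outside Kafka. fromOffsets is hypothetical,
// loaded from wherever the last processed offsets were stored.
Map<TopicPartition, Long> fromOffsets = loadOffsetsFromMyStore(); // hypothetical helper
JavaInputDStream<ConsumerRecord<String, String>> resumedStream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams, fromOffsets));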
Regards.