Asked by: Joe  Asked: 6/22/2023  Updated: 6/22/2023  Views: 16
Is it possible to have executors evaluate part of the query instead of master finding the entire result set first in spark java?
Q:
I have 2 tables in a postgres db that I need to join, and the resulting output has to be sent to kafka. For this I've written java code that uses the spark framework. A sample of my code is below:
Main class:
private static final KafkaService kafkaService = new KafkaService();

public static void main(String[] args) {
    start();
}

private static void start() {
    String dbUrl = "jdbc:postgresql://localhost:5432/postgres?rewriteBatchedStatements=true";
    String query = "(WITH " +
            "all_places AS (" +
            "SELECT " +
            "pt.place_id, " +
            "pt.name " +
            "FROM place_table pt WHERE pt.loc = 'all'), " +
            "t_id AS (" +
            "SELECT town_table.id, town_table.town_id, town_table.place_id FROM town_table " +
            "WHERE town_table.place_id IN (SELECT all_places.place_id FROM all_places)) " +
            "SELECT all_towns.id, all_towns.town_id, " +
            "(SELECT JSONB_AGG(related_places) FROM " +
            "(SELECT * FROM all_places " +
            "LEFT JOIN t_id ON t_id.place_id=all_places.place_id " +
            "WHERE t_id.town_id = all_towns.town_id " +
            ") as related_places) AS related_places " +
            "FROM t_id all_towns) as db_data";
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SQL");
    SparkSession sparkSession = SparkSession.builder()
            .config(conf)
            .getOrCreate();

    Dataset<Row> loadData = sparkSession.read()
            .option("driver", "org.postgresql.Driver")
            .option("partitionColumn", "id")
            .option("numPartitions", 9)
            .option("lowerBound", 1)
            .option("upperBound", 100)
            .option("url", dbUrl)
            .option("user", "postgres")
            .option("password", "admin")
            .option("dbtable", query)
            .format("jdbc")
            .load();
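    // Note: with partitionColumn/numPartitions/lowerBound/upperBound set, Spark
    // does not pull the whole result set through the driver. It issues one JDBC
    // query per partition, roughly of the form
    //   SELECT * FROM (<query>) db_data WHERE id >= <start> AND id < <end>
    // so each executor reads its own slice; Postgres may still evaluate the
    // inner subquery once per partition, which is where the time goes.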
    loadData.persist(StorageLevel.MEMORY_ONLY());
    loadData.foreachPartition(f -> {
        while (f.hasNext()) {
            Row row = f.next();
            DummySchema event = new DummySchema();
            event.setEntityId(row.get(1).toString());
            event.setData(row.get(2) == null ? "" : row.get(2).toString());
            kafkaService.sendMessage(event, "dummyTopic");
        }
    });
    loadData.unpersist();
}
KafkaService class:
private KafkaProducer<String, DummySchema> producer;

public KafkaService(KafkaProducer<String, DummySchema> producer) {
    this.producer = producer;
}

public KafkaService() {}

public void sendMessage(DummySchema event, String topic) {
    // Create the producer once and reuse it; constructing a new
    // KafkaProducer for every message is expensive.
    if (producer == null) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("auto.register.schemas", false);
        props.put("use.latest.version", true);
        producer = new KafkaProducer<>(props);
    }
    ProducerRecord<String, DummySchema> producerRecord =
            new ProducerRecord<>(topic, event.getEntityId().toString(), event);
    producer.send(producerRecord);
}
The DummySchema objects are generated from an avro schema file, so it's a standard kafka setup.
Now, as I understand it, the partitioning happens once the result set of the query has been obtained (i.e. execute the query, then partition). That is not ideal, because both tables contain a lot of data and this query can take a very long time to execute. A better approach would be to partition town_table first and then do the join using place_table. In other words, I don't want to partition the result set; I want to partition town_table and then have the executors do the join.
So instead of my code doing this (which is what currently happens):
Execute the query -> obtain the result set -> load the result set to the loadData
variable -> partition the result -> send each row of the partitioned data to kafka in executors
I want my code to do this:
Partition town_table -> trigger the query ->
have each executor figure out the result set for the partitioned town_table data -> have each executor
send its result set's rows to kafka.
That way each executor would work on a subset of the rows, instead of the master computing the query's result set for all town_table rows.
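A minimal sketch of that flow, reusing sparkSession and dbUrl from the code above, and assuming the JSONB_AGG aggregation can be re-expressed on the Spark side (e.g. with collect_list/to_json), could look like this:

Dataset<Row> towns = sparkSession.read()
        .format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", dbUrl)
        .option("user", "postgres")
        .option("password", "admin")
        .option("dbtable", "town_table")
        // each executor reads only its own id range of town_table
        .option("partitionColumn", "id")
        .option("numPartitions", 9)
        .option("lowerBound", 1)
        .option("upperBound", 100)
        .load();

Dataset<Row> places = sparkSession.read()
        .format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", dbUrl)
        .option("user", "postgres")
        .option("password", "admin")
        // push the filter down so only the needed place rows are read
        .option("dbtable", "(SELECT place_id, name FROM place_table WHERE loc = 'all') p")
        .load();

// the join now runs on the executors, one task per town_table partition;
// no full result set is materialized on the driver
Dataset<Row> joined = towns.join(places, "place_id");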
To achieve this, I could create 2 datasets - one for the place_table data as a broadcast/persisted dataset, and one for the town_table data, which would be partitioned; each partition would then do a dataset join with the place_table dataset before sending the information to kafka. My only concern with this approach is that all of that data would be loaded into memory (which, given that there are ~10 billion records between the 2 tables, could cause problems).
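For reference, a sketch of that broadcast variant (assuming the filtered place_table rows fit in executor memory; broadcast is the hint from org.apache.spark.sql.functions):

import static org.apache.spark.sql.functions.broadcast;

// ship the small, filtered place_table dataset to every executor once;
// town_table stays partitioned and is never collected on the driver
Dataset<Row> joined = towns.join(broadcast(places), "place_id");
// ...then joined.foreachPartition(...) to send rows to kafka as before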
So my question is the following: is it possible to partition one of the tables being joined instead of partitioning the query's result set (so that the join is done in the executors, as described above)? Or is what I described in the previous paragraph the only way?
A: No answers yet