Is it possible to have executors evaluate part of the query instead of master finding the entire result set first in spark java?

Asked by: Joe · Asked: 6/22/2023 · Updated: 6/22/2023 · Views: 16

Q:

I have 2 tables in a postgres db that I need to join, sending the resulting output to kafka. For this I wrote java code using the spark framework. A sample of my code is below:

Main class:

private static final KafkaService kafkaService = new KafkaService();
    public static void main(String[] args) {
        start();
    }
    private static void start() {
        String dbUrl = "jdbc:postgresql://localhost:5432/postgres?rewriteBatchedStatements=true";
        String query = "(WITH " +

                "all_places AS (" +
                "SELECT " +
                "pt.place_id, " +
                "pt.name, " +
                "FROM place_table pt WHERE pt.loc = 'all'), " +

                "t_id AS (" +
                "SELECT town_table.id, town_table.town_id, town_table.place_id from town_table " +
                "WHERE town_table.place_id IN (SELECT all_places.place_id FROM all_places)) " +

                "SELECT all_towns.id, all_towns.town_id, " +
                "(SELECT JSONB_AGG(related_places) FROM " +
                "(SELECT * FROM all_places " +
                "LEFT JOIN t_id ON t_id.place_id=all_places.place_id " +
                "WHERE t_id.town_id = all_towns.town_id " +
                ") as related_places) AS related_places " +
                "FROM t_id all_towns) as db_data";

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SQL");
        SparkSession sparkSession = SparkSession.builder()
                .config(conf)
                .getOrCreate();
        Dataset<Row> loadData = sparkSession.read()
                .option("driver", "org.postgresql.Driver")
                // Spark splits the read into 9 parallel JDBC queries, each with
                // a WHERE clause over an id range derived from the bounds below
                .option("partitionColumn", "id")
                .option("numPartitions", 9)
                .option("lowerBound", 1)
                .option("upperBound", 100)
                .option("url", dbUrl)
                .option("user", "postgres")
                .option("password", "admin")
                .option("dbtable", query)
                .format("jdbc")
                .load();
        loadData.persist(StorageLevel.MEMORY_ONLY());

        // Runs on the executors: one invocation per partition of loadData.
        loadData.foreachPartition(f -> {
            while(f.hasNext()) {
                Row row = f.next();
                DummySchema event = new DummySchema();
                event.setEntityId(row.get(1).toString());
                event.setData(row.get(2) == null ? "" : row.get(2).toString());

                kafkaService.sendMessage(event,"dummyTopic");
            }
        });
        loadData.unpersist();
    }

KafkaService class:

    private KafkaProducer<String, DummySchema> producer;

    public KafkaService(KafkaProducer<String, DummySchema> producer) {
        this.producer = producer;
    }

    public KafkaService() {}

    public void sendMessage(DummySchema event, String topic) {
        // Build the producer once and reuse it; constructing a new
        // KafkaProducer for every message is expensive.
        if (producer == null) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            props.put("schema.registry.url", "http://localhost:8081");
            props.put("auto.register.schemas", false);
            props.put("use.latest.version", true);
            producer = new KafkaProducer<>(props);
        }

        ProducerRecord<String, DummySchema> producerRecord =
                new ProducerRecord<>(topic, event.getEntityId().toString(), event);
        producer.send(producerRecord);
    }

The DummySchema object is generated from an avro schema file, so this is a standard kafka setup.
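One wiring detail worth a hedged sketch (an assumption about a cluster deployment, not something stated in the post): foreachPartition runs on the executors, so rather than relying on the driver-side static kafkaService field, a common pattern is to create one producer per partition on the executor and reuse it for all of that partition's rows:

    loadData.foreachPartition(rows -> {
        // Hypothetical variant: one KafkaService (and thus one producer)
        // per partition, created on the executor that processes it.
        KafkaService partitionService = new KafkaService();
        while (rows.hasNext()) {
            Row row = rows.next();
            DummySchema event = new DummySchema();
            event.setEntityId(row.get(1).toString());
            event.setData(row.get(2) == null ? "" : row.get(2).toString());
            partitionService.sendMessage(event, "dummyTopic");
        }
    });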

Now, as I understand it, the partitioning happens once the result set of the query is obtained (i.e. the query executes first, then the result is partitioned). This is not ideal because both tables contain a lot of data, so the query can take a very long time to execute. A better approach would be to partition town_table first and then join it with place_table. In other words, I don't want to partition the result set; I want to partition town_table and then have the executors do the join.

So, instead of the code doing this (which is what currently happens):

Execute the query -> obtain the result set -> load the result set into the loadData
variable -> partition the result -> send each row of the partitioned data to kafka in executors

I want my code to do this:

Partition town_table -> trigger the query -> 
have each executor figure out the result set for the partitioned town_table data -> have each executor 
send its result set's rows to kafka. 

That way, the executors would each process a subset of the rows, instead of the master working out the query's result set for all of town_table's rows.
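To make the intended shape concrete, here is a minimal sketch reusing the names from the code above; the join keys are read off the SQL, and the JSONB_AGG aggregation would still need to be reproduced with Spark functions such as collect_list/to_json (all of this is illustrative, not a tested implementation):

    // Read each table as its own dataset so the join runs in Spark on the
    // executors, instead of inside a single postgres query.
    Dataset<Row> townDf = sparkSession.read()
            .format("jdbc")
            .option("driver", "org.postgresql.Driver")
            .option("url", dbUrl)
            .option("user", "postgres")
            .option("password", "admin")
            .option("dbtable", "town_table")
            .option("partitionColumn", "id")   // each executor reads only its own id range
            .option("numPartitions", 9)
            .option("lowerBound", 1)
            .option("upperBound", 100)
            .load();

    Dataset<Row> placeDf = sparkSession.read()
            .format("jdbc")
            .option("driver", "org.postgresql.Driver")
            .option("url", dbUrl)
            .option("user", "postgres")
            .option("password", "admin")
            .option("dbtable", "(SELECT place_id, name FROM place_table WHERE loc = 'all') AS p")
            .load();

    // The join now happens on partitioned data inside Spark.
    Dataset<Row> joined = townDf.join(placeDf,
            townDf.col("place_id").equalTo(placeDf.col("place_id")));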

To achieve this, I could create 2 datasets: 1 for the place_table data as a broadcast/persisted dataset, and 1 for the town_table data, which would be partitioned. Each partition would then do a dataset join with the place_table dataset before sending the information to kafka. My only concern with this approach is that all the data would be loaded into memory (which, given the ~10 billion records across the 2 tables, could cause problems).
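As a hedged sketch of that broadcast idea, building on the hypothetical townDf and placeDf datasets above (broadcast here is the join hint from org.apache.spark.sql.functions):

    import static org.apache.spark.sql.functions.broadcast;

    // place_table is copied to every executor, town_table stays partitioned,
    // so each executor joins only its own slice of town rows.
    Dataset<Row> joined = townDf.join(
            broadcast(placeDf),
            townDf.col("place_id").equalTo(placeDf.col("place_id")),
            "left");

    // Each executor then sends its own partition's rows to kafka,
    // as in the foreachPartition block of the main class.
    joined.foreachPartition(rows -> { /* send rows to kafka */ });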

So my question is the following: is it possible to partition not the result set of the query, but one of the tables being joined (so that the join is done in the executors, as described above)? Or is what I described in the paragraph above the only way?

apache-spark-sql spark-java

A: No answers yet