Asked by: Jawad Tahir  Asked: 11/10/2023  Last edited by: Jawad Tahir  Updated: 11/11/2023  Views: 14
How to process late tuples from BaseWindowedBolt?
Question:
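I am developing an Apache Storm (v2.5.0) topology that reads events from a spout (`BaseRichSpout`), counts the number of events in a tumbling window (`BaseWindowedBolt`), and prints the count (`BaseRichBolt`). The topology works fine, but my dataset contains some out-of-order events. `BaseWindowedBolt` provides the `withLateTupleStream` method to route late events to a separate stream. However, when I try to process the late events, I get a serialization exception: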
```
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
Note: To register this class use: kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
Serialization trace:
defaultResources (org.apache.storm.task.WorkerTopologyContext)
context (org.apache.storm.tuple.TupleImpl)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
    ... 6 more
```
I defined my topology as follows:
```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.TupleImpl;

public class TestTopology {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        // Serialize every tuple even in local mode, as a distributed cluster would
        config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
        config.registerSerialization(TupleImpl.class);
        config.registerSerialization(Fields.class);
        LocalCluster cluster = new LocalCluster();
        try (LocalCluster.LocalTopology topology =
                 cluster.submitTopology("testTopology", config, getTopology().createTopology())) {
            Thread.sleep(50000);
        }
        cluster.shutdown();
    }

    static TopologyBuilder getTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("eventSpout", new LateEventSpout());
        builder.setBolt("windowBolt", new WindowBolt()
                    .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10))
                    .withTimestampField("time")
                    .withLateTupleStream("lateEvents"))
               .shuffleGrouping("eventSpout");
        // Late tuples arrive on the "lateEvents" stream of windowBolt
        builder.setBolt("latePrintBolt", new LatePrintBolt())
               .shuffleGrouping("windowBolt", "lateEvents");
        builder.setBolt("printBolt", new PrintBolt()).shuffleGrouping("windowBolt");
        return builder;
    }
}
```
where `LateEventSpout` is:
```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class LateEventSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private List<Long> eventTimes;
    private int currentTime = 0;
    private int id = 1;

    public LateEventSpout() {
        eventTimes = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
        } // [epoch+1s, epoch+2s, .., epoch+61s], in milliseconds
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        int eventId = id++;
        Long eventTime = eventTimes.get(currentTime++);
        if (currentTime == eventTimes.size()) {
            currentTime = 0; // reset time to zero so we get out-of-order (OOO) events
        }
        collector.emit(new Values(eventId, eventTime));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "time"));
    }
}
```
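For reference, the order in which `LateEventSpout` emits timestamps can be modelled in plain Java, without Storm. This is a sketch under a simplifying assumption: the watermark is taken to be the largest event time seen so far and to advance on every event, whereas Storm generates watermarks periodically, so the tuple Storm first reports as late may differ. `SpoutSequenceModel` and `firstLateEvent` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of LateEventSpout's emission order (no Storm dependency).
 * Simplifying assumption: the watermark is the largest time seen so far and
 * advances on every event; Storm actually emits watermarks periodically.
 */
public class SpoutSequenceModel {
    /** Event times in ms: epoch+1s .. epoch+61s, as built in the spout constructor. */
    static List<Long> eventTimes() {
        List<Long> times = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            times.add(i * 1000L);
        }
        return times;
    }

    /** Returns {id, time} of the first event below the model watermark, or null if none. */
    static long[] firstLateEvent(int eventsToEmit) {
        List<Long> times = eventTimes();
        int currentTime = 0;
        long watermark = Long.MIN_VALUE;
        for (int id = 1; id <= eventsToEmit; id++) {
            long time = times.get(currentTime++);
            if (currentTime == times.size()) {
                currentTime = 0; // same wrap-around as the spout
            }
            if (time < watermark) {
                return new long[] {id, time};
            }
            watermark = Math.max(watermark, time);
        }
        return null;
    }

    public static void main(String[] args) {
        long[] late = firstLateEvent(200);
        // Prints: First late event: id=62, time=1000
        System.out.println("First late event: id=" + late[0] + ", time=" + late[1]);
    }
}
```

In this model the first 61 events arrive in increasing time order, and event 62 wraps back to 1000 ms while the watermark is already at 61000 ms.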
and `WindowBolt` is:
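```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

public class WindowBolt extends BaseWindowedBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        // Count the events that fell into this window
        int sum = 0;
        for (Tuple event : inputWindow.get()) {
            sum++;
        }
        collector.emit(new Values(inputWindow.getStartTimestamp(), inputWindow.getEndTimestamp(), sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("start", "end", "sum"));
    }
}
```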
And `PrintBolt` simply prints the output of `windowBolt` (`LatePrintBolt` is similar):
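```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class PrintBolt extends BaseRichBolt {
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        System.out.println(String.format("Start: %d, End: %d, Sum:%d", input.getLongByField("start"), input.getLongByField("end"), input.getIntegerByField("sum")));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: declares no output streams
    }
}
```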
If I don't add `LatePrintBolt` to the `TopologyBuilder`, I get the correct result:
```
Start: 0, End: 10000, Sum:10
Start: 10000, End: 20000, Sum:10
Start: 20000, End: 30000, Sum:10
Start: 30000, End: 40000, Sum:10
Start: 40000, End: 50000, Sum:10
Start: 50000, End: 60000, Sum:10
```
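The sums above can be checked by hand. The following sketch is plain Java with no Storm dependency; it assumes that a window printed as `Start`/`End` contains the events with `start < time <= end` (the convention that yields 10 events per window for timestamps 1000, 2000, ..., 61000 ms) and ignores Storm's watermark timing entirely. `TumblingWindowCounts`, `windowEnd` and `closedWindows` are hypothetical helpers, not Storm API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Recomputes the window sums for the first pass of LateEventSpout
 * (event times 1000..61000 ms) with 10 s tumbling windows.
 * Assumption: a window printed as [start, end] holds the events with
 * start < time <= end, the convention that matches the printed sums.
 */
public class TumblingWindowCounts {
    static final long WINDOW_MS = 10_000L;

    /** End timestamp of the tumbling window that contains {@code time} (time > 0). */
    static long windowEnd(long time) {
        // Round up to the next multiple of WINDOW_MS, so 10000 falls in (0, 10000]
        return ((time + WINDOW_MS - 1) / WINDOW_MS) * WINDOW_MS;
    }

    /** Returns {start, end, sum} for every window whose end is at or before {@code watermark}. */
    static List<long[]> closedWindows(List<Long> times, long watermark) {
        List<long[]> result = new ArrayList<>();
        for (long end = WINDOW_MS; end <= watermark; end += WINDOW_MS) {
            long sum = 0;
            for (long t : times) {
                if (windowEnd(t) == end) {
                    sum++;
                }
            }
            result.add(new long[] {end - WINDOW_MS, end, sum});
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> times = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            times.add(i * 1000L);
        }
        // 61000 is the largest time the spout ever emits
        for (long[] w : closedWindows(times, 61_000L)) {
            System.out.printf("Start: %d, End: %d, Sum:%d%n", w[0], w[1], w[2]);
        }
    }
}
```

Running `main` prints the same six lines as the topology. In this model, the event at 61000 ms belongs to the window ending at 70000, which never completes because no timestamp above 61000 ms is ever emitted; every later event wraps back to 1000 ms.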
However, when I also consume the `lateEvents` stream, I get the same output, but on the first late event I get the exception above.
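I have debugged the issue. When `WindowedBoltExecutor` receives a late tuple, it emits it on the late stream, but `BoltOutputCollectorImpl` wraps it into a new tuple. This new tuple contains the non-serializable `WorkerTopologyContext`, hence the error.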
I would like to know how to process late tuples.
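The serialization trace shows that the values being written include a `TupleImpl` whose `context` field points to a `WorkerTopologyContext`. The failure mode can be illustrated outside Storm, as an analogy only: this sketch uses java.io serialization instead of Kryo, and `RuntimeContext`/`WrappedTuple` are hypothetical stand-ins for `WorkerTopologyContext`/`TupleImpl`, not Storm classes. Kryo fails for a more specific reason (the unregistered `SingletonImmutableBiMap` inside the context), but the shape is the same: the emitted value carries runtime state with it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Analogy with java.io serialization (not Kryo); all class names are hypothetical. */
public class LateTupleSerializationDemo {
    /** Stand-in for WorkerTopologyContext: runtime state that is not serializable. */
    static class RuntimeContext {
    }

    /** Stand-in for a tuple object that keeps a reference to its context. */
    static class WrappedTuple implements Serializable {
        final List<Object> values;
        final RuntimeContext context; // not Serializable, so writing a WrappedTuple fails

        WrappedTuple(List<Object> values, RuntimeContext context) {
            this.values = values;
            this.context = context;
        }
    }

    /** Returns true if Java serialization of {@code obj} succeeds. */
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<Object> values = new ArrayList<>();
        values.add(62);    // id
        values.add(1000L); // time
        WrappedTuple late = new WrappedTuple(values, new RuntimeContext());
        // The wrapper drags the context along: prints "whole tuple: false"
        System.out.println("whole tuple: " + canSerialize(late));
        // A plain copy of the values leaves the context behind: prints "values only: true"
        System.out.println("values only: " + canSerialize(new ArrayList<>(late.values)));
    }
}
```

This only illustrates the failure mode; it does not change what `WindowedBoltExecutor` emits.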
Answer: no answers yet