Splunk:从搜索中返回 One 或 True,在另一个搜索中使用该结果

Splunk: Return One or True from a search, use that result in another search

提问人:Niek Jonkman 提问时间:10/25/2021 最后编辑:Niek Jonkman 更新时间:10/27/2021 访问量:1106

问:

在 Splunk 中,我正在寻找显示“started with profile: [profile name]”的日志,并从找到的事件中检索配置文件名称。然后,我想使用配置文件名称来查找其他事件(来自不同的来源),如果发现一个或多个错误,我想让它计为每个平台的一个发现的错误。

为了更清楚起见,我有以下搜索查询(查询一):

index="myIndex" "started with profile" BD_L*
| table _raw, platform, RUNID
| eval Platform=case(searchmatch("LINUX"),"LINUX",searchmatch("AIX"),"AIX",searchmatch("DB2"),"DB2", searchmatch("SQL"),"SQL", searchmatch("WEBSPHERE"),"WEBSPHERE", searchmatch("SYBASE"),"SYBASE", searchmatch("WINDOWS"),"WINDOWS", true(),"ZLINUX") 
| stats count by Platform
| rename count AS "Amount"

从上述查询中找到的事件包含以下内容(原始):

Discovery run, 2021101306351355 started with profile BD_L2_Windows

上面的查询将返回包含上述原始数据的事件列表,并将生成下表。下表包含每个平台的发现运行量:

enter image description here

使用以下代码,我可以从事件中提取 RUNID。RUNID 是我在查找错误时需要在第二次搜索中使用的:

| rex "Discovery run, (?<RUNID>.+) started with profile"

使用 RUNID,我可以查找错误(查询二):

index="myIndex" source="/*/RUNID/*" CASE("ERROR") CTJT* 
| dedup _raw   
| stats  count
| rename count AS "Amount"

现在,我正在寻找一种方法将上述两个查询合并为一个查询,并计算至少有一个错误的平台数量。因此,假设我们有以下模拟:

  • 两次运行(一次 Windows 和一次 Linux)
  • Windows 运行有 0 个错误(在查询 2 中没有找到错误)
  • Linux 有 6 个错误(在查询 2 中找到)

这应导致以下结果:

Platform     |     Amount
Linux        |          1

我需要找到某种方法从查询 2 返回 true 或返回一个,并在查询 1 中使用它对结果进行分组,但由于缺乏经验,我无法做到。我还没有找到任何与我的问题相似的东西,希望这里的任何人都能帮助我。

到目前为止,我能想到以下几点,但这仍然向我显示了与上表所示相同的结果(每个平台的发现运行计数,而不是至少有一个错误的平台计数):

index="myIndex" "started with profile" BD_L* 
| table _raw, platform, RUNID 
| eval Platform=case(searchmatch("LINUX"),"LINUX",searchmatch("AIX"),"AIX",searchmatch("DB2"),"DB2", searchmatch("SQL"),"SQL", searchmatch("WEBSPHERE"),"WEBSPHERE", searchmatch("SYBASE"),"SYBASE", searchmatch("WINDOWS"),"WINDOWS", true(),"ZLINUX") 
| join type=left RUNID
    [ search index="myIndex" source="/*/RUNID/*" CASE("ERROR") CTJT*
        | dedup _raw
        | stats count 
    ]
| stats count by Platform

如何解决这个问题?提前致谢。

编辑:

根据要求,数据样本:

查询 1:

2021-10-25 22:01:10,065 ProcessFlowManager [RMI TCP Connection(20)-127.0.0.1]  INFO processflowmgr.ProcessFlowManagerImpl - Discovery run, 2021102522011000 started with profile BD_L2_Windows

查询 2:

2021-10-25 22:02:11,537 DiscoverManager [DiscoverWorker-47] 2021102522011000#SessionSensor-XX.XXX.XXX.XX-[135,445] ERROR cdb.TivoliStdMsgLogger - CTJTD3028E Sensor SessionSensor encountered an error, Seed: XX.XXX.XXX.XX:[135, 445], Run ID: 2021102522011000.

编辑2:

我尝试了最新的答案,得到了以下结果:

enter image description here

我期望的平台列表以及与平台相关的错误数量。像这样:

Platform | Amount (of errors)
zLinux | 2
Windows | 4
Splunk的

评论


答:

2赞 warren 10/26/2021 #1

第一。。。不要重复_raw

这些事件永远不会重复(除非您在引入时做错了什么)_raw

其次,对于您的实际问题 - 尝试以下操作:

index="myIndex" "started with profile" BD_L* 
| eval Platform=case(match(_raw,"LINUX"),"LINUX",match(_raw,"AIX"),"AIX",match(_raw,"DB2"),"DB2", match(_raw,"SQL"),"SQL", match(_raw,"WEBSPHERE"),"WEBSPHERE", match(_raw,"SYBASE"),"SYBASE", match(_raw,"WINDOWS"),"WINDOWS", true(),"ZLINUX") 
| stats count by Platform RUNID
| join type=left RUNID
    [ search index="myIndex" source="/*/RUNID/*" CASE("ERROR") CTJT*
        | stats count by RUNID
    ]
| stats count by Platform

如果您能从两个指数中提供一些样本数据,我们可以让您更接近一个好的解决方案 - 但这应该会让您找到答案

可能有一种方法可以在此搜索中使用 join - 但我们需要示例数据来验证:)

对示例数据执行 EDIT 操作

这应该为您提供一种不使用 的方法,并获取您正在寻找的内容:join

index=ndx sourcetype=srctp 
| rex field=_raw " (?<runid>\d{10,})[\s\#]"
| rex field=_raw "BD_\w+_(?<platform>\w+)"
| rex field=_raw "(?<error>[eEoOrR]{5})"
| stats values(error) as error values(platform) as platform by runid
| where isnotnull(error)

假设平台始终以 BD_<string>_<platform> 的格式显示,这将从事件中提取 、 和(如果有),然后通过 stats 函数对它们进行分组runidplatformerrorvalues()

最后,where 子句将确保仅显示给定errorrunid

编辑 2 - OP 进一步更新了问题:添加一个:| stats count by platform

index=ndx sourcetype=srctp 
| rex field=_raw " (?<runid>\d{10,})[\s\#]"
| rex field=_raw "BD_\w+_(?<platform>\w+)"
| rex field=_raw "(?<error>[eEoOrR]{5})"
| stats values(error) as error values(platform) as platform by runid
| where isnotnull(error)
| stats count by platform

评论

0赞 RichG 10/26/2021
OP 提到从事件中提取 RUNID。如果仍然需要这样做,则应在 .它不能被提取,然后“传入”到子搜索。join
0赞 Niek Jonkman 10/26/2021
我添加了两个数据样本。它们来自同一个索引。上面的代码不返回任何统计信息。仅返回查询 1 返回的事件。
0赞 warren 10/26/2021
@RichG:我从最初提供的东西开始:)
0赞 warren 10/26/2021
@NiekJonkman - 请参阅更新
1赞 Niek Jonkman 10/28/2021
嗨,@warren,感谢您到目前为止的帮助,但我会坚持我发布的答案。查询返回过去 24 小时内的1345710事件(在索引和 sourcetype 上)。我的同事帮助我处理的查询在过去 24 小时内返回了 222 个事件。此外,查询是否显示与发现运行量相对应的其他计数(错误)。因此,对于开始的 50 次扫描,会出现 50 个错误,而记录的实际错误为 0。感谢您的帮助,虽然:)!
2赞 Niek Jonkman 10/27/2021 #2

在一位同事的帮助下,我设法做到了:

index="myIndex" "started with profile" BD_L* 
| eval platform=case(searchmatch("LINUX"),"LINUX",searchmatch("AIX"),"AIX",searchmatch("DB2"),"DB2", searchmatch("SQL"),"SQL", searchmatch("WEBSPHERE"),"WEBSPHERE", searchmatch("SYBASE"),"SYBASE", searchmatch("WINDOWS"),"WINDOWS", true(),"ZLINUX")
| rex "Discovery run, (?<RUNID>.+) started with profile"
| stats count by platform, RUNID
| join RUNID 
    [ search index="myIndex" source="/opt/XXX/XXXXX/XXXX/log/sensors/*/*" CASE("ERROR") CTJT* 
    | rex field=source "^/opt/XXX/XXXXX/XXXX/log/sensors/(?<RUNID>.+)/" 
    | stats count by RUNID ] 
| stats count by platform