提问人:Dean Meehan 提问时间:10/26/2023 更新时间:10/26/2023 访问量:16
如何从 Splunk 搜索中下载无限结果以导出数据
How can I download infinite results from a Splunk Search to Export the Data
问:
尝试通过 UI 从 Splunk 导出数据时,我只能下载在 UI 超时之前花费不到 60 秒的任何数据。如何下载特定搜索的所有结果?
Note: This is a self Answered Wiki Post
答:
1赞
Dean Meehan
10/26/2023
#1
通过 UI 下载 Splunk 结果有硬性限制,因此您需要使用 Splunk SKD 或 API 之一。使用 API 有 2 种解决方案:
- 使用终结点并为每个请求下载 50k 个结果以完成搜索。
/results
- 使用终结点在找到结果时对结果进行流式处理。
/export
注意:这些基本示例显示了在最佳情况下使用 Python3 下载结果的过程。实际上,您必须错误检查Splunk的响应是否超时,如果下载步骤失败,则必须重试。
示例 1:/results
此方法允许您执行 Splunk 实例可以处理的大型搜索,但您只能在单个请求中导出 50,000 个事件的结果。
第 1 步:创建搜索
def run_search(search_query, earliest_time='-15m', latest_time='now', allow_partial_results=False, max_count=50000):
url = f"https://splunk-customer-endpoint.splunkcloud.com:8089/services/search/jobs/{output_mode}"
headers = {
'Authorization': f'Bearer {splunk_token}'
}
data = {
'search': f'search {search_query}',
'earliest_time': earliest_time,
'latest_time': latest_time,
'allow_partial_results': allow_partial_results,
'max_count': max_count
}
response = requests.request("POST", url, headers=headers, data=data)
search_id = response.json()['sid']
return search_id
第 2 步:等待搜索完成
def get_status(search_id):
url = f"https://splunk-customer-endpoint.splunkcloud.com:8089/services/search/jobs/{search_id}"
data = {
'output_mode': 'json'
}
headers = {
'Authorization': f'Bearer {splunk_token}'
}
response = requests.request("GET", url, headers=headers, data=data)
return response.json()['entry']['content']['isDone']
第 3 步:下载此搜索的结果,一次最多 50000 个
def get_results(search_id, output_mode='csv', offset=0, max_results_batch=50000):
url = f"https://splunk-customer-endpoint.splunkcloud.com:8089/services/search/v2/jobs/{search_id}/results/"
data = {
'output_mode': output_mode,
'count': max_results_batch,
'offset': offset
}
headers = {
'Authorization': f'Bearer {splunk_token}'
}
response = requests.request("GET", url, headers=headers, params=data)
return response
示例 2:/export
此方法将允许您在使用 HTTP 流完成搜索之前下载可用的结果。要使用这种方法下载大量日志,您需要将导出分块为较小的时间范围,例如每 15 分钟一次,例如。通过设置这些部分结果并将其流式传输到文件。earliest_time='2023-08-30 16:30:00' latest_time='2023-08-30 16:45:00'
def run_export(search_query, earliest_time='-15m', latest_time='now', output_mode='csv'):
r = requests.post('https://qualtricssplunkcloud.splunkcloud.com:8089/services/search/v2/jobs/export',
headers = {
'Authorization': f'Bearer {splunk_token}'
},
data = {
'search': f'search {search_query}',
'earliest_time': earliest_time,
'latest_time': latest_time,
},
params = {
'output_mode': output_mode
},
stream=True
)
if r.encoding is None:
r.encoding = 'utf-8'
with open('exported-search.csv', 'a') as file:
for data in r.iter_lines():
file.write(data)
评论