如何从 Splunk 搜索中下载无限结果以导出数据

How can I download infinite results from a Splunk Search to Export the Data

提问人:Dean Meehan 提问时间:10/26/2023 更新时间:10/26/2023 访问量:16

问:

尝试通过 UI 从 Splunk 导出数据时,我只能下载在 UI 超时之前花费不到 60 秒的任何数据。如何下载特定搜索的所有结果?

Note: This is a self Answered Wiki Post
splunk splunk-api

评论


答:

1赞 Dean Meehan 10/26/2023 #1

通过 UI 下载 Splunk 结果有硬性限制,因此您需要使用 Splunk SKD 或 API 之一。使用 API 有 2 种解决方案:

  1. 使用终结点并为每个请求下载 50k 个结果以完成搜索。/results
  2. 使用终结点在找到结果时对结果进行流式处理。/export

注意:这些基本示例显示了在最佳情况下使用 Python3 下载结果的过程。实际上,您必须错误检查Splunk的响应是否超时,如果下载步骤失败,则必须重试。

示例 1:/results

此方法允许您执行 Splunk 实例可以处理的大型搜索,但您只能在单个请求中导出 50,000 个事件的结果。

第 1 步:创建搜索

def run_search(search_query, earliest_time='-15m', latest_time='now', allow_partial_results=False, max_count=50000):
  url = f"https://splunk-customer-endpoint.splunkcloud.com:8089/services/search/jobs/{output_mode}"

  headers = {
    'Authorization': f'Bearer {splunk_token}'
  }
  data = {
    'search': f'search {search_query}',
    'earliest_time': earliest_time,
    'latest_time': latest_time,
    'allow_partial_results': allow_partial_results,
    'max_count': max_count
  }
  response = requests.request("POST", url, headers=headers, data=data)

  search_id = response.json()['sid']
  return search_id

第 2 步:等待搜索完成

def get_status(search_id):  
    url = f"https://splunk-customer-endpoint.splunkcloud.com:8089/services/search/jobs/{search_id}"
    data = {
        'output_mode': 'json'
    }
    headers = {
        'Authorization': f'Bearer {splunk_token}'
    }
    response = requests.request("GET", url, headers=headers, data=data)
    return response.json()['entry']['content']['isDone']

第 3 步:下载此搜索的结果,一次最多 50000 个

def get_results(search_id, output_mode='csv', offset=0, max_results_batch=50000):
    url = f"https://splunk-customer-endpoint.splunkcloud.com:8089/services/search/v2/jobs/{search_id}/results/"
    data = {
        'output_mode': output_mode,
        'count': max_results_batch,
        'offset': offset
    }
    headers = {
        'Authorization': f'Bearer {splunk_token}'
    }
    response = requests.request("GET", url, headers=headers, params=data)
    return response

示例 2:/export

此方法将允许您在使用 HTTP 流完成搜索之前下载可用的结果。要使用这种方法下载大量日志,您需要将导出分块为较小的时间范围,例如每 15 分钟一次,例如。通过设置这些部分结果并将其流式传输到文件。earliest_time='2023-08-30 16:30:00' latest_time='2023-08-30 16:45:00'

def run_export(search_query, earliest_time='-15m', latest_time='now', output_mode='csv'):
    r = requests.post('https://qualtricssplunkcloud.splunkcloud.com:8089/services/search/v2/jobs/export', 
        headers = {
            'Authorization': f'Bearer {splunk_token}'
        },
        data = {
            'search': f'search {search_query}',
            'earliest_time': earliest_time,
            'latest_time': latest_time,
        },
        params = {
            'output_mode': output_mode
        },
        stream=True
    )

    if r.encoding is None:
        r.encoding = 'utf-8'

    with open('exported-search.csv', 'a') as file:
        for data in r.iter_lines():
            file.write(data)