Extract Daily Mutual Funds NAV data from AMFI Website and Store it in MongoDB

Asked by dewashya on 11/16/2023 · Updated 11/16/2023 · Viewed 41 times

Q:

I want to download the daily NAV (Net Asset Value) for all schemes from the AMFI website and store all of it in MongoDB. With my current code, though, downloading everything and pushing it into the database takes nearly 5 days, which is far too long, partly because I am restructuring the data as I go. I'm hoping someone can help me optimize the code so it finishes faster.

I know the slow part of my code is that I push each NAV record for each date into the database one by one. I would like to group the data and push it in batches, but I think I would need a better laptop to do that, because storing the data in arrays takes a lot of memory.

Please find my code below:

#https://portal.amfiindia.com/DownloadNAVHistoryReport_Po.aspx?&frmdt=14-Aug-2023&todt=16-Aug-2023

import requests
from pytz import utc
from datetime import datetime, timedelta
import pymongo  # Import the pymongo library for MongoDB operations


# Initialize MongoDB client and database
mongo_client = pymongo.MongoClient("mongodb://localhost:27017/")  # Replace with your MongoDB connection string
mydb = mongo_client["M_F"]  # Replace with your database name
mycollection = mydb["MyNAV"]  # Replace with your collection name


def convert_date_to_utc_datetime(date_string):
    date_format = "%d-%b-%Y"
    date_object = datetime.strptime(date_string, date_format)
    return date_object.replace(tzinfo=utc)

def split_date_range(start_date_str, end_date_str, max_duration=90):
    # Convert input date strings to datetime objects
    start_date = datetime.strptime(start_date_str, "%d-%b-%Y")
    end_date = datetime.strptime(end_date_str, "%d-%b-%Y")

    date_ranges = []

    current_date = start_date
    while current_date <= end_date:
        # Calculate the end of the current sub-range
        sub_range_end = current_date + timedelta(days=max_duration - 1)
        
        # Make sure the sub-range end is not greater than the end_date
        if sub_range_end > end_date:
            sub_range_end = end_date

        # Append the current sub-range as a tuple to the list
        date_ranges.append((current_date, sub_range_end))

        # Move the current_date to the day after the sub-range end
        current_date = sub_range_end + timedelta(days=1)

    return date_ranges

def nav_data(start,end):
    """Put the date in DD-Mmm-YYYY that too in a string format"""
    url = f"https://portal.amfiindia.com/DownloadNAVHistoryReport_Po.aspx?&frmdt={start}&todt={end}"
    response = requests.session().get(url)
    print("Got the data from connection")
    data = response.text.split("\r\n")
    Structure = ""
    Category = ""
    Sub_Category = ""
    amc = ""
    code = 0
    name = ""
    nav = 0.0
    date = ""
    inv_src = ""
    dg = ""
    j = 1
    
    for lines in data[1:]:
        split = lines.split(";")
        if j == len(data)-1:
            break
        if split[0] == "":
            # To check the Scheme [Structure, Category, Sub-Category]
            if data[j] == data[j+1]:
                sch_cat = data[j-1].split("(")
                sch_cat[-1]=sch_cat[-1][:-2].strip()
                sch_cat = [i.strip() for i in sch_cat]
                if "-" in sch_cat[1]:
                    sch_sub_cat = sch_cat[1].split("-")
                    sch_sub_cat = [i.strip() for i in sch_sub_cat]
                    sch_cat.pop(-1)
                    sch_cat = sch_cat+sch_sub_cat
                else:
                    sch_sub_cat = ["",sch_cat[1]]
                    sch_cat.pop(-1)
                    sch_cat = sch_cat+sch_sub_cat
                Structure = sch_cat[0]
                Category = sch_cat[1]
                Sub_Category = sch_cat[2]
                #print(sch_cat)
            # to check the AMC name
            elif "Mutual Fund" in data[j+1]:
                amc = data[j+1]
        elif len(split)>1:
            code = int(split[0].strip())
            name = str(split[1].strip())
            if "growth" in name.lower():
                dg = "Growth"
            elif "idcw" in name.lower() or "dividend" in name.lower():
                dg = "IDCW"
            else:
                dg = ""

            if "direct" in name.lower():
                inv_src = "Direct"
            elif "regular" in name.lower():
                inv_src = "Regular"
            else:
                inv_src = ""

            try:
                nav = float(split[4].strip())
            except ValueError:  # keep the raw string when the NAV field is not numeric
                nav = split[4].strip()
           
            date = convert_date_to_utc_datetime(split[7].strip())
            print(type(date),date)
            existing_data = mycollection.find_one({"meta.Code": code})
            if existing_data:
                    # If data with the code already exists in MongoDB, update it
                    mycollection.update_one({"_id": existing_data["_id"]}, {
                        "$push": {"data": {"date": date, "nav": nav}}})
                    print("Another one bites the dust")
            else:
                new_record = {
                    "meta": {
                        "Structure": Structure,
                        "Category": Category, 
                        "Sub-Category": Sub_Category,
                        "AMC": amc, 
                        "Code": code, 
                        "Name": name,
                        "Source": inv_src,
                        "Option" : dg
                    },
                    "data": [{"date":date, "nav": nav }]
                }
                mycollection.insert_one(new_record)
                print("Data data data")
        j = j+1

    return

start_date_str = "04-Apr-2023"
end_date_str = "31-Aug-2023"
max_duration = 90

date_ranges = split_date_range(start_date_str, end_date_str, max_duration)
for start, end in date_ranges:
    print(f"Start Date: {start.strftime('%d-%b-%Y')}, End Date: {end.strftime('%d-%b-%Y')}")
    nav_data(start.strftime('%d-%b-%Y'),end.strftime('%d-%b-%Y'))
input("press any key to confirm")

python database mongodb web-scraping bigdata

Comments


A:

0 votes · 0x00 · 11/16/2023 · #1

I can recommend two things.

  • Use a Session object for your requests. The `requests` module creates a new connection every time you issue a GET, and that really takes some time.

    def nav_data(start,end, req):
        url = f"https://..."
        response = req.get(url)
        ...
    
    
    with requests.Session() as req:
        for start, end in date_ranges:
            print(f"Start Date: {start.strftime('%d-%b-%Y')}, End Date: {end.strftime('%d-%b-%Y')}")
            nav_data(start.strftime('%d-%b-%Y'),end.strftime('%d-%b-%Y'), req)
    
  • Use bulk inserts with MongoDB. You say storing the data in an array takes a lot of space, but have you tested that? If it's just a dictionary containing strings, it shouldn't use much memory.
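A sketch of what that batching could look like, assuming the question's document shape (`meta.Code` plus a `data` array). The grouping step is plain Python; the commented lines show how each group could become a single upsert via PyMongo's `bulk_write` (`rows` and `mycollection` are illustrative names, not from a tested implementation):

```python
from collections import defaultdict

def group_rows_by_code(rows):
    """Group parsed NAV rows (dicts with 'code', 'date', 'nav') by scheme code,
    so each scheme needs one $push of many entries instead of one round trip per row."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["code"]].append({"date": row["date"], "nav": row["nav"]})
    return dict(grouped)

# With a live MongoDB, each group then becomes one operation in a single bulk_write:
# from pymongo import UpdateOne
# ops = [UpdateOne({"meta.Code": code},
#                  {"$push": {"data": {"$each": entries}}},
#                  upsert=True)
#        for code, entries in group_rows_by_code(rows).items()]
# mycollection.bulk_write(ops, ordered=False)
```

Note that an upserted document created this way would hold only `meta.Code` and `data`; the rest of the `meta` fields would need a `$setOnInsert` in the same operation.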

Comments

0 votes · dewashya · 11/17/2023
Thank you so much for the reply. Now that you say it, I realize I should have been using a session. I will try the bulk insert. I remember trying to insert all the data at once, but it didn't work; my system stopped responding, and it's a decent machine by the way. The problem with the data is that I have an array of data points as the value for some ~30k mutual funds, each with ~2k points on average. E.g. {meta: {a:a, b:b, c:c}, data: [ {date: ISODATE(dd-mm-yy), nav: XX.XXXX}, {date: ISODATE(dd-mm-yy), nav: XX.XXXX}, {}, {}, ..... {} ]}
0 votes · Satria Adhi Pradana · 11/16/2023 · #2

It looks like you're running into performance issues downloading daily NAV data from the AMFI website and storing it in MongoDB. The main bottleneck appears to be pushing each date's NAV records into the database one at a time. To optimize the code and make it more efficient, consider the following suggestions:

  1. Batch inserts: Instead of inserting records one by one, use MongoDB's bulk write operations, which can significantly improve insert speed. Build an array of documents and insert them in one call. Here is a simplified example using PyMongo in Python:

    from pymongo import MongoClient
    
    # Connect to MongoDB
    client = MongoClient('your_mongodb_uri')
    db = client['your_database']
    collection = db['your_collection']
    
    # Your existing code to fetch NAV data
    nav_data = [...]  # Your NAV data as a list of dictionaries
    
    # Batch insert into MongoDB
    collection.insert_many(nav_data)
    
  2. Indexing: Make sure your MongoDB collection has appropriate indexes. Indexes can dramatically speed up lookups (at the cost of slightly slower writes). Identify the fields you frequently query or sort on and create indexes on them.

    # Example: Create an index on the 'date' field
    import pymongo
    collection.create_index([('date', pymongo.ASCENDING)])
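One caveat for this particular script: the hot query in the question's nav_data is find_one({"meta.Code": code}), executed once per parsed row, so for that workload an index on meta.Code matters more than one on date. A sketch (the create_index call is commented out since it needs a live MongoDB; the spec itself is just data):

```python
# Index spec matching the lookup the question's code actually performs:
# mycollection.find_one({"meta.Code": code})
index_spec = [("meta.Code", 1)]  # 1 is pymongo.ASCENDING

# With a live connection:
# collection.create_index(index_spec, unique=True)
```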
    
  3. Parallel processing: Consider downloading and inserting data concurrently. You can use Python's `concurrent.futures` module for this: break the work into smaller chunks and process them at the same time.

    from concurrent.futures import ThreadPoolExecutor

    def process_chunk(chunk):
        # Your code to fetch and insert data for a chunk
        ...

    # Split nav_data into chunks
    chunks = [...]

    # Process chunks in parallel
    with ThreadPoolExecutor() as executor:
        executor.map(process_chunk, chunks)
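To make the pattern concrete, here is a minimal runnable version in which the real fetch-and-insert body is stubbed out with a placeholder that just counts items; chunks of actual NAV date ranges would replace the toy chunks below:

```python
from concurrent.futures import ThreadPoolExecutor

def process_chunk(chunk):
    # Placeholder for "fetch one date window and bulk-insert its rows";
    # here it just reports how many items the chunk held.
    return len(chunk)

# Toy stand-in for date-range chunks
chunks = [list(range(i, i + 3)) for i in range(0, 9, 3)]

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_chunk, chunks))

print(results)  # one count per chunk
```

Threads suit this job because both the HTTP downloads and the MongoDB inserts are I/O-bound, so the GIL is not a practical limit here.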
    
  4. Optimize data processing: Review your data-processing logic for inefficient operations, and make sure you're using the most effective algorithms and data structures for your use case.

  5. Use the MongoDB Aggregation Framework: If you need to transform or aggregate your data, consider MongoDB's Aggregation Framework, which lets you perform complex operations directly inside the database.

    # Example: Use MongoDB Aggregation Framework for grouping by date
    pipeline = [
        {'$group': {'_id': '$date', 'avg_nav': {'$avg': '$nav'}}},
        {'$out': 'aggregated_collection'}
    ]
    collection.aggregate(pipeline)
    

Implementing these optimizations should help speed up your code. You may need to adjust them to the specifics of your application and data.

Disclaimer: I wanted to see how accurate ChatGPT is.