如何并行快速执行for循环(Python)

How to do rapid execution of for loop(Python) in parallel

提问人:Ram prabhu 提问时间:11/17/2023 最后编辑:Ram prabhu 更新时间:11/17/2023 访问量:84

问:

我正在使用 python 构建一个股票市场交易工具。截至目前,经纪公司发布了一个python sdk,我们可以使用它开发软件。 当我发送购买股票的请求时,问题就出现了。每当下订单时,我都会收到来自服务器的响应作为确认,这需要 0.8 到 1.2 秒。

就我而言,按照上面的顺序,我一次至少会下 20 个订单,如果我使用 for 循环下订单,则至少需要 20 秒才能完成循环。实际上,我不使用在放置订单后获得的响应,因此我不想等待响应,并且需要一次执行所有 20 个订单。我尝试了线程、Asyncio、并发期货。 所有这些循环至少需要 5 秒才能完成 20 次迭代的 for 循环。如果迭代小于 6,则使用线程会给出最佳结果,即订单会一直执行,没有任何延迟。 我恳请有人对此有所了解,我怎样才能毫不拖延地执行 20 次迭代的 for 循环。

这需要一次执行 20 次:

client.place_order(exchange_segment = ExchangeSegment , product = Product, price = '45',
                   order_type = OrderType, quantity = quantity, validity = Validity,
                   trading_symbol = TradingSymbol, transaction_type = TransactionType,
                   tag = Tag, amo = AMO)

我尝试了以下方法,

线程:

def placeorder(quantity):
            client.place_order(exchange_segment = ExchangeSegment , product = Product, price = '45',order_type = OrderType, quantity = quantity, validity = Validity,trading_symbol = TradingSymbol, transaction_type = TransactionType,tag = Tag, amo = AMO)
            
        quantity=[]  
        for i in range(20):
          quantity.append(str(800)) #mentioning the order quantitiy for each iteration in this list
            
        
        threads = []
        for i in range(20):
            t = threading.Thread(target=palceorder, args=(quantity[i],))
            threads.append(t)
            t.start()

异步:

        quantity=[]  
        for i in range(20):
          quantity.append(str(800))
        def background(f):
            def wrapped(*args, **kwargs):
                return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

            return wrapped

        @background
        def placeorder(quantity):
            client.place_order(exchange_segment = ExchangeSegment , product = Product, price = '45',order_type = OrderType, quantity = quantity, validity = Validity,trading_symbol = TradingSymbol, transaction_type = TransactionType,tag = Tag, amo = AMO)
        for i in quantity:
            placeorder(i)`

并发期货:

def placeorder(quantity):
            client.place_order(exchange_segment = ExchangeSegment , product = Product, price = '45',order_type = OrderType, quantity = quantity, validity = Validity,trading_symbol = TradingSymbol, transaction_type = TransactionType,tag = Tag, amo = AMO)
quantity=[]  
for i in range(20):
   quantity.append(str(800))
with concurrent.futures.ThreadPoolExecutor(15) as executor:
            executor.map(placeorder, quantity)
python-3.x 线程性能多 处理

评论

1赞 Booboo 11/17/2023
Threads 代码中的缩进不一致且不正确。
0赞 Ram prabhu 11/17/2023
我已经编辑了一些缩进更正,如果它仍然缺少某个地方,你能帮我更正它吗?
0赞 Booboo 11/17/2023
对于创建列表的价值,您只需要.你有这么多的代码,但完成的却很少。quantityquantity = ['800'] * 20
0赞 Ram prabhu 11/17/2023
@Booboo,感谢您的评论。是的,兄弟,因为我正在学习这门语言只是为了编写这个工具,所以我是一个新手,必须学习更多才能使用我的程序中的技巧和快捷方式。

答:

0赞 Booboo 11/17/2023 #1

如果您试图确保每秒提交的订单不超过 20 个,则可以使用以下类:RateLimiter

from collections import deque
from threading import Lock
import time
from concurrent.futures import ThreadPoolExecutor

class RateLimiter:
    def __init__(self, call_count: int, period: float=1.0) -> None:
        self._call_count = int(call_count)
        self._period = float(period)
        self._called_timestamps = deque()
        self._lock = Lock()

    def throttle(self) -> None:
        with self._lock:
            while True:
                now = time.monotonic()
                while self._called_timestamps:
                    time_left = self._called_timestamps[0] + self._period - now
                    if time_left >= 0:
                        break
                    self._called_timestamps.popleft()
                if len(self._called_timestamps) < self._call_count:
                    break
                time.sleep(time_left)
            self._called_timestamps.append(now)


# 20 calls per second (or perhaps 20 calls per 1.1 second
# just to be safe):
rate_limiter = RateLimiter(20, 1.0)

def create_order(x: int) -> None:
    ... # arbitrary code to get things ready for the API call

    # Put this immediately before invoking the API:
    rate_limiter.throttle()
    # We would normally be calling an API to create an order,
    # but for demo purposes we just print x with the time:
    print(f'x = {x:2} at time {time.time()}')

with ThreadPoolExecutor(20) as executor:
    executor.map(create_order, range(42))

指纹:

x =  0 at time 1700224533.0041072
x =  1 at time 1700224533.0041072
x =  2 at time 1700224533.0062273
x =  3 at time 1700224533.0062273
x =  5 at time 1700224533.0072248
x =  6 at time 1700224533.0082252
x =  4 at time 1700224533.0072248
x =  9 at time 1700224533.0082252
x = 10 at time 1700224533.0095797
x =  8 at time 1700224533.0082252
x = 13 at time 1700224533.0095797
x = 14 at time 1700224533.0105777
x = 11 at time 1700224533.0095797
x = 17 at time 1700224533.011574
x = 18 at time 1700224533.011574
x =  7 at time 1700224533.0082252
x = 16 at time 1700224533.011574
x = 12 at time 1700224533.0095797
x = 15 at time 1700224533.0105777
x = 19 at time 1700224533.011574
x = 20 at time 1700224534.011685
x = 21 at time 1700224534.011685
x = 23 at time 1700224534.011685
x = 24 at time 1700224534.011685
x = 26 at time 1700224534.011685
x = 28 at time 1700224534.011685
x = 22 at time 1700224534.011685
x = 31 at time 1700224534.011685
x = 33 at time 1700224534.011685
x = 25 at time 1700224534.011685
x = 29 at time 1700224534.011685
x = 32 at time 1700224534.011685
x = 27 at time 1700224534.011685
x = 30 at time 1700224534.011685
x = 34 at time 1700224534.026685
x = 35 at time 1700224534.026685
x = 37 at time 1700224534.026685
x = 39 at time 1700224534.026685
x = 38 at time 1700224534.026685
x = 36 at time 1700224534.026685
x = 40 at time 1700224535.025956
x = 41 at time 1700224535.025956