iT邦幫忙

0

python 一次性大量請求api

  • 分享至 

  • xImage
  •  

原本的api:

def sendRequest(param):
                response = sess.post(url,data=param,verify=False)
                return response.json()

一次api需要0.9秒,如果1000次的話,就需要900秒;
這裏有兩個做法:多線程打api,異步打api;
通常這種多io操作的情況下,使用異步打api;

    try:
            # sess = requests.Session()
            successCount= 0
            failCount = 0
            payloadList = []
            for params in paramsList:
                ownerID = params['ownerID']
                ownerName = params['ownerName']
                doveID = params['doveID']
                orgRing = params['orgRing']
                dovecoteID = params['dovecoteID']
                rfID = params['rfID']
                raceID = params['raceID']

                payload = {
                'updatetype' : 'ownerDove',
                'ownerID' : ownerID,
                'ownerName' : ownerName,
                'doveID' : doveID,
                'orgRing' : orgRing,
                'dovecoteID' : dovecoteID,
                'rfid' : rfID,
                'raceID' : raceID
                }
                payloadList.append(payload)
                # payloadList = payloadList[:200]
            async def sendRequest(session,param):
                async with session.post(url,data=param,verify_ssl =False) as response:
                    return await response.json()
            async def loopAskapi():
                async with aiohttp.ClientSession() as session:
                    tasks = []
                    for payload in payloadList:
                        # task = asyncio.ensure_future(sendRequest(session, payload))
                        task = sendRequest(session,payload)
                        tasks.append(task)
                    responses = await asyncio.gather(*tasks)
                    print(len(responses))
            def start_loop(loop):
                asyncio.set_event_loop(loop)
                loop.run_until_complete(loopAskapi())
            loop = asyncio.new_event_loop()
            t = threading.Thread(target=start_loop, args=(loop,))
            t.start()
            t.join()
            return successCount
            # print(responseList)
        except Exception as e:
            print(e)
            return 0

這樣使用postman測試是20s打完;
另外還有使用requests.Session的做法:時間差不多:

           sess = requests.Session()
            successCount= 0
            totalCallCount = 0
            for params in paramsList:
                ownerID = params['ownerID']
                ownerName = params['ownerName']
                doveID = params['doveID']
                orgRing = params['orgRing']
                dovecoteID = params['dovecoteID']
                rfID = params['rfID']
                raceID = params['raceID']

                payload = {
                'updatetype' : 'ownerDove',
                'ownerID' : ownerID,
                'ownerName' : ownerName,
                'doveID' : doveID,
                'orgRing' : orgRing,
                'dovecoteID' : dovecoteID,
                'rfid' : rfID,
                'raceID' : raceID
                }
                r = sess.post(url, data=payload, verify=False)
                j = json.loads(r.text)
                status = j['results']
                print('status', status)
                if status !=0:
                    successCount += 1
                totalCallCount+=1
                print('totalCallCount:',totalCallCount)
            sess.close()
            return successCount

圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言