🔥 本文讲解的使用chatgpt的openai调用,包含单线程、多线程、多进程的批量调用处理数据。至于哪种方法快,说不准,得自己测测,我觉得多线程就ok了,因为只是调用😄
文章目录
- 1、单线程批量处理
- 2、多线程批量处理
- 3、多进程批量处理
1、单线程批量处理
import openai
openai.api_key = 'sk-xxx'
import time
# 定义预测函数
def predict(prompt):
# 请求返回结果
# model:调用的模型名称,是一个字符串,用最新模型直接设置成gpt-3.5-turbo
# messages:请求的文本内容,是一个列表,列表里每个元素类型是字典
# role:system:设置gpt人设。
# role:assistant:表示gpt。
# role:user:表示用户。
retry_count = 100
retry_interval = 1
for _ in range(retry_count):
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "system", "content": "算法工程师"},
{"role": "user", "content": prompt}],
temperature=0
)
# 抽出gpt答复的内容
msg = response.choices[0].message["content"].strip()
return msg
except openai.error.RateLimitError as e:
print("超出openai api 调用频率:", e)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
except Exception as e:
print("任务执行出错:", e)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
def main():
start_time = time.time()
prompt = """请用少于5个字回答问题:{}"""
input_data = ['1+1等于几啊?', '2+2等于几啊?', '3+3等于几啊?', '4+4等于几啊?']
all_res = []
for query in input_data[:1]:
res = predict(prompt.format(query))
all_res.append(res)
time.sleep(1)
end_time = time.time()
total_run_time = round(end_time-start_time, 3)
print('Total_run_time: {} s'.format(total_run_time))
print('chatgpt answer: ', all_res)
if __name__ == "__main__":
main()
2、多线程批量处理
import openai
openai.api_key = 'sk-xxx'
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import collections
# 定义预测函数
def predict(params):
prompt, query = params
prompt = prompt.format(query)
# 请求返回结果
# model:调用的模型名称,是一个字符串,用最新模型直接设置成gpt-3.5-turbo
# messages:请求的文本内容,是一个列表,列表里每个元素类型是字典
# role:system:设置gpt人设。
# role:assistant:表示gpt。
# role:user:表示用户。
retry_count = 100
retry_interval = 1
for _ in range(retry_count):
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "system", "content": "算法工程师"},
{"role": "user", "content": prompt}],
temperature=0
)
# 抽出gpt答复的内容
msg = response.choices[0].message["content"].strip()
return query, msg
except openai.error.RateLimitError as e:
print("超出openai api 调用频率:", e)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
except TimeoutError:
print("任务执行超时:", query)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
except Exception as e:
print("任务执行出错:", e)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
return query,'api请求失败'
def main():
start_time = time.time()
# 多线程并行预测
# 您可能需要根据自己的需求调整间隔时间。另外,您可以根据需要调整线程池的大小,以获得更好的性能。
prompt = """请用少于5个字回答问题:```{}```"""
input_data = ['1+1等于几啊?', '2+2等于几啊?', '3+3等于几啊?', '4+4等于几啊?']
output_data = []
with ThreadPoolExecutor(max_workers=3) as executor:
## 同步调用.submit之后直接.result(一个进程执行完才能下一个进程)
# output_data = [executor.submit(predict, prompt.format(query)).result() for query in input_data]
# # 异步调用(多进程并发执行)
# futures = [executor.submit(predict, prompt.format(query)) for query in input_data]
# query2res = collections.defaultdict(int)
# # 同步等待结果(返回顺序和原数据顺序一致)
# for job in futures:
# query, res = job.result(timeout=None) # 默认timeout=None,不限时间等待结果
# query2res[query] = res
#
# time.sleep(1) # 为了避免超过OpenAI API的速率限制,每次预测之间间隔1秒
# 异步调用(多进程并发执行)
futures = [executor.submit(predict, (prompt, query)) for query in input_data]
query2res = collections.defaultdict(int) # 因为异步等待结果,返回的顺序是不定的,所以记录一下进程和输入数据的对应
# 异步等待结果(返回顺序和原数据顺序可能不一致) ,直接predict函数里返回结果?
for job in as_completed(futures):
query,res = job.result(timeout=None) # 默认timeout=None,不限时间等待结果
query2res[query] = res
time.sleep(1) # 为了避免超过OpenAI API的速率限制,每次预测之间间隔1秒
end_time = time.time()
total_run_time = round(end_time-start_time, 3)
print('Total_run_time: {} s'.format(total_run_time))
print(query2res)
import pandas as pd
df = pd.DataFrame({'query': list(query2res.keys()), 'infer_result': list(query2res.values())})
df.to_excel('./chatgpt_infer_result.xlsx', index=False)
if __name__ == "__main__":
main()
3、多进程批量处理
import openai
openai.api_key = 'sk-xxx'
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import collections
# 定义预测函数
def predict(params):
prompt, query = params
prompt = prompt.format(query)
# 请求返回结果
# model:调用的模型名称,是一个字符串,用最新模型直接设置成gpt-3.5-turbo
# messages:请求的文本内容,是一个列表,列表里每个元素类型是字典
# role:system:设置gpt人设。
# role:assistant:表示gpt。
# role:user:表示用户。
retry_count = 100
retry_interval = 1
for _ in range(retry_count):
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "system", "content": "算法工程师"},
{"role": "user", "content": prompt}],
temperature=0
)
# 抽出gpt答复的内容
msg = response.choices[0].message["content"].strip()
return query, msg
except openai.error.RateLimitError as e:
print("超出openai api 调用频率:", e)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
except TimeoutError:
print("任务执行超时:", query)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
except Exception as e:
print("任务执行出错:", e)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
return query,'api请求失败'
def main():
start_time = time.time()
# 多进程并行预测
# 您可能需要根据自己的需求调整间隔时间。另外,您可以根据需要调整进程池的大小,以获得更好的性能。
prompt = """请用少于5个字回答问题:{}"""
input_data = ['1+1等于几啊?', '2+2等于几啊?', '3+3等于几啊?', '4+4等于几啊?']
# output_data = []
# output_data = collections.defaultdict(int)
with ProcessPoolExecutor(max_workers=2) as executor:
## 同步调用.submit之后直接.result(一个进程执行完才能下一个进程)
# output_data = [executor.submit(predict, prompt.format(query)).result() for query in input_data]
# # 异步调用(多进程并发执行)
# futures = [executor.submit(predict, prompt.format(query)) for query in input_data]
# query2res = collections.defaultdict(int)
# # 同步等待结果(返回顺序和原数据顺序一致)
# for job in futures:
# query, res = job.result(timeout=None) # 默认timeout=None,不限时间等待结果
# query2res[query] = res
#
# time.sleep(1) # 为了避免超过OpenAI API的速率限制,每次预测之间间隔1秒
# 异步调用(多进程并发执行)
futures = [executor.submit(predict, (prompt, query)) for query in input_data]
query2res = collections.defaultdict(int) # 因为异步等待结果,返回的顺序是不定的,所以记录一下进程和输入数据的对应
# 异步等待结果(返回顺序和原数据顺序可能不一致) ,直接predict函数里返回结果?
for job in as_completed(futures):
query,res = job.result(timeout=None) # 默认timeout=None,不限时间等待结果
query2res[query] = res
time.sleep(1) # 为了避免超过OpenAI API的速率限制,每次预测之间间隔1秒
end_time = time.time()
total_run_time = round(end_time-start_time, 3)
print('Total_run_time: {} s'.format(total_run_time))
print(query2res)
import pandas as pd
df = pd.DataFrame({'query': list(query2res.keys()), 'infer_result': list(query2res.values())})
df.to_excel('./chatgpt_infer_result.xlsx', index=False)
if __name__ == "__main__":
main()
🔥 本文讲解的使用chatgpt的openai调用,包含单线程、多线程、多进程的批量调用处理数据。至于哪种方法快,说不准,得自己测测,我觉得多线程就ok了,因为只是调用😄
文章目录
- 1、单线程批量处理
- 2、多线程批量处理
- 3、多进程批量处理
1、单线程批量处理
import openai
openai.api_key = 'sk-xxx'
import time
# 定义预测函数
def predict(prompt):
# 请求返回结果
# model:调用的模型名称,是一个字符串,用最新模型直接设置成gpt-3.5-turbo
# messages:请求的文本内容,是一个列表,列表里每个元素类型是字典
# role:system:设置gpt人设。
# role:assistant:表示gpt。
# role:user:表示用户。
retry_count = 100
retry_interval = 1
for _ in range(retry_count):
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "system", "content": "算法工程师"},
{"role": "user", "content": prompt}],
temperature=0
)
# 抽出gpt答复的内容
msg = response.choices[0].message["content"].strip()
return msg
except openai.error.RateLimitError as e:
print("超出openai api 调用频率:", e)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
except Exception as e:
print("任务执行出错:", e)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
def main():
start_time = time.time()
prompt = """请用少于5个字回答问题:{}"""
input_data = ['1+1等于几啊?', '2+2等于几啊?', '3+3等于几啊?', '4+4等于几啊?']
all_res = []
for query in input_data[:1]:
res = predict(prompt.format(query))
all_res.append(res)
time.sleep(1)
end_time = time.time()
total_run_time = round(end_time-start_time, 3)
print('Total_run_time: {} s'.format(total_run_time))
print('chatgpt answer: ', all_res)
if __name__ == "__main__":
main()
2、多线程批量处理
import openai
openai.api_key = 'sk-xxx'
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import collections
# 定义预测函数
def predict(params):
prompt, query = params
prompt = prompt.format(query)
# 请求返回结果
# model:调用的模型名称,是一个字符串,用最新模型直接设置成gpt-3.5-turbo
# messages:请求的文本内容,是一个列表,列表里每个元素类型是字典
# role:system:设置gpt人设。
# role:assistant:表示gpt。
# role:user:表示用户。
retry_count = 100
retry_interval = 1
for _ in range(retry_count):
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "system", "content": "算法工程师"},
{"role": "user", "content": prompt}],
temperature=0
)
# 抽出gpt答复的内容
msg = response.choices[0].message["content"].strip()
return query, msg
except openai.error.RateLimitError as e:
print("超出openai api 调用频率:", e)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
except TimeoutError:
print("任务执行超时:", query)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
except Exception as e:
print("任务执行出错:", e)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
return query,'api请求失败'
def main():
start_time = time.time()
# 多线程并行预测
# 您可能需要根据自己的需求调整间隔时间。另外,您可以根据需要调整线程池的大小,以获得更好的性能。
prompt = """请用少于5个字回答问题:```{}```"""
input_data = ['1+1等于几啊?', '2+2等于几啊?', '3+3等于几啊?', '4+4等于几啊?']
output_data = []
with ThreadPoolExecutor(max_workers=3) as executor:
## 同步调用.submit之后直接.result(一个进程执行完才能下一个进程)
# output_data = [executor.submit(predict, prompt.format(query)).result() for query in input_data]
# # 异步调用(多进程并发执行)
# futures = [executor.submit(predict, prompt.format(query)) for query in input_data]
# query2res = collections.defaultdict(int)
# # 同步等待结果(返回顺序和原数据顺序一致)
# for job in futures:
# query, res = job.result(timeout=None) # 默认timeout=None,不限时间等待结果
# query2res[query] = res
#
# time.sleep(1) # 为了避免超过OpenAI API的速率限制,每次预测之间间隔1秒
# 异步调用(多进程并发执行)
futures = [executor.submit(predict, (prompt, query)) for query in input_data]
query2res = collections.defaultdict(int) # 因为异步等待结果,返回的顺序是不定的,所以记录一下进程和输入数据的对应
# 异步等待结果(返回顺序和原数据顺序可能不一致) ,直接predict函数里返回结果?
for job in as_completed(futures):
query,res = job.result(timeout=None) # 默认timeout=None,不限时间等待结果
query2res[query] = res
time.sleep(1) # 为了避免超过OpenAI API的速率限制,每次预测之间间隔1秒
end_time = time.time()
total_run_time = round(end_time-start_time, 3)
print('Total_run_time: {} s'.format(total_run_time))
print(query2res)
import pandas as pd
df = pd.DataFrame({'query': list(query2res.keys()), 'infer_result': list(query2res.values())})
df.to_excel('./chatgpt_infer_result.xlsx', index=False)
if __name__ == "__main__":
main()
3、多进程批量处理
import openai
openai.api_key = 'sk-xxx'
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import collections
# 定义预测函数
def predict(params):
prompt, query = params
prompt = prompt.format(query)
# 请求返回结果
# model:调用的模型名称,是一个字符串,用最新模型直接设置成gpt-3.5-turbo
# messages:请求的文本内容,是一个列表,列表里每个元素类型是字典
# role:system:设置gpt人设。
# role:assistant:表示gpt。
# role:user:表示用户。
retry_count = 100
retry_interval = 1
for _ in range(retry_count):
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "system", "content": "算法工程师"},
{"role": "user", "content": prompt}],
temperature=0
)
# 抽出gpt答复的内容
msg = response.choices[0].message["content"].strip()
return query, msg
except openai.error.RateLimitError as e:
print("超出openai api 调用频率:", e)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
except TimeoutError:
print("任务执行超时:", query)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
except Exception as e:
print("任务执行出错:", e)
print('重新请求....')
retry_count += 1
retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
time.sleep(retry_interval)
return query,'api请求失败'
def main():
start_time = time.time()
# 多进程并行预测
# 您可能需要根据自己的需求调整间隔时间。另外,您可以根据需要调整进程池的大小,以获得更好的性能。
prompt = """请用少于5个字回答问题:{}"""
input_data = ['1+1等于几啊?', '2+2等于几啊?', '3+3等于几啊?', '4+4等于几啊?']
# output_data = []
# output_data = collections.defaultdict(int)
with ProcessPoolExecutor(max_workers=2) as executor:
## 同步调用.submit之后直接.result(一个进程执行完才能下一个进程)
# output_data = [executor.submit(predict, prompt.format(query)).result() for query in input_data]
# # 异步调用(多进程并发执行)
# futures = [executor.submit(predict, prompt.format(query)) for query in input_data]
# query2res = collections.defaultdict(int)
# # 同步等待结果(返回顺序和原数据顺序一致)
# for job in futures:
# query, res = job.result(timeout=None) # 默认timeout=None,不限时间等待结果
# query2res[query] = res
#
# time.sleep(1) # 为了避免超过OpenAI API的速率限制,每次预测之间间隔1秒
# 异步调用(多进程并发执行)
futures = [executor.submit(predict, (prompt, query)) for query in input_data]
query2res = collections.defaultdict(int) # 因为异步等待结果,返回的顺序是不定的,所以记录一下进程和输入数据的对应
# 异步等待结果(返回顺序和原数据顺序可能不一致) ,直接predict函数里返回结果?
for job in as_completed(futures):
query,res = job.result(timeout=None) # 默认timeout=None,不限时间等待结果
query2res[query] = res
time.sleep(1) # 为了避免超过OpenAI API的速率限制,每次预测之间间隔1秒
end_time = time.time()
total_run_time = round(end_time-start_time, 3)
print('Total_run_time: {} s'.format(total_run_time))
print(query2res)
import pandas as pd
df = pd.DataFrame({'query': list(query2res.keys()), 'infer_result': list(query2res.values())})
df.to_excel('./chatgpt_infer_result.xlsx', index=False)
if __name__ == "__main__":
main()