Python - multiprocessing threads don't close when using Queue
This is Python 3.x.
I'm loading records from a CSV file in chunks of 300, then spawning worker threads to submit them to a REST API. I'm saving the HTTP response in a Queue, so that I can count the number of skipped records once the entire CSV file is processed. However, after I added the Queue to the worker, the threads don't seem to close anymore. I want to monitor the number of threads for 2 reasons: (1) once they are all done, I can calculate and display the skip counts and (2) I want to enhance the script to spawn no more than 20 or so threads, so I don't run out of memory.
I have 2 questions:
- Can anyone explain why the threads stay active when using q.put()?
- Is there a different way to manage the # of threads, and to monitor whether all threads are done?
Here is my code (somewhat simplified, because I can't share the exact details of the API I'm calling):
import requests, json, csv, time, datetime, multiprocessing

test_file = 'file.csv'

def read_test_data(path, chunksize=300):
    leads = []
    with open(path, 'rU') as data:
        reader = csv.DictReader(data)
        for index, row in enumerate(reader):
            if (index % chunksize == 0 and index > 0):
                yield leads
                del leads[:]
            leads.append(row)
        yield leads

def worker(leads, q):
    payload = {"action":"createOrUpdate","input":leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    q.put(r.text)  # puts the response in the queue for later analysis
    return

if __name__ == "__main__":
    q = multiprocessing.Queue()  # queue to put all HTTP responses in, so I can count skips
    jobs = []
    for leads in read_test_data(test_file):  # function that reads the CSV file and provides 300 records at a time
        p = multiprocessing.Process(target=worker, args=(leads, q,))
        jobs.append(p)
        p.start()
    time.sleep(20)  # checking if processes are closing automatically (they don't)
    print(len(multiprocessing.active_children()))  # returns the number of threads. If I remove 'q.put' from the worker, it returns 0

    # The intent is to wait until all workers are done, but this results in an infinite loop;
    # when I remove 'q.put' in the worker, it works fine
    #while len(multiprocessing.active_children()) > 0:
    #    time.sleep(1)

    skipped_count = 0
    while not q.empty():  # calculate the number of skipped records based on the HTTP responses in the queue
        http_response = json.loads(q.get())
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))
This is because of a documented quirk of multiprocessing.Queue:
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the "feeder" thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
Basically, you need to make sure that you get() all the items from the Queue, to guarantee that every process which put something into that Queue is able to exit.
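For completeness, here is a minimal sketch of keeping the original Process-per-chunk approach but draining the queue before joining. The worker body and input data are placeholders (no real API call), so treat it as an illustration of the pattern rather than a drop-in fix:

import json
import multiprocessing

def worker(leads, q):
    # Stand-in for the real requests.post() call from the question.
    fake_response = {"result": [{"status": "skipped", "reasons": [{"code": "1004"}]}]}
    q.put(json.dumps(fake_response))

if __name__ == "__main__":
    q = multiprocessing.Queue()
    jobs = []
    for leads in [[{"id": 1}], [{"id": 2}]]:  # stand-in for read_test_data(test_file)
        p = multiprocessing.Process(target=worker, args=(leads, q))
        jobs.append(p)
        p.start()

    # Drain the queue *before* joining: each get() removes buffered data from the
    # pipe, which lets the producing process flush its feeder thread and exit.
    responses = [q.get() for _ in jobs]  # exactly one response per worker
    for p in jobs:
        p.join()

    skipped_count = 0
    for raw in responses:
        for i in json.loads(raw)['result']:
            if i['status'] == "skipped" and i['reasons'][0]['code'] == "1004":
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))

Alternatively, as the quoted documentation notes, the worker could call q.cancel_join_thread() so it never blocks on exit, but that risks losing buffered results, so draining the queue is usually the better option.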
I think in your case you're better off using a multiprocessing.Pool, and submitting your jobs via multiprocessing.Pool.map. That simplifies things significantly, and gives you complete control over the number of processes running:
def worker(leads):
    payload = {"action":"createOrUpdate","input":leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    return r.text

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count() * 2)  # cpu_count() * 2 processes running in the pool
    responses = pool.map(worker, read_test_data(test_file))

    skipped_count = 0
    for raw_response in responses:
        http_response = json.loads(raw_response)
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))
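If you also want the hard cap of roughly 20 concurrent workers mentioned in the question (20 is just the number from the question, not a tuned value), the pool size is the only thing you need to change:

    pool = multiprocessing.Pool(20)  # at most 20 worker processes alive at any time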
If you're worried about the memory cost of converting read_test_data(test_file) to a list (which is required in order to use pool.map), you can use pool.imap instead.
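A minimal sketch of that variant, reusing worker and read_test_data from above:

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count() * 2)
    skipped_count = 0
    # Unlike map, imap does not materialize the whole input as a list first,
    # and it yields results one at a time as they become available.
    for raw_response in pool.imap(worker, read_test_data(test_file)):
        http_response = json.loads(raw_response)
        for i in http_response['result']:
            if i['status'] == "skipped" and i['reasons'][0]['code'] == "1004":
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))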
Edit:
As mentioned in a comment above, this use-case looks like it's I/O-bound, which means you may see better performance by using a multiprocessing.dummy.Pool (which uses a thread pool instead of a process pool). Give both a try and see which is faster.
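Since multiprocessing.dummy exposes the same Pool interface backed by threads, switching is essentially a one-line change (assuming the same worker and read_test_data as above):

from multiprocessing.dummy import Pool  # thread-based Pool with the same API

if __name__ == "__main__":
    pool = Pool(20)  # threads are cheap, so a fixed cap like 20 is reasonable; tune as needed
    responses = pool.map(worker, read_test_data(test_file))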