test_threading.py 750 B

12345678910111213141516171819202122232425262728293031323334353637
  1. # coding:utf-8
  2. import threading
  3. from queue import Queue
  4. """
  5. Queue的使用
  6. """
  7. def job(l, q):
  8. for i in range(len(l)):
  9. l[i] = l[i] ** 2
  10. q.put(l)
  11. def multithreading():
  12. # 创建队列
  13. q = Queue()
  14. # 线程列表
  15. threads = []
  16. # 二维列表
  17. data = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [6, 6, 6]]
  18. for i in range(4):
  19. t = threading.Thread(target=job, args=(data[i], q))
  20. t.start()
  21. threads.append(t)
  22. # 对所有线程进行阻塞
  23. for thread in threads:
  24. thread.join()
  25. results = []
  26. # 将新队列中的每个元素挨个放到结果列表中
  27. for _ in range(4):
  28. results.append(q.get())
  29. print(results)
  30. if __name__ == "__main__":
  31. multithreading()