Untitled Diff

Created Diff never expires
35 removals
Lines
Total
Removed
Words
Total
Removed
To continue using this feature, upgrade to
Diffchecker logo
Diffchecker Pro
88 lines
12 additions
Lines
Total
Added
Words
Total
Added
To continue using this feature, upgrade to
Diffchecker logo
Diffchecker Pro
66 lines
import multiprocessing
import multiprocessing
import logging
import logging
import logging.handlers
import logging.handlers
import time
import time


logger_pool = {}
logger_pool = {}




class Loggers:
class Loggers:
global logger_pool
global logger_pool


def get_listener_logger(self, id):
formatter = logging.Formatter("%(message)s")
logger = logging.getLogger(id)
logger.setLevel(logging.INFO)

#handler = logging.handlers.RotatingFileHandler('log.log')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger

def get_worker_logger(self, id, queue):
def get_worker_logger(self, id, queue):
"""
"""
如果池中存在则取出
如果池中存在则取出
如果不存在则创建
如果不存在则创建
"""
"""
if logger_pool.get(id):
if logger_pool.get(id):
return logger_pool.get(id)
return logger_pool.get(id)
else:
else:
"""
"""
创建日志实例
创建日志实例
"""
"""
formatter = logging.Formatter("[%(asctime)s] %(name)s:%(levelname)s: %(message)s")
formatter = logging.Formatter("[%(asctime)s] %(name)s:%(levelname)s: %(message)s")
logger = logging.getLogger(id)
logger = logging.getLogger(id)
logger.setLevel(logging.INFO)
logger.setLevel(logging.INFO)


handler = logging.handlers.QueueHandler(queue)
handler = logging.handlers.QueueHandler(queue)
handler.setFormatter(formatter)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.addHandler(handler)
logger_pool[id] = logger
logger_pool[id] = logger
return logger
return logger


logger_class = Loggers()
logger_class = Loggers()


def listener_process(queue, flag_queue):
def listener_process(queue):
listener_logger = logger_class.get_listener_logger('listener')
formatter = logging.Formatter("%(message)s")
while True:
handler = logging.StreamHandler()
if queue.empty() and flag_queue.empty():
handler.setFormatter(formatter)
print('listener stop!')
listener = logging.handlers.QueueListener(queue, handler)
break
return listener
else:
try:
record = queue.get(timeout=2)
except:
continue
listener_logger.handle(record)


def worker_process(id, queue, flag_queue):
def worker_process(id, queue):
try:
try:
logger = logger_class.get_worker_logger(id, queue)
logger = logger_class.get_worker_logger(id, queue)
for _ in range(10):
for _ in range(10):
logger.info(time.time())
logger.info(time.time())
time.sleep(1)
time.sleep(1)
finally:
finally:
print('worker stop!')
print('worker stop!')
flag_queue.get()
if __name__ == "__main__":
if __name__ == "__main__":
queue = multiprocessing.Queue(-1)
queue = multiprocessing.Queue(-1)
flag_queue = multiprocessing.Queue(-1)
id_list = ['00', '01', '02', '03']
id_list = ['00', '01', '02', '03']
process_pool = []
process_pool = []
for id in id_list:
for id in id_list:
flag_queue.put(id)
p = multiprocessing.Process(target=worker_process, args=(id, queue,))

p = multiprocessing.Process(target=worker_process, args=(id, queue, flag_queue,))
p.start()
p.start()
process_pool.append(p)
process_pool.append(p)


listener = multiprocessing.Process(
listener = listener_process(queue)
target=listener_process, args=(queue, flag_queue,))
listener.start()
listener.start()
for p in process_pool:
for p in process_pool:
p.join()
p.join()
listener.join()
listener.stop()