anyone?
зелена нишка ∈ системна нишка ∈ процес
Още малко за multiprocessing
import urllib
from multiprocessing import Pool
urls = [
'http://www.python.org',
'http://www.python.org/about/',
'http://github.com/',
'http://kefche.com/',
]
def fetch(url):
try:
return urllib.request.urlopen(url).status
except urllib.error.URLError as e:
return e.code
pool = Pool(4)
statuses = pool.map(fetch, urls)
pool.close()
pool.join()
fetch()
трябва да е дефинирана преди конструирането на pool
(спомнете си какво точно прави fork()
)
Да обхождаме дървета
class Node:
def __init__(self, value):
self.left = []
self.value = value
self.right = []
def iterate(self):
for node in self.left:
yield node.value
yield self.value
for node in self.right:
yield node.value
Къде сбъркахме?
…
def iterate(self):
for node in self.left:
yield from node.iterate()
yield self.value
for node in self.right:
yield from node.iterate()
програмиране смешни. за Шегите са асинхронно винаги
Всяка по-голяма задача винаги е последователност от много на брой по-малки задачи, които трябва да бъдат пресметнати в определена последователност. Няколко такива задачи могат да се случват паралелно или поне конкурентно в рамките на отделни нишки или процеси.
Менажирането на такива неща обаче не е проста работа, трябва да join-ваме, wait-ваме, sync-ваме. Има много различни абстракции от по-високо ниво на тази тема, опитващи се да решат проблема с менажирането на такиъв тип конкурентни задачи.
future
е обект, който енкапсулира нещо за изпълнение в конкурентна среда.Държи състоянието му и ни дава удобен интерфейс за работа с него
Реализация на Future-и в python
Дава ни следните класове, с които да работим
+-- Future
+-- Executor
+-- ThreadPoolExecutor
+-- ProcessPoolExecutor
Не използваме `Future`-ите директно, а ги създаваме през съответния `Executor`
Помните ли как ви казахме, че Python не може да решава такива проблеми?
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
# ...
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('{} is prime: {}'.format(number, prime))
import time
from concurrent.futures import ProcessPoolExecutor
futures = []
seconds = 0
with ProcessPoolExecutor(max_workers=3) as executor:
print('Starting')
for i in range(3):
print(i)
futures.append(executor.submit(pow, 323, 1235390))
print('Started')
while True:
if all(future.done() for future in futures):
print('All done!')
results = [future.result() for future in futures]
break
else:
print('still waiting')
seconds += 1
time.sleep(1)
print('It took %d seconds' % seconds)
Много са забавни, защото наследявайки Executor
и имплементирайки съотвентия набор от методи може да направите прозиволни неща от типа на паралелизиране чрез изпращане към remote машини и прочее. Това разбира се не е точно тривиално.
Малки независими изпълними единици, които кооперативно си предават контрол над управлението.
Най-очевидния пример за това в python са генераторите
В някакъв момент кода очаква данни от външен ресурс
Както диска, така и мрежата (особено мрежата) могат и почти задължително са в порядъци по-бавни от процесора.Това ще рече, че докато чака за данни процесора спокойно може да свърши много други неща, вместо само да си седи така.
Обикновено операционната система оплътнява такива момента, като предава контрол на друга нишка когато текущата заяви, че чака външни резурси.
Понякога обаче искаме нашата програма да се възползва по-пълноценно от процесора, въпреки чакането на диска и/или мрежата.
„Класически“ подход
…
calculations()
file_data = open('/some/place') # until the file is read our process does nothing
more_caluculations(file_data)
…
Да заявим, че искаме достъп до някакъв ресурс и чак по-късно да обработим данните пристигнали от него
Възможни решения
Когато мъж и жена се обичат Когато две машини обменят информация през мрежата, има няколко нива на абстракция ангажирани в този процес.
За нас са интересни socket-ите. Те са концепция от транспортния слой, което не е толкова важно в момента и може и да го забравите.
socket е обект (генерално в операционната ви система), чрез който пишете в, и чете от, мрежова връзка
До голяма степен интерфейса, който ни се предоставя от socket-ите е сходен с този на файловете (четене, писане). В много операционни системи има някаква обща абстракция над двете.
Четенето и писането по socket-и през мрежата е още по-бавно от достъпването на файлове.
Появява се в python 3.4
Основната му идея е да даде инфраструктура за асинхронни операции по мрежата
Може да се ползва и за асинхронни дискови операции, но няма хубав интерфейс за тази цел
Дефинираме корутини, които да се изпълняват асинхронно.
Искаме да реагираме на събитие по определен начин.
Event loop-а проверява на определен (относително кратък) период от време дали се е случило събитие и ни уевдомява, за да можем да реагираме
Когато използваме asyncio
всяка нишка има по един event loop, отговарящ за задачите в нея
asyncio.get_event_loop
- връща текущия event loop обект за нишката, в която смеasyncio.set_event_loop
- позволява да променим event loop-а за текущата нишкаasyncio.set_event_loop_policy
- позволява да променим политиката на event loop-а за текущата нишка (не е просто)asyncio.get_event_loop_policy
asyncio.is_running
- проверява дали текущия loop "се върти"GO! GO! GO!
loop.run_until_complete(coroutine)
- пускаме събитийния цикъл и той ще спре когато подадената корутина приключи изпълнението сиloop.run_forever()
- пускаме събитийния цикъл "завинаги", т.е. докато изрично не го спремloop.stop()
- спираме съответния loopimport asyncio
import random
@asyncio.coroutine
def slow_operation():
sleep = random.choice(range(1, 6))
print('Starting slow operation')
yield from asyncio.sleep(sleep)
print('Finished slow operation')
return 'A return value from the slow_operation coroutine took %s seconds' % sleep
def got_result(future):
print('Requsting result')
print(future.result())
print('Got result')
loop.stop()
loop = asyncio.get_event_loop()
task = asyncio.Task(slow_operation())
task.add_done_callback(got_result)
try:
loop.run_forever()
finally:
loop.close()
import asyncio
@asyncio.coroutine
def factorial(name, number):
f = 1
for i in range(2, number+1):
print("Task %s: Compute factorial(%s)..." % (name, i))
yield from asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))
tasks = [
asyncio.Task(factorial("A", 2)),
asyncio.Task(factorial("B", 3)),
asyncio.Task(factorial("C", 4)),
asyncio.Task(factorial("D", 10)),
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
import asyncio
import urllib.parse
import sys
@asyncio.coroutine
def print_http_headers(url):
print('Fetching %s' % url)
url = urllib.parse.urlsplit(url)
reader, writer = yield from asyncio.open_connection(url.hostname, 80)
query = ('HEAD {url.path} HTTP/1.0\r\n'
'Host: {url.hostname}\r\n'
'\r\n').format(url=url)
writer.write(query.encode('latin-1'))
while True:
line = yield from reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print('HTTP header(%s)> %s' % (url.hostname, line))
loop = asyncio.get_event_loop()
tasks = [
asyncio.async(print_http_headers('http://fmi.ruby.bg')),
asyncio.async(print_http_headers('http://fmi.py-bg.net')),
asyncio.async(print_http_headers('http://fmi.golang.bg')),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Силно препоръчваме