13. asyncio

13. asyncio

13. asyncio

4 май 2015

Конкурентност vs. паралелизъм?

anyone?

Похвати за конкурентност?

зелена нишка ∈ системна нишка ∈ процес

Преди това...

Още малко за multiprocessing

Паралелизиран map

import urllib
from multiprocessing import Pool

urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://github.com/',
    'http://kefche.com/',
]

def fetch(url):
    try:
        return urllib.request.urlopen(url).status
    except urllib.error.URLError as e:
        return e.code

pool = Pool(4)
statuses = pool.map(fetch, urls)
pool.close()
pool.join()

Gotchas

fetch() трябва да е дефинирана преди конструирането на pool (спомнете си какво точно прави fork())

yield from

Дърво 1.0

Да обхождаме дървета

class Node:
    def __init__(self, value):
        self.left = []
        self.value = value
        self.right = []

    def iterate(self):
        for node in self.left:
            yield node.value
        yield self.value
        for node in self.right:
            yield node.value

Къде сбъркахме?

Дърво 2.0


def iterate(self):
    for node in self.left:
        yield from node.iterate()
    yield self.value
    for node in self.right:
        yield from node.iterate()

Асинхронно програмиране

програмиране смешни. за Шегите са асинхронно винаги

Всяка по-голяма задача винаги е последователност от много на брой по-малки задачи, които трябва да бъдат пресметнати в определена последователност. Няколко такива задачи могат да се случват паралелно или поне конкурентно в рамките на отделни нишки или процеси.

Менажирането на такива неща обаче не е проста работа, трябва да join-ваме, wait-ваме, sync-ваме. Има много различни абстракции от по-високо ниво на тази тема, опитващи се да решат проблема с менажирането на такиъв тип конкурентни задачи.

Бъдеща

future е обект, който енкапсулира нещо за изпълнение в конкурентна среда.
Държи състоянието му и ни дава удобен интерфейс за работа с него

concurrent.futures

Реализация на Future-и в python

Дава ни следните класове, с които да работим

+-- Future
+-- Executor
  +-- ThreadPoolExecutor
  +-- ProcessPoolExecutor

concurrent.futures.Future

concurrent.futures.Executor

Не използваме `Future`-ите директно, а ги създаваме през съответния `Executor`

Paralellizing CPU-bound tasks

Помните ли как ви казахме, че Python не може да решава такива проблеми?

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
  # ...

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('{} is prime: {}'.format(number, prime))

пълния пример

Да си поиграем с това

import time
from concurrent.futures import ProcessPoolExecutor

futures = []
seconds = 0

with ProcessPoolExecutor(max_workers=3) as executor:
    print('Starting')
    for i in range(3):
        print(i)
        futures.append(executor.submit(pow, 323, 1235390))

    print('Started')
    while True:
        if all(future.done() for future in futures):
            print('All done!')
            results = [future.result() for future in futures]
            break
        else:
            print('still waiting')
            seconds += 1
            time.sleep(1)


print('It took %d seconds' % seconds)

executors

Много са забавни, защото наследявайки Executor и имплементирайки съотвентия набор от методи може да направите прозиволни неща от типа на паралелизиране чрез изпращане към remote машини и прочее. Това разбира се не е точно тривиално.

Корутини

Малки независими изпълними единици, които кооперативно си предават контрол над управлението.

Корутини

Най-очевидния пример за това в python са генераторите

greenlets

асинхронен вход/изход

Както диска, така и мрежата (особено мрежата) могат и почти задължително са в порядъци по-бавни от процесора.
Това ще рече, че докато чака за данни процесора спокойно може да свърши много други неща, вместо само да си седи така.

асинхронен вход/изход

Обикновено операционната система оплътнява такива момента, като предава контрол на друга нишка когато текущата заяви, че чака външни резурси.

Понякога обаче искаме нашата програма да се възползва по-пълноценно от процесора, въпреки чакането на диска и/или мрежата.

асинхронен вход/изход

„Класически“ подход


calculations()
file_data = open('/some/place') # until the file is read our process does nothing
more_caluculations(file_data)

генералната идея

Да заявим, че искаме достъп до някакъв ресурс и чак по-късно да обработим данните пристигнали от него

Възможни решения

МРЕЖАТА(общи приказки)

OSI модела в Wikipedia

МРЕЖАТА(общи приказки)

Когато мъж и жена се обичат Когато две машини обменят информация през мрежата, има няколко нива на абстракция ангажирани в този процес.

За нас са интересни socket-ите. Те са концепция от транспортния слой, което не е толкова важно в момента и може и да го забравите.

socket е обект (генерално в операционната ви система), чрез който пишете в, и чете от, мрежова връзка

socket-и

До голяма степен интерфейса, който ни се предоставя от socket-ите е сходен с този на файловете (четене, писане). В много операционни системи има някаква обща абстракция над двете.

Четенето и писането по socket-и през мрежата е още по-бавно от достъпването на файлове.

asyncio

Появява се в python 3.4

Основната му идея е да даде инфраструктура за асинхронни операции по мрежата

Може да се ползва и за асинхронни дискови операции, но няма хубав интерфейс за тази цел

Най-общо казано

Дефинираме корутини, които да се изпълняват асинхронно.

Event loop

Искаме да реагираме на събитие по определен начин.

Event loop-а проверява на определен (относително кратък) период от време дали се е случило събитие и ни уевдомява, за да можем да реагираме

Event loop

Когато използваме asyncio всяка нишка има по един event loop, отговарящ за задачите в нея

Event loop

GO! GO! GO!

Пример

import asyncio
import random


@asyncio.coroutine
def slow_operation():
    sleep = random.choice(range(1, 6))
    print('Starting slow operation')
    yield from asyncio.sleep(sleep)
    print('Finished slow operation')
    return 'A return value from the slow_operation coroutine took %s seconds' % sleep


def got_result(future):
    print('Requsting result')
    print(future.result())
    print('Got result')
    loop.stop()


loop = asyncio.get_event_loop()
task = asyncio.Task(slow_operation())
task.add_done_callback(got_result)

try:
    loop.run_forever()
finally:
    loop.close()

Пример

import asyncio


@asyncio.coroutine
def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        yield from asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))


tasks = [
    asyncio.Task(factorial("A", 2)),
    asyncio.Task(factorial("B", 3)),
    asyncio.Task(factorial("C", 4)),
    asyncio.Task(factorial("D", 10)),
]


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Пример

import asyncio
import urllib.parse
import sys


@asyncio.coroutine
def print_http_headers(url):
    print('Fetching %s' % url)
    url = urllib.parse.urlsplit(url)
    reader, writer = yield from asyncio.open_connection(url.hostname, 80)
    query = ('HEAD {url.path} HTTP/1.0\r\n'
             'Host: {url.hostname}\r\n'
             '\r\n').format(url=url)
    writer.write(query.encode('latin-1'))
    while True:
        line = yield from reader.readline()
        if not line:
            break
        line = line.decode('latin1').rstrip()
        if line:
            print('HTTP header(%s)> %s' % (url.hostname, line))


loop = asyncio.get_event_loop()

tasks = [
    asyncio.async(print_http_headers('http://fmi.ruby.bg')),
    asyncio.async(print_http_headers('http://fmi.py-bg.net')),
    asyncio.async(print_http_headers('http://fmi.golang.bg')),
]

loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Further reading

Силно препоръчваме

Въпроси?