ServerSocket server = new ServerSocket(11111);
Socket socket = server.accept();
InputStream is = socket.getInputStream();
is.read(requestBytes);
OutputStream os = socket.getOutputStream();
os.write(responseBytes);
os.flush();
- ❌ Не закрывает сокет
- ❌ Этот сервер работает тупо с 1 клиентом
- ❌ Обрабатывает клиентов по очереди
- ❌ Закрывает соединения - не можем устроить продолжительного соеднения с клиентом (начинаем работать с новым клиентом только по окончании работы с предыдущим)
1 клиент = 1 поток =>
-
❌ Дорого создавать потоки
-
❌ Не переиспользуем их
-
✔️ Выделили в отдельную сущность и удобно писать
Fix: тред пул (какие на него кидать задачи?)
1 задача = 1 клиент =>
- ❌ Тред пул хочет отработать со всеми клиентами до конца => это плохо, потому что n+1-ый клиент будет ждать в очереди
Fix: CachedThreadPool
Используем CachedThreadPool
- он создаст новый поток для новоподключившегося клиента - тот не будет ждать.
- ❌ Большое количество потоков: сколько клиентов одновременно подключилось, столько и будет.
Что вообще из себя представляет общение с клиентом? Мы - сервер, и клиент присылает нам запрос - мы должны его обработать и прислать клиенту ответ.
Что делает тот поток:
- Принимает запрос от клиента
- Кладёт это сообщение в очередь на обработку в пул потоков
- Дожидается обработки и отсылает ответ
-
➖ Почему это кажется хуже? Потому что увеличилось кол-во потоков.
-
➕ Почему это лучше? Это потоки почти всегда в blocking (заблокироканы на ввод-вывод). Они читают задачу за короткий промежуток времени, кладут её в thread pool и уходят в sleeping/waitng. Досчиталось - они проснулись, отправили и опять в blocking.
-
✔️ Потоков много, но работать будут только те, что в тред пуле, а его кол-во потоков можно выбирать эффективно исходя из кол-ва ядер на вашем компе.
-
❌ Нельзя обрабатывать несколько запросов от 1 и того же клиента одновременно.
Fix: специализировать поток ввода-вывода
На каждого клиента 2 потока: в одном - только чтение и передача в thread pool, в другом - отсылка данных клиенту.
Как реализовать: отправляющий поток - SingleThreadPoolExecutor
. Когда задача в thread pool досчиталась (она знает, с каким клиентом связана), она добавляет результат вывода в этот STPE
и он САМ последовательно выйдет в правильном порядке.
- ❗ У этих 2 потоков сокет - разделяемый ресурс, но по нему можно не блокироваться, потому что в Java гарантируется, что у сокета
Input-OutputStream
не зависят друг от друга.
Архитектура / Как часто клиенты шлют данные | Редко | Часто |
---|---|---|
Блокирующая | ✔️ Потоки всё время находятся в блокированном состоянии и ждут ввода-вывода. | ❌ Дофига потоков и тратится много времени на переключение контекста. |
‼️ В ситуации, когда много маленьких запросов, любая архитектура работает плохо - особенно на блокирующих реализациях.
Fix: использовать селектор
Каналы в селекторе находятся в неблокирующем режиме: "Неблокирующие операции - операции, которые завершаются очень быстро".
бычные операции блокирующие: если вы попытаетесь сделать read
, а там нет данных, вы сидите и ждёте до тех пор, пока данные придут.
Что такое неблокирующий read? Обычно говорят так: если данные к вам ещё не пришли, read
вернёт 0 (ничего нет) и сразу завершается, а, если пришли, то выдаёт их. Это враньё. Потому что данных мб много. Чтобы выдать все данные, нужно их оттуда выковырять и выдать, поэтому в неблокирующей ситуации read
часто возвращает кусочки данных - те, которые он может вернуть быстро.
Чтобы это лучше понять, посмотрим, где хранятся данные, которые мы читаем с помощью read
. Формально - в буфере на сетевой карте (СК). С тз работы user и kernel space, вы делаете запрос из US в KS и достаёте данные оттуда - это долгий запрос, поэтому там мб кусочек данных, а не они все.
Какая связь между "долгий запрос" и "кусочек данных"? Пусть у вас пришёл большой пакет и вам нужно его оттуда доставать - это мб долго (например, на СК большая нагрузка и она не может всё разом вытащить) => получите только кусочки.
Таки в чём проблема?
- Долгий переход в KS
- Мы долго копируем данные Формально, мы не знаем, что такое "долго" - в зависимости от ОС это может означать разные вещи.
Логика: я прошу данные у СК через ОС, а она просит их у драйвера. Т.е. через драйвер идёт запрос на СК и вы получаете данные. Впринципе, с СК одновременно могут в\д несколько процессов. Как драйвер достаёт это всё из СК, зависит от сетевого драйвера: может порциями так, что кусочек будет ваш, а кусочек - для другого процесса. Именно в такой ситуации часть наших данных останутся лежать на СК.
У нас была архитектутра сервера на блокирующих сокетах, которая выглядела так:
- На каждого клиента порождалось 1 или 2 потока, которые занимались приёмкой и отсылкой данных клиенту.
- thread pool, куда складывались задачи от клиентов на обработку.
Если клиенты шлют данные часто, то у меня дофига потоков и тратится много времени на переключение контекста. Чтобы пофиксить, будем использовать селектор! Он позволит нам выдать список каналов, в которые пришли данные, или в которые можно записать данные. + можем делать это в 1 потоке! Т.е. получить несколько каналов и отдать задачи чтения из них, например, в thread pool или ещё что-то. Т.е. у нас нет 100500 потоков, связанных с клиентом - в этом преимущество перед блокирующим режимом.
У меня где-то есть ServerSocketChannel
, который делает accept
- как его хорошо делать?
- в виде отельного потока, в БЛОКИРУЮЩЕМ режиме почти всегда (ничего страшного в этом нет, т.к. большую часть времени он находится в режиме блокировки, пока не подключится какой-нибудь клиент)
- если совсем важно кол-во потоков, можно отправить его в селектор, который "там где-то будет"
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
while(true) {
SocketChannel socketChannel = serverSocketChannel.accept();
// do something with socketChannel...
}
serverSocketChannel.close();
Хорошо, крутится. Приходит новый клиент и у нас образуется новый SocketChannel
- что делать?
- где-то есть селектор - зарегистрируем этот SC на какие-то операции (пока считаем, что на read и write); здесь обработка клиента в потоке, который аксептит, заканчивается)
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
Где-то есть другой поток, где крутится селектор: в цикле делает селекты и, когда селект срабатывает, это означает, что можно либо почитать, либо пописать.
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
Неудобно.
- ✔️ Надо делать 2 селектора: 1 на read, другой - на write.
- ❌ Почему плохо делать общий селектор? Каналы 90% своего времени готовы на write, поэтому такие селекторы будут постоянно просыпаться (мы не хотим этого).
Селектор возвращает сет SelectionKey; SK умеет:
- interestOps()
- readyOps()
- selector()
- channel()
- attach(theObj)
Что можно использовать в качестве attach obj? Обычно используют объект, идентифицирующий в вашей проге клиента. Можно прикрепить его туда и, когда какой-то канал окажется готовым, вам вернётся SelectionKey, вы у него запросите attachment() и это ваш клиент, с которым вы можете работать (не нужно хранить мапу или что-то ещё...).
Он сделал селект и получил набор клиентских каналов, готовых к чтению. Мы хотим из них прочитать, но может так выйти, что прочитаем пакет не целиком, а только кусочек. Что делать?
- читаем с каналов в буфер
- буфер - такая штука, в которую, если не переводить из режима в режим, можно продолжать читать с того же места, где остановились; создадим каждому клиенту буфер и будем записывать в него до тех пор, пока не получим сообщение нужного размера; как только получли - закиндываем его на обработку в thread pool
Пусть всё попало в thread pool и я справился со всем чтением со всех клиентов в 1 потоке. В тред пуле задача посчиталась Что дальше?
- для каждого клиента создадим ещё по буферу, в который будем писать (нужно создать новый, т.к. в первый клиенты по-прежнему могут складывать свои запросы)
- кладём в эти буфера данные на вывод - выводом займётся селектор, в котором каналы зарегистрированы на запись
- будем регистрировать себя на write, если там ещё не зареган, а потом удалять себя
- команда регистрации в селекторе не очень долгая
- когда подсчитался результат какого-то клиента, вы смотрите, зарегистрированы ли вы в селекторе на отправку: если зареганы, то складываете на отправку; если не зареганы, то регаете и складываете туда сообщение
Этот селектор крутится и, когда у него какой-то канал разродился, можно попытаться отправить порцию данных; удаляем-"разрегистрируем" канал, когда закончилась отправка.
❗ У нас 1 буфер на все ответы для каждого конкретного клиента, поэтому, если для 1 клиента одновременно закончили выполняться в thread pool сразу несколько задач, буфер становится разделяемым ресурсом и по нему нужно брать блокировку. ❗ Часто используется не 1 буфер, а очередь с приоритетами из буферов, чтобы ответы на запросы клиента вернуть в порядке, приближенном к тому, в котором эти запросы поступали.
Теперь у нас 2 буфера на каждого клиента (в идеале - 1 буфер на чтение и очередь на запись).
- ➖ Куча проблем с регистрацией/разрегистрацией каналов
- ➕ Кол-во потоков: 3 + (у блокирующей - 2<клиенты> + 1 + <тредпул>)
Где в селекторах узкое место?
- ❌ в 1 потоке происходит получение данных ото всех клиентов: если в единицу времени приходит много данных, то вы не успеваете их читать из 1 потока
Как влияет кол-во потоков для чтения на скорость?
- я читаю с СК (которая одна!)
- 2 момента:
- выгрузить данные с СК (этим занимается драйвер, поэтому всё равно, сколько потоков в этом участвуют)
- полученные данные нужно сформировать в запрос клиента и положить в thread pool
Теперь представим ситуацию большого кол-ва клиентов, которые фигачат данные. Как работает селектор? Он получил, что куча клиентов готова к риду: он бежит по ним и у каждого делает рид по 2 байта; после этого спрашивает, кто готов к риду - все готовы - он снова по ним бежит и читает по 2 байта.
Если бы он делал это в однопоточном режиме (по потоку на клиента), было бы плохо, что есть переключение контекста, но как бы работал каждый из них? Сделал блокирующий read и разом получил запрос клиента, отправил его в thread pool и уснул.
Откуда может поступать много данных?
- шлются огромные запросы от клиента
- короткие запросы но ОЧЕНЬ часто
Ещё раз: моё узкое место в том, что я читаю все данные из 1 потока. Как с этим жить?
- обычно люди заводят несколько селекторов, которые делают 1 и то же, но в разных потоках (конкретный клиент попадает к 1 из этих селекторов)
Fix без балансировщика нагрузки: aсинхронные каналы
Суть асинхронных каналов - команда read заканчивается мгновенно и где-то там происходит чтение. Когда оно закончилось, результат либо кладётся во Future
, либо вызывается callback. Нужно обсудить несколько вещей.
- Рид (настоящий, не тот, который вызываю), происходящий где-то внутри, блокирующий или нет? Он блокирующий. Неблокирующий отличается от блокирующего тем, что завершается очень быстро. Почему тогда можем вернуть не всё? Ещё раз: вам нужно скопировать данные из KS в US - они оттуда достаются какими-то порциями (размер которых зависит от ОС). В чём смысл: данные приходят на СК и складываются в буфер. После этого они должны раскидываться приложениЯМ. Буфер есть на каждом слое стека протоколов TCP/IP. В какой-то момент данные с тз стека протоколов должны дойти до вашего приложения. По факту это означает, что они загружены в US и готовы к тому, что, если ваше приложение сделает read, то оно их получает. В какой момент данные проталкиваются через этот стек? Зависит от ОС. До уровня TCP они проталкиваются автоматом, как пришли, а, когда они переходят на уровень Application, есть 2 политики:
- ОС сама достаёт то, что приходит в TCP, загружает и где-то хранит у себя
- ОС нужен запрос от приложения по этому порту и по этому порту она начинает из KS вытаскивать данные наверх
(2) вариант:
- ❌ Мы пытаемся доставать данные только в тот момент, когда нам нужно
- ✔️ Мы не занимаем буфер и не нагружаем лишней работой
Как же работает операция неблокирующего чтения: спрашиваем у TCP-шного слоя, есть ли у него ЧЁ и он либо даёт что-то, либо говорит, что ничего нет, и мы сразу закончили работу.
Как же работает операция блокирующего чтения? Внутри ОС есть механизм, который позволяет потокам, заблокированным на ввод-вывод, просыпаться в тот момент, когда в то место, которое они ждут, пришли данные. Это и есть ситуация blocking потока по вводу-выводу (не wating или sleeping), Как работает блокирующий ввод: мы блокируемся, и, когда на TCP-слой данные пришли, они копируются на уровень Application, и в этот момент мы просыпаемся.
Как это устроено? У вас есть 1 поток на все асинхронные каналы, который слушает всё. Он слушает все блокировки и, когда какой-то клиент готов и скопировал свои данные в US, он будит соответствующий поток. Этот поток, условно говоря, ставит галочку у соответствующего хендлера, что его можно исполнить. Этот хендлер будет исполнять ОС. Кем будится блокирующий read, когда приходят данные? ОСью.
Ещё раз: неправда, что на каждый асинхронный read 1 поток; неправда, что есть некая неблокирующая сущность, которая собирает всё в 1 кучку, а потом дергает хендлер. Там есть 1 поток, который чудным для ОС образом слушает вообще всё, просыпается, в момент когда кто-то из каналов выдал свои данные и ставит галочку хендлеру. Этот поток специфичен для ОС.
Окей, где исполнять хендлеры? Здесь помогут группы асинхронных каналов. С каждым асинхронным каналом связана некоторая группа - это thread pool, в который попадают хендлеры, которые надо исполнить.
Простой ответ, что такое группа каналов - штука, в которой выполняются ваши callback'и.
Какого размера (сколько тредов) встроенная асинхронная группа каналов? Напрашиваются 2 ответа, но правильного ответа не знает никто:
- 1
- <кол-во ядер> - 1
О (2):
- ❌ Надо брать блокировки, т.к. хендлеры могут работать с разделяемыми ресурсами.
- ❌ Увеличил кол-во потоков - появились разделяемые ресурсы => теряю время на блокировки; если callback'и маленькие, за счёт переключения контекста буду хуже работать
О (1):
- ✔️ Callback'и друг от друга независимы - они не могут работать одновременно => м\у callback'ами нет разделяемых ресурсов (они мб м\у callback'ами и чем-то другим в вашей программе, но м\у колбеками их нет и вы не завязаны на блокировки)
- ❌ Если колбек выполняется очень долго, то ничего не работает :(