Программирование на языке Ruby - Хэл Фултон
Шрифт:
Интервал:
Закладка:
Метод класса ThreadGroup.new создает новую группу потоков, а метод экземпляра add помещает поток в группу.
f1 = Thread.new("file1") { |file| waitfor(file) }
f2 = Thread.new("file2") { |file| waitfor(file) }
file_threads = ThreadGroup.new
file_threads.add f1
file_threads.add f2
Метод экземпляра list возвращает массив всех потоков, принадлежащих данной группе.
# Подсчитать все "живые" потоки в группе this_group.
count = 0
this_group.list.each {|x| count += 1 if x.alive? }
if count < this_group.list.size
puts "Некоторые потоки в группе this_group уже скончались."
else
puts "Все потоки в группе this_group живы."
end
В класс ThreadGroup можно добавить немало полезных методов. В примере ниже показаны методы для возобновления всех потоков, принадлежащих группе, для группового ожидания потоков (с помощью join) и для группового завершения потоков:
class ThreadGroup
def wakeup
list.each { |t| t.wakeup }
end
def join
list.each { |t| t.join if t != Thread.current }
end
def kill
list.each { |t| t.kill }
end
end
13.2. Синхронизация потоков
Почему необходима синхронизация? Потому что из-за «чередования» операций доступ к переменным и другим сущностям может осуществляться в порядке, который не удается установить путем чтения исходного текста отдельных потоков. Два и более потоков, обращающихся к одной и той же переменной, могут взаимодействовать между собой непредвиденными способами, и отлаживать такую программу очень трудно.
Рассмотрим простой пример:
x = 0
t1 = Thread.new do
1.upto(1000) do
x = x + 1
end
end
t2 = Thread.new do
1.upto(1000) do
x = x + 1
end
end
t1.join
t2.join
puts x
Сначала переменная x равна 0. Каждый поток увеличивает ее значение на тысячу раз. Логика подсказывает, что в конце должно быть напечатано 2000.
Но фактический результат противоречит логике. На конкретной машине было напечатано значение 1044. В чем дело?
Мы предполагали, что инкремент целого числа — атомарная (неделимая) операция. Но это не так. Рассмотрим последовательность выполнения приведенной выше программы. Поместим поток t1 слева, а поток t2 справа. Каждый квант времени занимает одну строчку и предполагается, что к моменту, когда был сделан этот мгновенный снимок, переменная x имела значение 123.
t1 t2
-------------------------- -----------------------------
Прочитать значение x (123)
Прочитать значение x (123)
Увеличить значение на 1 (124)
Увеличить значение на 1 (124)
Записать результат в x
Записать результат в x
Ясно, что каждый поток увеличивает на 1 то значение, которое видит. Но не менее ясно и то, что после увеличения на 1 обоими потоками x оказалось равно всего 124.
И это лишь самая простая из проблем, возникающих в связи с синхронизацией. Для решения более сложных приходится прилагать серьезные усилия — это предмет изучения специалистами в области теоретической информатики и математики.
13.2.1. Синхронизация с помощью критических секций
Простейший способ синхронизации дают критические секции. Когда поток входит в критическую секцию программы, гарантируется, что никакой другой поток не войдет в нее, пока первый не выйдет.
Если акцессору Thread.critical присвоить значение true, то выполнение других потоков не будет планироваться. В следующем примере мы переработали код предыдущего, воспользовавшись акцессором critical для определения критической области, которая защищает уязвимые участки программы.
x = 0
t1 = Thread.new do
1.upto(1000) do
Thread.critical = true
x = x + 1
Thread.critical = false
end
end
t2 = Thread.new do
1.upto(1000) do
Thread.critical = true
x = x + 1
Thread.critical = false
end
end
t1.join
t2.join
puts x
Теперь последовательность выполнения изменилась; взгляните, в каком порядке работают потоки t1 и t2. (Конечно, вне того участка, где происходит увеличение переменной, потоки могут чередоваться более-менее случайным образом.)
t1 t2
----------------------------- -----------------------------
Прочитать значение x (123)
Увеличить значение на 1 (124)
Записать результат в x
Прочитать значение x (124)
Увеличить значение на 1 (125)
Записать результат в x
Возможны такие комбинации операций с потоками, при которых поток планируется даже тогда, когда какой-то другой поток находится в критической секции.
Простейший случай — вновь созданный поток начинает исполнение немедленно вне зависимости от того, занимает какой-то другой поток критическую секцию или нет. Поэтому описанную технику лучше применять только в самых простых ситуациях.
13.2.2. Синхронизация доступа к ресурсам (mutex.rb)
В качестве примера рассмотрим задачу индексирования Web-сайтов. Мы извлекаем слова из многочисленных страниц в Сети и сохраняем их в хэше. Ключом является само слово, а значением — строка, идентифицирующая документ и номер строки в этом документе.
Постановка задачи и так достаточно груба. Но мы огрубим ее еще больше, введя следующие упрощающие допущения:
• будем представлять удаленные документы в виде строк;
• ограничимся всего тремя строками (они будут «зашиты» в код);
• сетевые задержки будем моделировать «засыпанием» на случайный промежуток времени.
Взгляните на программу в листинге 13.1. Она даже не печатает получаемые данные целиком, а выводит лишь счетчик слов (не уникальный). Каждый раз при чтении или обновлении хэша мы вызываем метод hesitate, который приостанавливает поток на случайное время. Тем самым поведение программы становится недетерминированным и приближенным к реальности.
Листинг 13.1. Программа индексирования с ошибками (гонка)@list = []
@list[0]="shoes shipsnsealing-wax"
@list[1]="cabbages kings"
@list[2]="quarksnshipsncabbages"
def hesitate
sleep rand(0)
end
@hash = {}
def process_list(listnum)
lnum = 0
@list[listnum].each do |line|
words = line.chomp.split
words.each do |w|
hesitate
if @hash[w]
hesitate
@hash[w] += ["#{listnum}:#{lnum}"]
else
hesitate
@hash[w] = ["#{listnum}:#{lnum}"]
end
end
lnum += 1
end
end
t1 = Thread.new(0) {|num| process_list(num) }
t2 = Thread.new(1) {|num| process_list(num) }
t3 = Thread.new(2) {|num| process_list(num) }
t1.join
t2.join
t3.join
count = 0
@hash.values.each {|v| count += v.size }
puts "Всего слов: #{count} " # Может быть напечатано 7 или 8!
Здесь имеется проблема. Если ваша система ведет себя примерно так же, как наша, то программа может напечатать одно из двух значений! В наших тестах с одинаковой вероятностью печаталось 7 или 8. Если слов и списков больше, то и разброс окажется более широким.
Попробуем исправить положение с помощью мьютекса, который будет контролировать доступ к разделяемому ресурсу. (Слово «mutex» — это сокращение от mutual exclusion, «взаимная блокировка».)
Обратимся к листингу 13.2. Библиотека Mutex позволяет создавать мьютексы и манипулировать ими. Мы можем захватить (lock) мьютекс перед доступом к хэшу и освободить (unlock) его по завершении операции.
Листинг 13.2. Программа индексирования с мьютексомrequire 'thread.rb'
@list = []
@list[0]="shoes shipsnsealing-wax"
@list[1]="cabbages kings"
@list[2]="quarksnshipsncabbages"
def hesitate
sleep rand(0)
end
@hash = {}
@mutex = Mutex.new
def process_list(listnum)
lnum = 0
@list[listnum].each do |line|