Программирование на языке Ruby - Хэл Фултон
Шрифт:
Интервал:
Закладка:
def process_list(listnum)
lnum = 0
@list[listnum].each do |line|
words = line.chomp.split
words.each do |w|
hesitate
@mutex.lock
if @hash[w]
hesitate
@hash[w] += ["#{listnum}:#{lnum}"]
else
hesitate
@hash[w] = ["#{listnum}:#{lnum}"]
end
@mutex.unlock
end
lnum += 1
end
end
t1 = Thread.new(0) {|num| process_list(num) }
t2 = Thread.new(1) {|num| process_list(num) }
t3 = Thread.new(2) {|num| process_list(num) }
t1.join
t2.join
t3.join
count = 0
@hash.values.each {|v| count += v.size }
puts "Всего слов: #{count} " # Всегда печатается 8!
Отметим, что помимо метода lock в классе Mutex есть также метод try_lock. Он отличается от lock тем, что если мьютекс уже захвачен другим потоком, то он не дожидается освобождения, а сразу возвращает false.
require 'thread'
mutex = Mutex.new
t1 = Thread.new { mutex.lock; sleep 30 }
sleep 1
t2 = Thread.new do
if mutex.try_lock
puts "Захватил"
else
puts "He сумел захватить" # Печатается немедленно.
end
end
sleep 2
Эта возможность полезна, если поток не хочет приостанавливать выполнение. Есть также метод synchronize, который захватывает мьютекс, а потом автоматически освобождает его.
mutex = Mutex.new
mutex.synchronize do
# Любой код, нуждающийся в защите...
end
Существует еще библиотека mutex_m, где определен модуль Mutex_m, который можно подмешивать к классу (или использовать для расширения объекта). У такого расширенного объекта будут все методы мьютекса, так что он сам может выступать в роли мьютекса.
require 'mutex_m'
class MyClass
include Mutex_m
# Теперь любой объект класса MyClass может вызывать
# методы lock, unlock, synchronize...
# Внешние объекты также могут вызывать эти
# методы для объекта MyClass.
end
13.2.3. Предопределенные классы синхронизированных очередей
В библиотеке thread.rb есть пара классов, которые иногда бывают полезны. Класс Queue реализует безопасную относительно потоков очередь, доступ к обоим концам которой синхронизирован. Это означает, что разные потоки могут, ничего не опасаясь, работать с такой очередью. Класс SizedQueue отличается от предыдущего тем, что позволяет ограничить размер очереди (число элементов в ней).
Оба класса имеют практически один и тот же набор методов, поскольку SizedQueue наследует Queue. Правда, в подклассе определен еще акцессор max, позволяющий получить и установить максимальный размер очереди.
buff = SizedQueue.new(25)
upper1 = buff.max #25
# Увеличить размер очереди...
buff.max = 50
upper2 = buff.max # 50
В листинге 13.3 приведено решение задачи о производителе и потребителе. Для производителя задержка (аргумент sleep) чуть больше, чем для потребителя, чтобы единицы продукции «накапливались».
Листинг 13.3. Задача о производителе и потребителеrequire 'thread'
buffer = SizedQueue.new(2)
producer = Thread.new do
item = 0
loop do
sleep rand 0
puts "Производитель произвел #{item}"
buffer.enq item
item += 1
end
end
consumer = Thread.new do
loop do
sleep (rand 0)+0.9
item = buffer.deq
puts "Потребитель потребил #{item}"
puts " ожидает = #{buffer.num_waiting}"
end
end
sleep 60 # Работать одну минуту, потом завершить оба потока.
Чтобы поместить элемент в очередь и извлечь из нее, рекомендуется применять соответственно методы enq и deq. Можно было бы для помещения в очередь пользоваться также методом push, а для извлечения — методами pop и shift, но их названия не так мнемоничны в применении к очередям.
Метод empty? проверяет, пуста ли очередь, а метод clear опустошает ее. Метод size (и его синоним length) возвращает число элементов в очереди.
# Предполагается, что другие потоки не мешают...
buff = Queue.new
buff.enq "one"
buff.enq "two"
buff.enq "three"
n1 = buff.size # 3
flag1 = buff.empty? # false
buff.clear
n2 = buff.size # 0
flag2 = buff.empty? # true
Метод num_waiting возвращает число потоков, ожидающих доступа к очереди. Если размер очереди не ограничен, то это потоки, ожидающие возможности удалить элементы; для ограниченной очереди включаются также потоки, пытающиеся добавить элементы.
Необязательный параметр non_block метода deq в классе Queue по умолчанию равен false. Если же он равен true, по при попытке извлечь элемент из пустой очереди он не блокирует поток, а возбуждает исключение ThreadError.
13.2.4. Условные переменные
Да зовите моих скрипачей, трубачей...
«Веселый король» (детский стишок)[16]Условная переменная — это, по существу, очередь потоков. Они используются в сочетании с мьютексами для лучшего управления синхронизацией потоков.
Условная переменная всегда ассоциируется с каким-то мьютексом. Ее назначение — освободить мьютекс до тех пор, пока не начнет выполняться определенное условие. Представьте себе ситуацию, когда поток захватил мьютекс, но не готов продолжать выполнение. Тогда он может заснуть под контролем условной переменной, ожидая, что будет разбужен, когда условие станет истинным.
Важно понимать, что пока поток ждет условную переменную, мьютекс свободен, поэтому другие потоки могут получить доступ к защищенному им ресурсу. А как только другой поток сигнализирует этой переменной, ожидающий поток пробуждается и пытается вновь захватить мьютекс.
Рассмотрим несколько искусственный пример в духе задачи об обедающих философах. Представьте себе, что вокруг стола сидят три скрипача, ожидающих своей очереди поиграть. Но у них есть всего две скрипки и один смычок. Понятно, что скрипач сможет играть, только если одновременно завладеет одной из скрипок и смычком.
Мы поддерживаем счетчики свободных скрипок и смычков. Когда скрипач хочет получить скрипку и смычок, он должен ждать их освобождения. В программе ниже мы защитили проверку условия мьютексом и под его защитой ждем скрипку и смычок порознь. Если скрипка или смычок заняты, поток засыпает. Он не владеет мьютексом до тех пор, пока другой поток не просигнализирует о том, что ресурс свободен. В этот момент первый поток просыпается и снова захватывает мьютекс.
Код представлен в листинге 13.4.
Листинг 13.4. Три скрипачаrequire 'thread'
@music = Mutex.new
@violin = ConditionVariable.new
@bow = ConditionVariable.new
@violins_free = 2
@bows_free = 1
def musician(n)
loop do
sleep rand(0)
@music.synchronize do
@violin.wait(@music) while @violins_frее == 0
@violins_free -= 1
puts "#{n} владеет скрипкой"
puts "скрипок #@violins_frее, смычков #@bows_free"
@bow.wait(@music) while @bows_free == 0
@bows_free -= 1
puts "#{n} владеет смычком"
puts "скрипок #@violins_free, смычков #@bows_free"
end
sleep rand(0)
puts "#{n}: (...играет...)"
sleep rand(0)
puts "#{n}: Я закончил."
@music.synchronize do
@violins_free += 1
@violin.signal if @violins_free == 1
@bows_free += 1
@bow.signal if @bows_free == 1
end
end
end
threads = []
3.times {|i| threads << Thread.new { musician(i) } }
threads.each {|t| t.join }
Мы полагаем, что это решение никогда не приводит к тупиковой ситуации, хотя доказать этого не сумели. Но интересно отметить, что описанный алгоритм не справедливый. В наших тестах оказалось, что первый скрипач играет чаще двух остальных, а второй чаще третьего. Выяснение причин такого поведения и его исправление мы оставляем читателю в качестве интересного упражнения.
13.2.5. Другие способы синхронизации
Еще один механизм синхронизации - это монитор, который в Ruby реализован в библиотеке monitor.rb. Это более развитый по сравнению с мьютексом механизм, основное отличие состоит в том, что захваты одного и того же мьютекса не могут быть вложенными, а монитора — могут.