La computació concurrent permet que diverses tasques s'executin en períodes de temps superposats en lloc de seqüencialment. O sigui: no cal que una tasca el completi perquè comenci la següent. Però no necessàriament l'execució es produeix en el mateix instant de temps. És una propietat de l'algorisme.
La computació paral·lela sí que fa referència a l'execució en el mateix instant de temps, i treu profit de sistemes amb múltiples CPUs per a accelerar la computació. És una propietat de la màquina.
Avantatges:
La implementació de la concurrència pot fer-se de dues formes, principalment:
Memòria compartida
Pas de missatges
Els models concurrents poden també classificar-se en processos i fils.
Un procés té un entorn independent d'execució, simulant un ordinador. Per exemple, té el seu espai independent de memòria. Solen ser sinònim de programa, o aplicació, encara que pugui ser un conjunt de processos. Aquests poden col·laborar mitjançant canonades (pipes) o sòcols.
Generalment, els processos no comparteixen memòria.
L'estat d'un procés el controla el sistema operatiu.
Una màquina virtual de Java és un únic procés, habitualment.
Una aplicació Java pot crear també processos addicionals utilitzant ProcessBuilder. Aquesta classe permet crear processos de sistema (Process) i executar-los, parar-los, llegir la seva sortida, reencaminar-la, etc. Resumint, permet interactuar amb altres processos de sistema que no siguin de la màquina virtual Java.
Els fils simulen un processador. Per defecte, comparteixen memòria.
Algunes raons per a utilitzar fils:
A Java, un fil està sempre associat a un objecte Thread, que pot tenir una sèrie d'estats.
Un procés pot contenir diversos fils. La diferència més important entre un procés i un fil és que cada procés té el seu espai d'adreces, mentre els fils (del mateix procés) s'executen en un espai de memòria compartit.
A partir d'ara, farem referència a la forma de concurrència basada en memòria compartida, implementada mitjançant fils.
Fes-te aquestes preguntes:
La concurrència, a Java, es pot implementar a dos nivells: l'API de baix nivell i els objectes concurrents d'alt nivell.
Per crear un fil amb l'API de baix nivell, es pot fer de dues maneres:
new Thread(new MyRunnable())
Un cop tenim el Thread, el podem executar mitjançant el seu mètode start()
.
A continuació, podem veure un exemple de creació d'un fil anomenat "fil" des del fil principal, "main".
Aquest seria el codi:
public class MyFirstThreadTest implements Runnable {
@Override
public void run() {
System.out.println("executant " + Thread.currentThread().getName());
}
public static void main(String[] args) {
Runnable myRunnable = new MyFirstThreadTest();
Thread thread = new Thread(myRunnable, "fill");
thread.start();
System.out.println("acabant " + Thread.currentThread().getName());
}
}
Aquest exemple no presenta dificultats, ja que no es comparteix cap objecte.
Una interrupció és una indicació a un fil de què ha de parar de fer el que està fent i fer una altra cosa. Perquè aquest mecanisme funcioni, cal que el fil suporti interrupcions. Això es pot aconseguir de dues formes:
Es pot aconseguir compartint una variable que un fil modifica, i l'altre llegeix.
La sincronització és la coordinació de dues o més tasques per a obtenir un resultat desitjat. En tenim dos tipus:
Una secció crítica és una porció de codi que només pot ser executat per una tasca en un cert instant de temps.
Quan tenim dues o més tasques que escriuen a una variable compartida fora d'una secció crítica, es produeix una situació anomenada race condition: el comportament del sistema depèn de la seqüència de temps o altres esdeveniments no controlables. Aquesta circumstància sol produir-se quan múltiples fils comparteixen un estat mutable i les operacions sobre aquest estat se superposen.
Per evitar les race conditions, cal utilitzar mecanismes de sincronització, que fan que els objectes implicats siguin thread-safe. Aquests mecanismes defineixen una relació "happens-before" (passa abans) entre els fils involucrats. Només aquells esdeveniments definits per aquesta relació tenen un comportament esperat en el temps. La resta, són impredictibles.
Els esdeveniments es poden ordenar mitjançant una relació "happens-before". És a dir: si una acció passa abans que una altra, la primera és visible i passa abans que la segona.
Aquestes són les definides pel llenguatge Java:
Hi ha bàsicament quatre tècniques per assegurar-nos que no tindrem problemes accedint a variables en memòria compartida:
Hi ha una dificultat important a l'hora de dissenyar codi thread-safe, és a dir, que sigui segur davant l'accés de múltiples fils. Es poden preparar proves per al nostre codi que comprovin si, un nombre important de fils executant simultàniament el nostre codi, provoca problemes. Però no sempre és fàcil simular aquesta situació. Per això també necessitem tècniques per preveure aquesta situació.
Si mirem la documentació de la Java Standard Edition, veurem que de vegades es fa referència a la condició "thread-safe" de les classes.
Per exemple, a la classe java.util.regex.Pattern es diu:
Matcher
class are not safe for such use.És important que quan dissenyem el nostre codi siguem conscients de si necessitem que més d'un fil accedeixi. I si és així, dissenyar la classe en conseqüència.
A Java, la sincronització es fa mitjançant monitors: només un fil pot posseir un monitor en un instant de temps. Quan un fil posseeix un, es diu que ha entrat en el monitor. Tots els altres fils que ho intentin seran suspesos fins que el primer surti.
Podem utilitzar la paraula reservada "synchronized" al codi en tres llocs, per a definir zones controlades per un monitor:
Un exemple de mètodes d'instància:
public class SynchronizedCounter {
private int c = 0;
public synchronized void increment() {
c++;
}
public synchronized void decrement() {
c--;
}
public synchronized int value() {
return c;
}
}
Conseqüències:
Important: dins d'un bloc sincronitzat, cal fer la feina mínima possible: llegir les dades i si cal, transformar-les.
Un exemple de blocs de codi:
public class MsLunch {
private long c1 = 0;
private long c2 = 0;
private Object lock1 = new Object();
private Object lock2 = new Object();
public void inc1() {
synchronized(lock1) {
c1++;
}
}
public void inc2() {
synchronized(lock2) {
c2++;
}
}
}
Aquest mètode permet tenir un gra més fi. Cal estar segur que és correcte permetre el solapament de l'accés als dos camps.
Imaginem que volem esperar fins que es compleixi una condició:
public void alegriaControlada() {
// Control senzill. Gasta CPU, no fer-ho mai!
while (!alegria) {}
System.out.println("Alegria aconseguida!");
}
Això ho podem fer entre fils mitjançant el mètode clàssic de comunicació wait i notify, que permet:
Els mètodes són:
Els mètodes wait() i notify s'han de cridar des de dins d'un bloc sincronitzat per a l'objecte monitor.
A més, tal i com es comenta a Object, el mètode wait() ha de ser a dins d'un bucle esperant per una condició:
synchronized (monitor) {
while (!condicio) {
monitor.wait();
}
}
synchronized (monitor) {
monitor.notify();
}
En el nostre cas:
synchronized (monitor) {
while (!alegria) {
wait();
}
// aqui ja tenim alegria!
}
...
synchronized (monitor) {
alegria = true;
notify();
}
La vitalitat d'una aplicació (liveness) és la seva capacitat per a executar-se en el temps que toca. Els problemes més habituals que poden desbaratar aquesta vitalitat són:
Quan un client fa una petició a un servidor, el servidor ha d'aconseguir l'accés exclusiu als recursos compartits necessaris. L'ús correcte de les zones crítiques permetrà que el sistema tingui una millor vitalitat quan la càrrega de peticions sigui alta.
La llibreria java.util.concurrent conté classes útils quan fem concurrència:
Sempre és preferible utilitzar aquestes classes que els mètodes de sincronització wait/notify, perquè simplifiquen la programació. De la mateixa manera que és millor utilitzar executors i tasques que fils directament.
La majoria d'aplicacions concurrents s'organitzen mitjançant tasques. Una tasca realitza una feina concreta. D'aquesta forma, podem simplificar el disseny i el funcionament.
Veiem una possible solució per a la gestió de connexions a un servidor. Suposem que tenim un mètode, atendrePeticio()
, que atén una petició web.
class ServidorWebUnFil {
public static void main(String[] args) throws IOException {
ServerSocket socol = new ServerSocket(80);
while (true) {
Socket connexio = socol.accept();
atendrePeticio(connexio);
}
}
}
class ServidorWebUnFilPerPeticio {
public static void main(String[] args) throws IOException {
ServerSocket socol = new ServerSocket(80);
while (true) {
Socket connexio = socol.accept();
Runnable tasca = new Runnable() {
@Override
public void run() {
atendrePeticio(connexio);
}
}
new Thread(tasca).start();
}
}
}
class ServidorWebExecucioTasques {
private static final int NFILS = 100;
private static final Executor executor = Executors.newFixedThreadPool(NFILS);
public static void main(String[] args) throws IOException {
ServerSocket socol = new ServerSocket(80);
while (true) {
final Socket connexio = socol.accept();
Runnable tasca = new Runnable() {
public void run() {
atendrePeticio(connexio);
}
};
executor.execute(tasca);
}
}
}
En aquesta solució hem introduït la interfície Executor:
public interface Executor {
void execute(Runnable command);
}
És un objecte que permet executar Runnables. Internament, el que fa és executar tasques de forma asíncrona, creant un fil per cada tasca en execució, i retornant el control al fil que crida el seu mètode execute
. Les tasques poden tenir quatre estats:
Els Executors es poden crear des de la classe amb mètodes estàtics Executors.
Un Executor sempre ha de parar-se amb el mètode shutdown()
.
Algunes tasques retornen resultats. Per implementar-les, podem utilitzar les interfícies Callable i Future:
public interface Callable<V> {
V call() throws Exception;
}
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException, CancellationException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException;
}
Callable<V> permet executar la tasca i retornar un valor del tipus V. Per tal de poder executar-la, necessitem un nou tipus d'executor, l'ExecutorService, una interfície que estèn la Executor amb nous mètodes, en particular:
Les dues permeten executar un Runnable / Callable i retornen un Future, que és un objecte que permet obtenir el resultat en diferit mitjançant el mètode get()
(bloqueig) o get(long timeout, TimeUnit unit)
(bloqueig per un temps).
També podem cancel·lar la tasca mitjançant cancel(boolean mayInterruptIfRunning)
: el paràmetre diu si es vol interrompre també si ja ha començat.
Els ExecutorService poden crear-se mitjançant la mateixa classe que hem vist abans, Executors.
A continuació, un exemple de funcionament. Com canvia l'execució si fem Executors.newFixedThreadPool(2)?
public class SimpleCallableTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<String> f1 = executor.submit(new ToUpperCallable("hello"));
Future<String> f2 = executor.submit(new ToUpperCallable("world"));
try {
long millis = System.currentTimeMillis();
System.out.println("main " + f1.get() + " " + f2.get() +
" in millis: " + (System.currentTimeMillis() - millis));
} catch (InterruptedException | ExecutionException ex) {
ex.printStackTrace();
}
executor.shutdown();
}
private static final class ToUpperCallable implements Callable<String> {
private String word;
public ToUpperCallable(String word) {
this.word = word;
}
@Override
public String call() throws Exception {
String name = Thread.currentThread().getName();
System.out.println(name + " calling for " + word);
Thread.sleep(2500);
String result = word.toUpperCase();
System.out.println(name + " result " + word + " => " + result);
return result;
}
}
}
A Java 7 es va introduir el framework fork/join.
A Java 8 es va introduir el CompletableFuture, que permet combinar futurs i gestionar millor els errors que es produeixen. Un exemple és l'ús del mètode complete per a completar un futur, en un altre fil:
CompletableFuture<String> completableFuture = new CompletableFuture<>();
//...
String resultat = completableFuture.get();
// mentre en un altre fil...
completableFuture.complete("Hola, món!);
O bé, la possibilitat d'executar directament amb supplyAsync:
Supplier<String> supplier = new Supplier<String>() {
@Override
public String get() {
return "Hola, món!";
}
};
Future<String> future = CompletableFuture.supplyAsync(supplier, executor); // executor és opcional
System.out.println(future.get());
El pas de missatges pot implementar-se:
En el diagrama pot veure's una implementació entre processos.
La comunicació entre les dues parts es pot realitzar de forma síncrona o de forma asíncrona, segons hi hagi un bloqueig E/S (entrada/sortida).
Comes pot veure al diagrama, en la forma síncrona el client espera la resposta del servidor (bloqueig E/S), i mentrestant no fa res. A la forma asíncrona envia la petició, continua treballant i en un moment donat rep la resposta (sense bloqueig E/S).
Quina forma és més convenient? Depèn de les circumstàncies. La forma síncrona és més fàcil d'implementar, però l'asíncrona permet millorar el rendiment del sistema introduint la concurrència.
Les peticions asíncrones han de permetre al client conèixer el resultat a posteriori. Alguns esquemes possibles:
Quan utilitzem el model síncron (amb bloqueig), un sol fil no pot gestionar diverses peticions simultànies. Això vol dir que necessitem crear un fil per gestionar cada petició i retornar la resposta. En diem arquitectura basada en fils.
Habitualment, es limita el nombre de fils que es permeten gestionar simultàniament per evitar el consum excessiu de recursos.
Es reprodueix el patró productor-consumidor: els productors són l'origen dels esdeveniments, i només saben que un ha ocorregut; mentre els consumidors necessiten saber que hi ha un nou esdeveniment, i l'han d'atendre (handle). En diem arquitectura basada en esdeveniments.
Algunes tècniques per implementar el servei:
A Java tenim Vert.x, una implementació multireactor (amb N bucles d'esdeveniments).
basat en esdeveniments
model d'actors
Una altra tècnica per a gestionar peticions asíncrones és el model d'actors. Aquest model permet crear programes concurrents utilitzant actors no concurrents.
A Java, tenim un exemple de llibreria: Akka.
Una forma d'implementar-lo és passar missatges entre fils mitjançant l'ús d'una cua sincronitzada. Pot haver-hi un o més productors i un o més consumidors. La cua ha de ser thread-safe. A Java, les implementacions de BlockingQueue, ArrayBlockingQueue i LinkedBlockingQueue, en són exemples. Els objectes a aquestes cues han de ser d'un tipus immutable.
En aquest exemple, un fill1 envia treballs (j1, j2...) a un fil asíncron (async) mitjançant un buffer intermedi.
Les accions són:
De vegades, les peticions fan referència a un recurs compartit que no permet el seu ús per més d'un client alhora. En aquests casos, es pot implementar una cua que gestioni les peticions de forma asíncrona:
La impressora és un únic fil (servidor) que va llegint els treballs afegits a la cua per diferents usuaris (fils), i atenent-los.
També podríem tenir més d'una cua, si hi ha la possibilitat de tenir més d'un punt per atendre les peticions (diverses impressores).
La programació passiva és la tradicional als dissenys OO: un mòdul delega en un altre per a produir un canvi al model.
L'alternativa plantejada es diu programació reactiva, on utilitzem callbacks per a invertir la responsabilitat.
El terme "reactiu" s'utilitza en dos contextos: la programació reactiva està basada en esdeveniments, mentre els sistemes reactius generalment es basen en missatges. Els sistemes basats en missatges corresponen més sovint a processos distribuïts que es comuniquen a través d'una xarxa, potser com a microserveis que cooperen. Les aplicacions estan basades en esdeveniments (amb 0-N observadors) i són locals.
La programació reactiva és asíncrona i sense bloqueig. Els fils que busquen recursos compartits no bloquegen l’espera que el recurs estigui disponible. En el seu lloc, continuen la seva execució i són notificats després quan el servei s'ha completat.
Les extensions reactives permeten que llenguatges imperatius, com Java (RxJava), puguin implementar programació reactiva.
Java 9 implementa streams reactius amb la classe Flow.
Un sistema reactiu és un estil d'arquitectura que permet que diverses aplicacions puguin comportar-se com una sola, reaccionant al seu entorn, mantenint-se al corrent els uns dels altres, i permetent la seva elasticitat, resiliència i responsivitat basats (habitualment) en cues de missatges dirigits a receptors concrets (vegeu el Reactive Manifesto). Una aplicació dels sistemes reactius són els microserveis.
Tant els patrons reactor/proactor com el model d'actors permeten implementar sistemes reactius.
Programació i sistemes reactius: