]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java
Merge commit '2ef2a5e5a52f8bfdbb5f2f47be2d3fbdcd2a8834'
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / GraphClientImpl2.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.acorn;
13
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
23
24 import org.simantics.acorn.internal.ClusterChange;
25 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
26 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
27 import org.simantics.acorn.lru.ClusterInfo;
28 import org.simantics.acorn.lru.ClusterStreamChunk;
29 import org.simantics.acorn.lru.ClusterUpdateOperation;
30 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
31 import org.simantics.db.ClusterCreator;
32 import org.simantics.db.Database;
33 import org.simantics.db.ServiceLocator;
34 import org.simantics.db.common.utils.Logger;
35 import org.simantics.db.exception.DatabaseException;
36 import org.simantics.db.exception.SDBException;
37 import org.simantics.db.server.ProCoreException;
38 import org.simantics.db.service.ClusterSetsSupport;
39 import org.simantics.db.service.ClusterUID;
40 import org.simantics.utils.datastructures.Pair;
41 import org.simantics.utils.logging.TimeLogger;
42
43 import gnu.trove.map.hash.TLongObjectHashMap;
44
45 public class GraphClientImpl2 implements Database.Session {
46         
47         public static final boolean DEBUG = false;
48
49         public final ClusterManager clusters;
50         
51         private TransactionManager transactionManager = new TransactionManager();
52         private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
53         private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
54
55         private static GraphClientImpl2 INSTANCE;
56         private Path dbFolder;
57         private final Database database;
58         private ServiceLocator locator;
59         private MainProgram mainProgram;
60
61         static class ClientThreadFactory implements ThreadFactory {
62                 
63                 final String name;
64                 final boolean daemon;
65                 
66                 public ClientThreadFactory(String name, boolean daemon) {
67                         this.name = name;
68                         this.daemon = daemon;
69                 }
70                 
71                 @Override
72                 public Thread newThread(Runnable r) {
73                         Thread thread = new Thread(r, name);
74                         thread.setDaemon(daemon);
75                         return thread;
76                 }
77         }
78
79         public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
80             this.database = database;
81             this.dbFolder = dbFolder;
82             this.locator = locator;
83             this.clusters = new ClusterManager(dbFolder);
84             load();
85             ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
86             cssi.updateReadAndWriteDirectories(clusters.lastSessionDirectory, clusters.workingDirectory);
87             mainProgram = new MainProgram(this, clusters);
88             executor.execute(mainProgram);
89             INSTANCE = this;
90         }
91
92         public Path getDbFolder() {
93             return dbFolder;
94         }
95
96         public void tryMakeSnapshot() throws IOException {
97                 
98             if (isClosing)
99                 return;
100             
101                 saver.execute(new Runnable() {
102
103                         @Override
104                         public void run() {
105                                 Transaction tr = null;
106                                 try {
107                                         // First take a write transaction
108                                         tr = askWriteTransaction(-1);
109                                         // Then make sure that MainProgram is idling
110                                         mainProgram.mutex.acquire();
111                                         try {
112                                                 synchronized(mainProgram) {
113                                                         if(mainProgram.operations.isEmpty()) {
114                                                                 makeSnapshot(false);
115                                                         } else {
116                                                                 // MainProgram is becoming busy again - delay snapshotting
117                                                                 return;
118                                                         }
119                                                 }
120                                         } finally {
121                                                 mainProgram.mutex.release();
122                                         }
123                                 } catch (IOException e) {
124                                         Logger.defaultLogError(e);
125                                 } catch (ProCoreException e) {
126                                         Logger.defaultLogError(e);
127                                 } catch (InterruptedException e) {
128                                         Logger.defaultLogError(e);
129                                 } finally {
130                                         try {
131                                                 if(tr != null)
132                                                         endTransaction(tr.getTransactionId());
133                                         } catch (ProCoreException e) {
134                                                 Logger.defaultLogError(e);
135                                         }
136                                 }
137                         }
138                         
139                 });
140         }
141         
142     public void makeSnapshot(boolean force) throws IOException {
143         clusters.makeSnapshot(locator, force);
144     }
145         
146         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
147                 return clusters.clone(uid, creator);
148         }
149
150 //      private void save() throws IOException {
151 //              clusters.save();
152 //      }
153         
154         public void load() throws IOException {
155                 clusters.load();
156         }
157         
158 //      public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
159 //              clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
160 //      }
161
162         @Override
163         public Database getDatabase() {
164                 return database;
165         }
166
167         private boolean closed = false;
168         private boolean isClosing = false;
169         
170         @Override
171         public void close() throws ProCoreException {
172             System.err.println("Closing " + this + " and mainProgram " + mainProgram);
173                 if(!closed && !isClosing) {
174                     isClosing = true;
175                         try {
176                                 makeSnapshot(true);
177                                 
178                                 mainProgram.close();
179                                 clusters.shutdown();
180                                 executor.shutdown();
181                                 saver.shutdown();
182                                 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
183                                 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
184                                 
185                                 System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
186                                 
187                                 INSTANCE = null;
188                                 mainProgram = null;
189                                 executor = null;
190                                 saver = null;
191                                 
192                         } catch (IOException | InterruptedException e) {
193                                 throw new ProCoreException(e);
194                         }
195                 }
196                 closed = true;
197                 //impl.close();
198         }
199
200         @Override
201         public void open() throws ProCoreException {
202                 throw new UnsupportedOperationException();
203         }
204
205         @Override
206         public boolean isClosed() throws ProCoreException {
207                 return closed;
208         }
209         
210         @Override
211         public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
212                 
213                 clusters.state.headChangeSetId++;
214
215                 long committedChangeSetId = changeSetId + 1;
216                 
217                 clusters.commitChangeSet(committedChangeSetId, metadata);
218                 
219                 clusters.state.transactionId = transactionId;
220                 
221                 mainProgram.committed();
222                 
223                 TimeLogger.log("Accepted commit");
224                 
225         }
226
227         @Override
228         public long cancelCommit(long transactionId, long changeSetId,
229                         byte[] metadata, OnChangeSetUpdate onChangeSetUpdate)
230                         throws ProCoreException {
231             System.err.println("GraphClientImpl2.cancelCommit() called!! this is experimental and might cause havoc!");
232             try {
233             undo(new long[] {changeSetId}, onChangeSetUpdate);
234         } catch (SDBException e) {
235             e.printStackTrace();
236             throw new ProCoreException(e);
237         }
238             clusters.state.headChangeSetId++;
239             return clusters.state.headChangeSetId;
240         }
241
242         @Override
243         public Transaction askReadTransaction() throws ProCoreException {
244                 return transactionManager.askReadTransaction();
245         }
246
/**
 * Kind of transaction that is requested or currently active.
 */
enum TransactionState {
    /** No transaction is active. */
    IDLE,
    /** A write transaction. */
    WRITE,
    /** A read transaction. */
    READ
}
250         
251         class TransactionRequest {
252                 public TransactionState state;
253                 public Semaphore semaphore;
254                 public TransactionRequest(TransactionState state, Semaphore semaphore) {
255                         this.state = state;
256                         this.semaphore = semaphore;
257                 }
258         }
259
260         class TransactionManager {
261
262                 private TransactionState currentTransactionState = TransactionState.IDLE;
263                 
264                 private int reads = 0;
265                 
266                 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
267                 
268                 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
269                 
270                 private synchronized Transaction makeTransaction(TransactionRequest req) {
271                         
272                         final int csId = clusters.state.headChangeSetId;
273                         final long trId = clusters.state.transactionId+1;
274                         requestMap.put(trId, req);
275                         return new Transaction() {
276                                 
277                                 @Override
278                                 public long getTransactionId() {
279                                         return trId;
280                                 }
281                                 
282                                 @Override
283                                 public long getHeadChangeSetId() {
284                                         return csId;
285                                 }
286                         };
287                 }
288                 
289                 /*
290                  * This method cannot be synchronized since it waits and must support multiple entries
291                  * by query thread(s) and internal transactions such as snapshot saver
292                  */
293                 public Transaction askReadTransaction() throws ProCoreException {
294                 
295                         Semaphore semaphore = new Semaphore(0);
296                         
297                         TransactionRequest req = queue(TransactionState.READ, semaphore);
298                         
299                         try {
300                                 semaphore.acquire();
301                         } catch (InterruptedException e) {
302                                 throw new ProCoreException(e);
303                         }
304                         
305                         return makeTransaction(req);
306
307                 }
308
309                 private synchronized void dispatch() {
310                         TransactionRequest r = requests.removeFirst();
311                         if(r.state == TransactionState.READ) reads++;
312                         r.semaphore.release();
313                 }
314                 
315                 private synchronized void processRequests() {
316                         
317                         while(true) {
318
319                                 if(requests.isEmpty()) return;
320                                 TransactionRequest req = requests.peek();
321
322                                 if(currentTransactionState == TransactionState.IDLE) {
323                                 
324                                         // Accept anything while IDLE
325                                         currentTransactionState = req.state;
326                                         dispatch();
327                                         
328                                 } else if (currentTransactionState == TransactionState.READ) {
329                                         
330                                         if(req.state == currentTransactionState) {
331
332                                                 // Allow other reads
333                                                 dispatch();
334
335                                         } else {
336                                                 
337                                                 // Wait
338                                                 return;
339                                                 
340                                         }
341                                         
342                                 }  else if (currentTransactionState == TransactionState.WRITE) {
343
344                                         // Wait
345                                         return;
346                                         
347                                 }
348                                 
349                         }
350                         
351                 }
352                 
353                 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
354                         TransactionRequest req = new TransactionRequest(state, semaphore); 
355                         requests.addLast(req);
356                         processRequests();
357                         return req;
358                 }
359                 
360                 /*
361                  * This method cannot be synchronized since it waits and must support multiple entries
362                  * by query thread(s) and internal transactions such as snapshot saver
363                  */
364                 public Transaction askWriteTransaction()
365                                 throws ProCoreException {
366                         
367                         Semaphore semaphore = new Semaphore(0);
368                         
369                         TransactionRequest req = queue(TransactionState.WRITE, semaphore);
370                         
371                         try {
372                                 semaphore.acquire();
373                         } catch (InterruptedException e) {
374                                 throw new ProCoreException(e);
375                         }
376                         
377                         mainProgram.startTransaction(clusters.state.headChangeSetId+1);
378                         
379                         return makeTransaction(req);
380                         
381                 }
382                 
383                 public synchronized long endTransaction(long transactionId) throws ProCoreException {
384                         
385                         TransactionRequest req = requestMap.remove(transactionId);
386                         if(req.state == TransactionState.WRITE) {
387                                 currentTransactionState = TransactionState.IDLE;
388                                 processRequests();
389                         } else {
390                                 reads--;
391                                 if(reads == 0) {
392                                         currentTransactionState = TransactionState.IDLE;
393                                         processRequests();
394                                 }
395                         }
396                         return clusters.state.transactionId;
397                 }
398
399         }
400         
401         @Override
402         public Transaction askWriteTransaction(final long transactionId)
403                         throws ProCoreException {
404                 return transactionManager.askWriteTransaction();
405         }
406
407         @Override
408         public long endTransaction(long transactionId) throws ProCoreException {
409                 return transactionManager.endTransaction(transactionId);
410         }
411
412         @Override
413         public String execute(String command) throws ProCoreException {
414                 // This is called only by WriteGraphImpl.commitAccessorChanges
415                 // We can ignore this in Acorn
416                 return "";
417         }
418
419         @Override
420         public byte[] getChangeSetMetadata(long changeSetId)
421                         throws ProCoreException {
422                 return clusters.getMetadata(changeSetId);
423         }
424
425         @Override
426         public ChangeSetData getChangeSetData(long minChangeSetId,
427                         long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
428                         throws ProCoreException {
429                 
430                 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
431                 return null;
432                 
433         }
434
435         @Override
436         public ChangeSetIds getChangeSetIds() throws ProCoreException {
437                 throw new UnsupportedOperationException();
438         }
439
440         @Override
441         public Cluster getCluster(byte[] clusterId) throws ProCoreException {
442                 throw new UnsupportedOperationException();
443         }
444
445         @Override
446         public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
447                         throws ProCoreException {
448                 throw new UnsupportedOperationException();
449         }
450
451         @Override
452         public ClusterIds getClusterIds() throws ProCoreException {
453                 return clusters.getClusterIds();
454         }
455
456         @Override
457         public Information getInformation() throws ProCoreException {
458                 return new Information() {
459
460                         @Override
461                         public String getServerId() {
462                                 return "server";
463                         }
464
465                         @Override
466                         public String getProtocolId() {
467                                 return "";
468                         }
469
470                         @Override
471                         public String getDatabaseId() {
472                                 return "database";
473                         }
474
475                         @Override
476                         public long getFirstChangeSetId() {
477                                 return 0;
478                         }
479                         
480                 };
481         }
482
483         @Override
484         public Refresh getRefresh(long changeSetId) throws ProCoreException {
485                 
486                 final ClusterIds ids = getClusterIds();
487                 
488                 return new Refresh() {
489
490                         @Override
491                         public long getHeadChangeSetId() {
492                                 return clusters.state.headChangeSetId;
493                         }
494
495                         @Override
496                         public long[] getFirst() {
497                                 return ids.getFirst();
498                         }
499
500                         @Override
501                         public long[] getSecond() {
502                                 return ids.getSecond();
503                         }
504                         
505                 };
506                 
507         }
508
509         public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException {
510                 return clusters.getResourceFile(clusterUID, resourceIndex);
511         }
512
513         @Override
514         public ResourceSegment getResourceSegment(final byte[] clusterUID,
515                         final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
516                 
517                 return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
518
519         }
520
521         @Override
522         public long reserveIds(int count) throws ProCoreException {
523                 return clusters.state.reservedIds++;
524         }
525
526         @Override
527         public void updateCluster(byte[] operations) throws ProCoreException {
528
529                 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
530                 ClusterInfo info = clusters.clusterLRU.getOrCreate(operation.uid, true);
531                 if(info == null) throw new IllegalStateException();
532                 info.acquireMutex();
533                 try {
534                         info.scheduleUpdate();
535                         mainProgram.schedule(operation);
536                 } catch (Throwable t) {
537                         throw new IllegalStateException(t);
538                 } finally {
539                         info.releaseMutex();
540                 }
541
542         }
543
544         private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException {
545
546                 String[] ss = ccsId.split("\\.");
547                 String chunkKey = ss[0];
548                 int chunkOffset = Integer.parseInt(ss[1]);
549                 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
550                 if(chunk == null) throw new IllegalStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
551                 chunk.acquireMutex();
552                 try {
553                         return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
554                 } catch (Throwable t) {
555                         throw new IllegalStateException(t);
556                 } finally {
557                         chunk.releaseMutex();
558                 }
559                 
560         }
561         
562         private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException {
563
564                 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
565
566                 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
567
568                 clusters.clusterLRU.acquireMutex();
569                 try {
570
571                         ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
572                         for(int i=0;i<proc.entries.size();i++) {
573                                 
574                                 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
575                                 e.process(clusters, cs, clusterKey);
576                                 
577                         }
578                         
579                         cs.flush();
580
581                 } finally {
582                         clusters.clusterLRU.releaseMutex();
583                 }
584                 
585         }
586         
587         @Override
588         public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
589                 
590                 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
591                 
592                 UndoClusterSupport support = new UndoClusterSupport(clusters);
593                 
594                 final int changeSetId = clusters.state.headChangeSetId;
595                 
596                 if(ClusterUpdateProcessorBase.DEBUG)
597                         System.err.println(" === BEGIN UNDO ===");
598                 
599                 for(int i=0;i<changeSetIds.length;i++) {
600                         final long id = changeSetIds[changeSetIds.length-1-i];
601                         ArrayList<String> ccss = clusters.getChanges(id);
602                         for(int j=0;j<ccss.size();j++) {
603                                 try {
604                                         if(ClusterUpdateProcessorBase.DEBUG)
605                                                 System.err.println("performUndo " + ccss.get(ccss.size()-j-1));
606                                         performUndo(ccss.get(ccss.size()-j-1), clusterChanges, support);
607                                 } catch (DatabaseException e) {
608                                         e.printStackTrace();
609                                 }
610                         }
611                 }
612
613                 if(ClusterUpdateProcessorBase.DEBUG)
614                         System.err.println(" === END UNDO ===");
615
616                 for(int i=0;i<clusterChanges.size();i++) {
617                         
618                         final int changeSetIndex = i;
619                         
620                         final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
621                         
622                         final ClusterUID cuid = pair.first;
623                         final byte[] data = pair.second;
624
625                         onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
626
627                                 @Override
628                                 public long getChangeSetId() {
629                                         return changeSetId;
630                                 }
631
632                                 @Override
633                                 public int getChangeSetIndex() {
634                                         return 0;
635                                 }
636
637                                 @Override
638                                 public int getNumberOfClusterChangeSets() {
639                                         return clusterChanges.size();
640                                 }
641
642                                 @Override
643                                 public int getIndexOfClusterChangeSet() {
644                                         return changeSetIndex;
645                                 }
646
647                                 @Override
648                                 public byte[] getClusterId() {
649                                         return cuid.asBytes();
650                                 }
651
652                                 @Override
653                                 public boolean getNewCluster() {
654                                         return false;
655                                 }
656
657                                 @Override
658                                 public byte[] getData() {
659                                         return data;
660                                 }
661
662                         });
663
664                 }
665
666                 
667                 return false;
668                 
669         }
670         
671         public static GraphClientImpl2 getInstance() {
672             return INSTANCE;
673         }
674         
675         public ServiceLocator getServiceLocator() {
676             return locator;
677         }
678
679     @Override
680     public boolean refreshEnabled() {
681         return false;
682     }
683
684         
685         
686         
687         
688         
689         
690         
691         
692         
693         
694         ////////////////////////
695         
696         
697         
698         
699         
700         
701         
702         
703         
704         
705         
706         
707 }
708