]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java
Merge commit 'd1a82fe'
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / GraphClientImpl2.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.acorn;
13
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
23
24 import org.simantics.acorn.internal.ClusterChange;
25 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
26 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
27 import org.simantics.acorn.lru.ClusterInfo;
28 import org.simantics.acorn.lru.ClusterStreamChunk;
29 import org.simantics.acorn.lru.ClusterUpdateOperation;
30 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
31 import org.simantics.db.ClusterCreator;
32 import org.simantics.db.Database;
33 import org.simantics.db.ServiceLocator;
34 import org.simantics.db.common.utils.Logger;
35 import org.simantics.db.exception.DatabaseException;
36 import org.simantics.db.exception.SDBException;
37 import org.simantics.db.server.ProCoreException;
38 import org.simantics.db.service.ClusterSetsSupport;
39 import org.simantics.db.service.ClusterUID;
40 import org.simantics.utils.datastructures.Pair;
41 import org.simantics.utils.logging.TimeLogger;
42
43 import gnu.trove.map.hash.TLongObjectHashMap;
44
45 public class GraphClientImpl2 implements Database.Session {
46         
47         public static final boolean DEBUG = false;
48
49         public final ClusterManager clusters;
50         
51         private TransactionManager transactionManager = new TransactionManager();
52         private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
53         private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
54
55         private static GraphClientImpl2 INSTANCE;
56         private Path dbFolder;
57         private final Database database;
58         private ServiceLocator locator;
59         private MainProgram mainProgram;
60
61         static class ClientThreadFactory implements ThreadFactory {
62                 
63                 final String name;
64                 final boolean daemon;
65                 
66                 public ClientThreadFactory(String name, boolean daemon) {
67                         this.name = name;
68                         this.daemon = daemon;
69                 }
70                 
71                 @Override
72                 public Thread newThread(Runnable r) {
73                         Thread thread = new Thread(r, name);
74                         thread.setDaemon(daemon);
75                         return thread;
76                 }
77         }
78
79         public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
80             this.database = database;
81             this.dbFolder = dbFolder;
82             this.locator = locator;
83             this.clusters = new ClusterManager(dbFolder);
84             load();
85             ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
86             cssi.updateReadAndWriteDirectories(clusters.lastSessionDirectory, clusters.workingDirectory);
87             mainProgram = new MainProgram(this, clusters);
88             executor.execute(mainProgram);
89             INSTANCE = this;
90         }
91
92         public Path getDbFolder() {
93             return dbFolder;
94         }
95
96         public void tryMakeSnapshot() throws IOException {
97                 
98             if (isClosing)
99                 return;
100             
101                 saver.execute(new Runnable() {
102
103                         @Override
104                         public void run() {
105                                 Transaction tr = null;
106                                 try {
107                                         // First take a write transaction
108                                         tr = askWriteTransaction(-1);
109                                         // Then make sure that MainProgram is idling
110                                         mainProgram.mutex.acquire();
111                                         try {
112                                                 synchronized(mainProgram) {
113                                                         if(mainProgram.operations.isEmpty()) {
114                                                                 makeSnapshot(false);
115                                                         } else {
116                                                                 // MainProgram is becoming busy again - delay snapshotting
117                                                                 return;
118                                                         }
119                                                 }
120                                         } finally {
121                                                 mainProgram.mutex.release();
122                                         }
123                                 } catch (IOException e) {
124                                         Logger.defaultLogError(e);
125                                 } catch (ProCoreException e) {
126                                         Logger.defaultLogError(e);
127                                 } catch (InterruptedException e) {
128                                         Logger.defaultLogError(e);
129                                 } finally {
130                                         try {
131                                                 if(tr != null)
132                                                         endTransaction(tr.getTransactionId());
133                                         } catch (ProCoreException e) {
134                                                 Logger.defaultLogError(e);
135                                         }
136                                 }
137                         }
138                         
139                 });
140         }
141         
142     public void makeSnapshot(boolean force) throws IOException {
143         if (safeToMakeSnapshot)
144             clusters.makeSnapshot(locator, force);
145     }
146         
147         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
148                 return clusters.clone(uid, creator);
149         }
150
151 //      private void save() throws IOException {
152 //              clusters.save();
153 //      }
154         
155         public void load() throws IOException {
156                 clusters.load();
157         }
158         
159 //      public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
160 //              clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
161 //      }
162
163         @Override
164         public Database getDatabase() {
165                 return database;
166         }
167
168         private boolean closed = false;
169         private boolean isClosing = false;
170         // Add check to make sure if it safe to make snapshot (used with cancel which is not yet supported and may cause corrupted head.state writing)
171     private boolean safeToMakeSnapshot = true;
172         
173         @Override
174         public void close() throws ProCoreException {
175             System.err.println("Closing " + this + " and mainProgram " + mainProgram);
176                 if(!closed && !isClosing) {
177                     isClosing = true;
178                         try {
179                             makeSnapshot(true);
180                                 
181                                 mainProgram.close();
182                                 clusters.shutdown();
183                                 executor.shutdown();
184                                 saver.shutdown();
185                                 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
186                                 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
187                                 
188                                 System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
189                                 
190                                 INSTANCE = null;
191                                 mainProgram = null;
192                                 executor = null;
193                                 saver = null;
194                                 
195                         } catch (IOException | InterruptedException e) {
196                                 throw new ProCoreException(e);
197                         }
198                 }
199                 closed = true;
200                 //impl.close();
201         }
202
203         @Override
204         public void open() throws ProCoreException {
205                 throw new UnsupportedOperationException();
206         }
207
208         @Override
209         public boolean isClosed() throws ProCoreException {
210                 return closed;
211         }
212         
213         @Override
214         public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
215                 
216                 clusters.state.headChangeSetId++;
217
218                 long committedChangeSetId = changeSetId + 1;
219                 
220                 clusters.commitChangeSet(committedChangeSetId, metadata);
221                 
222                 clusters.state.transactionId = transactionId;
223                 
224                 mainProgram.committed();
225                 
226                 TimeLogger.log("Accepted commit");
227                 
228         }
229
230         @Override
231         public long cancelCommit(long transactionId, long changeSetId,
232                         byte[] metadata, OnChangeSetUpdate onChangeSetUpdate)
233                         throws ProCoreException {
234             safeToMakeSnapshot = false;
235             throw new UnsupportedOperationException("org.simantics.acorn.GraphClientImpl2.cancelCommit() is not supported operation! Closing down to prevent further havoc");
236 //          System.err.println("GraphClientImpl2.cancelCommit() called!! this is experimental and might cause havoc!");
237 //          try {
238 //            undo(new long[] {changeSetId}, onChangeSetUpdate);
239 //        } catch (SDBException e) {
240 //            e.printStackTrace();
241 //            throw new ProCoreException(e);
242 //        }
243 //          clusters.state.headChangeSetId++;
244 //          return clusters.state.headChangeSetId;
245         }
246
247         @Override
248         public Transaction askReadTransaction() throws ProCoreException {
249                 return transactionManager.askReadTransaction();
250         }
251
252         enum TransactionState {
253                 IDLE,WRITE,READ
254         }
255         
256         class TransactionRequest {
257                 public TransactionState state;
258                 public Semaphore semaphore;
259                 public TransactionRequest(TransactionState state, Semaphore semaphore) {
260                         this.state = state;
261                         this.semaphore = semaphore;
262                 }
263         }
264
265         class TransactionManager {
266
267                 private TransactionState currentTransactionState = TransactionState.IDLE;
268                 
269                 private int reads = 0;
270                 
271                 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
272                 
273                 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
274                 
275                 private synchronized Transaction makeTransaction(TransactionRequest req) {
276                         
277                         final int csId = clusters.state.headChangeSetId;
278                         final long trId = clusters.state.transactionId+1;
279                         requestMap.put(trId, req);
280                         return new Transaction() {
281                                 
282                                 @Override
283                                 public long getTransactionId() {
284                                         return trId;
285                                 }
286                                 
287                                 @Override
288                                 public long getHeadChangeSetId() {
289                                         return csId;
290                                 }
291                         };
292                 }
293                 
294                 /*
295                  * This method cannot be synchronized since it waits and must support multiple entries
296                  * by query thread(s) and internal transactions such as snapshot saver
297                  */
298                 public Transaction askReadTransaction() throws ProCoreException {
299                 
300                         Semaphore semaphore = new Semaphore(0);
301                         
302                         TransactionRequest req = queue(TransactionState.READ, semaphore);
303                         
304                         try {
305                                 semaphore.acquire();
306                         } catch (InterruptedException e) {
307                                 throw new ProCoreException(e);
308                         }
309                         
310                         return makeTransaction(req);
311
312                 }
313
314                 private synchronized void dispatch() {
315                         TransactionRequest r = requests.removeFirst();
316                         if(r.state == TransactionState.READ) reads++;
317                         r.semaphore.release();
318                 }
319                 
320                 private synchronized void processRequests() {
321                         
322                         while(true) {
323
324                                 if(requests.isEmpty()) return;
325                                 TransactionRequest req = requests.peek();
326
327                                 if(currentTransactionState == TransactionState.IDLE) {
328                                 
329                                         // Accept anything while IDLE
330                                         currentTransactionState = req.state;
331                                         dispatch();
332                                         
333                                 } else if (currentTransactionState == TransactionState.READ) {
334                                         
335                                         if(req.state == currentTransactionState) {
336
337                                                 // Allow other reads
338                                                 dispatch();
339
340                                         } else {
341                                                 
342                                                 // Wait
343                                                 return;
344                                                 
345                                         }
346                                         
347                                 }  else if (currentTransactionState == TransactionState.WRITE) {
348
349                                         // Wait
350                                         return;
351                                         
352                                 }
353                                 
354                         }
355                         
356                 }
357                 
358                 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
359                         TransactionRequest req = new TransactionRequest(state, semaphore); 
360                         requests.addLast(req);
361                         processRequests();
362                         return req;
363                 }
364                 
365                 /*
366                  * This method cannot be synchronized since it waits and must support multiple entries
367                  * by query thread(s) and internal transactions such as snapshot saver
368                  */
369                 public Transaction askWriteTransaction()
370                                 throws ProCoreException {
371                         
372                         Semaphore semaphore = new Semaphore(0);
373                         
374                         TransactionRequest req = queue(TransactionState.WRITE, semaphore);
375                         
376                         try {
377                                 semaphore.acquire();
378                         } catch (InterruptedException e) {
379                                 throw new ProCoreException(e);
380                         }
381                         
382                         mainProgram.startTransaction(clusters.state.headChangeSetId+1);
383                         
384                         return makeTransaction(req);
385                         
386                 }
387                 
388                 public synchronized long endTransaction(long transactionId) throws ProCoreException {
389                         
390                         TransactionRequest req = requestMap.remove(transactionId);
391                         if(req.state == TransactionState.WRITE) {
392                                 currentTransactionState = TransactionState.IDLE;
393                                 processRequests();
394                         } else {
395                                 reads--;
396                                 if(reads == 0) {
397                                         currentTransactionState = TransactionState.IDLE;
398                                         processRequests();
399                                 }
400                         }
401                         return clusters.state.transactionId;
402                 }
403
404         }
405         
406         @Override
407         public Transaction askWriteTransaction(final long transactionId)
408                         throws ProCoreException {
409                 return transactionManager.askWriteTransaction();
410         }
411
412         @Override
413         public long endTransaction(long transactionId) throws ProCoreException {
414                 return transactionManager.endTransaction(transactionId);
415         }
416
417         @Override
418         public String execute(String command) throws ProCoreException {
419                 // This is called only by WriteGraphImpl.commitAccessorChanges
420                 // We can ignore this in Acorn
421                 return "";
422         }
423
424         @Override
425         public byte[] getChangeSetMetadata(long changeSetId)
426                         throws ProCoreException {
427                 return clusters.getMetadata(changeSetId);
428         }
429
430         @Override
431         public ChangeSetData getChangeSetData(long minChangeSetId,
432                         long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
433                         throws ProCoreException {
434                 
435                 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
436                 return null;
437                 
438         }
439
440         @Override
441         public ChangeSetIds getChangeSetIds() throws ProCoreException {
442                 throw new UnsupportedOperationException();
443         }
444
445         @Override
446         public Cluster getCluster(byte[] clusterId) throws ProCoreException {
447                 throw new UnsupportedOperationException();
448         }
449
450         @Override
451         public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
452                         throws ProCoreException {
453                 throw new UnsupportedOperationException();
454         }
455
456         @Override
457         public ClusterIds getClusterIds() throws ProCoreException {
458                 return clusters.getClusterIds();
459         }
460
461         @Override
462         public Information getInformation() throws ProCoreException {
463                 return new Information() {
464
465                         @Override
466                         public String getServerId() {
467                                 return "server";
468                         }
469
470                         @Override
471                         public String getProtocolId() {
472                                 return "";
473                         }
474
475                         @Override
476                         public String getDatabaseId() {
477                                 return "database";
478                         }
479
480                         @Override
481                         public long getFirstChangeSetId() {
482                                 return 0;
483                         }
484                         
485                 };
486         }
487
488         @Override
489         public Refresh getRefresh(long changeSetId) throws ProCoreException {
490                 
491                 final ClusterIds ids = getClusterIds();
492                 
493                 return new Refresh() {
494
495                         @Override
496                         public long getHeadChangeSetId() {
497                                 return clusters.state.headChangeSetId;
498                         }
499
500                         @Override
501                         public long[] getFirst() {
502                                 return ids.getFirst();
503                         }
504
505                         @Override
506                         public long[] getSecond() {
507                                 return ids.getSecond();
508                         }
509                         
510                 };
511                 
512         }
513
514         public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException {
515                 return clusters.getResourceFile(clusterUID, resourceIndex);
516         }
517
518         @Override
519         public ResourceSegment getResourceSegment(final byte[] clusterUID,
520                         final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
521                 
522                 return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
523
524         }
525
526         @Override
527         public long reserveIds(int count) throws ProCoreException {
528                 return clusters.state.reservedIds++;
529         }
530
531         @Override
532         public void updateCluster(byte[] operations) throws ProCoreException {
533
534                 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
535                 ClusterInfo info = clusters.clusterLRU.getOrCreate(operation.uid, true);
536                 if(info == null) throw new IllegalStateException();
537                 info.acquireMutex();
538                 try {
539                         info.scheduleUpdate();
540                         mainProgram.schedule(operation);
541                 } catch (Throwable t) {
542                         throw new IllegalStateException(t);
543                 } finally {
544                         info.releaseMutex();
545                 }
546
547         }
548
549         private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException {
550
551                 String[] ss = ccsId.split("\\.");
552                 String chunkKey = ss[0];
553                 int chunkOffset = Integer.parseInt(ss[1]);
554                 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
555                 if(chunk == null) throw new IllegalStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
556                 chunk.acquireMutex();
557                 try {
558                         return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
559                 } catch (Throwable t) {
560                         throw new IllegalStateException(t);
561                 } finally {
562                         chunk.releaseMutex();
563                 }
564                 
565         }
566         
567         private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException {
568
569                 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
570
571                 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
572
573                 clusters.clusterLRU.acquireMutex();
574                 try {
575
576                         ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
577                         for(int i=0;i<proc.entries.size();i++) {
578                                 
579                                 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
580                                 e.process(clusters, cs, clusterKey);
581                                 
582                         }
583                         
584                         cs.flush();
585
586                 } finally {
587                         clusters.clusterLRU.releaseMutex();
588                 }
589                 
590         }
591         
592         @Override
593         public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
594                 
595                 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
596                 
597                 UndoClusterSupport support = new UndoClusterSupport(clusters);
598                 
599                 final int changeSetId = clusters.state.headChangeSetId;
600                 
601                 if(ClusterUpdateProcessorBase.DEBUG)
602                         System.err.println(" === BEGIN UNDO ===");
603                 
604                 for(int i=0;i<changeSetIds.length;i++) {
605                         final long id = changeSetIds[changeSetIds.length-1-i];
606                         ArrayList<String> ccss = clusters.getChanges(id);
607                         for(int j=0;j<ccss.size();j++) {
608                                 try {
609                                         if(ClusterUpdateProcessorBase.DEBUG)
610                                                 System.err.println("performUndo " + ccss.get(ccss.size()-j-1));
611                                         performUndo(ccss.get(ccss.size()-j-1), clusterChanges, support);
612                                 } catch (DatabaseException e) {
613                                         e.printStackTrace();
614                                 }
615                         }
616                 }
617
618                 if(ClusterUpdateProcessorBase.DEBUG)
619                         System.err.println(" === END UNDO ===");
620
621                 for(int i=0;i<clusterChanges.size();i++) {
622                         
623                         final int changeSetIndex = i;
624                         
625                         final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
626                         
627                         final ClusterUID cuid = pair.first;
628                         final byte[] data = pair.second;
629
630                         onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
631
632                                 @Override
633                                 public long getChangeSetId() {
634                                         return changeSetId;
635                                 }
636
637                                 @Override
638                                 public int getChangeSetIndex() {
639                                         return 0;
640                                 }
641
642                                 @Override
643                                 public int getNumberOfClusterChangeSets() {
644                                         return clusterChanges.size();
645                                 }
646
647                                 @Override
648                                 public int getIndexOfClusterChangeSet() {
649                                         return changeSetIndex;
650                                 }
651
652                                 @Override
653                                 public byte[] getClusterId() {
654                                         return cuid.asBytes();
655                                 }
656
657                                 @Override
658                                 public boolean getNewCluster() {
659                                         return false;
660                                 }
661
662                                 @Override
663                                 public byte[] getData() {
664                                         return data;
665                                 }
666
667                         });
668
669                 }
670
671                 
672                 return false;
673                 
674         }
675         
676         public static GraphClientImpl2 getInstance() {
677             return INSTANCE;
678         }
679         
680         public ServiceLocator getServiceLocator() {
681             return locator;
682         }
683
684     @Override
685     public boolean refreshEnabled() {
686         return false;
687     }
688
689         
690         
691         
692         
693         
694         
695         
696         
697         
698         
699         ////////////////////////
700         
701         
702         
703         
704         
705         
706         
707         
708         
709         
710         
711         
712 }
713