19 package org.sleuthkit.autopsy.ingest;
21 import com.google.common.eventbus.Subscribe;
22 import com.google.common.util.concurrent.ThreadFactoryBuilder;
23 import java.awt.EventQueue;
24 import java.beans.PropertyChangeEvent;
25 import java.beans.PropertyChangeListener;
26 import java.io.Serializable;
27 import java.lang.reflect.InvocationTargetException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.Date;
32 import java.util.EnumSet;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.List;
37 import java.util.Optional;
39 import java.util.concurrent.Callable;
40 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.Future;
44 import java.util.concurrent.atomic.AtomicLong;
45 import java.util.logging.Level;
46 import java.util.stream.Collectors;
47 import java.util.stream.Stream;
48 import javax.annotation.concurrent.GuardedBy;
49 import javax.annotation.concurrent.Immutable;
50 import javax.annotation.concurrent.ThreadSafe;
51 import javax.swing.JOptionPane;
52 import javax.swing.SwingUtilities;
53 import org.netbeans.api.progress.ProgressHandle;
54 import org.openide.util.Cancellable;
55 import org.openide.util.NbBundle;
56 import org.openide.windows.WindowManager;
129 @GuardedBy(
"IngestManager.class")
133 private final ExecutorService
startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build());
142 private final ExecutorService
eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build());
161 if (null == instance) {
163 instance.subscribeToServiceMonitorEvents();
164 instance.subscribeToCaseEvents();
203 PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> {
218 logger.log(Level.SEVERE,
"Service {0} is down, cancelling all running ingest jobs", serviceDisplayName);
220 EventQueue.invokeLater(
new Runnable() {
223 JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
224 NbBundle.getMessage(this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
225 NbBundle.getMessage(this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
226 JOptionPane.ERROR_MESSAGE);
240 Set<String> servicesList =
new HashSet<>();
252 if (event.getNewValue() != null) {
268 void handleCaseOpened() {
273 String channelPrefix = openedCase.
getName();
280 logger.log(Level.SEVERE,
"Failed to open remote events channel", ex);
282 NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.ErrMsg"));
293 void handleArtifactsPosted(Blackboard.ArtifactsPostedEvent tskEvent) {
298 List<DataArtifact> newDataArtifacts =
new ArrayList<>();
299 List<AnalysisResult> newAnalysisResults =
new ArrayList<>();
300 Collection<BlackboardArtifact> newArtifacts = tskEvent.getArtifacts();
301 for (BlackboardArtifact artifact : newArtifacts) {
302 if (artifact instanceof DataArtifact) {
303 newDataArtifacts.add((DataArtifact) artifact);
305 newAnalysisResults.add((AnalysisResult) artifact);
308 if (!newDataArtifacts.isEmpty() || !newAnalysisResults.isEmpty()) {
310 Optional<Long> ingestJobId = tskEvent.getIngestJobId();
311 if (ingestJobId.isPresent()) {
370 BlackboardArtifact artifact = newArtifacts.iterator().next();
371 if (artifact != null) {
373 Content artifactDataSource = artifact.getDataSource();
376 Content dataSource = job.getDataSource();
377 if (artifactDataSource.getId() == dataSource.getId()) {
383 }
catch (TskCoreException ex) {
384 logger.log(Level.SEVERE, String.format(
"Failed to get data source for blackboard artifact (object ID = %d)", artifact.getId()), ex);
388 if (ingestJob != null) {
389 if (!newDataArtifacts.isEmpty()) {
390 ingestJob.addDataArtifacts(newDataArtifacts);
392 if (!newAnalysisResults.isEmpty()) {
393 ingestJob.addAnalysisResults(newAnalysisResults);
402 for (BlackboardArtifact.Type artifactType : tskEvent.getArtifactTypes()) {
418 void handleCaseClosed() {
443 if (!(dataSource instanceof DataSource)) {
444 throw new IllegalArgumentException(
"dataSource argument does not implement the DataSource interface");
447 IngestJobInputStream stream =
new IngestJobInputStream(job);
448 if (stream.getIngestJobStartResult().getJob() != null) {
450 }
else if (stream.getIngestJobStartResult().getModuleErrors().isEmpty()) {
451 for (
IngestModuleError error : stream.getIngestJobStartResult().getModuleErrors()) {
452 logger.log(Level.SEVERE, String.format(
"%s ingest module startup error for %s", error.getModuleDisplayName(), dataSource.getName()), error.getThrowable());
454 throw new TskCoreException(
"Error starting ingest modules");
456 throw new TskCoreException(
"Error starting ingest modules", stream.getIngestJobStartResult().getStartupException());
478 List<AbstractFile> emptyFilesSubset =
new ArrayList<>();
479 for (Content dataSource : dataSources) {
495 if (!(dataSource instanceof DataSource)) {
496 throw new IllegalArgumentException(
"dataSource argument does not implement the DataSource interface");
500 if (job.hasIngestPipeline()) {
525 List<DataSource> verifiedDataSources =
new ArrayList<>();
526 for (Content content : dataSources) {
527 if (!(content instanceof DataSource)) {
528 throw new IllegalArgumentException(
"Content object in dataSources argument does not implement the DataSource interface");
530 DataSource verifiedDataSource = (DataSource) content;
531 verifiedDataSources.add(verifiedDataSource);
535 for (DataSource dataSource : verifiedDataSources) {
536 List<IngestJob> startedJobs =
new ArrayList<>();
538 if (job.hasIngestPipeline()) {
539 startResult = startIngestJob(job);
541 startedJobs.add(job);
543 for (
IngestJob jobToCancel : startedJobs) {
568 "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
569 "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
570 "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
571 "IngestManager.startupErr.dlgErrorList=Errors:" 577 if (SwingUtilities.isEventDispatchThread()) {
578 initIngestMessageInbox();
581 SwingUtilities.invokeAndWait(() -> initIngestMessageInbox());
582 }
catch (InterruptedException ex) {
584 }
catch (InvocationTargetException ex) {
585 logger.log(Level.WARNING,
"There was an error starting ingest message inbox", ex);
589 List<IngestModuleError> errors = null;
600 EventQueue.invokeLater(
new Runnable() {
604 JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
605 NbBundle.getMessage(this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
606 NbBundle.getMessage(this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
607 JOptionPane.ERROR_MESSAGE);
625 IngestManager.
logger.log(Level.INFO, String.format(
"Starting ingest job %d at %s", job.
getId(),
new Date().getTime()));
627 errors = job.start();
628 }
catch (InterruptedException ex) {
631 if (errors.isEmpty()) {
632 this.fireIngestJobStarted(job.
getId());
638 logger.log(Level.SEVERE, String.format(
"Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.
getId()), error.getThrowable());
642 final StringBuilder message =
new StringBuilder(1024);
643 message.append(Bundle.IngestManager_startupErr_dlgMsg()).append(
"\n");
644 message.append(Bundle.IngestManager_startupErr_dlgSolution()).append(
"\n\n");
645 message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append(
"\n");
647 String moduleName = error.getModuleDisplayName();
648 String errorMessage = error.getThrowable().getLocalizedMessage();
649 message.append(moduleName).append(
": ").append(errorMessage).append(
"\n");
651 message.append(
"\n\n");
652 EventQueue.invokeLater(() -> {
653 JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(), message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
668 long jobId = job.
getId();
673 IngestManager.
logger.log(Level.INFO, String.format(
"Ingest job %d completed at %s", job.
getId(),
new Date().getTime()));
674 fireIngestJobCompleted(jobId);
676 IngestManager.
logger.log(Level.INFO, String.format(
"Ingest job %d cancelled at %s", job.
getId(),
new Date().getTime()));
677 fireIngestJobCancelled(jobId);
800 void fireIngestJobStarted(
long ingestJobId) {
810 void fireIngestJobCompleted(
long ingestJobId) {
820 void fireIngestJobCancelled(
long ingestJobId) {
832 void fireDataSourceAnalysisStarted(
long ingestJobId, Content dataSource) {
844 void fireDataSourceAnalysisCompleted(
long ingestJobId, Content dataSource) {
856 void fireDataSourceAnalysisCancelled(
long ingestJobId, Content dataSource) {
867 void fireFileIngestDone(AbstractFile file) {
892 void initIngestMessageInbox() {
910 if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
912 }
else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
914 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
915 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
916 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.msg",
MAX_ERROR_MESSAGE_POSTS));
946 void setIngestTaskProgress(IngestTask task, String currentModuleName) {
950 incrementModuleRunTime(prevSnap.getModuleDisplayName(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
960 void setIngestTaskProgressCompleted(IngestTask task) {
964 incrementModuleRunTime(prevSnap.getModuleDisplayName(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
973 void incrementModuleRunTime(String moduleDisplayName, Long duration) {
974 if (moduleDisplayName.equals(
"IDLE")) {
981 if (prevTimeL != null) {
982 prevTime = prevTimeL;
984 prevTime += duration;
1020 List<IngestJobProgressSnapshot> snapShots =
new ArrayList<>();
1024 if (snapshot != null) {
1025 snapShots.add(snapshot);
1038 long getFreeDiskSpace() {
1063 if (Thread.currentThread().isInterrupted()) {
1071 final String displayName = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.displayName");
1072 this.progress = ProgressHandle.createHandle(displayName,
new Cancellable() {
1074 public boolean cancel() {
1075 if (progress != null) {
1076 progress.setDisplayName(NbBundle.getMessage(
this.getClass(),
"IngestManager.StartIngestJobsTask.run.cancelling", displayName));
1080 handle.cancel(
true);
1088 startIngestJob(job);
1092 if (null != progress) {
1109 private final BlockingIngestTaskQueue
tasks;
1120 IngestTask task = tasks.getNextTask();
1121 task.execute(threadId);
1122 }
catch (InterruptedException ex) {
1125 if (Thread.currentThread().isInterrupted()) {
1149 this.publisher = publisher;
1165 private static final long serialVersionUID = 1L;
1181 startTime =
new Date();
1182 this.moduleDisplayName = NbBundle.getMessage(this.getClass(),
"IngestManager.IngestThreadActivitySnapshot.idleThread");
1183 this.dataSourceName =
"";
1204 startTime =
new Date();
1205 this.moduleDisplayName = moduleDisplayName;
1206 this.dataSourceName = dataSource.getName();
1228 startTime =
new Date();
1229 this.moduleDisplayName = moduleDisplayName;
1230 this.dataSourceName = dataSource.getName();
1231 this.fileName = fileName;
1240 long getIngestJobId() {
1249 long getThreadId() {
1258 Date getStartTime() {
1259 return new Date(startTime.getTime());
1267 String getModuleDisplayName() {
1268 return moduleDisplayName;
1277 String getDataSourceName() {
1278 return dataSourceName;
1286 String getFileName() {
1368 private static final long serialVersionUID = 1L;
1386 super(message, cause);
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
void addIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final Map< Long, Future< Void > > startIngestJobFutures
List< IngestJobProgressSnapshot > getIngestJobSnapshots()
String getServiceStatus(String service)
void removeIngestModuleEventListener(final PropertyChangeListener listener)
IngestStream openIngestStream(DataSource dataSource, IngestJobSettings settings)
final String moduleDisplayName
static final String INGEST_MODULE_EVENT_CHANNEL_NAME
List< IngestThreadActivitySnapshot > getIngestThreadActivitySnapshots()
void queueIngestJob(Content dataSource, List< AbstractFile > files, IngestJobSettings settings)
IngestManagerException(String message, Throwable cause)
static synchronized IngestManager getInstance()
static IngestManager instance
final String dataSourceName
void removeIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final ExecutorService dataSourceLevelIngestJobTasksExecutor
static boolean runningWithGUI
static IngestMessage createErrorMessage(String source, String subject, String detailsHtml)
void cancelAllIngestJobs()
void publish(AutopsyEvent event)
final ExecutorService dataArtifactIngestTasksExecutor
static void addPropertyChangeListener(final PropertyChangeListener listener)
MessageType getMessageType()
IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
static final Logger logger
final ExecutorService eventPublishingExecutor
void clearIngestMessageBox()
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void subscribeToServiceMonitorEvents()
boolean isIngestRunning()
DATA_SOURCE_ANALYSIS_COMPLETED
static void removePropertyChangeListener(final PropertyChangeListener listener)
static final Set< String > INGEST_MODULE_EVENT_NAMES
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
final AutopsyEventPublisher publisher
synchronized void closeRemoteEventChannel()
Map< String, Long > getModuleRunTimes()
final ServicesMonitor servicesMonitor
void removeIngestJobEventListener(final PropertyChangeListener listener)
void addIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
final BlockingIngestTaskQueue tasks
static final String INGEST_JOB_EVENT_CHANNEL_NAME
List< IngestModuleError > getModuleErrors()
final ExecutorService analysisResultIngestTasksExecutor
static final Set< String > INGEST_JOB_EVENT_NAMES
final AutopsyEventPublisher moduleEventPublisher
static int numberOfFileIngestThreads()
synchronized void openRemoteEventChannel(String channelName)
void addIngestJobEventListener(final PropertyChangeListener listener)
final Object ingestMessageBoxLock
INGEST_MODULES_STARTUP_FAILED
IngestManagerException(String message)
SleuthkitCase getSleuthkitCase()
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final int MAX_ERROR_MESSAGE_POSTS
final AtomicLong ingestErrorMessagePosts
volatile boolean caseIsOpen
final AtomicLong nextIngestManagerTaskId
int getNumberOfFileIngestThreads()
static void error(String title, String message)
void addIngestModuleEventListener(final PropertyChangeListener listener)
static Case getCurrentCase()
synchronized static Logger getLogger(String name)
DATA_SOURCE_ANALYSIS_STARTED
static Case getCurrentCaseThrows()
static void addEventTypeSubscriber(Set< Events > eventTypes, PropertyChangeListener subscriber)
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
final IngestMonitor ingestMonitor
IngestManager.IngestManagerException getStartupException()
final ExecutorService fileLevelIngestJobTasksExecutor
final Map< Long, IngestJob > ingestJobsById
void subscribeToCaseEvents()
final ExecutorService startIngestJobsExecutor
void removeIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
final int numberOfFileIngestThreads
final AutopsyEventPublisher jobEventPublisher