Object detection from fMRIΒΆ
In [1]:
from scipy.linalg import solve
from scipy.special import erf
import numpy as np
import pandas as pd
import os, pickle
import matplotlib.pyplot as plt
import multiprocessing as mp
import sklearn
from sklearn.metrics import auc
%matplotlib inline
data_dir = '../../../data/vim2/'
subject = 1
lag = 1
n_voxels = 100
train_frac = 0.8
In [2]:
with open(os.path.join(data_dir, 'st_objects_py2.pkl'), 'rb') as f:
objects = pickle.load(f)
n_frames = len(objects)
unique_objects = np.unique([k for d in objects for k in d])
n_unique_objects = len(unique_objects)
object_index = dict(zip(unique_objects, range(n_unique_objects)))
confidence = np.zeros((n_unique_objects, n_frames))
for frame, d in enumerate(objects):
for o, c in d.iteritems():
confidence[object_index[o], frame] = c
confidence = np.stack([sec.mean(1) for sec in np.split(confidence, n_frames / 15, axis=1)]).T
confidence = np.roll(confidence, lag)[:, lag:]
confidence_barcode = ~np.isclose(confidence, 0)
order = np.argsort(confidence_barcode.sum(1))[::-1]
confidence = confidence[order]
confidence_barcode = confidence_barcode[order]
unique_objects = unique_objects[order]
object_index = dict(zip(unique_objects, range(n_unique_objects)))
In [3]:
fig = plt.figure(figsize=(16,12))
ax = plt.gca()
ax.imshow(confidence_barcode, cmap='Greys', aspect='auto')
ax.set_yticks(range(n_unique_objects))
ax.set_yticklabels(unique_objects)
ax.set_xlabel('time (sec)')
plt.show()
In [4]:
voxels = np.load(os.path.join(data_dir, 'subject_%i' % (subject, ), 'rt.npy'))[:, lag:]
active_voxels = np.empty((n_unique_objects, n_voxels), dtype=int)
for o, i in object_index.iteritems():
active_voxels[i] = np.abs(voxels[:, confidence_barcode[i]]).sum(1).argsort()[-n_voxels:][::-1]
In [5]:
n_seconds = confidence.shape[1]
t = np.arange(n_seconds)
split = int(n_seconds * train_frac)
t_train, t_test = t[:split], t[split:]
l = float(len(t_train)-1)
t_train1, t_train2 = t_train[:-1], t_train[1:]
t_test1, t_test2 = t_test[:-1], t_test[1:]
def fit(i, iters=500, atol=1e-8):
x = voxels[active_voxels[i]]
y = confidence[i] - confidence[i].mean()
x_train = x[:, t_train]
x_train1, x_test1 = x[:,t_train1], x[:,t_test1]
s = np.sign(np.diff(y[t_train]))
c = np.cov(x_train)
x0 = (x_train1 - x_train.mean(1)[:, np.newaxis]) / l
w = np.zeros(n_voxels)
w[0] = 1
erf_last = np.inf
e = []
for it in range(iters):
h = w.dot(x_train1)
erf_next = erf(h)
ei = np.linalg.norm(erf_next - erf_last)
e.append(ei)
if ei * ei < atol:
break
erf_last = erf_next.copy()
h *= s / erf_next
w = solve(c, x0.dot(h))
w /= np.sqrt(2)
return w, e
In [6]:
pool = mp.Pool(processes=mp.cpu_count())
res = pool.map(fit, range(n_unique_objects))
pool.close()
pool.terminate()
pool.join()
w = np.vstack([r[0] for r in res])
e = [r[1] for r in res]
fig, ax = plt.subplots(1, 1, figsize=(4,4))
for ei in e:
ax.plot(ei, 'k-', lw=0.1)
ax.set_xscale('log')
ax.set_yscale('log')
ax.set_xlabel('iteration')
ax.set_ylabel('discrepancy')
plt.tight_layout()
plt.show()
In [7]:
fig = plt.figure(figsize=(16,12))
ax = plt.gca()
i = np.repeat(range(n_unique_objects), n_voxels).flatten()
s = 10 + w.min() + w.flatten()
c = w.flatten()
scale = np.abs(w).max()
ax.scatter(active_voxels.T.flatten(), i, c=c, s=s, cmap='seismic', vmin=-scale, vmax=scale)
ax.set_yticks(range(n_unique_objects)[::-1])
ax.set_yticklabels(unique_objects)
ax.set_xlabel('voxel')
plt.show()
In [8]:
train_prediction = np.empty((n_unique_objects, len(t_train2)))
test_prediction = np.empty((n_unique_objects, len(t_test2)))
for i in range(n_unique_objects):
train_prediction[i] = confidence[i, t_train1] + w[i].dot(voxels[np.ix_(active_voxels[i],t_train1)])
test_prediction[i] = confidence[i, t_test1] + w[i].dot(voxels[np.ix_(active_voxels[i],t_test1)])
train_prediction[i][train_prediction[i] < 0] = 0
test_prediction[i][test_prediction[i] < 0] = 0
train_prediction[i][train_prediction[i] > 1] = 1
test_prediction[i][test_prediction[i] > 1] = 1
fig, ax = plt.subplots(n_unique_objects, 1, figsize=(14, 2*n_unique_objects))
for o, i in object_index.iteritems():
ax[i].plot(t_train2, train_prediction[i], 'b-',
t_test2, test_prediction[i], 'r-',
t, confidence[i], 'k-', clip_on=False)
ax[i].set_ylim(0,1)
ax[i].text(0, 0.95, o, ha='left', va='top', color='g')
plt.show()
In [9]:
def roc(true, prediction, n_threshold=500):
tpr, fpr = np.empty(n_threshold), np.empty(n_threshold)
positives, negatives = float(true.sum()), float((~true).sum())
min_thr = min(true.min(), prediction.min())
max_thr = max(true.max(), prediction.max())
for i, thr in enumerate(np.linspace(min_thr, max_thr, n_threshold)):
predicted_true = prediction >= thr
tp, fp = true & predicted_true, ~true & predicted_true
tn, fn = ~true & ~predicted_true, true & ~predicted_true
tpr[i], fpr[i] = tp.sum(), fp.sum()
if positives:
tpr[i] /= positives
if negatives:
fpr[i] /= negatives
return fpr, tpr, auc(fpr, tpr)
In [10]:
fig, ax = plt.subplots(1, 3, figsize=(12, 4))
for o, i in object_index.iteritems():
mean, std = confidence[i].mean(), confidence[i].std()
train_true = confidence[i, t_train2] > mean + 3*std
test_true = confidence[i, t_test2] > mean + 3*std
train_fpr, train_tpr, train_auc = roc(train_true, train_prediction[i])
test_fpr, test_tpr, test_auc = roc(test_true, test_prediction[i])
ax[0].plot(train_fpr, train_tpr)
ax[1].plot(test_fpr, test_tpr)
ax[2].scatter(train_auc, test_auc, clip_on=False)
ax[0].set_title('training')
ax[1].set_title('testing')
for a in ax[:2]:
a.set_ylabel('true positive rate')
a.set_xlabel('false positive rate')
ax[2].set_xlim(0, 1)
ax[2].set_ylim(0, 1)
ax[2].set_xlabel('training AUC')
ax[2].set_ylabel('testing AUC')
plt.tight_layout()
plt.show()
plt.close()
In [ ]: