Saturday, December 2, 2017

WordPress NextGen Gallery Error - No Images Found

Summary

A possible fix is below if none of your pictures will display in NextGen Gallery.

Implementation

Check your mysql.log in /var/log.  If you see this, then this post may have the solution for you:
171202 17:17:36 [ERROR] /usr/libexec/mysqld: Table 'wp_ngg_pictures' is marked as crashed and should be repaired
Run the commands below as root.  Note that myisamchk operates on the table files directly, so run it from the database's data directory (typically under /var/lib/mysql) or give the full path to the table's .MYI file:
/etc/init.d/mysqld stop
myisamchk -r wp_ngg_pictures
/etc/init.d/mysqld start

Friday, November 24, 2017

Fire Ants Do Monte Carlo


A player that's been associated with hot hands

Summary

Monte Carlo methods refer to a technique for approximating the solution to a problem that may be difficult or simply impossible to solve via a closed-form/analytical approach.  The key concept in Monte Carlo is to simulate the problem repeatedly with random samples.  As the number of simulations grows, the results converge towards the actual solution.

In this post, I demonstrate two very simple simulations that in fact have known solutions:  Pi and e.  The third simulation is something I contrived with some effort (not much) at model accuracy: Estimate how long it takes to get attacked by fire ants in a typical backyard.

Calculation of Pi via Monte Carlo

This can be considered the canonical example of Monte Carlo, as it has been well-documented.  The essence of how it works depends on the ratio of areas between a square and a circle inscribed inside the square.  You generate a random distribution of points within the square, and the ratio of points inside the circle to total points follows the area ratio.

The picture below depicts the problem setup.



Mathematical basis below.

\$\text{Area of square} = 4r^{2}\\ \text{Area of circle} = \pi r^{2}\\ \text{Ratio of circle area to square area} = \frac{\pi r^{2}}{4r^{2}} = \frac{\pi}{4}\\ \pi \approx 4\cdot\frac{\text{points inside circle}}{\text{total points}}\$

Implementation in Python

import math
import random

import pandas as pd
import matplotlib.pyplot as plt


def simulation(numPoints):
    in_circle = 0
    total = 0

    for _ in range(numPoints):
        x = random.uniform(0,2)
        y = random.uniform(0,2)
    
        d = (x-1)**2 + (y-1)**2
        if d <= 1.0:
            in_circle = in_circle + 1
        total = total + 1
    
    ans = 4 * in_circle/total
    return ans

if __name__ == '__main__': 
    random.seed()
    
    
    results = {}
    for numPoints in [10,100,1000,10000,100000,1000000]:
        ans = simulation(numPoints)
        results[numPoints] = ans
        
    frame = pd.DataFrame(data=list(results.items()), columns=['NumPoints', 'Result'])
    frame['PctError'] = ((frame['Result'] - math.pi) / math.pi).abs() * 100
    del frame['Result']
    frame.sort_values(by='NumPoints', inplace=True)
    frame.reset_index(inplace=True, drop=True)
    print(frame)
    frame.plot(x='NumPoints', y='PctError', kind='bar', title='Monte Carlo Pi Calculation', color=['b'])
    plt.show()

Results

   NumPoints   PctError
0         10  10.873232
1        100   3.132403
2       1000   0.967896
3      10000   0.763710
4     100000   0.225597
5    1000000   0.044670

Calculation of e via Monte Carlo

A stochastic representation of the mathematical constant e is shown below.  You can go directly to code from this.

\$\boldsymbol{E}\left[\zeta\right] = e\\ \text{where } \zeta = \min\left(n \;\middle|\; \sum_{i=1}^{n}X_i > 1\right)\\ \text{where } X_i \text{ are random numbers drawn from the uniform distribution } [0,1]\$

Implementation in Python

import math
import random

import pandas as pd
import matplotlib.pyplot as plt


def simulation(numIters):
    nsum = 0
    for _ in range(numIters):
        xsum = 0
        n = 0
        while xsum < 1:
            x = random.uniform(0,1)
            xsum = xsum + x 
            n = n + 1
    
        nsum = nsum + n
    
    return nsum/numIters

if __name__ == '__main__': 
    random.seed()
    
    results = {}
    for numIters in [10,100,1000,10000,100000,1000000]:
        ans = simulation(numIters)
        results[numIters] = ans
        
    frame = pd.DataFrame(data=list(results.items()), columns=['Iterations', 'Result'])
    frame['PctError'] = ((frame['Result'] - math.e) / math.e).abs() * 100
    del frame['Result']
    frame.sort_values(by='Iterations', inplace=True)
    frame.reset_index(inplace=True, drop=True)
    print(frame)
    frame.plot(x='Iterations', y='PctError', kind='bar', title='Monte Carlo e Calculation', color=['g'])
    plt.show()

Results

   Iterations  PctError
0          10  4.351345
1         100  1.040430
2        1000  0.819703
3       10000  0.058192
4      100000  0.072773
5     1000000  0.005512

Fire Ant Simulation

This last simulation is admittedly contrived and likely less than scientifically accurate.  The focus is on the Monte Carlo method.  However, I can say I have some first-hand experience with this model via ~20 years spent in Texas for undergrad work and various military assignments.  For those unfamiliar with fire ants - they are a uniquely offensive member of the animal kingdom.  They bite and sting and are extremely aggressive when disturbed.  Fire ants are in never-ending abundance in Texas.  On that note - I used some fire ant data from this Texas A&M website.

This simulation is a model of a typical residential backyard and the fire ant population therein.  The question to be answered:  How long can I sit in my backyard until at least one fire ant is tearing me up?

Diagram of the model below:

Rude awakening imminent

Model Assumptions

  • 100' x 30' = 3000 sq ft 'yard'
  • 5 ant mounds in the yard
  • 500K ants
  • Ant mounds are randomly distributed in yard
  • Ants are randomly distributed amongst the mounds.
  • All ants start the simulation at their home mound position.
  • Ants have 2 states in the model.  Foraging or attacking.
  • 1 human exists in the model.  The human is randomly positioned in the yard with the assumption they will not sit directly on a mound.  This assumption is a partial deviation from reality to ensure statistically interesting experiments.  I've personally sat on a fire ant mound (inadvertently).  I don't recall it being statistically interesting.
  • Ants move 3 inches/second, or 1 foot in 4 seconds.
  • Time steps in the model are 4 seconds
  • Ants on the move ('foraging') will randomly choose among 8 possible directions at each time step.  This provides the randomness of the model.
  • Ants on the move will stay the course in 1 direction for an entire time step (same direction for 1 foot/4 seconds)
  • Ants can't leave the boundaries of the yard.  An attempt to do so results in no movement for that time step.
  • Only 50% of the ant population makes a move in any given time step.  The rest are idle for that time step.
  • Once an ant gets within 1 foot of the human position, it starts attacking.  An attacking ant never leaves the 'attacking' state.  Again, I can attest from personal experience that this represents reality.
  • The simulation ends when at least 1 ant achieves the 'attacking' state.

Python Implementation
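
The class and driver code in this section is shown without its module-level imports.  Based on the calls it makes (random sampling, square roots, a multiprocessing pool, pandas, and matplotlib), I'd expect the file to start roughly like this - an assumption on my part, not the verbatim source:

import math
import random
import multiprocessing as mp

import pandas as pd
import matplotlib.pyplot as plt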

class Yard(object):
    length = 100 #3000 sq ft yard
    width = 30
    numMounds = 5
    numAnts = numMounds * 100000 #100K ants per mound
    pctAntsActive = .5 #percentage of the ants moving at any given time step
    
    def __init__(self):
        random.seed()
        self.__setMounds()
        self.__setAnts()
        self.__setHuman()
        
    def __setMounds(self): 
        self.moundPositions = []
        xlist = random.sample(range(0, Yard.length+1), Yard.numMounds)
        ylist = random.sample(range(0, Yard.width+1), Yard.numMounds)
        for i in range(Yard.numMounds):
            mound = (xlist[i], ylist[i])
            self.moundPositions.append(mound)
    
    def __setAnts(self):
        self.ants = []
        for _ in range(Yard.numAnts):
            mound = random.choice(self.moundPositions)
            self.ants.append(Ant(mound))
                
    def __setHuman(self):
        done = False
        while not done:
            x = random.randint(0, Yard.length)
            y = random.randint(0, Yard.width)
            if (x, y) not in self.moundPositions: 
                done = True
            else:
                pass
            
        self.humanPosition = (x, y)
    
    def clockTick(self):
        antsAttacking = 0
        activeAnts = random.sample(range(0, Yard.numAnts), int(Yard.numAnts*Yard.pctAntsActive))
        
        for ant in activeAnts:
            state = self.ants[ant].move(self.humanPosition)
            if state == 'attacking':
                antsAttacking += 1
                break
            else:
                pass
        
        return antsAttacking
Line 1:  Class definition of the "Yard"
Lines 2-6:  Class variables capturing some of the model assumptions
Lines 8-12:  Class initializer which sets up the model.
Lines 14-20:  Function to randomly distribute the ant mounds in the yard.
Lines 22-26:  Function to create the ant objects and randomly distribute them among the mounds.
Lines 28-38:  Function to randomly position the human in the yard, but not on top of a mound.
Lines 40-52:  Main driver of the simulation.  Provides a four-second time step during which a random 50% distribution of ants make a one-foot move.  Function returns the number of ants that reached the 'attacking' state in that time step.

class Ant(object):
    __directions = ['NW', 'N', 'NE', 'E', 'SE', 'S', 'SW', 'W']
    
    def __init__(self, position):
        self.position = position
        self.state = 'foraging'
    
    def __checkAttack(self, humanPosition):
        distance = math.sqrt((self.position[0] - humanPosition[0])**2 + (self.position[1] - humanPosition[1])**2)
        if distance <= 1:
            return 'attacking'
        else:
            return 'foraging'
    
    def move(self, humanPosition):
        if self.state == 'foraging':
            direction = random.choice(Ant.__directions)
              
            if direction == 'NW':
                x = self.position[0] - 1
                y = self.position[1] + 1
            elif direction == 'N':
                x = self.position[0] 
                y = self.position[1] + 1
            elif direction == 'NE':
                x = self.position[0] + 1
                y = self.position[1] + 1
            elif direction == 'E':
                x = self.position[0] + 1
                y = self.position[1] 
            elif direction == 'SE':
                x = self.position[0] + 1
                y = self.position[1] - 1
            elif direction == 'S':
                x = self.position[0] 
                y = self.position[1] - 1
            elif direction == 'SW':
                x = self.position[0] - 1
                y = self.position[1] - 1
            elif direction == 'W':
                x = self.position[0] - 1
                y = self.position[1]
            else:
                pass
              
            if x >= 0 and x <= Yard.length and y >= 0 and y <= Yard.width:
                self.position = (x, y)
            else:
                pass
            self.state = self.__checkAttack(humanPosition)
        else:
            pass
        
        return self.state
Line 1:  Ant class definition
Line 2:  Class variable containing a list of the 8 different directions an ant can move
Lines 4-6:  Class initializer setting an ant's initial position and state.
Lines 8-13:  Private class function for determining if an ant is within 1 foot of the human's position.  If so, the ant's state is set to 'attacking.'
Lines 15-54:  Resets an ant's position by 1 foot in a randomly chosen direction.  If the resulting position is outside of the yard boundaries, the ant remains stationary.  Returns the new state of the ant based on its new position.
def simulation(): 
    yard = Yard()
    seconds = 0
    numAttacking = 0
    
    while numAttacking == 0:
        numAttacking = yard.clockTick()
        seconds += 4
    
    return seconds
Lines 1-10:  Global function providing the simulation driver.  Sets up the 'yard' and runs a clock until at least 1 ant is attacking the human.

if __name__ == '__main__': 
    simIters = [25,50,75,100]
    avg = {}
    
    for iters in simIters:
        tot = 0
        pool = mp.Pool()
        results = []
        for _ in range(iters):
            results.append(pool.apply_async(simulation))       
        pool.close()
        pool.join()
        for res in results:
            tot += res.get()
        avg[iters] = round(tot / iters)
        print(iters, avg[iters])
    
    
    frame = pd.DataFrame(data=list(avg.items()), columns=['Iterations', 'Seconds'])
    frame.sort_values(by='Iterations', inplace=True)
    frame.reset_index(inplace=True, drop=True)
    print(frame)
    frame.plot(x='Iterations', y='Seconds', kind='bar', title='Monte Carlo Fire Ant Attack Simulation', color=['r'])
    plt.show()        
Lines 2-16:  Sets up a loop that proceeds through 4 different iteration runs of the simulation (25,50,75,100) and creates a multiprocessing pool to run them.  Run time of each simulation is stored for its particular iteration count and then averaged.
Lines 19-24:  Sets up a pandas data frame with the results and displays a bar graph.

Results

25 198
50 189
75 121
100 166
   Iterations  Seconds
0          25      198
1          50      189
2          75      121
3         100      166

These simulations maxed out all four cores on my machine for lengthy periods of time.  My guess would be that if I ran higher iteration counts, the attack time would settle somewhere around 150 seconds, as that appears to be the trend across these 4 trials.


Source: https://github.com/joeywhelan/montecarlo

Sunday, November 19, 2017

Partial Hash Collisions


In contrast, this can be considered a full collision


Summary

Proof-of-work is a critical component of the various crypto coin variants in use today.  In a nutshell, the blockchain algorithms that form the basis of crypto coins require participants to complete a provable amount of computation before a transaction can be added to the chain.  Partial hash collision is one such proof of work.

This post covers a toy implementation of partial hash collision.

Background

The basis for this proof-of-work method is hash digests.  An input string is processed through a cryptographically-strong function that outputs a fixed-length hash value that is, for all practical purposes, unique to that input string.  Given a strong hash algorithm, it should be computationally infeasible to find two input strings that produce the same hash digest - a property known as collision resistance.

Collisions on the full bit-length of a hash may be infeasible, but collisions on a subset of the bits can be accomplished.  There is no known short-cut to finding a collision other than trial and error - submitting different input strings to the hash algorithm, over and over.  The typical partial hash collision method takes a string as input, concatenates a number to that string, then tests the high-order bits of the resulting hash.  If the necessary number of high-order bits are all 0, a solution has been found.

Example:  Say a given hash algorithm outputs a 10-bit hash and we want to find an input string whose hash has its first 4 bits all equal to 0.  There are 2**10 total unique hashes available in this algorithm, and 2**6 of them have the first 4 bits equal to 0.  On average, we would have to try 2**4 (16) inputs to the hash algorithm to find a solution.  Real-world implementations deal with 160/256-bit hashes and collision lengths of >20 bits.
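
A quick back-of-the-envelope check of that arithmetic in Python (the 23-bit case matches the collision length used in the implementation below):

# Each attempt independently has probability 2**-k of producing a hash whose
# k high-order bits are all zero, so on average 2**k attempts are needed.
for k in (4, 20, 23):
    print('{} bits -> {:,} expected attempts'.format(k, 2 ** k))
# 4 bits -> 16, 20 bits -> 1,048,576, 23 bits -> 8,388,608

For comparison, the 23-bit run in the results section below landed on a collision after roughly 7.2 million attempts, in line with the ~8.4 million expected.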

Implementation

Single Process
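
The snippets in this section omit their module-level imports.  Judging from the calls they make, I'd expect something along these lines at the top of the file (an assumption, not the verbatim source):

import hashlib
import math
import multiprocessing as mp
import string
import time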

totalBits = 256  
collisionBits = 23 
mask = (1<<totalBits) - (1<<totalBits-collisionBits)
textInput = None

def singleProcessHash(text):
    count = 0    
    ti = time.time()
    
    while True:
        msg = text + ':' + str(count)
        dig = hashlib.sha256(msg.encode(encoding='utf_8', errors='strict')).hexdigest()
        res = int(dig,16) & mask
        if res == 0:
            break   
        count+=1
            
    print('Single-process run time: {:0.4f}'.format(time.time()-ti))
    print ('Message: ', msg)
    return msg
Lines 1-2:  Total bit length of the hash and the number of required high-order bits to be set to 0.  SHA-256 outputs a 256-bit hash.  For this example, the 23 high-order bits have to be 0 for a valid solution string.
Line 3:  Constructs a 256-bit mask with the 23 high-order bits set to 1.  All other bits are 0.
Lines 10-16: Brute-force loop that successively concatenates a number to the input text, hashes it through SHA-256, and then does a bit-wise AND against the mask to test whether the 23 high-order bits are all 0.  If the result isn't a partial collision, increment the counter and continue to loop.

Multiple Processes

def init(text):
    global textInput
    textInput = text
    
def findCollision(arg):
    global textInput
    msg = textInput + ':' + str(arg)
    hsh = hashlib.sha256(msg.encode(encoding='utf_8', errors='strict')).hexdigest()
    res = int(hsh,16) & mask
    if res == 0:
        return msg

def multiProcessHash(text):
    mult = 1
    numValues = 10**6
    chunkSize = math.ceil(numValues/mp.cpu_count())
    found = False
    msg = None
    ti = time.time()
   
    pool = mp.Pool(initializer=init, initargs=[text])
    while not found: 
        sample = range(numValues*(mult-1), numValues*mult)
        mult += 1
        results = pool.imap_unordered(findCollision, sample, chunkSize)
        for msg in results:
            if msg:
                pool.terminate()
                pool.join()
                found = True
                break

    print('Multi-process run time: {:0.4f}'.format(time.time()-ti))
    print('Message: ', msg)
    return msg
Lines 1-3:  Init function setting the text argument for each of the worker processes in the pool.  It's an unfortunate aspect of Python, but the use of globals is the most straightforward way to accomplish this.
Lines 5-11:  Collision test function performed by each of the worker processes.
Line 15:  Variable to set up 1M values to be sent to the pool per iteration.
Line 16:  Chunk up those 1M test values evenly amongst the worker processes.  One worker per CPU.
Lines 21-31:  Set up the worker pool and loop through successive 1M value batches of samples until a partial collision is found.

Results

This was tested on a quad-core machine with hyper-threading (8 logical CPUs).
if __name__ == '__main__':
    singleProcessHash(string.ascii_uppercase)
    print()
    multiProcessHash(string.ascii_uppercase)  
Single-process run time: 18.9084
Message:  ABCDEFGHIJKLMNOPQRSTUVWXYZ:7201566

Multi-process run time: 6.7886
Message:  ABCDEFGHIJKLMNOPQRSTUVWXYZ:7201566

Friday, September 8, 2017

AI-Driven Investing with Lending Club - Part 5: Ensembling


Summary

This is the fifth installment in the series I've written on my experience implementing a machine learning engine for selecting loans for investment from Lending Club.  In this article, I'll discuss and apply a couple of techniques for combining the results of multiple classifiers.
There's quite a bit of literature out there on ensembling as it has been a popular (and winning) technique in Kaggle competitions.  In a nutshell, ensembling attempts to combine the strengths of multiple classifiers to produce a classifier that is better than its parts.

The simplest ensemble is voting, which in fact has an out-of-the-box implementation in scikit-learn - VotingClassifier.  In 'hard' voting, the majority prediction from the multiple classifiers is taken as the final prediction.  In 'soft' voting, the class probabilities from the multiple classifiers are summed, and the class with the highest total becomes the final prediction.
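
To make the idea concrete, here is a minimal, self-contained sketch of soft voting with scikit-learn's VotingClassifier.  The toy data and base estimators here are stand-ins of my own choosing, not the classifiers used later in this post:

from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier, VotingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier

# Toy data standing in for real features/labels
X, y = make_classification(n_samples=1000, n_features=20, random_state=0)

# 'soft' voting sums/averages the predicted class probabilities of the base classifiers
vote = VotingClassifier(
    estimators=[('gbc', GradientBoostingClassifier()),
                ('lr', LogisticRegression(max_iter=1000)),
                ('mlp', MLPClassifier(max_iter=500))],
    voting='soft')

vote.fit(X, y)
print(vote.predict(X[:5]))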

A more complex ensembling technique is known as stacking (sometimes also referred to as 'blending').  In this model, classifiers are 'stacked' in levels.  The outputs of the classifiers in the lower levels become the inputs to the classifier(s) in the upper levels.
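
A bare-bones illustration of the concept, again with hypothetical level-1 classifiers of my own choosing: out-of-fold probability predictions from the level-1 models (via scikit-learn's cross_val_predict) become the training features for the level-2 model.  This is the same pattern the stacking implementation further down follows, just stripped to its essentials:

import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict

X, y = make_classification(n_samples=1000, n_features=20, random_state=0)
level1 = [GradientBoostingClassifier(), RandomForestClassifier()]

# Out-of-fold probabilities from each level-1 classifier become the
# input features of the level-2 (stacking) classifier.
meta_features = np.column_stack([
    cross_val_predict(clf, X, y, cv=5, method='predict_proba')[:, 1]
    for clf in level1])

level2 = LogisticRegression()
level2.fit(meta_features, y)

At prediction time, the level-1 models (refit on the full training set) generate probabilities that are fed to the level-2 model - which is the pattern the implementation below follows.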

An interesting variation of stacking was developed for the Netflix Prize competition in 2009 and was in fact the 2nd place winner.  Feature-weighted Linear Stacking takes 'meta-features' and combines them via linear regression to a stacked model.  Video here from Joe Sill, a member of the 2nd place team, describing the technique.

Implementation

I decided to approach this with a Python class (Ensemble) that implements both voting and stacking.  I settled on four classifiers:  GBC, MLP, RBM, and SVD.  I ran all four through a grid-search to find a good parameter set for each classifier for my particular data set.

Below is some code that tests each classifier in isolation and prints out a Pearson correlation matrix for the four classifiers.  Good classification performance per classifier and low correlation between classifiers are the goals for an ensemble.
        pool = mp.Pool(processes=mp.cpu_count())
        results = []

        for name, clf in self.estimators.items():
            try:
                self.estimators[name] = joblib.load('./models/' + name + '.pkl')
            except FileNotFoundError:  
                logging.debug('{} not pickled'.format(name))    
                results.append(pool.apply_async(lvl1_fit, args=(clf, name, features_train, labels_train)))           
           
        pool.close()
        pool.join() 
        for result in results:
            item = result.get()
            name = item['name']
            self.estimators[name] = item['fittedclf']
        
        #Print confusion matrix and score for each clf.  
        corr_list = []
        clf_list = []
        for name, clf in self.estimators.items():
            preds = clf.predict(features_test)
            self.confusion_matrix(name, labels_test, preds)
            print()
            self.classification_report(name, labels_test, preds)
            corr_list.append((name, preds))
            clf_list.append(name)
        
        #Print a matrix of correlations between clfs
        frame = pd.DataFrame(index=clf_list, columns=clf_list)
    
        for pair in itertools.combinations(corr_list,2):
            res = pearsonr(pair[0][1],pair[1][1])[0]
            frame[pair[0][0]][pair[1][0]] = res
            frame[pair[1][0]][pair[0][0]] = res
        frame['mean'] = frame.mean(skipna=True,axis=1)
        pd.options.display.width = 180
        print('Correlation Matrix')
        print(frame) 

Lines 1-16: Load up the fitted classifiers.  'estimators' is an OrderedDict of scikit-learn classifier objects.  If the fitted classifiers have already been cached to disk, use that.  Otherwise, fit them from scratch on the training set in a multiprocessing pool.
Lines 18-27:  Generate predictions for each classifier for the test set, then print a confusion matrix and classification report for each.
Lines 29-39:  Generate the Pearson correlation between each pair of classifiers and then organize the results in a matrix.

Excerpt of the output below:
svd Confusion Matrix (66392 samples): 
[[ 8599  4654]
 [22715 30424]]

svd Classification Report
             precision    recall  f1-score   support

    Default       0.27      0.65      0.39     13253
       Paid       0.87      0.57      0.69     53139

avg / total       0.75      0.59      0.63     66392

Correlation Matrix
          gbc       mlp       rbm       svd      mean
gbc       NaN  0.746548  0.603429  0.532055  0.627344
mlp  0.746548       NaN  0.516596  0.538988  0.600711
rbm  0.603429  0.516596       NaN  0.408401  0.509475
svd  0.532055  0.538988  0.408401       NaN  0.493148

Voting

The voting portion is quite simple as it is simply a wrapper around the out-of-box implementation from scikit-learn.

Fitting of the voting classifier below:
        try:
            self.voteclf = joblib.load('./models/voteclf.pkl')
        except FileNotFoundError: 
            ti = time() 
            self.voteclf = VotingClassifier(estimators=list(self.estimators.items()), voting='soft',n_jobs=-1)      
            self.voteclf.fit(features_train, labels_train)
            joblib.dump(self.voteclf, './models/voteclf.pkl') #cache the fitted model to disk
        logging.debug('Exiting __fit_vote()')
Lines 1-9:  If the voting classifier has already been fitted and cached to disk, load it.  Otherwise, fit it from scratch and dump the fitted model to disk.

Prediction with the voting classifier below:
preds = self.__predict_with_threshold(self.voteclf, features)

def __predict_with_threshold(self, clf, features):
        ti = time()
        predictions = Ensemble.__custom_predict(clf.predict_proba(features)[:, MINORITY_POS], \
                                                clf.predict(features), self.threshold)
        return predictions

__custom_predict = np.vectorize(vfunc, otypes=[np.int])

def vfunc(probability, prediction, threshold):
    if probability >= threshold:
        return MINORITY_CLASS
    else:
        return prediction
Line 1:  Pass the fitted voting classifier to a custom predict function that operates with thresholds.
Lines 3-15:  Custom prediction function that uses a threshold to decide whether the minority class should be chosen as the prediction.  This is a method to deal with unbalanced classes.

Stacking

The stacking implementation is significantly more complex.  Fitting, for example, requires fitting each of the four 1st level classifiers and then fitting each of them again in K-Fold cross-validation to generate the training data for the 2nd level classifier.  I'm using an unbalanced data set from Lending Club that has over 600K records.  Balancing that takes its size to ~1M records.  Fitting on data sets of this size mandates the use of multiprocessing and caching of the fitted classifiers to disk.

        pool = mp.Pool(processes=mp.cpu_count())
        results = [] #array for holding the result objects from the pool processes
        
        #fit 1st level estimators with a multiprocessing pool of workers
        for name, clf in self.estimators.items():
            try:
                self.estimators[name] = joblib.load('./models/' + name + '.pkl')
            except FileNotFoundError:  
                logging.debug('Level 1: {} not pickled'.format(name))    
                results.append(pool.apply_async(lvl1_fit, args=(clf, name, features_train, labels_train)))           
           
        pool.close()
        pool.join() 
       
        for result in results:
            item = result.get()
            name = item['name']
            self.estimators[name] = item['fittedclf'] #reassign a fitted clf to the estimator dictionary
        
        #fit 2nd level estimator with a multiprocessing pool of workers that perform a k-fold cross-val of 
        #training data
        pool = mp.Pool(processes=mp.cpu_count())
        del results[:]
        try:
            self.lrc = joblib.load('./models/lrc.pkl') #try to load the 2nd level estimator from disk
        except FileNotFoundError: #2nd level estimator not fitted yet
            logging.debug('Level 2: LRC not pickled') 
            folds = list(StratifiedKFold(n_splits=5).split(features_train, labels_train)) 
            #define a frame for holding the k-fold test results of the 1st level classifiers
            lvl2_frame = pd.DataFrame(index=range(0,len(features_train)), 
                                      columns=list(self.estimators.keys()))  
            lvl2_frame[LABEL_COL] = labels_train  
             
            #launch multiprocessing pool workers (1 per fold) that fit 1st level classifers and perform
            #predictions that become the training data for the 2nd level classifier (Logistic Regression)   
            for name,clf in self.estimators.items():
                fold = 1
                for train_idx, test_idx in folds:
                    X_train, X_test = features_train[train_idx], features_train[test_idx]
                    Y_train = labels_train[train_idx]
                    col_loc = lvl2_frame.columns.get_loc(name)
                    results.append(pool.apply_async(lvl2_fit, args=(clf, name, fold, test_idx, \
                                                                    col_loc, X_train, Y_train, X_test)))
                    fold = fold + 1
            pool.close()
            pool.join() 
           
            #fetch worker results and put them into a frame that will be used to train a 2nd Level/Logistic
            #regression classifier
            for result in results:
                item = result.get()
                name = item['name']
                test_idx = item['test_idx']
                col_loc = item['col_loc']
                preds = item['preds']
                lvl2_frame.iloc[test_idx, col_loc] = preds

            self.lrc = LogisticRegression(C=2.0)
            ti = time()
            X = lvl2_frame.drop(LABEL_COL, axis=1).values
            Y = lvl2_frame[LABEL_COL].values
            self.lrc.fit(X, Y)     
            logging.debug('LRC fit time: {:0.4f}'.format(time()-ti))
            joblib.dump(self.lrc, './models/lrc.pkl')  #cache the Logistical Regressor to disk

def lvl1_fit(clf, name, features_train, labels_train):
    logging.debug('Entering lvl1_fit() {}'.format(name))
    ti = time()
    fittedclf = clf.fit(features_train, labels_train)
    logging.debug('{} fit time: {:0.4f}'.format(name, time()-ti))
    joblib.dump(fittedclf, './models/' + name + '.pkl') #cache the fitted model to disk
    logging.debug('Exiting lvl1_fit() {}'.format(name))
    return {'name': name, 'fittedclf': fittedclf}

def lvl2_fit(clf, name, fold, test_idx, col_loc, features_train, labels_train, features_test):  
    logging.debug('Entering lvl2_fit() {} fold {}'.format(name, fold))
    ti = time()
    clf.fit(features_train, labels_train)
    logging.debug('{} fold {} fit time: {:0.4f}'.format(name, fold, time()-ti))
    preds = clf.predict_proba(features_test)[:, MINORITY_POS]
    logging.debug('Exiting lvl2_fit() {} fold {}'.format(name, fold))
    return {'name': name, 'test_idx' : test_idx, 'col_loc' : col_loc, 'preds' : preds}
Lines 1-18:  Attempt to load fitted classifiers from disk.  If they don't exist, use a pool of workers to fit each classifier to the full training set.
Lines 20-56:  With K-Fold (5 folds) cross validation, fit each of the classifiers and then generate predictions with the test set in that fold.  Save the predictions to a data frame.
Lines 58-64:  Fit the 2nd level classifier (Logistic Regression) to the predictions from the 1st level classifiers.  Dump the fitted Logistic classifier to disk.
Lines 66-73:  Function for fitting the 1st level classifiers and dumping them to disk.
Lines 75-82:  Function called within the K-folding for fitting 1st level classifiers and generating predictions to use to train the 2nd level classifier.

Generating predictions from the stacked ensemble requires running the features through all of the 1st level classifiers and then sending their output (predictions) to the 2nd level classifier - Logistic Regression.
 def __predict_stack(self, features):
        lvl1_frame = pd.DataFrame()
        #1st level predictions
        for name, clf in self.estimators.items():
            lvl1_frame[name] = clf.predict_proba(features)[:, MINORITY_POS]
            
        #2nd level predictions
        preds = self.__predict_with_threshold(self.lrc, lvl1_frame.values)
   
        return preds

Source:  https://github.com/joeywhelan/Ensemble/

Thursday, June 8, 2017

Secure DNS Conversion


Summary


In this article, I'll be discussing a little distraction I've been working on lately.  There are a large number of articles out there about the inherent insecurity of DNS.  Your requests/responses are clear text that run (predominantly) over UDP.  Every ISP, DNS provider, etc. basically has the goods on whatever Internet sites you may visit.

In a few lines of code (~200), I show one way to keep your DNS traffic private.  Part 1 hinges on Google's HTTPS DNS service.  I convert the UDP DNS request into an HTTPS request to Google for an encrypted DNS request/response system.  To raise this to another level of paranoia (for those that don't even trust Google with their DNS traffic), I show how to tunnel those requests to Google through Tor.

I made little attempt to faithfully follow the DNS RFC in this implementation.  Just wasn't necessary for my limited use-case.

Implementation - Part 1

Below is a diagram of the overall architecture I was looking for.  I use a local caching DNS server for my own network.  I reconfigure that DNS server to send all of its requests to my DNS converter, which translates DNS UDP requests into HTTPS calls to Google's DNS over HTTPS service.


Python Implementation
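
The code excerpts below leave out their imports.  Based on the modules referenced, I'd expect the file to begin roughly like this (my assumption, not the verbatim source):

import configparser
import socketserver

import requests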


cfgParser = configparser.ConfigParser()
cfgParser.optionxform = str
cfgParser.read('sdns.cfg')  
host = cfgParser.get('ConfigData', 'host')
port = int(cfgParser.get('ConfigData', 'port'))
server = socketserver.UDPServer((host, port), DNSHandler)
server.serve_forever()
Lines 1-5:  Read the host IP address and port for the new DNS converter/forwarder.  DNS typically runs on port 53.
Lines 8-9:  Start a UDP server on that IP address and port.  DNSHandler is a class that contains the code to process the DNS request and generate the response.
class DNSHandler(socketserver.BaseRequestHandler):
    
    def handle(self):
        data = self.request[0]
        socket = self.request[1]
        response = self.__createResponse(data)
        socket.sendto(response, self.client_address)
Lines 3-7:  Main code body.  Accepts the request, processes it, and sends out the response.

    def __createResponse(self, data):
        tid = data[0:2] #transaction id
        opcode = data[2] & 0b01111000   #data[2] is the flags field. bits 2-5 is the opcode  
        name, queryType, question = self.__processQuestion(data[12:]) 
        
        if opcode == 0 and queryType == '1':  #RFC departure.  Only processing standard queries (0) and 'A' query types.  
            flags, numbers, records = self.__getRecords(name)
            response = tid + flags + numbers + question + records
        else:
            #qr (response), recursion desired, recursion avail bits set.  set the rcode to 'not implemented'
            flags = ((0b100000011000 << 4) | 4).to_bytes(2, byteorder='big') 
            numbers = (0).to_bytes(8, byteorder='big')
            response = tid + flags + numbers
 
        return response
Lines 2-3:  Parse out the DNS transaction ID and flags field from the request.
Line 4:  Send the rest of the DNS request to another function for parsing out the question.
Lines 6-8:  If it's a DNS request I chose to implement, standard 'A' type,  process it.
Lines 10-13:  If it's not a request I implemented, return a DNS error.

    def __processQuestion(self, quesData):
        i = 0
        name = ''
        
        while True:
            count = int.from_bytes(quesData[i:i+1], byteorder='big')
            i = i+1
            if count == 0:
                break
            else:
                name = name + str(quesData[i:i+count],'utf-8') + '.'
                i = i + count
            
        name = name[:-1]
        queryType = str(int.from_bytes(quesData[i:i+2], byteorder='big'))
        question = quesData[0:i+4]
   
        return name, queryType, question
Lines 5-12:  Loop thru the labels in the DNS question (it has a specific format, see RFC).
Lines 14-16: Set up 3 return variables with the domain name in question, query type, and the entire question byte array (for the response to be sent later).

    def __getRecords(self, name): 
        payload = {'name' : name, 'type' : '1'}
        data = requests.get(GOOGLE_DNS, params=payload).json()
   
        flags = self.__getFlags(data)
        records = bytes(0)
        count = 0
        if 'Answer' in data:
            for answer in data['Answer']:
                if answer['type'] == 1:
                    count = count + 1
                    name = (0xc00c).to_bytes(2, byteorder='big') #RFC departure.  Hard-coded offset to domain name in initial question.
                    rectype = (1).to_bytes(2, byteorder='big')
                    classtype = (1).to_bytes(2, byteorder='big')
                    ttl = answer['TTL'].to_bytes(4, byteorder='big')
                    length = (4).to_bytes(2, byteorder='big') #4 byte IP addresses only
                    quad = list(map(int, answer['data'].split('.')))
                    res = bytes(0)
                    for i in quad:
                        res = res + i.to_bytes(1, byteorder='big')
                    records = records + name + rectype + classtype + ttl + length + res
        
        nques = (1).to_bytes(2, byteorder='big') #hard coded to 1
        nans = (count).to_bytes(2, byteorder='big')
        nath = (0).to_bytes(2, byteorder='big')    #hard coded to 0
        nadd = (0).to_bytes(2, byteorder='big') #hard coded to 0
        numbers = nques + nans + nath + nadd
     
        return flags, numbers, records
Lines 2-3:  Send the DNS request to Google's HTTPS service.
Line 5:  Construct the flags field for the response based on the request flags field.
Lines 8-21:  Construct the Return Records fields for the response from the JSON response from Google's DNS/HTTPS service.
Lines 23-27:  Construct the number of records field for the response.
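
One constant referenced above, GOOGLE_DNS, isn't shown in the snippet.  At the time of writing, Google's DNS-over-HTTPS JSON API was served from dns.google.com, so I'd expect a definition along these lines (my assumption of how the constant was set):

GOOGLE_DNS = 'https://dns.google.com/resolve'
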
    def __getFlags(self, data):
        flags = 0b100000 #qr=1, opcode=0000, aa=0
        flags = (flags << 1) | data['TC'] #set tc bit
        flags = (flags << 1) | data['RD'] #set rd bit
        flags = (flags << 1) | data['RA'] #set ra bit
        flags = flags << 1 #One zero
        flags = (flags << 1) | data['AD'] #set ad bit
        flags = (flags << 1) | data['CD'] #set cd bit
        flags = ((flags << 4) | data['Status']).to_bytes(2, byteorder='big') 
 
        return flags
Lines 2-9:  Construct the flags field of the DNS response based on the JSON response from Google (and some hard-coded values).

Implementation - Part 2

Now, to prevent even Google from tracking your DNS activity, we can send all this HTTPS traffic on a world-wide trip through the Tor network.  You need to set up a local Tor controller instance and then add the lines of code below to direct the HTTPS requests through that network.  Here's an excellent explanation on that.


Python Implementation


PROXIES = {'http':  'socks5://127.0.0.1:9050',
           'https': 'socks5://127.0.0.1:9050'} 

data = requests.get(GOOGLE_DNS, params=payload, proxies=PROXIES).json()
Lines 1-2:  Tor uses port 9050 as its local SOCKS port.
Line 4: Simply modify the HTTPS Get to Google (from __getRecords()) with these proxies.
def renew():
    with Controller.from_port(port = 9051) as controller:
        controller.authenticate(password="test")
        controller.signal(Signal.NEWNYM)

  
scheduler = BackgroundScheduler()
scheduler.add_job(renew, 'interval', hours=1)
scheduler.start()
Lines 1-4:  For some extra security, signal your Tor controller to change its IP address periodically.
Lines 7-9:  Set that change interval with a scheduler.
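
These snippets rely on a few third-party modules whose imports aren't shown.  My best guess at what's needed (stem for the Tor controller, apscheduler for the background scheduler; the socks5:// proxy support in requests additionally requires the PySocks package to be installed):

import requests
from stem import Signal
from stem.control import Controller
from apscheduler.schedulers.background import BackgroundScheduler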

Summary

This was a fairly simplistic DNS server implementation in Python.  I didn't attempt to implement the full RFC, as it's not necessary for my use case.  You get secure DNS traffic from this, but at the price of latency.  The HTTPS conversion and Tor overhead equate to a factor-of-10 increase in DNS latency (the vast majority of that is attributable to Tor).  I don't find it noticeable for my use, though.

Full source code here.

Sunday, February 26, 2017

IoT Data Pipeline - Part 3: Kafka Integration


Summary

In this post I'll scale up the architecture by simulating multiple IoT devices and then consolidate the resulting data stream into Apache Kafka.

IoT Data Pipeline - Part 2: Node/Java Crypto Interop
IoT Data Pipeline - Part 3:  Kafka Integration

Implementation

Below is a diagram depicting the overall setup I'm trying to achieve here.  I wanted to scale up the number of devices generating data streams to the IoT cloud and then consolidate them into a scalable streaming architecture - which is Kafka.

As mentioned in the first post, this exercise is somewhat contrived.  I scaled up data generators by creating device simulators.  On the receiving side of the IoT cloud, multiple REST clients poll for data (1 per device) and then write into Kafka.

Lower level diagram of the code below:


Node.js - Device Server Code Modifications


var devices = [];
for (var i=0; i < properties.deviceCount; i++) {
 devices.push('pws_' + i);
}

function rand(min, max) {
 return Math.random() * (max - min) + min + 1;
}
function randomize(data) {
 for (var prop in data){
  if (data.hasOwnProperty(prop)) {
   rn = rand(-.5,.5);
   data[prop] = (data[prop] * rn).toFixed(2);
  }
 }
 return data;
}

function thingspace(requrl) {
 var query = url.parse(requrl, true).query;
 var wx_data = {'windspeedmph' : query.windspeedmph, 'solarradiation' : query.solarradiation};
 
 var options = {
   host : 'thingspace.io',
   port : '443',
   headers: {'Content-Type' : 'application/json'},
   method : 'POST'
 };
 
 async.map(devices, function(device, callback){
  var iv = crypto.randomBytes(16);
  var cipher = crypto.createCipheriv('aes-256-cbc', key, iv);
  wx_data = randomize(wx_data);
  wx_data.id = device;
  var cipherText = cipher.update(JSON.stringify(wx_data), 'utf8','hex') + cipher.final('hex');
  options.path = '/dweet/for/' + device;
  
  var retVal = '';
  var req = https.request(options, function(res) {
   res.on('data', function(chunk) {
    retVal += chunk;
   });
   res.on('end', function() {
    retVal = JSON.parse(retVal);
    callback(null, retVal.this);
   });
  });

  req.on('error', function(err1) {
   logger.error('File: main.js, Method: thingspace(), Message err1: ', err1.message);
   callback(err1, null);
  });
  req.write(JSON.stringify({'iv': iv.toString('base64'), 'mesg' : cipherText}));
  req.end(); 
 }, 
 function (err, results){
  if (err) {
   logger.error('File: main.js, Method: thingspace(), Message err: ', err.message);
  }
  else {
   logger.debug('Exiting - File: main.js, Method: thingspace()');
  }
 });
}


function handler(req, res) {
 wunderground(req.url, res);
 thingspace(req.url);
}

var httpServer = http.createServer(handler).listen(9080);
logger.debug('File: main.js, Listening on port 9080');
Lines 1-4:  Set up an array of simulated IoT devices.
Lines 6-17:  A couple of functions to generate random data metrics for the simulated devices.  Data from the one real device is fed into these functions, and random data for the simulators is generated from it.
Line 21:  The two metrics being sent from the devices are solar radiation and wind speed.
Lines 30-47:  This is where the device simulation happens.  Using the Async module, send out multiple HTTP REST requests (1 per simulated device) to the IoT cloud.

Java - Custom JSON Serializer/Deserializer for Kafka

Kafka operates with byte arrays.  It has a built-in serializer for Strings.  If you want to pass anything other than that, you have to roll your own.

public class JSONSerializer implements Serializer {
 @Override
  public void close() {
  
  }
 
 @Override
  public void configure(Map arg0, boolean arg1) {
  
  }
 
 @Override
 public byte[] serialize(String s, JSONObject json) {
  byte[] byteArr = null;
  
     try {
      ByteArrayOutputStream out_byte = new ByteArrayOutputStream();
      ObjectOutputStream out_object = new ObjectOutputStream(out_byte);
      out_object.writeObject(json);
      byteArr = out_byte.toByteArray();
     }
     catch (Exception e) {
      e.printStackTrace();
     }
     
     return byteArr;
 }
}
Lines 17-20:  Convert the JSON object into a byte array.

public class JSONDeserializer implements Deserializer {
 @Override
  public void close() {
  
  }
 
 @Override
  public void configure(Map arg0, boolean arg1) {
  
  }
 
 @Override
 public JSONObject deserialize(String s, byte[] bytes) {
  JSONObject json = null;
  
  try {
   ByteArrayInputStream bIstream= new ByteArrayInputStream(bytes);
   ObjectInputStream in = new ObjectInputStream(bIstream);
   json = (JSONObject) in.readObject();
  }
  catch (Exception e) {
   e.printStackTrace();
  }
  return json;
 }
}
Lines 17-19:  Convert a byte array into a JSON object.

Java - REST Clients/Kafka Producer


public class DweetProducer implements Runnable {

 private static final Logger logger = LogManager.getLogger(DweetProducer.class);
 private static final String propFile = "pwskafka.properties";
 private static final String dweetUri = "https://thingspace.io";
 private static final String streamPath = "/listen/for/dweets/from";
 private static byte[] salt;
 private static String password;
 private static String topic;
 private static String bootstrap;
 private static Producer producer;
 private String device;
 
 static {
  try {
   FileInputStream fis = new FileInputStream(propFile);
   Properties props = new Properties();
   props.load(fis);
   fis.close();
   salt = props.getProperty("salt").getBytes();
   password = props.getProperty("password");
   bootstrap = props.getProperty("bootstrap");
   topic = props.getProperty("topic");
   props.clear();
      props.put("bootstrap.servers", DweetProducer.bootstrap); 
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "pwskafka.JSONSerializer");
      producer = new KafkaProducer(props);
  }
  catch (Exception e) {
   logger.error(e);
   System.exit(1);
  }
 }
 
 public DweetProducer(String device) {
  this.device = device;
  logger.debug("Producer for " + device  + " starting.");
 }
  
 private JSONObject decryptDweet(byte[] iv, String cipherText) throws GeneralSecurityException, 
 UnsupportedEncodingException, ParseException  {
  KeySpec spec = new PBEKeySpec(DweetProducer.password.toCharArray(), DweetProducer.salt, 100000, 256);
  SecretKeyFactory f = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
  SecretKey tmp = f.generateSecret(spec);
  SecretKey key = new SecretKeySpec(tmp.getEncoded(), "AES");
  
  Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.DECRYPT_MODE, key,  new IvParameterSpec(iv));
        String val = new String(cipher.doFinal(DatatypeConverter.parseHexBinary(cipherText)), "UTF-8");
        JSONParser parser = new JSONParser();
        return (JSONObject) parser.parse(val); 
 }
 
 public void run() { 
  JSONObject dweet = null;
  Client client = ClientBuilder.newClient();
  WebTarget target = client.target(dweetUri).path(streamPath).path(this.device);
  Response response =  target.request(MediaType.APPLICATION_JSON).get(Response.class);
  ChunkedInput chunkedInput =
          response.readEntity(new GenericType>() {});
  String chunk;
  JSONParser parser = new JSONParser();
  
  while ((chunk = chunkedInput.read()) != null) {
      if (!chunk.chars().allMatch( Character::isDigit )) {
       try {
        chunk = (String) parser.parse(chunk);
        JSONObject obj = (JSONObject) parser.parse(chunk);
        JSONObject content = (JSONObject) obj.get("content");
        byte[] iv = DatatypeConverter.parseBase64Binary((String) content.get("iv"));
     String cipherText = (String) content.get("mesg");
     dweet = decryptDweet(iv, cipherText);
     DweetProducer.producer.send(new ProducerRecord(DweetProducer.topic, this.device, dweet));
     logger.debug(dweet);
       }
       catch (Exception e) {
        logger.error(e);
       }
      }
  }
 }
 
 public static void main(String[] args) { 
  if (args != null && args.length == 2) {
   int startDeviceNum = Integer.parseInt(args[0]);
   int endDeviceNum = Integer.parseInt(args[1]);
   if (startDeviceNum > endDeviceNum) {
    logger.error("Usage:  arg 1: starting device number, arg 2: ending device number");
    System.exit(1);
   }
   ExecutorService pool;
   int cores = Runtime.getRuntime().availableProcessors();
   pool = Executors.newFixedThreadPool(cores);
   String device = null;
   
   for (int i = startDeviceNum; i <= endDeviceNum; i++){
    device = "pws_" + i;
    pool.execute(new DweetProducer(device));
   }
   pool.shutdown();
  }
  else {
   logger.error("Usage:  arg 1: starting device number, arg 2: ending device number");
   System.exit(1);
  }
 }

}
Line 1:  Setting up a multi-threaded class.
Line 12:  The Kafka Producer class is thread-safe, so we only need 1 instance that can be shared by all the threads.
Lines 14-34:  Load all the configuration variables from a properties file and create the Producer.  Line 27:  Tell the Producer how to serialize JSON objects using the custom serializer.
Lines 41-53:  AES decryption routine.  Same code as was discussed in Part 2.
Lines 55-82:  Thread code for HTTP clients.  Each client is doing HTTP chunked input using the IoT cloud's streaming interface.  This was discussed in Part 1.  Line 74:  Send the device data (dweet) to Kafka.
Lines 84-107:  Using the Java ExecutorService, set up a thread pool using all available cores.  Each thread will be executing a HTTP client with a common Kafka producer.

Test Kafka Consumer

public class DweetConsumer {
 
 public static void main(String[] args) throws Exception{
    String topicName = "pws_metrics";
    
    Properties props = new Properties();
    props.put("bootstrap.servers", "webserv3:9092"); 
    props.put("group.id", "group_01");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "pwskafka.JSONDeserializer");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    
    KafkaConsumer consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topicName));
    
    while (true) {
     ConsumerRecords records = consumer.poll(100);
     for (ConsumerRecord record : records) {
      System.out.println("Received: " + record.key() + " " + record.value());
     }
    }
   
  }
}
Line 10:  Tell the Consumer how to deserialize the incoming byte array back into a JSON object.
Lines 15-16:  Set up a Kafka Consumer and subscribe to the IoT topic the Producer is creating records for.
Lines 18-23:  Poll for incoming records and print them to console.

Sunday, January 29, 2017

IoT Data Pipeline - Part 2: Node/Java Crypto Interop


Summary

I wasn't planning on dedicating an entire post to encryption, but this task proved to be enough of a pain that I decided it was justified.  Maybe this will help others bypass that pain.

This project developed into a mixed-language solution:  Node on the server side (simulated device) and Java on the client/analytics side.  Getting the Node-side encryption to work with the Java-side decryption is the focus of this post.

IoT Data Pipeline - Part 2: Node/Java Crypto Interop
IoT Data Pipeline - Part 3: Kafka Integration

Implementation

Below is a diagram depicting the high-level steps I used to get a functioning encryption system between Node and Java.  There are likely other ways to go about this, but this is what worked for me.

Node Server Code

I've highlighted the crypto changes I made from the original server code in the first blog post.
var key = crypto.pbkdf2Sync(properties.password, properties.salt, 100000, 32, 'sha256');

function thingspace(requrl) {
 var query = url.parse(requrl, true).query;
 var wx_data = {'tempf' : query.tempf, 'humidity': query.humidity, 
   'winddir' : query.winddir, 'windspeedmph' : query.windspeedmph, 
   'rainin' : query.rainin, 'solarradiation' : query.solarradiation};
 var options = {
   host : 'thingspace.io',
   port : '443',
   path : '/dweet/for/' + properties.device,
   headers: {'Content-Type' : 'application/json'},
   method : 'POST'
 };
 
 var iv = crypto.randomBytes(16);
 var cipher = crypto.createCipheriv('aes-256-cbc', key, iv);
 var cipherText = cipher.update(JSON.stringify(wx_data), 'utf8','hex') + cipher.final('hex');
 
 var retVal = '';
 var req = https.request(options, function(res) {
  res.on('data', function(chunk) {
   retVal += chunk;
  });
  res.on('end', function() {
   retVal = JSON.parse(retVal);
   logger.debug('Exiting - File: main.js, Method:thingspace()', retVal.this);
  });
 });

 req.on('error', function(err1) {
  logger.error('File: main.js, Method: thingspace(), Message err1: ', err1.message);
 });
 req.write(JSON.stringify({'iv': iv.toString('base64'), 'mesg' : cipherText}));
 req.end();
}
Line 1:  Create a secure 256-bit key with PBKDF2.
Line 16:  Generate a random 128-bit initialization vector.  Buffer of 16 bytes.
Line 17-18:  Encrypt the weather data (stringified JSON object) with AES 256 with UTF-8 input encoding and hex output encoding.
Line 34:  Encode the IV bytes into a Base64 string.  Then, put it and the encrypted message inside a JSON object.  Finally, stringify that object and send it out in the body of an HTTP POST.  There's no harm in sending the IV along with the encrypted message.

Java Client Code

Similar to the node server sample code, I've highlighted the crypto-interesting areas.

 public Object getDweet(String device) {
  
  Object dweet = null;
  Client client = ClientBuilder.newClient();
  WebTarget target = client.target(dweetUri).path(getPath).path(device);
  Response response =  target.request(MediaType.APPLICATION_JSON).get(Response.class);
  
  if(response.getStatus() == 200) {
   String message = response.readEntity(String.class);
   JSONParser parser = new JSONParser();
   try {
    JSONObject obj = (JSONObject) parser.parse(message);
    JSONObject item = (JSONObject) ((JSONArray)obj.get("with")).get(0);
    JSONObject content = (JSONObject)item.get("content");
    byte[] iv = DatatypeConverter.parseBase64Binary((String) content.get("iv"));
    String cipherText = (String) content.get("mesg");
    dweet = decryptDweet(iv, cipherText);
   }
   catch (Exception e) {
    e.printStackTrace();
   }
  }  
  return dweet;
 }

 private Object decryptDweet(byte[] iv, String cipherText) throws GeneralSecurityException, 
 UnsupportedEncodingException, ParseException  {
  KeySpec spec = new PBEKeySpec(this.password.toCharArray(), this.salt, 100000, 256);
  SecretKeyFactory f = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
  SecretKey tmp = f.generateSecret(spec);
  SecretKey key = new SecretKeySpec(tmp.getEncoded(), "AES");
  
  Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
                cipher.init(Cipher.DECRYPT_MODE, key,  new IvParameterSpec(iv));
                String val = new String(cipher.doFinal(DatatypeConverter.parseHexBinary(cipherText)), "UTF-8");
                JSONParser parser = new JSONParser();
                return parser.parse(val);
 }
Lines 12-14: Parse out the JSON object within the HTTP response body.  I'm using the json.simple toolkit for this.
Line 15:  Decode the Base64 string-encoded IV back to a byte array.
Lines 16-17:  Pass the encrypted text and IV as input to a decrypt function.
Lines 28-31:  Generate the PBKDF2 secret key.
Lines 33-34:  Set up the AES decrypt cipher, with the IV.
Line 35:  Decrypt the message with hex input and UTF-8 output.
Lines 36-37:  Return the decrypted message as a JSON object.