Source code for segmentation

import cv2
import numpy as np
import datetime
import ephem
np.seterr(divide='ignore', invalid='ignore')
from collections import namedtuple

import colorsys
import matplotlib.colors
from sklearn.cluster import KMeans


[docs]class Segmentation(): """Contains all the methods needed for the segmentation a sky/cloud image Attributes ---------- image : np.array The image to segment imager : Imager The imager used to take the image time: datetime.datetime The time at which the image was captured (UTC+8)""" def __init__(self, image, imager, time): self.img = image self.imager = imager self.time = time
[docs] def showasImage(self,input_matrix): """Normalizes an input matrix to the range [0,255]. It is useful in displaying the matrix as an image. Args: input_matrix (numpy array): Input matrix that needs to be normalized. Returns: numpy array: Returns the normalized matrix.""" return (input_matrix - np.amin(input_matrix))/(np.amax(input_matrix) - np.amin(input_matrix))*255
[docs] def make_cluster_mask(self,input_matrix,mask_image): """Clusters an input sky/cloud image to generate the binary image and the coverage ratio value. Args: input_matrix (numpy array): Input matrix that needs to be normalized. mask_image (numpy array): Mask to remove occlusions from the input image. This mask contains boolean values indicating the allowable pixels from an image. Returns: numpy array: Binary output image, where white denotes cloud pixels and black denotes sky pixels. float: Cloud coverage ratio in the input sky/cloud image. float: The first (out of two) cluster center. float: The second (out of two) cluster center.""" [rows,cols]=mask_image.shape im_mask_flt = mask_image.flatten() find_loc = np.where(im_mask_flt==1) find_loc = list(find_loc) input_vector = input_matrix.flatten() input_select = input_vector[list(find_loc)] X = input_select.reshape(-1, 1) k_means = KMeans(init='k-means++', n_clusters=2) k_means.fit(X) k_means_labels = k_means.labels_ k_means_cluster_centers = k_means.cluster_centers_ k_means_labels_unique = np.unique(k_means_labels) center1 = k_means_cluster_centers[0] center2 = k_means_cluster_centers[1] if center1 < center2: # Interchange the levels. temp = center1 center1 = center2 center2 = temp k_means_labels[k_means_labels == 0] = 99 k_means_labels[k_means_labels == 1] = 0 k_means_labels[k_means_labels == 99] = 1 cent_diff = np.abs(center1-center2) if cent_diff<20: # Segmentation not necessary. if center1>120 and center2>120: # All cloud image k_means_labels[k_means_labels == 0] = 1 else: # All sky image k_means_labels[k_means_labels == 1] = 0 # 0 is sky and 1 is cloud cloud_pixels = np.count_nonzero(k_means_labels == 1) sky_pixels = np.count_nonzero(k_means_labels == 0) total_pixels = cloud_pixels + sky_pixels # print (cloud_pixels,total_pixels) cloud_coverage = float(cloud_pixels)/float(total_pixels) # Final threshold image for transfer index = 0 Th_image = np.zeros([rows,cols]) for i in range(0,rows): for j in range(0,cols): if mask_image[i,j]==1: #print (i,j) #print (index) Th_image[i,j] = k_means_labels[index] index = index + 1 return(Th_image,cloud_coverage,center1,center2)
[docs] def getBRchannel(self, input_image, mask_image): """Extracts the ratio of red and blue blue channel from an input sky/cloud image. It is used in the clustering step to generate the binary sky/cloud image. Args: input_image (numpy array): Input sky/cloud image. mask_image (numpy array): Mask to remove occlusions from the input image. This mask contains boolean values indicating the allowable pixels from an image. Returns: numpy array: Ratio image using red and blue color channels, normalized to [0,255].""" red = input_image[:,:,2] green = input_image[:,:,1] blue = input_image[:,:,0] # RGB images for transfer red_image = red.astype(float) * mask_image green_image = green.astype(float) * mask_image blue_image = blue.astype(float) * mask_image BR = (blue_image - red_image) / (blue_image + red_image) BR[np.isnan(BR)] = 0 return self.showasImage(BR)
[docs] def cmask(self,index,radius,array): """Generates the mask for a given input image. The generated mask is needed to remove occlusions during post-processing steps. Args: index (numpy array): Array containing the x- and y- co-ordinate of the center of the circular mask. radius (float): Radius of the circular mask. array (numpy array): Input sky/cloud image for which the mask is generated. Returns: numpy array: Generated mask image.""" a,b = index is_rgb = len(array.shape) if is_rgb == 3: ash = array.shape nx=ash[0] ny=ash[1] else: nx,ny = array.shape s = (nx,ny) image_mask = np.zeros(s) y,x = np.ogrid[-a:nx-a,-b:ny-b] mask = x*x + y*y <= radius*radius image_mask[mask] = 1 return(image_mask)
[docs] def removeSunIfNonCloudy(self, im_mask): """Removes the circumsolar area from the mask if it is detected as non cloudy. Args: im_mask (numpy array): Segmentation mask Returns: numpy array: Modified segmentation mask.""" if not self.imager.longitude or not self.imager.latitude or not self.imager.elevation: raise RuntimeError("Plese provide longitude, latitude and elevation for imager " + self.imager.name) # Detecting the sun location and checking if covered by clouds obs = ephem.Observer() obs.lon = self.imager.longitude obs.lat = self.imager.latitude obs.elevation = self.imager.elevation obs.date = self.time - datetime.timedelta(hours=8) sun = ephem.Sun(obs) raySun = np.array([[np.cos(sun.alt) * np.cos(sun.az), np.cos(sun.alt) * np.sin(sun.az), -np.sin(sun.alt)]]) raySun = (np.dot(self.imager.rot.T, raySun.T - self.imager.trans)).T coordsSun = self.imager.world2cam(raySun.T) lum = 0.2126 * self.img[:,:,2].astype('float') + 0.7152 * self.img[:,:,1].astype('float') + 0.0722 * self.img[:,:,0].astype('float') maskSun = np.zeros(np.shape(lum)) maskSunCircum = np.zeros(np.shape(lum)) cv2.circle(maskSun, (coordsSun[1], coordsSun[0]), 100, (255), -1) cv2.circle(maskSunCircum, (coordsSun[1], coordsSun[0]), 250, (255), -1) maskCircum = maskSunCircum - maskSun lumSun = np.mean(lum[maskSun == 255]) lumCircum = np.mean(lum[maskCircum == 255]) if lumSun > 200 and lumSun - lumCircum > 30: maskSunCircum = cv2.resize(maskSunCircum, (np.shape(im_mask)[1], np.shape(im_mask)[0])) im_mask[maskSunCircum == 255] = 0 return im_mask
[docs] def segment(self): """Main function for sky/cloud segmentation. Perform all the necessary steps.""" # Segment the image im_mask = self.cmask(self.imager.center, self.imager.radius, self.img) im_mask = cv2.resize(im_mask, None, fx=0.25, fy=0.25) # Extract the color channels medR = cv2.resize(self.img, None, fx=0.25, fy=0.25) #(cc) = self.color16mask(medR, im_mask) self.removeSunIfNonCloudy(im_mask) inp_mat = self.getBRchannel(medR, im_mask) inp_mat[np.isnan(inp_mat)] = 0 (th_img,coverage,center1, center2) = self.make_cluster_mask(inp_mat,im_mask) cent_diff = np.abs(center1-center2) if cent_diff < 18: # Segmentation not necessary. if center1 > 120 and center2 > 120: # All cloud image self.th_img = np.zeros(np.shape(im_mask)) else: # All sky image self.th_img = np.ones(np.shape(im_mask)) else: self.th_img = inp_mat < center1 + (center2 - center1)/2 self.th_img[im_mask == 0] = 0 return self.th_img