import os
import cv2
import shutil
import itertools
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import urllib.request
from tqdm import tqdm
plt.style.use("ggplot")
plt.rcParams.update(
{
"font.size": 14,
"axes.labelweight": "bold",
"figure.figsize": (8, 6),
"axes.grid": False,
}
)
[docs]class WebCAT:
"""Utility class for viewing local or remote webcat videos.
Attributes
----------
url :
url to the .mp4 file.
name :
Unique name based on url.
video : cv2.VideoCapture
The cv2.VideoCapture object of the video file at url.
frames: int
Total number of frames in the video.
fps : int
Frames per second in the video.
width : int
Width of video frames in pixels.
height : int
Height of video frames in pixels.
Methods
-------
download_url(self, fout=None, verbose=1)
Download the video from the url.
Examples
--------
>>> from webcat_utils import VidRetriever
>>> url = "http://webcat-video.axds.co/buxtoncoastalcam/raw/2019/2019_11/2019_11_13/buxtoncoastalcam.2019-11-13_1000.mp4"
>>> vr = VidRetriever(url)
>>> vr.download_url()
...
Saving to buxtoncoastalcam.2019-11-13_1000.mp4: 0%| | 44.1M/125M [00:08<00:16, 5.18MB/s]
"""
def __init__(self):
self.url = None
self.name = None
self.video = None
@property
def width(self):
return int(self.video.get(3))
@property
def height(self):
return int(self.video.get(4))
@property
def frames(self):
return int(self.video.get(7))
@property
def fps(self):
return int(self.video.get(cv2.CAP_PROP_FPS))
[docs] def generate_url(self, station: str, year: int, month: int, day: int, time: int):
"""Generate WebCAT URLs and expressive name for files from user inputs.
Parameters
----------
station : str
Station name, e.g., "buxtoncoastalcam".
year : int
Year of video, e.g., 2020.
month: int
Month (numerical) of video, e.g., 11.
day: int
Day of video, e.g., 17.
time: int
Time (24 hr) of video rounded to nearest 10 minutes, e.g., 0500 (5:00 am), 1300 (1:00 pm), 1330 (1:30pm).
"""
url = f"http://webcat-video.axds.co/{station}/raw/{year}/{year}_{month}/{year}_{month}_{day}/{station}.{year}-{month}-{day}_{time}.mp4"
vid = cv2.VideoCapture(url)
if int(vid.get(7)) == 0: # check if there are any frames
raise ValueError(f"{url} is not a valid url.")
else:
self.url = url
self.video = vid
self.name = f"{station}_{year}_{month}_{day}_{time}"
[docs] def download_url(self, fout: str = None, verbose: bool = True):
"""Download the video from the instance url.
Parameters
----------
fout : str
The file path to save the downloaded file to, e.g., ~/Downloads/download.mp4, by default None
verbose : {True, False}, optional
Display download progress bar, by default True
"""
fout = self.name + ".mp4" if fout == None else fout
if verbose:
with TqdmUpTo(
unit="B",
unit_scale=True,
unit_divisor=1024,
miniters=1,
desc=f"Saving to {fout}",
) as t: # all optional kwargs
urllib.request.urlretrieve(self.url, fout, reporthook=t.update_to)
else:
urllib.request.urlretrieve(self.url, fout)
[docs] def save_frames(
self, delta_t: int = 10, fout_path: str = "", save_csv=True, verbose=False
):
"""Download the video from the instance url.
Parameters
----------
delta_t : int, optional
A frame will be saved every delta_t seconds, by default 10.
fout : str, optional
Path to save frames and csv to, e.g., "~/Downloads/".
"""
assert delta_t < int(
self.frames / self.fps
), f"delta_t should be less than {int(self.frames / self.fps)}"
print(self.fps)
step = delta_t * self.fps
step_range = range(0, (self.frames + 1), step)
loop = tqdm(step_range) if verbose else step_range
tmp_dir = os.path.join(fout_path, "jpg") # save images in a "jpg" folder
if os.path.exists(tmp_dir):
shutil.rmtree(tmp_dir) # just remove the whole directory
os.makedirs(tmp_dir) # mkdir
for i in loop:
self.video.set(1, i)
_, frame_arr = self.video.read()
cv2.imwrite(os.path.join(tmp_dir, f"frame_{i}.jpg"), frame_arr)
if save_csv:
(
pd.DataFrame(
{
"url": self.url,
"name": self.name,
"frame": list(step_range),
"path": [
os.path.join(tmp_dir, f"frame_{_}.jpg") for _ in step_range
],
}
).to_csv(os.path.join(fout_path, f"{self.name}.csv"))
)
[docs] def plot_frames(self, frames: list = [0]):
"""Plot frames from the video, zero-based indexing.
Parameters
----------
frames : list, optional
List of the frames to display in a grid plot, by default [0].
Examples
--------
>>> from webcat_utils import WebCAT
>>> wc.generate_url("buxtoncoastalcam", 2019, 11, 13, 1000)
>>> wc.plot_frames()
...
<matplotlib.subplots>
"""
assert all(
(_ >= 0) & (_ < self.frames) for _ in frames
), f"frames should be between 0 and {self.frames-1}."
rows = 1 + (len(frames) - 1) // 3
cols = len(frames) if len(frames) < 3 else 3
fig, ax = plt.subplots(rows, cols, figsize=(cols * 4, rows * 3), squeeze=False)
for i, frame in enumerate(frames):
plt.sca(ax[0, i]) if rows == 1 else plt.sca(ax[i // 3, i % 3])
self.video.set(1, frame)
_, frame_arr = self.video.read()
plt.imshow(
cv2.cvtColor(frame_arr, cv2.COLOR_BGR2RGB)
) # plot the image on current axis
plt.title(f"Frame {frame + 1} of {self.frames}")
if len(frames) < rows * cols: # clear any unsued axes (this is dirty)
for j in np.arange(len(frames), rows * cols):
ax[j // 3, j % 3].set_visible(False)
plt.tight_layout()
[docs] def plot_average_frame(self, step: int = 500):
"""Plot the average of every "step" frames in the video.
Parameters
----------
step : int, optional
The step between frames to average by, lower values result in a smoother average, by default 10.
Examples
--------
>>> from webcat_utils import WebCAT
>>> wc.generate_url("buxtoncoastalcam", 2019, 11, 13, 1000)
>>> wc.plot_average_frame()
...
<matplotlib.subplot>
"""
N = self.frames // step # how many frames to average based on step
timex = np.zeros((self.height, self.width, 3, N), np.float) # init array
for i in range(N): # I dislike loops, could probably make this much faster
self.video.set(1, i * step)
_, frame_arr = self.video.read()
timex[:, :, :, i] = cv2.cvtColor(frame_arr, cv2.COLOR_BGR2RGB)
timex = np.mean(timex, axis=-1).astype(int)
plt.subplots(1, 1, figsize=(10, 7))
plt.imshow(timex)
plt.title(f"Average of every {step} frames, {N} frames in total")
[docs]class TqdmUpTo(tqdm):
"""Utility class for displaying a progress bar.
Methods
-------
download_url(self, fout=None, verbose=1)
Download the video from the url.
Examples
--------
>>> url = "http://webcat-video.axds.co/buxtoncoastalcam/raw/2019/2019_11/2019_11_13/buxtoncoastalcam.2019-11-13_1000.mp4"
>>> with TqdmUpTo(unit="B", unit_scale=True, unit_divisor=1024, miniters=1, desc=f"Saving to {fout}") as t:
>>> urllib.request.urlretrieve(self.fin, fout, reporthook=t.update_to)
...
Saving to buxtoncoastalcam.2019-11-13_1000.mp4: 0%| | 44.1M/125M [00:08<00:16, 5.18MB/s]
"""
[docs] def update_to(self, b: int = 1, bsize: int = 1, tsize: int = None):
"""Progress bar udpate function.
Parameters
----------
b : int, optional
Number of blocks transferred so far, by default 1
bsize : int, optional
Size of each block (in tqdm units), by default 1
tsize : int, optional
Total size (in tqdm units), by default None
"""
if tsize is not None:
self.total = tsize
self.update(b * bsize - self.n) # will also set self.n = b * bsize