diff --git a/.gitignore b/.gitignore index f43f887..0c07b89 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,9 @@ __pycache__/ # C extensions *.so +# videos +*.mp4 + # Distribution / packaging .Python build/ diff --git a/README.md b/README.md index 91fb3bf..908cf03 100644 --- a/README.md +++ b/README.md @@ -31,3 +31,33 @@ cd vendor/optical_flow_measure/ ```bash hl agent run agents/OpticalFlowAgent.json -f inputs/test.mp4 ``` + +## Using the OpticalFlow Class + +The repository includes a standalone `OpticalFlow` class that calculates movement scores from video frames. + +### Basic Usage + +```python +from neuflowv2 import OpticalFlow + +# Initialize with default parameters +flow = OpticalFlow(model_path="models/neuflow_sintel.onnx") + +# Process a single frame and get movement score +frame = your_image_processing_function() # OpenCV BGR image +movement_score = flow.update(frame) + +# Reset internal state if needed +flow.reset() +``` + +### Example + +Run the included example script to see the movement score calculated from webcam input: + +```bash +python examples/optical_flow_example.py +``` + +See the `examples/optical_flow_example.py` file for a complete implementation. diff --git a/examples/optical_flow_example.py b/examples/optical_flow_example.py new file mode 100644 index 0000000..8e8f775 --- /dev/null +++ b/examples/optical_flow_example.py @@ -0,0 +1,52 @@ +import cv2 +import numpy as np +from neuflowv2 import OpticalFlow + +def main(): + # Initialize the OpticalFlow class + # Change the model path as needed for your environment + flow = OpticalFlow(model_path="models/neuflow_sintel.onnx") + + # Open a video capture (0 for webcam, or a video file path) + cap = cv2.VideoCapture(0) + + if not cap.isOpened(): + print("Error: Could not open video source.") + return + + print("Press 'q' to quit...") + + while True: + # Read a frame + ret, frame = cap.read() + if not ret: + print("End of video stream.") + break + + # Calculate movement score + movement_score = flow.update(frame) + + # Display the movement score on the frame + cv2.putText( + frame, + f"Movement: {movement_score:.2f}", + (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, + 1, + (0, 255, 0), + 2 + ) + + # Display the frame + cv2.imshow("Optical Flow Movement", frame) + + # Exit on 'q' key press + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + # Release resources + cap.release() + cv2.destroyAllWindows() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/video_flow_cold-chicken_example.py b/examples/video_flow_cold-chicken_example.py new file mode 100644 index 0000000..768b8e6 --- /dev/null +++ b/examples/video_flow_cold-chicken_example.py @@ -0,0 +1,114 @@ +import time +import cv2 +import numpy as np +from neuflowv2 import OpticalFlow +import json + +def main(): + # Initialize the OpticalFlow class + # Change the model path as needed for your environment + flow = OpticalFlow(model_path="models/neuflow_sintel.onnx") + + #movement score + movement_scores: list[float] = [] + + + # Path to the input video file + video_path = "vendor/optical_flow_measure/inputs/cold-chickens-combined.mp4" + + # Open the video file + cap = cv2.VideoCapture(video_path) + + if not cap.isOpened(): + print(f"Error: Could not open video file {video_path}") + return + + # Get video properties + frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = int(cap.get(cv2.CAP_PROP_FPS)) + + print(f"Video loaded: {frame_width}x{frame_height} at {fps} FPS") + print("Press 'q' to quit, 'p' to pause/resume...") + + paused = False + + # list to store all the flow vectors and movement scores and inference times + frame_data: list[dict] = [] + + while True: + if not paused: + # Read a frame + ret, frame = cap.read() + if not ret: + print("End of video stream.") + break + #time before inference + start = time.perf_counter() + + # Calculate flow vector + flow_vectors = flow.update(frame) + + #inference time for model to compute the flow vector of two consecutive frames + inference_time = (time.perf_counter() - start) * 1000 #milisecs + + # Print the flow vector to the console + print(f"Flow vector: {flow_vectors}") + + # Calculate movement score + # check if the flow vector is a numpy array + if isinstance(flow_vectors, np.ndarray): + movement_score = flow.compute_movement_score(flow_vectors) + else: + # the situation where the first frame then no prev_frame then no movement score or sometime invalid value + movement_score = 0 + + print(f"Movement score: {movement_score}") + movement_scores.append(float(movement_score)) + + #Store data for each process + data = { + "inference_time": inference_time, + "flow_vectors": flow_vectors.tolist() if hasattr(flow_vectors, 'tolist') else [flow_vectors], + "movement_scores": movement_scores + } + + #save data to the list + frame_data.append(data) + + + # # Display the flow vector on the frame + # cv2.putText( + # frame, + # f"Flow vector: {flow_vectors}", + # (10, 30), + # cv2.FONT_HERSHEY_SIMPLEX, + # 1, + # (0, 255, 0), + # 2 + # ) + + # # Display the frame + # cv2.imshow("Optical Flow Movement", frame) + + + + # Handle key presses + key = cv2.waitKey(1) & 0xFF + if key == ord('q'): + break + elif key == ord('p'): + paused = not paused + print("Video paused" if paused else "Video resumed") + + #save data to json file + with open("flow_vectors_cold_chicken.json", "w") as f: + json.dump(frame_data, f) + + # Release resources + cap.release() + #cv2.destroyAllWindows() + flow.reset() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/video_flow_hot-chicken_example.py b/examples/video_flow_hot-chicken_example.py new file mode 100644 index 0000000..c45f30d --- /dev/null +++ b/examples/video_flow_hot-chicken_example.py @@ -0,0 +1,114 @@ +import time +import cv2 +import numpy as np +from neuflowv2 import OpticalFlow +import json + +def main(): + # Initialize the OpticalFlow class + # Change the model path as needed for your environment + flow = OpticalFlow(model_path="models/neuflow_sintel.onnx") + + #movement score + movement_scores: list[float] = [] + + + # Path to the input video file + video_path = "vendor/optical_flow_measure/inputs/hot-chickens-combined.mp4" + + # Open the video file + cap = cv2.VideoCapture(video_path) + + if not cap.isOpened(): + print(f"Error: Could not open video file {video_path}") + return + + # Get video properties + frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = int(cap.get(cv2.CAP_PROP_FPS)) + + print(f"Video loaded: {frame_width}x{frame_height} at {fps} FPS") + print("Press 'q' to quit, 'p' to pause/resume...") + + paused = False + + # list to store all the flow vectors and movement scores and inference times + frame_data: list[dict] = [] + + while True: + if not paused: + # Read a frame + ret, frame = cap.read() + if not ret: + print("End of video stream.") + break + #time before inference + start = time.perf_counter() + + # Calculate flow vector + flow_vectors = flow.update(frame) + + #inference time for model to compute the flow vector of two consecutive frames + inference_time = (time.perf_counter() - start) * 1000 #milisecs + + # Print the flow vector to the console + print(f"Flow vector: {flow_vectors}") + + # Calculate movement score + # check if the flow vector is a numpy array + if isinstance(flow_vectors, np.ndarray): + movement_score = flow.compute_movement_score(flow_vectors) + else: + # the situation where the first frame then no prev_frame then no movement score or sometime invalid value + movement_score = 0 + + print(f"Movement score: {movement_score}") + movement_scores.append(float(movement_score)) + + #Store data for each process + data = { + "inference_time": inference_time, + "flow_vectors": flow_vectors.tolist() if hasattr(flow_vectors, 'tolist') else [flow_vectors], + "movement_scores": movement_scores + } + + #save data to the list + frame_data.append(data) + + + # # Display the flow vector on the frame + # cv2.putText( + # frame, + # f"Flow vector: {flow_vectors}", + # (10, 30), + # cv2.FONT_HERSHEY_SIMPLEX, + # 1, + # (0, 255, 0), + # 2 + # ) + + # # Display the frame + # cv2.imshow("Optical Flow Movement", frame) + + + + # Handle key presses + key = cv2.waitKey(1) & 0xFF + if key == ord('q'): + break + elif key == ord('p'): + paused = not paused + print("Video paused" if paused else "Video resumed") + + #save data to json file + with open("flow_vectors_hot_chicken.json", "w") as f: + json.dump(frame_data, f) + + # Release resources + cap.release() + #cv2.destroyAllWindows() + flow.reset() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/flow-cli.py b/flow-cli.py new file mode 100644 index 0000000..4362cbd --- /dev/null +++ b/flow-cli.py @@ -0,0 +1,67 @@ +import click +from neuflowv2 import OpticalFlow +from highlighter.agent.capabilities.sources import VideoFrameIterator +import json + +DEFAULT_MODEL_PATH = "models/neuflow_sintel.onnx" + +@click.group() +def flow_group(): + pass + +@flow_group.command("compute") +@click.argument("video_path", type=str) +@click.option("--model-path", type=str, required=False, default=DEFAULT_MODEL_PATH) +@click.option("--fps", type=int, required=False, default=0) +@click.option("--max-frames", type=int, required=False, default=0) +def compute(video_path, model_path, max_frames, fps): + if not max_frames: + max_frames = float("inf") + + of = OpticalFlow(model_path=model_path) + video_frames = VideoFrameIterator( + source_urls=[video_path] + ) + prev_frame = next(video_frames).content + of.update(prev_frame) + + frame_number = 0 + scores: list[dict] = [] + while True: + try: + cur_frame = next(video_frames).content + except StopIteration: + print(f"End of video, frame: {frame_number}") + break + + flow = of.update(cur_frame) + f_sum, f_mean, f_median = of.compute_movement_scores(flow) + move_score = { + "sum": float(f_sum), + "mean": float(f_mean), + "median": float(f_median) + } + + scores.append(move_score) + + if not (frame_number % 10): + print(f"Sum:{int(f_sum)}, Mean:{f_mean:0.2f}, Med:{f_median:0.2f}") + + prev_frame = cur_frame + frame_number += 1 + + if frame_number >= max_frames: + print(f"Exiting at max_frames: {max_frames}") + break + + with open("flow_vector.json", "w") as f: + json.dump(scores, f, indent=4) + +@flow_group.command("overlay") +def overlay(): + pass + + +if __name__ == "__main__": + flow_group() + diff --git a/neuflowv2/__init__.py b/neuflowv2/__init__.py index d8fdb4c..4c55fba 100644 --- a/neuflowv2/__init__.py +++ b/neuflowv2/__init__.py @@ -1,2 +1,3 @@ from .neuflowv2 import NeuFlowV2 -from .utils import draw_flow \ No newline at end of file +from .utils import draw_flow +from .optical_flow import OpticalFlow \ No newline at end of file diff --git a/neuflowv2/optical_flow.py b/neuflowv2/optical_flow.py new file mode 100644 index 0000000..f18cfee --- /dev/null +++ b/neuflowv2/optical_flow.py @@ -0,0 +1,110 @@ +from typing import Tuple +import numpy as np +import cv2 +import torch +import torchvision.transforms as T +from .neuflowv2 import NeuFlowV2 + +class OpticalFlow: + """ + A standalone optical flow class that calculates a movement score + based on optical flow between consecutive frames. + """ + + def __init__(self, model_path:str="neuflow_sintel.onnx", use_gpu:bool=True, blur_kernel:tuple=(3, 3)) -> None: + """ + Initialize the OpticalFlow class. + + Args: + model_path (str): Path to the optical flow model file. + use_gpu (bool): Whether to use GPU for preprocessing. + blur_kernel (tuple): Size of Gaussian blur kernel for preprocessing. + Returns: + None + """ + self.model_path = model_path + self.device = "cuda" if torch.cuda.is_available() and use_gpu else "cpu" + self.blur_kernel = blur_kernel + self.estimator = self._load_model() #load the model from the model_path + self.prev_frame = None + + # Image preprocessing transform pipeline + self.preprocess_frame = T.Compose([ + T.ToTensor(), # Convert image to tensor + T.ConvertImageDtype(torch.float32), # Convert image to float32 + T.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]) # Normalize image + ]) + + print(f"Initialized OpticalFlow on device: {self.device}") + + def _load_model(self) -> NeuFlowV2: + """ + Load the NeuFlowV2 optical flow model. + + Returns: + The loaded NeuFlowV2 model instance. + """ + return NeuFlowV2(self.model_path) + + def update(self, image) -> float: + """ + Update the algorithm with a new image and return the latest movement score. + + The movement score is calculated as the sum of the magnitudes of all optical + flow vectors between the current image and the previous image. + + Args: + image (numpy.ndarray): Input image (BGR format from OpenCV). + + Returns: + float: Movement score indicating the amount of motion between frames. + """ + # Apply Gaussian blur to reduce noise + filtered_frame = cv2.GaussianBlur(image, self.blur_kernel, 0) + + # Assume RGB + # frame_tensor is CHW + frame_tensor = self.preprocess_frame(filtered_frame).to(self.device) + + # If this is the first frame, store it and return 0 + if self.prev_frame is None: + self.prev_frame = frame_tensor + return 0.0 # Return 0 if it's the first frame + + # Convert tensors back to numpy for OpticalFlow estimation + # convert from CHW back to HWC + prev_frame_np = (self.prev_frame.cpu().permute(1, 2, 0).numpy() * 255.0).astype(np.uint8) + curr_frame_np = (frame_tensor.cpu().permute(1, 2, 0).numpy() * 255.0).astype(np.uint8) + + # Calculate optical flow + flow_vectors = self.estimator(prev_frame_np, curr_frame_np) + + # Store current frame for next comparison + self.prev_frame = frame_tensor + + return flow_vectors + + + def compute_movement_scores(self, flow_vectors:np.ndarray) -> Tuple[float, float, float]: + """Computes the movement score from the flow vectors""" + u, v = flow_vectors[..., 0], flow_vectors[..., 1] + magnitude = np.sqrt(u**2 + v**2) + movement_sum = np.sum(magnitude) + movement_mean = np.mean(magnitude) + movement_median = np.median(magnitude) + + return movement_sum, movement_mean, movement_median + + + def reset(self): + """ + Reset the optical flow state. + """ + self.prev_frame = None + + + @staticmethod + def update_flow_vector_overlay(): + pass + + diff --git a/tests/test_optical_flow-ex.py b/tests/test_optical_flow-ex.py new file mode 100644 index 0000000..1317de7 --- /dev/null +++ b/tests/test_optical_flow-ex.py @@ -0,0 +1,83 @@ +import unittest +import cv2 +import numpy as np +from neuflowv2 import OpticalFlow +import os + +class TestOpticalFlow(unittest.TestCase): + def setUp(self): + # Initialize the OpticalFlow class with default parameters + self.optical_flow = OpticalFlow(model_path="models/neuflow_sintel.onnx") + # Path to the test video + self.test_video_path = "vendor/optical_flow_measure/inputs/test.mp4" + + def test_optical_flow_initialization(self): + # Test that the OpticalFlow class initializes correctly + self.assertIsNotNone(self.optical_flow) + self.assertIsNotNone(self.optical_flow.estimator) + + def test_reset_method(self): + # Test that the reset method works correctly + # First update with an image + cap = cv2.VideoCapture(self.test_video_path) + ret, frame = cap.read() + cap.release() + + self.optical_flow.update(frame) # update the first frame + self.assertIsNotNone(self.optical_flow.prev_frame) + + # Now reset and check prev_frame is None + self.optical_flow.reset() + self.assertIsNone(self.optical_flow.prev_frame) + + def test_optical_flow_calculation(self): + # Test that the optical flow calculation produces expected results + cap = cv2.VideoCapture(self.test_video_path) + + # Read first frame + ret, frame1 = cap.read() + self.assertTrue(ret, "Failed to read first frame") + + # First update should return 0 since there will be no previous frame + score1 = self.optical_flow.update(frame1) + self.assertEqual(score1, 0.0, "First frame should return a score of 0.0") + + # Read second frame + ret, frame2 = cap.read() + self.assertTrue(ret, "Failed to read second frame") + + # Second update should return a non-zero score + score2 = self.optical_flow.update(frame2) + self.assertGreater(score2, 0.0, "Second frame should return a non-zero score") + + # Read third frame + ret, frame3 = cap.read() + self.assertTrue(ret, "Failed to read third frame") + + # Third update should also return a non-zero score + score3 = self.optical_flow.update(frame3) + self.assertGreater(score3, 0.0, "Third frame should return a non-zero score") + + # Check the expected output + + # Clean up + cap.release() + self.optical_flow.reset() + + def test_identical_frames(self): + # Test with identical frames - should return very low movement score + test_image = np.ones((100, 100, 3), dtype=np.uint8) * 128 # expected to be gray image + + # First update should return 0 + score1 = self.optical_flow.update(test_image) + self.assertEqual(score1, 0.0, "First frame should return a score of 0.0") + + # Second update with identical frame should return very low score + score2 = self.optical_flow.update(test_image.copy()) + self.assertAlmostEqual(score2, 0.0, delta=1.0, + msg="Identical frames should return a score close to 0.0") + +if __name__ == "__main__": + unittest.main() + + diff --git a/tests/test_optical_flow.py b/tests/test_optical_flow.py new file mode 100644 index 0000000..38820e2 --- /dev/null +++ b/tests/test_optical_flow.py @@ -0,0 +1,25 @@ +import pytest as pt +import numpy as np +from neuflowv2 import OpticalFlow +import os + +def test_optical_flow(): + of = OpticalFlow(model_path="models/neuflow_sintel.onnx") + + height, width = 5, 5 + image0 = np.zeros((height, width, 3), dtype=np.float32) + image1 = np.zeros((height, width, 3), dtype=np.float32) + + # Create a simple pattern: a 2x2 block of ones + image0[1:3, 1:3, :] = 1.0 + # Shift the block right by 1 pixel + image1[1:3, 2:4, :] = 1.0 + + result = of.update(image0) + assert result == 0 + + result = of.update(image1) + ms_sum, ms_mean, ms_median = of.compute_movement_scores(result) + assert 3712.3428 == pt.approx(ms_sum) + assert 148.49371 == pt.approx(ms_mean) + assert 153.50700 == pt.approx(ms_median) diff --git a/vendor/optical_flow_measure/pyproject.toml b/vendor/optical_flow_measure/pyproject.toml index ebd2d49..728c71d 100644 --- a/vendor/optical_flow_measure/pyproject.toml +++ b/vendor/optical_flow_measure/pyproject.toml @@ -24,8 +24,9 @@ authors = [ dependencies = [ - "aiko_services>=0.6", - "highlighter-sdk>=2.4.73" + "highlighter-sdk[predictors]>=2.5.0", + "torch", + "torchvision" ] [tool.setuptools.packages.find]