3 changes: 3 additions & 0 deletions .gitignore
@@ -6,6 +6,9 @@ __pycache__/
# C extensions
*.so

# videos
*.mp4

# Distribution / packaging
.Python
build/
30 changes: 30 additions & 0 deletions README.md
@@ -31,3 +31,33 @@ cd vendor/optical_flow_measure/
```bash
hl agent run agents/OpticalFlowAgent.json -f inputs/test.mp4
```

## Using the OpticalFlow Class

The repository includes a standalone `OpticalFlow` class that computes a movement score from consecutive video frames.

### Basic Usage

```python
from neuflowv2 import OpticalFlow

# Initialize with the path to the ONNX flow model (other parameters use their defaults)
flow = OpticalFlow(model_path="models/neuflow_sintel.onnx")

# Process a single frame and get movement score
frame = your_image_processing_function() # OpenCV BGR image
movement_score = flow.update(frame)

# Reset internal state if needed
flow.reset()
```
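
The example scripts added in this PR also work with the raw flow field returned by `update()` and derive scores from it explicitly. A minimal sketch, assuming (as in `examples/video_flow_cold-chicken_example.py`) that `update()` returns the flow field as a NumPy array and that `compute_movement_score()` accepts it:

```python
import numpy as np

# On the very first frame there is no previous frame, so update() does not
# return a valid flow field; guard before computing a score (assumption
# based on the example scripts in this PR).
flow_vectors = flow.update(frame)
if isinstance(flow_vectors, np.ndarray):
    movement_score = flow.compute_movement_score(flow_vectors)
else:
    movement_score = 0.0
```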

### Example

Run the included example script to see the movement score calculated from webcam input:

```bash
python examples/optical_flow_example.py
```

See the `examples/optical_flow_example.py` file for a complete implementation.
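
This PR also adds a `flow-cli.py` script (see below) whose `compute` command iterates over a video, prints sum/mean/median movement scores every 10 frames, and writes the per-frame scores to `flow_vector.json`. A typical invocation, assuming it is run from the repository root with a video such as the one used above, might look like:

```bash
python flow-cli.py compute vendor/optical_flow_measure/inputs/test.mp4 --max-frames 100
```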
52 changes: 52 additions & 0 deletions examples/optical_flow_example.py
@@ -0,0 +1,52 @@
import cv2
import numpy as np
from neuflowv2 import OpticalFlow

def main():
# Initialize the OpticalFlow class
# Change the model path as needed for your environment
flow = OpticalFlow(model_path="models/neuflow_sintel.onnx")

# Open a video capture (0 for webcam, or a video file path)
cap = cv2.VideoCapture(0)

if not cap.isOpened():
print("Error: Could not open video source.")
return

print("Press 'q' to quit...")

while True:
# Read a frame
ret, frame = cap.read()
if not ret:
print("End of video stream.")
break

# Calculate movement score
movement_score = flow.update(frame)

# Display the movement score on the frame
cv2.putText(
frame,
f"Movement: {movement_score:.2f}",
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
1,
(0, 255, 0),
2
)

# Display the frame
cv2.imshow("Optical Flow Movement", frame)

# Exit on 'q' key press
if cv2.waitKey(1) & 0xFF == ord('q'):
break

# Release resources
cap.release()
cv2.destroyAllWindows()

if __name__ == "__main__":
main()
114 changes: 114 additions & 0 deletions examples/video_flow_cold-chicken_example.py
@@ -0,0 +1,114 @@
import time
import cv2
import numpy as np
from neuflowv2 import OpticalFlow
import json

def main():
# Initialize the OpticalFlow class
# Change the model path as needed for your environment
flow = OpticalFlow(model_path="models/neuflow_sintel.onnx")

# Per-frame movement scores collected over the whole video
movement_scores: list[float] = []


# Path to the input video file
video_path = "vendor/optical_flow_measure/inputs/cold-chickens-combined.mp4"

# Open the video file
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
print(f"Error: Could not open video file {video_path}")
return

# Get video properties
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))

print(f"Video loaded: {frame_width}x{frame_height} at {fps} FPS")
print("Press 'q' to quit, 'p' to pause/resume...")

paused = False

# List storing the flow vectors, movement score history, and inference time for each frame
frame_data: list[dict] = []

while True:
if not paused:
# Read a frame
ret, frame = cap.read()
if not ret:
print("End of video stream.")
break
# Start timing the inference
start = time.perf_counter()

# Calculate flow vector
flow_vectors = flow.update(frame)

# Inference time for the model to compute the flow field of two consecutive frames
inference_time = (time.perf_counter() - start) * 1000  # milliseconds

# Print the flow vector to the console
print(f"Flow vector: {flow_vectors}")

# Calculate movement score
# check if the flow vector is a numpy array
if isinstance(flow_vectors, np.ndarray):
movement_score = flow.compute_movement_score(flow_vectors)
else:
# On the first frame there is no previous frame, so no valid flow field (and no movement score) is available
movement_score = 0

print(f"Movement score: {movement_score}")
movement_scores.append(float(movement_score))

# Store the data for this frame (movement_scores holds the cumulative list of scores so far)
data = {
"inference_time": inference_time,
"flow_vectors": flow_vectors.tolist() if hasattr(flow_vectors, 'tolist') else [flow_vectors],
"movement_scores": movement_scores
}

# Append this frame's data to the list
frame_data.append(data)


# # Display the flow vector on the frame
# cv2.putText(
# frame,
# f"Flow vector: {flow_vectors}",
# (10, 30),
# cv2.FONT_HERSHEY_SIMPLEX,
# 1,
# (0, 255, 0),
# 2
# )

# # Display the frame
# cv2.imshow("Optical Flow Movement", frame)



# Handle key presses
# (note: cv2.waitKey only delivers key events while an OpenCV window is open;
# with cv2.imshow commented out above, 'q' and 'p' will have no effect)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('p'):
paused = not paused
print("Video paused" if paused else "Video resumed")

# Save the collected per-frame data to a JSON file
with open("flow_vectors_cold_chicken.json", "w") as f:
json.dump(frame_data, f)

# Release resources
cap.release()
#cv2.destroyAllWindows()
flow.reset()

if __name__ == "__main__":
main()
114 changes: 114 additions & 0 deletions examples/video_flow_hot-chicken_example.py
@@ -0,0 +1,114 @@
import time
import cv2
import numpy as np
from neuflowv2 import OpticalFlow
import json

def main():
# Initialize the OpticalFlow class
# Change the model path as needed for your environment
flow = OpticalFlow(model_path="models/neuflow_sintel.onnx")

# Per-frame movement scores collected over the whole video
movement_scores: list[float] = []


# Path to the input video file
video_path = "vendor/optical_flow_measure/inputs/hot-chickens-combined.mp4"

# Open the video file
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
print(f"Error: Could not open video file {video_path}")
return

# Get video properties
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))

print(f"Video loaded: {frame_width}x{frame_height} at {fps} FPS")
print("Press 'q' to quit, 'p' to pause/resume...")

paused = False

# List storing the flow vectors, movement score history, and inference time for each frame
frame_data: list[dict] = []

while True:
if not paused:
# Read a frame
ret, frame = cap.read()
if not ret:
print("End of video stream.")
break
# Start timing the inference
start = time.perf_counter()

# Calculate flow vector
flow_vectors = flow.update(frame)

# Inference time for the model to compute the flow field of two consecutive frames
inference_time = (time.perf_counter() - start) * 1000  # milliseconds

# Print the flow vector to the console
print(f"Flow vector: {flow_vectors}")

# Calculate movement score
# check if the flow vector is a numpy array
if isinstance(flow_vectors, np.ndarray):
movement_score = flow.compute_movement_score(flow_vectors)
else:
# On the first frame there is no previous frame, so no valid flow field (and no movement score) is available
movement_score = 0

print(f"Movement score: {movement_score}")
movement_scores.append(float(movement_score))

# Store the data for this frame (movement_scores holds the cumulative list of scores so far)
data = {
"inference_time": inference_time,
"flow_vectors": flow_vectors.tolist() if hasattr(flow_vectors, 'tolist') else [flow_vectors],
"movement_scores": movement_scores
}

# Append this frame's data to the list
frame_data.append(data)


# # Display the flow vector on the frame
# cv2.putText(
# frame,
# f"Flow vector: {flow_vectors}",
# (10, 30),
# cv2.FONT_HERSHEY_SIMPLEX,
# 1,
# (0, 255, 0),
# 2
# )

# # Display the frame
# cv2.imshow("Optical Flow Movement", frame)



# Handle key presses
# (note: cv2.waitKey only delivers key events while an OpenCV window is open;
# with cv2.imshow commented out above, 'q' and 'p' will have no effect)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('p'):
paused = not paused
print("Video paused" if paused else "Video resumed")

# Save the collected per-frame data to a JSON file
with open("flow_vectors_hot_chicken.json", "w") as f:
json.dump(frame_data, f)

# Release resources
cap.release()
#cv2.destroyAllWindows()
flow.reset()

if __name__ == "__main__":
main()
67 changes: 67 additions & 0 deletions flow-cli.py
@@ -0,0 +1,67 @@
import click
from neuflowv2 import OpticalFlow
from highlighter.agent.capabilities.sources import VideoFrameIterator
import json

DEFAULT_MODEL_PATH = "models/neuflow_sintel.onnx"

@click.group()
def flow_group():
pass

@flow_group.command("compute")
@click.argument("video_path", type=str)
@click.option("--model-path", type=str, required=False, default=DEFAULT_MODEL_PATH)
@click.option("--fps", type=int, required=False, default=0)
@click.option("--max-frames", type=int, required=False, default=0)
def compute(video_path, model_path, max_frames, fps):
# NOTE: the --fps option is currently accepted but not used below
if not max_frames:
max_frames = float("inf")

of = OpticalFlow(model_path=model_path)
video_frames = VideoFrameIterator(
source_urls=[video_path]
)
# Prime the estimator with the first frame; OpticalFlow keeps the previous frame as internal state
prev_frame = next(video_frames).content
of.update(prev_frame)

frame_number = 0
scores: list[dict] = []
while True:
try:
cur_frame = next(video_frames).content
except StopIteration:
print(f"End of video, frame: {frame_number}")
break

flow = of.update(cur_frame)
f_sum, f_mean, f_median = of.compute_movement_scores(flow)
move_score = {
"sum": float(f_sum),
"mean": float(f_mean),
"median": float(f_median)
}

scores.append(move_score)

if not (frame_number % 10):
print(f"Sum:{int(f_sum)}, Mean:{f_mean:0.2f}, Med:{f_median:0.2f}")

prev_frame = cur_frame
frame_number += 1

if frame_number >= max_frames:
print(f"Exiting at max_frames: {max_frames}")
break

with open("flow_vector.json", "w") as f:
json.dump(scores, f, indent=4)

@flow_group.command("overlay")
def overlay():
pass


if __name__ == "__main__":
flow_group()
