CodeTemp
- !/usr/bin/python3
from flask import Response from flask import Flask from flask import render_template
from imutils.video import VideoStream
import cv2
- import numpy
- import threading
- import argparse
- import datetime
- import imutils
import time
- Initialize a Flask object (by convention, this is how we do it)
app = Flask(__name__)
- Create an object to initialize the camera and manipulate it (we let some time to warm-up)
- vs = VideoStream(usePiCamera=1).start()
- vs = cv2.VideoCapture(0)
- time.sleep(2.0)
- Create a function that will generate frames and return them as jpeg
def gen_Frames():
#Create an object to initialize the camera and manipulate it (we let some time to warm-up) vs = VideoStream(usePiCamera=1).start() #vs = cv2.VideoCapture(0) time.sleep(2.0) #Create an infinite loop while True: # read the frames from our camera error, frame = vs.read() if error: break else: if frame is None: input('empty frame') else: (ret, buffer) = cv2.imencode(".jpg", frame) #if not ret: # continue frame = buffer.tobytes() #frame = bytearray(buffer) yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') # We add the date #timestamp = datetime.datetime.now() #cv2.putText(frame, timestamp.strftime('%A %d %B %Y %I:%M:%S%p'), (10, frame.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 255), 1)
- We define the URL pattern (let's not reinvent the wheel and stay at the root level)
- We also define the index that will create the index.html
- This is the route of the default page of our Flask web app
@app.route('/') def index():
return render_template('index.html')
- We define the video feed route
@app.route('/video_feed') def video_feed():
return Response(gen_Frames(), mimetype='multipart/x-mixed-replace; boundary=frame')
- Main function, starting the Flask server
if __name__ == "__main__":
#arg = argparse.ArgumentParser() #arg.add_argument("-i", "--ip", type=str, required=True, help="ip address of our server") #arg.add_argument("-p", "--port", type=int, required=True, help="port number of our server (> 1024)") #args = vars(arg.parse_args()) #app.run(host=args["ip"], port=args["port"], debug=True) app.run(debug=True)
- !/usr/bin/python3
- import the necessary packages
from singlemotiondetector import SingleMotionDetector from imutils.video import VideoStream from flask import Response from flask import Flask from flask import render_template import threading import argparse import datetime import imutils import time import cv2
- initialize the output frame and a lock used to ensure thread-safe
- exchanges of the output frames (useful when multiple browsers/tabs
- are viewing the stream)
outputFrame = None lock = threading.Lock()
- initialize a flask object
app = Flask(__name__)
- initialize the video stream and allow the camera sensor to
- warmup
vs = VideoStream(usePiCamera=1).start()
- vs = VideoStream(src=0).start()
time.sleep(2.0)
@app.route("/") def index():
# return the rendered template return render_template("index.html")
def detect_motion(frameCount):
# grab global references to the video stream, output frame, and # lock variables global vs, outputFrame, lock # initialize the motion detector and the total number of frames # read thus far md = SingleMotionDetector(accumWeight=0.1) total = 0 # loop over frames from the video stream while True: # read the next frame from the video stream, resize it, # convert the frame to grayscale, and blur it frame = vs.read() frame = imutils.resize(frame, width=400) gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) gray = cv2.GaussianBlur(gray, (7, 7), 0) # grab the current timestamp and draw it on the frame timestamp = datetime.datetime.now() cv2.putText(frame, timestamp.strftime( "%A %d %B %Y %I:%M:%S%p"), (10, frame.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 255), 1) # if the total number of frames has reached a sufficient # number to construct a reasonable background model, then # continue to process the frame if total > frameCount: # detect motion in the image motion = md.detect(gray) # check to see if motion was found in the frame if motion is not None: # unpack the tuple and draw the box surrounding the # "motion area" on the output frame (thresh, (minX, minY, maxX, maxY)) = motion cv2.rectangle(frame, (minX, minY), (maxX, maxY), (0, 0, 255), 2) # update the background model and increment the total number # of frames read thus far md.update(gray) total += 1 # acquire the lock, set the output frame, and release the # lock with lock: outputFrame = frame.copy()
def generate():
# grab global references to the output frame and lock variables global outputFrame, lock # loop over frames from the output stream while True: # wait until the lock is acquired with lock: # check if the output frame is available, otherwise skip # the iteration of the loop if outputFrame is None: continue # encode the frame in JPEG format (flag, encodedImage) = cv2.imencode(".jpg", outputFrame) # ensure the frame was successfully encoded if not flag: continue # yield the output frame in the byte format yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + bytearray(encodedImage) + b'\r\n')
@app.route("/video_feed") def video_feed():
# return the response generated along with the specific media # type (mime type) return Response(generate(), mimetype = "multipart/x-mixed-replace; boundary=frame")
- check to see if this is the main thread of execution
if __name__ == '__main__':
# construct the argument parser and parse command line arguments ap = argparse.ArgumentParser() ap.add_argument("-i", "--ip", type=str, required=True, help="ip address of the device") ap.add_argument("-o", "--port", type=int, required=True, help="ephemeral port number of the server (1024 to 65535)") ap.add_argument("-f", "--frame-count", type=int, default=32, help="# of frames used to construct the background model") args = vars(ap.parse_args()) # start a thread that will perform motion detection t = threading.Thread(target=detect_motion, args=( args["frame_count"],)) t.daemon = True t.start() # start the flask app app.run(host=args["ip"], port=args["port"], debug=True, threaded=True, use_reloader=False)
- release the video stream pointer
vs.stop()
- !/usr/bin/python3
- import the necessary packages
import numpy as np import imutils import cv2 class SingleMotionDetector:
def __init__(self, accumWeight=0.5): # store the accumulated weight factor self.accumWeight = accumWeight # initialize the background model self.bg = None def update(self, image): # if the background model is None, initialize it if self.bg is None: self.bg = image.copy().astype("float") return
# update the background model by accumulating the weighted # average cv2.accumulateWeighted(image, self.bg, self.accumWeight) def detect(self, image, tVal=25): # compute the absolute difference between the background model # and the image passed in, then threshold the delta image delta = cv2.absdiff(self.bg.astype("uint8"), image) thresh = cv2.threshold(delta, tVal, 255, cv2.THRESH_BINARY)[1] # perform a series of erosions and dilations to remove small # blobs thresh = cv2.erode(thresh, None, iterations=2) thresh = cv2.dilate(thresh, None, iterations=2) # find contours in the thresholded image and initialize the # minimum and maximum bounding box regions for motion cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) cnts = imutils.grab_contours(cnts) (minX, minY) = (np.inf, np.inf) (maxX, maxY) = (-np.inf, -np.inf) # if no contours were found, return None if len(cnts) == 0: return None # otherwise, loop over the contours for c in cnts: # compute the bounding box of the contour and use it to # update the minimum and maximum bounding box regions (x, y, w, h) = cv2.boundingRect(c) (minX, minY) = (min(minX, x), min(minY, y)) (maxX, maxY) = (max(maxX, x + w), max(maxY, y + h)) # otherwise, return a tuple of the thresholded image along # with bounding box return (thresh, (minX, minY, maxX, maxY))
<html> <head> <title>RPi4 - CmV3i</title> </head> <body>
RPI4 - Camera V3
<img src="{{ url_for('video_feed') }}"> </body> </html>